es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Integración de Spring: cómo hacer que varios hilos consulten mensajes desde un canal controlable a través de un activador de servicio como un servicio paralelo.

En mi código, el adaptador de canal SqsMessageDrivenChannelAdapter está configurado para leer mensajes de la cola de AWS y enviarlos a un canal pollable (cola). Para el canal pollable, un activador de servicio está apuntando para leer el mensaje y procesarlo.

Mi pregunta exacta es: ¿cómo hacer que el activador de servicio funcione como multihilo para leer mensajes del canal pollable y hacer alguna tarea paralela con un tamaño de hilo especificado?

Adaptador de canal:

@Bean
public MessageProducer sqsMessageDrivenChannelAdapterForFlights() {
    log.info("**** empezando a escuchar: " + ttFlightsXMLSqsName + " **** ");
    SqsMessageDrivenChannelAdapter adapter =
        new SqsMessageDrivenChannelAdapter(amazonSqs, ttFlightsXMLSqsName);
    adapter.setOutputChannelName(MessageChannelConstants.get_tt_flights);
    adapter.setMaxNumberOfMessages(5);
    return adapter;
}

Canal pollable:

@Bean(name = MessageChannelConstants.get_tt_flights)
public PollableChannel sqsInputChannelFlights() {
    return new QueueChannel();
}

Activador de servicio:

@ServiceActivator(inputChannel = MessageChannelConstants.get_tt_flights,
      poller = @Poller(fixedRate = "5000"))
public void processFlightData(Message<?> receive) throws PacPlusException {
    .................
    long startTime = System.currentTimeMillis();
}

Pregunta final: ¿Si hago dos activadores de servicio apuntando al mismo canal pollable, funcionará perfectamente y es bueno usar este tipo de proceso paralelo de mensajes?

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Para hacer que el activador del servicio sea multihilo y procese mensajes en paralelo, se puede utilizar el TaskExecutor en Spring Integration.

    Primero, es necesario configurar un bean de ejecutor de tarea en el archivo de configuración. Aquí hay un ejemplo utilizando un grupo de hilos de tamaño 5:

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(25);
        return executor;
    }
    

    Entonces, se puede modificar la anotación del activador del servicio para incluir el ejecutor de tarea:

    @ServiceActivator(inputChannel = MessageChannelConstants.get_tt_flights,
        poller = @Poller(fixedRate = "5000"), 
        executor = "taskExecutor")
    public void processFlightData(Message<?> receive) throws PacPlusException {
        //Procesa el mensaje
    }
    

    Esto hará que el activador del servicio sea multihilo y procese mensajes en paralelo utilizando el tamaño del grupo de hilos especificado.

    En cuanto a la pregunta final, si es bueno o no utilizar varios activadores de servicio que apuntan al mismo canal de encuestabilidad para el procesamiento en paralelo, depende de su caso de uso y requisitos específicos. En general, es posible utilizar varios activadores de servicio para el procesamiento en paralelo, pero puede introducir complejidad y posibles condiciones de carrera si no se manejan adecuadamente. Se recomienda probar y evaluar exhaustivamente el impacto en el rendimiento antes de implementar este enfoque.

Comments are closed.