es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Integración de Spring: patrón de dispersión y recolección distribuida paralela

Necesito implementar la siguiente arquitectura:

Tengo un gran Pedido que debe dividirse en pedidos más pequeños (paralelos) y enviarlos a un punto final asíncrono de REST descendente.

El API de ordenación descendente publica un mensaje en una cola de respuesta (kafka/rabbitmq) después de completar el pedido (fallido o exitoso) con identificadores de correlación.

Necesito tener un escucha agregado para recopilar todas las respuestas y enviar el resultado final al llamante.

Estoy pensando en utilizar el patrón Scatter gather de Spring Integration y otras características útiles de Spring.

¿Puedes ayudarme a mostrar un ejemplo de cómo se puede implementar esta arquitectura con la ayuda de Spring Integration?

Tags:  ,

Answer

  1. Avatar for davy.ai

    Claro, puedo proporcionar un ejemplo de cómo se puede implementar esta arquitectura utilizando Spring-integration. Aquí tienes un breve resumen del enfoque:

    1. Utiliza Spring Batch para dividir la gran orden en órdenes más pequeñas en paralelo. Puedes usar las características de procesamiento paralelo de Spring Batch para lograr esto.
    2. Utiliza el gateway de salida HTTP de Spring Integration para enviar las órdenes más pequeñas al punto final asincrónico downstream. Es importante configurar la bandera asincrónica en true en la configuración del gateway de salida HTTP para asegurarse de que las solicitudes se envíen de forma asíncrona.

    3. La API de ordenación downstream debe publicar un mensaje en una cola de respuesta (Kafka o RabbitMQ) después de completar la orden. El mensaje debe incluir el ID de correlación que se generó al enviar la solicitud al punto final downstream.

    4. Utiliza el adaptador de canal de entrada Kafka o RabbitMQ de Spring Integration para recibir los mensajes de respuesta de la cola de respuestas.

    5. Utiliza el patrón Scatter-Gather de Spring Integration para agrupar las respuestas de la cola de respuestas. El patrón Scatter-Gather te permite enviar un solo mensaje a múltiples puntos finales y recopilar las respuestas en un solo mensaje de salida.

    6. Utiliza el Aggregator de Spring Integration para recopilar todas las respuestas y enviar la salida final al solicitante.

    Aquí tienes una configuración de muestra de Spring Integration que implementa el enfoque anterior:

    <!-- Define el gateway de salida HTTP para enviar las órdenes más pequeñas al punto final downstream -->
    <int-http:outbound-gateway id="httpOutboundGateway" request-channel="orderRequestChannel" reply-channel="orderResponseChannel" url="http://downstream-api.com/orders" http-method="POST" async="true" expected-response-type="java.lang.String">
    </int-http:outbound-gateway>
    
    <!-- Define el adaptador de canal de entrada Kafka para recibir los mensajes de respuesta -->
    <bean id="kafkaMessageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="localhost:9092"></entry>
                        <entry key="group.id" value="replyMessageGroup"></entry>
                        <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"></entry>
                        <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"></entry>
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
        <constructor-arg value="reply-topic"></constructor-arg>
    </bean>
    
    <int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter" auto-startup="true" channel="replyChannel" message-listener-container="kafkaMessageListenerContainer" converter="kafkaMessageConverter" auto-offset-reset="earliest" topics="reply-topic"></int-kafka:inbound-channel-adapter>
    
    <!-- Define el patrón Scatter-Gather para enviar las órdenes más pequeñas a múltiples puntos finales -->
    <int:scatter-gather id="scatterGather" input-channel="orderRequestChannel" output-channel="replyChannel" gather-channel="responseAggregatorChannel" scatterer="orderScatterer"></int:scatter-gather>
    
    <!-- Define el Aggregator para recopilar todas las respuestas y enviar la salida final al solicitante -->
    <int:aggregator input-channel="responseAggregatorChannel" output-channel="finalOutputChannel" correlation-strategy="orderCorrelationStrategy" release-strategy="orderReleaseStrategy" send-partial-result-on-expiry="true" expire-groups-upon-completion="false" group-timeout="60000"></int:aggregator>
    
    

    ¡Espero que esto te ayude!

Comments are closed.