es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Controlar los valores emitidos por Flux generado para la ejecución secuencial de tareas.

Estoy escribiendo un marco de orquestación simple utilizando el framework Reactor, que ejecuta tareas en secuencia y la siguiente tarea a ejecutar depende del resultado de las tareas anteriores. Puedo tener varias opciones para elegir en función del resultado de las tareas anteriores. Anteriormente, escribí un marco similar basado en un DAG estático en el que pasé una lista de tareas como iterables y usé Flux.fromIterable(taskList). Sin embargo, esto no me da la flexibilidad de ser dinámico debido al editor de arrays estático.

Estoy buscando enfoques alternativos como do(){}while(condition) para resolver la traversía del DAG y la decisión de tareas y se me ocurrió Flux.generate(). Evalúo el siguiente paso en el método generar y paso la siguiente tarea aguas abajo. El problema al que me enfrento ahora es que Flux.generate no espera a que se complete aguas abajo, sino que empuja hasta que la condición se establezca como no válida. Y para cuando se ejecuta la tarea 1, la tarea 2 ya habría sido empujada n veces, lo cual no es el comportamiento esperado.

¿Alguien puede señalarme en la dirección correcta, por favor?

Gracias.

Primera iteración usando una lista de tareas (DAG estático)

Flux.fromIterable(taskList)
        .publishOn(this.factory.getSharedSchedulerPool())
        .concatMap(
            reactiveTask -> {
              log.info("Ejecutando tarea =>{}", reactiveTask.getTaskName());
              return reactiveTask
                  .run(ctx);
            })
        // Evalúa el estado de la tarea anterior y finaliza o continúa el flujo. 
        .takeWhile(context -> evaluateStatus(context))
        .onErrorResume(throwable -> buildResponse(ctx, throwable))
        .doOnCancel(() -> log.info("Tarea cancelada"))
        .doOnComplete(() -> log.info("Flujo completado"))
        .subscribe(); 

Intento de DAG dinámico

Flux.generate(
            (SynchronousSink<ReactiveTask<OrchestrationContext>> synchronousSink) -> {
              ReactiveTask<OrchestrationContext> task = null;
              if (ctx.getLastExecutedStep() == null) {
                // primera tarea; 
                task = getFirstTaskFromDAG();
              } else {
                task = deriveNextStep(ctx.getLastExecutedStep(), ctx.getDecisionData()); 
          }
          if (task.getName.equals("END")) {
            synchronousSink.complete();
          }
          synchronousSink.next(task);
        })
    .publishOn(this.factory.getSharedSchedulerPool())
    .doOnNext(orchestrationContextReactiveTask -> log.info("En siguiente => {}", 
      orchestrationContextReactiveTask.getTaskName()))
    .concatMap(
        reactiveTask -> {
          log.info("Ejecutando tarea =>{}", reactiveTask.getTaskName());
          return reactiveTask
              .run(ctx);                  
        })
    .onErrorResume(throwable -> buildResponse(ctx, throwable))
    .takeUntil(context -> evaluateStatus(context, tasks))
    .doOnCancel(() -> log.info("Tarea cancelada"))
    .doOnComplete(() -> log.info("Flujo completado")).subscribe(); 

El problema en el enfoque anterior es que mientras la tarea 1 se está ejecutando, el suscriptor onNext() imprime muchas veces porque generate está publicando. Quiero que el método generate espere los resultados de la tarea anterior y presente una nueva tarea. En el mundo no reactivo, esto se puede lograr a través de un simple bucle while().

Cada tarea realizará la siguiente acción:

“`
public class ResponseTask extends AbstractBaseTask {
private TaskDefinition taskDefinition;
final String taskName;

public ResponseTask(
StateManager stateManager,
ThreadFactory factory,
) {
this.taskDefinition = taskDefinition;
this.taskName = taskName;
}

public Mono<string> transform(OrchestrationContext context) {
Any masterPayload = Any.wrap(context.getIngestionPayload());
return Mono.fromCallable(() -> stateManager.doTransformation(context, masterPayload);
}

public Mono<orchestrationcontext> execute(OrchestrationContext context, String payload) {
log.info("Executing sleep for tarea=>{}", context.getLastExecutedStep());
return Mono.delay(Duration.ofSeconds(1), factory.getSharedSchedulerPool())
.then(Mono.just(context));
}

public Mono<orchestrationcontext> run(OrchestrationContext context) {
log.info("Ejecutando tarea:{}. La última ejecutada:{}", taskName, context.getLastExecutedStep());
return transform(context)
.doOnNext((result) -> log.info("Transformación completa para la tarea=?{}", taskName);)
.flatMap(payload -> {
return execute(context, payload);
}).onErrorResume(throwable -> {
context.setStatus(FAILED);
return Mono.just(context);
});
}
}
“`

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Para abordar el problema de Flux.generate que envía múltiples instancias de la tarea siguiente al downstream, lo cual no es el comportamiento deseado, una alternativa que se puede explorar es usar Mono.defer para generar la tarea siguiente en el downstream según se necesite.

    Aquí está una versión actualizada del fragmento de código que usa Mono.defer:

    Flux.generate(
        (SynchronousSink<><orchestrationcontext>> synchronousSink) -> {
            ReactiveTask<orchestrationcontext> task = null;
            if (ctx.getLastExecutedStep() == null) {
                // primera tarea;
                task = getFirstTaskFromDAG();
            } else {
                // generar la tarea siguiente en el downstream según se necesite con Mono.defer
                task = Mono.defer(() -> Mono.just(deriveNextStep(ctx.getLastExecutedStep(), ctx.getDecisionData())))
                        .block();
            }
            if (task.getName().equals("END")) {
                synchronousSink.complete();
            }
            synchronousSink.next(task);
        })
            .publishOn(this.factory.getSharedSchedulerPool())
            .doOnNext(orchestrationContextReactiveTask -> log.info("En el siguiente => {}", orchestrationContextReactiveTask.getTaskName()))
            .concatMap(reactiveTask -> {
                log.info("Ejecutando tarea =>{}", reactiveTask.getTaskName());
                return reactiveTask.run(ctx);
            })
            .onErrorResume(throwable -> buildResponse(ctx, throwable))
            .takeUntil(context -> evaluateStatus(context, tasks))
            .doOnCancel(() -> log.info("La tarea ha sido cancelada"))
            .doOnComplete(() -> log.info("Completado el flujo"))
            .subscribe();
    
    

    Con Mono.defer, la tarea siguiente en el downstream se genera solo cuando se necesite y solo cuando haya una solicitud del downstream para esta. Este enfoque garantiza que Flux.generate no envíe múltiples instancias de la tarea siguiente y esperará la respuesta de la tarea actual antes de llamar al método deriveNextStep para generar la tarea siguiente en el downstream.

    Tenga en cuenta que en lugar de llamar al método deriveNextStep () directamente dentro del método Flux.generate, se usa Mono.just() para crear un Mono que emite la tarea siguiente en el downstream según se necesite, y Mono.defer() garantiza que el Mono se regenere cada vez que haya una solicitud para el siguiente ítem en downstream.

    ¡Espero que esto ayude!

Comments are closed.