es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Cómo ocurre el cambio de contexto del slot cuando se realiza una pausa en el hilo de tarea de una fuente personalizada de Flink?

Tengo una fuente personalizada que genera algunos eventos cada x minutos. He consultado este archivo (https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-cli-test/src/main/java/org/apache/flink/streaming/tests/PeriodicStreamingJob.java) y mi código es algo similar a esto:

public class PeriodicSourceGenerator extends RichParallelSourceFunction<genericmetric> {
    private transient AtomicBoolean isRunning;
    @Override
    public void open(final Configuration c) throws Exception {
        isRunning = new AtomicBoolean(true);
    }
    @Override
    public void run(SourceContext<genericmetric> ctx) throws Exception {
        while (isRunning.get()) {
            //noinspection BusyWait
            Thread.sleep(300000); // 5 mins
            final long ts = System.getCurrentTimeMillis();
            final MetricStore.MetricPoint mp = new MetricStore.MetricPoint(ts, 1, -1);
            synchronized (ctx.getCheckpointLock()) {
                ctx.collectWithTimestamp(new GenericMetric(mk, MetricName.vRNI_internal_droppedTx_flow_absolute_latest_number, mp), ts);
                ctx.collectWithTimestamp(new GenericMetric(mk, MetricName.vRNI_internal_droppedRx_flow_absolute_latest_number, mp), ts);
            }
        }
        logger.info("Job cancelled. Shutting Down Periodic Source Generator");
    }

    @Override
    public void cancel() {
        isRunning.set(false);
    }
}

Estoy ejecutando múltiples flujos en un solo trabajo de Flink que se ve algo así.

Imagen

Estoy ejecutando Flink con el encadenamiento de operadores predeterminado y el compartimiento de ranuras habilitado. Todos mis operadores tienen la misma paralelización, 30, y tengo 5 gestores de tareas para que cada gestor de tareas tenga 6 ranuras.

¿Alguien puede decirme cómo afectará el tiempo de espera en el pipeline PeriodicSourceGenerator al pipeline Collection Source? Mi entendimiento es que el tiempo de espera hará que el pipeline generador PeriodicSource se cambie de contexto por el pipeline Collection Source y que no se pausará toda la ranura durante 5 minutos. ¿Es correcto mi entendimiento?

  • Versión de Flink: 1.13.2
Tags:  

Answer

  1. Avatar for davy.ai

    El sleep en el pipeline PeriodicSourceGenerator no afectará al pipeline Collection Source, ya que ambos se ejecutan en paralelo en hilos diferentes. Mientras el pipeline PeriodicSourceGenerator está durmiendo, el pipeline Collection Source continuará ejecutándose y procesando eventos.

    Dado que has habilitado el uso compartido de slot y el encadenamiento de operadores, el pipeline Collection Source y otros pipelines en el trabajo se ejecutarán simultáneamente en los mismos slots, y su ejecución se intercalará. Esto significa que mientras un pipeline espera a que se generen eventos, otros pipelines pueden continuar procesando eventos.

    Por lo tanto, el sleep en el pipeline PeriodicSourceGenerator no hará que todo el slot se pause durante 5 minutos. En cambio, otros pipelines pueden utilizar el slot para seguir procesando eventos, y el pipeline PeriodicSourceGenerator reanudará su ejecución después de los 5 minutos de sleep.

    En resumen, tu comprensión es incorrecta. El sleep en el pipeline PeriodicSourceGenerator no hará que todo el slot se pause o se cambie el contexto por otros pipelines. En su lugar, la ejecución de los pipelines en el mismo slot se intercalará, y cada pipeline se ejecutará de forma independiente sin afectar a los demás.

Comments are closed.