¿Cómo ocurre el cambio de contexto del slot cuando se realiza una pausa en el hilo de tarea de una fuente personalizada de Flink?
Tengo una fuente personalizada que genera algunos eventos cada x minutos. He consultado este archivo (https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-cli-test/src/main/java/org/apache/flink/streaming/tests/PeriodicStreamingJob.java) y mi código es algo similar a esto:
public class PeriodicSourceGenerator extends RichParallelSourceFunction<genericmetric> {
private transient AtomicBoolean isRunning;
@Override
public void open(final Configuration c) throws Exception {
isRunning = new AtomicBoolean(true);
}
@Override
public void run(SourceContext<genericmetric> ctx) throws Exception {
while (isRunning.get()) {
//noinspection BusyWait
Thread.sleep(300000); // 5 mins
final long ts = System.getCurrentTimeMillis();
final MetricStore.MetricPoint mp = new MetricStore.MetricPoint(ts, 1, -1);
synchronized (ctx.getCheckpointLock()) {
ctx.collectWithTimestamp(new GenericMetric(mk, MetricName.vRNI_internal_droppedTx_flow_absolute_latest_number, mp), ts);
ctx.collectWithTimestamp(new GenericMetric(mk, MetricName.vRNI_internal_droppedRx_flow_absolute_latest_number, mp), ts);
}
}
logger.info("Job cancelled. Shutting Down Periodic Source Generator");
}
@Override
public void cancel() {
isRunning.set(false);
}
}
Estoy ejecutando múltiples flujos en un solo trabajo de Flink que se ve algo así.
Estoy ejecutando Flink con el encadenamiento de operadores predeterminado y el compartimiento de ranuras habilitado. Todos mis operadores tienen la misma paralelización, 30, y tengo 5 gestores de tareas para que cada gestor de tareas tenga 6 ranuras.
¿Alguien puede decirme cómo afectará el tiempo de espera en el pipeline PeriodicSourceGenerator
al pipeline Collection Source
? Mi entendimiento es que el tiempo de espera hará que el pipeline generador PeriodicSource
se cambie de contexto por el pipeline Collection Source
y que no se pausará toda la ranura durante 5 minutos. ¿Es correcto mi entendimiento?
- Versión de Flink: 1.13.2
davy.ai
El sleep en el pipeline
PeriodicSourceGenerator
no afectará al pipelineCollection Source
, ya que ambos se ejecutan en paralelo en hilos diferentes. Mientras el pipelinePeriodicSourceGenerator
está durmiendo, el pipelineCollection Source
continuará ejecutándose y procesando eventos.Dado que has habilitado el uso compartido de slot y el encadenamiento de operadores, el pipeline
Collection Source
y otros pipelines en el trabajo se ejecutarán simultáneamente en los mismos slots, y su ejecución se intercalará. Esto significa que mientras un pipeline espera a que se generen eventos, otros pipelines pueden continuar procesando eventos.Por lo tanto, el sleep en el pipeline
PeriodicSourceGenerator
no hará que todo el slot se pause durante 5 minutos. En cambio, otros pipelines pueden utilizar el slot para seguir procesando eventos, y el pipelinePeriodicSourceGenerator
reanudará su ejecución después de los 5 minutos de sleep.En resumen, tu comprensión es incorrecta. El sleep en el pipeline
PeriodicSourceGenerator
no hará que todo el slot se pause o se cambie el contexto por otros pipelines. En su lugar, la ejecución de los pipelines en el mismo slot se intercalará, y cada pipeline se ejecutará de forma independiente sin afectar a los demás.