es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Por qué el procesamiento de tareas es en serie en Flink?

//抓包
DataStream<byte[]> logSource = env.addSource(new PacpParalleSourceFunction()).setParallelism(CAPTURE_PACKAGE_PARALLELISM);

//解析
XmlLogParse xmlLogParse = new XmlLogParse();
DataStream<jsonobject> objectDataStream = logSource.map(new Log2ObjFunction(xmlLogParse)).setParallelism(PARSE_LOG_PARALLELISM);

//写csv   La paralelización del operador no paralelo debe ser 1
LogStorage logStorage = new WriteFileStorage();
objectDataStream.countWindowAll(ConstantField.BATCH_DEAL_COUNT).apply(
        new AllWindowFunction<jsonobject, object,="" globalwindow="">() {
            @Override
            public void apply(GlobalWindow globalWindow, Iterable<jsonobject> iterable, Collector<object> collector) throws Exception {
                ArrayList<jsonobject> list = Lists.newArrayList(iterable);
                if (list.size() > 0) {
                    long start = System.currentTimeMillis();
                    logStorage.doStorage(list);
                    logger.info("Escribiendo en archivo " + list.size() + " tiempo transcurrido " + (System.currentTimeMillis() - start) + "ms " + Thread.currentThread().getName());
                }
            }
        }
).name("salida a csv").setParallelism(1);

//写kafka
Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_HOST);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("topic", KAFKA_DST_TOPIC_NAME);
DataStreamSink<jsonobject> obj2kafka = objectDataStream.addSink(new KafkaSinkFunction(props)).setParallelism(WRITE_KAFKA_PARALLELISM);

obj2kafka.name("salida a kafka");

// ejecutar aplicación
env.execute("elksoc-log4p");

¿Cómo puedo hacer que estas dos tareas sean paralelas?
¿Cómo puedo hacer que estas dos tareas sean paralelas?
¿Cómo puedo hacer que estas dos tareas sean paralelas?
¿Cómo puedo hacer que estas dos tareas sean paralelas?
¿Cómo puedo hacer que estas dos tareas sean paralelas?
¿Cómo puedo hacer que estas dos tareas sean paralelas?
¿Cómo puedo hacer que estas dos tareas sean paralelas?</jsonobject,></byte[]>

Tags:  ,

Answer

  1. Avatar for davy.ai

    Para hacer que estas dos tareas sean paralelas, puedes aumentar la paralelización de las corrientes de datos logSource y objectDataStream. Puedes utilizar el método setParallelism() para especificar el grado de paralelización para cada corriente de datos. Por ejemplo, puedes establecer logSource.setParallelism(n) y objectDataStream.setParallelism(m) para ejecutar estas tareas en paralelo utilizando n y m instancias paralelas respectivamente.

    Además, también puedes optimizar el código para asegurarte de que los operadores LogStorage y KafkaSinkFunction puedan ejecutarse en paralelo con objectDataStream reduciendo las partes no paralelas del código. Por ejemplo, puedes hacer que las implementaciones de LogStorage y KafkaSinkFunction sean paralelas, o puedes escribir datos en múltiples temas de Kafka para lograr un procesamiento paralelo.

Comments are closed.