¿Por qué el procesamiento de tareas es en serie en Flink?
//抓包
DataStream<byte[]> logSource = env.addSource(new PacpParalleSourceFunction()).setParallelism(CAPTURE_PACKAGE_PARALLELISM);
//解析
XmlLogParse xmlLogParse = new XmlLogParse();
DataStream<jsonobject> objectDataStream = logSource.map(new Log2ObjFunction(xmlLogParse)).setParallelism(PARSE_LOG_PARALLELISM);
//写csv La paralelización del operador no paralelo debe ser 1
LogStorage logStorage = new WriteFileStorage();
objectDataStream.countWindowAll(ConstantField.BATCH_DEAL_COUNT).apply(
new AllWindowFunction<jsonobject, object,="" globalwindow="">() {
@Override
public void apply(GlobalWindow globalWindow, Iterable<jsonobject> iterable, Collector<object> collector) throws Exception {
ArrayList<jsonobject> list = Lists.newArrayList(iterable);
if (list.size() > 0) {
long start = System.currentTimeMillis();
logStorage.doStorage(list);
logger.info("Escribiendo en archivo " + list.size() + " tiempo transcurrido " + (System.currentTimeMillis() - start) + "ms " + Thread.currentThread().getName());
}
}
}
).name("salida a csv").setParallelism(1);
//写kafka
Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_HOST);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("topic", KAFKA_DST_TOPIC_NAME);
DataStreamSink<jsonobject> obj2kafka = objectDataStream.addSink(new KafkaSinkFunction(props)).setParallelism(WRITE_KAFKA_PARALLELISM);
obj2kafka.name("salida a kafka");
// ejecutar aplicación
env.execute("elksoc-log4p");
¿Cómo puedo hacer que estas dos tareas sean paralelas?
¿Cómo puedo hacer que estas dos tareas sean paralelas?
¿Cómo puedo hacer que estas dos tareas sean paralelas?
¿Cómo puedo hacer que estas dos tareas sean paralelas?
¿Cómo puedo hacer que estas dos tareas sean paralelas?
¿Cómo puedo hacer que estas dos tareas sean paralelas?
¿Cómo puedo hacer que estas dos tareas sean paralelas?</jsonobject,></byte[]>
davy.ai
Para hacer que estas dos tareas sean paralelas, puedes aumentar la paralelización de las corrientes de datos
logSource
yobjectDataStream
. Puedes utilizar el métodosetParallelism()
para especificar el grado de paralelización para cada corriente de datos. Por ejemplo, puedes establecerlogSource.setParallelism(n)
yobjectDataStream.setParallelism(m)
para ejecutar estas tareas en paralelo utilizando n y m instancias paralelas respectivamente.Además, también puedes optimizar el código para asegurarte de que los operadores
LogStorage
yKafkaSinkFunction
puedan ejecutarse en paralelo conobjectDataStream
reduciendo las partes no paralelas del código. Por ejemplo, puedes hacer que las implementaciones deLogStorage
yKafkaSinkFunction
sean paralelas, o puedes escribir datos en múltiples temas de Kafka para lograr un procesamiento paralelo.