es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

El formato de entrada `textInputFormat` de Flink no procesa archivos comprimidos en formato GZ desde el sistema de archivos `Source` en AWS S3.

Seguí (https://stackoverflow.com/questions/49122170/zip-compressed-input-for-apache-flink?newreg=4dce460b71504a16ab651b171da287de) y escribí el siguiente fragmento de código para procesar archivos de registro .gz en un directorio con un simple TextInputFormat. Funciona en mi directorio de prueba local, escanea y abre automáticamente el contenido del archivo .gz. Sin embargo, cuando lo ejecuto con una fuente de cubo de S3, no procesa archivos comprimidos .gz. Este trabajo de Flink sigue abriendo archivos .log en el cubo de S3. Parece que simplemente no descomprime los archivos .gz. ¿Cómo puedo resolver esto en el sistema de archivos de S3?

public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final String sourceLogDirPath = params.get("source_log_dir_path", "s3://my-test-bucket-logs/"); // "/Users/my.user/logtest/logs";
final Long checkpointInterval = Long.parseLong(params.get("checkpoint_interval", "60000"));

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getConfig().setGlobalJobParameters(params);

TextInputFormat textInputFormat = new TextInputFormat(new Path(sourceLogDirPath));
textInputFormat.setNestedFileEnumeration(true);

DataStream<string> stream = env.readFile(
        textInputFormat, sourceLogDirPath,
        FileProcessingMode.PROCESS_CONTINUOUSLY, 100);

stream.print();
env.execute();
}

Esta es mi ruta de clase jar de las bibliotecas de Flink:

/opt/flink/lib/flink-csv-1.13.2.jar:/opt/flink/lib/flink-json-1.13.2.jar:/opt/flink/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/lib/flink-table-blink2.12-1.13.2.jar:/opt/flink/lib/flink-table2.12-1.13.2.jar:/opt/flink/lib/log4j-1.2-api-2.12.1.jar:/opt/flink/lib/log4j-api-2.12.1.jar:/opt/flink/lib/log4j-core-2.12.1.jar:/opt/flink/lib/log4j-slf4j-impl-2.12.1.jar:/opt/flink/lib/sentrylog4j2deploy.jar:/opt/flink/lib/flink-dist_2.12-1.13.2.jar:::

P.D. También probé s3a://<bucket>/ sin éxito.

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Para manejar archivos comprimidos .gz al leer desde una fuente de cubo S3 con Flink, puedes usar la opción CompressionType en el constructor de TextInputFormat. Esto es lo que puedes modificar en tu código:

    TextInputFormat textInputFormat = new TextInputFormat(new Path(sourceLogDirPath));
    textInputFormat.setNestedFileEnumeration(true);
    textInputFormat.setCompressionType(CompressionType.GZIP); // agregar esta línea para especificar archivos comprimidos
    

    Esto informará a Flink que los archivos de entrada están comprimidos con GZIP y, por lo tanto, debe descomprimirlos antes de procesarlos. Asegúrate de importar org.apache.flink.api.common.io.TextInputFormat.CompressionType si aún no está importado.

    Ten en cuenta que si estás utilizando el sistema de archivos s3a, es posible que también necesites establecer la configuración fs.s3a.impl.disable.cache en true para evitar el almacenamiento en caché de archivos comprimidos. Puedes agregar esta línea antes de crear el StreamExecutionEnvironment:

    System.setProperty("fs.s3a.impl.disable.cache", "true");
    

    Esto debería resolver el problema de no poder procesar archivos comprimidos .gz en una fuente de cubo S3.

Comments are closed.