El formato de entrada `textInputFormat` de Flink no procesa archivos comprimidos en formato GZ desde el sistema de archivos `Source` en AWS S3.
Seguí (https://stackoverflow.com/questions/49122170/zip-compressed-input-for-apache-flink?newreg=4dce460b71504a16ab651b171da287de) y escribí el siguiente fragmento de código para procesar archivos de registro .gz
en un directorio con un simple TextInputFormat
. Funciona en mi directorio de prueba local, escanea y abre automáticamente el contenido del archivo .gz
. Sin embargo, cuando lo ejecuto con una fuente de cubo de S3, no procesa archivos comprimidos .gz
. Este trabajo de Flink sigue abriendo archivos .log
en el cubo de S3. Parece que simplemente no descomprime los archivos .gz
. ¿Cómo puedo resolver esto en el sistema de archivos de S3?
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final String sourceLogDirPath = params.get("source_log_dir_path", "s3://my-test-bucket-logs/"); // "/Users/my.user/logtest/logs";
final Long checkpointInterval = Long.parseLong(params.get("checkpoint_interval", "60000"));
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getConfig().setGlobalJobParameters(params);
TextInputFormat textInputFormat = new TextInputFormat(new Path(sourceLogDirPath));
textInputFormat.setNestedFileEnumeration(true);
DataStream<string> stream = env.readFile(
textInputFormat, sourceLogDirPath,
FileProcessingMode.PROCESS_CONTINUOUSLY, 100);
stream.print();
env.execute();
}
Esta es mi ruta de clase jar de las bibliotecas de Flink:
/opt/flink/lib/flink-csv-1.13.2.jar:/opt/flink/lib/flink-json-1.13.2.jar:/opt/flink/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/lib/flink-table-blink2.12-1.13.2.jar:/opt/flink/lib/flink-table2.12-1.13.2.jar:/opt/flink/lib/log4j-1.2-api-2.12.1.jar:/opt/flink/lib/log4j-api-2.12.1.jar:/opt/flink/lib/log4j-core-2.12.1.jar:/opt/flink/lib/log4j-slf4j-impl-2.12.1.jar:/opt/flink/lib/sentrylog4j2deploy.jar:/opt/flink/lib/flink-dist_2.12-1.13.2.jar:::
P.D. También probé s3a://<bucket>/
sin éxito.
davy.ai
Para manejar archivos comprimidos
.gz
al leer desde una fuente de cubo S3 con Flink, puedes usar la opciónCompressionType
en el constructor deTextInputFormat
. Esto es lo que puedes modificar en tu código:Esto informará a Flink que los archivos de entrada están comprimidos con GZIP y, por lo tanto, debe descomprimirlos antes de procesarlos. Asegúrate de importar
org.apache.flink.api.common.io.TextInputFormat.CompressionType
si aún no está importado.Ten en cuenta que si estás utilizando el sistema de archivos
s3a
, es posible que también necesites establecer la configuraciónfs.s3a.impl.disable.cache
entrue
para evitar el almacenamiento en caché de archivos comprimidos. Puedes agregar esta línea antes de crear elStreamExecutionEnvironment
:Esto debería resolver el problema de no poder procesar archivos comprimidos
.gz
en una fuente de cubo S3.