es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Obteniendo archivo desde FTP usando Spring Integration.

Tengo un proyecto en el que necesito descargar archivos .csv desde una carpeta remota de FTP. El problema es que los archivos son grandes, digamos de 6-7 MB, y la operación comienza a leerlos incluso antes de que se hayan transferido completamente desde el proveedor externo, lo que genera una excepción.

Vi que podemos usar un LastModifiedFileListFilter, pero no estoy seguro de si esta es la solución adecuada.

Aquí está mi muestra de código.

@Bean
public IntegrationFlow ftpInboundFlow() {
    return IntegrationFlows
        .from(Ftp.inboundAdapter(ftpSessionFactory())
                .preserveTimestamp(true)
                .remoteDirectory("/ftp/GE/Inbound")
                .patternFilter("*.csv")
                .deleteRemoteFiles(true)
                .localDirectory(new File("inbound"))
                .temporaryFileSuffix(TEMPORARY_FILE_SUFFIX),
            e -> e.id("ftpInboundAdapter")
                .poller(Pollers.fixedDelay(5000))
                .autoStartup(true))
        .transform(e -> {
            log.info("Enviando archivo CSV " + e + " al servidor FTP");
            return e;
        })
        .handle(Ftp.outboundAdapter(ftpSessionFactory())
                .useTemporaryFileName(true)
                .autoCreateDirectory(true)
                .remoteDirectory("/ftp/GE/Inbound/history"))
        .get();
}

Excepción:

Caused by: org.springframework.messaging.MessagingException: Se produjo un error al copiar '/ftp/GE/Inbound/OA_ex_PK_2020_2021.csv' desde el directorio remoto al directorio local; la excepción anidada es java.io.IOException: Error al copiar '/ftp/GE/Inbound/OA_ex_PK_2020_2021.csv'. El servidor respondió con: 550 El proceso no puede acceder al archivo porque está siendo utilizado por otro proceso.
Tags:  ,

Answer

  1. Avatar for davy.ai

    Una solución a este problema sería utilizar un LastModifiedFileListFilter en combinación con un ChainFileListFilter. El ChainFileListFilter te permite definir múltiples filtros que se aplican secuencialmente a los archivos remotos.

    Aquí hay una versión actualizada del fragmento de código que incluye estos filtros:

    @Bean
    public IntegrationFlow ftpInboundFlow() {
        ChainFileListFilter<ftpfile> chainFilter = new ChainFileListFilter<>();
        chainFilter.addFilter(new LastModifiedFileListFilter<>(10_000)); // solo considera archivos modificados en los últimos 10 segundos
        chainFilter.addFilter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "prefix_")); // evita procesar archivos duplicados
    
        return IntegrationFlows
            .from(Ftp.inboundAdapter(ftpSessionFactory())
                    .preserveTimestamp(true)
                    .remoteDirectory("/ftp/GE/Inbound")
                    .patternFilter("*.csv")
                    .deleteRemoteFiles(true)
                    .localDirectory(new File("inbound"))
                    .temporaryFileSuffix(TEMPORARY_FILE_SUFFIX)
                    .filter(chainFilter), // aplica los filtros a los archivos
                e -> e.id("ftpInboundAdapter")
                    .poller(Pollers.fixedDelay(5000))
                    .autoStartup(true))
            .transform(e -> {
                log.info("Enviando archivo CSV " + e + " al servidor FTP");
                return e;
            })
            .handle(Ftp.outboundAdapter(ftpSessionFactory())
                .useTemporaryFileName(true)
                .autoCreateDirectory(true)
                .remoteDirectory("/ftp/GE/Inbound/history"))
            .get();
    }
    

    Esta versión actualizada del método ftpInboundFlow crea un nuevo ChainFileListFilter y le agrega dos filtros:

    • LastModifiedFileListFilter: Este filtro solo considera archivos que han sido modificados en los últimos 10 segundos. Esto ayuda a evitar el procesamiento de archivos que aún se están escribiendo en el directorio remoto.
    • FtpPersistentAcceptOnceFileListFilter: Este filtro utiliza un SimpleMetadataStore para realizar un seguimiento de los archivos que ya han sido procesados. El parámetro "prefix_" garantiza que solo se realice un seguimiento de los archivos con ese prefijo (en caso de que haya otros archivos en el directorio que no nos interesen).

    Ten en cuenta que pasamos el chainFilter al método filter en la instancia de Ftp.inboundAdapter, que aplica los filtros a los archivos remotos antes de procesarlos.

    Con estos filtros en su lugar, tu flujo de integración solo debería procesar archivos que se hayan transferido completamente al directorio remoto y evitará el procesamiento de archivos duplicados.

Comments are closed.