es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo cargar multipartes en Amazon S3 de forma asíncrona utilizando el SDK de Java.

En mi aplicación de Java necesito escribir datos en S3, cuyo tamaño desconozco de antemano y generalmente son grandes. Por recomendación en la documentación de AWS S3, estoy utilizando la API de nivel bajo de Java de AWS SDK para escribir datos en el bucket de S3.

En mi aplicación, proporciono un S3BufferedOutputStream, que es una implementación de OutputStream donde otras clases en la aplicación pueden usar este flujo para escribir en el bucket de S3.

Almaceno los datos en un búfer y recorro en bucle, y una vez que los datos son más grandes que el tamaño del bucket, subo los datos en el búfer como un solo UploadPartRequest. Aquí está la implementación del método “write” de S3BufferedOutputStream:

@Override
public void write(byte[] b, int off, int len) throws IOException {
    this.assertOpen();
    int o = off, l = len;
    int size;
    while (l > (size = this.buf.length - position)) {
        System.arraycopy(b, o, this.buf, this.position, size);
        this.position += size;
        flushBufferAndRewind();
        o += size;
        l -= size;
    }
    System.arraycopy(b, o, this.buf, this.position, l);
    this.position += l;
}

La implementación completa es similar a esta: repositorio de código

Mi problema aquí es que cada UploadPartRequest se realiza de forma síncrona, por lo que debemos esperar a que se cargue una parte para poder subir la siguiente parte. Y debido a que estoy utilizando la API de nivel bajo de AWS S3, no puedo aprovechar la carga paralela proporcionada por el TransferManager.

¿Existe alguna forma de lograr la carga paralela utilizando la SDK de nivel bajo? ¿O algún cambio de código que pueda hacerse para operar de forma asíncrona sin corromper los datos cargados y mantener el orden de los datos?

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Sí, es posible lograr la carga paralela utilizando el SDK de bajo nivel. Una posible solución para hacer que el proceso de carga sea asíncrono es utilizar la clase CompletableFuture de Java.

    Para implementar esto, puede modificar el código para crear un hilo por cada carga de parte y utilizar CompletableFuture para realizar la carga de manera asíncrona. A continuación se muestra un ejemplo de modificación de código:

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        this.assertOpen();
        int o = off, l = len;
        int size;
        while (l > (size = this.buf.length - position)) {
            System.arraycopy(b, o, this.buf, this.position, size);
            this.position += size;
            flushBufferAndRewind();
            o += size;
            l -= size;
            uploadPartAsync(); // Llamar al método CompletableFuture para cargar la parte de manera asíncrona
        }
        System.arraycopy(b, o, this.buf, this.position, l);
        this.position += l;
    }
    
    private void uploadPartAsync() {
        UploadPartRequest request = new UploadPartRequest()
                    .withBucketName(bucketName)
                    .withKey(keyName)
                    .withUploadId(uploadId)
                    .withPartNumber(currentPartNumber)
                    .withPartSize(partSize)
                    .withInputStream(new ByteArrayInputStream(buf, 0, this.position));
        CompletableFuture<partetag> future = CompletableFuture.supplyAsync(() -> {
            try {
                return s3.uploadPart(request).getPartETag();
            } catch (AmazonClientException e) {
                throw new IllegalStateException(e);
            }
        }, uploadExecutorService);
    
        future.thenAccept(partETag -> {
            synchronized (this) {
                this.partETags.add(partETag);
                this.currentPartNumber++;
                if (this.position == this.buf.length) {
                    this.position = 0;
                }
            }
        });
    }
    

    El método uploadPartAsync() crea un objeto CompletableFuture que ejecuta el método s3.uploadPart(request) de manera asíncrona en un hilo separado. El resultado de la carga se agrega a una lista de objetos PartETag.

    Tenga en cuenta que debe crear un objeto ExecutorService para administrar los hilos que ejecutan el proceso de carga. Puede crear un objeto ExecutorService utilizando la clase Executors del paquete Java Concurrent, como se muestra en el siguiente código:

    ExecutorService uploadExecutorService = Executors.newFixedThreadPool(5); // Crea un grupo de hilos con 5 hilos
    

    Con estas modificaciones, el proceso de carga se puede realizar de forma asíncrona y se puede beneficiar de la carga paralela proporcionada por el ExecutorService.

Comments are closed.