es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Enriquecer datos en tiempo real con Apache Beam y Datastore

Estoy trabajando con Google Dataflow (python) para intentar enriquecer los datos provenientes de Pub/Sub e insertarlos en BigQuery.

Este es un diagrama de alto nivel de la solución: enlace

He trabajado con Databrick y estoy intentando hacer algo similar, pero hasta ahora obtengo un rendimiento deficiente.

Estoy usando una conexión de Datastore dentro de un ParDo para realizar consultas y, luego, otro ParDo para actualizar Datastore.

¿Cómo puedo hacer esto para obtener un mejor rendimiento?

Datastore solo tiene un tipo y el proceso de enriquecimiento consiste en enriquecer el mensaje recibido con la última información guardada y actualizar la entidad con la información que se está procesando.

¡Gracias por la ayuda!

Edit 1:
Esta es una de las funciones ParDo para consultar información de Datastore.

    def process(self, element):
        from google.cloud import datastore

        logging.info(f"Enriqueciendo {element.get('deviceId')}")

        datastore_client = datastore.Client()
        datastore_kind = "Device"

        key = datastore_client.key(datastore_kind, element['deviceId'])
        result = datastore_client.get(key)

        mapper = self.SensorToDatastoreMapper()
        should_update, mapped_data = mapper.map_record_to_entity(element, result)

        element.update(mapped_data)
Tags:  , ,

Answer

  1. Avatar for davy.ai

    Como experto en IT especializado en python, google-cloud-dataflow y apache-beam, hay algunas formas de mejorar el rendimiento de tu proceso de enriquecimiento de datos mientras utilizas google dataflow. Aquí tienes algunas sugerencias:

    1. Utiliza procesamiento en lotes: En lugar de procesar los mensajes uno por uno, procésalos en lotes. Puedes utilizar la transformación GroupByKey para agrupar los mensajes por su ID de dispositivo, lo que creará un lote de mensajes para cada dispositivo. De esta forma, puedes reducir la cantidad de consultas a la base de datos que necesitas hacer, lo que puede mejorar el rendimiento.
    2. Utiliza entradas secundarias: Si los datos que necesitas para enriquecer los mensajes son relativamente pequeños, puedes cargarlos en memoria utilizando una entrada secundaria. Las entradas secundarias son datos de solo lectura a los que puede acceder una transformación ParDo durante la ejecución del pipeline. Al cargar los datos en memoria, puedes evitar consultar la base de datos por completo, lo que puede mejorar significativamente el rendimiento.

    3. Utiliza caché: Si los datos que necesitas para enriquecer los mensajes son grandes y no caben en memoria, puedes utilizar caché. Esta técnica implica almacenar en caché los datos en un sistema de almacenamiento rápido, como Memcached o Redis. Luego, cuando se necesita enriquecer un mensaje, puedes verificar primero si los datos requeridos se encuentran en la caché. Si es así, puedes recuperarlos rápidamente. Si no, puedes consultar la base de datos y luego almacenar el resultado en la caché para su uso futuro.

    4. Optimiza tus consultas a la base de datos: Asegúrate de utilizar índices y proyecciones para optimizar tus consultas a la base de datos. Los índices acelerarán tus consultas y las proyecciones reducirán la cantidad de datos que se deben leer de la base de datos.

    Al aplicar estas técnicas, puedes mejorar significativamente el rendimiento de tu proceso de enriquecimiento de datos mientras trabajas con google dataflow en python.

Comments are closed.