es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Escribir datos JSON de Kafka a tabla de BigQuery cada 5 minutos

    Para lograr esta tarea, podemos seguir los siguientes pasos:

    1. Crear una API de consumidor de Kafka utilizando la biblioteca “kafka-consumer-api” en Python 3.x.
    2. Utilizar la API de consumidor de Kafka para consumir los datos JSON del tema de Kafka.
    3. Utilizar la biblioteca “google-cloud-bigquery” para escribir los datos JSON en una tabla de BigQuery.
    4. Implementar un mecanismo de programación de trabajos basado en tiempo utilizando las bibliotecas “time” y “schedule”. Esto asegurará que los datos se escriban en la tabla de BigQuery cada 5 minutos.

    Aquí hay un fragmento de código de alto nivel que demuestra cómo implementar los pasos anteriores:

    from kafka import KafkaConsumer
    from google.cloud import bigquery
    import time
    import schedule
    import json
    
    # Configurar API de consumidor de Kafka
    consumer = KafkaConsumer(<nombre-del-tema>, bootstrap_servers=['<servidor:puerto>'],
                             auto_offset_reset='earliest',
                             enable_auto_commit=True,
                             group_id='<identificador-del-grupo>',
                             value_deserializer=lambda m: json.loads(m.decode('utf-8')))
    
    # Configurar cliente de BigQuery
    client = bigquery.Client()
    
    # Definir función para escribir datos en la tabla de BigQuery
    def write_to_bigquery():
        data = []
        for message in consumer:
            # Recopilar datos del tema de Kafka
            data.append(message.value)
    
            # Escribir datos en la tabla de BigQuery cada 5 minutos
            if len(data) == 100:
                table_id = '<identificador-del-proyecto>.<conjunto-de-datos>.<tabla>'
                table = client.get_table(table_id)
                errors = client.insert_rows_json(table, data)
                if errors == []:
                    print('Datos cargados correctamente en BigQuery.')
                else:
                    print('Error al cargar datos en BigQuery:', errors)
                data = []
    
    # Programar la función write_to_bigquery para que se ejecute cada 5 minutos
    schedule.every(5).minutes.do(write_to_bigquery)
    
    # Ejecutar el bucle de programación
    while True:
        schedule.run_pending()
        time.sleep(1)
    

    En el fragmento de código anterior, primero configuramos la API de consumidor de Kafka para consumir los datos JSON del tema de Kafka. Luego definimos una función llamada write_to_bigquery que recopila los datos del tema de Kafka y los escribe en una tabla de BigQuery. Utilizamos la biblioteca “schedule” para programar la función “write_to_bigquery” para que se ejecute cada 5 minutos. Finalmente, iniciamos el bucle de programación utilizando el bucle “while True”.

    Tenga en cuenta que hemos establecido el umbral de recopilación de datos en 100 en la función “write_to_bigquery”. Esto significa que los datos se escribirán en la tabla de BigQuery solo cuando se alcance el umbral de recopilación de datos de 100. Puede ajustar este umbral según sus requisitos. Además, asegúrese de reemplazar los marcadores de posición ““, ““, ““, ““, “” y “” con los valores apropiados.

Comments are closed.