es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Python Azure Functions kafka connection pool: Python Azure Functions conexión de grupo de conexión de kafka

En la función de Azure, ¿cómo hacemos la conexión del productor de Kafka como singleton o agrupación de conexiones? Cada vez que se dispara la función se crea una nueva conexión de Kafka.

import json
import logging
import os, time
import azure.functions as func
from confluent_kafka import Producer

def main(event: func.EventGridEvent):

    ## Configuración de Kafka
    conf = {
        'bootstrap.servers': os.getenv('AZURE_EVENT_HUB_SERVER') + ':9093',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': os.getenv('AZURE_EVENT_HUB_CONN_STRING')
    }


    data = event.get_json()
    topic = "events"

    p = Producer(**conf)

    if topic is not None:
        try:
            p.produce(topic, key=data.id, value=data)
            # logging.info('Producing message %s', file_path)
        except BufferError:
            logging.error('%% La cola local de productores está llena (%d mensajes en espera de entrega): intente nuevamente\n', len(p))
        # Esperar hasta que todos los mensajes hayan sido entregados.
        p.flush()

        logging.info(f'Procesamiento completado correctamente para el evento: {event.get_json()}')
    else:
        logging.error(f'Falló')
Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Para hacer la conexión del productor de Kafka como un singleton o una agrupación de conexiones en la función de Azure, podemos implementar una variable global que contenga la instancia del productor y verificar si ya se ha creado antes de crear una nueva instancia.

    Aquí tienes un ejemplo de implementación:

    import json
    import logging
    import os, time
    import azure.functions as func
    from confluent_kafka import Producer
    
    # creamos una variable global para mantener la instancia del productor de Kafka
    producer_instance = None
    
    def get_producer(conf):
        global producer_instance
        if producer_instance is None:
            producer_instance = Producer(**conf)
        return producer_instance
    
    def main(event: func.EventGridEvent):
    
        ## configuración de Kafka
        conf = {
        'bootstrap.servers': os.getenv('AZURE_EVENT_HUB_SERVER') + ':9093',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': os.getenv('AZURE_EVENT_HUB_CONN_STRING')
        }
    
        data = event.get_json()
        topic = "events"
    
        # obtenemos la instancia del productor de Kafka
        p = get_producer(conf)
    
        if topic is not None:
            try:
                p.produce(topic, key=data.id, value=data)
                # logging.info('Producing message %s', file_path)
            except BufferError:
                logging.error('%% La cola local del productor está llena (%d mensajes esperando entrega): intenta de nuevo\n', len(p))
            # Esperamos hasta que todos los mensajes hayan sido entregados
            p.flush()
    
            logging.info(f'Procesamiento completado correctamente para el evento: {event.get_json()}')
        else:
            logging.error(f'Falló')
    
    

    En esta implementación, la función get_producer() es responsable de crear y devolver la instancia del productor de Kafka. Primero verifica si la variable global producer_instance es None, lo que significa que la instancia del productor aún no se ha creado. En este caso, crea una nueva instancia del productor utilizando la configuración conf y la asigna a la variable producer_instance. En llamadas posteriores a get_producer(), simplemente devuelve la instancia existente.

    Al utilizar esta implementación, nos aseguramos de que solo se cree una instancia del productor de Kafka durante toda la vida útil de la función de Azure, y podemos reutilizarla en varias invocaciones de la función. Esto puede ayudar a mejorar el rendimiento y reducir la sobrecarga de crear una nueva instancia del productor cada vez que se activa la función.

Comments are closed.