es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Confluent Kafka: La configuración de Retention.ms para un tema individual no está funcionando como se esperaba.

Estoy utilizando Confluent Kafka en mi proyecto donde los mensajes enviados a un tema particular necesitan ser eliminados después de un tiempo de retención. Así que configuré retention.ms para los temas individuales, pero no está funcionando (aún puedo ver los mensajes después de su tiempo de retención).

He buscado la mayoría de las preguntas de Stack, pero aún no puedo encontrar una razón / solución adecuada para el problema de retention.ms de Kafka que no funciona.

Seguí los pasos a continuación para crear y establecer el tiempo de retención en ms.

  1. Creé un tema digamos. ‘user_status’
  2. Actualicé su tiempo de retención.ms siguiendo el código a continuación

from confluent_kafka.admin import AdminClient, ConfigResource, NewTopic, NewPartitions
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException

topic_config = ConfigResource(‘topic’, ‘user_status’)
topic_config.set_config(‘retention.ms’, ‘5000’)
admin.alter_configs([topic_config])

  1. Envío un mensaje desde el extremo del productor.
  2. Esperé 6000 ms de tiempo.
  3. Intenté recibir el mensaje del tema particular. Pero recibí el mensaje, y todavía no se eliminó el mensaje por la política de retención.

Nota:
Aseguré lo siguiente: después de actualizar retention.ms, verifiqué que se había actualizado en la información del tema de Kafka (Descripción del tema).

Además, actualicé el server.properties con log.retention.check.interval.ms = 1 ms y reinicié el servicio de Kafka después de actualizar el archivo de propiedades.

Lo que espero de la pregunta anterior
Quiero establecer un retention.ms en un tema individual y que el mensaje que pasa ese tiempo se elimine automáticamente según la política de Kafka.

Lo que está sucediendo ahora con mi código actual.
Los mensajes aún son recibidos por el consumidor incluso después del tiempo de retención de retention.ms.

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Como experto en TI especializado en Python-3.x, Apache Kafka, Kafka Consumer API, Confluent Platform y Confluent Kafka Python, entiendo el problema que el usuario está enfrentando con la propiedad retention.ms que no funciona para su tema de Kafka.

    En primer lugar, es importante tener en cuenta que establecer el tiempo de retención para un tema de Kafka usando retention.ms no garantiza que los mensajes se eliminarán exactamente después de ese tiempo. Es una política de mejor esfuerzo donde Kafka intentará eliminar mensajes más allá del período de retención durante la compactación del registro o la eliminación de segmentos.

    Dicho esto, hay algunas cosas que se pueden verificar para asegurarse de que retention.ms se aplique correctamente al tema.

    1. Asegúrese de que la propiedad de configuración del corredor “log.retention.check.interval.ms” esté configurada en un valor bajo (idealmente 1ms) para garantizar que Kafka compruebe los mensajes expirados con frecuencia.
    2. Asegúrese de que la configuración del tema se haya actualizado correctamente con retention.ms. Esto se puede verificar describiendo el tema usando la API CLI o AdminClient de Kafka y verificando que el valor de retention.ms sea el esperado.
    3. Verifique que los mensajes que se envían al tema no estén exentos de eliminación debido a otras configuraciones como “cleanup.policy”.
    4. Verifique los registros del corredor en busca de cualquier error o advertencia relacionado con la eliminación de mensajes expirados.

    En el fragmento de código Python dado, el valor de retention.ms se establece correctamente usando la API de administración. Sin embargo, no está claro si el productor está enviando mensajes con alguna clave específica o si el tema tiene alguna política de limpieza específica que pueda afectar la eliminación de mensajes.

    Si se han realizado todas las verificaciones anteriores y el problema persiste, puede ser útil publicar los registros del corredor o buscar soporte en los foros de la comunidad de Kafka para resolver el problema.

Comments are closed.