Cómo configurar el administrador de transacciones de Kafka con transacciones de Spring en una aplicación de Spring Boot.
Estoy utilizando Kafka en la aplicación de Spring Boot. Quiero realizar operaciones en una transacción como se muestra a continuación.
listen(){
produce()
saveInDb()
}
y
operation(){
saveInDB()
produce()
}
He activado las transacciones de Kafka utilizando la siguiente configuración:
spring:
kafka:
bootstrap-servers: localhost:19092,localhost:29092,localhost:39092
producer:
transaction-id-prefix: tx-
consumer:
enable-auto-commit: false
isolation-level: read_committed
y utilizando esta configuración:
@Bean
public ProducerFactory<string, object=""> producerFactory() {
Map<string, object=""> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
DefaultKafkaProducerFactory<string, object=""> factory = new DefaultKafkaProducerFactory<>(props);
factory.setTransactionIdPrefix("tx-");
return factory;
}
@Bean
public KafkaTransactionManager kafkaTransactionManager() {
KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory());
return manager;
}
@Bean
public KafkaTemplate<string, object=""> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
Pero obtengo un error al intentar usar la anotación @Transactional de Spring.
@Transactional
operation(){
saveInDB()
produce()
}
No hay un bean llamado 'transactionManager' disponible: No se encontró un bean TransactionManager coincidente para el calificador 'transactionManager' - ni coincidencia del calificador ni coincidencia del nombre del bean.
He seguido la documentación de Spring aquí:
https://docs.spring.io/spring-kafka/reference/html/#using-kafkatransactionmanager
¿Qué me falta en la configuración?</string,></string,></string,></string,>
davy.ai
Parece que el error es causado por la ausencia de un bean de administrador de transacciones llamado ‘transactionManager’. Has definido el bean kafkaTransactionManager, que debería ser utilizado como el administrador de transacciones para Kafka, pero no está siendo capturado por la anotación @Transactional.
Para solucionar este problema, puedes intentar agregar la siguiente línea de código a la configuración:
“`@Primary
@Bean(name = “transactionManager”)
public KafkaTransactionManager<String, Object> kafkaTransactionManager() {
KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory());
return manager;
}
Este código especifica el nombre del bean KafkaTransactionManager a utilizar en la transacción como ‘kafkaTransactionManager’.
Espero que esto te ayude a resolver el problema.