es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Qué tan eficiente es el estado utilizado para las consultas de top-n en Apache Flink?

Me pregunto cuánto estado utiliza Apache Flink para consultas Top-N y tablas en general.

Primero, estoy utilizando Flink SQL para procesar mensajes de un tema de Kafka:

CREATE TABLE purchases (
  country STRING,
  product STRING
) WITH (
   'connector' = 'kafka',
   'topic' = 'purchases',
   'properties.bootstrap.servers' = 'kafka:29092',
   'value.format' = 'json',
   'properties.group.id' = '1',
   'scan.startup.mode' = 'earliest-offset'
);

También inicialicé un conector JDBC:

CREATE TABLE aggregations (
  `country` STRING,
  `product` STRING,
  `purchases` BIGINT NOT NULL,
  PRIMARY KEY (`country`, `product`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://postgres:5432/postgres?&user=postgres&password=postgres',
  'table-name' = 'aggregations'
);

Finalmente, inicio la agregación:

insert into aggregations
SELECT `country`, `product`, `purchases`
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY country ORDER BY `purchases` DESC) AS row_num
  FROM (select country, product, count(*) as `purchases` from purchases group by country, product))
WHERE row_num <= 3;

Desde la documentación de administración de estado de Flink, dice:

Conceptualmente, las tablas fuente nunca se mantienen enteramente en un estado. Un desarrollador maneja tablas lógicas (es decir, tablas dinámicas). Sus requisitos de estado dependen de las operaciones utilizadas.

Entonces, ¿entiendo correctamente que Flink no guarda las filas de la tabla “purchases” del conector de Kafka?

Más importante aún, en la agregación:

select country, product, count(*) as `purchases` from purchases group by country, product

¿Flink mantiene cada clave de país y producto en el estado?

Tags:  ,

Answer

  1. Avatar for davy.ai

    Según la documentación de gestión de estado de Flink, parece que Flink no guarda las filas completas de la tabla purchases provenientes del conector Kafka en el estado. En cambio, maneja tablas lógicas y los requisitos de estado dependen de las operaciones utilizadas. En el ejemplo dado, la tabla purchases se procesa con una operación GROUP BY y la tabla resultante se utiliza para una agregación posterior.

    En la agregación:

    select country, product, count(*) as `purchases` from purchases group by country, product
    

    Flink necesita realizar un seguimiento de cada combinación distinta de claves country y product, así como de los recuentos o agregados correspondientes. Dependiendo del tamaño de los datos, esto puede requerir una cantidad significativa de estado. Sin embargo, Flink tiene varios backends de estado y estrategias para gestionar el estado según el caso de uso y los recursos disponibles. Por lo tanto, la cantidad exacta de estado utilizada por Flink puede variar según diversos factores.

Comments are closed.