es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo ver una métrica específica en Spark Structured Streaming con Python

Soy muy nuevo en Spark y Python. Estoy intentando ver cualquier métrica en Spark Structured Streaming (por ejemplo, processedRowsPerSecond), pero no sé cómo hacerlo.

He leído en la “Guía de programación de Structured Streaming” que con print(query.lastProgress) puedes obtener directamente el estado actual y las métricas de una consulta activa, pero si lo escribo, solo obtengo None una vez. La última parte de mi código es la siguiente:

query = windowedCountsDF\
    .writeStream\
    .outputMode('update')\
    .option("truncate", "false") \
    .format('console') \
    .queryName('numbers') \
    .start()

print(query.lastProgress)

query.awaitTermination()

Cualquier idea sobre cómo hacerlo será muy apreciada.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Para obtener métricas como processedRowsPerSecond en Spark Structured Streaming, puedes usar el método lastProgress del objeto StreamingQuery. Este método devuelve información sobre el progreso de la consulta, incluyendo métricas como processedRowsPerSecond. Sin embargo, dado que lastProgress devuelve la última actualización de progreso, es necesario llamarlo repetidamente y a intervalos regulares para tener una idea de cómo progresa la consulta.

    Aquí tienes una versión actualizada del código que muestra cómo usar lastProgress para obtener métricas:

    query = windowedCountsDF\
            .writeStream\
            .outputMode('update')\
            .option("truncate", "false") \
            .format('console') \
            .queryName("numbers") \
            .start()
    
    while not query.isActive():
        time.sleep(0.1)
    
    while query.isActive():
        print(query.lastProgress)
        time.sleep(1)
    
    query.awaitTermination()
    

    En esta versión del código, primero esperamos a que la consulta se active. Luego, entramos en un bucle que llama repetidamente a lastProgress y muestra el resultado en la consola. También introducimos un retraso de 1 segundo entre cada llamada a lastProgress, para no saturar la consola con demasiada salida. El bucle termina cuando la consulta ya no está activa, momento en el cual llamamos a awaitTermination para esperar a que la consulta termine.

    Ten en cuenta que el método lastProgress devuelve None si aún no se ha realizado ningún progreso, por lo que es posible que quieras comprobar este caso y manejarlo adecuadamente.

Comments are closed.