es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Contador de palabras con marca de tiempo en Python

Este ejemplo se extrae de la Guía de Programación de Structured Streaming de Spark:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()

# Crear DataFrame que representa el flujo de líneas de entrada desde la conexión a localhost:9999
lines = spark \
     .readStream \
     .format("socket") \
     .option("host", "localhost") \
     .option("port", 9999) \
     .load()

# Dividir las líneas en palabras
words = lines.select(
    explode(
       split(lines.value, " ")
       ).alias("word"),
       lines.timestamp.alias('time')
)

# Generar el recuento de palabras en ejecución
wordCounts = words.groupBy("word").count() # línea a modificar

# Iniciar la consulta que imprime los recuentos en ejecución en la consola
query = wordCounts \
      .writeStream \
      .outputMode("complete") \
      .format("console") \
      .start()

query.awaitTermination()

Necesito crear una tabla con cada palabra y su tiempo de entrada. La tabla de salida debería ser así:

+-------+--------------------+
|  word |            time    |
+-------+--------------------+
|   car |2021-12-16  12:21:..|
+-------+--------------------+

¿Cómo puedo hacerlo? Creo que la línea marcada con “# línea a modificar” es la única línea que se debe modificar.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Sí, tienes razón. La línea marcada con “#line to modify” necesita ser modificada para incluir la columna “time” en la tabla de salida.

    Puedes actualizar el DataFrame “wordCounts” para incluir la columna “time” modificando la cláusula groupBy de la siguiente manera:

    wordCounts = words.groupBy("word", "time").count()
    

    Esto creará un DataFrame con cada palabra y su tiempo de entrada. Para mostrar la tabla de salida en la consola, puedes utilizar el mismo fragmento de código que se proporciona en el ejemplo:

    query = wordCounts \
              .writeStream \
              .outputMode("complete") \
              .format("console") \
              .start()
    
    query.awaitTermination()
    

    Esto iniciará la consulta e imprimirá la tabla de salida en la consola en el formato requerido.

Comments are closed.