es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo obtener DataFrame de la tabla de la base de datos recuperada con JDBC, cortada por múltiples rangos de fechas, con un rango de fecha por fila en otro DataFrame.

Tengo el DataFrame con la hora de inicio del evento:

val dailySummariesDfVisualize =
  dailySummariesDf
    .orderBy("event_time").registerTempTable("raw")    

val df = sqlContext.sql("select * from raw")
df.show()

+--------------------+-------------------+-------------+-----------------+---------------+
|          event_type|         event_time|event_payload|coffee_machine_id|digital_twin_id|
+--------------------+-------------------+-------------+-----------------+---------------+
|Large_Cup_Button_...|2021-03-24 07:06:34|         null|       NP20100005|     NP20100005|
|Large_Cup_Button_...|2021-03-24 07:07:41|         null|       NP20100005|     NP20100005|

Me gustaría que cada fila de event_time sea el comienzo del intervalo de tiempo y que event_time+1 minuto sea el final del intervalo de tiempo, y que haya tantos intervalos de tiempo como filas haya en el DataFrame anterior. Me gustaría hacer una consulta y extraer del otro DataFrame solo los elementos entre el inicio y el final de cada intervalo de tiempo. ¿Cómo puedo hacer eso? ¿La única opción es unir, o puede ser útil una consulta anidada? ¿O tal vez solo un foreach en el DataFrame puede ser útil allí?

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Para extraer elementos de otra tabla que caigan dentro de cada intervalo de tiempo, se puede utilizar una unión o una consulta anidada. Aquí hay un ejemplo utilizando una consulta anidada:

    import org.apache.spark.sql.functions.{col, expr}
    
    val newDf = df.select(col("event_time"))
      .withColumn("end_time", expr("date_add(event_time, interval 1 minute)"))
      .select(col("event_time"), col("end_time"))
      .as[(java.sql.Timestamp, java.sql.Timestamp)]
      .flatMap { case (start, end) =>
        val query = s"select * from anotherTable where timestamp_col between '$start' and '$end'"
        sqlContext.sql(query).as[(String, java.sql.Timestamp, Double)]
      }
      .toDF("col1", "col2", "col3")
    
    newDf.show()
    

    Primero, extraemos la columna event_time del DataFrame y creamos una nueva columna end_time que es event_time + 1 minuto. Luego, convertimos el DataFrame a una tupla de (inicio, fin) y usamos flatMap para aplicar una consulta anidada a cada tupla. La consulta anidada selecciona todas las filas de otra tabla donde el timestamp_col está entre inicio y fin. Finalmente, convertimos el resultado a un DataFrame con columnas llamadas col1, col2 y col3.

    Tenga en cuenta que este enfoque creará una consulta anidada por cada fila en el DataFrame, lo cual podría ser lento si hay muchas filas. Utilizar una unión puede ser una opción más eficiente, especialmente si la condición de unión se puede optimizar.

Comments are closed.