es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Spark: Cómo utilizar una tabla temporal para filtrar los resultados al usar sqlContext.sql y particionar por columnas.

Tengo el siguiente código:

DataFrame idDF = sparkHelper.loadIds();
List<String> idList = idDF.javaRDD().map(row -> {
            Integer id = row.getAs("id");
            return Integer.toString(id);
        }).collect();
String ids = String.join(",", idList);

String sql = "(SELECT a.* from TableA a where a.id in (" + ids + ")) as tl";

Map<String,String> props = getDbConnectionProps(); // Configuración básica de conexiones.
props.put("dbtable", sql);
props.put("fetchSize", "100000");
props.put("partitionColumn", "col1");
props.put("lowerBound", "1");
props.put("upperBound", "1000");
props.put("numPartitions", "10");

DataFrame aDF = sqlContext.read().format("jdbc").options(props).load();

El beneficio de hacer esto es que puedo especificar algunas propiedades al consultar la tabla A, por ejemplo, la columna de partición.

Sin embargo, ahora se considera reemplazar la idea de construir la variable ids por una tabla temporal que pueda reutilizarse en muchas cargas de datos SQL. Me pregunto si hay alguna manera de hacerlo y mantener la partición de datos de la misma manera que antes.

Pensamiento 1: Estaba pensando en hacer esto

DataFrame idDF = sparkHelper.loadIds();
idDF.registerTempTable("temp_table_ids");
DataFrame aDF = sqlContext.sql("select a.* from TableA a inner join temp_table_ids ti on a.id = ti.id");

Ahora, mi pregunta es cómo puedo asegurarme de que aDF esté particionada por la columna col1 de la misma manera que el código actual. Si cargamos primero aDF y luego lo reparticionamos, creo que no es tan eficiente como pasar la información de la columna de partición al cargar los datos como lo hicimos antes. ¿Estoy en lo correcto?

Pensamiento 2: Si el método anterior no es posible, ¿puedo crear realmente una tabla temporal en la base de datos MySQL y usarla en la carga de datos del contexto Hive? ¿Será posible hacer esto? Hice una prueba creando una tabla temporal tempTableA.

sqlContext.sql(“DROP TEMPORARY TABLE IF EXISTS tempTableA”);
sqlContext.sql(“CREATE TEMPORARY TABLE tempTableA (id INT NOT NULL, PRIMARY KEY(accountId))”);

y usarla en el código “actual” (reemplazando la cadena de ids por la unión interna). Sin embargo, no puede ser reconocida. El error que obtuve es “no se puede reconocer la entrada cerca de ‘DROP’ ‘ TEMPORARY ‘ ‘TABLE’ en la declaración DDL”.

Por favor ayuda. Gracias.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Estás buscando una manera de particionar los datos mientras usas una tabla temporal.

    Idea 1: Puedes intentar usar el método repartition en el DataFrame para particionar los datos. Esto te permitirá especificar la columna de partición y el número de particiones.

    Idea 2: Puedes crear una tabla temporal en MySQL y usarla en la carga de datos del contexto de Hive. Sin embargo, es posible que necesites usar la declaración CREATE TEMPORARY TABLE en Hive para crear la tabla temporal.

Comments are closed.