es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Conectarse a Hive con el controlador jdbc en Spark.

Necesito transferir datos de Hive remoto a Hive local con Spark. Intento conectarme a Hive remoto con el controlador JDBC: ‘org.apache.hive.jdbc.HiveDriver’. Ahora intento leer desde Hive y el resultado son los encabezados de columna en lugar de los datos reales:

df = self.spark_session.read.format('JDBC') \
         .option('url', 'jdbc:hive2://{self.host}:{self.port}/{self.database}') \
         .option('driver', 'org.apache.hive.jdbc.HiveDriver') \
         .option("user", self.username) \
         .option("password", self.password) \
         .option('dbtable', 'test_table') \
         .load()

df.show()

Resultado:

+----------+
|str_column|
+----------+
|str_column|
|str_column|
|str_column|
|str_column|
|str_column|
+----------+

Sé que Hive JDBC no es un soporte oficial en Apache Spark. Pero ya he encontrado soluciones para descargar desde otras fuentes no compatibles, como IMB Informix. Tal vez alguien ya haya resuelto este problema.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Posibles soluciones:

    1. Problema de inferencia de esquema

    Es posible que los encabezados de columna en el resultado indiquen que Spark no puede inferir correctamente el esquema desde la conexión JDBC a Hive. Puede intentar establecer explícitamente el esquema para el DataFrame especificando los nombres de columna y tipos en un objeto StructType o utilizando la opción inferSchema con un DataFrame de ejemplo:

    from pyspark.sql.types import StructType, StructField, StringType
    
    schema = StructType([
        StructField("col1", StringType()),
        StructField("col2", StringType()),
        StructField("col3", StringType())
    ])
    
    df = spark.read.format('jdbc') \
                 .option('url', 'jdbc:hive2://{host}:{port}/{database}') \
                 .option('driver', 'org.apache.hive.jdbc.HiveDriver') \
                 .option('user', username) \
                 .option('password', password) \
                 .option('dbtable', 'test_table') \
                 .option('fetchsize', 10000) \
                 .option('batchsize', 10000) \
                 .option('numPartitions', 8) \
                 .option('partitionColumn', 'partition_col') \
                 .option('lowerBound', 0) \
                 .option('upperBound', 1000000) \
                 .option('inferSchema', True) \
                 .schema(schema) \
                 .load()
    
    df.show()
    
    1. Problema de compatibilidad de tipos de datos

    También es posible que los tipos de datos en su tabla remota de Hive no sean compatibles con los tipos predeterminados utilizados por Spark, lo que resulta en resultados inesperados. Puede intentar convertir explícitamente las columnas a los tipos correctos en la consulta SQL o utilizando el método .cast() en el DataFrame:

    from pyspark.sql.functions import col
    
    df = spark.read.format('jdbc') \
                 .option('url', 'jdbc:hive2://{host}:{port}/{database}') \
                 .option('driver', 'org.apache.hive.jdbc.HiveDriver') \
                 .option('user', username) \
                 .option('password', password) \
                 .option('dbtable', 'test_table') \
                 .load()
    
    df = df.select(col('int_column').cast('integer'),
                   col('float_column').cast('float'),
                   col('str_column').cast('string'))
    
    df.show()
    
    1. Problema de muestreo de datos

    Si los datos en la tabla remota de Hive son demasiado grandes o complejos para cargar completamente en Spark, es posible que necesite muestrear los datos usando una consulta SQL que limite el número de filas. Puede utilizar el método .option() para especificar la consulta SQL:

    df = spark.read.format('jdbc') \
                 .option('url', 'jdbc:hive2://{host}:{port}/{database}') \
                 .option('driver', 'org.apache.hive.jdbc.HiveDriver') \
                 .option('user', username) \
                 .option('password', password) \
                 .option('query', 'SELECT * FROM test_table LIMIT 10000') \
                 .load()
    
    df.show()
    
    1. Problema de versión no compatible

    Finalmente, es posible que la versión del controlador JDBC que esté utilizando no sea compatible con Spark. Puede intentar actualizar o cambiar la versión del controlador JDBC o la de Spark hasta que encuentre una combinación compatible. Alternativamente, puede utilizar otros métodos para transferir los datos de Hive remoto a Hive local, como Sqoop, Apache Nifi o scripts personalizados.

Comments are closed.