es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Python unittest mock pyspark chain Python unittest mock permite simular el comportamiento de objetos y funciones en las pruebas unitarias de Python. El módulo pyspark, por su parte, ofrece una interfaz para programar en Spark utilizando el lenguaje Python. La combinación de ambos permite realizar pruebas unitarias en cadenas de procesamiento en Spark.

Me gustaría escribir pruebas unitarias para métodos simples que contienen código de pyspark.

¿Cómo puedo simular la parte de lectura de spark? He intentado esto:

“`
@patch("class<em>to</em>test.SparkSession")
def test<em>do</em>stuff(self, mock<em>spark: MagicMock) -> None:
spark = MagicMock()
spark.read.return</em>value.format.return<em>value.load.return</em>value = \
self.spark.createDataFrame([(1, 2)], ["key2", "c2"])
mock<em>spark.return</em>value = spark

<pre><code>input_df = self.spark.createDataFrame([(1, 1)], ["key1", "c1"])
actual_df = ClassToTest().do_stuff(input_df,"df2","key1","key2")
expected_df = self.spark.createDataFrame([(1, 1, 1, 2)], ["key1", "c1", "key2", "c2"])
assert_pyspark_df_equal(actual_df, expected_df)
</code></pre>

<code>
Pero falla con este error:
</code>py4j.Py4JException: El método join([class java.util.ArrayList, class org.apache.spark.sql.Column, class java.lang.String]) no existe“`
Parece que la simulación no funcionó como esperaba, ¿qué debo hacer para que spark.read.load devuelva el dataframe de prueba que especifiqué?

Editar: código completo aquí

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Para simular la parte de lectura de Spark, puedes utilizar el decorador patch del módulo unittest.mock para simular el método spark.read. Luego puedes encadenar los métodos simulados para devolver el DataFrame que deseas probar.

    Aquí tienes un ejemplo de cómo puedes simular el método spark.read en tu prueba:

    from unittest.mock import patch, MagicMock
    
    @patch("pyspark.sql.SparkSession")
    def test_do_stuff(self, mock_spark: MagicMock):
        # crear un DataFrame simulado para devolver en la operación de unión
        expected_df = self.spark.createDataFrame([(1, 1, 1, 2)], ["key1", "c1", "key2", "c2"])
    
        # crear un SparkSession y un DataFrame simulados para devolver en lugar de los objetos reales
        mock_df2 = MagicMock()
        mock_df2.join.return_value = expected_df
        mock_spark.return_value.read.return_value.load.return_value = mock_df2
    
        # crear el input_df para el método
        input_df = self.spark.createDataFrame([(1, 1)], ["key1", "c1"])
    
        # llamar al método que se está probando
        actual_df = ClassToTest().do_stuff(input_df, "path/to/df2", "key1", "key2")
    
        # asegurar que los DataFrames esperados y reales sean iguales
        assert_pyspark_df_equal(actual_df, expected_df)
    

    En este ejemplo, estamos creando un DataFrame simulado (mock_df2) y configurándolo para devolver el DataFrame esperado cuando se llame al método join. Luego, estamos simulando el método read del objeto SparkSession para devolver el DataFrame simulado cuando se llame con load. Finalmente, estamos creando nuestro DataFrame de entrada y llamando al método que se está probando (do_stuff). Por último, estamos asegurando que los DataFrames esperados y reales sean iguales.

Comments are closed.