Cómo simular un receptor de Azure ServiceBus en Python usando MagicMock
Estoy utilizando el paquete Python Azure ServiceBus y tengo el siguiente fragmento de código:
<h1>receiver.py</h1>
import logging
logger = logging.getLogger("test")
def receive_message(
connection_str,
queue_name
) -> None:
"""
Llama a la API de Azure ServiceBus para obtener el mensaje de una cola.
"""
with ServiceBusClient.from_connection_string(
connection_str, logging_enable=True
) as servicebus_client:
<pre><code> with servicebus_client.get_queue_receiver(
queue_name=queue_name,
max_wait_time=20,
receive_mode=ServiceBusReceiveMode.PEEK_LOCK,
) as receiver:
for message in receiver:
logger.debug(f"Received message {message}")
</code></pre>
Intento escribir pruebas unitarias para esta función y quiero poder hacer una simulación de “mock” del receiver
. Este es mi intento de escribir la prueba unitaria, que falla porque no puedo hacer que la prueba entre en el bloque for message in receiver
.
<h1>test_receiver.py</h1>
@patch("receiver.ServiceBusClient")
@patch("receiver.logger")
def test_receive_message(mock_logger, mock_svc_bus_client):
<pre><code># Intento hacer una simulación de "mock":
mock_svc_bus_client.from_connection_string.return_value.get_queue_receiver.return_value = iter(["message"])
receive_message("mock_connection_str", "mock_q_name")
# La aserción falla
mock_logger.return_value.debug.assert_called_once()
</code></pre>
davy.ai
Para crear un objeto de prueba (‘mock’) que se comporte como el objeto
receiver
en la funciónreceive_message
, puede utilizar la funciónunittest.mock.MagicMock()
. A continuación se muestra una versión actualizada del código de prueba que usa el objeto de prueba:En este código actualizado, creamos un objeto
MagicMock
llamadoqueue_receiver_mock
que se comporta como el objetoreceiver
. A continuación, establecemos el método__iter__
del objetoqueue_receiver_mock
para que devuelva una lista que contiene un mensaje ("mensaje"
). Finalmente, establecemos que el objetoqueue_receiver_mock
simulado (“mock”) sea devuelto por el métodoget_queue_receiver
del objetoServiceBusClient
.Con este objeto simulado en su lugar, el bucle
for message in receiver
dentro de la funciónreceive_message
iterará sobre la lista que contiene un mensaje, y la llamadalogger.debug
dentro del bucle se llamará una vez con el mensaje esperado. La prueba pasará si se cumple esta afirmación.