Tag: PYTHON-ASYNCIO
Ahora tengo un paquete con una clase que se ve así: class Opc(object): def __init__(self): client = Client(“server_url”) client.connect() opc = Opc() Ahora quiero usar la biblioteca opcua-asyncio, por lo que necesito usar una función asíncrona para conectarme al servidor, pero no puedo esperarla desde el método init. ¿Cómo puedo . . . Read more
Estoy usando python(3.8.8) aiohttp y asyncio para hacer solicitudes http asíncronas. Sin embargo, cuando intento esperar una llamada a resp.content, recibo el mensaje de error: “TypeError: el objeto StreamReader no puede ser utilizado en una expresión ‘await’”. Error en la última llamada realizada: Archivo “test_aiohttp.py”, línea 34, en get_country_wrapper country_lst . . . Read more
Estoy utilizando pytest-asyncio. Tengo el siguiente archivo conftest.py: import asyncio import pytest from database.mongo_db import mongo @pytest.fixture(scope=”session”, autouse=True) async def initialise_db(): await mongo.connect_client() await mongo.drop_db() @pytest.fixture(scope=”session”) def event_loop(): yield asyncio.new_event_loop() La función initialise_db() se conectará a mi base de datos y la limpiará antes de que se ejecuten todas mis . . . Read more
Necesito usar IsolatedAsyncioTestCase para realizar una verificación asíncrona. ¿Por qué mi código no se ejecuta en paralelo o de manera asíncrona? Necesito que test2, test3 y test4 se ejecuten de manera asíncrona. async def check_status(name:str): print(f”antes {name}”) await asyncio.sleep(5) print(f”después {name}”) return True class TestAsync(IsolatedAsyncioTestCase): async def test_002(self): print(“test_002”) res . . . Read more
Si una tarea asincrónica task_parent crea sub tareas task_child, pero task_parent es cancelada debido a una excepción que se produce después de que se haya creado task_child, ¿se cancela automáticamente también task_child (si no está protegida con asyncio.shield)? Por ejemplo, en el siguiente código: async def f(): t1 = asyncio.create_task(coroutine1()) . . . Read more