es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Tag: PYTHON-ASYNCIO

Cambiar de python-opcua a opcua-asyncio.

Ahora tengo un paquete con una clase que se ve así: class Opc(object): def __init__(self): client = Client(“server_url”) client.connect() opc = Opc() Ahora quiero usar la biblioteca opcua-asyncio, por lo que necesito usar una función asíncrona para conectarme al servidor, pero no puedo esperarla desde el método init. ¿Cómo puedo . . . Read more

Cómo solucionar – TypeError: el objeto StreamReader no puede usarse en una expresión ‘await’.

Estoy usando python(3.8.8) aiohttp y asyncio para hacer solicitudes http asíncronas. Sin embargo, cuando intento esperar una llamada a resp.content, recibo el mensaje de error: “TypeError: el objeto StreamReader no puede ser utilizado en una expresión ‘await’”. Error en la última llamada realizada: Archivo “test_aiohttp.py”, línea 34, en get_country_wrapper country_lst . . . Read more

Cómo llamar a una función asíncrona en pytest_sessionfinish()?

Estoy utilizando pytest-asyncio. Tengo el siguiente archivo conftest.py: import asyncio import pytest from database.mongo_db import mongo @pytest.fixture(scope=”session”, autouse=True) async def initialise_db(): await mongo.connect_client() await mongo.drop_db() @pytest.fixture(scope=”session”) def event_loop(): yield asyncio.new_event_loop() La función initialise_db() se conectará a mi base de datos y la limpiará antes de que se ejecuten todas mis . . . Read more

La clase IsolatedAsyncioTestCase de unitest no funciona.

Necesito usar IsolatedAsyncioTestCase para realizar una verificación asíncrona. ¿Por qué mi código no se ejecuta en paralelo o de manera asíncrona? Necesito que test2, test3 y test4 se ejecuten de manera asíncrona. async def check_status(name:str): print(f”antes {name}”) await asyncio.sleep(5) print(f”después {name}”) return True class TestAsync(IsolatedAsyncioTestCase): async def test_002(self): print(“test_002”) res . . . Read more

cancelación automática de sub tareas en asyncio

Si una tarea asincrónica task_parent crea sub tareas task_child, pero task_parent es cancelada debido a una excepción que se produce después de que se haya creado task_child, ¿se cancela automáticamente también task_child (si no está protegida con asyncio.shield)? Por ejemplo, en el siguiente código: async def f(): t1 = asyncio.create_task(coroutine1()) . . . Read more