es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

La función IO suspendida nunca regresa.

Tengo dificultades para escribir un bucle de recepción de mensajes UDP para Android.

En el siguiente código, en receiveLoop, la llamada a receiveMessages nunca retorna y, por lo tanto, nunca ingreso al bucle de tratamiento de mensajes. Ten en cuenta que todavía puedo recibir paquetes, pero se detiene cuando el búfer del canal está lleno.

Esperaría que receiveMessages devuelva inmediatamente, mientras que el bucle de bloqueo de E/S dentro de él seguiría funcionando para siempre.

class MySocketUDP(private val params: SocketParams) {

    private val rcvSocket: DatagramSocket by lazy {
        val sock = DatagramSocket(params.rcvPort)
        sock.reuseAddress = true
        sock.soTimeout = 1000
        sock
    }

    suspend fun receiveMessages(channel: SendChannel<message>) {
        withContext(Dispatchers.IO) {
            val buf = ByteArray(MAX_MSG_SIZE)
            while (true) {
                val pkt = DatagramPacket(buf, buf.size)
                try {
                    if (channel.isClosedForSend) {
                        break
                    }
                    rcvSocket.receive(pkt)
                    val msg = packetToMessage(buf, 0, pkt.length)
                    Log.d("SOCKET", "filling channel with $msg")
                    channel.send(msg)
                } catch (ex: SocketTimeoutException) {
                } catch (ex: CancellationException) {
                    break
                }
            }
        }
    }
}

class MyModel {

    private suspend fun receiveLoop(socket: MySocketUDP) {
        withContext(Dispatchers.Main) {
            val channel = Channel<message>(16)
            socket.receiveMessages(channel)
            Log.d("MODEL", "Entering msg loop")
            for (msg in channel) {
                dispatchRcvMessage(msg)
            }
        }
    }
}
  1. ¿Por qué receiveMessages nunca devuelve mientras se ejecuta en el despachador IO y es llamado desde el despachador Main?
  2. ¿Necesito realmente iniciar un hilo para este tipo de trabajo de productor/consumidor?
  3. ¿Puedes mostrar cómo lograr un código de bloqueo largo de manera “amigable con las coroutines”?

Gracias

Tags:  , , , ,

Answer

  1. Avatar for davy.ai
    1. receiveMessages nunca retorna porque ejecuta un bucle infinito con llamadas de bloqueo de E/S, y es llamado desde el despachador Main. Esto significa que el despachador Main está esperando a que receiveMessages se complete antes de continuar, pero receiveMessages nunca termina porque está bloqueado en la llamada rcvSocket.receive. Esto resulta en un bloqueo.
    2. No, no es necesario iniciar un hilo separado. Puedes usar corutinas para manejar las llamadas de bloqueo de E/S sin bloquear el despachador Main.

    3. Aquí tienes un ejemplo de cómo puedes lograr esto de manera compatible con las corutinas:

    class MySocketUDP(private val params: SocketParams) {
    
        private val rcvSocket: DatagramSocket by lazy {
            val sock = DatagramSocket(params.rcvPort)
            sock.reuseAddress = true
            sock.soTimeout = 1000
            sock
        }
    
        suspend fun receiveMessages(channel: SendChannel<message>) {
            withContext(Dispatchers.IO) {
                val buf = ByteArray(MAX_MSG_SIZE)
                while (true) {
                    val pkt = DatagramPacket(buf, buf.size)
                    try {
                        rcvSocket.receive(pkt)
                        val msg = packetToMessage(buf, 0, pkt.length)
                        Log.d("SOCKET", "filling channel with $msg")
                        channel.send(msg)
                    } catch (ex: SocketTimeoutException) {
                    } catch (ex: CancellationException) {
                        break
                    } catch (ex: Exception) {
                        // maneja otras excepciones aquí
                    }
                }
            }
        }
    }
    
    class MyModel {
    
        private val scope = CoroutineScope(Job() + Dispatchers.Main)
    
        private fun receiveLoop(socket: MySocketUDP) = scope.launch {
            val channel = Channel<message>(16)
            socket.receiveMessages(channel)
            Log.d("MODEL", "Entering msg loop")
            for (msg in channel) {
                dispatchRcvMessage(msg)
            }
        }
    
        fun stop() {
            scope.cancel()
        }
    }
    

    En este ejemplo, utilizamos la función withContext para cambiar al despachador IO antes de realizar las llamadas de bloqueo de E/S. También capturamos otras excepciones que pueden ser lanzadas durante el bucle de recepción.

    En MyModel, creamos un CoroutineScope con el despachador Main y un Job. Lanzamos la función receiveLoop en una corutina y almacenamos el Job resultante. Luego, podemos cancelar el Job cuando necesitemos detener el bucle de recepción.

    Ten en cuenta que este ejemplo usa launch en lugar de async. No necesitamos retornar un resultado desde receiveLoop, por lo que no es necesario usar async. Si necesitas retornar un resultado, utiliza async en su lugar.

Comments are closed.