es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo usar callbackFlow dentro de un flujo?

Estoy tratando de envolver un callbackFlow dentro de un flujo externo flow – hay elementos que me gustaría emitir desde el flujo externo, pero tengo una interfaz de devolución de llamada antigua, que me gustaría adaptar a Kotlin flow. He revisado varios ejemplos de uso de callbackFlow, pero no logro entender cómo activarlo correctamente dentro de otro flujo.

Aquí tienes un ejemplo:

class Processor {
    fun start(processProgress: ProcessProgressListener) {
        processProgress.onFinished() //finaliza tan pronto como comienza!
    }
}

interface ProcessProgressListener {
    fun onFinished()
}

// método principal aquí:
 fun startProcess(processor: Processor): Flow<String> {
     val mainFlow = flow {
         emit("STARTED")
         emit("IN_PROGRESS")
     }

 return merge(processProgressFlow(processor), mainFlow)

}

fun processProgressFlow(processor: Processor) = callbackFlow {
     val listener = object : ProcessProgressListener {
         override fun onFinished() {
             trySend("FINISHED")
         }
     }

 processor.start(listener)

}

El Processor recibe un escucha que se activa cuando el proceso ha finalizado. Cuando eso sucede, me gustaría emitir el elemento final FINISHED.

La forma en que invoco todo el flujo es la siguiente:

runBlocking {
     startProcess(Processor()).collect {
         print(it)
     }
}

Pero, no obtengo ninguna salida en absoluto. Sin embargo, si no uso merge y solo devuelvo mainFlow, sí obtengo los elementos STARTED y IN_PROGRESS.

¿Qué estoy haciendo mal?

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    El problema con el código dado es que el callbackFlow nunca se activa porque no se está recopilando. Para solucionar esto, podemos recopilar el callbackFlow dentro del operador merge junto con el mainFlow.

    Aquí está el código actualizado:

    class Procesador {
        fun iniciar(progresoProceso: ListenerProgresoProceso) {
            progresoProceso.alFinalizar() // ¡termina tan pronto como comienza!
        }
    }
    
    interface ListenerProgresoProceso {
        fun alFinalizar()
    }
    
    //método principal aquí:
    fun iniciarProceso(procesador: Procesador): Flow<string> {
        val mainFlow = flow {
            emit("INICIADO")
            emit("EN_PROGRESO")
        }
    
        return mainFlow.merge(flujoprogresoProceso(procesador))
    }
    
    fun flujoprogresoProceso(procesador: Procesador) = callbackFlow {
        val listener = object : ListenerProgresoProceso {
            override fun alFinalizar() {
                trySend("FINALIZADO")
                close() // cierra el flujo cuando haya terminado
            }
        }
    
        procesador.iniciar(listener)
    }
    
    

    Aquí, hemos fusionado el mainFlow y el callbackFlow dentro de la función iniciarProceso utilizando el operador merge. Ahora, el callbackFlow se activará cuando el proceso haya terminado y se emitirá el valor final FINALIZADO.

    También agregamos la función close() al callbackFlow dentro de la función alFinalizar() para cerrar el flujo una vez que hayamos emitido el valor final.

    Ahora, cuando ejecutemos el código, deberíamos obtener la siguiente salida:

    INICIADO
    EN_PROGRESO
    FINALIZADO

Comments are closed.