es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

RxSwift – Determinando si un Observable ha sido desechado

Estoy tratando de obtener el Publisher, que envía Observables a sus clientes Consumer, para determinar cuándo uno de sus consumidores ha eliminado su Observable.

Molesta. Mi código funcionaba bien hasta que eliminé un .debug de RxSwift desde el interior del código del Consumer.

¿Existe alguna forma alternativa en la que pueda hacer que esto funcione?


private class Subscriber {
var ids: [Int]
weak var observable: Observable<[Updates]>?
}

class Publisher {
private let relay: BehaviorRelay<[Int: Updates]>
private var subscribers: [Subscriber] = []

func updatesStream(for ids: [Int]) -&gt; Observable&lt;[Updates]&gt; {
    let observable = relay
       .map { map in
           return map
               .filter { ids.contains($0.key) }
               .map { $0.value }
       }
       .filter { !$0.isEmpty }
       .asObservable()

    let subscriber = Subscriber(ids: ids, observable: observable)
    subscribers.append(subscriber)
    return observable
}

private func repeatTimer() {
    let updates: [Updates] = ....

    subscribers.remove(where: { subscriber in
        return subscriber.observable == nil
    })

    relay.accept(updates)
}

}

class Client {
private var disposeBag: DisposeBag?
private let publisher = Publisher()

func startWatching() {
    let disposeBag = DisposeBag()
    self.disposeBag = disposeBag

    publisher
        //.debug(&quot;Consumer&quot;) 
        .updatesStream(for: [1, 2, 3])
        .subscribe(onNext: { values in
            print(values)
        })
       .disposed(by: disposeBag)
}

func stopWatching() {
    disposeBag = nil
}

}

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Para determinar cuándo el consumidor ha eliminado un observable, puede utilizar el operador takeUntil para crear un segundo observable que indique cuando el primer observable ha sido eliminado. Luego puede utilizar este segundo observable para eliminar al suscriptor de la lista de suscriptores.

    Aquí está la versión actualizada de la clase Subscriber:

    private class Subscriber {
        var ids: [Int]
        private let disposeBag: DisposeBag
        private let disposedSubject = PublishSubject<void>()
    
        var observable: Observable<[Updates]>? {
            didSet {
                guard let observable = observable else {
                    disposedSubject.onNext(())
                    return
                }
                observable.takeUntil(disposedSubject)
                    .subscribe()
                    .disposed(by: disposeBag)
            }
        }
    
        init(ids: [Int]) {
            self.ids = ids
            self.disposeBag = DisposeBag()
        }
    }
    

    En esta versión de la clase Subscriber, creamos un PublishSubject<void> llamado disposedSubject que se utilizará para indicar cuándo se ha eliminado el observable. Utilizamos didSet para observar los cambios en la propiedad observable. Cuando se establece la propiedad observable, verificamos si es nula y, si lo es, enviamos un valor Void a disposedSubject para indicar que el observable ha sido eliminado. Si la propiedad observable no es nula, creamos un nuevo observable que es una copia de observable, pero con un operador takeUntil que escucha a disposedSubject. Finalmente, nos suscribimos a este nuevo observable y lo eliminamos cuando se libera la memoria.

    Así es como puede utilizar la versión actualizada de la clase Subscriber en la clase Publisher:

    class Publisher {
        private let relay: BehaviorRelay<[Int: Updates]>
        private var subscribers: [Subscriber] = []
    
        func updatesStream(for ids: [Int]) -> Observable<[Updates]> {
            let observable = relay
                .map { map in
                    return map
                        .filter { ids.contains($0.key) }
                        .map { $0.value }
                }
                .filter { !$0.isEmpty }
                .asObservable()
    
            let subscriber = Subscriber(ids: ids)
            subscribers.append(subscriber)
            subscriber.observable = observable
            return observable
        }
    
        private func repeatTimer() {
            let updates: [Updates] = ....
    
            subscribers.removeAll { $0.observable == nil }
    
            relay.accept(updates)
        }
    }
    

    En el método updatesStream(for:), creamos un nuevo objeto Subscriber y lo agregamos a la lista de suscriptores. Luego establecemos la propiedad observable del nuevo Subscriber en el observable creado anteriormente. Cuando se libera la memoria del Subscriber, el disposedSubject en el Subscriber envía un valor Void, lo que activa el operador takeUntil en el observable y lo elimina. Finalmente, en el método repeatTimer(), eliminamos cualquier suscriptor que haya sido eliminado verificando si su propiedad observable es nula.

Comments are closed.