RxSwift – Determinando si un Observable ha sido desechado
Estoy tratando de obtener el Publisher
, que envía Observables
a sus clientes Consumer
, para determinar cuándo uno de sus consumidores ha eliminado su Observable
.
Molesta. Mi código funcionaba bien hasta que eliminé un .debug
de RxSwift desde el interior del código del Consumer
.
¿Existe alguna forma alternativa en la que pueda hacer que esto funcione?
private class Subscriber {
var ids: [Int]
weak var observable: Observable<[Updates]>?
}
class Publisher {
private let relay: BehaviorRelay<[Int: Updates]>
private var subscribers: [Subscriber] = []
func updatesStream(for ids: [Int]) -> Observable<[Updates]> {
let observable = relay
.map { map in
return map
.filter { ids.contains($0.key) }
.map { $0.value }
}
.filter { !$0.isEmpty }
.asObservable()
let subscriber = Subscriber(ids: ids, observable: observable)
subscribers.append(subscriber)
return observable
}
private func repeatTimer() {
let updates: [Updates] = ....
subscribers.remove(where: { subscriber in
return subscriber.observable == nil
})
relay.accept(updates)
}
}
class Client {
private var disposeBag: DisposeBag?
private let publisher = Publisher()
func startWatching() {
let disposeBag = DisposeBag()
self.disposeBag = disposeBag
publisher
//.debug("Consumer")
.updatesStream(for: [1, 2, 3])
.subscribe(onNext: { values in
print(values)
})
.disposed(by: disposeBag)
}
func stopWatching() {
disposeBag = nil
}
}
davy.ai
Para determinar cuándo el consumidor ha eliminado un observable, puede utilizar el operador
takeUntil
para crear un segundo observable que indique cuando el primer observable ha sido eliminado. Luego puede utilizar este segundo observable para eliminar al suscriptor de la lista de suscriptores.Aquí está la versión actualizada de la clase
Subscriber
:En esta versión de la clase
Subscriber
, creamos unPublishSubject<void>
llamadodisposedSubject
que se utilizará para indicar cuándo se ha eliminado el observable. UtilizamosdidSet
para observar los cambios en la propiedadobservable
. Cuando se establece la propiedadobservable
, verificamos si es nula y, si lo es, enviamos un valorVoid
adisposedSubject
para indicar que el observable ha sido eliminado. Si la propiedadobservable
no es nula, creamos un nuevo observable que es una copia deobservable
, pero con un operadortakeUntil
que escucha adisposedSubject
. Finalmente, nos suscribimos a este nuevo observable y lo eliminamos cuando se libera la memoria.Así es como puede utilizar la versión actualizada de la clase
Subscriber
en la clasePublisher
:En el método
updatesStream(for:)
, creamos un nuevo objetoSubscriber
y lo agregamos a la lista de suscriptores. Luego establecemos la propiedadobservable
del nuevoSubscriber
en elobservable
creado anteriormente. Cuando se libera la memoria delSubscriber
, eldisposedSubject
en elSubscriber
envía un valorVoid
, lo que activa el operadortakeUntil
en el observable y lo elimina. Finalmente, en el métodorepeatTimer()
, eliminamos cualquier suscriptor que haya sido eliminado verificando si su propiedadobservable
es nula.