android - In RxJava, how do I start a potentially infinite stream of events generated from an API?
I have an API available to clients that can be simplified to this:

    public class API {
        public void sendEvent(Event e) { /* implementation omitted */ }
    }

Event instances enter my system whenever a client calls the API (technically over a Binder on a Service derivative) and are then processed, filtered, and dispatched to other internal components. I don't care about past events, only those available from the time a subscriber subscribes. This seems like a natural fit for the Rx paradigm, which I'm just getting my feet wet with.
I need the Observable to be created once, to allow multiple subscribers, and to be fed instances of Event that are then sent through the reactive pipeline to the observers. A Subject seems appropriate for what I'm looking to do (in particular, this answer to this question resonated with me).

What do other RxJava users recommend?
For example, following on from my short comment:

    import java.util.ArrayList;
    import java.util.List;
    import rx.Observable.OnSubscribe;
    import rx.Subscriber;

    public class API implements OnSubscribe<Event> {
        private final List<Subscriber<? super Event>> subscribers = new ArrayList<>();

        public void sendEvent(Event event) {
            // do whatever you need with the event
            for (Subscriber<? super Event> sub : subscribers) {
                sub.onNext(event);
            }
        }

        @Override
        public void call(Subscriber<? super Event> sub) {
            subscribers.add(sub);
        }
    }

Then you have an instance of it somewhere:

    API api = ...

Your Observable is obtained like so:

    Observable.create(api);

You can then do any normal thing with your Observable.

The filtering of unsubscribed Subscribers is left as an exercise for the reader.
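
For what it's worth, here is a rough, dependency-free sketch of how that filtering could work. It deliberately does not use RxJava: `Listener` and `PruningSource` are hypothetical stand-ins I made up for illustration (`Listener` plays the role of a Subscriber with an `isUnsubscribed()` flag), and the pruning strategy shown, dropping dead listeners just before each dispatch, is only one possible approach.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for an Rx Subscriber: a callback plus an "unsubscribed" flag.
final class Listener<T> {
    private final Consumer<T> callback;
    private volatile boolean unsubscribed;

    Listener(Consumer<T> callback) {
        this.callback = callback;
    }

    void unsubscribe() {
        unsubscribed = true;
    }

    boolean isUnsubscribed() {
        return unsubscribed;
    }

    void onNext(T value) {
        callback.accept(value);
    }
}

// Hypothetical fan-out source that prunes unsubscribed listeners before each dispatch.
final class PruningSource<T> {
    // Copy-on-write list: iteration works on a snapshot, so listeners can be
    // added or removed while an event is being dispatched.
    private final List<Listener<T>> listeners = new CopyOnWriteArrayList<>();

    Listener<T> subscribe(Consumer<T> callback) {
        Listener<T> listener = new Listener<>(callback);
        listeners.add(listener);
        return listener;
    }

    void send(T value) {
        // Drop listeners that have unsubscribed since the last event.
        listeners.removeIf(Listener::isUnsubscribed);
        for (Listener<T> listener : listeners) {
            // Re-check: a listener may unsubscribe while we are dispatching.
            if (!listener.isUnsubscribed()) {
                listener.onNext(value);
            }
        }
    }

    int listenerCount() {
        return listeners.size();
    }
}
```

With the hand-rolled OnSubscribe above, the same idea would mean checking `sub.isUnsubscribed()` before calling `sub.onNext(event)` and removing the Subscriber when it returns true.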
Edit

A little more research shows that PublishSubject should help:

    import rx.Observable;
    import rx.subjects.PublishSubject;

    public class API {
        private final PublishSubject<Event> subject = PublishSubject.create();

        public void sendEvent(Event event) {
            // do whatever you need with the event
            // publish it
            subject.onNext(event);
        }

        public Observable<Event> getObservable() {
            return subject.asObservable();
        }
    }

This way, you can subscribe to this Observable, and every time an event is sent to the API, it is published to all subscribers.
Use it like this:

    API api = ...;
    api.getObservable().subscribe(event -> doStuffWithEvent(event));
    api.getObservable().subscribe(event -> doOtherStuffWithEvent(event));
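
Since the question says past events don't matter, it may help to see what "publish" semantics mean for a late subscriber. The sketch below is a toy model in plain Java, not RxJava and not how PublishSubject is implemented; `HotEventHub` is a hypothetical name used only for illustration. It shows that an observer only receives events published after it subscribed, because nothing is buffered or replayed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, dependency-free model of "publish" semantics (not RxJava's code).
final class HotEventHub<T> {
    private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();

    // Registers an observer; it only sees events published from now on.
    void subscribe(Consumer<T> observer) {
        observers.add(observer);
    }

    // Delivers the event to everyone currently subscribed; nothing is stored.
    void publish(T event) {
        for (Consumer<T> observer : observers) {
            observer.accept(event);
        }
    }
}
```

If one observer subscribes, "first" is published, a second observer subscribes, and then "second" is published, the first observer sees both events and the second sees only "second". In real code you would use the PublishSubject version above rather than rolling your own.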