suppose have event stream of elements type in:
val observablein: observable[in] = ??? and function transforming objects of type in objects of type out, "in future":
val futureintofutureout: (future[in]) => future[out] = ??? at point want transform elements of observablein according function futureintofutureout. is, want result event stream of elements of type out, matching elements of original stream, converted via function futureintofutureout.
i think should work:
val observableout: observable[out] = observablein flatmap { in => observable.from(futureintofutureout(future(in))) } is right? there better way this?
edit:
your solution correct far can tell. if want improve performance bit, consider:
val observableout: observable[out] = observablein.flatmap { in => val p = promise.successful(in) observable.from(futureintofutureout(p.future)) } this bit faster, not create asynchronous computation map future future.apply does.
old:
i leaving old suggestion below, works in case mapping single event in observable.
import scala.concurrent._ val promisein = promise[int]() observablein.foreach(x => promisein.trysuccess(x)) val observableout = observable.create { observer => promisein.future.map(futureintofutureout).foreach { y => observer.onnext(y) observer.oncompleted() } } explanation
since starting out observable[in] object (i.e. event stream), need find way transfer event observable future. typical way create new future create promise first -- input side of future object. use foreach on observable invoke trysuccess on future when first event arrives:
observablein ---x---> foreach ---x---> promisein.trysuccess(x) once event on observable arrives, promise asynchronously completed. can reading side of promise, i.e. future calling future method; , call map on future -- promisein.future.map(futureintofutureout). graphically:
promisein.future ---x---> map ---f(x)---> futureout the resulting future asynchronously completed futureintofutureout(x). @ point need find way emit value through observable[out]. typical way create new observable call observable.create factory method. method gives observable's writing end -- observer, use emit events calling onnext:
futureout ---f(x)---> foreach ---f(x)---> observer.onnext(f(x)) since know future emits @ single event, call oncompleted on observer, close output observable.
edit: if want master rx , scala futures, may want consider this book, deals these topics. disclaimer: i'm author.
Comments
Post a Comment