scala - How do I map an Observable with a function from Future to Future?


Suppose I have an event stream of elements of type In:

val observableIn: Observable[In] = ???

and a function for transforming objects of type In into objects of type Out, but "in the future":

val futureInToFutureOut: Future[In] => Future[Out] = ???

At this point I want to transform the elements of observableIn according to my function futureInToFutureOut. That is, I want as a result an event stream of elements of type Out, matching the elements of the original stream, but converted via the function futureInToFutureOut.

I think this should work:

val observableOut: Observable[Out] = observableIn flatMap { in =>
  Observable.from(futureInToFutureOut(Future(in)))
}

Is this right? Is there a better way to do this?

Edit:

Your solution is correct as far as I can tell. If you want to improve performance a bit, consider:

val observableOut: Observable[Out] = observableIn.flatMap { in =>
  val p = Promise.successful(in)
  Observable.from(futureInToFutureOut(p.future))
}

This is a bit faster, since it does not create an asynchronous computation to produce the input future, as Future.apply does.
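The difference between the two ways of wrapping a value can be sketched with the standard library alone. This is a minimal illustration, not part of the original answer: the object name CompletedFutureSketch and the value 42 are made up for the example, and the global execution context is an assumption.

```scala
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical names; only Future and Promise come from the standard library.
object CompletedFutureSketch {
  // Promise.successful builds an already-completed promise, so no task
  // is submitted to the execution context.
  def alreadyCompleted(): Future[Int] = Promise.successful(42).future

  // Future.apply submits the body to the implicit ExecutionContext; the
  // value is available only after that task has run.
  def scheduled(): Future[Int] = Future(42)

  // Future.successful is the standard-library shorthand for the first form.
  def shorthand(): Future[Int] = Future.successful(42)
}
```

The first and third forms return a future that is already completed when it is handed to futureInToFutureOut, which is where the saving comes from.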

Old:

I am leaving my old suggestion below, which works only in the case where you are mapping a single event in an observable.

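import scala.concurrent._
import ExecutionContext.Implicits.global // execution context needed by map and foreach

val promiseIn = Promise[In]()
// Only the first event completes the promise; later trySuccess calls are no-ops.
observableIn.foreach(x => promiseIn.trySuccess(x))
val observableOut = Observable.create { observer =>
  // map treats the function as applied to the event value x; with the question's
  // Future[In] => Future[Out] signature, use futureInToFutureOut(promiseIn.future) instead.
  promiseIn.future.map(futureInToFutureOut).foreach { y =>
    observer.onNext(y)
    observer.onCompleted()
  }
}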

Explanation

Since you are starting out with an Observable[In] object (i.e. an event stream), you need to find a way to transfer an event from the observable to a future. The typical way to create a new future is to create its promise first -- the input side of the future object. You then use foreach on the observable to invoke trySuccess on the promise when the first event arrives:

observableIn ---x---> foreach ---x---> promiseIn.trySuccess(x)
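Why this only handles the first event can be seen from how trySuccess behaves. The following is a small sketch using only the standard library; the list of events is a hypothetical stand-in for what observableIn.foreach would deliver, and the object name is invented for the example.

```scala
import scala.concurrent.Promise

// Hypothetical names and data, for illustration only.
object FirstEventSketch {
  val promiseIn = Promise[Int]()

  // Stand-in for the events that observableIn.foreach would deliver.
  val events = List(1, 2, 3)

  // trySuccess returns true only for the call that completes the promise;
  // every later call returns false and leaves the stored value untouched.
  val accepted: List[Boolean] = events.map(x => promiseIn.trySuccess(x))
}
```

Here accepted is List(true, false, false), and the promise holds the first event, 1. Every later event is silently dropped, which is why this approach is limited to a single event.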

Once the event on the observable arrives, the promise will be asynchronously completed. You can now get the reading side of the promise, i.e. the future, by calling its future method; and then call map on the future -- promiseIn.future.map(futureInToFutureOut). Graphically:

promiseIn.future ---x---> map ---f(x)---> futureOut
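The step in this diagram can be tried in isolation. This is a rough sketch under stated assumptions: f is a hypothetical plain function standing in for the transformation, the event value 7 is made up, and Await is used only so that the example can return a result.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical names, for illustration only.
object MapFutureSketch {
  // Stand-in for the transformation f in the diagram.
  def f(x: Int): String = s"out-$x"

  def run(): (Boolean, String) = {
    val promiseIn = Promise[Int]()
    // map is registered before any value exists; it runs once the promise completes.
    val futureOut: Future[String] = promiseIn.future.map(f)
    val pendingBefore = !futureOut.isCompleted
    promiseIn.trySuccess(7) // plays the role of the first event arriving
    (pendingBefore, Await.result(futureOut, 2.seconds))
  }
}
```

Calling MapFutureSketch.run() returns (true, "out-7"): the mapped future is still pending until the promise is completed, and then carries f(x).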

The resulting future is asynchronously completed with futureInToFutureOut(x). At this point you need to find a way to emit this value through an Observable[Out]. The typical way to create new observables is to call the Observable.create factory method. This method gives you the observable's writing end -- the Observer, which you use to emit events by calling onNext:

futureOut ---f(x)---> foreach ---f(x)---> observer.onNext(f(x))

Since you know that the future emits at most a single event, you call onCompleted on the observer, to close the output observable.

Edit: if you want to master Rx and Scala futures, you may want to consider this book, which deals with these topics. Disclaimer: I'm the author.

