i'm trying grasp concepts between hot , cold observable , trying out monifu library. understanding following code should result in 1 of subscriber getting events emitted observable, not!
scala> :paste // entering paste mode (ctrl-d finish) import monifu.reactive._ import scala.concurrent.duration._ import monifu.concurrent.implicits.globalscheduler val obs = observable.interval(1.second).take(10) val x = obs.foreach(a => println(s"from x ${a}")) val y = obs.foreach(a => println(s"from y ${a}")) // exiting paste mode, interpreting. x 0 y 0 import monifu.reactive._ import scala.concurrent.duration._ import monifu.concurrent.implicits.globalscheduler obs: monifu.reactive.observable[long] = monifu.reactive.observable$$anon$5@2c3c615d x: unit = () y: unit = () scala> x 1 y 1 x 2 y 2 x 3 y 3 x 4 y 4 x 5 y 5 x 6 y 6 x 7 y 7 x 8 y 8 x 9 y 9 so, me looks observable publishing events interested subscribers?
i'm primary author of monifu.
a cold observable means subscribe function initiates new data-source each subscriber (on each subscribe() call), whereas hot observable sharing same data-source between multiple subscribers.
as example, consider file data-source. lets model simple observable emits lines file:
def fromfile(file: file): observable[string] = { // subscribe function // passing create ;-) observable.create { subscriber => // executing things on our thread-pool subscriber.scheduler.execute { val source = try { observable.fromiterable(scala.io.source .fromfile(file).getlines().toiterable) } catch { // subscribe functions must protected case nonfatal(ex) => observable.error(ex) } source.unsafesubscribe(subscriber) } } } this function creates cold observable. means open new file handle each subscribed observer , read , emit lines each subscribed observer.
but can turn hot observable:
// note: publish() turns cold observable hot 1 val hotobservable = fromfile(file).publish() and difference when this:
val x = observable.subscribe() val y = observable.subscribe() if observable hot:
- the observable doesn't until call
connect()on it - after
connect(), same file opened , both receive same events - after lines file emitted, new subscribers not anything because (shared) data-source has been depleted
if observable cold:
- on each subscribe, new file handle opened , read
- elements emitted after
subscribe(), no need waitconnect() - all observers subscribing receive lines file, irregardless of moment so
some references apply monifu:
Comments
Post a Comment