rx java - Scala Rx Observable using Monifu


I'm trying to grasp the difference between a hot and a cold observable, and I'm trying out the Monifu library. My understanding is that the following code should result in only one of the subscribers getting the events emitted by the Observable, but it does not!

    scala> :paste
    // Entering paste mode (ctrl-D to finish)

    import monifu.reactive._
    import scala.concurrent.duration._

    import monifu.concurrent.Implicits.globalScheduler

    val obs = Observable.interval(1.second).take(10)

    val x = obs.foreach(a => println(s"from x ${a}"))
    val y = obs.foreach(a => println(s"from y ${a}"))

    // Exiting paste mode, now interpreting.

    from x 0
    from y 0
    import monifu.reactive._
    import scala.concurrent.duration._
    import monifu.concurrent.Implicits.globalScheduler
    obs: monifu.reactive.Observable[Long] = monifu.reactive.Observable$$anon$5@2c3c615d
    x: Unit = ()
    y: Unit = ()

    scala> from x 1
    from y 1
    from x 2
    from y 2
    from x 3
    from y 3
    from x 4
    from y 4
    from x 5
    from y 5
    from x 6
    from y 6
    from x 7
    from y 7
    from x 8
    from y 8
    from x 9
    from y 9

So, to me it looks like the Observable is publishing events to all interested subscribers?

I'm the primary author of Monifu.

A cold observable means that its subscribe function initiates a new data-source for each subscriber (on each subscribe() call), whereas a hot observable shares the same data-source between multiple subscribers.

As an example, consider a file to be the data-source. Let's model a simple Observable that emits the lines from a file:

    import java.io.File
    import monifu.reactive.Observable
    import scala.util.control.NonFatal

    def fromFile(file: File): Observable[String] = {
      // this is the subscribe function that
      // we are passing to create ;-)
      Observable.create { subscriber =>
        // executing things on our thread-pool
        subscriber.scheduler.execute {
          val source = try {
            Observable.fromIterable(scala.io.Source
              .fromFile(file).getLines().toIterable)
          }
          catch {
            // subscribe functions must be protected
            case NonFatal(ex) =>
              Observable.error(ex)
          }

          source.unsafeSubscribe(subscriber)
        }
      }
    }

This function creates a cold observable. It means that it will open a new file handle for each subscribed observer, and then read and emit the lines for each subscribed observer.

But we can turn it into a hot observable:

    // NOTE: publish() turns a cold observable into a hot one
    val hotObservable = fromFile(file).publish()

And then the difference shows up when you do this:

    val x = observable.subscribe()
    val y = observable.subscribe()

If the observable is hot:

  1. the observable doesn't do anything until you call connect() on it
  2. after connect(), the same file is opened once and both subscribers receive exactly the same events
  3. after all the lines from that file have been emitted, new subscribers will not get anything, because the (shared) data-source has already been depleted
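The three points above can be mimicked with a toy model in plain Scala. This is only a sketch and not how Monifu implements publish() or connect(): `ToyHotSource`, `HotSketch` and the in-memory "file" are hypothetical names made up for illustration, and everything runs synchronously on the calling thread.

```scala
import scala.collection.mutable.ListBuffer

// Toy model of a hot (connectable) source. Hypothetical, NOT Monifu's API.
final class ToyHotSource[A](open: () => Iterator[A]) {
  private val subscribers = ListBuffer.empty[A => Unit]
  private var connected = false

  // Registering a callback does not start anything by itself.
  // In this model a callback registered after connect() is ignored,
  // because the shared iterator has already been drained by then.
  def subscribe(onNext: A => Unit): Unit =
    if (!connected) subscribers += onNext

  // Opens the data-source exactly once and pushes every element
  // to all callbacks registered so far.
  def connect(): Unit =
    if (!connected) {
      connected = true
      val shared = open()
      shared.foreach(a => subscribers.foreach(f => f(a)))
    }
}

object HotSketch {
  // Returns (times the source was opened, what x saw, what y saw, what a late subscriber saw).
  def run(): (Int, List[String], List[String], List[String]) = {
    var opened = 0
    // Stand-in for opening a file: counts how often it is "opened".
    val hot = new ToyHotSource[String](() => {
      opened += 1
      Iterator("line 1", "line 2")
    })

    val x = ListBuffer.empty[String]
    val y = ListBuffer.empty[String]
    val late = ListBuffer.empty[String]

    hot.subscribe(line => x += line)
    hot.subscribe(line => y += line)
    hot.connect()                       // the source is opened only here
    hot.subscribe(line => late += line) // too late, source is depleted

    (opened, x.toList, y.toList, late.toList)
  }
}
```

Calling `HotSketch.run()` opens the stand-in source a single time, hands both early subscribers the same two lines, and leaves the late subscriber with nothing.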

If the observable is cold:

  1. on each subscribe, a new file handle is opened and read
  2. elements are emitted immediately after subscribe(), so there is no need to wait for connect()
  3. all observers subscribing will receive all the lines from that file, regardless of the moment at which they do so
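The cold behaviour can be sketched the same way in plain Scala. Again this is a toy model under stated assumptions, not Monifu's implementation: `ToyColdSource`, `ColdSketch` and the in-memory "file" are hypothetical, and emission is synchronous. It is also consistent with the output in the question, where x and y each received every value from 0 to 9.

```scala
import scala.collection.mutable.ListBuffer

// Toy model of a cold source. Hypothetical, NOT Monifu's API.
final class ToyColdSource[A](open: () => Iterator[A]) {
  // Every subscribe() opens its own data-source and replays it
  // from the start; there is no connect() step.
  def subscribe(onNext: A => Unit): Unit =
    open().foreach(onNext)
}

object ColdSketch {
  // Returns (times the source was opened, what x saw, what y saw).
  def run(): (Int, List[String], List[String]) = {
    var opened = 0
    // Stand-in for opening a file: counts how often it is "opened".
    val cold = new ToyColdSource[String](() => {
      opened += 1
      Iterator("line 1", "line 2")
    })

    val x = ListBuffer.empty[String]
    val y = ListBuffer.empty[String]

    cold.subscribe(line => x += line)
    // y subscribes after x has already consumed everything,
    // yet it still gets its own full copy of the data.
    cold.subscribe(line => y += line)

    (opened, x.toList, y.toList)
  }
}
```

Here `ColdSketch.run()` opens the stand-in source once per subscriber, and both subscribers end up with the full list of lines even though the second one subscribed after the first had finished.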

Some references that also apply to Monifu:

  1. Connectable Observable from RxJava's wiki
  2. Intro to Rx: Hot and Cold Observables
  3. Subjects from RxJava's wiki
