Scala Rx Observable using Monifu

I'm trying to grasp the difference between a hot and a cold observable, and I'm trying out the Monifu library. My understanding is that the following code should result in only one of the subscribers getting the events emitted by the Observable, but that is not what happens!

scala> :paste
// Entering paste mode (ctrl-D to finish)

import monifu.reactive._
import scala.concurrent.duration._

import monifu.concurrent.Implicits.globalScheduler

val obs = Observable.interval(1.second).take(10)

val x = obs.foreach(a => println(s"from x ${a}"))
val y = obs.foreach(a => println(s"from y ${a}"))

// Exiting paste mode, now interpreting.

from x 0
from y 0
import monifu.reactive._
import scala.concurrent.duration._
import monifu.concurrent.Implicits.globalScheduler
obs: monifu.reactive.Observable[Long] = monifu.reactive.Observable$$anon$5@2c3c615d
x: Unit = ()
y: Unit = ()

scala> from x 1
from y 1
from x 2
from y 2
from x 3
from y 3
from x 4
from y 4
from x 5
from y 5
from x 6
from y 6
from x 7
from y 7
from x 8
from y 8
from x 9
from y 9

So, to me this looks like the Observable is publishing events to all interested subscribers?

Answers


I'm the primary author of Monifu.

A cold observable means that its subscribe function initiates a new data-source for each subscriber (on each subscribe() call), whereas a hot observable is sharing that same data-source between multiple subscribers.

As an example, consider a file to be the data-source. Let's model a simple Observable that emits the lines from a file:

import java.io.File
import scala.util.control.NonFatal
import monifu.reactive.Observable

def fromFile(file: File): Observable[String] = {
  // this is the subscribe function that
  // we are passing to create ;-)
  Observable.create { subscriber =>
    // executing things on our thread-pool
    subscriber.scheduler.execute {
      val source = try {
        Observable.fromIterable(scala.io.Source
          .fromFile(file).getLines().toIterable)
      } catch {
        // subscribe functions must be protected
        case NonFatal(ex) =>
          Observable.error(ex)
      }

      source.unsafeSubscribe(subscriber)
    }
  }
}

This function creates a cold observable: it opens a new file handle for each subscribed observer, then reads and emits the lines to that observer.
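To see the "new data-source per subscriber" behaviour without any library, here is a minimal sketch in plain Scala. ColdSource, ColdDemo and their members are hypothetical names made up for illustration and are not Monifu's API; the opened counter stands in for opening a file handle, and everything runs synchronously on the calling thread.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a cold observable; NOT Monifu's API.
// `open` plays the role of the subscribe function: it runs once per subscriber.
final class ColdSource[A](open: () => Iterator[A]) {
  def subscribe(onNext: A => Unit): Unit =
    open().foreach(onNext) // a fresh iterator for every subscriber
}

object ColdDemo {
  // Counts how many times the underlying data-source was opened.
  var opened = 0

  val lines: ColdSource[String] = new ColdSource(() => {
    opened += 1
    Iterator("line 1", "line 2", "line 3")
  })

  // Subscribes twice and returns what each subscriber received.
  def run(): (List[String], List[String]) = {
    val x = ListBuffer.empty[String]
    val y = ListBuffer.empty[String]
    lines.subscribe(line => x += line)
    lines.subscribe(line => y += line)
    (x.toList, y.toList)
  }
}
```

Calling ColdDemo.run() gives both subscribers the full list of lines, and opened ends up at 2: one data-source per subscribe call.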

But we can turn it into a hot observable:

// NOTE: publish() turns a cold observable into a hot one
val hotObservable = fromFile(file).publish()

The difference shows up when you do this:

val x = observable.subscribe()
val y = observable.subscribe()

If the observable is hot:

  1. the observable doesn't do anything until you call connect() on it
  2. after connect(), the file is opened once and both subscribers receive exactly the same events
  3. after all the lines from that file are emitted, then new subscribers will not get anything because the (shared) data-source has already been depleted

If the observable is cold:

  1. on each subscribe, a new file handle is opened and read
  2. elements are emitted immediately after subscribe(), so no need to wait for a connect()
  3. all observers subscribing will receive all lines from that file, regardless of the moment they do so
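The hot case can be modelled in a similar way. The sketch below is plain Scala and only imitates the publish()/connect() behaviour listed above; HotSource and HotDemo are hypothetical names, not Monifu's API, and the whole thing is synchronous, so connect() drains the shared source before it returns. A real implementation would emit asynchronously and could treat late subscribers differently.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a connectable (hot) observable; NOT Monifu's API.
final class HotSource[A](open: () => Iterator[A]) {
  private val subscribers = ListBuffer.empty[A => Unit]
  private var connected = false

  // Subscribing only registers the callback; nothing is emitted yet.
  // After connect() the source is already depleted in this synchronous
  // sketch, so a late subscriber is never called.
  def subscribe(onNext: A => Unit): Unit = {
    if (!connected) subscribers += onNext
  }

  // Opens the single shared data-source and fans each element out
  // to every subscriber registered so far.
  def connect(): Unit = {
    if (!connected) {
      connected = true
      open().foreach(a => subscribers.foreach(s => s(a)))
    }
  }
}

object HotDemo {
  var opened = 0 // how many times the data-source was opened
  val x = ListBuffer.empty[String]
  val y = ListBuffer.empty[String]
  val late = ListBuffer.empty[String]

  // Returns the value of `opened` just before connect() was called.
  def run(): Int = {
    val hot = new HotSource[String](() => {
      opened += 1
      Iterator("line 1", "line 2")
    })
    hot.subscribe(a => x += a)
    hot.subscribe(a => y += a)
    val openedBeforeConnect = opened
    hot.connect()
    hot.subscribe(a => late += a) // subscribes after the source is depleted
    openedBeforeConnect
  }
}
```

Here the data-source is opened exactly once, x and y see the same elements, nothing is opened before connect(), and the late subscriber receives nothing.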

Some references that also apply to Monifu:

  1. Connectable Observable from RxJava's wiki
  2. Intro to Rx: Hot and Cold Observables
  3. Subjects from RxJava's wiki
