Rx Reactive Streams Connectable Observables

Reading through this document:

https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators

I have some questions governing the rules on which Connectable Observables work! This is what is explained in the Wiki:

The following example code shows two Subscribers subscribing to the same Observable. In the first case, they subscribe to an ordinary Observable; in the second case, they subscribe to a Connectable Observable that only connects after both Subscribers subscribe. Note the difference in the output: 

Example #1:

def firstMillion  = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS);

firstMillion.subscribe(
   { println("Subscriber #1:" + it); },       // onNext
   { println("Error: " + it.getMessage()); }, // onError
   { println("Sequence #1 complete"); }       // onCompleted
);

firstMillion.subscribe(
    { println("Subscriber #2:" + it); },       // onNext
    { println("Error: " + it.getMessage()); }, // onError
    { println("Sequence #2 complete"); }       // onCompleted
);
Subscriber #1:211128
Subscriber #1:411633
Subscriber #1:629605
Subscriber #1:841903
Sequence #1 complete
Subscriber #2:244776
Subscriber #2:431416
Subscriber #2:621647
Subscriber #2:826996
Sequence #2 complete

Example #2:

def firstMillion  = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS).publish();

firstMillion.subscribe(
   { println("Subscriber #1:" + it); },       // onNext
   { println("Error: " + it.getMessage()); }, // onError
   { println("Sequence #1 complete"); }       // onCompleted
);

firstMillion.subscribe(
   { println("Subscriber #2:" + it); },       // onNext
   { println("Error: " + it.getMessage()); }, // onError
   { println("Sequence #2 complete"); }       // onCompleted
);

firstMillion.connect();
Subscriber #2:208683
Subscriber #1:208683
Subscriber #2:432509
Subscriber #1:432509
Subscriber #2:644270
Subscriber #1:644270
Subscriber #2:887885
Subscriber #1:887885
Sequence #2 complete
Sequence #1 complete

What I do not get is the following:

  1. In the Example #2, how will the Observable know that there are two subscribers (in this case)

  2. How to deal with scenarios where one of the subscriber is slow.

Answers


To answer your first question: Because two calls to subscribe are made.

To answer your second question: If this is a concern, you need to apply some backpressure algorithms to the observers which might be slow.

The RxJS repo has a good introduction to some of the backpressure options available in RxJS. I'm not sure how many of the mentioned operators exist in RxJava. I know some of them do.


Need Your Help

Django - Correct way to pass arguments to CBV decorators?

python django django-class-based-views python-decorators django-generic-views

The docs feature nice options for applying decorators such as login_required to Class Based Views.