Using RxJs and Angular 2 in order to deal with server-sent events

I am trying to display server-sent events emitted values in an angular 2 /RxJs app.

The backend regularly sends individual strings to the client through server-sent events.

I am not sure how to deal with the retrieved values on the angular 2/RxJs side.

Here is my client (a ng component):

import {Component, OnInit} from 'angular2/core';
import {Http, Response} from 'angular2/http';
import 'rxjs/Rx';
import {Observable}     from 'rxjs/Observable';

@Component({
    selector: 'my-app',
    template: `<h1>My second Angular 2 App</h1>
    <ul>
        <li *ngFor="#s of someStrings | async">
           a string: {{ s }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit {

    constructor(private http:Http) {
    }

    errorMessage:string;
    someStrings:string[];

    ngOnInit() {
        this.getSomeStrings()
            .subscribe(
                aString => this.someStrings.push(aString),
                error => this.errorMessage = <any>error);
    }

    private getSomeStrings():Observable<string> {
        return this.http.get('interval-sse-observable')
            .map(this.extractData)
            .catch(this.handleError);
    }

    private extractData(res:Response) {
        if (res.status < 200 || res.status >= 300) {
            throw new Error('Bad response status: ' + res.status);
        }
        let body = res.json();
        return body || {};
    }

    private handleError(error:any) {
        // In a real world app, we might send the error to remote logging infrastructure
        let errMsg = error.message || 'Server error';
        console.error(errMsg); // log to console instead
        return Observable.throw(errMsg);
    }
}

The backend method is as follows (and uses RxJava):

   @ResponseStatus(HttpStatus.OK)
   @RequestMapping(method = RequestMethod.GET, path = "interval-sse-observable")
    public SseEmitter tickSseObservable() {
        return RxResponse.sse(
                Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
                        .map(tick -> randomUUID().toString())
        );
    }

I just noticed that the app hangs on the request and that nothing is displayed on the page.

I suspect there is an issue with my use of the map method i.e. .map(this.extractData).

I would just like to add the incoming strings to the array and display that array in the template which would update as the strings come in.

Can anyone please help?

edit: Here is a working solution (thanks to Thierry's answer below):

import {Component, OnInit} from 'angular2/core';
import 'rxjs/Rx';

@Component({
    selector: 'my-app',
    template: `<h1>My second Angular 2 App</h1>
    <ul>
        <li *ngFor="#s of someStrings">
           a string: {{ s }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit {

    someStrings:string[] = [];

    ngOnInit() {
        let source = new EventSource('/interval-sse-observable');
        source.addEventListener('message', aString => this.someStrings.push(aString.data), false);
    }
}

Answers


You can't use the Http class of Angular2 to handle server side events since it's based on the XHR object.

You could leverage the EventSource object:

var source = new EventSource('/...');
source.addListener('message', (event) => {
  (...)
});

See these articles:


Here is a working example :

SseService

import {Injectable} from '@angular/core';
import {Observable} from 'rxjs/Observable';

declare var EventSource;

@Injectable()
export class SseService {

    constructor() {
    }

    observeMessages(sseUrl: string): Observable<string> {
        return new Observable<string>(obs => {
            const es = new EventSource(sseUrl);
            es.addEventListener('message', (evt) => {
                console.log(evt.data);
                obs.next(evt.data);
            });
            return () => es.close();
        });
    }
}

AppComponent

import {Component, OnDestroy, OnInit} from '@angular/core';
import {SseService} from './shared/services/sse/sse.service';
import {Observable, Subscription} from 'rxjs/Rx';

@Component({
    selector: 'my-app',
    template: `<h1>Angular Server-Sent Events</h1>
    <ul>
        <li *ngFor="let message of messages">
             {{ message }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit, OnDestroy {
    private sseStream: Subscription;
    messages:Array<string> = [];

    constructor(private sseService: SseService){
    }

    ngOnInit() {
        this.sseStream = this.sseService.observeMessages('https://server.com/mysse')
                        .subscribe(message => {
                            messages.push(message);
                        });
    }

    ngOnDestroy() {
        if (this.sseStream) {
            this.sseStream.unsubscribe();
        }
    }
}

To add to Thierry's answer, By Default the event type is 'message'. However the event type could be anything like ('chat', 'log' etc.,) based on the server side implementation. In my case, the first two events from the server were 'message' and the rest of them were 'log'. My code looks as below

var source = new EventSource('/...'); source.addListener('message', message => { (...) }); source.addListener('log', log => { (...) });


Need Your Help

Determining version of easy_install/setuptools

python setuptools easy-install

I'm trying to install couchapp, which uses easy_install - and is quite explicit in stating a particular version of easy_install/setuptools is needed: 0.6c6. I seem to have easy_install already on m...

How big should Action Bar(Toolbar) icons be in the new Android Material design?

android android-5.0-lollipop material-design

I tried searching the design guidelines, but I couldn't find anything related to this. I am trying to figure out the optimal size for Action bar icons in the new L release. Here is an example of wh...