Thursday, January 26, 2017

Reactive Programming through RxJava

Rx was originally developed as Reactive Extensions for C#. Netflix later ported it to the JVM under the Apache license and named it RxJava.

RxJava is also an extension of the Observer design pattern. In this post, I will focus mostly on the fundamentals and constructs of RxJava.

Rx Building Blocks

The two fundamental constructs of Rx are Observables and Subscribers. An Observable abstracts an event stream and emits its events to the subscribers (or observers), possibly asynchronously. A stream carries zero or more events and terminates either by completing successfully or by ending with an error.

Streams can be created out of anything: a list of integers, user clicks in a UI, DB queries, Twitter feeds and so on. Any sequence of data, input or function results can be converted into a stream. On top of streams, we have a functional toolkit to filter, modify, combine and merge events and streams. For example, we can merge two streams into a single stream, or map the values of one stream into another. The image below shows UI clicks represented as events. An instance of Subscription represents the connection between an Observable and a Subscriber.
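To make the filter, map and merge ideas above concrete without pulling in the library, here is a minimal sketch in plain Java. ToyStream and ToyStreamDemo are hypothetical names invented for this illustration; this is not how rx.Observable is implemented, and it leaves out completion, errors and unsubscription entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

class ToyStreamDemo {
    // Toy push-based stream (assumption: NOT rx.Observable). It only forwards
    // values; there is no completion, error or unsubscribe handling.
    interface ToyStream<T> {
        void subscribe(Consumer<T> onNext);

        // Wraps a fixed list of values as a stream.
        static <T> ToyStream<T> from(List<T> items) {
            return onNext -> items.forEach(onNext);
        }

        // Passes on only the values that match the predicate.
        default ToyStream<T> filter(Predicate<T> keep) {
            return onNext -> subscribe(v -> { if (keep.test(v)) onNext.accept(v); });
        }

        // Transforms each value before passing it on.
        default <R> ToyStream<R> map(Function<T, R> fn) {
            return onNext -> subscribe(v -> onNext.accept(fn.apply(v)));
        }

        // Forwards the values of both streams to a single subscriber. The toy
        // sources are synchronous, so all of a arrives before any of b.
        static <T> ToyStream<T> merge(ToyStream<T> a, ToyStream<T> b) {
            return onNext -> { a.subscribe(onNext); b.subscribe(onNext); };
        }
    }

    // Builds a small pipeline and collects what the subscriber receives.
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        ToyStream<Integer> evens =
                ToyStream.from(Arrays.asList(1, 2, 3, 4)).filter(n -> n % 2 == 0);
        ToyStream<Integer> tens = ToyStream.from(Arrays.asList(10, 20));
        ToyStream.merge(evens, tens)
                .map(n -> "Got: " + n)
                .subscribe(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Nothing happens until subscribe is called at the end of the chain; each operator only wraps the stream before it. With real asynchronous sources, a merge would interleave events as they arrive rather than draining one source first.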



Observables: rx.Observable<T>

Observable is the first core component of Rx. Let's look at the ways in which an Observable can be created:

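import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import rx.Observable;

//1: just() emits the whole list as a single item
List<Integer> list = Arrays.asList(1, 2, 3, 4);
Observable<List<Integer>> listObservable = Observable.just(list);

//2: from() emits each element as a separate item
Observable.from(Arrays.asList(1, 2, 3));
//from can take an Array, Iterable and Future

//3: fromCallable() invokes the Callable only when someone subscribes
//function1() stands for any method that returns a String
Callable<String> call = () -> function1();
Observable.fromCallable(call);

//an exception thrown by the Callable reaches subscribers as an error event
Observable.fromCallable(() -> { throw new RuntimeException("some error"); });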




The Observable class provides the subscribe method, which attaches an observer so that values can be pushed to it. A Subscriber is basically an implementation of the Observer interface. An Observable pushes three kinds of events, one for each method of the Observer interface shown below.

Observer
Provides a mechanism for receiving push-based notifications. The interface declares three methods, each of which is called for the corresponding kind of event.

public interface Observer<T> {
      void onCompleted();
      void onError(Throwable e);
      void onNext(T t);
}

onCompleted() notifies the observer that the Observable has finished sending events.
onError() notifies the observer that the Observable has encountered an error or unhandled exception.
onNext() provides the subscribed observer with a new item/event.
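To see the three callbacks fire in order, here is a small sketch in plain Java. The nested Observer interface is a local stand-in with the same three methods, and emit is a hypothetical helper playing the role of the source; neither is the RxJava type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ObserverDemo {
    // Local stand-in with the same three methods as the interface above
    // (assumption: this is not rx.Observer itself).
    interface Observer<T> {
        void onCompleted();
        void onError(Throwable e);
        void onNext(T t);
    }

    // Hypothetical source: pushes each item, then signals completion,
    // or signals the error if pushing an item fails.
    static <T> void emit(List<T> items, Observer<T> observer) {
        try {
            for (T item : items) {
                observer.onNext(item);
            }
        } catch (RuntimeException e) {
            observer.onError(e);
            return;
        }
        observer.onCompleted();
    }

    // Records every callback as a line of text so the order is visible.
    static List<String> record(List<Integer> items) {
        List<String> log = new ArrayList<>();
        emit(items, new Observer<Integer>() {
            @Override public void onCompleted() { log.add("Completed"); }
            @Override public void onError(Throwable e) { log.add("Error: " + e.getMessage()); }
            @Override public void onNext(Integer n) {
                if (n < 0) throw new IllegalArgumentException("negative value");
                log.add("Got: " + n);
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(record(Arrays.asList(1, 2)));     // [Got: 1, Got: 2, Completed]
        System.out.println(record(Arrays.asList(1, -1, 2))); // [Got: 1, Error: negative value]
    }
}
```

In the second call the value 2 is never delivered, because the error ends the sequence.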

In practice, you will rarely see explicit Observers and their three callback methods in code, thanks to lambdas and other shorthands. To subscribe to an Observable, it's not necessary to provide an instance of Observer at all. It's also not always necessary to implement all of onNext, onCompleted and onError; you can provide just a subset of them.


Consuming Observables (i.e. Subscribing)

Consuming an Observable basically means subscribing to it. Only once you subscribe do you receive the events that the Observable abstracts.

Observable
                .just(1, 2, 3)
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("Completed Observable.");
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        System.err.println("Whoops: " + throwable.getMessage());
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("Got: " + integer);
                    }
                });

Output

Got: 1
Got: 2
Got: 3
Completed Observable.

A well-formed Observable will call onNext zero or more times and then will call either onCompleted or onError exactly once.
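The rule "any number of onNext calls, then exactly one terminal event (onCompleted or onError)" can be illustrated with a small guard that drops anything arriving after the first terminal event. Guarded and ContractDemo are hypothetical names for this sketch, and the nested Observer is a local stand-in rather than the RxJava type. The guard is not thread-safe and is only meant to show the rule.

```java
import java.util.ArrayList;
import java.util.List;

class ContractDemo {
    // Local stand-in for the three-method Observer interface (not rx.Observer).
    interface Observer<T> {
        void onCompleted();
        void onError(Throwable e);
        void onNext(T t);
    }

    // Hypothetical wrapper: forwards events until the first terminal event,
    // then drops everything that follows.
    static final class Guarded<T> implements Observer<T> {
        private final Observer<T> actual;
        private boolean done;

        Guarded(Observer<T> actual) { this.actual = actual; }

        @Override public void onNext(T t) {
            if (!done) actual.onNext(t);
        }

        @Override public void onCompleted() {
            if (done) return;
            done = true;
            actual.onCompleted();
        }

        @Override public void onError(Throwable e) {
            if (done) return;
            done = true;
            actual.onError(e);
        }
    }

    // Drives a deliberately misbehaving sequence of calls through the guard.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observer<Integer> guarded = new Guarded<>(new Observer<Integer>() {
            @Override public void onCompleted() { log.add("completed"); }
            @Override public void onError(Throwable e) { log.add("error"); }
            @Override public void onNext(Integer n) { log.add("next " + n); }
        });
        guarded.onNext(1);
        guarded.onCompleted();
        guarded.onNext(2);                          // dropped: after the terminal event
        guarded.onError(new RuntimeException("x")); // dropped: second terminal event
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [next 1, completed]
    }
}
```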


Now let's simulate, through an example, how an Observable can signal an error.


Observable
                .just(1, 2, 3)
                .doOnNext(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        if (integer.equals(2)) {
                            throw new RuntimeException("I don't like 2");
                        }
                    }
                })
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("Completed Observable.");
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        System.err.println("Oops: " + throwable.getMessage());
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("Got: " + integer);
                    }
                });


Output

Got: 1
Oops: I don't like 2

The first value gets through without any obstacle, but the second value throws an exception, which terminates the Observable. The third value is never delivered and onCompleted is never called. Running the snippet above confirms this.

We don't always need to implement a full Subscriber.
        Observable
                .just(1, 2, 3)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.println("Got: " + integer);
                    }
                });

    
In this case, if an error occurs, it is thrown straight back at you because there is no handler for it: RxJava 1.x raises an OnErrorNotImplementedException. It's best to always implement an error handler right from the beginning.
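The difference between subscribing with and without an error handler can be sketched in plain Java as follows. The subscribe methods here are hypothetical stand-ins that mimic the shape of the onNext-only and onNext-plus-onError overloads; the IllegalStateException stands in for the exception the real library throws and is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ErrorHandlerDemo {
    // Hypothetical source: emits 1, then fails on 2, as in the doOnNext example.
    static void subscribe(Consumer<Integer> onNext, Consumer<Throwable> onError) {
        try {
            for (int n = 1; n <= 3; n++) {
                if (n == 2) throw new RuntimeException("I don't like 2");
                onNext.accept(n);
            }
        } catch (RuntimeException e) {
            onError.accept(e);
        }
    }

    // onNext-only overload: with no handler supplied, the error is rethrown
    // to whoever called subscribe.
    static void subscribe(Consumer<Integer> onNext) {
        subscribe(onNext, e -> {
            throw new IllegalStateException("no error handler", e);
        });
    }

    // With a handler, the error arrives as an ordinary callback.
    static List<String> withHandler() {
        List<String> log = new ArrayList<>();
        subscribe(n -> log.add("Got: " + n), e -> log.add("Oops: " + e.getMessage()));
        return log;
    }

    // Without a handler, the caller has to catch the exception itself.
    static List<String> withoutHandler() {
        List<String> log = new ArrayList<>();
        try {
            subscribe(n -> log.add("Got: " + n));
        } catch (IllegalStateException e) {
            log.add("Thrown to caller: " + e.getCause().getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(withHandler());    // [Got: 1, Oops: I don't like 2]
        System.out.println(withoutHandler()); // [Got: 1, Thrown to caller: I don't like 2]
    }
}
```

Supplying the second callback keeps the failure inside the stream, where it can be logged or recovered from, instead of letting it escape from the subscribe call.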


Note: 
RxJava is single-threaded by default.
To understand different operations visually: http://rxmarbles.com/



