Sunday, January 29, 2017

Applying RxJava on a Fibonacci Sequence Generator

This post shows how we can turn an existing method that returns a Collection into an event stream and then react to it using RxJava. I have discussed the fundamentals of RxJava in this previous post.

Let's assume that we have a method which generates the first k Fibonacci numbers and returns them as a Collection.


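import java.util.ArrayList;
import java.util.List;

// Generates the first `count` Fibonacci numbers, starting from 0
private static List<Integer> generateFibonacci(int count) {
    List<Integer> resp = new ArrayList<>();
    // Guard the seed values so that a count below 2 does not return extra elements
    if (count > 0) resp.add(0);
    if (count > 1) resp.add(1);
    for (int i = 2; i < count; i++) {
        // Each number is the sum of the previous two
        resp.add(resp.get(i - 1) + resp.get(i - 2));
    }
    return resp;
}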
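To make the starting point concrete, here is a small sketch that only calls the generator and prints the result. The class name FibonacciDemo is mine, and the guards for counts below 2 are an assumption about the intended behaviour.

```java
import java.util.ArrayList;
import java.util.List;

class FibonacciDemo {

    // The generator from this post, repeated so the sketch compiles on its own.
    // The guards for counts below 2 are an assumption about the intended behaviour.
    static List<Integer> generateFibonacci(int count) {
        List<Integer> resp = new ArrayList<>();
        if (count > 0) resp.add(0);
        if (count > 1) resp.add(1);
        for (int i = 2; i < count; i++) {
            resp.add(resp.get(i - 1) + resp.get(i - 2));
        }
        return resp;
    }

    public static void main(String[] args) {
        // prints [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
        System.out.println(generateFibonacci(10));
    }
}
```

Because the values are stored as Integer, the sums overflow once count goes above 47, so a larger sequence would need Long or BigInteger.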


Reactive programming is programming with asynchronous data streams. The rest of this post shows different ways to apply RxJava constructs to the method above.

Observable Creation

Observable provides methods to convert an array, Collection, Future, or even a normal function into an Observable.

from - Converts an Iterable sequence into an Observable that emits the items in the sequence. 

Observable.from(generateFibonacci(10));

Note that the method returns a List, which is an Iterable, so it can be converted into an Observable directly. (In RxJava 2 and later, the equivalent method is named fromIterable.)

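fromCallable - Returns an Observable that, when an observer subscribes to it, invokes a function you specify and then emits the value returned from that function. Here the returned value is the whole List, so the Observable emits a single List<Integer> item rather than ten separate numbers.

Observable.fromCallable(() -> generateFibonacci(10));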
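The practical difference from the from example is timing: there, generateFibonacci(10) runs as soon as the expression is evaluated, while fromCallable defers the call until subscription. The sketch below uses only the JDK's Callable, not RxJava, to show what that deferral means. The class name and the invocations counter are hypothetical additions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

class DeferredCallDemo {

    // Hypothetical counter: records how many times the generator has actually run.
    static final AtomicInteger invocations = new AtomicInteger();

    static List<Integer> generateFibonacci(int count) {
        invocations.incrementAndGet();
        List<Integer> resp = new ArrayList<>();
        if (count > 0) resp.add(0);
        if (count > 1) resp.add(1);
        for (int i = 2; i < count; i++) {
            resp.add(resp.get(i - 1) + resp.get(i - 2));
        }
        return resp;
    }

    public static void main(String[] args) throws Exception {
        // Wrapping the call in a Callable does not run the generator yet.
        Callable<List<Integer>> deferred = () -> generateFibonacci(10);
        System.out.println("runs after wrapping: " + invocations.get()); // 0

        // The work happens only on call(); with fromCallable, RxJava makes
        // this call when an observer subscribes.
        List<Integer> result = deferred.call();
        System.out.println("runs after call(): " + invocations.get());   // 1
        System.out.println("size of the single returned value: " + result.size()); // 10
    }
}
```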

Transforming Observable

RxJava provides different operators which can be applied to an Observable to transform it. Let's transform the above Observable so that only the Fibonacci numbers which are less than 15 finally get pushed to the subscriber.

filter - Filters items emitted by an Observable by only emitting those that satisfy a specified predicate.
// 1st approach: an explicit java.util.function.Predicate, called from a lambda
Predicate<Integer> predicate = new Predicate<Integer>() {
    @Override
    public boolean test(Integer integer) {
        return integer < 15;
    }
};

Observable.from(generateFibonacci(10))
        .filter(i -> predicate.test(i));

// 2nd approach: a lambda with a block body
Observable.from(generateFibonacci(10))
        .filter(integer -> {
            return integer < 15;
        });

// 3rd approach: a lambda with an expression body
Observable.from(generateFibonacci(10))
        .filter(integer -> integer < 15);
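All three approaches apply the same test, so they let the same items through. As a quick check of which items those are, here is a plain-Java sketch that applies the predicate to the first ten Fibonacci numbers with an ordinary loop. It does not use RxJava, and the class and the keepMatching helper are hypothetical names of mine.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class FilterPredicateDemo {

    // Hypothetical helper: keeps the items that satisfy the predicate, in their original order.
    static List<Integer> keepMatching(List<Integer> items, Predicate<Integer> predicate) {
        List<Integer> kept = new ArrayList<>();
        for (Integer item : items) {
            if (predicate.test(item)) {
                kept.add(item);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // The first ten Fibonacci numbers, as produced by generateFibonacci(10)
        List<Integer> firstTen = Arrays.asList(0, 1, 1, 2, 3, 5, 8, 13, 21, 34);
        // prints [0, 1, 1, 2, 3, 5, 8, 13]
        System.out.println(keepMatching(firstTen, n -> n < 15));
    }
}
```

So a subscriber to the filtered Observable should receive eight items, with 21 and 34 dropped.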

Subscribing Observable

Observables emit events and subscribers react to them. The Observable and its Subscriber/Observer are independent of each other; they only agree on the contract.


Observable.from(generateFibonacci(10))
        .subscribe(System.out::println);

// Or a more detailed and formal subscriber which gives all 3 event handlers!
Observable.from(generateFibonacci(10))
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer s) { System.out.println(s); }

            @Override
            public void onCompleted() { System.out.println("Completed!"); }

            @Override
            public void onError(Throwable e) { System.out.println("Ouch!"); }
        });
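The contract behind those three handlers is: zero or more onNext calls, followed by exactly one terminal event, either onCompleted or onError. The hand-rolled sketch below imitates that ordering with plain Java so it can be run without RxJava on the classpath. SimpleObserver, emitAll and record are hypothetical names and not RxJava types, and this is not how RxJava is implemented internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ObserverContractDemo {

    // Hypothetical stand-in for the three callbacks; not an RxJava type.
    interface SimpleObserver<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable e);
    }

    // Pushes each item to the observer, then signals exactly one terminal event.
    static <T> void emitAll(Iterable<T> source, SimpleObserver<T> observer) {
        try {
            for (T item : source) {
                observer.onNext(item);
            }
        } catch (RuntimeException e) {
            observer.onError(e);   // failure: no onCompleted after this
            return;
        }
        observer.onCompleted();    // success: reached only when every item was delivered
    }

    // Records the events an observer receives, in order.
    static List<String> record(Iterable<Integer> source) {
        List<String> events = new ArrayList<>();
        emitAll(source, new SimpleObserver<Integer>() {
            @Override public void onNext(Integer item) { events.add("next:" + item); }
            @Override public void onCompleted() { events.add("completed"); }
            @Override public void onError(Throwable e) { events.add("error"); }
        });
        return events;
    }

    public static void main(String[] args) {
        // prints [next:0, next:1, next:1, completed]
        System.out.println(record(Arrays.asList(0, 1, 1)));
    }
}
```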


--happy Rx !!!
