Reactive Programming with RxJava: Part 1


So you have heard of reactive programming and maybe you are wondering what all the hype is about. In this series, I will introduce the reactive paradigm and share my experience with it. Reactive programming can be used anywhere: frontend, backend, or mobile development. This series mostly discusses backend services written in Java, but the concepts apply in all of these contexts.

Why do we need reactive programming

Let's understand the problem first. Why do we need reactive programming? If there is no problem, we don't need a solution, right?

Whenever we call an API or perform an I/O operation such as reading a file or accessing a database, we start the request on a thread and wait for the operation to complete. During this time, the thread that started the operation just waits and cannot be used for any other work.

For example, if a service has 4 threads to serve requests, it can serve at most 4 requests at a time. If all 4 threads are busy with I/O operations, the service cannot handle any more requests. Any further request is queued until one of the 4 threads becomes free. The 5th request therefore gets no response in the meantime, and that is what a consumer would call a “slow or non-responsive” service.
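The scenario above can be reproduced with a plain JDK thread pool. The sketch below is an illustration, not a real service: the class name BlockedPoolDemo and the latch that stands in for a slow I/O call are assumptions made for the example. Four tasks occupy all 4 threads, so the fifth task cannot start until one of them finishes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class BlockedPoolDemo {

    // Returns {5th task started while all threads were blocked, 5th task started after release}.
    static boolean[] run() {
        ExecutorService pool = Executors.newFixedThreadPool(4); // the service's 4 threads
        CountDownLatch allThreadsBusy = new CountDownLatch(4);
        CountDownLatch ioFinished = new CountDownLatch(1);      // stands in for slow I/O
        AtomicBoolean fifthStarted = new AtomicBoolean(false);
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(() -> {
                    allThreadsBusy.countDown();
                    awaitQuietly(ioFinished); // the thread is parked here, doing no useful work
                });
            }
            pool.execute(() -> fifthStarted.set(true)); // 5th request: queued, no free thread

            awaitQuietly(allThreadsBusy);               // all 4 threads are now blocked
            boolean startedWhileBlocked = fifthStarted.get();

            ioFinished.countDown();                     // the "I/O" completes, threads are freed
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return new boolean[] {startedWhileBlocked, fifthStarted.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("5th request started while all threads were blocked: " + result[0]);
        System.out.println("5th request started after the I/O finished: " + result[1]);
    }
}
```

Running main reports false for the first line and true for the second: the fifth request only makes progress once a blocked thread is released.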

Ideally, an operation that is waiting would release its thread so that the thread could serve other requests.

This is exactly where reactive programming comes into the picture. At its core, it is asynchronous programming that uses non-blocking constructs to perform operations.

Libraries

Different languages have different constructs for asynchronous programming: JavaScript has async/await, Kotlin has coroutines, and Java has Future. But writing asynchronous code is not easy. It involves more boilerplate and you need to take care of threading, and if you have ever worked with threads, you know what I am talking about.
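To make the boilerplate concrete, here is a small sketch that uses only the JDK's ExecutorService and Future. The class name FutureBoilerplate and the slowLookup() helper are hypothetical stand-ins for a real remote call. Notice how much of the code is thread management, and that Future.get() still blocks the calling thread.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureBoilerplate {

    // Hypothetical stand-in for a slow remote call.
    static String slowLookup(String key) {
        return "value-for-" + key;
    }

    static String lookupWithFuture(String key) {
        ExecutorService executor = Executors.newSingleThreadExecutor(); // we manage the thread
        try {
            Future<String> future = executor.submit(() -> slowLookup(key));
            return future.get(); // the caller still blocks here until the result arrives
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Lookup failed", e.getCause());
        } finally {
            executor.shutdown(); // and we must remember to release the thread
        }
    }

    public static void main(String[] args) {
        System.out.println(lookupWithFuture("user-42"));
    }
}
```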

Fortunately, there are libraries that abstract away these complications and provide a simple, intuitive API. Several such libraries are available for Java.

You can read about the libraries on their respective pages and compare them. To understand the basics of reactive programming, we will use RxJava in this series.

This series covers the following topics:

  1. RxJava Introduction
  2. Observable and Subscriber
  3. Understanding Observable and Observer Events
  4. Types of Observable
  5. Creating Observable
  6. Important Transformational Operators
  7. Error Handling

RxJava

RxJava is a Java VM implementation of ReactiveX, a library for composing asynchronous and event-based programs by using observable sequences.

The building blocks of RxJava are Observables and Subscribers. An Observable emits items and a Subscriber consumes them. In addition, there is a set of operators for modifying and composing the data.

Observables

Observables are the sources of data. Usually they start providing data once a subscriber starts listening. An observable may emit any number of items (including zero). It can terminate either successfully or with an error. Some sources never terminate: for example, an observable of button clicks can potentially produce an infinite stream of events.

Subscribers

An observable can have any number of subscribers. When the observable emits a new item, the onNext() method is called on each subscriber. If the observable finishes its data flow successfully, the onComplete() method is called on each subscriber. Similarly, if the observable finishes its data flow with an error, the onError() method is called on each subscriber.
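This callback contract can be tried without any dependency, because the JDK (Java 9 and later) ships a very similar subscriber interface, java.util.concurrent.Flow.Subscriber. The sketch below is not RxJava code: it uses the JDK's SubmissionPublisher as a stand-in for an observable, and the class and subscriber names are made up for illustration. Two subscribers are attached, and each one receives every item followed by onComplete.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class TwoSubscribersSketch {

    // Records every callback it receives, prefixed with its own name.
    static final class RecordingSubscriber implements Flow.Subscriber<String> {
        private final String name;
        private final List<String> log;
        private final CountDownLatch finished;

        RecordingSubscriber(String name, List<String> log, CountDownLatch finished) {
            this.name = name;
            this.log = log;
            this.finished = finished;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // ask for everything the source has
        }

        @Override
        public void onNext(String item) {
            log.add(name + " onNext " + item);
        }

        @Override
        public void onError(Throwable error) {
            log.add(name + " onError");
            finished.countDown();
        }

        @Override
        public void onComplete() {
            log.add(name + " onComplete");
            finished.countDown();
        }
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(2);
        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            source.subscribe(new RecordingSubscriber("A", log, finished));
            source.subscribe(new RecordingSubscriber("B", log, finished));
            source.submit("first");
            source.submit("second");
        } // leaving the block closes the source, which signals onComplete
        try {
            finished.await(5, TimeUnit.SECONDS); // delivery is asynchronous, so wait for it
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The lines for A and B may interleave differently from run to run, but each subscriber always sees its own events in the order first, second, onComplete.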

Setup

Before going further into the explanations and examples, let's set up the project so that you can try the examples on your machine. Create a simple Maven project and add the dependency below. You can also refer to the rxjava-tutorial repository; most of the code samples in this post come from it.

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.10</version>
</dependency>

Understanding Observable and Observer Events

Observable Events

An Observable is the source of events in the RxJava world. It generally sends 3 types of events: an actual item (onNext), the end of the stream (onComplete), and an error (onError) if something goes wrong while emitting. The set of events differs from one type of Observable to another. We will discuss the types of Observable later in this post.

From an implementation point of view, the Observable calls the onNext, onComplete, and onError methods of the subscribed Observer to emit these events.

Observer/Subscriber Events

An Observer subscribes to an Observable. It is notified whenever the Observable emits an event and can then react to that event.

There are three methods on the Observer interface that we want to know about:

  1. onNext is called on our observer each time a new event is published by the Observable it is subscribed to. This is the method where we perform some action on each event.
  2. onComplete is called when the sequence of events associated with an Observable is complete, indicating that we should not expect any more onNext calls on our observer.
  3. onError is called when an unhandled exception is thrown in the RxJava framework code or in our event-handling code.

In the example below, we create an Observable that emits 3 items followed by an onComplete event. We also create an Observer that reacts to the events coming from the Observable. Finally, the observer subscribes to the observable. You can check the code at ObservableAndObserver.

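import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class ObservableAndObserver {
    public static void main(String[] args) {
        // The source: emits three items and then signals completion.
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("Emit  " + 10);
                emitter.onNext("Emit " + 50);
                //throw new RuntimeException("Error occurred.");
                emitter.onNext("Emit " + 100);
                emitter.onComplete();
            }
        });

        // The consumer: reacts to each kind of event.
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {}

            @Override
            public void onNext(String s) {
                System.out.println("Received event: " + s);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("Seems like error has occurred "+ e.getMessage()+" No further event or OnComplete would be received.");
            }

            @Override
            public void onComplete() {
                System.out.println("Received onComplete no further events would be received further ");
            }
        };

        // The source starts emitting only when the observer subscribes.
        observable.subscribe(observer);
    }
}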

If you run the above code, you will see the following output.

Received event: Emit  10
Received event: Emit 50
Received event: Emit 100
Received onComplete no further events would be received further

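So we received all the items and the onComplete event. Now uncomment the throw new RuntimeException("Error occurred."); line, and comment out the two statements that follow it (Java rejects statements after an unconditional throw as unreachable). Run the program again and you will see the following output.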

Received event: Emit  10
Received event: Emit 50
Seems like error has occurred Error occurred. No further event or OnComplete would be received.

In the second run, the Observable throws an exception after 2 events, so onError is called after those 2 events and onComplete is never called.

Creating Observable

The approach shown above is not the only way to create an Observable. RxJava provides many creation operators, and we will cover a few of the important ones in this section.

Create

This operator creates an Observable from scratch by calling observer methods programmatically. An emitter is provided through which we can call the respective interface methods when needed. We already used Observable.create() in our introductory example, so let's not repeat it here.

Defer

This operator does not create the Observable until an Observer subscribes. defer() is useful when you already have something that creates or returns an Observable, but you don't want that work to happen until subscription. For example, you might use just() to create an Observable from a computed value, or you might have a network API layer that returns an Observable for a request, and you don't want the computation or the request to kick off until someone subscribes. defer() is a good fit for those scenarios.

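import io.reactivex.Observable;

public class DeferExample {
    public static void main(String[] args) {
        // blockingHeavyOperation() runs here, while the argument of just(...) is evaluated.
        Observable.just(blockingHeavyOperation())
                .subscribe(next -> System.out.println("[just] Received " + next));

        // The lambda, and therefore blockingHeavyOperation(), runs only on subscribe().
        Observable.defer(() -> Observable.just(blockingHeavyOperation()))
                .subscribe(next -> System.out.println("[defer] Received " + next));
    }

    static String blockingHeavyOperation() {
        System.out.println("blockingHeavyOperation called.");
        return "<<HeavyOperation>>";
    }
}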

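In the above code, blockingHeavyOperation() is called as soon as the Observable.just(...) expression is evaluated, whether or not anyone subscribes. With defer(), it is not called until the Observable is subscribed to.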
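The difference comes from ordinary Java evaluation rules: the argument of just(...) is computed before just() is even called, while the lambda handed to defer() is only run later. The dependency-free sketch below shows the same effect with a plain java.util.function.Supplier. It is an analogy and not RxJava code, and the names EagerVsDeferred, heavyOperation, eager, and deferred are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class EagerVsDeferred {

    static final List<String> events = new ArrayList<>();

    // Hypothetical expensive call; it records the moment it actually runs.
    static String heavyOperation() {
        events.add("heavyOperation called");
        return "<<HeavyOperation>>";
    }

    // Like just(value): the caller must compute the value before calling this method.
    static Supplier<String> eager(String value) {
        return () -> value;
    }

    // Like defer(factory): nothing runs until get() is called on the result.
    static Supplier<String> deferred(Supplier<String> factory) {
        return () -> factory.get();
    }

    static List<String> run() {
        events.clear();
        Supplier<String> a = eager(heavyOperation());          // heavyOperation runs right now
        events.add("eager source created");
        Supplier<String> b = deferred(() -> heavyOperation()); // heavyOperation does not run yet
        events.add("deferred source created");
        events.add("deferred value: " + b.get());              // it runs here, on demand
        events.add("eager value: " + a.get());                 // no second call, value was stored
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In the printed log, the first "heavyOperation called" appears before "eager source created", while the second one appears only after "deferred source created", at the point where the deferred value is requested.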

Just

This operator takes between 1 and 10 arguments and emits each argument as a separate item. A single argument is always emitted as a single item: if you pass an array to just(), the whole array is emitted as one item instead of its individual elements. If you pass the items to just() directly, the Observable emits a separate event for each item. Note that RxJava 2 does not accept null values: passing null to just() throws a NullPointerException instead of emitting null.

The sample below creates two Observables with Observable.just(). The first Observable emits each element separately, and the second emits the entire array in a single emission.

Observable.just( "Superman", "Doctor Strange", "Iron Man")
        .subscribe(event -> System.out.println(event));

Observable.just( new String[]{"Superman", "Doctor Strange", "Iron Man"})
        .subscribe(event -> System.out.println(Arrays.toString(event)));

If you run the above code, you will see the following output:

Superman
Doctor Strange
Iron Man
[Superman, Doctor Strange, Iron Man]

From

This family of operators creates an Observable from an existing source. With an array or an Iterable, each element is emitted one at a time. Other variants wrap a Callable, a Future, or a Publisher. The operators are fromArray(), fromIterable(), fromCallable(), fromFuture(), and fromPublisher().

The sample below creates an Observable with Observable.fromArray(). The code prints each item of the array one by one, preserving the order.

Observable.fromArray( new String[]{"Superman", "Doctor Strange", "Iron Man"})
        .subscribe(event -> System.out.println(event));

If you run the above code, you will see the following output:

Superman
Doctor Strange
Iron Man

Range

This operator creates an Observable that emits a range of sequential integers. It takes two arguments: the starting number and the count of numbers to emit. It can be used as a replacement for a for loop or an IntStream.

The sample below creates an Observable with Observable.range(). It starts at 1 and emits 6 numbers, so it prints the values from 1 to 6.

Observable.range(1,6)
        .subscribe(num-> System.out.println("Item "+num));

If you run the above code, you will see the following output:

Item 1
Item 2
Item 3
Item 4
Item 5
Item 6
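When swapping between range() and IntStream, keep one difference in mind: the second argument of Observable.range() is a count, whereas the second argument of IntStream.range() is an exclusive end. The JDK-only sketch below shows an IntStream equivalent of the range(1, 6) call above; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class RangeEquivalent {

    // Same items as Observable.range(start, count): count integers beginning at start.
    static List<Integer> rangeByCount(int start, int count) {
        return IntStream.range(start, start + count) // the end bound is exclusive
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        rangeByCount(1, 6).forEach(num -> System.out.println("Item " + num));
        // IntStream.range(1, 6) on its own would stop at 5, not 6.
    }
}
```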

Types of Observable

RxJava 1 started out with only one base type, Observable itself, which emits 0 or n items and terminates with a success or an error event (Single and Completable arrived in later 1.x releases). RxJava 2 provides a total of 5 base types:

  • Flowable: Emits 0 or n items and terminates with a success or an error event. Supports backpressure, which lets the consumer control how fast the source emits items.
  • Observable: Emits 0 or n items and terminates with a success or an error event. Does not support backpressure.
  • Single: Emits either a single item or an error event. The reactive version of a method call.
  • Maybe: Succeeds with an item, completes with no item, or fails with an error. The reactive version of an Optional.
  • Completable: Either completes successfully or fails with an error event. It never emits items. The reactive version of a Runnable or a method with a void return type.

We will discuss all the types of Observable and the transformational operators in the next part of this series.
