Reactive Extensions: RxJava

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.

Event              Iterable (Pull)           Observable (Push)

Retrieve data      T next()                  onNext(T)
Discover error     throws Exception          onError(Exception)
Complete           returns                   onCompleted()
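The three rows of the table can be sketched in plain Java. This is only an illustration of the pull/push duality, not RxJava code: SimpleObserver, pull, push and emit are hypothetical names invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical names throughout; a plain-Java illustration, not the RxJava API.
class PullVsPush {

    // Push-side callbacks, one per row of the table.
    interface SimpleObserver<T> {
        void onNext(T value);
        void onError(Exception e);
        void onCompleted();
    }

    // Pull: the consumer asks for each value itself.
    static List<String> pull(Iterable<String> source) {
        List<String> log = new ArrayList<>();
        try {
            Iterator<String> it = source.iterator();
            while (it.hasNext()) {
                log.add("next:" + it.next());    // retrieve data: T next()
            }
            log.add("completed");                // complete: the loop simply returns
        } catch (RuntimeException e) {
            log.add("error:" + e.getMessage());  // discover error: an exception is thrown
        }
        return log;
    }

    // Producer side of push: it drives the observer through the callbacks.
    static void emit(Iterable<String> source, SimpleObserver<String> observer) {
        try {
            for (String s : source) {
                observer.onNext(s);
            }
            observer.onCompleted();
        } catch (RuntimeException e) {
            observer.onError(e);
        }
    }

    // Push: the consumer only supplies callbacks and waits to be called.
    static List<String> push(Iterable<String> source) {
        List<String> log = new ArrayList<>();
        emit(source, new SimpleObserver<String>() {
            @Override public void onNext(String value) { log.add("next:" + value); }
            @Override public void onError(Exception e) { log.add("error:" + e.getMessage()); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        return log;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b");
        System.out.println(pull(data)); // [next:a, next:b, completed]
        System.out.println(push(data)); // [next:a, next:b, completed]
    }
}
```

Both sides record the same events; the difference is who is in control of the loop.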

                   Single                    Multiple

Sync               T getData()               Iterable<T> getData()
Async              Future<T> getData()       Observable<T> getData()
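To make the four cells concrete, here is a rough sketch using only the JDK. The async/multiple cell is the one Observable<T> fills in RxJava; since RxJava is not assumed to be on the classpath here, a callback plus a CompletableFuture stands in for it. DataShapes and its method names are made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical names; a JDK-only illustration of the four cells of the table.
class DataShapes {

    // Sync, single: the caller blocks and gets one value.
    static String getOne() {
        return "a";
    }

    // Sync, multiple: the caller pulls values one by one.
    static Iterable<String> getMany() {
        return Arrays.asList("a", "b", "c");
    }

    // Async, single: one value, delivered later.
    static Future<String> getOneAsync() {
        return CompletableFuture.supplyAsync(() -> "a");
    }

    // Async, multiple: values are pushed to a callback from another thread.
    // This is only a stand-in for Observable<T>; the returned future
    // completes once every value has been pushed.
    static CompletableFuture<Void> getManyAsync(Consumer<String> onNext) {
        return CompletableFuture.runAsync(() -> {
            for (String s : getMany()) {
                onNext.accept(s);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        System.out.println(getOne());
        System.out.println(getMany());
        System.out.println(getOneAsync().get());

        List<String> pushed = new CopyOnWriteArrayList<>();
        getManyAsync(pushed::add).join(); // wait until all values were pushed
        System.out.println(pushed);
    }
}
```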

Observable.from(doSomeHeavyTask())  --------------------> Observable
.subscribeOn(                       --------------------> Schedulers
.subscribe(getObserver)             --------------------> Observers
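The division of labour between the three parts can be sketched without RxJava. In this sketch a plain ExecutorService plays the role of the Scheduler, a Supplier plays the source and a Consumer plays the Observer. SubscribeOnSketch, runOnce and the thread name "worker" are assumptions made for the example, not part of the library.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical names; an ExecutorService stands in for an RxJava Scheduler.
class SubscribeOnSketch {

    // Run the source on the given executor and push its result to the observer.
    static <T> void subscribeOn(Supplier<T> source, ExecutorService scheduler, Consumer<T> observer) {
        scheduler.execute(() -> observer.accept(source.get()));
    }

    static String doSomeHeavyTask() {
        return "result";
    }

    // Subscribes once and reports the value together with the thread that delivered it.
    static String runOnce() {
        ExecutorService scheduler = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            subscribeOn(SubscribeOnSketch::doSomeHeavyTask, scheduler, value -> {
                seen.set(value + " on " + Thread.currentThread().getName());
                done.countDown();
            });
            done.await(); // the calling thread only waits; the work happens on "worker"
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // result on worker
    }
}
```

The heavy task and the observer callback both run on the scheduler's thread, not on the thread that subscribed.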

An Observable is either:
* Cold (does nothing until it is subscribed to, like a file on a filesystem waiting to be read), or
* Hot (fires all the time whether or not anyone is subscribed, like stock info)
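The practical difference shows up with a late subscriber. The sketch below uses two tiny hand-written sources (ColdSource and HotSource, both hypothetical, not RxJava types) to show that a cold source starts from the beginning for each subscriber, while a hot source has already moved on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical names; a plain-Java illustration of cold vs hot sources.
class ColdVsHot {

    // Cold: nothing happens until subscribe(), and every subscriber gets the full sequence.
    static class ColdSource {
        void subscribe(Consumer<Integer> observer) {
            for (int i = 1; i <= 3; i++) {
                observer.accept(i);
            }
        }
    }

    // Hot: values are emitted whether or not anyone is listening.
    static class HotSource {
        private final List<Consumer<Integer>> observers = new ArrayList<>();

        void subscribe(Consumer<Integer> observer) {
            observers.add(observer);
        }

        void emit(int value) {
            for (Consumer<Integer> o : observers) {
                o.accept(value);
            }
        }
    }

    static List<Integer> coldLateSubscriber() {
        ColdSource cold = new ColdSource();
        cold.subscribe(v -> { });          // an earlier subscriber
        List<Integer> seen = new ArrayList<>();
        cold.subscribe(seen::add);         // the late subscriber still starts at 1
        return seen;
    }

    static List<Integer> hotLateSubscriber() {
        HotSource hot = new HotSource();
        hot.emit(1);                       // nobody is subscribed, so this value is lost
        List<Integer> seen = new ArrayList<>();
        hot.subscribe(seen::add);
        hot.emit(2);
        hot.emit(3);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(coldLateSubscriber()); // [1, 2, 3]
        System.out.println(hotLateSubscriber());  // [2, 3]
    }
}
```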
public class Observable<T> {
    public static <T> Observable<T> create(OnSubscribe<T> f);
    public Subscription subscribe(Observer<? super T> observer);
}

onNext | onError | onCompleted
public interface Observer<T> {
    public abstract void onCompleted();
    public abstract void onError(Throwable e);
    public abstract void onNext(T t);
}

public static interface OnSubscribe<T> {
    public void call(Subscriber<? super T> subscriber);
}
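How create, OnSubscribe and subscribe fit together can be sketched with a stripped-down stand-in. MiniObservable below is a hypothetical class written for this post, not the real rx.Observable: it only shows that create() stores the function and that nothing runs until subscribe() hands it an observer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature; the real rx.Observable does much more than this.
class MiniObservable<T> {

    interface Observer<T> {
        void onNext(T t);
        void onError(Throwable e);
        void onCompleted();
    }

    interface OnSubscribe<T> {
        void call(Observer<? super T> observer);
    }

    private final OnSubscribe<T> onSubscribe;

    private MiniObservable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

    // create() only remembers the function; no work is done yet.
    static <T> MiniObservable<T> create(OnSubscribe<T> f) {
        return new MiniObservable<>(f);
    }

    // subscribe() is what actually runs the function against the observer.
    void subscribe(Observer<? super T> observer) {
        try {
            onSubscribe.call(observer);
        } catch (Throwable e) {
            observer.onError(e); // failures become an error notification
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniObservable<String> hello = MiniObservable.create(o -> {
            o.onNext("Hello World !");
            o.onCompleted();
        });
        log.add("created"); // nothing has been emitted at this point
        hello.subscribe(new Observer<String>() {
            @Override public void onNext(String t) { log.add("next:" + t); }
            @Override public void onError(Throwable e) { log.add("error:" + e); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [created, next:Hello World !, completed]
    }
}
```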

public abstract class Subscriber<T> implements Observer<T>, Subscription {
    public abstract void onCompleted();
    public abstract void onError(Throwable e);
    public abstract void onNext(T t);

    public final void add(Subscription s);
    public void setProducer(Producer producer);
}

public interface Producer {
    public void request(long n);
}
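The idea behind request(long n) is that the consumer tells the source how many items it is ready for. A minimal sketch of that idea, assuming a hypothetical IteratorProducer that hands out items from an Iterator only when asked (this is not how RxJava implements its producers):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical names; illustrates request(n) as a "send me at most n more" signal.
class RequestSketch {

    interface Producer {
        void request(long n);
    }

    // Emits at most n further items each time request(n) is called.
    static class IteratorProducer<T> implements Producer {
        private final Iterator<T> source;
        private final Consumer<T> downstream;

        IteratorProducer(Iterator<T> source, Consumer<T> downstream) {
            this.source = source;
            this.downstream = downstream;
        }

        @Override
        public void request(long n) {
            for (long i = 0; i < n && source.hasNext(); i++) {
                downstream.accept(source.next());
            }
        }
    }

    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        Producer producer = new IteratorProducer<Integer>(
                Arrays.asList(1, 2, 3, 4, 5).iterator(), received::add);
        producer.request(2); // delivers 1 and 2
        producer.request(1); // delivers 3; 4 and 5 stay with the source
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1, 2, 3]
    }
}
```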

//Hello World Example
//Synchronous single value that runs on the main thread (single thread)
Observable.create(subscriber -> {
    subscriber.onNext("Hello World !");
    subscriber.onCompleted();
}).forEach(System.out::println);

//Synchronous multiple values
Observable.create(subscriber -> {

//Asynchronous single value
Observable.create(subscriber -> {
    } catch(Throwable e) {
        subscriber.onError(e); //Error notification
    }).subscribeOn( //Asynchronous call, so the forEach statement runs on some other thread

//Unsubscribe Example
//The source holds an infinite loop, so it keeps pushing data, but take(10) tells it
//to stop once 10 items have arrived, and unsubscribing halts the loop
Observable.create(subscriber -> {
    int i = 0;
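The mechanism described in the comments above can be sketched without RxJava: the source loops forever but checks an "unsubscribed" flag on every pass, and a take-like consumer flips that flag after n items. TakeSketch, infiniteSource and take are hypothetical names, and an AtomicBoolean stands in for the subscription state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;

// Hypothetical names; a plain-Java illustration of unsubscribing from an infinite source.
class TakeSketch {

    // Infinite source: keeps pushing until the flag says the consumer has unsubscribed.
    // Returns how many values were pushed in total.
    static int infiniteSource(AtomicBoolean unsubscribed, IntConsumer onNext) {
        int i = 0;
        while (!unsubscribed.get()) {
            onNext.accept(i++);
        }
        return i;
    }

    // take(n): collect the first n values, then unsubscribe so the loop stops.
    static List<Integer> take(int n) {
        List<Integer> received = new ArrayList<>();
        if (n <= 0) {
            return received; // nothing requested, so never start the source
        }
        AtomicBoolean unsubscribed = new AtomicBoolean(false);
        infiniteSource(unsubscribed, v -> {
            received.add(v);
            if (received.size() == n) {
                unsubscribed.set(true); // the source sees this on its next check
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(take(10)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

Without the flag check in the loop, the source would never notice that nobody is listening any more.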

Source with examples on GitHub
