Observable and Observer

Reactive Programming?

It is event based asynchronous programming based on observable pattern.





Observable :


  1. Observable is a stream of data. Observable process and emits data when any subscriber starts listening to it.
  2. Observable have one or many subscribers.Observable can emit any number of items (zero or more).
  3. We can also transform emitted data from a stream by applying different Operators on Observable like map, flatmap, groupBy, debounce, filter, join etc.
  4. Operators process data and return an observable. So we can apply any number of Operators on Observable.
          


Observer :



  1. An observer is received data stream emitted by Observable.
  2. Observer subscribes Observable using method subscribe().
  3. When Observable emits data then onNext() method is called.
  4. When completed successfully then this method is called onComplete().
  5. If during this emission any exception is thrown by Observable then this method is called onError().



Code Snippet :


Lets understand below code, before that i have attached Rx java dependencies in below the code, I have created a observable source, which emits items/stream of type string i.e  "Stream","emiting","without","error".  Once observer subscribe it, it gets these item in 
onNext callback method where he can do the further work in it.then onComplete method called.
If any error came in between stream processing in observer end, it shut the further processing and control goes to onError block of observer callback where we can log error.





 import io.reactivex.*
 import io.reactivex.subjects.PublishSubject
 import java.io.IOException

fun main() {
 runWithOutErrror()
}

fun runWithOutErrror() {
Observable.fromArray("Stream","emiting","without","error!!!")
.subscribe(
{ println("OnNext Block : " +it)
},
{ println("Error block: " +it.message)
},
{ println("onComplete block, Streams completed !!!! ")
} )
}

Dependency :

implementation 'io.reactivex.rxjava2:rxjava:2.2.7'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation "io.reactivex.rxjava2:rxkotlin:2.4.0"




Types of Observables:

  1. Observable
  2. Single
  3. Maybe
  4. Completable
  5. Flowable

Observable: 


The Observable.create() factory allows us to create an Observable by providing a lambda receiving an Observable emitter.




    1. Cold Observables : 


Cold Observables are much like a music CD that can be replayed to each listener, so each person can hear all the tracks at any time. In the same manner, cold Observables will replay the emissions to each Observer, ensuring that all Observers get all the data. Most data driven Observables are cold, and this includes the Observable.just() and Observable.fromIterable() factories.
Many Observables emitting from finite data sources such as databases, text files, or JSON are cold.

2. Hot Observables : 

A hot Observable is more like a radio station. It broadcasts the same emissions to all Observers at the same time. If an Observer subscribes to a hot Observable, receives some emissions, and then another Observer comes in afterwards, that second Observer will have missed those emissions. Just like a radio station, if you tune in too late, you will have missed that song.

 3. Connectable Observables: 

A helpful form of hot Observable is ConnectableObservable. It will take any Observable, even if it is cold, and make it hot so that all emissions are played to all Observers at once. To do this conversion, you simply need to call publish() on any Observable, and it will yield a ConnectableObservable. But subscribing will not start the emissions yet. You need to call its connect() method to start firing the emissions. This allows you to set up all your 
Observers beforehand.


- Observable factories :


Observable.range() - 

This will emit each number from a start value and increment each emission until the specified count is reached.

Observable.interval() - 

It will emit a consecutive long emission (starting at 0) at every specified time interval. Observable.interval(1, TimeUnit.SECONDS)

Observable.empty() - 

It is sometimes helpful to create an Observable that emits nothing and calls onComplete()

Observable.never() - 

A close cousin of Observable.empty() is Observable.never(). The only difference between them is that it never calls onComplete(), forever leaving observers waiting for emissions but never actually giving anyObservable.error() - You can create an Observable that immediately calls onError() with a specified exception. Observable.error(new Exception("Crash and burn!"))

Single : 

  1. Single should be used when you have only single emission or single item.
  2. So we can use Single.just(“single value”) for creating an Observable.
interface SingleObserver<T> {

void onSubscribe(Disposable d);

void onSuccess(T value);

void onError(Throwable error);

}


Maybe:


  1. Maybe is used when we have 0 or 1 item to be emitted.
  2. For creating maybe observables, we can use either of just/ empty factory methods
interface MayObserver<T> {

void onSubscribe(Disposable d);

void onSuccess(T value);

void onError(Throwable error);

void onComplete();

}

Completable : 


  1. When you don’t have data emissions, but only concerned with whether task is completed or not.
  2. Methods used are complete(), fromRunnable()
  3. Completable is simply concerned with an action being executed, but it does not receive any emissions. Logically, it does not have onNext() or onSuccess() to receive emissions, but it does have onError() and onComplete()
  4. You can construct one quickly by calling Completable.complete() or Completable.fromRunnable()


    interface MayObserver<T> {

    void onSubscribe(Disposable d);

    void onComplete();

    void onError(Throwable error);

    }

    Flowable : 

    1. Observable sources don't support backpressure.
    2. Because of that, we should use it for sources that we merely consume and can't influence.
    3. When we have an Observable we can easily transform it to Flowable using the toFlowable() method:


    var dataFromDatabase: Flowable<Int> = Flowable.just(1, 2, 3, 4, 5,.....)

    Observable<Integer> integerObservable = Observable.just(1, 2, 3….);

    Flowable<Integer> integerFlowable = integerObservable

    .toFlowable(BackpressureStrategy.BUFFER);










          Comments

          Popular posts from this blog

          Difference between Imperative Approach and Reactive Approach

          The 10 Most Popular Coding Challenge Websites 2021

          Java - Recursion