RxJava2 Tutorial Basic

1. What are RxJava and reactive programming In reactive programming, the consumer reacts to the data as it comes in.

This is the reason why asynchronous programming is also called reactive programming. Reactive programming allows to propagates event changes to registered observers. >The Observer pattern has done right. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming. RxJava is a library that lets you create applications in the reactive programming style. At its core, reactive programming provides a clean, efficient way of processing and reacting to streams of real-time data, including data with dynamic values. These data streams don’t necessarily have to take the form of traditional data types, as RxJava pretty much treats everything as a stream of data—everything from variables to properties, caches, and even user input events like clicks and swipes. The data emitted by each stream can either be a value, an error, or a “completed” signal, although you don’t necessarily have to implement the last two.

To create this workflow of data streams and objects that react to them, RxJava extends the Observer software design pattern. Essentially, in RxJava you have Observable objects that emit a stream of data and then terminate, and Observer objects that subscribe to Observables. An Observer receives a notification each time their assigned Observable emits a value, an error, or a completed signal. So at a very high level, RxJava is all about:

  • Creating an Observable.
  • Giving that Observable some data to emit.
  • Creating an Observer.
  • Assigning the Observer to an Observable.
  • Giving the Observer tasks to perform whenever it receives an emission from its assigned Observable.

2. Two key types There are two key types to understand

when working with Rx: Observable representing sources of data. Observer(or subscriber) listening to the observables. An observable emits items, a subscriber consumes those items.

2.1 Observables Observables are the sources for the data.

Usually, they start providing data once a subscriber starts listening. An observable may emit any number of items (including zero items). It can terminate either successfully or with an error. Sources may never terminate, for example, an observable for
a button click can potentially produce an infinite stream of events. For now, we must understand the Subscribe method. Here is one key overload of the method:

1
public final Subscription subscribe(Subscriber <? super T> subscriber)

This is the method that you use to receive the values emitted by the observables. The subscriber is responsible for handling the values. The Subscriber class is an implementation of the Observer interface.

An Observable pushes 3 kinds of event:

  • values;
  • Completion, which indicates that no more values will be pushed.
  • Errors, if somthing caused the sequence to fail. These events also imply termination.

2.2 Observer(Subscriber)

An observable can have any number of subscribers. If a new item is emitted from the observable, the onNext() method is called on each subscriber. If the observable finishes its data flow successful, the onComplete() method is optionally called on each subscriber. Similar, if the observable finishes its data flow with an error, the onError() method is optionally called on each subscriber. No calls happen after a call to onError or onCompleted.

1
2
3
4
5
interface Observer<T> {
void onCompleted();
void onError(java.lang.Throwable e);
void onNext(T t);
}

Example:

1
2
3
4
5
6
7
8
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
Observable<String> observable = Observable.from(letters);
observable.subscribe(
i -> result += i, //OnNext
Throwable::printStackTrace, //OnError
() -> result += "_Completed" //OnCompleted
);
assertTrue(result.equals("abcdefg_Completed"));

2.3 Implementing Observable

It’s not recommended to manually implement Observer or extend Observable. Since RxJava already provides all the building blocks you need. It is simpler and safer to use the many tools that Rx gives you for generating the functionality that you need.

2.3.1 Creating an Observable

  1. Observable.just()
    You can use the .just() operator to convert any object into an Observable. The result Observable will the emit the original object and complete.
    For example, here we’re creating an Observable that’ll emit a single string to all its Observers:

    1
    2
    3
    4
    5
    6
    Observable<String> observable = Observable.just("Hello World!");
    Subscription subscription = observable.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
    );
  2. Observable.range()
    You can use the .range() operator to emit a range of sequential integers. The first integer you provider is the initial value, and the second is the number of integers you want to emit. For example

    1
    2
    3
    4
    5
    6
    Observable<Integer> observable = Observable.range(0, 2);
    observable.subscribe(
    v -> System.out.println("Received:" + v),
    e -> System.out.println("Errir" + e),
    () -> System.out.println("Completed")
    );

    Output

    1
    2
    3
    Received:0
    Received:0
    Completed
  3. Observable.from()
    The .from() operator allows you to convert a collection of objects into an observable stream. You can convert an array into an Observable using Observable.fromArray, a Callable into an Observable using the Observable.fromCallable, an Iterable into an Observable using Observable.fromIterable and a ‘Future’ into an Observable using Observable.fromFuture()

    1
    2
    3
    4
    5
    6
    7
    Integer[] array = {1,3,5};
    Observable<Integer> observable = Observable.fromArray(array);
    observable.subscribe(
    v -> System.out.println("Received:" + v),
    e -> System.out.println("Errir" + e),
    () -> System.out.println("Completed")
    );

    Output

    1
    2
    3
    4
    Received:1
    Received:3
    Received:5
    Completed

    Future is a part of the Java framework and you may come acroos them while using frameworks that use concurrency. They are a less powerful concept for concurrency than Rx, since they only return one value.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    FutureTask<Integer> f = new FutureTask<Integer>(() -> {
    Thread.sleep(2000);
    return 21;
    });
    new Thread(f).start();

    Observable<Integer> values = Observable.from(f);

    Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
    );

    Output

    1
    2
    Received: 21
    Completed
  4. Observable.interval()
    This operator creates an Observable that emits an infinite sequence of ascending integers, with each emission separated by a time interval chosen by you. For example:

    1
    2
    3
    4
    5
    6
    Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
    observable.subscribe(
    v -> System.out.println("Received:" + v),
    e -> System.out.println("Errir" + e),
    () -> System.out.println("Completed")
    );

    Output

    1
    2
    3
    4
    5
    Received: 0
    Received: 1
    Received: 2
    Received: 3
    ...

    This sequence will not terminate until we unsubscribe.

  5. Observable.empty()
    The empty() operator creats an Observable that emits no items but terminates normally, which can be useful when you need to quickly create an Observable for testing purposes.

    1
    Observable<String> observable = Observable.empty();

    More other ways to creat Observable seeing the creating observables operators

3.Operators

3.1 Introducing Operators

RxJava has an enormous collections of operators that are mainly intended to help you modify, filter, merge and transform the data that’s being emiited by your Observables. You could find the complete list of RxJava operators over at the official github or categorized operators in the official website.
It’s allowed to chain multiple operators together. Applying an operators to an Observable typically returns another Observable, so you can just keep applying operators until you get the results you want. In the previous section we talked about the creating Observable operators, in this section we focus on other useful operators.

3.2 Filtering Observables

  1. filter takes a predicate function that makes a boolean decision for each value emitted. If the decision if false, the item is omitted from the filtered sequence.

    1
    2
    3
    4
    5
    6
    7
    Observable<Integer> values = Observable.range(0, 10);
    values.filter(v -> v % 2 == 0)
    .subscribe(
    v -> System.out.println("Received:" + v),
    e -> System.out.println("Errir" + e),
    () -> System.out.println("Completed")
    );

    Output

    1
    2
    3
    4
    5
    6
    Received:0
    Received:2
    Received:4
    Received:6
    Received:8
    Completed
  2. distinct operator filters an Observable by only allowing items through that have not already been emitted.

    1
    2
    3
    4
    5
    6
    7
    8
    Integer[] array = {1, 1, 2, 2, 3};
    Observable<Integer> values = Observable.fromArray(array);
    values.distinct()
    .subscribe(
    v -> System.out.println("Received:" + v),
    e -> System.out.println("Errir" + e),
    () -> System.out.println("Completed")
    );

    Output

    1
    2
    3
    4
    Received:1
    Received:2
    Received:3
    Completed
  3. DistinceUntilChanged operator compares emitted items from the source Observable against their immediate predecessors in order to determine whether or not they are distince.

    1
    2
    3
    4
    5
    6
    7
    8
    Integer[] array = {1, 1, 2, 3, 2};
    Observable<Integer> values = Observable.fromArray(array);
    values.distinctUntilChanged()
    .subscribe(
    v -> System.out.println("Received:" + v),
    e -> System.out.println("Errir" + e),
    () -> System.out.println("Completed")
    );

    Output

    1
    2
    3
    4
    5
    Received:1
    Received:2
    Received:3
    Received:2
    Completed
  4. ignoreElements will ignore every items, but allows its termination notification(either onError or onCompleted) to pass through unchanged.

    1
    2
    3
    4
    5
    Observable<Integer> values = Observable.range(0, 5);
    values.ignoreElements().subscribe(
    ()-> System.out.println("Completed"),
    e-> System.out.println("error")
    );

    Output

    1
    Completed
  5. take takes the first n items emitted by the Observable.

    1
    2
    3
    4
    5
    6
    Observable<Integer> values = Observable.range(0, 5);
    values.take(2).subscribe(
    v -> System.out.println("Recevied: " + v),
    e-> System.out.println("error"),
    ()-> System.out.println("Completed")
    );

    Output

    1
    2
    3
    Recevied: 0
    Recevied: 1
    Completed
  6. skip skips first n items emitted by the Observable.

    1
    2
    3
    4
    5
    6
    7
    Observable<Integer> values = Observable.range(0, 5);
    values.skip(2)
    .subscribe(
    v -> System.out.println("Recevied: " + v),
    e -> System.out.println("error"),
    () -> System.out.println("Completed")
    );

    Output

    1
    2
    3
    4
    Recevied: 2
    Recevied: 3
    Recevied: 4
    Completed
  7. takeWhile takes items emitted by Observable while a predicate is true, once the predicate function becomes false, it stops taking the items.

    1
    2
    3
    4
    5
    6
    7
    Observable<Integer> values = Observable.range(0, 5);
    values.takeWhile(v -> v < 2)
    .subscribe(
    v -> System.out.println("Recevied: " + v),
    e -> System.out.println("error"),
    () -> System.out.println("Completed")
    );

    Output

    1
    2
    3
    Recevied: 0
    Recevied: 1
    Completed
  8. skipWhile skip items while a predicate function is true, once the predicate function becomes false, it stops skipping the items.

    1
    2
    3
    4
    5
    6
    7
    Observable<Integer> values = Observable.range(0, 5);
    values.skipWhile(v -> v < 2)
    .subscribe(
    v -> System.out.println("Recevied: " + v),
    e -> System.out.println("error"),
    () -> System.out.println("Completed")
    );

    Output

    1
    2
    3
    4
    Recevied: 2
    Recevied: 3
    Recevied: 4
    Completed
  9. takeUntil takes the items emitted by the Observable from the beginning until the predicate function returns true

    1
    2
    3
    4
    5
    6
    7
    Observable<Integer> observable = Observable.range(0, 5);
    observable.takeUntil( v -> v==2)
    .subscribe(
    v -> System.out.println("Received:" + v),
    e -> System.out.println("Errir" + e),
    () -> System.out.println("Completed")
    );

    Output

    1
    2
    3
    4
    Received:0
    Received:1
    Received:2
    Completed
  10. skipUntil skips the items emitted by the source Observable until a second Observable emits an item, at which point skipUntil begins to take the items of the source Observable

3.3 Conditional and Boolean Operators

Operators that evaluate one or more observables or items emitted by Observables

  1. All determine that whether all items emitted by an Observable meet some criteria and returns a boolean value based on tan evaluation of that item. All returns an Observable that emits only a single boolean value. The signature is:

    1
    public final Observable<java.lang.Boolean> all(Func1<? super T,java.lang.Boolean> predicate)

    Example:

    1
    2
    3
    4
    5
    6
    Observable<Integer> observable = Observable.range(0, 5);
    observable.all( v -> v < 10)
    .subscribe(
    result -> System.out.println(result),
    e -> System.out.println("Error: " + e)
    );

    Output

    1
    true
  2. contains determine whether an Observable emits a particular item or not.

    1
    2
    3
    4
    5
    6
    Observable<Integer> observable = Observable.range(0, 2);
    observable.contains( 3)
    .subscribe(
    result -> System.out.println(result),
    e -> System.out.println("Error: " + e)
    );

    Output

    1
    false
  3. defaultIfEmpty emits items from the source Observable, or a default item if the source Observable emits nothing.

    1
    2
    3
    4
    5
    6
    Observable<Integer> observable = Observable.empty();
    observable.defaultIfEmpty(1)
    .subscribe(
    v -> System.out.println("Received:" + v),
    e -> System.out.println("Errir" + e),
    () -> System.out.println("Completed"));

    Output

    1
    2
    Received:1
    Completed
  4. sequenceEqual determine whether two Observable emit the same sequence of items.

    1
    2
    3
    4
    5
    6
    7
    8
    Observable<Integer> ints = Observable.range(1, 3);
    Observable<String> strings = Observable.just("1","2","3");
    Observable
    .sequenceEqual(ints,strings, (i,s) -> s.equals(String.valueOf(i)))
    .subscribe(
    result -> System.out.println(result),
    e -> System.out.println("Error: " + e)
    );

    Output

    1
    true
  5. amb operator. Given two or more source Observable, emit all of the items from only the first of these Observables to emit an item or notification. It will pass throught the emissions and notifications of exactly one of theses Observables: the first one that sends a notification to Amb, either by emitting an item or sending an onError or onCompleted notification. Amb will ignore and discard the emissions and notifications of all of the other source observabeles.

    1
    2
    3
    4
    5
    6
    7
    8
    Observable interval = Observable.interval(200,TimeUnit.MILLISECONDS);
    Observable<Integer> ints = Observable.range(1, 3);
    Observable intervalDelay = Observable.interval(1000,1,TimeUnit.MILLISECONDS);
    Observable.ambArray(interval,intervalDelay,ints)
    .subscribe(
    v -> System.out.println("Received:" + v),
    e -> System.out.println("Errir" + e),
    () -> System.out.println("Completed"));

    Output

    1
    2
    3
    4
    Received:1
    Received:2
    Received:3
    Completed

3.4 Transforming observables

Operators that transform items that are emitted by an Observable. It’s common to transform the items to the format that we want.

  1. map is the basic transformation operator. It takes a function which takes an item and return a new item of any type. The returned observable is composed of the values returned by the transformation function.

    1
    2
    3
    4
    5
    6
    7
    Observable<Integer> observable = Observable.range(1, 3);
    observable.map(i -> "String: "+i)
    .subscribe(
    result -> System.out.println(result),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
    );

    Output

    1
    2
    3
    4
    String: 1
    String: 2
    String: 3
    Completed
  2. flatMap transformation operator takes items from the source observable and for each of them, returns a new observable that emits the new values. The returned observables will be flatten into a single Observable. flatMap merges the emissions of these resulting observables, emitting these merged result as its own sequence.

    Note that flatMap merges the emissions of these observables, so that they may interleave.

    1
    2
    3
    4
    5
    6
    7
    Observable<Integer> observable = Observable.range(1, 3);
    observable.flatMap( i -> Observable.range(0,i))
    .subscribe(
    result -> System.out.println(result),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
    );

    Output

    1
    2
    3
    4
    5
    6
    7
    0
    0
    1
    0
    1
    2
    Completed
  3. buffer periodically gather items emitted by an Observable into bundles and emit these bundles rather then emitting the items one at a time.

    • buffer(int count) emits ono-overlapping buffers in the forms of Lists, each of which contains at most count items from the source observables(the final emitted List may have fewer than count items).

      1
      2
      3
      4
      5
      6
      7
      Observable<Integer> observable = Observable.range(0, 5);
      observable.buffer(2)
      .subscribe(
      list -> System.out.println(list),
      e -> System.out.println("Error: " + e),
      () -> System.out.println("Completed")
      );

      Output

      1
      2
      3
      4
      [0, 1]
      [2, 3]
      [4]
      Completed
    • buffer(int count, int skip) creates a new buffer starting with the first emitted item from the source observable, and every skip items thereafter, and fills each buffer with count items. It emits these buffer as Lists. Depending on the values of count and skip these buffers may overlap(multiple buffers may contains the same item), or they may have gaps.
      Example buffer(3,2) means creates buffers that each buffer contains 3 elements, and skip 2 items to create a new buffer.

      1
      2
      3
      4
      5
      6
      7
      Observable<Integer> observable = Observable.range(0, 10);
      observable.buffer(3,2)
      .subscribe(
      list -> System.out.println(list),
      e -> System.out.println("Error: " + e),
      () -> System.out.println("Completed")
      );

      Output

      1
      2
      3
      4
      5
      6
      [0, 1, 2]
      [2, 3, 4]
      [4, 5, 6]
      [6, 7, 8]
      [8, 9]
      Completed
  4. groupBy divide an Observable into a set of Observables that each emit a different subset of items from the original observable.
    Example: group a range of numbers from 0 to 9 by even and odd. Print the last item of each group.

    1
    2
    3
    4
    5
    6
    7
    Observable<Integer> observable = Observable.range(0, 10);
    observable.groupBy(v -> v% 2 == 0)
    .subscribe(
    group -> group.takeLast(1).subscribe(v-> System.out.println(v)),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
    );

    Output

    1
    2
    3
    9
    8
    Completed

3.5 Aggregate operators

Operators that operate on the entire sequence of items emitted by an Observable. Operators like Count, Average, Max, Min and Sum return a sequence with a single value.

4. Reference :

http://www.vogella.com/tutorials/RxJava/article.html
http://www.baeldung.com/rx-java
https://github.com/Froussios/Intro-To-RxJava
https://code.tutsplus.com/tutorials/getting-started-with-rxjava-20-for-android--cms-28345