1. What are RxJava and reactive programming In reactive programming, the consumer reacts to the data as it comes in.
This is the reason why asynchronous programming is also called reactive programming. Reactive programming allows to propagates event changes to registered observers. >The Observer pattern has done right. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming. RxJava is a library that lets you create applications in the reactive programming style. At its core, reactive programming provides a clean, efficient way of processing and reacting to streams of real-time data, including data with dynamic values. These data streams don’t necessarily have to take the form of traditional data types, as RxJava pretty much treats everything as a stream of data—everything from variables to properties, caches, and even user input events like clicks and swipes. The data emitted by each stream can either be a value, an error, or a “completed” signal, although you don’t necessarily have to implement the last two.
To create this workflow of data streams and objects that react to them, RxJava extends the Observer software design pattern. Essentially, in RxJava you have Observable objects that emit a stream of data and then terminate, and Observer objects that subscribe to Observables. An Observer receives a notification each time their assigned Observable emits a value, an error, or a completed signal. So at a very high level, RxJava is all about:
- Creating an
Observable. - Giving that
Observablesome data to emit. - Creating an
Observer. - Assigning the
Observerto anObservable. - Giving the
Observertasks to perform whenever it receives an emission from its assignedObservable.
2. Two key types There are two key types to understand
when working with Rx: Observable representing sources of data. Observer(or subscriber) listening to the observables. An observable emits items, a subscriber consumes those items.
2.1 Observables Observables are the sources for the data.
Usually, they start providing data once a subscriber starts listening. An observable may emit any number of items (including zero items). It can terminate either successfully or with an error. Sources may never terminate, for example, an observable for
a button click can potentially produce an infinite stream of events. For now, we must understand the Subscribe method. Here is one key overload of the method:
1 | public final Subscription subscribe(Subscriber <? super T> subscriber) |
This is the method that you use to receive the values emitted by the observables. The subscriber is responsible for handling the values. The Subscriber class is an implementation of the Observer interface.
An Observable pushes 3 kinds of event:
- values;
- Completion, which indicates that no more values will be pushed.
- Errors, if somthing caused the sequence to fail. These events also imply termination.
2.2 Observer(Subscriber)
An observable can have any number of subscribers. If a new item is emitted from the observable, the onNext() method is called on each subscriber. If the observable finishes its data flow successful, the onComplete() method is optionally called on each subscriber. Similar, if the observable finishes its data flow with an error, the onError() method is optionally called on each subscriber. No calls happen after a call to onError or onCompleted.1
2
3
4
5interface Observer<T> {
void onCompleted();
void onError(java.lang.Throwable e);
void onNext(T t);
}
Example:1
2
3
4
5
6
7
8String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
Observable<String> observable = Observable.from(letters);
observable.subscribe(
i -> result += i, //OnNext
Throwable::printStackTrace, //OnError
() -> result += "_Completed" //OnCompleted
);
assertTrue(result.equals("abcdefg_Completed"));
2.3 Implementing Observable
It’s not recommended to manually implement Observer or extend Observable. Since RxJava already provides all the building blocks you need. It is simpler and safer to use the many tools that Rx gives you for generating the functionality that you need.
2.3.1 Creating an Observable
Observable.just()
You can use the.just()operator to convert any object into an Observable. The resultObservablewill the emit the original object and complete.
For example, here we’re creating anObservablethat’ll emit a single string to all itsObservers:1
2
3
4
5
6Observable<String> observable = Observable.just("Hello World!");
Subscription subscription = observable.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Observable.range()
You can use the.range()operator to emit a range of sequential integers. The first integer you provider is the initial value, and the second is the number of integers you want to emit. For example1
2
3
4
5
6Observable<Integer> observable = Observable.range(0, 2);
observable.subscribe(
v -> System.out.println("Received:" + v),
e -> System.out.println("Errir" + e),
() -> System.out.println("Completed")
);Output
1
2
3Received:0
Received:0
CompletedObservable.from()
The.from()operator allows you to convert a collection of objects into an observable stream. You can convert an array into anObservableusingObservable.fromArray, aCallableinto anObservableusing theObservable.fromCallable, anIterableinto anObservableusingObservable.fromIterableand a ‘Future’ into anObservableusingObservable.fromFuture()1
2
3
4
5
6
7Integer[] array = {1,3,5};
Observable<Integer> observable = Observable.fromArray(array);
observable.subscribe(
v -> System.out.println("Received:" + v),
e -> System.out.println("Errir" + e),
() -> System.out.println("Completed")
);Output
1
2
3
4Received:1
Received:3
Received:5
CompletedFutureis a part of the Java framework and you may come acroos them while using frameworks that use concurrency. They are a less powerful concept for concurrency than Rx, since they only return one value.1
2
3
4
5
6
7
8
9
10
11
12
13FutureTask<Integer> f = new FutureTask<Integer>(() -> {
Thread.sleep(2000);
return 21;
});
new Thread(f).start();
Observable<Integer> values = Observable.from(f);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Output
1
2Received: 21
CompletedObservable.interval()
This operator creates anObservablethat emits an infinite sequence of ascending integers, with each emission separated by a time interval chosen by you. For example:1
2
3
4
5
6Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
observable.subscribe(
v -> System.out.println("Received:" + v),
e -> System.out.println("Errir" + e),
() -> System.out.println("Completed")
);Output
1
2
3
4
5Received: 0
Received: 1
Received: 2
Received: 3
...This sequence will not terminate until we unsubscribe.
Observable.empty()
Theempty()operator creats anObservablethat emits no items but terminates normally, which can be useful when you need to quickly create anObservablefor testing purposes.1
Observable<String> observable = Observable.empty();
More other ways to creat
Observableseeing the creating observables operators
3.Operators
3.1 Introducing Operators
RxJava has an enormous collections of operators that are mainly intended to help you modify, filter, merge and transform the data that’s being emiited by your Observables. You could find the complete list of RxJava operators over at the official github or categorized operators in the official website.
It’s allowed to chain multiple operators together. Applying an operators to an Observable typically returns another Observable, so you can just keep applying operators until you get the results you want. In the previous section we talked about the creating Observable operators, in this section we focus on other useful operators.
3.2 Filtering Observables
filtertakes a predicate function that makes a boolean decision for each value emitted. If the decision iffalse, the item is omitted from the filtered sequence.1
2
3
4
5
6
7Observable<Integer> values = Observable.range(0, 10);
values.filter(v -> v % 2 == 0)
.subscribe(
v -> System.out.println("Received:" + v),
e -> System.out.println("Errir" + e),
() -> System.out.println("Completed")
);Output
1
2
3
4
5
6Received:0
Received:2
Received:4
Received:6
Received:8
Completeddistinctoperator filters anObservableby only allowing items through that have not already been emitted.1
2
3
4
5
6
7
8Integer[] array = {1, 1, 2, 2, 3};
Observable<Integer> values = Observable.fromArray(array);
values.distinct()
.subscribe(
v -> System.out.println("Received:" + v),
e -> System.out.println("Errir" + e),
() -> System.out.println("Completed")
);Output
1
2
3
4Received:1
Received:2
Received:3
CompletedDistinceUntilChangedoperator compares emitted items from the sourceObservableagainst their immediate predecessors in order to determine whether or not they are distince.1
2
3
4
5
6
7
8Integer[] array = {1, 1, 2, 3, 2};
Observable<Integer> values = Observable.fromArray(array);
values.distinctUntilChanged()
.subscribe(
v -> System.out.println("Received:" + v),
e -> System.out.println("Errir" + e),
() -> System.out.println("Completed")
);Output
1
2
3
4
5Received:1
Received:2
Received:3
Received:2
CompletedignoreElementswill ignore every items, but allows its termination notification(eitheronErrororonCompleted) to pass through unchanged.1
2
3
4
5Observable<Integer> values = Observable.range(0, 5);
values.ignoreElements().subscribe(
()-> System.out.println("Completed"),
e-> System.out.println("error")
);Output
1
Completed
taketakes the first n items emitted by theObservable.1
2
3
4
5
6Observable<Integer> values = Observable.range(0, 5);
values.take(2).subscribe(
v -> System.out.println("Recevied: " + v),
e-> System.out.println("error"),
()-> System.out.println("Completed")
);Output
1
2
3Recevied: 0
Recevied: 1
Completedskipskips first n items emitted by theObservable.1
2
3
4
5
6
7Observable<Integer> values = Observable.range(0, 5);
values.skip(2)
.subscribe(
v -> System.out.println("Recevied: " + v),
e -> System.out.println("error"),
() -> System.out.println("Completed")
);Output
1
2
3
4Recevied: 2
Recevied: 3
Recevied: 4
CompletedtakeWhiletakes items emitted byObservablewhile a predicate istrue, once the predicate function becomesfalse, it stops taking the items.1
2
3
4
5
6
7Observable<Integer> values = Observable.range(0, 5);
values.takeWhile(v -> v < 2)
.subscribe(
v -> System.out.println("Recevied: " + v),
e -> System.out.println("error"),
() -> System.out.println("Completed")
);Output
1
2
3Recevied: 0
Recevied: 1
CompletedskipWhileskip items while a predicate function istrue, once the predicate function becomesfalse, it stops skipping the items.1
2
3
4
5
6
7Observable<Integer> values = Observable.range(0, 5);
values.skipWhile(v -> v < 2)
.subscribe(
v -> System.out.println("Recevied: " + v),
e -> System.out.println("error"),
() -> System.out.println("Completed")
);Output
1
2
3
4Recevied: 2
Recevied: 3
Recevied: 4
CompletedtakeUntiltakes the items emitted by theObservablefrom the beginning until the predicate function returnstrue1
2
3
4
5
6
7Observable<Integer> observable = Observable.range(0, 5);
observable.takeUntil( v -> v==2)
.subscribe(
v -> System.out.println("Received:" + v),
e -> System.out.println("Errir" + e),
() -> System.out.println("Completed")
);Output
1
2
3
4Received:0
Received:1
Received:2
CompletedskipUntilskips the items emitted by the sourceObservableuntil a secondObservableemits an item, at which pointskipUntilbegins to take the items of the sourceObservable
3.3 Conditional and Boolean Operators
Operators that evaluate one or more observables or items emitted by Observables
Alldetermine that whether all items emitted by anObservablemeet some criteria and returns a boolean value based on tan evaluation of that item.Allreturns anObservablethat emits only a single boolean value. The signature is:1
public final Observable<java.lang.Boolean> all(Func1<? super T,java.lang.Boolean> predicate)
Example:
1
2
3
4
5
6Observable<Integer> observable = Observable.range(0, 5);
observable.all( v -> v < 10)
.subscribe(
result -> System.out.println(result),
e -> System.out.println("Error: " + e)
);Output
1
true
containsdetermine whether anObservableemits a particular item or not.1
2
3
4
5
6Observable<Integer> observable = Observable.range(0, 2);
observable.contains( 3)
.subscribe(
result -> System.out.println(result),
e -> System.out.println("Error: " + e)
);Output
1
false
defaultIfEmptyemits items from the sourceObservable, or a default item if the sourceObservableemits nothing.1
2
3
4
5
6Observable<Integer> observable = Observable.empty();
observable.defaultIfEmpty(1)
.subscribe(
v -> System.out.println("Received:" + v),
e -> System.out.println("Errir" + e),
() -> System.out.println("Completed"));Output
1
2Received:1
CompletedsequenceEqualdetermine whether twoObservableemit the same sequence of items.1
2
3
4
5
6
7
8Observable<Integer> ints = Observable.range(1, 3);
Observable<String> strings = Observable.just("1","2","3");
Observable
.sequenceEqual(ints,strings, (i,s) -> s.equals(String.valueOf(i)))
.subscribe(
result -> System.out.println(result),
e -> System.out.println("Error: " + e)
);Output
1
true
amboperator. Given two or more sourceObservable, emit all of the items from only the first of theseObservablesto emit an item or notification. It will pass throught the emissions and notifications of exactly one of theses Observables: the first one that sends a notification toAmb, either by emitting an item or sending anonErrororonCompletednotification.Ambwill ignore and discard the emissions and notifications of all of the other source observabeles.1
2
3
4
5
6
7
8Observable interval = Observable.interval(200,TimeUnit.MILLISECONDS);
Observable<Integer> ints = Observable.range(1, 3);
Observable intervalDelay = Observable.interval(1000,1,TimeUnit.MILLISECONDS);
Observable.ambArray(interval,intervalDelay,ints)
.subscribe(
v -> System.out.println("Received:" + v),
e -> System.out.println("Errir" + e),
() -> System.out.println("Completed"));Output
1
2
3
4Received:1
Received:2
Received:3
Completed
3.4 Transforming observables
Operators that transform items that are emitted by an Observable. It’s common to transform the items to the format that we want.
mapis the basic transformation operator. It takes a function which takes an item and return a new item of any type. The returned observable is composed of the values returned by the transformation function.1
2
3
4
5
6
7Observable<Integer> observable = Observable.range(1, 3);
observable.map(i -> "String: "+i)
.subscribe(
result -> System.out.println(result),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Output
1
2
3
4String: 1
String: 2
String: 3
CompletedflatMaptransformation operator takes items from the source observable and for each of them, returns a new observable that emits the new values. The returned observables will be flatten into a singleObservable.flatMapmerges the emissions of these resulting observables, emitting these merged result as its own sequence.
Note that
flatMapmerges the emissions of these observables, so that they may interleave.1
2
3
4
5
6
7Observable<Integer> observable = Observable.range(1, 3);
observable.flatMap( i -> Observable.range(0,i))
.subscribe(
result -> System.out.println(result),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Output
1
2
3
4
5
6
70
0
1
0
1
2
Completedbufferperiodically gather items emitted by an Observable into bundles and emit these bundles rather then emitting the items one at a time.buffer(int count)emits ono-overlapping buffers in the forms ofLists, each of which contains at mostcountitems from the source observables(the final emittedListmay have fewer thancountitems).1
2
3
4
5
6
7Observable<Integer> observable = Observable.range(0, 5);
observable.buffer(2)
.subscribe(
list -> System.out.println(list),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Output
1
2
3
4[0, 1]
[2, 3]
[4]
Completedbuffer(int count, int skip)creates a new buffer starting with the first emitted item from the source observable, and everyskipitems thereafter, and fills each buffer withcountitems. It emits these buffer asLists. Depending on the values ofcountandskipthese buffers may overlap(multiple buffers may contains the same item), or they may have gaps.
Examplebuffer(3,2)means creates buffers that each buffer contains 3 elements, and skip 2 items to create a new buffer.1
2
3
4
5
6
7Observable<Integer> observable = Observable.range(0, 10);
observable.buffer(3,2)
.subscribe(
list -> System.out.println(list),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Output
1
2
3
4
5
6[0, 1, 2]
[2, 3, 4]
[4, 5, 6]
[6, 7, 8]
[8, 9]
Completed
groupBydivide an Observable into a set of Observables that each emit a different subset of items from the original observable.
Example: group a range of numbers from 0 to 9 by even and odd. Print the last item of each group.1
2
3
4
5
6
7Observable<Integer> observable = Observable.range(0, 10);
observable.groupBy(v -> v% 2 == 0)
.subscribe(
group -> group.takeLast(1).subscribe(v-> System.out.println(v)),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Output
1
2
39
8
Completed
3.5 Aggregate operators
Operators that operate on the entire sequence of items emitted by an Observable. Operators like Count, Average, Max, Min and Sum return a sequence with a single value.
4. Reference :
http://www.vogella.com/tutorials/RxJava/article.html
http://www.baeldung.com/rx-java
https://github.com/Froussios/Intro-To-RxJava
https://code.tutsplus.com/tutorials/getting-started-with-rxjava-20-for-android--cms-28345