1. What are RxJava and reactive programming In reactive programming, the consumer reacts to the data as it comes in.
This is the reason why asynchronous programming is also called reactive programming. Reactive programming allows to propagates event changes to registered observers. >The Observer pattern has done right. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming. RxJava is a library that lets you create applications in the reactive programming style. At its core, reactive programming provides a clean, efficient way of processing and reacting to streams of real-time data, including data with dynamic values. These data streams don’t necessarily have to take the form of traditional data types, as RxJava pretty much treats everything as a stream of data—everything from variables to properties, caches, and even user input events like clicks and swipes. The data emitted by each stream can either be a value, an error, or a “completed” signal, although you don’t necessarily have to implement the last two.
To create this workflow of data streams and objects that react to them, RxJava extends the Observer software design pattern. Essentially, in RxJava you have Observable
objects that emit a stream of data and then terminate, and Observer
objects that subscribe to Observables
. An Observer
receives a notification each time their assigned Observable emits a value, an error, or a completed signal. So at a very high level, RxJava is all about:
- Creating an
Observable
. - Giving that
Observable
some data to emit. - Creating an
Observer
. - Assigning the
Observer
to anObservable
. - Giving the
Observer
tasks to perform whenever it receives an emission from its assignedObservable
.
2. Two key types There are two key types to understand
when working with Rx: Observable representing sources of data. Observer(or subscriber) listening to the observables. An observable emits items, a subscriber consumes those items.
2.1 Observables Observables are the sources for the data.
Usually, they start providing data once a subscriber starts listening. An observable may emit any number of items (including zero items). It can terminate either successfully or with an error. Sources may never terminate, for example, an observable for
a button click can potentially produce an infinite stream of events. For now, we must understand the Subscribe
method. Here is one key overload of the method:
1 | public final Subscription subscribe(Subscriber <? super T> subscriber) |
This is the method that you use to receive the values emitted by the observables. The subscriber is responsible for handling the values. The Subscriber
class is an implementation of the Observer
interface.
An Observable
pushes 3 kinds of event:
- values;
- Completion, which indicates that no more values will be pushed.
- Errors, if somthing caused the sequence to fail. These events also imply termination.
2.2 Observer(Subscriber)
An observable can have any number of subscribers. If a new item is emitted from the observable, the onNext()
method is called on each subscriber. If the observable finishes its data flow successful, the onComplete()
method is optionally called on each subscriber. Similar, if the observable finishes its data flow with an error, the onError()
method is optionally called on each subscriber. No calls happen after a call to onError or onCompleted.1
2
3
4
5interface Observer<T> {
void onCompleted();
void onError(java.lang.Throwable e);
void onNext(T t);
}
Example:1
2
3
4
5
6
7
8String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
Observable<String> observable = Observable.from(letters);
observable.subscribe(
i -> result += i, //OnNext
Throwable::printStackTrace, //OnError
() -> result += "_Completed" //OnCompleted
);
assertTrue(result.equals("abcdefg_Completed"));
2.3 Implementing Observable
It’s not recommended to manually implement Observer
or extend Observable
. Since RxJava already provides all the building blocks you need. It is simpler and safer to use the many tools that Rx gives you for generating the functionality that you need.
2.3.1 Creating an Observable
Observable.just()
You can use the.just()
operator to convert any object into an Observable. The resultObservable
will the emit the original object and complete.
For example, here we’re creating anObservable
that’ll emit a single string to all itsObservers
:1
2
3
4
5
6Observable<String> observable = Observable.just("Hello World!");
Subscription subscription = observable.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Observable.range()
You can use the.range()
operator to emit a range of sequential integers. The first integer you provider is the initial value, and the second is the number of integers you want to emit. For example1
2
3
4
5
6Observable<Integer> observable = Observable.range(0, 2);
observable.subscribe(
v -> System.out.println("Received:" + v),
e -> System.out.println("Errir" + e),
() -> System.out.println("Completed")
);Output
1
2
3Received:0
Received:0
CompletedObservable.from()
The.from()
operator allows you to convert a collection of objects into an observable stream. You can convert an array into anObservable
usingObservable.fromArray
, aCallable
into anObservable
using theObservable.fromCallable
, anIterable
into anObservable
usingObservable.fromIterable
and a ‘Future’ into anObservable
usingObservable.fromFuture()
1
2
3
4
5
6
7Integer[] array = {1,3,5};
Observable<Integer> observable = Observable.fromArray(array);
observable.subscribe(
v -> System.out.println("Received:" + v),
e -> System.out.println("Errir" + e),
() -> System.out.println("Completed")
);Output
1
2
3
4Received:1
Received:3
Received:5
CompletedFuture
is a part of the Java framework and you may come acroos them while using frameworks that use concurrency. They are a less powerful concept for concurrency than Rx, since they only return one value.1
2
3
4
5
6
7
8
9
10
11
12
13FutureTask<Integer> f = new FutureTask<Integer>(() -> {
Thread.sleep(2000);
return 21;
});
new Thread(f).start();
Observable<Integer> values = Observable.from(f);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Output
1
2Received: 21
CompletedObservable.interval()
This operator creates anObservable
that emits an infinite sequence of ascending integers, with each emission separated by a time interval chosen by you. For example:1
2
3
4
5
6Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
observable.subscribe(
v -> System.out.println("Received:" + v),
e -> System.out.println("Errir" + e),
() -> System.out.println("Completed")
);Output
1
2
3
4
5Received: 0
Received: 1
Received: 2
Received: 3
...This sequence will not terminate until we unsubscribe.
Observable.empty()
Theempty()
operator creats anObservable
that emits no items but terminates normally, which can be useful when you need to quickly create anObservable
for testing purposes.1
Observable<String> observable = Observable.empty();
More other ways to creat
Observable
seeing the creating observables operators
3.Operators
3.1 Introducing Operators
RxJava has an enormous collections of operators that are mainly intended to help you modify, filter, merge and transform the data that’s being emiited by your Observables
. You could find the complete list of RxJava operators over at the official github or categorized operators in the official website.
It’s allowed to chain multiple operators together. Applying an operators to an Observable
typically returns another Observable
, so you can just keep applying operators until you get the results you want. In the previous section we talked about the creating Observable
operators, in this section we focus on other useful operators.
3.2 Filtering Observables
filter
takes a predicate function that makes a boolean decision for each value emitted. If the decision iffalse
, the item is omitted from the filtered sequence.1
2
3
4
5
6
7Observable<Integer> values = Observable.range(0, 10);
values.filter(v -> v % 2 == 0)
.subscribe(
v -> System.out.println("Received:" + v),
e -> System.out.println("Errir" + e),
() -> System.out.println("Completed")
);Output
1
2
3
4
5
6Received:0
Received:2
Received:4
Received:6
Received:8
Completeddistinct
operator filters anObservable
by only allowing items through that have not already been emitted.1
2
3
4
5
6
7
8Integer[] array = {1, 1, 2, 2, 3};
Observable<Integer> values = Observable.fromArray(array);
values.distinct()
.subscribe(
v -> System.out.println("Received:" + v),
e -> System.out.println("Errir" + e),
() -> System.out.println("Completed")
);Output
1
2
3
4Received:1
Received:2
Received:3
CompletedDistinceUntilChanged
operator compares emitted items from the sourceObservable
against their immediate predecessors in order to determine whether or not they are distince.1
2
3
4
5
6
7
8Integer[] array = {1, 1, 2, 3, 2};
Observable<Integer> values = Observable.fromArray(array);
values.distinctUntilChanged()
.subscribe(
v -> System.out.println("Received:" + v),
e -> System.out.println("Errir" + e),
() -> System.out.println("Completed")
);Output
1
2
3
4
5Received:1
Received:2
Received:3
Received:2
CompletedignoreElements
will ignore every items, but allows its termination notification(eitheronError
oronCompleted
) to pass through unchanged.1
2
3
4
5Observable<Integer> values = Observable.range(0, 5);
values.ignoreElements().subscribe(
()-> System.out.println("Completed"),
e-> System.out.println("error")
);Output
1
Completed
take
takes the first n items emitted by theObservable
.1
2
3
4
5
6Observable<Integer> values = Observable.range(0, 5);
values.take(2).subscribe(
v -> System.out.println("Recevied: " + v),
e-> System.out.println("error"),
()-> System.out.println("Completed")
);Output
1
2
3Recevied: 0
Recevied: 1
Completedskip
skips first n items emitted by theObservable
.1
2
3
4
5
6
7Observable<Integer> values = Observable.range(0, 5);
values.skip(2)
.subscribe(
v -> System.out.println("Recevied: " + v),
e -> System.out.println("error"),
() -> System.out.println("Completed")
);Output
1
2
3
4Recevied: 2
Recevied: 3
Recevied: 4
CompletedtakeWhile
takes items emitted byObservable
while a predicate istrue
, once the predicate function becomesfalse
, it stops taking the items.1
2
3
4
5
6
7Observable<Integer> values = Observable.range(0, 5);
values.takeWhile(v -> v < 2)
.subscribe(
v -> System.out.println("Recevied: " + v),
e -> System.out.println("error"),
() -> System.out.println("Completed")
);Output
1
2
3Recevied: 0
Recevied: 1
CompletedskipWhile
skip items while a predicate function istrue
, once the predicate function becomesfalse
, it stops skipping the items.1
2
3
4
5
6
7Observable<Integer> values = Observable.range(0, 5);
values.skipWhile(v -> v < 2)
.subscribe(
v -> System.out.println("Recevied: " + v),
e -> System.out.println("error"),
() -> System.out.println("Completed")
);Output
1
2
3
4Recevied: 2
Recevied: 3
Recevied: 4
CompletedtakeUntil
takes the items emitted by theObservable
from the beginning until the predicate function returnstrue
1
2
3
4
5
6
7Observable<Integer> observable = Observable.range(0, 5);
observable.takeUntil( v -> v==2)
.subscribe(
v -> System.out.println("Received:" + v),
e -> System.out.println("Errir" + e),
() -> System.out.println("Completed")
);Output
1
2
3
4Received:0
Received:1
Received:2
CompletedskipUntil
skips the items emitted by the sourceObservable
until a secondObservable
emits an item, at which pointskipUntil
begins to take the items of the sourceObservable
3.3 Conditional and Boolean Operators
Operators that evaluate one or more observables
or items emitted by Observables
All
determine that whether all items emitted by anObservable
meet some criteria and returns a boolean value based on tan evaluation of that item.All
returns anObservable
that emits only a single boolean value. The signature is:1
public final Observable<java.lang.Boolean> all(Func1<? super T,java.lang.Boolean> predicate)
Example:
1
2
3
4
5
6Observable<Integer> observable = Observable.range(0, 5);
observable.all( v -> v < 10)
.subscribe(
result -> System.out.println(result),
e -> System.out.println("Error: " + e)
);Output
1
true
contains
determine whether anObservable
emits a particular item or not.1
2
3
4
5
6Observable<Integer> observable = Observable.range(0, 2);
observable.contains( 3)
.subscribe(
result -> System.out.println(result),
e -> System.out.println("Error: " + e)
);Output
1
false
defaultIfEmpty
emits items from the sourceObservable
, or a default item if the sourceObservable
emits nothing.1
2
3
4
5
6Observable<Integer> observable = Observable.empty();
observable.defaultIfEmpty(1)
.subscribe(
v -> System.out.println("Received:" + v),
e -> System.out.println("Errir" + e),
() -> System.out.println("Completed"));Output
1
2Received:1
CompletedsequenceEqual
determine whether twoObservable
emit the same sequence of items.1
2
3
4
5
6
7
8Observable<Integer> ints = Observable.range(1, 3);
Observable<String> strings = Observable.just("1","2","3");
Observable
.sequenceEqual(ints,strings, (i,s) -> s.equals(String.valueOf(i)))
.subscribe(
result -> System.out.println(result),
e -> System.out.println("Error: " + e)
);Output
1
true
amb
operator. Given two or more sourceObservable
, emit all of the items from only the first of theseObservables
to emit an item or notification. It will pass throught the emissions and notifications of exactly one of theses Observables: the first one that sends a notification toAmb
, either by emitting an item or sending anonError
oronCompleted
notification.Amb
will ignore and discard the emissions and notifications of all of the other source observabeles.1
2
3
4
5
6
7
8Observable interval = Observable.interval(200,TimeUnit.MILLISECONDS);
Observable<Integer> ints = Observable.range(1, 3);
Observable intervalDelay = Observable.interval(1000,1,TimeUnit.MILLISECONDS);
Observable.ambArray(interval,intervalDelay,ints)
.subscribe(
v -> System.out.println("Received:" + v),
e -> System.out.println("Errir" + e),
() -> System.out.println("Completed"));Output
1
2
3
4Received:1
Received:2
Received:3
Completed
3.4 Transforming observables
Operators that transform items that are emitted by an Observable
. It’s common to transform the items to the format that we want.
map
is the basic transformation operator. It takes a function which takes an item and return a new item of any type. The returned observable is composed of the values returned by the transformation function.1
2
3
4
5
6
7Observable<Integer> observable = Observable.range(1, 3);
observable.map(i -> "String: "+i)
.subscribe(
result -> System.out.println(result),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Output
1
2
3
4String: 1
String: 2
String: 3
CompletedflatMap
transformation operator takes items from the source observable and for each of them, returns a new observable that emits the new values. The returned observables will be flatten into a singleObservable
.flatMap
merges the emissions of these resulting observables, emitting these merged result as its own sequence.Note that
flatMap
merges the emissions of these observables, so that they may interleave.1
2
3
4
5
6
7Observable<Integer> observable = Observable.range(1, 3);
observable.flatMap( i -> Observable.range(0,i))
.subscribe(
result -> System.out.println(result),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Output
1
2
3
4
5
6
70
0
1
0
1
2
Completedbuffer
periodically gather items emitted by an Observable into bundles and emit these bundles rather then emitting the items one at a time.buffer(int count)
emits ono-overlapping buffers in the forms ofList
s, each of which contains at mostcount
items from the source observables(the final emittedList
may have fewer thancount
items).1
2
3
4
5
6
7Observable<Integer> observable = Observable.range(0, 5);
observable.buffer(2)
.subscribe(
list -> System.out.println(list),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Output
1
2
3
4[0, 1]
[2, 3]
[4]
Completedbuffer(int count, int skip)
creates a new buffer starting with the first emitted item from the source observable, and everyskip
items thereafter, and fills each buffer withcount
items. It emits these buffer asList
s. Depending on the values ofcount
andskip
these buffers may overlap(multiple buffers may contains the same item), or they may have gaps.
Examplebuffer(3,2)
means creates buffers that each buffer contains 3 elements, and skip 2 items to create a new buffer.1
2
3
4
5
6
7Observable<Integer> observable = Observable.range(0, 10);
observable.buffer(3,2)
.subscribe(
list -> System.out.println(list),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Output
1
2
3
4
5
6[0, 1, 2]
[2, 3, 4]
[4, 5, 6]
[6, 7, 8]
[8, 9]
Completed
groupBy
divide an Observable into a set of Observables that each emit a different subset of items from the original observable.
Example: group a range of numbers from 0 to 9 by even and odd. Print the last item of each group.1
2
3
4
5
6
7Observable<Integer> observable = Observable.range(0, 10);
observable.groupBy(v -> v% 2 == 0)
.subscribe(
group -> group.takeLast(1).subscribe(v-> System.out.println(v)),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Output
1
2
39
8
Completed
3.5 Aggregate operators
Operators that operate on the entire sequence of items emitted by an Observable. Operators like Count
, Average
, Max
, Min
and Sum
return a sequence with a single value.
4. Reference :
http://www.vogella.com/tutorials/RxJava/article.html
http://www.baeldung.com/rx-java
https://github.com/Froussios/Intro-To-RxJava
https://code.tutsplus.com/tutorials/getting-started-with-rxjava-20-for-android--cms-28345