RxJava library
Essence is that the Observable object emmits events and Observer object handles them.
A Subject object incorrporate functionality of subscriber and an observable. As a subscriber, a subject can be used to publish the events coming from more than one observable. As observable it can reemit the events from multiple subscribers to anyone observing it.
There are three kinds of events:
- values - just some values
- completion - indicates that no more values
- error - indicates that sequence of events is failed
You can handle stream of values by operators in functional style like in Java stream API, i.e. transform, filter and etc.
You can specify a thread where values will be emitted and thread where values will be handled by observer.
You must dispose the observable when it is no longer needed like in first example.
Add dependency
implementation ("io.reactivex.rxjava3:rxjava:3.0.13")
Then use like below:
// general syntax
// source.op1().op2().opN().subscribe(consumer)
val d = Observable.range(1,10)
.map { it*it}
.subscribe{ println(it) }
...
d.dispose();
Flowable.fromCallable {
Thread.sleep(1000) // imitate expensive computation
"Done"
}
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe({ x: String? -> println(x) }) { obj: Throwable -> obj.printStackTrace() }
// android example
Observable.just("one", "two", "three", "four", "five")
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(/* an Observer */)
// general syntax
// source.op1().op2().opN().subscribe(consumer)
Disposable d = Observable.range(1,10)
.map(it -> it*it)
.subscribe(it -> System.out.println(it));
...
d.dispose();
Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
// android example
Observable.just("one", "two", "three", "four", "five")
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(/* an Observer */);
This code looks like iterator pattern but there is differrence. With an Iterable, the consumer pulls values from the producer and the thread blocks until those values arrive. With an Observable the producer pushes values to the consumer whenever values are available. This approach is more flexible, because values can arrive synchronously or asynchronously.
RxJava on GitHub. Read more about classes in API documentation. There is also android add-on RxAndroid.
backpressure
When the dataflow runs through asynchronous steps, each step may perform different things with different speed. To avoid overwhelming such steps, which usually would manifest itself as increased memory usage due to temporary buffering or the need for skipping/dropping data, so-called backpressure is applied, which is a form of flow control where the steps can express how many items are they ready to process. This allows constraining the memory usage of the dataflows in situations where there is generally no way for a step to know how many items the upstream will send to it.
observables
class | description |
---|---|
Observable | Class is designated to the non-backpressured operations (short sequences, GUI interactions, etc.). |
Flowable | Class is designated to support backpressure. |
Maybe | It can only emit either a single value, no value at all or an error. |
Single | It can only emit either a single successful value or an error (there is no onComplete notification as there is for an Observable). |
subjects
PublishSubject<Integer> subject = PublishSubject.create();
subject.onNext(1);
subject.subscribe(System.out::println);
subject.onNext(2);
subject.onNext(3);
subject.onNext(4);
// output
// 2
// 3
// 4
AsyncSubject<Integer> s = AsyncSubject.create();
s.subscribe(v -> System.out.println(v));
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.onCompleted(); // output 2
class | description |
---|---|
PublishSubject | Emits items to currently subscribed Observers. |
ReplaySubject | Caches all events to replays them to current and late observers. |
BehaviorSubject | Remembers only the last value (like ReplaySubject with buffer size 1). |
AsyncSubject | Emits the very last value followed by a completion event or the received error to observers. Its use is to emit a single value and immediately complete. |