RxAndroid

Libraries for ReactiveX on Android

by Nick Fedesna

Nested Callbacks (OK, BUT NOT REALLY OK)


    sApi.loginUser(credentials).enqueue(new Callback<UserToken>() {

        public void onResponse(Response<UserToken> response) {
            String token = response.body().token;
            // save User token

            sApi.getUserProfile(token).enqueue(new Callback<UserProfile>() {
                public void onResponse(Response<UserProfile> response) {
                    // do something with UserProfile
                }

                public void onFailure(Throwable t) {
                    // notify error getting UserProfile
                }
            });
        }

        public void onFailure(Throwable t) {
            // notify login failure
        }
    });

                

Nested Futures (OMG, please no!)

Enter: ReactiveX

Offering efficient execution and composition
of asynchronous and event-based sequences
by providing a collection of operators
capable of filtering, transforming and combining
Observable emissions and notifications
while abstracting away concerns about things like
low-level threading, synchronization, thread-safety,
concurrent data structures, and non-blocking I/O.

Wait, what's an Observable??

Pull v. Push

event      Iterable (pull)     Observable (push)
data       T next()            onNext(T)
error      throws Exception    onError(Exception)
complete   !hasNext()          onCompleted()
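
The table above can be sketched in plain Java. This is only an illustration of the two styles, not RxJava itself: `MiniObserver`, `pull`, `push` and `pushLog` are hypothetical names invented for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class PullVsPush {

    /** Hypothetical stand-in for an Rx observer; not the RxJava interface. */
    interface MiniObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Pull: the consumer drives the loop and asks for each item. */
    static List<String> pull(Iterable<Integer> source) {
        List<String> log = new ArrayList<>();
        Iterator<Integer> it = source.iterator();
        try {
            while (it.hasNext()) {            // complete: !hasNext()
                log.add("next " + it.next()); // data: T next()
            }
            log.add("completed");
        } catch (RuntimeException e) {        // error: thrown at the consumer
            log.add("error " + e.getMessage());
        }
        return log;
    }

    /** Push: the producer drives the loop and calls the observer back. */
    static <T> void push(Iterable<T> source, MiniObserver<T> observer) {
        try {
            for (T item : source) {
                observer.onNext(item);        // data: onNext(T)
            }
        } catch (RuntimeException e) {
            observer.onError(e);              // error: delivered as a callback
            return;
        }
        observer.onCompleted();               // complete: onCompleted()
    }

    /** Runs the push variant and records every callback it receives. */
    static List<String> pushLog(Iterable<Integer> source) {
        final List<String> log = new ArrayList<>();
        push(source, new MiniObserver<Integer>() {
            @Override public void onNext(Integer item) { log.add("next " + item); }
            @Override public void onError(Throwable error) { log.add("error " + error.getMessage()); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        return log;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3);
        System.out.println(pull(items));
        System.out.println(pushLog(items));
    }
}
```

Both variants see the same events; what changes is who is in control of the loop.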

I subscribe to all this, so now what?

Observer<T>

emissions       onNext(T)
notifications   onError(Throwable e)
                onCompleted()

Subscribe


        Observable<T>.subscribe(Action1<? super T> onNext,
                                Action1<Throwable> onError,
                                Action0 onComplete)

        Observable<T>.subscribe(Observer<? super T> observer)
            

Tons of Operators

Transforming Items         Map, Buffer, FlatMap, GroupBy, Scan, Window
Filtering                  Debounce, Distinct, Filter, First, IgnoreElements, Last, Sample
Combining                  And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
Conditional & Boolean      All, Amb, Contains, DefaultIfEmpty, Skip, Take
Mathematical & Aggregate   Average, Concat, Count, Max, Min, Reduce, Sum
Utility                    Delay, Do, Materialize, TimeInterval, Timeout, Timestamp, Using

Schedulers & Threading

subscribeOn(Scheduler)
observeOn(Scheduler)
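
Roughly, subscribeOn picks the thread where the source does its work, and observeOn picks the thread where downstream callbacks are delivered. The sketch below is an analogy built on the JDK's CompletableFuture and two named executors, not RxJava's Scheduler API; `SchedulerSketch`, `run`, and the thread names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SchedulerSketch {

    /** Returns { thread that did the work, thread that received the result }. */
    static String[] run() {
        // Named single-thread executors stand in for Rx schedulers (an assumption of this sketch).
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        ExecutorService mainLike = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        try {
            final String[] threads = new String[2];
            CompletableFuture
                    // Like subscribeOn: decides where the source work runs.
                    .supplyAsync(() -> {
                        threads[0] = Thread.currentThread().getName();
                        return 6 * 7;
                    }, worker)
                    // Like observeOn: decides where the downstream callback runs.
                    .thenAcceptAsync(result -> threads[1] = Thread.currentThread().getName(), mainLike)
                    // Block only so the sketch can report what happened.
                    .join();
            return threads;
        } finally {
            worker.shutdown();
            mainLike.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("work ran on:    " + threads[0]);
        System.out.println("result seen on: " + threads[1]);
    }
}
```

On Android the "main-like" role would be played by the UI thread, which is what AndroidSchedulers.mainThread() targets.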

I'm a visual learner, what's all this look like?

Simple Example


        Integer[] items = { 0, 1, 2, 3, 4, 5 };
        Observable.from(items)
                  .map(i -> (i * i) + i + 1)
                  .subscribeOn(Schedulers.computation())
                  .observeOn(AndroidSchedulers.mainThread())
                  .subscribe(this::itemEmitted,
                             this::handleError,
                             () -> Timber.d("Sequence complete"));

        // Output: 1, 3, 7, 13, 21, 31, Sequence complete
            

If you're not using any logging utility, you should have a look at Timber!

Nested Callbacks (Remember?)


    sApi.loginUser(credentials).enqueue(new Callback<UserToken>() {

        public void onResponse(Response<UserToken> response) {
            String token = response.body().token;
            // save User token

            sApi.getUserProfile(token).enqueue(new Callback<UserProfile>() {
                public void onResponse(Response<UserProfile> response) {
                    // do something with UserProfile
                }

                public void onFailure(Throwable t) {
                    // notify error getting UserProfile
                }
            });
        }

        public void onFailure(Throwable t) {
            // notify login failure
        }
    });

                

Rx version:


        private void loginUser(User user) {
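            sApi.loginUserRx(credentials)
                .map(response -> response.token)      // UserToken -> token String
                .doOnNext(this::saveUserToken)        // side effect: save User token
                .flatMap(sApi::getUserProfileRx)      // chain the dependent request
                .subscribe(this::saveUserProfile,     // do something with UserProfile
                           this::handleError);        // one handler for both failures
        }

        private void saveUserToken(String token) {…}
        private void saveUserProfile(UserProfile profile) {…}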
        private void handleError(Throwable throwable) {…}
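
Why does one error handler cover both calls? A toy model may help. The `Source` type below is a hypothetical, synchronous stand-in for an Observable written for this sketch (it is not RxJava, and it does not catch exceptions thrown inside the lambdas); the fake `loginUser` and `getUserProfile` methods and their return values are likewise invented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class FlatMapSketch {

    /** Hypothetical, synchronous stand-in for an Observable; not RxJava. */
    interface Source<T> {
        void subscribe(Consumer<T> onNext, Consumer<Throwable> onError);

        /** Transforms each item. */
        default <R> Source<R> map(Function<T, R> f) {
            return (onNext, onError) -> subscribe(t -> onNext.accept(f.apply(t)), onError);
        }

        /** Runs a side effect, then passes the item on unchanged. */
        default Source<T> doOnNext(Consumer<T> action) {
            return (onNext, onError) -> subscribe(t -> { action.accept(t); onNext.accept(t); }, onError);
        }

        /** Starts a dependent source per item and forwards its items and errors. */
        default <R> Source<R> flatMap(Function<T, Source<R>> f) {
            return (onNext, onError) -> subscribe(t -> f.apply(t).subscribe(onNext, onError), onError);
        }

        static <T> Source<T> just(T value) {
            return (onNext, onError) -> onNext.accept(value);
        }

        static <T> Source<T> error(Throwable cause) {
            return (onNext, onError) -> onError.accept(cause);
        }
    }

    /** Mirrors the UserToken response used in the slides. */
    static final class UserToken {
        final String token;
        UserToken(String token) { this.token = token; }
    }

    // Fake API calls, invented for this sketch.
    static Source<UserToken> loginUser(String credentials) {
        if ("secret".equals(credentials)) {
            return Source.just(new UserToken("token-123"));
        }
        return Source.<UserToken>error(new IllegalStateException("login failed"));
    }

    static Source<String> getUserProfile(String token) {
        return Source.just("profile for " + token);
    }

    /** Same shape as the Rx chain above; returns a log of what happened. */
    static List<String> login(String credentials) {
        final List<String> log = new ArrayList<>();
        loginUser(credentials)
                .map(response -> response.token)
                .doOnNext(token -> log.add("saved " + token))
                .flatMap(FlatMapSketch::getUserProfile)
                .subscribe(profile -> log.add("got " + profile),
                           error -> log.add("error: " + error.getMessage()));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(login("secret"));
        System.out.println(login("wrong"));
    }
}
```

Every operator forwards the same onError callback upstream and into the inner source, so a failure in either request ends up in the single handler passed to subscribe.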
            

RxAndroid

compile 'io.reactivex:rxandroid:1.0.1'

AndroidSchedulers.mainThread()
HandlerScheduler.from(handler)


RxJava

compile 'io.reactivex:rxjava:1.0.14'
Best to include it explicitly so you always have the latest RxJava, because RxAndroid releases will be rare.
Also, it's where a plethora of good documentation (the wiki) lives.

RxLifecycle

Prevent leaking via un-subscription helpers & base classes

compile 'com.trello:rxlifecycle:0.2.0'
compile 'com.trello:rxlifecycle-components:0.2.0'

RxBinding

RxJava bindings for creating Observables
from a ton of Android UI widget events

compile 'com.jakewharton.rxbinding:rxbinding:0.2.0'
(rxbinding-support-v4, rxbinding-appcompat-v7, rxbinding-design, rxbinding-recyclerview-v7)

Example: Log-in validation


    Observable<Boolean> emailValid = RxTextView.textChanges(email)
            .map(text -> Patterns.EMAIL_ADDRESS.matcher(text).matches());
    Observable<Boolean> pwd1Valid = RxTextView.textChanges(pwd1)
            .map(text -> text.length() > 6);
    Observable<Boolean> pwd2Valid = RxTextView.textChanges(pwd2)
            .map(text -> TextUtils.equals(text, pwd1.getText()));

    Observable.combineLatest(emailValid, pwd1Valid, pwd2Valid,
                             (emailOk, pwd1Ok, pwd2Ok) -> emailOk && pwd1Ok && pwd2Ok)
            .subscribe(signIn::setEnabled);
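
What combineLatest does here, as a plain-Java sketch: remember the latest value from each input, stay silent until every input has emitted at least once, then emit a combined value on every change. `CombineLatestSketch` and `FormState` are hypothetical names for this illustration; this is not how RxJava implements the operator.

```java
import java.util.ArrayList;
import java.util.List;

class CombineLatestSketch {

    /** Tracks the latest validity of three inputs; null means "nothing emitted yet". */
    static final class FormState {
        private Boolean emailValid;
        private Boolean pwd1Valid;
        private Boolean pwd2Valid;

        /** Every combined value emitted so far, in order. */
        final List<Boolean> emissions = new ArrayList<>();

        void email(boolean valid) { emailValid = valid; emit(); }
        void pwd1(boolean valid)  { pwd1Valid = valid;  emit(); }
        void pwd2(boolean valid)  { pwd2Valid = valid;  emit(); }

        private void emit() {
            // Nothing is emitted until all three inputs have produced a value.
            if (emailValid == null || pwd1Valid == null || pwd2Valid == null) {
                return;
            }
            // Same combiner as the slide: all three must be valid.
            emissions.add(emailValid && pwd1Valid && pwd2Valid);
        }
    }

    public static void main(String[] args) {
        FormState form = new FormState();
        form.email(true);   // no emission yet
        form.pwd1(true);    // no emission yet
        form.pwd2(false);   // first combined value
        form.pwd2(true);    // passwords now match
        form.email(false);  // email edited into an invalid state
        System.out.println(form.emissions);
    }
}
```

In the real example each emission would land in signIn::setEnabled, toggling the button as the user types.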
                

The code highlighting plugin doesn't do this justice
so check out this and more example code here:
github.com/nick-fedesna/RxAndroidExample

