https://gank.io/post/560e15be2dca930e00da1083
(RxJava1.0版本,作者由浅入深的讲解,十分地细致。)

一、RxJava是什么

RxJava本质上是一个异步操作库,是一个能让你用极其简洁的逻辑去处理繁琐复杂任务的异步事件库。

android开发的操作就是在子线程和主线程切来切去,这样一来,会出现很多的嵌套,导致代码不易阅读和维护。而RxJava/RxAndroid完美解决了这一问题。

二、RxJava 的观察者模式

RxJava原理是观察者模式,它有四个重要的概念:
- Observable (可观察者,即被观察者)
- Observer (观察者)
- subscribe (订阅)
- 事件

与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
- onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
- onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
- 在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

三、RxJava 的基本用法

1) 创建 Observer (处理事件)

Observer 即观察者,它决定事件触发的时候将有怎样的行为。创建观察者可以实现接口Observer或者继承抽象类Subscriber。

Subscriber在Observer之上扩展了2个方法:
- onStart(): 它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。它总是在 subscribe 所发生的线程被调用,而不能指定线程。
- unsubscribe():取消订阅, 移除Observable持有Subscriber的引用,减小内存泄露的风险。

2) 创建 Observable (触发事件)

Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。
可以通过Observable.create()、Observable.just()、Observable.from()等方法创建Observable对象。查看源码可以知道just调用from,from调用create方法。

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

注意上面的调用顺序:Observable传入了一个OnSubscribe内部类,它的call方法被调用(observable.subscribe方法会调用call),然后call方法又会调用subscriber的方法。这样就形成一条完整的回调链路。如下图所示:

3) Subscribe (订阅)

Subscription subscription = observable.subscribe(observer);
// 或者:
Subscription subscription = observable.subscribe(subscriber);

订阅后返回Subscription对象

注意这里subscribe()和观察者设计模式的意思刚好相反,本来应该是“观察者”订阅“被观察者”,但是作者为了实现响应式编程的API才将其写反。

  • 订阅方法observable.subscribe(observer)的源码:
public final Subscription subscribe(final Observer<? super T> observer) {
        if (observer instanceof Subscriber) {
            return subscribe((Subscriber<? super T>)observer);
        }
        return subscribe(new Subscriber<T>() {

            @Override
            public void onCompleted() {
                observer.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                observer.onError(e);
            }

            @Override
            public void onNext(T t) {
                observer.onNext(t);
            }

        });
    }

可以看到observable.subscribe内部都是订阅Subscriber,只不过如果被订阅的是Observer接口,只会回调其onCompleted、onError、onNext方法。

同时subscribe() 还支持不完整定义的回调subscribe(Action1 ...),可以避免给用户不必要的回调。

四、线程控制 —— Scheduler

1) Scheduler的API

subscribeOn():指定事件产生的线程
observeOn():指定事件消费的线程

2) Scheduler的原理

牵涉到后面的知识,lift()原理。

五、变换

所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。

1) API

  • map
Observable.just("images/logo.png") // 输入类型 String
    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String filePath) { // 参数类型 String
            return getBitmapFromPath(filePath); // 返回类型 Bitmap
        }
    })
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) { // 参数类型 Bitmap
            showBitmap(bitmap);
        }
    });
  • flatMap
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);

flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。

flatMap() 常用于嵌套的异步操作

networkClient.token() // 返回 Observable<String>,在订阅时请求 token,并在响应后发送 token
    .flatMap(new Func1<String, Observable<Messages>>() {
        @Override
        public Observable<Messages> call(String token) {
            // 返回 Observable<Messages>,在订阅时请求消息列表,并在响应后发送请求到的消息列表
            return networkClient.messages();
        }
    })
    .subscribe(new Action1<Messages>() {
        @Override
        public void call(Messages messages) {
            // 处理显示消息列表
            showMessages(messages);
        }
    });
  • throttleFirst
    防抖动

  • compose
    防抖动

2) 原理

变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。

在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。

map变换为例:

    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }
public final class OperatorMap<T, R> implements Operator<R, T> {

    private final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    onError(OnErrorThrowable.addValueAsLastCause(e, t));
                }
            }

        };
    }

}


public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    try {
                        // new Subscriber created and being subscribed with so 'onStart' it
                        st.onStart();

                        //这里的onSubscribe为什么是旧的Observable的变量?为什么不是当前类?
                        onSubscribe.call(st);
                    } catch (Throwable e) {
                        // localized capture of errors rather than it skipping all operators 
                        // and ending up in the try/catch of the subscribe method which then
                        // prevents onErrorResumeNext and other similar approaches to error handling
                        if (e instanceof OnErrorNotImplementedException) {
                            throw (OnErrorNotImplementedException) e;
                        }
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    if (e instanceof OnErrorNotImplementedException) {
                        throw (OnErrorNotImplementedException) e;
                    }
                    // if the lift function failed all we can do is pass the error to the final Subscriber
                    // as we don't have the operator available to us
                    o.onError(e);
                }
            }
        });
    }

0 条评论

发表回复

您的电子邮箱地址不会被公开。