https://gank.io/post/560e15be2dca930e00da1083
(RxJava1.0版本,作者由浅入深的讲解,十分地细致。)
目录
一、RxJava是什么
RxJava本质上是一个异步操作库,是一个能让你用极其简洁的逻辑去处理繁琐复杂任务的异步事件库。
android开发的操作就是在子线程和主线程切来切去,这样一来,会出现很多的嵌套,导致代码不易阅读和维护。而RxJava/RxAndroid完美解决了这一问题。
二、RxJava 的观察者模式
RxJava原理是观察者模式,它有四个重要的概念:
- Observable (可观察者,即被观察者)
- Observer (观察者)
- subscribe (订阅)
- 事件
与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
- onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
- onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
- 在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
三、RxJava 的基本用法
1) 创建 Observer (处理事件)
Observer 即观察者,它决定事件触发的时候将有怎样的行为。创建观察者可以实现接口Observer或者继承抽象类Subscriber。
Subscriber在Observer之上扩展了2个方法:
- onStart(): 它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。它总是在 subscribe 所发生的线程被调用,而不能指定线程。
- unsubscribe():取消订阅, 移除Observable持有Subscriber的引用,减小内存泄露的风险。
2) 创建 Observable (触发事件)
Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。
可以通过Observable.create()、Observable.just()、Observable.from()
等方法创建Observable对象。查看源码可以知道just调用from,from调用create方法。
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
注意上面的调用顺序:Observable传入了一个OnSubscribe内部类,它的call方法被调用(observable.subscribe
方法会调用call),然后call方法又会调用subscriber的方法。这样就形成一条完整的回调链路。如下图所示:
3) Subscribe (订阅)
Subscription subscription = observable.subscribe(observer);
// 或者:
Subscription subscription = observable.subscribe(subscriber);
订阅后返回Subscription对象
注意这里subscribe()和观察者设计模式的意思刚好相反,本来应该是“观察者”订阅“被观察者”,但是作者为了实现响应式编程的API才将其写反。
- 订阅方法
observable.subscribe(observer)
的源码:
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
return subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
observer.onCompleted();
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onNext(T t) {
observer.onNext(t);
}
});
}
可以看到observable.subscribe
内部都是订阅Subscriber,只不过如果被订阅的是Observer接口,只会回调其onCompleted、onError、onNext
方法。
同时subscribe() 还支持不完整定义的回调subscribe(Action1 ...),可以避免给用户不必要的回调。
四、线程控制 —— Scheduler
1) Scheduler的API
subscribeOn()
:指定事件产生的线程
observeOn()
:指定事件消费的线程
2) Scheduler的原理
牵涉到后面的知识,lift()原理。
五、变换
所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。
1) API
- map
Observable.just("images/logo.png") // 输入类型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 参数类型 String
return getBitmapFromPath(filePath); // 返回类型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 参数类型 Bitmap
showBitmap(bitmap);
}
});
- flatMap
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。
flatMap() 常用于嵌套的异步操作
networkClient.token() // 返回 Observable<String>,在订阅时请求 token,并在响应后发送 token
.flatMap(new Func1<String, Observable<Messages>>() {
@Override
public Observable<Messages> call(String token) {
// 返回 Observable<Messages>,在订阅时请求消息列表,并在响应后发送请求到的消息列表
return networkClient.messages();
}
})
.subscribe(new Action1<Messages>() {
@Override
public void call(Messages messages) {
// 处理显示消息列表
showMessages(messages);
}
});
- throttleFirst
防抖动 -
compose
防抖动
2) 原理
变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。
在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。
以map
变换为例:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
public final class OperatorMap<T, R> implements Operator<R, T> {
private final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
}
};
}
}
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
//这里的onSubscribe为什么是旧的Observable的变量?为什么不是当前类?
onSubscribe.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
st.onError(e);
}
} catch (Throwable e) {
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
});
}
0 条评论