前言
上篇文章讲解了 RxJava2.0的最基本使用,在本节中主要看下RxJava的线程控制。
1. 概述
2. 下边演示上游和下游在同一个线程
正常情况,上游和下游在同一个线程,也就是说上游在那个线程发送事件,下游就在哪个线程接收事件,下边通过示例代码演示:
/**
* 在主线程中创建一个上游 Observable 发送事件,则上游就在主线程中发送事件
* 在主线程中创建一个下游 Observer 接收事件,则下游就在主线程中接收事件
*/
public static void demo1(){
// 创建一个上游:Observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e("TAG" , "Observable thread is : " + Thread.currentThread().getName()) ;
Log.e("TAG" , "emit 1") ;
emitter.onNext(1);
}
}) ;
// 创建一个下游:Observer
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("TAG" , "Observer thread is : " + Thread.currentThread().getName()) ;
Log.e("TAG" , "next : " + integer) ;
}
} ;
// 建立连接
observable.subscribe(consumer);
}
运行结果如下:
cn.novate.rxjava2 E/TAG: Observable thread is : main
cn.novate.rxjava2 E/TAG: emit 1
cn.novate.rxjava2 E/TAG: Observer thread is : main
cn.novate.rxjava2 E/TAG: next : 1
以上验证了,上游、下游在同一个线程工作;
但是这样肯定是不能满足我们的需求,我们更多的是需要在子线程中做耗时操作,然后切回到主线程中进行UI更新,如下图所示
上图黄色表示子线程,深蓝色表示主线程
要达到在子线程中做耗时操作,然后切回主线程进行UI更新,只需要让上游在子线程中发送事件,然后把下游切回到主线程中接收事件就ok,示例代码如下:
/**
* 让上游在子线程中发送事件,然后把下游切回到主线程接收事件
*/
public static void demo2(){
// 创建一个上游:Observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e("TAG" , "Observable thread is " + Thread.currentThread().getName()) ;
Log.e("TAG" , "emit 1") ;
emitter.onNext(1);
}
}) ;
// 创建一个下游:Observer
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("TAG" , "Observer thread is " + Thread.currentThread().getName()) ;
Log.e("TAG" , "next :" + integer) ;
}
} ;
// 建立连接
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
}
运行结果如下
cn.novate.rxjava2 E/TAG: Observable thread is RxNewThreadScheduler-1
cn.novate.rxjava2 E/TAG: emit 1
cn.novate.rxjava2 E/TAG: Observer thread is main
cn.novate.rxjava2 E/TAG: next :1
subscribeOn()指的是上游发送事件的线程,observeOn()指的是下游接收事件的线程;
注意:
1>:多次调用subscribeOn()方法,只有第一次有效,其余的被忽略;
2>:多次调用observeOn()方法,都是可以的,也就是说每调用一次observeOn(),下游线程就会切换一次
observable.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.subscribe(consumer);
在RxJava中,已经内置很多线程供我们选择,比如:
1>:Schedulers.newThread():常规的线程;
2>:Schedulers.io():io操作的线程,用于联网请求、读写文件等操作;
3>:Schedulers.computation():CPU计算密集型操作,用于大量计算操作;
4>:AndroidSchedulers.mainThread():主线程;
在RxJava内部使用线程池维护这些线程,效率比较高;
3. 实践
在我们开发过程中,一般都是把耗时操作在子线程中做,比如读写文件、读写数据库、联网请求等操作,下边做一个登录示例来演示,如何把线程切换到子线程中进行耗时操作,然后又是如何把线程切换到主线程中进行更新UI:
/**
* 具体示例:
* 通过演示登录成功与失败的功能来演示:
* 如何把线程切换到子线程中让其执行耗时操作,然后再次切换到主线程中更新UI
*/
public static void demo3(final Context context){
Api api = RetrofitProvider.get().create(Api.class) ;
api.login(new LoginRequest())
.subscribeOn(Schedulers.io()) // 切换到io线程(子线程)中进行联网请求
.observeOn(AndroidSchedulers.mainThread()) // 在耗时操作进行完之后切换到主线程中处理请求结果,来更新UI
.subscribe(new Observer<LoginResponse>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(LoginResponse value) {
}
@Override
public void onError(Throwable e) {
Log.e("TAG" , "登录失败") ;
}
@Override
public void onComplete() {
Log.e("TAG" , "登录成功") ;
}
});
}
注意:上篇文章我们讲到了Disposable,说如果调用 Disposable.dispose()方法,会切断水管,让下游接收不到事件。既然接收不到事件,那么就不能更新UI,因此可以在这个Activity中保存 Disposable,在Activity退出时,调用Disposable.dispose()方法来切断即可;
如果有多个 Disposable,RxJava内置了一个 CompositeDisposable容器,每次得到一个 Disposable,就调用 CompositeDisposable.add()方法把它添加到容器中,在Activity退出时候,直接调用 CompositeDisposable.clear()方法,直接切断所有水管即可;
以上就是教程二的全部内容。