Maxiee 的 RxJava 学习指南 (2)

在上一篇中主要对官方文档进行了学习.

如果只看文档里的知识而不去实践, 自己只是知道而非理解, 必须要在实际项目中进行实践, 才能得到真知.

我采取的办法就是找一个实际的开源项目来学习, 最终我选择了 RxJava-Android-Samples 这个项目.

RxJava-Android-Samples

选择 RxJava-Android-Samples 这个项目的原因通过它的介绍可以看出:

This is a repository with real-world useful examples of using RxJava with Android.

这不正是我们需要的吗! 感谢 kaushikgopal 的无私奉献.

下面我们通过对这个项目的源码阅读, 来学习 RxJava 的实践知识.

整个代码结构十分清晰, 在 MainFragment 上点击对应的按钮进入对应 Demo 的 Fragment.

后续的文章中会针对代码的要点进行讲解, 因此建议将这个项目 Clone 下来结合文章一起看, 否则可能会觉得有些跳跃, 但是结合代码看就很容易明白了.

后台任务与并发

本文中, 我们学习 RxJava-Android-Samples 的第一个 Demo -- 后台任务与并发, 它位于 ConcurrencyWithSchedulersDemoFragment.

这个页面的功能是点击一个按钮, 然后会在后台做一个延时操作 (以定时器模拟), 待定时操作完成后, 在界面中打一段 Log.

这部分代码大体流程如下图所示:

88CC6C58-10BD-4E62-98F7-1D49C4E0C0EC

下面来具体分析.

创建 Observable

创建 Observable 的代码位于 _getObservable() 方法, 它的代码为:

private Observable<Boolean> _getObservable() {
    return Observable.just(true)
        .map(
            aBoolean -> {
                _log("Within Observable");
                _doSomeLongOperation_thatBlocksCurrentThread();
                return aBoolean;
            });
}

其中:

  • 源 Observable 发出一个 true 数据
  • 数据传入 map 运算符
  • map 运算符的 _doSomeLongOperation_thatBlocksCurrentThread() 方法里执行了一个阻塞线程的耗时操作
  • 当执行完毕后, 返回 aBoolean 也就是传递数据

_doSomeLongOperation_thatBlocksCurrentThread() 的代码为:

private void _doSomeLongOperation_thatBlocksCurrentThread() {
    _log("performing long operation");

    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        Timber.d("Operation was interrupted");
    }
}

功能就是一个阻塞线程 3s.

从中我们可以看出, 这个 map 操作符里面, 就是执行耗时操作的地方.

创建 Observer

创建 Observer 的代码位于 _getDisposableObserver() 方法中, 它的代码为:

private DisposableObserver<Boolean> _getDisposableObserver() {
    return new DisposableObserver<Boolean>() {

        @Override
        public void onComplete() {
            _log("On complete");
            _progress.setVisibility(View.INVISIBLE);
        }

        @Override
        public void onError(Throwable e) {
            Timber.e(e, "Error in RxJava Demo concurrency");
            _log(String.format("Boo! Error %s", e.getMessage()));
            _progress.setVisibility(View.INVISIBLE);
        }

        @Override
        public void onNext(Boolean bool) {
            _log(String.format("onNext with return value \"%b\"", bool));
        }
    };
}

其中, onComplete, onErroronNext 都比较易懂, 这里有一个问题就是 DisposableObserver 是什么? Disposable 是什么意思?

DisposableObserver

DisposableObserver 的在线注释文档中做出了说明:

DisposableObserver 是一个抽象的 Observer, 它通过实现了 Disposable 接口允许异步取消.

现在知道 DisposableObserver 是对被观察者做了什么扩展了, 它支持异步取消.

所有 DisposableObserver 预先实现的 final 方法都是线程安全的.

取消 DisposableObserver 需要调用它的 dispose() 方法, 这个方法既可以在 DisposableObserver 之外调用, 也可以在它的 onNext 里面调用, 实现自己取消自己.

我的第一反应就是 DisposableObserver 很适合配合 Fragment 的生命周期释放资源, 这里的确也是这样用的, 这个放在后面部分再说.

建立订阅关系

有了被观察者, 也有了观察者, 下面要做的就是建立订阅关系, 具体的代码为:

_getObservable()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(d);

这里有一个线程切换的操作. 这里我把上一篇中梳理的规则再摆过来, 作为参考:

  • subscribeOn 出现的顺序位置没有关系, 只要指定了 subscribeOn, 那么被观察者就在它指定的线程上运行
  • 在整个链条中 subscribeOn 只调用一次
  • observeOn 可以多次调用, 每调用一次, 链条下游的就切换一次线程

参照着规则看代码, 就理解了.

除此之外, 我的第一印象是, 订阅成功后, 会返回一个订阅对象, 但看代码并没有返回. 这是为什么呢? 通过查阅文档, 原来这是 RxJava 2 的一点变化,

订阅 (Subscription) 的变化

在 RxJava 1 中, rx.Subscription 负责流和资源的生命周期管理.

在 RxJava 2 中 Subscription 这个名称改作他用了, Reactive-Streams 规范使用这个名字专门用于表示源于消费者之间的交互点 (interaction point), 类名为 org.reactivestreams.Subscription, 允许请求一个正的数量值并允许取消序列.

既然 Subscription 这个名称被占用了, 在 RxJava 2 中原本的订阅概念改了一个新名字 io.reactivex.Disposable.

看到这里, 就更加理解 DisposableObserver 的意思了, 原来 DisposableObserver 中的那个异步取消, 就是新的取消订阅关系.

变化中还有一点, 是原来的 CompositeSubscription 改名为 CompositeDisposable, 这个东东在下面会用到.

生命周期管理

上一节建立订阅关系后, 点击按钮就能够正常执行异步操作了. 这一节主要讨论生命周期.

上一节建立订阅关系后, 在后面紧跟着的还有一行代码:

_disposables.add(d);

_disposables 这个成员的定义:

private CompositeDisposable _disposables = new CompositeDisposable();

这就是上一节最后提到的.

这样, 当建立一个订阅关系后, 会把 DisposableObserver 添加到 CompositeDisposable 中, 等到 Fragment 生命周期结束时, 一并清掉:

@Override
public void onDestroy() {
    super.onDestroy();
    _disposables.clear();
}

这个 clear 里面的内部实现就是对所有添加进去的 Disposable 挨个执行 dispose 进行取消操作.

从而实现 Fragment 对多个订阅关系进行统一生命周期管理, 在生命周期结束时统一取消的逻辑.

总结

在这一篇中, 完成了对第一个 Demo 的学习.

通过学习, 对很多概念有了更加全面的认识.

这一篇中主要包含了以下重点内容:

  • 建立基本的观察者-被观察者订阅关系

  • 用 RxJava 来执行异步操作, 可以替代 AsyncTask

  • Fragment 结合 CompositeDisposable 进行多个订阅关系的生命周期统一管理

在下一篇中, 我将继续对剩下 Demo 进行代码阅读, 希望有了前两篇的基础, 学习的脚步能够更快一些.

微信公众号

我开通了微信公众号: MaxieeTalk, 欢迎订阅, 及时收到我的最新技术文章~

打赏

如果这篇文章对您有所帮助, 欢迎扫描下面支付宝二维码请我喝杯可乐~

您的打赏将会极大提高我的写作热情, 激励我更新更多更好的博客.

103ED89B-4DCA-450E-9E42-FE5A1129A03D