RxJava
什么是 RxJava
- Observable:被观察者
- Observer:观察者
- subscribe:订阅者
- Observable 就是事件的发出者、可以理解成管道的
上游
、事件源- Observer 就是事件的接受者、可以理解成管道的
下游
、接受者- subscribe 就是 Observable 与 Observer 之间的连接者
这三者之间的对应关系就是 RxJava
RxJava 也是观察者模式的扩展、普通的观察者模式只负责事件的分发与接受、而 RxJava 在观察者模式上增强了 onComplete、onError。
Rxjava 中的 Flowable 和 Backpressure
- Flowable:事件源
上游
- Subscriber:接收者
下游
- subscribe:就是 Flowable 与 Subscriber 之间的连接者
Subscription:Backpressure 中控制数据处理能力的开关和 Disposable(开关) 类似、s.request(Long.MAX_VALUE)。
Flowable 和 Backpressure 组合使用就是对数据发送的各种策略
如果 Flowable
上游
产生大量数据、导致 Subscriber下游
接收数据跟不上时内存就会 OOM 、不采用Flowable
的解决思路就是:
- 一是从
数量
上进行治理、减少发送进水缸里的事件- filter // 数据过滤
- 二是从
速度
上进行治理、减缓事件发送进水缸的速度- 延时发送数据
- sample(2, TimeUnit.SECONDS) //sample 取样
创建操作符
|
|
ObservableEmitter: Emitter 是发射器的意思、这个就是用来发出事件的、它可以发出三种类型的事件、通过调用 emitter 的 onNext(T value)、onComplete() 和 onError(Throwable error) 就可以分别发出 next 事件、complete 事件和 error 事件。
Disposable: 这个单词的字面意思是一次性用品,用完即可丢弃的. 那么在RxJava 中怎么去理解它呢、 对应于上面的水管的例子、 我们可以把它理解成两根管道之间的一个机关、 当调用它的 dispose() 方法时、 它就会将两根管道切断、 从而导致下游收不到事件.
- create:创建一个常规的 Observable
- just:将一个/一组对象转换为一个可发射的 Observable
- defer:在订阅之前不创建 Observable
- from:将其他对象转换为 Observable
- start:创建一个可以发出函数返回值的 Observable
- timer:创建一个延迟发出的 Observable
- range:创建一个发射一系列连续整数的 Observable
- repeat:创建一个可重复发射特定序列的 Observable
- interval:创建一个特定时间隔的一系列整数 Observable
- empty/never/throw:创建非常精确和有限行为的 Observable
转换操作符
- Map:一对一 对象转换
- FlatMap:一对多 对象转换
- GroupBy:分组转换
- Buffer:定期将 Observable 中的数据打包、然后发出整个数据
- Window:分段发送 Observable 中的数据包(类似 Buffer )
- Scan:按顺序对 Observable 中的数据统计发送(斐波那契数列)
过滤操作符
- Debounce:过滤发射速率过快的数据项
- Distinct:控制 Observable 发出重复的数据
- ElementAt:发射指定位置的数据
- Filter:仅发射通过条件的数据
- First:发射第一个数据
- IgnoreElements:不发射任何数据、但会走 onComplete 和 onError
- Last:发射最后一个数据
- Sample:采集时间间隔中最近的数据
- Skip:发射跳过指定 N 项后的数据
- SkipLast:发射去掉最后指定 N 项的数据
- Take:只发射指定的前 N 项数据
- TakeLast:只发射指定的最后 N 项数据
组合操作符
- And/Then/When:将两个或多个 Observable 发射的数据集合并到一起
- CombineLatest:结合它们最近发射的数据、然后发射这个函数的返回值
- Join:有一个 Observable 发射了数据、另一个 Observable 就结合发射的数据
- Merge:合并组合
- StarWith:发射数据之前发送指定的序列
- Switch:转换为单个 Observable 、发射最近的数据
- Zip:通过指定函数将多个 Observable 组合在一起、结果未每个组合发射单个项目
异常操作符
- Catch
- onError 通过继续顺序无措的恢复通知
- Retry:如果 Observable 发送了一个 onError 的通知、重新订阅它
还有更多的操作符。。。
辅助操作符
条件和布尔操作符
算术和聚合操作符
异步操作符
连接操作符
转换操作符
阻塞操作符
字符串操作符
线程控制 调度器 Scheduler
调度器 Scheduler: 可以实现多线程功能、指定操作符(或者特定的 Observable)在特定的调度器上执行
调度器的种类
调度器类型 | 效果 |
---|---|
Schedulers.computation() | 用于普通计算任务、如果事件循环和回调处理不要用 IO 操作 |
Schedulers.from(executor) | 使用指定的 Executor 作为调度器 |
Schedulers.immediate() | 在当前线程立即开始执行任务 |
Schedulers.newThread() | 为每个任务创建一个新的线程 |
Schedulers.trampoline() | 当其它排队的任务完成后、在当前线程排队开始执行 |
Schedulers.io() | 用于 IO 耗时任务、如异步阻塞 IO 操作、它默认是一个 CachedThreadScheduler 很像一个缓存线程池 |
SubscribeOn:subscribeOn(Schedulers.newThread()) 指定的是上游发送事件的线程、多次指定上游线程只有第一次指定的有效、其余忽略
ObserveOn:observeOn(AndroidSchedulers.mainThread()) 指定的是下游接收事件的线程、多次指定下游线程是可以的、也就是说没调用一次 observeOn() 、下游线程就会切换一次