程序员

RxJava操作符系列六

RxJava

RxJava操作符系列传送门

RxJava操作符源码
RxJava操作符系列一
RxJava操作符系列二
RxJava操作符系列三
RxJava操作符系列四
RxJava操作符系列五

前言

在上几篇文章我们介绍了一些RxJava创建,转换,过滤,组合,辅助的一些操作符,今天我们继续去学习RxJava的一些条件和布尔操作符,在此感谢那些阅读文章并提出建议的同学,其实写技术文章主要是提升自己的知识面以及对知识的理解程度,也有人说有些地方描述还不够清晰,文笔有待提高,是的,确实是这样,描述不清晰我感觉不仅是文笔有待提高(提升空间很大),对技术的理解也需要提高(技术渣蜕变中…)。不过我相信,只要我能写出来,这就是一种进步,就是一个积累的过程,我会努力,加油。好了,不扯了,进入正题。

All

该操作符是判断Observable发射的所有数据是否都满足某一条件,它接收一个Fun1方法参数,此方法接收原始数据,并返回一个布尔类型值,如果原始Observable正常终止并且每一项数据都满足条件,就返回true;如果原始Observable的任何一项数据不满足条件就返回False。当某数据不满足条件(返回false)时之后的数据不再发射。如下示例代码

 Observable.just(1,2,3,4).all(new Func1() {
            @Override
            public Boolean call(Integer integer) {
                Log.e(TAG, "call: "+integer );
                return integer<2;
            }
        }).subscribe(new Subscriber() {
            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted: ");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " );
            }

            @Override
            public void onNext(Boolean aBoolean) {
                Log.e(TAG, "onNext: "+aBoolean );
            }
        });

输出日志信息

call: 1
call: 2
onNext: false
onCompleted:

Amb

当我们传递多个Observable(可接收2到9个Observable)给amb时,它只发射其中一个Observable的数据和通知:首先发送通知给amb的那个,不管发射的是一项数据还是一个onError或onCompleted通知。amb将忽略和丢弃其它所有Observables的发射物。

这里写图片描述
 Observable observable= Observable.just(1,2,3).delay(500,TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread());
       Observable observable1= Observable.just(4,5,6).delay(100,TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread());
        Observable.amb(observable,observable1).subscribe(new Subscriber() {
            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted: ");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "  onNext: "+integer);
            }
        });

输出日志信息

onNext: 4
onNext: 5
onNext: 6
onCompleted:

有一个类似的对象方法ambWith。如上示例中Observable.amb(observable,observable1)改为observable.ambWith(observable1)是等价的。

Contains

我们可以给Contains传一个指定的值,如果原始Observable发射了那个值,它返回的Observable将发射true,否则发射false,即Observable发射的数据是否包含某一对象。
示例代码

Observable.just(1,2,3,4).contains(2)
                .subscribe(new Subscriber() {
                    @Override
                    public void onCompleted() {
                        Log.e(TAG, "onCompleted: " );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: " );
                    }

                    @Override
                    public void onNext(Boolean aBoolean) {
                        Log.e(TAG, "onNext: "+aBoolean);
                    }
                });

输出日志信息

onNext: true
onCompleted:

isEmpty也是一种判断是否包含的操作符,不同的是它判断原始Observable是否没有发射任何数据。

exists操作符与contains操作符作用相同,但是它接收的是一个Func1函数,可以指定一个判断条件,当发射的数据有满足判断条件(返回true)就发射true,否则为false。

DefaultIfEmpty

该操作符简单的精确地发射原始Observable的值,如果原始Observable没有发射任何数据正常终止(以onCompletedd的形式),DefaultIfEmpty返回的Observable就发射一个我们提供的默认值。

Observable.empty().defaultIfEmpty(1).subscribe(new Subscriber() {

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted: " );
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: ");
            }

            @Override
            public void onNext(Object object) {
                Log.e(TAG, "onNext: "+object);
            }
        });

输出日志信息

onNext: 1
onCompleted:

defaultIfEmpty只有在没有发射数据时才会有效果,若发射的有数据,和不使用此操作符效果一样。

SequenceEqual

该操作符会比较两个Observable的发射物,如果两个序列是相同的(相同的数据,相同的顺序,相同的终止状态),它就发射true,否则发射false

  Observable observable=Observable.just(1,2,3);
        Observable observable1=Observable.just(1,3,2);
        Observable.sequenceEqual(observable,observable1)
                .subscribe(new Subscriber() {
                    @Override
                    public void onCompleted() {
                        Log.e(TAG, "onCompleted: " );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: " );
                    }

                    @Override
                    public void onNext(Boolean aBoolean) {
                        Log.e(TAG, "onNext: " +aBoolean);
                    }
                });

执行后输出

onNext: false
onCompleted:

SkipUntil

SkipUntil订阅原始的Observable,但是忽略它的发射物,直到第二个Observable发射了一项数据那一刻,它开始发射原始Observable。

这里写图片描述
Observable observable = Observable.just(1, 2, 3,4,5,6).delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread());
        Observable observable1 = Observable.just(20, 21, 22).delay(130, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread());
        observable.skipUntil(observable1)
               .subscribe(new Subscriber() {
                   @Override
                   public void onCompleted() {
                       Log.e(TAG, "onCompleted: " );
                   }

                   @Override
                   public void onError(Throwable e) {
                       Log.e(TAG, "onError: " );
                   }

                   @Override
                   public void onNext(Object o) {
                       Log.e(TAG, "onNext: "+o );
                   }
               });

输出日志信息

onNext: 4
onNext: 5
onNext: 6
onCompleted:

由于上面两个Observable都是在一个新线程中,时间不可控,每次运行的结果一般不会相同。但是都会符合上面所述规则,可参考上图。

SkipWhile

该操作符也是忽略它的发射物,直到我们指定的某个条件变为false的那一刻,它开始发射原始Observable。切记是判断条件返回false时开始发射数据。
示例代码

 Observable.range(1,5).skipWhile(new Func1() {
            @Override
            public Boolean call(Integer integer) {
                Log.e(TAG, "call: "+integer);
                return integer<3;
            }
        }).subscribe(new Subscriber() {
            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted: " );
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " );
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext: " +integer);
            }
        });

输出日志信息

call: 1
call: 2
call: 3
onNext: 3
onNext: 4
onNext: 5
onCompleted:

TakeUntil

如果我们理解了SkipUntil操作符了,那么这个操作符也就很好理解了,该操作符与SkipUntil有点相反的意思。

这里写图片描述

通过上图你应该也看出来和SkipUntil的区别,当第二个Observable发射了一项数据或者终止时,丢弃原始Observable发射的任何数据(SkipUntil则是丢弃之前数据,发射之后的数据)。

Observable observable = Observable.just(1, 2, 3,4,5,6).delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread());
        Observable observable1 = Observable.just(20, 21, 22).delay(120, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread());
        observable.takeUntil(observable1)
                .subscribe(new Subscriber() {
                    @Override
                    public void onCompleted() {
                        Log.e(TAG, "onCompleted: " );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: " );
                    }

                    @Override
                    public void onNext(Object o) {
                        Log.e(TAG, "onNext: "+o );
                    }
                });

输出日志信息(每次执行的输出结果一般不相同)

onNext: 1
onNext: 2
onNext: 3
onNext: 4
onCompleted:

TakeWhile

该操作符发射原始Observable,直到我们指定的某个条件不成立的那一刻,它停止发射原始Observable(skipWhile此时开始发射),并终止自己的Observable。

 Observable.range(1,5).takeWhile(new Func1() {
            @Override
            public Boolean call(Integer integer) {
                Log.e(TAG, "call: "+integer);
                return integer<3;
            }
        }).subscribe(new Subscriber() {
            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted: " );
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " );
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext: " +integer);
            }
        });

输出日志信息

call: 1
onNext: 1
call: 2
onNext: 2
call: 3
onCompleted:

今天的这篇文章到这里就结束了,若文中有错误的地方,欢迎指正。谢谢。

Proudly Powered by WordPress | WordPress Theme by Tidyhive