compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
返回一个可观察对象,该对象发出给定(常量引用)项的信号,然后完成
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("just:" + integer);
}
});
just:1
just:2
just:4
just:5
just:6
just:7
just:8
just:9
just:10
fromArray 和 Just 几乎是一样的效果,只不过 Just 限制 10 个以内,而 fromArray 并没有限制,查看得知 单个参数 Just 是自行创建 ObservableJust,而多个参数 Just 最终还是回调了 fromArray,这里不再过多演示
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
//执行多次
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
//标记事件结束
emitter.onComplete();
//标记事件发送错误
//emitter.onError(new NullPointerException("不能为空"));
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("create:" + integer);
}
});
create:1
create:2
create:3
使用范围数据,指定输出数据的范围(1-40 的数值)
Observable.range(3, 5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("range:" + integer);
}
});
range:3
range:4
range:5
range:6
range:7
指定某一时刻进行数据发送
Observable.interval(10, 1, TimeUnit.SECONDS) // 先等待 10 秒,之后再每一秒发送一次,10 秒这个参数也可以不填,默认用间隔时间参数替代(这里示例 1 秒)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long l) throws Exception {
System.out.println("interval:" + l);
}
});
2019-03-20 15:30:33.331 interval:0
2019-03-20 15:30:34.331 interval:1
2019-03-20 15:30:35.331 interval:2
2019-03-20 15:30:36.331 interval:3
2019-03-20 15:30:37.331 interval:4
2019-03-20 15:30:38.331 interval:5
..................................
表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)
public final Disposable subscribe() {}
表示观察者只对被观察者发送的 Next 事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext) {}
表示观察者只对被观察者发送的 Next 事件 & Error 事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
表示观察者只对被观察者发送的 Next 事件、Error 事件 & Complete 事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
表示观察者只对被观察者发送的 Next 事件、Error 事件 、Complete 事件 & onSubscribe 事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
表示观察者对被观察者所有的事件做出响应
public final void subscribe(Observer<? super T> observer) {}
onSubscribe:订阅的时候被调用,方法参数有 Disposable,可用于取消订阅
onNext(T item):Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于业务逻辑
onCompleted():正常终止,在没有遇到错误的情况下,Observable在最后一次调用onNext之后调用此方法
onError(Throwable e):当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出异常
观察者
Observable.just(1, 2, 3)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("onError:" + e.toString());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
onSubscribe
onNext:1
onNext:2
onNext:3
onComplete
Observable.just(1, 2, 3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("accept:" + integer);
}
});
accept:1
accept:2
accept:3
订阅事件,被观察者必须指定了事件的接收者(观察者),整个事件流程才可以被启动
Disposable disposable = observable.subscribe(observer);
disposable.isDisposed();
disposable.dispose();
1build.gradle引入依赖
2
3compile 'io.reactivex:rxjava:1.0.14'
4compile 'io.reactivex:rxandroid:1.0.1'
1Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
2
3 //事件源,可以指定
4 @Override
5 public void call(Subscriber<? super String> subscriber) {
6 //执行多次
7 subscriber.onNext("第一次执行");
8 subscriber.onNext("第二次执行");
9 //标记事件结束
10 subscriber.onCompleted();
11 //标记事件发送错误
12 //subscriber.onError();
13 }
14});
1//from(T[]),返回的对象一般都是数值类型
2Integer[] items = {1, 2, 3, 4, 5, 6, 7, 8, 9};
3Observable observable = Observable.from(items);
1//指定某一时刻进行数据发送
2Observable observable = Observable.interval(1, 1, TimeUnit.SECONDS);//每隔一秒发送数据
1//just(T...),处理任意类型的数组集合或数值,参数上限10个,参数类型必须一致
2Integer[] items1 = {1, 2, 3, 4, 5, 6};
3Integer[] items2 = {3, 5, 6, 8, 3, 8};
4Observable observable = Observable.just(items1, items2);
5//使用范围数据,指定输出数据的范围(1-40的数值)
6Observable observable = Observable.range(1, 40);
7####创建事件的接收者(观察者|订阅者),onNext方法中的数据类型必须被观察者指定的泛型
1onNext(T item):Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于业务逻辑
2
3onCompleted():正常终止,在没有遇到错误的情况下,Observable在最后一次调用onNext之后调用此方法
4
5onError(Throwable e):当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出异常
6
7Observer<String> observer = new Observer<String>() {
8
9 @Override
10 public void onNext(String s) {
11 System.out.println("onNext" + s);
12 }
13
14 @Override
15 public void onCompleted() {
16 System.out.println("onCompleted");
17 }
18
19 @Override
20 public void onError(Throwable e) {
21 System.out.println("onError" + e.getMessage());
22 }
23};
1//订阅者
2Subscriber<String> subscriber = new Subscriber<String>() {
3
4 @Override
5 public void onStart(String s) {
6 System.out.println("事件开始了");
7 }
8
9 @Override
10 public void onNext(String s) {
11 System.out.println("onNext" + s);
12 }
13
14 @Override
15 public void onCompleted() {
16 System.out.println("onCompleted");
17 }
18
19 @Override
20 public void onError(Throwable e) {
21 System.out.println("onError" + e.getMessage());
22 }
23};
1//对订阅者进行简化,更简单
2Action1<String> action1 = new Action1<String>() {
3
4 @Override
5 public void call(String s) {
6 System.out.println("call" + s);
7 }
8};
1# 订阅事件,被观察者必须指定了事件的接收者(观察者|订阅者),整个事件流程才可以被启动
1observable.subscribe(observer);
2observable.subscribe(subscriber);
1//选择性参数方法,可对onNext,onCompleted,onError选择性使用,一般只需要onNext方法就足够
2observable.subscribe(action1);
3
4//自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
5observable.subscribe(onNextAction);
6//自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
7observable.subscribe(onNextAction, onErrorAction);
8//自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
9observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
10
11简单解释一下这段代码中出现的 Action1 和 Action0。
12Action0 是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的;
13由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。
14这样其实也可以看做将 onCompleted() 方法作为参数传进了 subscribe(),相当于其他某些语言中的『闭包』。
15Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;
16与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。
17事实上,虽然 Action0 和 Action1 在 API 中使用最广泛,但 RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2, Action3) 的,它们可以被用以包装不同的无返回值的方法
订阅者是观察者的子类,区别在于订阅者可以取消订阅(在程序销毁后)
1if(subscriber != null && !subscriber.isUnsubscribed()) {
2subscriber.unsubscribe();
3}