RxJava
目录
RxJava
最核心的东西就是Observables
和Subscribers
。Observables
发出一系列事件,Subscribers
处理这些事件。如果一个Observables
没有任何的Subscribers
,那么这个Observables
是不会发出任何事件来。
1.创建Observable
RxJava
提供了多种创建Observables
的方法。create
方法创建一实例需要传入一个OnSubscribe
,当Subscriber
订阅时进行调用。
创建Subscriber
| Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
};
|
创建Observable
| Observable<String> myObservable=Observable.create(
new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
System.out.println(subscriber);
//rx.observers.SafeSubscriber@3e4cefce
subscriber.onNext("hello,world!");
}
}
);
|
调用Observable
的subscriber
方法与Subscrible
进行关联
| // 通过Observable的subscribe方法完成subscriber对observable的订阅
myObservable.subscribe(mySubscriber);
|
输出
Observable
的just
用来创建只发出一个事件就结束的Observable
对象。
Observable
的from
方法,接受一个集合作为输入,然后每次输出一个元素给Subscriber
。
| Observable.from(new Integer[]{1,2,3,4,5})
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer/2==0;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});
|
2.操作符
操作符就是为了解决对Observable
对象的变换问题,操作符用于在Observable
和最终的Subscriber
之间修改Observable
发出的事件。RxJava
提供了很多很有用的操作符。
map
操作符,就是用来把一个事件转换为另一个事件的。
| Observable.just("hello,")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s + "Dan";
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
|
也可以使用map
操作符返回一个发出新的数据类型的Observable
对象。
flatMap
可以返回一个Observable
对象。
| Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
System.out.println("create--->"+subscriber);
//rx.internal.operators.OperatorMap$1@c10d39
subscriber.onNext(10);
}
}).flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(final Integer integer) {
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
System.out.println("flatMap--->"+subscriber);
// rx.internal.operators.OperatorMerge$InnerSubscriber@2b3bc58a
subscriber.onNext(integer * integer);
}
});
}
}).subscribe(mySubscriber2);
|
| Subscriber<Integer> mySubscriber2 = new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
};
|
3.错误处理
4.调度器
RxJava
可以使用Observable
的subscribeOn
方法指定观察者代码运行的线程。
5.订阅
当调用Observable.subscribe()
会返回一个Subscription
对象。这个对象代表了观察者和订阅者之间的联系。
| //是否解除订阅
subscription.isUnsubscribed()
//解除订阅
subscription.unsubscribe();
|
参考