跳转至

RxJava

目录

RxJava最核心的东西就是ObservablesSubscribersObservables发出一系列事件,Subscribers处理这些事件。如果一个Observables没有任何的Subscribers,那么这个Observables是不会发出任何事件来。

1.创建Observable

RxJava提供了多种创建Observables的方法。create方法创建一实例需要传入一个OnSubscribe,当Subscriber订阅时进行调用。

创建Subscriber

Subscriber<String> mySubscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
};

创建Observable

        Observable<String> myObservable=Observable.create(
                new Observable.OnSubscribe<String>(){

                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        System.out.println(subscriber);
                //rx.observers.SafeSubscriber@3e4cefce
                subscriber.onNext("hello,world!");
                    }
                }
        );

调用Observablesubscriber方法与Subscrible进行关联

  // 通过Observable的subscribe方法完成subscriber对observable的订阅
      myObservable.subscribe(mySubscriber);

输出

hello,world!

Observablejust用来创建只发出一个事件就结束的Observable对象。

Observablefrom方法,接受一个集合作为输入,然后每次输出一个元素给Subscriber

     Observable.from(new Integer[]{1,2,3,4,5})
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer/2==0;
                    }
                }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println(integer);
            }
        });

2.操作符

操作符就是为了解决对Observable对象的变换问题,操作符用于在Observable和最终的Subscriber之间修改Observable发出的事件。RxJava提供了很多很有用的操作符。

map操作符,就是用来把一个事件转换为另一个事件的。

    Observable.just("hello,")
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {
                        return s + "Dan";
                    }
                }).subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println(s);
                    }
                });

也可以使用map操作符返回一个发出新的数据类型的Observable对象。

flatMap可以返回一个Observable对象。

 Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                System.out.println("create--->"+subscriber);
                //rx.internal.operators.OperatorMap$1@c10d39
                subscriber.onNext(10);
            }
        }).flatMap(new Func1<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(final Integer integer) {

                return Observable.create(new Observable.OnSubscribe<Integer>() {
                    @Override
                    public void call(Subscriber<? super Integer> subscriber) {
                        System.out.println("flatMap--->"+subscriber);
                        // rx.internal.operators.OperatorMerge$InnerSubscriber@2b3bc58a
                        subscriber.onNext(integer * integer);
                    }
                });
            }
        }).subscribe(mySubscriber2);
Subscriber<Integer> mySubscriber2 = new Subscriber<Integer>() {
  @Override
  public void onCompleted() {

  }

  @Override
  public void onError(Throwable e) {

  }

  @Override
  public void onNext(Integer integer) {
    System.out.println(integer);
  }
};

3.错误处理

4.调度器

RxJava可以使用ObservablesubscribeOn方法指定观察者代码运行的线程。

5.订阅

当调用Observable.subscribe()会返回一个Subscription对象。这个对象代表了观察者和订阅者之间的联系。

1
2
3
4
//是否解除订阅
subscription.isUnsubscribed()
//解除订阅
subscription.unsubscribe();

参考