聚合运算符
concat
concat操作符将多个Observable结合成一个Observable并发射数据,并且严格按照先后顺序发射数据,前一个Observable的数据没有发射完,是不能发射后面Observable的数据的。
Observable < Integer > observable1 = Observable . just ( 1 , 2 , 3 );
Observable < Integer > observable2 = Observable . just ( 4 , 5 , 6 );
Observable < Integer > observable3 = Observable . just ( 7 , 8 , 9 );
Observable . concat ( observable1 , observable2 , observable3 ). subscribe ( new Subscriber < Integer > () {
@Override
public void onCompleted () {}
@Override
public void onError ( Throwable e ) {}
@Override
public void onNext ( Integer integer ) {
Log . d ( TAG , "integer=" + integer ); // 1,2,3,4,5,6,7,8,9
}
});
当一个Observable发生错误的时候,发射会终止。
Observable < Integer > observable1 = Observable . just ( 1 , 2 , 3 );
Observable < Integer > observable2 = Observable . create ( new Observable . OnSubscribe < Integer > () {
@Override
public void call ( Subscriber <? super Integer > subscriber ) {
subscriber . onError ( new Throwable ( "error" ));
}
});
Observable < Integer > observable3 = Observable . just ( 7 , 8 , 9 );
Observable . concat ( observable1 , observable2 , observable3 ). subscribe ( new Subscriber < Integer > () {
@Override
public void onCompleted () {}
@Override
public void onError ( Throwable e ) {
Log . e ( TAG , e . getMessage ());
}
@Override
public void onNext ( Integer integer ) {
Log . d ( TAG , "integer=" + integer ); // 1,2,3,error
}
});
count
count操作符用来统计源Observable发射了多少个数据,最后将数目给发射出来;如果源Observable发射错误,则会将错误直接报出来;在源Observable没有终止前,count是不会发射统计数据的。
Observable . just ( 1 , 2 , 3 ). count (). subscribe ( new Subscriber < Integer > () {
@Override
public void onCompleted () {}
@Override
public void onError ( Throwable e ) {}
@Override
public void onNext ( Integer integer ) {
Log . d ( TAG , "integer=" + integer ); // integer=3
}
});
toList
toList操作符可以将Observable发射的多个数据组合成一个List。
//过滤掉年级小于20的person
getPersons ()
. flatMap ( new Func1 < List < Person > , Observable < Person >> () {
@Override
public Observable < Person > call ( List < Person > persons ) {
return Observable . from ( persons );
}
})
. filter ( new Func1 < Person , Boolean > () {
@Override
public Boolean call ( Person person ) {
return person . age > 20 ;
}
})
. toList ()
. subscribe ( new Action1 < List < Person >> () {
@Override
public void call ( List < Person > persons ) {
//
}
});
toSortedList
toSortedList类似于toList,不同的是,它会对产生的列表排序,默认是自然升序,如果发射的数据项没有实现Comparable接口,会抛出一个异常。然而,你也可以传递一个函数作为用于比较两个数据项,这是toSortedList不会使用Comparable接口。
public class Person implements Comparable < Person > {
public String name ;
public Integer age ;
public Person ( String name , Integer age ) {
this . name = name ;
this . age = age ;
}
@Override
public int compareTo ( @NonNull Person person ) {
return age . compareTo ( person . age );
}
}
Observable . from ( new Person [] { new Person ( "a" , 22 ), new Person ( "b" , 18 ), new Person ( "c" , 21 )})
. toSortedList ()
. subscribe ( new Action1 < List < Person >> () {
@Override
public void call ( List < Person > persons ) {
for ( Person person : persons ) {
System . out . println ( person . name );
//b
//c
//a
}
}
});
public class Person {
public String name ;
public Integer age ;
public Person ( String name , Integer age ) {
this . name = name ;
this . age = age ;
}
}
Observable . from ( new Person [] { new Person ( "a" , 22 ), new Person ( "b" , 18 ), new Person ( "c" , 21 )})
. toSortedList ( new Func2 < Person , Person , Integer > () {
@Override
public Integer call ( Person person , Person person2 ) {
return person . age . compareTo ( person2 . age );
}
})
. subscribe ( new Action1 < List < Person >> () {
@Override
public void call ( List < Person > persons ) {
for ( Person person : persons ) {
System . out . println ( person . name );
//b
//c
//a
}
}
});
reduce
Reduce操作符接收Observable发射的数据并利用提供的函数的计算结果作为下次计算的参数,输出最后的结果。首次没有计算结果传入前两个参数。
Observable . from ( new Integer [] { 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 }). reduce ( new Func2 < Integer , Integer , Integer > () {
@Override
public Integer call ( Integer x , Integer y ) {
return x + y ; // 1+2+3+4+5+6+7+8+9+10
}
}). subscribe ( new Action1 < Integer > () {
@Override
public void call ( Integer integer ) {
Log . d ( TAG , "result=" + integer ); // result = 55
}
});
October 29, 2018
July 12, 2017