Observable.create()

Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) {
        try {
            e.onNext("Alpha");
            e.onNext("Beta");
            e.onNext("Gamma");
            e.onNext("Delta");
            e.onNext("Epsilon");
            e.onComplete();
        } catch (Throwable t) {
            e.onError(t);
        }
    }
});
source.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) {
        System.out.println(s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) {
        throwable.printStackTrace();
    }
});

output:

Alpha
Beta
Gamma
Delta
Epsilon

Observable.just()

Observable<String> mString = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
mString.map(new Function<String, Character>() {
    @Override
    public Character apply(String s) throws Exception {
        return s.charAt(0);
    }
}).subscribe(System.out::println);

output:

A
B
G
D
E

supplemental:

System.out::println is a Java 8 method reference. A method reference can replace a lambda expression whose body does nothing but call an existing method.
https://docs.oracle.com/javase/tutorial/java/javaOO/methodreferences.html
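
To make the note above concrete, here is a minimal sketch that uses only the JDK (no RxJava). The class name MethodRefDemo and the helper lengths() are hypothetical names introduced for illustration, and it uses java.util.function.Function rather than the io.reactivex one. It shows a lambda and the equivalent method reference side by side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class MethodRefDemo {
    // Hypothetical helper: applies the mapper to each word and collects the results.
    static List<Integer> lengths(List<String> words, Function<String, Integer> mapper) {
        List<Integer> out = new ArrayList<>();
        for (String w : words) {
            out.add(mapper.apply(w));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Alpha", "Beta", "Gamma");

        // A lambda whose body only calls an existing method...
        Consumer<String> viaLambda = s -> System.out.println(s);
        // ...can be written as a method reference instead.
        Consumer<String> viaMethodRef = System.out::println;

        words.forEach(viaLambda);
        words.forEach(viaMethodRef);

        // The same applies to instance methods such as String.length().
        System.out.println(lengths(words, s -> s.length())); // [5, 4, 5]
        System.out.println(lengths(words, String::length));  // [5, 4, 5]
    }
}
```

Both forms compile to the same functional interface, so the choice is mostly about readability.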

ConnectableObservable

Works like a broadcast: once connect() is called, each emission is delivered to every subscribed observer.

ConnectableObservable<String> connSource = Observable.just("Alpha-", "Beta-", "Gamma-", "Delta-", "Epsilon-")
                .publish();
connSource.subscribe(System.out::println); // print each string
connSource.map(String::length).subscribe(System.out::println); // print the length of each string
connSource.connect();

output:

Alpha-
6
Beta-
5
Gamma-
6
Delta-
6
Epsilon-
8

Observable.range()

Observable.range(1, 10).subscribe(System.out::println);

output:

1
2
3
4
5
6
7
8
9
10

Observable.interval()

Observable.interval(1, TimeUnit.SECONDS)
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println(aLong + "sec(s)");
            }
        });
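// interval() emits on a background (computation) thread,
// so keep the main thread alive long enough to see the output
sleep(5000);

// helper used above
public static void sleep(int millis) {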
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

output:

0sec(s)
1sec(s)
2sec(s)
3sec(s)
4sec(s)

Observable.fromFuture()
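
This section has no example. Assuming it refers to Observable.fromFuture() in RxJava 2.x, a minimal sketch could look like the following. The class name FromFutureDemo and the helper firstFromFuture() are hypothetical, and an already-completed CompletableFuture stands in for a real asynchronous task.

```java
import io.reactivex.Observable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class FromFutureDemo {
    // Hypothetical helper: wraps an already-completed Future and returns the value it emits.
    static String firstFromFuture(String value) {
        Future<String> future = CompletableFuture.completedFuture(value);
        return Observable.fromFuture(future).blockingFirst();
    }

    public static void main(String[] args) {
        // Stand-in for a Future produced by some asynchronous task.
        Future<String> future = CompletableFuture.completedFuture("Alpha");

        // The Observable emits the Future's result as a single item, then completes.
        Observable.fromFuture(future)
                .subscribe(System.out::println, Throwable::printStackTrace);
    }
}
```

output:

Alpha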


Observable.empty()

Observable<String> empty = Observable.empty();
empty.subscribe(
    System.out::println,
    Throwable::printStackTrace,
    () -> System.out.println("Done!"));

output:

Done!

Observable.never()

Never emits any items. The difference from empty() is that onComplete() is never called either.
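
As a rough illustration (assuming RxJava 2.x; NeverDemo and observeNever() are hypothetical names), the same three-callback subscription used for empty() prints nothing at all here, because none of the callbacks is ever invoked.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;

public class NeverDemo {
    // Hypothetical helper: subscribes a TestObserver so the absence of events can be inspected.
    static TestObserver<String> observeNever() {
        Observable<String> never = Observable.never();
        return never.test();
    }

    public static void main(String[] args) throws InterruptedException {
        Observable<String> never = Observable.never();
        never.subscribe(
                System.out::println,
                Throwable::printStackTrace,
                () -> System.out.println("Done!"));

        // Wait a moment; nothing is printed because no callback ever fires.
        Thread.sleep(1000);
    }
}
```

In practice never() is mostly useful in tests, where the lack of any event is the thing being checked.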


Observable.error()

Immediately signals the given exception by calling onError().

Observable.error(new Exception("Test Error"))
        .subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                System.out.println(o.toString());
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                System.out.println(throwable.getMessage());
            }
        });

output:

Test Error

Observable.defer()

Each subscription creates a new Observable, so changes to the state it is built from (here, count) are reflected in later subscriptions.

private static int start = 0;
private static int count = 3;

public static void main(String[] args) {
    Observable<Integer> deferData = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
        @Override
        public ObservableSource<? extends Integer> call() throws Exception {
            return Observable.range(start, count);
        }
    });

    deferData.subscribe(System.out::println);
    count = 5;
    deferData.subscribe(System.out::println);
}

output:

0
1
2
0
1
2
3
4

Observable.fromCallable()

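Use this when an error thrown while computing the value should be delivered to your own error handler (onError) instead of being thrown at the caller.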

Observable.fromCallable(new Callable<Object>() {
    @Override
    public Object call() {
        return 1 / 0;
    }
}).subscribe(new Consumer<Object>() {
    @Override
    public void accept(Object o) {
        System.out.println("Received: " + o);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) {
        System.out.println("Error Captured: " + throwable.getLocalizedMessage());
    }
});

output:

Error Captured: / by zero
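
For contrast, the sketch below (assuming RxJava 2.x; EagerVsLazyDemo, divide(), withJust() and withFromCallable() are hypothetical names) shows what happens when the same failing computation is passed to Observable.just(). The argument is evaluated before any Observable exists, so the exception escapes to the caller and the error handler is never reached.

```java
import io.reactivex.Observable;

public class EagerVsLazyDemo {
    // Hypothetical helper: throws ArithmeticException when b is 0.
    static int divide(int a, int b) {
        return a / b;
    }

    // just(): divide() runs while the argument is evaluated, so the
    // exception is thrown to the caller before subscribe() is reached.
    static String withJust() {
        try {
            Observable.just(divide(1, 0))
                    .subscribe(i -> {}, e -> {});
            return "no exception";
        } catch (ArithmeticException e) {
            return "thrown to caller: " + e.getMessage();
        }
    }

    // fromCallable(): divide() runs on subscription, and the exception
    // is routed to the onError consumer.
    static String withFromCallable() {
        String[] result = {"no error"};
        Observable.fromCallable(() -> divide(1, 0))
                .subscribe(i -> {}, e -> result[0] = "onError: " + e.getMessage());
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(withJust());
        System.out.println(withFromCallable());
    }
}
```

output:

thrown to caller: / by zero
onError: / by zero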
