​ ​
reactivex1.png

牛とReactiveX

東京オフィスで働くファームノートのエンジニア狩谷です。ポケストップが自宅から射程内にあって喜んでいるところです。最寄り駅でもスマホを持っている人で賑わっておりました。

FarmnoteもiOS版、Android版を提供しております。Android版では RxJavaRxAndroid を使って非同期処理を記述しています。RxJava は ReactiveX を Java のライブラリとしたものです。

ReactiveX ではいろいろなことをデータの流れとみなして、データを提供する側である Observable と、データを消費する側である Subscriber とで分離してコードを記述することができます。

http://reactivex.io/

例えば、クリックイベントやWeb APIのレスポンス、BLEデバイスのスキャンなど、これらはデータを提供してくれるものと捉え、 Observable<T> というような型で表現します。

ReactiveX は様々なプログラミング言語でライブラリ化されています。この記事では牛のことを考えながら RxJava を使ってみます!

牛をReactiveX的に捉えると?

乳牛は、1日に2回くらい搾乳によって牛乳を出します。牛は牛乳を提供してくれる、つまり牛を ReactiveX で表現すると Observable<牛乳> となります。

データを消費する側である Subscriber はどうなるでしょうか。牛乳を飲むのは人間ですね。人間は Action1<牛乳> と表現できます。

牛が提供してくれる牛乳を人間が飲む様子は以下のようになります。

Observable<牛乳> 牛
Action1<牛乳> 人間

牛.subscribe(人間)

実際に動かすことができるコードは以下のようになります。

public class Main {

    static class 牛乳 {}

    Observable<牛乳> 牛 = Observable.create(subscriber -> {
        for (int i = 0; i < 10; i++) {
            if (subscriber.isUnsubscribed()) break;
            subscriber.onNext(new 牛乳());
        }
        subscriber.onCompleted();
    });

    Action1<牛乳> 人間 = (x) -> System.out.println("人間 「牛乳おいしい!」");

    private void exec() {
        牛.subscribe(人間);
    }

    public static void main(String[] args) {
        new Main().exec();
    }
}

Transforming Observable

上のコードだと牛が何もないところから牛乳を生み出しているようですが、牛はエサを食べます。エサには牧草などを使った粗飼料の他に、穀類も含んだ濃厚飼料などがあります。ここでは簡単に考えるために牛のエサを牧草とします。

牛は牧草を食べて、牧草に対していろいろな加工を施します。このようにデータを加工する処理を ReactiveX では Operators を使って行います。

http://reactivex.io/documentation/operators.html

ある型のデータを別の型のデータへ変換するには Map という Operator が使えます。 map メソッドにはデータの変換を行う関数を渡します。

牛を Observable<牛乳> ではなく、 牧草を牛乳へ変換する関数 Func1<牧草, 牛乳> と捉え直してみます。

牧草はどこから提供されるものでしょうか?そう、大地ですね。大地は Observable<牧草> となります。人間は Action1<牛乳> なので、型が合わず大地を直接 subscribe することはできませんが、 map で牛を間に挟めば subscribe できるようになります。

Observable<牧草> 大地
Func1<牧草, 牛乳> 牛
Action1<牛乳> 人間

大地.map(牛).subscribe(人間)

牛乳を消費する人間ですが、遡ると大地の恵みを消費していることが分かります。牛は、人間が食べることができない牧草を牛乳に変換してくれます。牛のおかげで人間は大地の恵をもらうことができるとも言えますね!牛はすごい!!

コードはこうなります。

+    static class 牧草 {}
     static class 牛乳 {}

-    Observable<牛乳> 牛 = Observable.create(subscriber -> {
+    Observable<牧草> 大地 = Observable.create(subscriber -> {
         for (int i = 0; i < 10; i++) {
             if (subscriber.isUnsubscribed()) break;
-            subscriber.onNext(new 牛乳());
+            subscriber.onNext(new 牧草());
         }
         subscriber.onCompleted();
     });

+    Func1<牧草, 牛乳> 牛 = (x) -> {
+        System.out.println("牛 「牧草ムシャムシャ」");
+        return new 牛乳();
+    };
+

人間が牛乳を直接飲んでいますが、実際は牛乳パックという形で消費することが多いと思います。乳業工場で牛乳パックにされます。乳業工場は牛乳を使って複数の牛乳パックを生み出します。乳業工場は Func1<牛乳, Observable<牛乳パック>> と表現できます。

大地.map(牛).map(工場) としたいところですが、これだと map の返り値が Observable<Observable<牛乳パック>> という風に Observable が入れ子になってしまい、人間が subscribe することができません。こういう時は Map ではなく FlatMap を使います。 大地.map(牛).flatMap(工場) とすると返り値が Observable<牛乳パック> となり入れ子が解消されます。

Observable<牧草> 大地
Func1<牧草, 牛乳> 牛
Func1<牛乳, Observable<牛乳パック>> 工場
Action1<牛乳パック> 人間

大地.map(牛).flatMap(工場).subscribe(人間)

コードはこうなります。

-    Action1<牛乳> 人間 = (x) -> System.out.println("人間 「牛乳おいしい!」");
+    Action1<牛乳パック> 人間 = (x) -> System.out.println("人間 「牛乳おいしい!」");
+
+    static class 牛乳パック {}
+
+    Func1<牛乳, Observable<牛乳パック>> 工場 = (x) -> {
+        System.out.println("工場 「パック詰めします。2個作れます」");
+        return Observable.from(Arrays.asList(new 牛乳パック(), new 牛乳パック()));
+    };
+

     private void exec() {
-        牛.subscribe(人間);
+        大地.map(牛).flatMap(工場).subscribe(人間)
     }

Map や FlatMap のようなデータを別の型に変換する Operator は Transforming Observables に記載されています。

http://reactivex.io/documentation/operators.html#transforming

Filtering Observables

牛から搾乳した牛乳は全て利用できるわけではありません。乳房炎という牛にとってはとても身近な病気があり、乳房炎になってしまった牛の牛乳は残念ながら利用できません。

搾乳された乳は検査が行われ、問題ない牛乳のみが利用されます。このようにデータをフィルタリングするための Operator として、 Filter があります。 filter メソッドには Boolean を返す関数を渡します。この関数が true を返すデータのみが利用され、 false を返すデータはここでふるい落とされます。

Observable<牧草> 大地
Func1<牧草, 牛乳> 牛
Func1<牛乳, Boolean> 検査
Func1<牛乳, Observable<牛乳パック>> 工場
Action1<牛乳パック> 人間

大地.map(牛).filter(検査).flatMap(工場).subscribe(人間)

コードはこうなります。牛乳にIDを持たせて、インスタンスを作る時にタイムスタンプをIDとしています。検査ではIDが偶数の牛乳のみをOKとしています。

-    static class 牛乳 {}
+    static class 牛乳 {
+        public final long id;
+        public 牛乳(long id) {
+            this.id = id;
+        }
+    }


+    Func1<牛乳, Boolean> 検査 = (x) -> {
+        if (x.id % 2 == 0) {
+            System.out.println("検査 「OK」 | 牛乳ID=" + x.id);
+            return true;
+        } else {
+            System.out.println("検査 「NG」 | 牛乳ID=" + x.id);
+            return false;
+        }
+    }

     private void exec() {
-        大地.map(牛).flatMap(工場).subscribe(人間)
+        大地.map(牛).filter(検査).flatMap(工場).subscribe(人間)
     }

Filter のようなデータの数を減らすような Operator は Filtering Observables の欄に記載されています。

http://reactivex.io/documentation/operators.html#filtering

Combining Observables

牛を、牧草を牛乳へ変換する関数としてきました。しかし、牧草を食べるだけでは牛乳を出すことはできず、酪農家さんが搾乳という作業をすることで牛乳が出ます。

搾乳は1日に2回くらい行いますが、このような定期的に発生するイベントもデータの流れと捉えることができます。酪農家さんは搾乳というイベントを生み出すため Observable<搾乳> になります。

そして、牛は、牧草と搾乳の Pair (apache commons lang3 にある) を引数にとり、牛乳を出力する関数に変更します。

Observable<牧草> 大地
Observable<搾乳> 酪農家
Func1<Pair<牧草, 搾乳>, 牛乳> 牛

Observable<牧草>Observable<搾乳> という2つの Observable を合成して Observable<Pair<牧草, 搾乳>> という1つの Observable を作りたいです。こういう時は Zip という Operator が使えます。

大地.zipWith(酪農家, Pair::of)

これで Observable<Pair<牧草, 搾乳>> が手に入ったので、 map を使って牛とつなげられます。

Observable<牧草> 大地
Observable<搾乳> 酪農家
Func1<Pair<牧草, 搾乳>, 牛乳> 牛
Func1<牛乳, Boolean> 検査
Func1<牛乳, Observable<牛乳パック>> 工場
Action1<牛乳パック> 人間

大地.zipWith(酪農家, Pair::of).map(牛).filter(検査).flatMap(工場).subscribe(人間)

コードはこうなります。

-    Func1<牧草, 牛乳> 牛 = (x) -> {
-        System.out.println("牛 「牧草おいしい!」");
-        return new 牛乳();
+    static class 搾乳 {}
+
+    Observable<搾乳> 酪農家 = Observable.just(new 搾乳()).repeat();
+
+    Func1<Pair<牧草, 搾乳>, 牛乳> 牛 = (x) -> {
+        System.out.println("牛 「牧草ムシャムシャ」");
+        return new 牛乳(new Date().getTime());
     };


     private void exec() {
-        大地.map(牛).filter(検査).flatMap(工場).subscribe(人間)
+        大地.zipWith(酪農家, Pair::of).map(牛).filter(検査).flatMap(工場).subscribe(人間)
     }

Zip のように2つの Observable を合成する Operator は Combining Observables の欄に記載されています。

http://reactivex.io/documentation/operators.html#combining

Scheduler

ReactiveX によってデータを提供する Observable, データに何かしらの処理を行う Operators, 最終的にデータを消費する Subscriber に分離することができました。RxJava で嬉しいことはこれらの処理をそれぞれ非同期的に動かすことができることです。

subscribeOn で Observable や Operators, Subscriber が動くスレッドを指定できます。 observeOn は Operators や Subscriber の間に挟み、それ以降の処理が動くスレッドを指定できます。Androidアプリにおいては RxAndroid と組み合わせることでバックグラウンドのスレッドとUIを操作するスレッドを指定することもできます。

以下のように書くと、Subscriberが動くスレッドと、それより上位の処理が動くスレッドを分けることができます。

大地.zipWith(酪農家, Pair::of)
    .subscribeOn(Schedulers.newThread())
    .map(牛)
    .filter(検査)
    .flatMap(工場)
    .observeOn(Schedulers.newThread())
    .subscribe(人間)

observeOn を各 Operators ごとに指定すれば全て別のスレッドで非同期に動かすことができます。

大地.zipWith(酪農家, Pair::of)
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.newThread())
    .map(牛)
    .observeOn(Schedulers.newThread())
    .filter(検査)
    .observeOn(Schedulers.newThread())
    .flatMap(工場)
    .observeOn(Schedulers.newThread())
    .subscribe(人間)

実際に動かすときはメインのスレッドが終わってしまうので、無限ループさせて動かしてみましょう。

private void exec() {
    大地.zipWith(酪農家, Pair::of)
        .subscribeOn(Schedulers.newThread())
        .map(牛)
        .filter(検査)
        .flatMap(工場)
        .observeOn(Schedulers.newThread())
        .subscribe(人間);

    while (true) {}
}

Backpressure Operators

人間は牛乳パックを消費するわけですが、牛が出す牛乳を全て消費しきれますかね?ちょっときつくないですか?飲みきれませんよね。

こんな風にデータをひたすら出し続けると、データを消費する側の処理が追いつかずいっぱいいっぱいになってしまいます。これを Backpressure と呼びます。

Backpressureを防ぐにはどうしたら良いでしょうか?牛乳パックを送るスピードを緩めたり、人間が飲みたい時に牛乳パックを手に入れることができれば良さそうです。前者はSubscriber にデータを push する間隔を調整する、後者は Subscriber からデータを pull してもらうということになります。

ここでは ReactiveX の Delay という Operator を使って push する間隔を調整しましょう。これで Subscriber にデータが送られるのが最短で1秒間隔になります。

大地.zipWith(酪農家, Pair::of)
    .subscribeOn(Schedulers.newThread())
    .map(牛)
    .filter(検査)
    .flatMap(工場)
    .delay(1, TimeUnit.SECONDS)
    .observeOn(Schedulers.newThread())
    .subscribe(人間)

Backpressureについてはこちらが参考になります。

https://github.com/ReactiveX/RxJava/wiki/Backpressure#useful-operators-that-avoid-the-need-for-backpressure

おわりに

大地の恵である牧草と酪農家さんによる搾乳により牛は牛乳を出します。そして乳房炎の検査で問題ない牛乳が工場へ運ばれ、牛乳パックの形で人間が消費している、というつながりを ReactiveX で記述してみました。当然、現実の世界はもっともっと複雑ですが、普段の技術を現実世界の別の領域に適用するとどうなるかと妄想するのは楽しいですよね、逆もまた然りです。

つながりと言えば弊社のサイトにも 「生きる」をつなぐ。 という言葉があります。この言葉が気になる方、農業や技術の未来について考えることが好きな方、弊社HPWantedlyからのご応募お待ちしております。

このエントリーをはてなブックマークに追加