JAX-RS 2.1 の Server-Sent Event

この記事は Java EE Advent Calendar 2017 の 16 日目です。

Java EE 8 の JAX-RS 2.1 には 2 つの大きな機能拡張があります。1 つは非同期通信のサポート強化です。JAX-RS 2.0 でもクライアント・サーバーともに非同期通信をサポートしていましたが、マイクロサービスを見据えた JAX-RS 2.1 では本格的なテコ入れが行われています。よく話題に上るのがリアクティブ・クライアントの追加です (リアクティブ API は必ずしも非同期になるとは限りませんが)。いい加減見飽きたでしょうが、リアクティブ・クライアントの使用サンプル (JAX-RS 2.1 仕様書から引用) を以下に示します。

CompletionStage<Number> csp = client.target("price/{destination}")
    .resolveTemplate("destination", "mars")
    .request()
    .rx()
    .get(Number.class);

CompletionStage<String> csf = client.target("forecast/{destination}")
    .resolveTemplate("destination", "mars")
    .request()
    .rx()
    .get(String.class);

csp.thenCombine(csf, (price, forecast) -> reserveIfAffordableAndWarm(price, forecast));

CompletionStage を用いてサービスを次々と呼び出し、レスポンスを組み合わせて処理しているのが見て取れるかと思います。JAX-RS 2.0 のクライアントでは非同期通信のコールバック内でさらに他のサービスを呼ぶような形を取らざるを得ませんでしたが、リアクティブ・クライアントの導入により呼び出し処理がシンプルになります (厳密な同期・非同期も考慮しなくて済みます)。この他にも非同期通信の扱いは全面的に強化されています。

今回の主題はもう 1 つの新機能である Server-Sent Event (SSE) です。少し前の発表資料ですが、再掲します。

当時はまだ Jersey の独自機能でしたが、JAX-RS 2.1 の SSE は概ね Jersey の SSE を踏襲しています。特にクライアント側の API はとても良く似ています。

まずはサーバー側のサンプルコードから見てみましょう (JAX-RS 2.1 仕様書からの引用です)。

@GET
@Path("eventStream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void eventStream(@Context SseEventSink eventSink, @Context Sse sse) {
  executor.execute(() -> {
    try (SseEventSink sink = eventSink) {
      eventSink.send(sse.newEvent("event1"));
      eventSink.send(sse.newEvent("event2"));
      eventSink.send(sse.newEvent("event3"));
    }
  });
}

ここでは SseEventSink と Sse という 2 つのコンテキストをメソッド引数に inject しています。簡単に言えば、Sse でイベントを作成して SseEventSink に送ると、それがクライアント側に Push される、といった具合に動きます。Jersey SSE よりもシンプルに書けるよう工夫がなされています。try-with-resources は SseEventSink のクローズを行うために少し強引な書き方 (変数 sink は使用されない) をしています。また、ManagedExecutorService.execute() を使って別スレッドで実行していますが、SSE は最後のイベント Push が完了するまで HTTP セッションを閉じないため、リソースクラス本体のスレッドで実行すると処理が固まってしまう恐れがあるためです (JavaFX や Android のような GUI プログラミングにおけるイベント処理では必ずと言っていいほど用いられる方法です)。

次にクライアント側のサンプルコードです (こちらも JAX-RS 2.1 仕様書からの引用です)。

WebTarget target = client.target("http://...");
try (SseEventSource source = SseEventSource.target(target).build()) {
  source.register(System.out::println);
  source.open();
  Thread.sleep(500); // Consume events for just 500 ms
} catch (InterruptedException e) {
  // falls through
}

これは try-with-resources が使えるようになったことを除いてかつての Jersey SSE そのままです。

  • SseEventSource がイベント Push を受け取るもの
  • SseEventSource.register() でイベントを受け取った時のコールバック処理 (ここでは System.out.println() を用いて出力するだけ) を登録
  • SseEventSource.open() でイベントの受信開始

となります。SSE ではクライアント側がインバウンド通信 = 待ち受け処理を必要とするため (通常はサーバー側がインバウンド通信となる)、クライアントと言いながらサーバー的なプログラミングが要求されます。ただし、複雑な待ち受けではないため、すぐに慣れるでしょう。

最後に、同じサーバー Push を実現する WebSocket との棲み分けですが、WebSocket は HTTP 101 によるプロトコル・スイッチにより HTTP(S) 上の独自プロトコルとして双方向通信をサポートします。一方、SSE はプロトコル・スイッチが発生せず、サーバーからクライアントへの片方向通信になります。