This manual is a work in progress and is currently incomplete.
If you'd like to help improve it, and we hope you do, please see the README.

15 RxJava

The excellent RxJava can be used in Ratpack applications to elegantly compose asynchronous operations.

The ratpack-rx JAR provides with RxRatpack class that provides static methods for adapting Ratpack promises to RxJava’s Observable.

The ratpack-rx module as of 1.4.5 is built against (and depends on) RxJava 1.1.2.

15.1 Initialization

The RxRatpack.initialize() must be called to fully enable the integration. This method only needs to be called once for the JVM’s lifetime.

15.2 Observing Ratpack

The integration is based on the RxRatpack.observe() and RxRatpack.observeEach() static methods. These methods adapt Ratpack’s promise type into an observable, which can then be used with all of the observable operators that RxJava offers.

For example, blocking operations can be easily observed.

import ratpack.exec.Promise;
import ratpack.exec.Blocking;
import ratpack.test.handling.HandlingResult;

import static org.junit.Assert.assertEquals;
import static ratpack.rx.RxRatpack.observe;
import static ratpack.test.handling.RequestFixture.requestFixture;

public class Example {
  public static void main(String... args) throws Exception {
    HandlingResult result = requestFixture().handle(context -> {
      Promise<String> promise = Blocking.get(() -> "hello world");
      observe(promise).map(String::toUpperCase).subscribe(context::render);
    });

    assertEquals("HELLO WORLD", result.rendered(String.class));
  }
}

15.3 Implicit error handling

A key feature of the RxJava integration is the implicit error handling. All observable sequences have an implicit default error handling strategy of forwarding the exception to the execution context error handler. In practice, this means that error handlers rarely need to be defined for observable sequences.

import ratpack.error.ServerErrorHandler;
import ratpack.rx.RxRatpack;
import ratpack.test.handling.RequestFixture;
import ratpack.test.handling.HandlingResult;
import rx.Observable;

import static org.junit.Assert.assertEquals;

public class Example {
  public static void main(String... args) throws Exception {
    RxRatpack.initialize(); // must be called once per JVM

    HandlingResult result = RequestFixture.requestFixture().handleChain(chain -> {
      chain.register(registry ->
          registry.add(ServerErrorHandler.class, (context, throwable) ->
              context.render("caught by error handler: " + throwable.getMessage())
          )
      );

      chain.get(ctx -> Observable.<String>error(new Exception("!")).subscribe((s) -> {}));
    });

    assertEquals("caught by error handler: !", result.rendered(String.class));
  }
}

In this case, the throwable thrown during the blocking operation will be forwarded to the current ServerErrorHandler, which will probably render an error page to the response. If the subscriber does implement an error handling strategy, it will be used instead of the implicit error handler.

The implicit error handling applies to all observables that are created on Ratpack managed threads. It is not restricted to observables that are backed by Ratpack promises.