Package ratpack.rx2

Class RxRatpack


  • public abstract class RxRatpack
    extends java.lang.Object
    Provides integration with RxJava2.

    IMPORTANT: the initialize() method must be called to fully enable integration.

    The methods of this class provide bi-directional conversion between Ratpack's Promise and RxJava's Observable. This allows Ratpack promise based API to be integrated into an RxJava based app and vice versa.

    Conveniently, the initialize() method installs an RxJava extension that provides a default error handling strategy for observables that integrates with Ratpack's execution model.

    To test observable based services that use Ratpack's execution semantics, use the ExecHarness and convert the observable back to a promise with promiseAll(Observable).

    The methods in this class are also provided as Groovy Extensions. When using Groovy, each static method in this class is able to act as an instance-level method against the Observable type.

    Since:
    1.6
    • Method Detail

      • initialize

        public static void initialize()
        Registers an RxJavaPlugins.getErrorHandler() with RxJava that provides a default error handling strategy of forwarding exceptions to the execution error handler.

        This method is idempotent. It only needs to be called once per JVM, regardless of how many Ratpack applications are running within the JVM.

        For a Java application, a convenient place to call this is in the handler factory implementation.

        
         import ratpack.error.ServerErrorHandler;
         import ratpack.rx2.RxRatpack;
         import ratpack.test.embed.EmbeddedApp;
         import io.reactivex.Observable;
        
         import static org.junit.Assert.assertEquals;
        
         public class Example {
           public static void main(String... args) throws Exception {
             RxRatpack.initialize(); // must be called once for the life of the JVM
        
             EmbeddedApp.fromHandlers(chain -> chain
               .register(s -> s
                 .add(ServerErrorHandler.class, (ctx, throwable) ->
                   ctx.render("caught by error handler: " + throwable.getMessage())
                 )
               )
               .get(ctx -> Observable.<String>error(new Exception("!")).subscribe(ctx::render))
             ).test(httpClient ->
               assertEquals("caught by error handler: !", httpClient.getText())
             );
           }
         }
         
      • single

        public static <T> Single<T> single​(Promise<T> promise)
        Converts a Promise into a Single.

        The returned Single emits the promise's single value if it succeeds, and emits the error (i.e. via onError()) if it fails.

        This method works well as a method reference to the Promise.to(ratpack.func.Function) method.

        
         import ratpack.rx2.RxRatpack;
         import ratpack.exec.Promise;
         import ratpack.test.exec.ExecHarness;
        
         import static org.junit.Assert.assertEquals;
        
         public class Example {
           public static String value;
           public static void main(String... args) throws Exception {
             ExecHarness.runSingle(e ->
               Promise.value("hello world")
                 .to(RxRatpack::single)
                 .map(String::toUpperCase)
                 .subscribe(s -> value = s)
             );
        
             assertEquals("HELLO WORLD", value);
           }
         }
         
        Type Parameters:
        T - the type of value promised
        Parameters:
        promise - the promise
        Returns:
        a single for the promised value
      • complete

        public static Completable complete​(Operation operation)
        Converts a Operation into a Completable.

        The returned Completable emits nothing if it succeeds, and emits the error (i.e. via onError()) if it fails.

      • observe

        public static <T,​I extends java.lang.Iterable<T>> Observable<T> observe​(Promise<I> promise)
        Observe a promise list as an observable stream.
        Type Parameters:
        T -
        Parameters:
        promise -
        Returns:
      • promiseAll

        public static <T> Promise<java.util.List<T>> promiseAll​(Observable<T> observable)
                                                         throws UnmanagedThreadException
        Converts an Observable into a Promise, for all of the observable's items.

        This method can be used to simply adapt an observable to a promise, but can also be used to bind an observable to the current execution. It is sometimes more convenient to use promiseAll(ObservableOnSubscribe) over this method.

        
         import ratpack.rx2.RxRatpack;
         import ratpack.test.exec.ExecHarness;
         import io.reactivex.Observable;
         import java.util.List;
         import java.util.Arrays;
         import static org.junit.Assert.assertEquals;
        
         public class Example {
           public static class AsyncService {
             public <T> Observable<T> observe(final T value) {
               return Observable.create(subscriber ->
                 new Thread(() -> {
                   subscriber.onNext(value);
                   subscriber.onComplete();
                 }).start()
               );
             }
           }
        
           public static void main(String[] args) throws Throwable {
             List<String> results = ExecHarness.yieldSingle(execution ->
               RxRatpack.promiseAll(new AsyncService().observe("foo"))
             ).getValue();
        
             assertEquals(Arrays.asList("foo"), results);
           }
         }
         

        This method uses Observable.toList() to collect the observable's contents into a list. It therefore should not be used with observables with many or infinite items.

        If it is expected that the observable only emits one element, it is typically more convenient to use promise(Single).

        If the observable emits an error, the returned promise will fail with that error.

        This method must be called during an execution.

        Type Parameters:
        T - the type of the value observed
        Parameters:
        observable - the observable
        Returns:
        a promise that returns all values from the observable
        Throws:
        UnmanagedThreadException - if called outside of an execution
        See Also:
        promise(Single)
      • promiseAll

        public static <T> Promise<java.util.List<T>> promiseAll​(ObservableOnSubscribe<T> onSubscribe)
                                                         throws UnmanagedThreadException
        Converts an Observable into a Promise, for all of the observable's items.

        This method can be used to simply adapt an observable to a promise, but can also be used to bind an observable to the current execution.

        
         import ratpack.rx2.RxRatpack;
         import ratpack.test.exec.ExecHarness;
         import io.reactivex.Observable;
         import java.util.List;
         import java.util.Arrays;
        
         import static org.junit.Assert.assertEquals;
        
         public class Example {
           public static class AsyncService {
             public <T> Observable<T> observe(final T value) {
               return Observable.create(subscriber ->
                 new Thread(() -> {
                   subscriber.onNext(value);
                   subscriber.onComplete();
                 }).start()
               );
             }
           }
        
           public static void main(String[] args) throws Throwable {
             List<String> results = ExecHarness.yieldSingle(execution ->
               new AsyncService().observe("foo").as(RxRatpack::promiseAll)
             ).getValue();
        
             assertEquals(Arrays.asList("foo"), results);
           }
         }
         

        This method uses Observable.toList() to collect the observable's contents into a list. It therefore should not be used with observables with many or infinite items.

        If the observable emits an error, the returned promise will fail with that error.

        This method must be called during an execution.

        Type Parameters:
        T - the type of the value observed
        Parameters:
        onSubscribe - the on subscribe function
        Returns:
        a promise that returns all values from the observable
        Throws:
        UnmanagedThreadException - if called outside of an execution
        See Also:
        promiseAll(Observable)
      • promise

        public static <T> Promise<T> promise​(SingleOnSubscribe<T> onSubscribe)
                                      throws UnmanagedThreadException
        Converts an SingleOnSubscribe into a Promise, for the Single's item.

        This method can be used to simply adapt a Single to a promise, but can also be used to bind a Single to the current execution.

        
         import ratpack.rx2.RxRatpack;
         import ratpack.test.exec.ExecHarness;
         import io.reactivex.Single;
        
         import static org.junit.Assert.assertEquals;
        
         public class Example {
           public static class AsyncService {
             public <T> Single<T> single(final T value) {
               return Single.create(subscriber ->
                 new Thread(() -> {
                   subscriber.onSuccess(value);
                 }).start()
               );
             }
           }
        
           public static void main(String[] args) throws Throwable {
             String result = ExecHarness.yieldSingle(execution ->
               new AsyncService().single("foo").as(RxRatpack::promise)
             ).getValue();
        
             assertEquals("foo", result);
           }
         }
         

        If the observable emits an error, the returned promise will fail with that error. If the observable emits no items, the returned promise will fail with a NoSuchElementException.

        This method must be called during an execution.

        Type Parameters:
        T - the type of the value observed
        Parameters:
        onSubscribe - the on subscribe function
        Returns:
        a promise that returns the value from the single
        Throws:
        UnmanagedThreadException
        See Also:
        promise(Single)
      • publisher

        public static <T> TransformablePublisher<T> publisher​(Observable<T> observable,
                                                              BackpressureStrategy strategy)
        Converts an Observable into a Publisher, for all of the observable's items.

        This method can be used to simply adapt an observable to a ReactiveStreams publisher. It is sometimes more convenient to use publisher(ObservableOnSubscribe, BackpressureStrategy) over this method.

        
         import ratpack.rx2.RxRatpack;
         import ratpack.stream.Streams;
         import ratpack.test.exec.ExecHarness;
         import io.reactivex.Observable;
         import io.reactivex.BackpressureStrategy;
         import java.util.List;
        
         import static org.junit.Assert.assertEquals;
        
         public class Example {
           public static class AsyncService {
             public <T> Observable<T> observe(final T value) {
               return Observable.create(subscriber ->
                 new Thread(() -> {
                   subscriber.onNext(value);
                   subscriber.onComplete();
                 }).start()
               );
             }
           }
        
           public static void main(String[] args) throws Throwable {
             List<String> result = ExecHarness.yieldSingle(execution ->
               RxRatpack.publisher(new AsyncService().observe("foo"), BackpressureStrategy.BUFFER).toList()
             ).getValue();
             assertEquals("foo", result.get(0));
           }
         }
         
        Type Parameters:
        T - the type of the value observed
        Parameters:
        observable - the observable
        Returns:
        a ReactiveStreams publisher containing each value of the observable
      • publisher

        public static <T> TransformablePublisher<T> publisher​(ObservableOnSubscribe<T> onSubscribe,
                                                              BackpressureStrategy strategy)
        Converts an Observable into a Publisher, for all of the observable's items.

        This method can be used to simply adapt an observable to a ReactiveStreams publisher.

        
         import ratpack.rx2.RxRatpack;
         import ratpack.stream.Streams;
         import ratpack.test.exec.ExecHarness;
         import io.reactivex.Observable;
         import io.reactivex.BackpressureStrategy;
         import java.util.List;
        
         import static org.junit.Assert.assertEquals;
        
         public class Example {
           public static class AsyncService {
             public <T> Observable<T> observe(final T value) {
               return Observable.create(subscriber ->
                 new Thread(() -> {
                   subscriber.onNext(value);
                   subscriber.onComplete();
                 }).start()
               );
             }
           }
        
           public static void main(String[] args) throws Throwable {
             List<String> result = ExecHarness.yieldSingle(execution ->
               new AsyncService().observe("foo").as(onSub -> RxRatpack.publisher(onSub, BackpressureStrategy.BUFFER)).toList()
             ).getValue();
             assertEquals("foo", result.get(0));
           }
         }
         
        Type Parameters:
        T - the type of the value observed
        Parameters:
        onSubscribe - the on subscribe function
        Returns:
        a ReactiveStreams publisher containing each value of the observable
      • bindExec

        public static <T> Observable<T> bindExec​(Observable<T> source)
        Binds the given observable to the current execution, allowing integration of third-party asynchronous observables with Ratpack's execution model.

        This method is useful when you want to consume an asynchronous observable within a Ratpack execution, as an observable.

        
         import io.reactivex.Observable;
         import ratpack.test.exec.ExecHarness;
         import ratpack.rx2.RxRatpack;
         import java.util.Arrays;
         import java.util.List;
        
         import static org.junit.Assert.*;
        
         public class Example {
           public static void main(String... args) throws Exception {
             Observable<String> asyncObservable = Observable.create(subscriber ->
               new Thread(() -> {
                 subscriber.onNext("foo");
                 subscriber.onNext("bar");
                 subscriber.onComplete();
               }).start()
             );
        
             List<String> strings = ExecHarness.yieldSingle(e ->
               RxRatpack.promiseAll(asyncObservable.compose(RxRatpack::bindExec))
             ).getValue();
        
             assertEquals(Arrays.asList("foo", "bar"), strings);
           }
         }
         

        Type Parameters:
        T - the type of item observed
        Parameters:
        source - the observable source
        Returns:
        an observable stream equivalent to the given source
        See Also:
        promiseAll(Observable)
      • fork

        public static <T> Observable<T> fork​(Observable<T> observable)
        Parallelize an observable by forking it's execution onto a different Ratpack compute thread and automatically binding the result back to the original execution.

        This method can be used for simple parallel processing. It's behavior is similar to the subscribeOn but allows the use of Ratpack compute threads. Using fork modifies the execution of the upstream observable.

        This is different than forkEach which modifies where the downstream is executed.

        
         import ratpack.func.Pair;
         import ratpack.rx2.RxRatpack;
         import ratpack.test.exec.ExecHarness;
        
         import io.reactivex.Observable;
        
         import static org.junit.Assert.assertEquals;
         import static org.junit.Assert.assertNotEquals;
        
         public class Example {
           public static void main(String[] args) throws Exception {
             RxRatpack.initialize();
        
             try (ExecHarness execHarness = ExecHarness.harness(6)) {
               Integer sum = execHarness.yield(execution -> {
                 final String originalComputeThread = Thread.currentThread().getName();
        
                 Observable<Integer> unforkedObservable = Observable.just(1);
        
                 // `map` is executed upstream from the fork; that puts it on another parallel compute thread
                 Observable<Pair<Integer, String>> forkedObservable = Observable.just(2)
                   .map((val) -> Pair.of(val, Thread.currentThread().getName()))
                   .compose(RxRatpack::fork);
        
                 return RxRatpack.promise(
                   Observable.zip(unforkedObservable, forkedObservable, (Integer intVal, Pair<Integer, String> pair) -> {
                     String forkedComputeThread = pair.right;
                     assertNotEquals(originalComputeThread, forkedComputeThread);
                     return intVal + pair.left;
                   }).firstOrError()
                 );
               }).getValueOrThrow();
        
               assertEquals(sum.intValue(), 3);
             }
           }
         }
         
        Type Parameters:
        T - the element type
        Parameters:
        observable - the observable sequence to execute on a different compute thread
        Returns:
        an observable on the compute thread that fork was called from
        See Also:
        forkEach(Observable)
      • fork

        public static <T> Observable<T> fork​(Observable<T> observable,
                                             Action<? super RegistrySpec> registrySpec)
                                      throws java.lang.Exception
        A variant of fork(io.reactivex.Observable<T>) that allows access to the registry of the forked execution inside an Action.

        This allows the insertion of objects via RegistrySpec.add(java.lang.Class<O>, O) that will be available to the forked observable.

        You do not have access to the original execution inside the Action.

        
         import ratpack.exec.Execution;
         import ratpack.registry.RegistrySpec;
         import ratpack.rx2.RxRatpack;
         import ratpack.test.exec.ExecHarness;
        
         import io.reactivex.Observable;
        
         import static org.junit.Assert.assertEquals;
        
         public class Example {
           public static void main(String[] args) throws Exception {
             RxRatpack.initialize();
        
             try (ExecHarness execHarness = ExecHarness.harness(6)) {
               String concatenatedResult = execHarness.yield(execution -> {
        
                 Observable<String> notYetForked = Observable.just("foo")
                   .map((value) -> value + Execution.current().get(String.class));
        
                 Observable<String> forkedObservable = RxRatpack.fork(
                   notYetForked,
                   (RegistrySpec registrySpec) -> registrySpec.add("bar")
                 );
        
                 return RxRatpack.promise(forkedObservable.firstOrError());
               }).getValueOrThrow();
        
               assertEquals(concatenatedResult, "foobar");
             }
           }
         }
         
        Type Parameters:
        T - the element type
        Parameters:
        observable - the observable sequence to execute on a different compute thread
        registrySpec - an Action where objects can be inserted into the registry of the forked execution
        Returns:
        an observable on the compute thread that fork was called from
        Throws:
        java.lang.Exception
        See Also:
        fork(Observable)
      • forkEach

        public static <T> Observable<T> forkEach​(Observable<T> observable)
        Parallelize an observable by creating a new Ratpack execution for each element.

        
         import ratpack.rx2.RxRatpack;
         import ratpack.util.Exceptions;
         import ratpack.test.exec.ExecHarness;
        
         import io.reactivex.Observable;
        
         import java.util.List;
         import java.util.Arrays;
         import java.util.LinkedList;
         import java.util.Collection;
         import java.util.Collections;
         import java.util.concurrent.CyclicBarrier;
        
         import static org.junit.Assert.assertEquals;
        
         public class Example {
           public static void main(String[] args) throws Exception {
             RxRatpack.initialize();
        
             CyclicBarrier barrier = new CyclicBarrier(5);
        
             try (ExecHarness execHarness = ExecHarness.harness(6)) {
               List<Integer> values = execHarness.yield(execution ->
                 RxRatpack.promiseAll(
                   Observable.just(1, 2, 3, 4, 5)
                     .compose(RxRatpack::forkEach) // parallelize
                     .doOnNext(value -> Exceptions.uncheck(() -> barrier.await())) // wait for all values
                     .map(integer -> integer.intValue() * 2)
                     .serialize()
                 )
               ).getValue();
        
               List<Integer> sortedValues = new LinkedList<>(values);
               Collections.sort(sortedValues);
               assertEquals(Arrays.asList(2, 4, 6, 8, 10), sortedValues);
             }
           }
         }
         
        Type Parameters:
        T - the element type
        Parameters:
        observable - the observable sequence to process each element of in a forked execution
        Returns:
        an observable
      • scheduler

        public static Scheduler scheduler​(ExecController execController)
        A scheduler that uses the application event loop and initialises each job as an Execution (via ExecController.fork()).
        Parameters:
        execController - the execution controller to back the scheduler
        Returns:
        a scheduler