Interface TransformablePublisher<T>

  • Type Parameters:
    T - the type of item emitted by this publisher
    All Superinterfaces:
    Publisher<T>

    public interface TransformablePublisher<T>
    extends Publisher<T>
    A wrapper over a Publisher that makes it more convenient to chain transformations of different kinds.

    Note that this type implements the publisher interface, so behaves just like the publisher that it is wrapping with respect to the Publisher.subscribe(Subscriber) method.

    • Method Detail

      • gate

        default TransformablePublisher<T> gate​(Action<? super Runnable> valveReceiver)
        Parameters:
        valveReceiver - an action that receives a runnable “valve” that when run allows request to start flowing upstream
        Returns:
        a publisher that is logically equivalent to the given publisher as far as subscribers are concerned
      • toList

        default Promise<List<T>> toList()
        Consumes the given publisher's items to a list.

        This method can be useful when testing, but should be uses with care in production code as it will exhaust memory if the stream is very large.

        
         import org.reactivestreams.Publisher;
         import ratpack.exec.stream.Streams;
         import ratpack.test.exec.ExecHarness;
        
         import java.util.Arrays;
         import java.util.List;
        
         import static org.junit.jupiter.api.Assertions.*;
        
         public class Example {
           public static void main(String... args) throws Exception {
             List<String> expected = Arrays.asList("a", "b", "c");
             List<String> result = ExecHarness.yieldSingle(execControl ->
               Streams.publish(expected).toList()
             ).getValue();
        
             assertEquals(Arrays.asList("a", "b", "c"), result);
           }
         }
         

        If the publisher emits an error, the promise will fail and the collected items will be discarded.

        
         import org.reactivestreams.Publisher;
         import ratpack.exec.stream.Streams;
         import ratpack.test.exec.ExecHarness;
        
         import static org.junit.jupiter.api.Assertions.*;
        
         public class Example {
           public static void main(String... args) throws Exception {
             Throwable error = ExecHarness.yieldSingle(execControl ->
               Streams.yield(r -> {
                 if (r.getRequestNum() < 1) {
                   return "a";
                 } else {
                   throw new RuntimeException("bang!");
                 }
               }).toList()
             ).getThrowable();
        
             assertEquals("bang!", error.getMessage());
           }
         }
         
        Returns:
        a promise for the stream's contents as a list
      • transform

        default <O> TransformablePublisher<O> transform​(Function<? super TransformablePublisher<? extends T>,​? extends Publisher<O>> transformer)
        Convenience method to allow a non Ratpack publisher transform method to be hooked in.

        This transformable publisher will be given to the function, that should return a new publisher. The returned publisher will then be wrapped in a transformable wrapper which will be returned by this method.

        Type Parameters:
        O - the type of transformed item
        Parameters:
        transformer - a publisher transformer
        Returns:
        a publisher that respects back pressure for each of its subscribers
      • reduce

        default <R> Promise<R> reduce​(R seed,
                                      BiFunction<? super R,​? super T,​? extends R> reducer)
        Reduces the stream to a single value, by applying the given function successively.
        Type Parameters:
        R - the type of result
        Parameters:
        seed - the initial value
        reducer - the reducing function
        Returns:
        a promise for the reduced value
        Since:
        1.4
      • fork

        default TransformablePublisher<T> fork​(Action<? super ExecSpec> execConfig,
                                               Action<? super T> disposer)
        Consumes the given publisher eagerly in a forked execution, buffering results until ready to be consumed by this execution.
        Parameters:
        execConfig - the configuration for the forked execution
        disposer - the disposer for any buffered items when the stream errors or is cancelled
        Returns:
        an execution bound publisher that propagates the items of the given publisher
        Since:
        1.5
        See Also:
        Streams.fork(Publisher, Action, Action)