Class Streams
- java.lang.Object
-
- ratpack.exec.stream.Streams
-
public class Streams extends Object
Some lightweight utilities for working with reactive streams.Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure on the JVM.
Ratpack uses the Reactive Streams API when consuming streams of data (e.g
Response.sendStream(Publisher)
).This class provides some useful reactive utilities that integrate other parts of the Ratpack API with Reactive Stream types. It is not designed to be a fully featured reactive toolkit. If you require more features than provided here, consider using Ratpack's RxJava or Reactor integration.
The methods in this class are available as Groovy Extensions. When using Groovy, applications can utilize the static methods provided in this class as instance-level methods against the first argument in their variable argument list.
-
-
Constructor Summary
Constructors Constructor Description Streams()
-
Method Summary
All Methods Static Methods Concrete Methods Modifier and Type Method Description static <T> TransformablePublisher<T>
batch(int batchSize, Publisher<T> publisher, Action<? super T> disposer)
Batches and serialised demand.static <T> TransformablePublisher<T>
bindExec(Publisher<T> publisher)
Binds the given publisher to the currentExecution
.static <T> TransformablePublisher<T>
bindExec(Publisher<T> publisher, Action<? super T> disposer)
Binds the given publisher to the currentExecution
.static <T> TransformablePublisher<T>
buffer(Publisher<T> publisher)
Returns a publisher that allows the given publisher to without respecting demand.static <T> TransformablePublisher<T>
concat(Iterable<? extends Publisher<? extends T>> publishers)
Similar toconcat(Iterable, Action)
, but with a noop disposer.static <T> TransformablePublisher<T>
concat(Iterable<? extends Publisher<? extends T>> publishers, Action<? super T> disposer)
Returns a publisher that aggregates the given publishers into a single stream of elements, without interleaving them.static <T> TransformablePublisher<T>
constant(T item)
Creates a new publisher, that indefinitely streams the given object to all subscribers.static <T> TransformablePublisher<T>
empty()
An empty publisher that immediately completes.static <T> TransformablePublisher<T>
fanOut(Publisher<? extends Iterable<? extends T>> publisher)
Returns a publisher that publishes each element from Collections that are produced from the given input publisher.static <T> TransformablePublisher<T>
fanOut(Publisher<? extends Iterable<? extends T>> publisher, Action<? super T> disposer)
Returns a publisher that publishes each element from Collections that are produced from the given input publisher.static <T> TransformablePublisher<T>
filter(Publisher<T> input, Predicate<? super T> filter)
Returns a publisher that filters items from the given input stream by applying the given filter predicate.static <I,O>
TransformablePublisher<O>flatMap(Publisher<I> input, Function<? super I,? extends Promise<? extends O>> function)
Returns a publisher that publishes items from the given input publisher after transforming each item via the given, promise returning, function.static <T> TransformablePublisher<T>
flatten(Publisher<? extends Publisher<T>> publisher)
Creates a single publisher from a publisher of publishers.static <T> TransformablePublisher<T>
flatten(Publisher<? extends Publisher<T>> publisher, Action<? super T> disposer)
Creates a single publisher from a publisher of publishers.static <T> TransformablePublisher<T>
flatYield(Function<? super YieldRequest,? extends Promise<T>> producer)
Creates a new publisher, backed by the given asynchronous data producing function.static <T> TransformablePublisher<T>
fork(Publisher<T> publisher, Action<? super ExecSpec> execConfig, Action<? super T> disposer)
Consumes the given publisher eagerly in a forked execution, buffering results until ready to be consumed by this execution.static <T> TransformablePublisher<T>
gate(Publisher<T> publisher, Action<? super Runnable> valveReceiver)
Allows requests from the subscriber of the return publisher to be withheld from the given publisher until an externally defined moment.static <I,O>
TransformablePublisher<O>map(Publisher<I> input, Function<? super I,? extends O> function)
Returns a publisher that publishes items from the given input publisher after transforming each item via the given function.static <T> TransformablePublisher<T>
merge(Publisher<? extends T>... publishers)
Returns a publisher that merges the given input publishers into a single stream of elements.static <T> TransformablePublisher<T>
multicast(Publisher<T> publisher)
Returns a publisher that will stream events emitted from the given publisher to all of its subscribers.static <T> TransformablePublisher<T>
periodically(ScheduledExecutorService executorService, Duration duration, Function<? super Integer,? extends T> producer)
Executes the given function periodically, publishing the return value to the subscriber.static <T> TransformablePublisher<T>
periodically(Registry registry, Duration duration, Function<? super Integer,? extends T> producer)
static <T> TransformablePublisher<T>
publish(Iterable<T> iterable)
Converts an iterable to a publishable.static <T> TransformablePublisher<T>
publish(Promise<? extends Iterable<T>> promise)
Converts aPromise
for an iterable into a publishable.static <T,R>
Promise<R>reduce(Publisher<T> publisher, R seed, BiFunction<? super R,? super T,? extends R> reducer)
Reduces the stream to a single value, by applying the given function successively.static <U,D>
TransformablePublisher<D>streamMap(Publisher<? extends U> input, StreamMapper<? super U,D> mapper)
Allows transforming a stream into an entirely different stream.static <T> TransformablePublisher<T>
take(long count, Publisher<T> upstreamPublisher)
Returns a publisher that emits only the firstn
elements from the given publisher, wheren
is the given count.static <T> TransformablePublisher<T>
takeWhile(Predicate<T> condition, Publisher<T> upstreamPublisher)
Returns a publisher that emits elements from the given publisher, while thecondition
is true.static <T> Promise<List<T>>
toList(Publisher<T> publisher)
Creates a promise for the given publisher's items as a List.static <T> Promise<T>
toPromise(Publisher<T> publisher)
Creates a promise for the given publisher's single item.static <T> TransformablePublisher<T>
transformable(Publisher<T> publisher)
Wraps the publisher in Ratpack'sTransformablePublisher
to make composing a pipeline easier.static <T> TransformablePublisher<T>
wiretap(Publisher<T> publisher, Action<? super StreamEvent<T>> listener)
Allows listening to the events of the given publisher as they flow to subscribers.static <T> TransformablePublisher<T>
yield(Function<? super YieldRequest,? extends T> producer)
Creates a new publisher, backed by the given data producing function.
-
-
-
Method Detail
-
transformable
public static <T> TransformablePublisher<T> transformable(Publisher<T> publisher)
Wraps the publisher in Ratpack'sTransformablePublisher
to make composing a pipeline easier.The return publisher is effectively the same publisher in terms of the
Publisher.subscribe(org.reactivestreams.Subscriber)
method.- Type Parameters:
T
- the type of item the publisher emits- Parameters:
publisher
- the publisher to wrap- Returns:
- a wrapped publisher
-
empty
public static <T> TransformablePublisher<T> empty()
An empty publisher that immediately completes.- Type Parameters:
T
- the type of item expected- Returns:
- an empty publisher
- Since:
- 1.5
-
publish
public static <T> TransformablePublisher<T> publish(Iterable<T> iterable)
Converts an iterable to a publishable.Upon subscription, a new iterator will be created from the iterable. Values are pulled from the iterator in accordance with the requests from the subscriber.
Any exception thrown by the iterable/iterator will be forwarded to the subscriber.
- Type Parameters:
T
- the type of item emitted- Parameters:
iterable
- the data source- Returns:
- a publisher for the given iterable
-
publish
public static <T> TransformablePublisher<T> publish(Promise<? extends Iterable<T>> promise)
Converts aPromise
for an iterable into a publishable.Upon subscription the promise will be consumed and the promised iterable will be emitted to the subscriber one element at a time.
Any exception thrown by the the promise will be forwarded to the subscriber.
- Type Parameters:
T
- the element type of the promised iterable- Parameters:
promise
- the promise- Returns:
- a publisher for each element of the promised iterable
- Since:
- 1.1
-
yield
public static <T> TransformablePublisher<T> yield(Function<? super YieldRequest,? extends T> producer)
Creates a new publisher, backed by the given data producing function.As subscribers request data of the returned stream, the given function is invoked. The function returns the item to send downstream. If the function returns
null
, the stream is terminated. If the function throws an exception, the stream is terminated and the error is sent downstream.import ratpack.exec.stream.Streams; import ratpack.test.exec.ExecHarness; import java.util.Arrays; import java.util.List; import static org.junit.jupiter.api.Assertions.*; public class Example { public static void main(String... args) throws Exception { List<String> strings = ExecHarness.yieldSingle(execControl -> Streams.yield(r -> { if (r.getRequestNum() < 2) { return Long.toString(r.getRequestNum()); } else { return null; } }).toList() ).getValue(); assertEquals(Arrays.asList("0", "1"), strings); } }
If the value producing function is asynchronous, use
flatYield(Function)
.- Type Parameters:
T
- the type of item emitted- Parameters:
producer
- the data source- Returns:
- a publisher backed by the given producer
- See Also:
flatYield(ratpack.func.Function<? super ratpack.exec.stream.YieldRequest, ? extends ratpack.exec.Promise<T>>)
-
flatYield
public static <T> TransformablePublisher<T> flatYield(Function<? super YieldRequest,? extends Promise<T>> producer)
Creates a new publisher, backed by the given asynchronous data producing function.As subscribers request data of the returned stream, the given function is invoked. The function returns a promise for the item to send downstream. If the promise provides a value of
null
, the stream is terminated. If the promise produces an error, the stream is terminated and the error is sent downstream. If the promise producing function throws an exception, the stream is terminated and the error is sent downstream.import ratpack.exec.stream.Streams; import ratpack.exec.Promise; import ratpack.test.exec.ExecHarness; import java.util.Arrays; import java.util.List; import static org.junit.jupiter.api.Assertions.*; public class Example { public static void main(String... args) throws Exception { List<String> strings = ExecHarness.yieldSingle(execControl -> Streams.flatYield(r -> { if (r.getRequestNum() < 2) { return Promise.value(Long.toString(r.getRequestNum())); } else { return Promise.ofNull(); } }).toList() ).getValue(); assertEquals(Arrays.asList("0", "1"), strings); } }
If the value producing function is not asynchronous, use
yield(Function)
.- Type Parameters:
T
- the type of item emitted- Parameters:
producer
- the data source- Returns:
- a publisher backed by the given producer
- See Also:
yield(ratpack.func.Function<? super ratpack.exec.stream.YieldRequest, ? extends T>)
-
constant
public static <T> TransformablePublisher<T> constant(T item)
Creates a new publisher, that indefinitely streams the given object to all subscribers.This is rarely useful for anything other than testing.
- Type Parameters:
T
- the type of item emitted- Parameters:
item
- the item to indefinitely stream- Returns:
- a publisher that indefinitely streams the given item
-
map
public static <I,O> TransformablePublisher<O> map(Publisher<I> input, Function<? super I,? extends O> function)
Returns a publisher that publishes items from the given input publisher after transforming each item via the given function.The returned publisher does not perform any flow control on the data stream.
If the given transformation errors, the exception will be forwarded to the subscriber and the subscription to the input stream will be cancelled.
- Type Parameters:
I
- the type of input itemO
- the type of output item- Parameters:
input
- the stream of input datafunction
- the transformation- Returns:
- a publisher that applies the given transformation to each item from the input stream
-
filter
public static <T> TransformablePublisher<T> filter(Publisher<T> input, Predicate<? super T> filter)
Returns a publisher that filters items from the given input stream by applying the given filter predicate.The returned stream is
buffered
, which means that if the downstream requests, say 5 items, which is filtered into only 3 items the publisher will ask for more from the upstream to meet the downstream demand.import org.reactivestreams.Publisher; import ratpack.exec.stream.Streams; import ratpack.exec.stream.TransformablePublisher; import ratpack.test.exec.ExecHarness; import java.util.Arrays; import java.util.List; import static org.junit.jupiter.api.Assertions.*; public class Example { public static void main(String... args) throws Exception { List<Integer> result = ExecHarness.yieldSingle(execControl -> { TransformablePublisher<Integer> evens = Streams.publish(Arrays.asList(1, 2, 3, 4, 5, 6)).filter(i -> i % 2 == 0); return evens.toList(); }).getValue(); assertEquals(Arrays.asList(2, 4, 6), result); } }
- Type Parameters:
T
- the type of item emitted- Parameters:
input
- the stream to filterfilter
- the filter predicate- Returns:
- the input stream filtered
-
streamMap
public static <U,D> TransformablePublisher<D> streamMap(Publisher<? extends U> input, StreamMapper<? super U,D> mapper)
Allows transforming a stream into an entirely different stream.While the
map(Publisher, Function)
method support transforming individual items, this method supports transforming the stream as a whole. This is necessary when the transformation causes a different number of items to be emitted than the original stream.import org.reactivestreams.Publisher; import ratpack.exec.stream.Streams; import ratpack.exec.stream.TransformablePublisher; import ratpack.exec.stream.WriteStream; import ratpack.test.exec.ExecHarness; import java.util.Arrays; import java.util.List; import static org.junit.jupiter.api.Assertions.*; public class Example { public static void main(String... args) throws Exception { List<String> result = ExecHarness.yieldSingle(execControl -> { Publisher<String> chars = Streams.publish(Arrays.asList("a", "b", "c")); TransformablePublisher<String> mapped = Streams.streamMap(chars, (subscription, out) -> new WriteStream<String>() { public void item(String item) { out.item(item); out.item(item.toUpperCase()); } public void error(Throwable error) { out.error(error); } public void complete() { out.complete(); } } ); return mapped.toList(); }).getValue(); assertEquals(Arrays.asList("a", "A", "b", "B", "c", "C"), result); } }
The
mapper
function receives aWriteStream
for emitting items and returns aWriteStream
that will be written to by the upstream publisher. CallingWriteStream.complete()
orWriteStream.error(Throwable)
on the received write stream willcancel
the upstream subscription and it is guaranteed that no more items will be emitted from the upstream. If the complete/error signals from upstream don't need to be intercepted, theWriteStream.itemMap(Subscription, Action)
can be used on the write stream given to the mapping function to of the return write stream.Implementations must take care to call
Subscription.cancel()
on the provided subscription if they introduce an error. This is not required when simply forwarding an upstream error.The returned stream is
buffered
, which allows the stream transformation to emit more items downstream than what was received from the upstream. Currently, the upstream subscription will always receive a singlerequest
forLong.MAX_VALUE
, effectively asking upstream to emit as fast as it can. Future versions may propagate backpressure more intelligently.- Type Parameters:
U
- the type of item receivedD
- the type of item produced- Parameters:
input
- the stream to mapmapper
- the mapping function- Returns:
- the input stream transformed
- Since:
- 1.4
-
flatMap
public static <I,O> TransformablePublisher<O> flatMap(Publisher<I> input, Function<? super I,? extends Promise<? extends O>> function)
Returns a publisher that publishes items from the given input publisher after transforming each item via the given, promise returning, function.The returned publisher does not perform any flow control on the data stream.
If the given transformation errors, or if the returned promise fails, the exception will be forwarded to the subscriber and the subscription to the input stream will be cancelled.
- Type Parameters:
I
- the type of input itemO
- the type of output item- Parameters:
input
- the stream of input datafunction
- the transformation- Returns:
- a publisher that applies the given transformation to each item from the input stream
-
buffer
public static <T> TransformablePublisher<T> buffer(Publisher<T> publisher)
Returns a publisher that allows the given publisher to without respecting demand.The given publisher may violate the Reactive Streams contract in that it may emit more items than have been requested. Any excess will be buffered until there is more demand. All requests for items from the subscriber will be satisfied from the buffer first. If a request is made at any time for more items than are currently in the buffer, a request for the unmet demand will be made of the given publisher.
If the given producer emits far faster than the downstream subscriber requests, the intermediate queue will grow large and consume memory.
- Type Parameters:
T
- the type of item- Parameters:
publisher
- a data source- Returns:
- a publisher that buffers items emitted by the given publisher that were not requested
-
gate
public static <T> TransformablePublisher<T> gate(Publisher<T> publisher, Action<? super Runnable> valveReceiver)
Allows requests from the subscriber of the return publisher to be withheld from the given publisher until an externally defined moment.When the return publisher is subscribed to, the given publisher will be subscribed to. All requests made by the subscriber of the return publisher will not be forwarded to the subscription of the given publisher until the runnable given to the given action is run. Once the runnable is run, all requests are directly forwarded to the subscription of the given publisher.
The return publisher supports multi subscription, creating a new subscription to the given publisher each time. The given action will be invoke each time the return publisher is subscribed to with a distinct runnable for releasing the gate for that subscription.
The given action will be invoked immediately upon subscription of the return publisher. The runnable given to this action may be invoked any time (i.e. it does not need to be run during the action). If the action errors, the given publisher will not be subscribed to and the error will be sent to the return publisher subscriber.
- Type Parameters:
T
- the type of item emitted- Parameters:
publisher
- the data sourcevalveReceiver
- an action that receives a runnable “valve” that when run allows request to start flowing upstream- Returns:
- a publisher that is logically equivalent to the given publisher as far as subscribers are concerned
-
periodically
public static <T> TransformablePublisher<T> periodically(ScheduledExecutorService executorService, Duration duration, Function<? super Integer,? extends T> producer)
Executes the given function periodically, publishing the return value to the subscriber.When the return publisher is subscribed to, the given function is executed immediately (via the executor) with
0
as the input. The function will then be repeatedly executed again after the given delay (with an incrementing input) until the function returnsnull
. That is, a return value from the function ofnull
signals that the data stream has finished. The function will not be executed again after returningnull
.Each new subscription to the publisher will cause the function to be scheduled again. Due to this, it is generally desirable to wrap the return publisher in a multicasting publisher.
If the function throws an exception, the error will be sent to the subscribers and no more invocations of the function will occur.
The returned publisher is implicitly buffered to respect back pressure via
buffer(Publisher)
.- Type Parameters:
T
- the type of item- Parameters:
executorService
- the executor service that will periodically execute the functionduration
- the duration of the delay (Duration.ofSeconds(1) - delay will be 1 second)producer
- a function that produces values to emit- Returns:
- a publisher that applies respects back pressure, effectively throttling the given publisher
-
periodically
public static <T> TransformablePublisher<T> periodically(Registry registry, Duration duration, Function<? super Integer,? extends T> producer)
-
wiretap
public static <T> TransformablePublisher<T> wiretap(Publisher<T> publisher, Action<? super StreamEvent<T>> listener)
Allows listening to the events of the given publisher as they flow to subscribers.When the return publisher is subscribed to, the given publisher will be subscribed to. All events (incl. data, error and completion) emitted by the given publisher will be forwarded to the given listener before being forward to the subscriber of the return publisher.
If the listener errors, the upstream subscription will be cancelled (if appropriate) and the error sent downstream.
- Type Parameters:
T
- the type of item emitted- Parameters:
publisher
- the data sourcelistener
- the listener for emitted items- Returns:
- a publisher that is logically equivalent to the given publisher as far as subscribers are concerned
-
multicast
public static <T> TransformablePublisher<T> multicast(Publisher<T> publisher)
Returns a publisher that will stream events emitted from the given publisher to all of its subscribers.The return publisher allows the given publisher to emit as fast as it can, while applying flow control downstream to multiple subscribers. Each subscriber can signal its own demand. If the given publisher emits far faster than the downstream subscribers request, the intermediate queue of each subscriber will grow large and consume substantial memory. However, given this publisher is likely to be used with a periodic publisher or a regular indefinite stream it is unlikely to be a problem.
When a subscriber subscribes to the return publisher then it will not receive any events that have been emitted before it subscribed.
- Type Parameters:
T
- the type of item- Parameters:
publisher
- a data source- Returns:
- a publisher that respects back pressure for each of it's Subscribers.
-
fanOut
public static <T> TransformablePublisher<T> fanOut(Publisher<? extends Iterable<? extends T>> publisher)
Returns a publisher that publishes each element from Collections that are produced from the given input publisher.For each item the return publisher receives from the given input publisher, the return publisher will iterate over its elements and publish a new item for each element to its downstream subscriber e.g. if the return publisher receives a Collection with 10 elements then the downstream subscriber will receive 10 calls to its onNext method.
The returned publisher is implicitly buffered to respect back pressure via
buffer(Publisher)
.- Type Parameters:
T
- the type of item emitted- Parameters:
publisher
- the data source- Returns:
- a publisher that splits collection items into new items per collection element
-
fanOut
public static <T> TransformablePublisher<T> fanOut(Publisher<? extends Iterable<? extends T>> publisher, Action<? super T> disposer)
Returns a publisher that publishes each element from Collections that are produced from the given input publisher.For each item the return publisher receives from the given input publisher, the return publisher will iterate over its elements and publish a new item for each element to its downstream subscriber e.g. if the return publisher receives a Collection with 10 elements then the downstream subscriber will receive 10 calls to its onNext method.
The returned publisher is implicitly buffered to respect back pressure via
buffer(Publisher)
.- Type Parameters:
T
- the type of item emitted- Parameters:
publisher
- the data sourcedisposer
- the disposer of unhandled items- Returns:
- a publisher that splits collection items into new items per collection element
- Since:
- 1.5
-
merge
@SafeVarargs public static <T> TransformablePublisher<T> merge(Publisher<? extends T>... publishers)
Returns a publisher that merges the given input publishers into a single stream of elements.The returned publisher obeys the following rules:
- Only when all given input publishers have signalled completion will the downstream subscriber be completed.
- If one of the given input publishers errors then all other publisher subscriptions are cancelled and the error is propagated to the subscriber.
- If the subscription of the returned publisher is cancelled by the subscriber then all given input publisher subscriptions are cancelled.
The returned publisher is implicitly buffered to respect back pressure via
buffer(org.reactivestreams.Publisher)
.- Type Parameters:
T
- the type of item emitted- Parameters:
publishers
- the data sources to merge- Returns:
- a publisher that emits a single stream of elements from multiple publishers
-
toPromise
public static <T> Promise<T> toPromise(Publisher<T> publisher)
Creates a promise for the given publisher's single item.The given publisher is expected to produce zero or one items. If it produces zero, the promised value will be
null
. The it produces exactly one item, the promised value will be that item.If the stream produces more than one item, the promise will fail with an
IllegalStateException
. As soon as a second item is received, the subscription to the given publisher will be cancelled.The single item is not provided to the promise subscriber until the stream completes, to ensure that it is indeed a one element stream. If the stream errors before sending a second item, the promise will fail with that error. If it fails after sending a second item, that error will be ignored.
- Type Parameters:
T
- the type of promised value- Parameters:
publisher
- the publiser the convert to a promise- Returns:
- a promise for the publisher's single item
-
toList
public static <T> Promise<List<T>> toList(Publisher<T> publisher)
Creates a promise for the given publisher's items as a List.- Type Parameters:
T
- the type of item in the stream- Parameters:
publisher
- the stream to collect to a list- Returns:
- a promise for the streams contents as a list
-
reduce
public static <T,R> Promise<R> reduce(Publisher<T> publisher, R seed, BiFunction<? super R,? super T,? extends R> reducer)
Reduces the stream to a single value, by applying the given function successively.- Type Parameters:
R
- the type of result- Parameters:
publisher
- the publisher to reduceseed
- the initial valuereducer
- the reducing function- Returns:
- a promise for the reduced value
- Since:
- 1.4
-
bindExec
public static <T> TransformablePublisher<T> bindExec(Publisher<T> publisher)
Binds the given publisher to the currentExecution
.Calls
bindExec(Publisher, Action)
withAction.noop()
as the disposer.- Type Parameters:
T
- the type of item emitted by the publisher- Parameters:
publisher
- the publisher to bind to the execution- Returns:
- a new publisher that binds the given publisher to the current execution
-
bindExec
public static <T> TransformablePublisher<T> bindExec(Publisher<T> publisher, Action<? super T> disposer)
Binds the given publisher to the currentExecution
.Publishers may emit signals asynchronously and on any thread. An execution bound publisher emits all of its “signals” (e.g.
onNext()
) on its execution (and therefore same thread). By binding the publisher to the execution, the execution can remain open while the publisher is emitting and subscribers receive signals within the execution and can therefore usePromise
etc and have the appropriate execution state and error handling.There is a performance overhead in binding a publisher to an execution. It is typically only necessary to bind the last publisher in a chain to the execution. If the processing of items does not require execution mechanics, it can be faster to wrap the publisher subscription in
Promise.async(Upstream)
and complete the promise in the subscriber'sSubscriber.onComplete()
.The given
disposer
is used to “free” any items that were not yet received by the subscriber when the subscription is cancelled, or if the subscriber errors. This is only required if the emitted items are reference counted (e.g.ByteBuf
) or hold open resources (e.g. file handles). Any exceptions raised by the disposer will be logged then ignored. If items do not need disposing, passAction.noop()
.- Type Parameters:
T
- the type of item emitted by the publisher- Parameters:
publisher
- the publisher to bind to the executiondisposer
- the disposer of unhandled items- Returns:
- a new publisher that binds the given publisher to the current execution
- Since:
- 1.5
-
fork
public static <T> TransformablePublisher<T> fork(Publisher<T> publisher, Action<? super ExecSpec> execConfig, Action<? super T> disposer)
Consumes the given publisher eagerly in a forked execution, buffering results until ready to be consumed by this execution.This can be used when wanting to effectively parallelize the production of values and the consumption. The given publisher can emit items as fast as possible, independent of consumption.
If the given publisher emits faster than the consumer of the returned publisher, excessive memory may be used to buffer the items until the consumer can process them.
The given publisher will not be subscribed to until the returned publisher is. When the first
Subscription.request(long)
is issued, a request forLong.MAX_VALUE
will be issued to the subscription to the given publisher.The returned publisher is
execution bound
.- Type Parameters:
T
- the type of emitted item- Parameters:
publisher
- the publisher to consume as fast as possible in a forked executionexecConfig
- the configuration for the forked executiondisposer
- the disposer for any buffered items when the stream errors or is cancelled- Returns:
- an execution bound publisher that propagates the items of the given publisher
- Since:
- 1.5
-
take
public static <T> TransformablePublisher<T> take(long count, Publisher<T> upstreamPublisher)
Returns a publisher that emits only the firstn
elements from the given publisher, wheren
is the given count.On emitting the required number of elements, the upstream subscription will be cancelled and the stream completed.
The given required number of elements is a maximum. If the upstream publisher completes before the required number of elements is reached, then the stream completes as normal.
- Type Parameters:
T
- the type of emitted item- Parameters:
count
- the max number of items to emit before completingupstreamPublisher
- the publisher to take from- Returns:
- a publisher that will emit a max of
n
elements - Since:
- 1.5
-
takeWhile
public static <T> TransformablePublisher<T> takeWhile(Predicate<T> condition, Publisher<T> upstreamPublisher)
Returns a publisher that emits elements from the given publisher, while thecondition
is true.When the condition returns false, the upstream subscription will be cancelled and the stream completed.
If the upstream publisher completes before the condition returns false, then the stream completes as normal.
- Type Parameters:
T
- the type of emitted item- Parameters:
condition
- the predicate to determine if the stream should completeupstreamPublisher
- the publisher to take from- Returns:
- a publisher that will emit until the condition returns false
- Since:
- 2.0
-
concat
public static <T> TransformablePublisher<T> concat(Iterable<? extends Publisher<? extends T>> publishers, Action<? super T> disposer)
Returns a publisher that aggregates the given publishers into a single stream of elements, without interleaving them.The returned publisher obeys the following rules:
- Given publishers are subscribed to lazily and in the order they are supplied. That is, a publisher is not subscribed to until the previous publisher has completed.
- Elements emitted from the given publishers are not interleaved.
- Only when all given publishers have signalled completion will the downstream subscriber be completed.
- Type Parameters:
T
- the type of emitted item- Parameters:
publishers
- the publishers to concatenatedisposer
- the disposer of unhandled items- Returns:
- a publisher that emits a single stream of elements from multiple publishers
- Since:
- 1.5
-
concat
public static <T> TransformablePublisher<T> concat(Iterable<? extends Publisher<? extends T>> publishers)
Similar toconcat(Iterable, Action)
, but with a noop disposer.- Type Parameters:
T
- the type of emitted item- Parameters:
publishers
- the publishers to concatenate.- Returns:
- a publisher that emits a single stream of elements from multiple publishers
- Since:
- 1.5
-
batch
public static <T> TransformablePublisher<T> batch(int batchSize, Publisher<T> publisher, Action<? super T> disposer)
Batches and serialised demand.Subscribers often request items one at a time. This can cause inefficient data fetching patterns in publishers (e.g fetching one row at a time from a database result set). Such publishers can be wrapped in a batch publisher, which transforms demand into regular sizes.
Excess items are buffered until the subscriber wants them. Therefore, using a very large batch size with a very slow subscriber may require significant memory.
- Type Parameters:
T
- the type of emitted item- Parameters:
batchSize
- the batch sizepublisher
- the publisher to issue request to in batchesdisposer
- the disposer of unhandled items (e.g. buffered, unwanted, items)- Returns:
- a publisher that batches requests upstream
- Since:
- 1.5
-
flatten
public static <T> TransformablePublisher<T> flatten(Publisher<? extends Publisher<T>> publisher)
Creates a single publisher from a publisher of publishers.Delegates to
flatten(Publisher, Action)
withAction.noop()
.- Type Parameters:
T
- the type of emitted item- Parameters:
publisher
- the publisher of publishers- Returns:
- a publisher that flattens the given publisher
- Since:
- 1.5
-
flatten
public static <T> TransformablePublisher<T> flatten(Publisher<? extends Publisher<T>> publisher, Action<? super T> disposer)
Creates a single publisher from a publisher of publishers.Each emitted publisher is delegated to until it completes, upon which the next publisher will be requested and the actual demand delegated to it and so forth.
import org.reactivestreams.Publisher; import ratpack.exec.stream.Streams; import ratpack.test.exec.ExecHarness; import java.util.List; import static java.util.Arrays.asList; import static org.junit.jupiter.api.Assertions.assertEquals; public class Example { public static void main(String... args) throws Exception { List<Integer> value = ExecHarness.yieldSingle(e -> { Publisher<Integer> p1 = Streams.publish(asList(1, 2)); Publisher<Integer> p2 = Streams.publish(asList(3, 4)); Publisher<Publisher<Integer>> nested = Streams.publish(asList(p1, p2)); Publisher<Integer> flattened = Streams.flatten(nested); return Streams.toList(flattened); }).getValueOrThrow(); assertEquals(asList(1, 2, 3, 4), value); } }
- Type Parameters:
T
- the type of emitted item- Parameters:
publisher
- the publisher of publishersdisposer
- the disposer of unhandled items (e.g. buffered, unwanted, items)- Returns:
- a publisher that flattens the given publisher
- Since:
- 1.5
-
-