Interface ParallelBatch<T>
-
- Type Parameters:
T
- the type of value produced by each promise in the batch
- All Superinterfaces:
Batch<T>
public interface ParallelBatch<T> extends Batch<T>
A batch of promises to be processed, in parallel.
Parallel batches can be created via of(Iterable).
Each promise will be executed in a forked execution. The execInit(Action) method allows each forked execution to be customised before executing the work.
- Since:
- 1.4
-
-
Method Summary
| Modifier and Type | Method | Description |
| --- | --- | --- |
| ParallelBatch<T> | execInit(Action<? super Execution> execInit) | Specifies an initializer for each forked execution. |
| Operation | forEach(BiAction<? super Integer,? super T> consumer) | Processes the promises of the batch, stopping at the first error, emitting results to the given callback. |
| static <T> ParallelBatch<T> | of(Iterable<? extends Promise<? extends T>> promises) | Creates a new parallel batch of the given promises. |
| static <T> ParallelBatch<T> | of(Promise<? extends T>... promises) | Creates a new parallel batch of the given promises. |
| TransformablePublisher<T> | publisher() | Creates a publisher that emits the promised values. |
| Promise<List<T>> | yield() | Processes all the promises of the batch, stopping at the first error. |
| Promise<List<? extends ExecResult<T>>> | yieldAll() | Processes all the promises of the batch, collecting any errors. |
-
-
-
Method Detail
-
of
static <T> ParallelBatch<T> of(Iterable<? extends Promise<? extends T>> promises)
Creates a new parallel batch of the given promises.
- Type Parameters:
T
- the type of item produced by each promise
- Parameters:
promises
- the promises
- Returns:
- a ParallelBatch
-
of
@SafeVarargs static <T> ParallelBatch<T> of(Promise<? extends T>... promises)
Creates a new parallel batch of the given promises.
- Type Parameters:
T
- the type of item produced by each promise
- Parameters:
promises
- the promises
- Returns:
- a ParallelBatch
-
execInit
ParallelBatch<T> execInit(Action<? super Execution> execInit)
Specifies an initializer for each forked execution.
The given action will be called with each execution before processing the promise. This can be used to seed the execution registry.
The given action is invoked from within the execution in question. Because the executions run in parallel, it may be invoked concurrently for different executions.
- Parameters:
execInit
- the execution initializer
- Returns:
- a new batch, configured to use the given initializer
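As an illustration of seeding the execution registry, the following sketch adds a String to each forked execution and reads it back from inside each promise. The class name `ExecInitExample` and the seeded value are hypothetical. It assumes that `Execution.current()` inside a batched promise refers to the forked execution, and it uses `ExecHarness` from the ratpack-test module to run the batch.

```java
import ratpack.exec.Execution;
import ratpack.exec.Promise;
import ratpack.exec.util.ParallelBatch;
import ratpack.test.exec.ExecHarness;

import java.util.List;

public class ExecInitExample {

  // Runs a two-promise batch in which every forked execution is seeded
  // with a String before its promise is processed.
  public static List<String> run() throws Exception {
    return ExecHarness.yieldSingle(e -> {
      // Each promise looks up the seeded String in the current execution
      // (assumed here to be the forked one, not the parent).
      Promise<String> first = Promise.sync(() -> Execution.current().get(String.class) + "-1");
      Promise<String> second = Promise.sync(() -> Execution.current().get(String.class) + "-2");

      return ParallelBatch.of(first, second)
        // Called once per forked execution, possibly concurrently.
        .execInit(execution -> execution.add(String.class, "seeded"))
        .yield();
    }).getValueOrThrow();
  }

  public static void main(String[] args) throws Exception {
    System.out.println(run());
  }
}
```

Because the initializer can run on several threads at once, any state it shares across executions should be thread-safe.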
-
yieldAll
Promise<List<? extends ExecResult<T>>> yieldAll()
Processes all the promises of the batch, collecting any errors.
This method differs from Batch.yield() in that every promise will be processed, regardless of any failure. As such, it returns ExecResult objects representing the outcome as it may be an error.
The promise returned from this method will not fail, as failure is conveyed via the result objects of the list.
The order of the entries in the promised list corresponds to the order of the promises originally. That is, it is guaranteed that the 2nd item in the list was the 2nd promise specified.
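A minimal sketch of consuming the result list, assuming the ratpack-test `ExecHarness` is available. The class name `YieldAllExample` and the sample values are made up for illustration. The second promise fails, yet all three outcomes are reported in source order.

```java
import ratpack.exec.ExecResult;
import ratpack.exec.Promise;
import ratpack.exec.util.ParallelBatch;
import ratpack.test.exec.ExecHarness;

import java.util.ArrayList;
import java.util.List;

public class YieldAllExample {

  // Runs a batch whose second promise fails and describes every outcome.
  public static List<String> run() throws Exception {
    List<? extends ExecResult<String>> results = ExecHarness.yieldSingle(e ->
      ParallelBatch.of(
        Promise.value("a"),
        Promise.<String>error(new IllegalStateException("boom")),
        Promise.value("c")
      ).yieldAll()
    ).getValueOrThrow(); // the outer promise itself does not fail

    List<String> described = new ArrayList<>();
    for (ExecResult<String> result : results) {
      // Each entry is either a value or an error; inspect it before use.
      described.add(result.isError()
        ? "error: " + result.getThrowable().getMessage()
        : "value: " + result.getValue());
    }
    return described;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(run()); // [value: a, error: boom, value: c]
  }
}
```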
-
yield
Promise<List<T>> yield()
Processes all the promises of the batch, stopping at the first error.
This method differs from Batch.yieldAll() in that processing will be halted as soon as the first error occurs. The error will be propagated through the returned promise.
The order of the entries in the promised list corresponds to the order of the promises originally. That is, it is guaranteed that the 2nd item in the list was the 2nd promise specified. It does not reflect the order in which promises completed.
Multiple errors may occur due to promises being in-flight when the first error occurs. Subsequent errors will be suppressed by the first error (see Throwable.addSuppressed(Throwable)).
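Errors attached through Throwable.addSuppressed(Throwable) can be read back with Throwable.getSuppressed(). The sketch below uses only the JDK. The exception built in `main` is a hand-made stand-in for the failure a batch would propagate, not output from Ratpack, and the class name `SuppressedErrors` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class SuppressedErrors {

  // Returns the message of the primary error followed by the messages
  // of any errors attached to it via Throwable.addSuppressed(Throwable).
  public static List<String> messages(Throwable primary) {
    List<String> out = new ArrayList<>();
    out.add(primary.getMessage());
    for (Throwable suppressed : primary.getSuppressed()) {
      out.add(suppressed.getMessage());
    }
    return out;
  }

  public static void main(String[] args) {
    // Stand-in for a batch failure: one primary error with a later,
    // in-flight failure attached as suppressed.
    Exception first = new IllegalStateException("first failure");
    first.addSuppressed(new IllegalArgumentException("later failure"));
    System.out.println(messages(first)); // [first failure, later failure]
  }
}
```

A helper along these lines could be applied to the throwable received in an error handler on the promise returned by yield(), so that later failures are logged rather than lost.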
-
forEach
Operation forEach(BiAction<? super Integer,? super T> consumer)
Processes the promises of the batch, stopping at the first error, emitting results to the given callback.
This method is useful for aggregating or reducing the batch.
The returned operation will complete after all items have been consumed or if there is an error.
The integer value given to the consumer indicates the source position of the corresponding promise.
Multiple errors may occur due to promises being in-flight when the first error occurs. Subsequent errors will be suppressed by the first error (see Throwable.addSuppressed(Throwable)).
Note that the given function will be executed concurrently, as values become available.
import ratpack.exec.Promise;
import ratpack.exec.util.ParallelBatch;
import ratpack.func.Pair;
import ratpack.test.exec.ExecHarness;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class Example {
  public static void main(String... args) throws Exception {
    // Thread-safe map, because the consumer is invoked concurrently.
    Map<String, Integer> map = new ConcurrentHashMap<>();

    ExecHarness.runSingle(e -> {
      List<Promise<Pair<String, Integer>>> promises = Arrays.asList(
        Promise.value(Pair.of("a", 1)),
        Promise.value(Pair.of("b", 2)),
        Promise.value(Pair.of("c", 3)),
        Promise.value(Pair.of("d", 4))
      );

      // i is the source position of the promise, v is its value.
      ParallelBatch.of(promises)
        .forEach((i, v) -> map.put(v.left, v.right))
        .then();
    });

    assertEquals(Integer.valueOf(1), map.get("a"));
    assertEquals(Integer.valueOf(2), map.get("b"));
    assertEquals(Integer.valueOf(3), map.get("c"));
    assertEquals(Integer.valueOf(4), map.get("d"));
  }
}
-
publisher
TransformablePublisher<T> publisher()
Creates a publisher that emits the promised values.
This method differs from Batch.yield() and Batch.yieldAll() in that items are emitted as soon as they have completed. As such, it is more appropriate when wanting to stream the results in some fashion.
Items are emitted in completion order, not source order.
Processing is effectively halted when the first error occurs.
The returned publisher is NOT execution bound.
Any errors that occur after the initial error will be ignored.
-
-