Interface ReadWriteAccess
public interface ReadWriteAccess
Provides read/write serialization, analogous to ReadWriteLock.
Can be used whenever a “resource” has safe concurrent usages and mutually exclusive usages, such as updating a file.
The read(Promise) and write(Promise) methods decorate promises with serialization. Read serialized promises may execute concurrently with other read serialized promises, but not with write serialized promises. Write serialized promises may not execute concurrently with read or write serialized promises.
Access is generally fair. That is, access is granted in the order that promises execute (n.b. not in the order they are decorated).
Access is not reentrant. Deadlocks are not detected or prevented.
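The practical effect of non-reentrant access can be sketched with a plain java.util.concurrent analogy. This is not Ratpack's implementation: the class name NonReentrantSketch and the method nestedAcquire are hypothetical, and a single-permit Semaphore merely stands in for exclusive access. Because the permit is not reentrant, the current holder cannot acquire it a second time, so the nested attempt waits until its timeout elapses.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class NonReentrantSketch {

  /**
   * Acquires a single-permit semaphore, then tries to acquire it again
   * from the same thread. Returns true only if the nested attempt succeeds.
   */
  public static boolean nestedAcquire(long timeoutMillis) {
    // Stand-in for exclusive access (an assumption for illustration only).
    Semaphore exclusive = new Semaphore(1, true);
    exclusive.acquireUninterruptibly();
    try {
      // The holder asks again; a non-reentrant primitive does not grant it.
      boolean nested = exclusive.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
      if (nested) {
        exclusive.release();
      }
      return nested;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      // Release the outer permit.
      exclusive.release();
    }
  }

  public static void main(String[] args) {
    System.out.println("nested acquire granted: " + nestedAcquire(50)); // prints false
  }
}
```

With no timeout at all, the nested attempt in this analogy would wait forever, which is the kind of self-inflicted deadlock that is neither detected nor prevented.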
import com.google.common.io.Files;
import io.netty.buffer.ByteBufAllocator;
import ratpack.exec.Promise;
import ratpack.exec.util.ParallelBatch;
import ratpack.exec.util.ReadWriteAccess;
import ratpack.core.file.FileIo;
import ratpack.test.embed.EmbeddedApp;
import ratpack.test.embed.EphemeralBaseDir;
import ratpack.test.exec.ExecHarness;

import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.time.Duration;

import static java.nio.file.StandardOpenOption.*;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class Example {
  public static void main(String... args) throws Exception {
    EphemeralBaseDir.tmpDir().use(baseDir -> {
      ReadWriteAccess access = ReadWriteAccess.create(Duration.ofSeconds(5));
      Path f = baseDir.write("f", "foo");

      EmbeddedApp.of(a -> a
        .serverConfig(c -> c.baseDir(baseDir.getRoot()))
        .handlers(c -> {
          ByteBufAllocator allocator = c.getRegistry().get(ByteBufAllocator.class);

          c.path(ctx ->
            ctx.byMethod(m -> m
              .get(() ->
                FileIo.read(FileIo.open(f, READ, CREATE), allocator, 8192)
                  .apply(access::read)
                  .map(b -> { try { return b.toString(Charset.defaultCharset()); } finally { b.release(); } })
                  .then(ctx::render)
              )
              .post(() ->
                FileIo.write(ctx.getRequest().getBodyStream(), FileIo.open(f, WRITE, CREATE, TRUNCATE_EXISTING))
                  .apply(access::write)
                  .then(written -> ctx.render(written.toString()))
              )
            )
          );
        })
      ).test(httpClient -> {
        // Create a bunch of reads and writes
        List<Promise<String>> requests = new ArrayList<>();
        for (int i = 0; i < 200; ++i) {
          requests.add(Promise.sync(httpClient::getText));
        }
        for (int i = 0; i < 200; ++i) {
          requests.add(Promise.sync(() ->
            httpClient.request(r -> r
              .post().getBody().text("foo")
            ).getBody().getText()
          ));
        }

        // Interleave
        Collections.shuffle(requests);

        // Execute them in parallel
        List<String> results = ExecHarness.yieldSingle(r ->
          ParallelBatch.of(requests).yield()
        ).getValueOrThrow();

        assertEquals("foo", Files.asCharSource(f.toFile(), Charset.defaultCharset()).read());
        assertEquals(400, results.size());
      });
    });
  }
}
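The read/write rules described above mirror those of java.util.concurrent.locks.ReadWriteLock, which the description names as the analogy. The following is a minimal sketch of those rules using the JDK's ReentrantReadWriteLock, not Ratpack code: the class name ReadWriteSemanticsSketch and the method probe are hypothetical. While one thread holds read access, a second thread is granted read access immediately but is refused write access.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteSemanticsSketch {

  /**
   * Holds the read lock on the calling thread and probes from a second thread.
   * Returns {secondReadGranted, writeGrantedWhileReading}.
   */
  public static boolean[] probe() {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
    boolean[] result = new boolean[2];

    lock.readLock().lock(); // the calling thread is now a reader
    try {
      Thread other = new Thread(() -> {
        // Readers may share access with other readers.
        boolean read = lock.readLock().tryLock();
        result[0] = read;
        if (read) {
          lock.readLock().unlock();
        }
        // A writer needs exclusive access, so this is refused while a reader is active.
        boolean write = lock.writeLock().tryLock();
        result[1] = write;
        if (write) {
          lock.writeLock().unlock();
        }
      });
      other.start();
      other.join(); // join makes the other thread's writes to result visible
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      lock.readLock().unlock();
    }
    return result;
  }

  public static void main(String[] args) {
    boolean[] r = probe();
    System.out.println("second read granted: " + r[0]);         // prints true
    System.out.println("write granted while reading: " + r[1]); // prints false
  }
}
```

Unlike this thread-based lock, ReadWriteAccess applies the same rules to promise execution rather than to thread ownership, as the handler example shows.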
Since: 1.5
Nested Class Summary
Modifier and Type    Interface                           Description
static class         ReadWriteAccess.TimeoutException    Thrown if access could not be acquired within the given timeout value.
Method Summary
Modifier and Type         Method                                         Description
static ReadWriteAccess    create(Duration defaultTimeout)                Create a new read/write access object with the given default timeout.
Duration                  getDefaultTimeout()                            The default timeout value.
<T> Promise<T>            read(Promise<T> promise)                       Decorates the given promise with read serialization.
<T> Promise<T>            read(Promise<T> promise, Duration timeout)     Decorates the given promise with read serialization and the given timeout.
<T> Promise<T>            write(Promise<T> promise)                      Decorates the given promise with write serialization.
<T> Promise<T>            write(Promise<T> promise, Duration timeout)    Decorates the given promise with write serialization and the given timeout.
Method Detail
create
static ReadWriteAccess create(Duration defaultTimeout)
Create a new read/write access object with the given default timeout.
Parameters:
defaultTimeout - the default maximum amount of time to wait for access (must not be negative, 0 == infinite)
Returns:
a new read/write access object
getDefaultTimeout
Duration getDefaultTimeout()
The default timeout value.
Returns:
the default timeout value
read
<T> Promise<T> read(Promise<T> promise)
Decorates the given promise with read serialization.
Read serialized promises may execute concurrently with other read serialized promises, but not with write serialized promises.
If access is not granted within the default timeout, the promise will fail with ReadWriteAccess.TimeoutException.
Type Parameters:
T - the type of promised value
Parameters:
promise - the promise to decorate
Returns:
a decorated promise
read
<T> Promise<T> read(Promise<T> promise, Duration timeout)
Decorates the given promise with read serialization and the given timeout.
Read serialized promises may execute concurrently with other read serialized promises, but not with write serialized promises.
If access is not granted within the given timeout, the promise will fail with ReadWriteAccess.TimeoutException.
Type Parameters:
T - the type of promised value
Parameters:
promise - the promise to decorate
timeout - the maximum amount of time to wait for access (must not be negative, 0 == infinite)
Returns:
a decorated promise
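The timeout convention (non-negative, with 0 meaning wait indefinitely) and the failure on timeout can be illustrated with JDK locks. This is a sketch by analogy only, not Ratpack's implementation: TimedAccessSketch, withLock, readTimesOutWhileWriting and uncontendedRead are hypothetical names, java.util.concurrent.TimeoutException stands in for ReadWriteAccess.TimeoutException, and rejecting a negative timeout with IllegalArgumentException is an assumption.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

public class TimedAccessSketch {

  /** Hypothetical helper: runs work under the lock, failing if access is not granted in time. */
  public static <T> T withLock(Lock lock, Duration timeout, Supplier<T> work) throws TimeoutException {
    if (timeout.isNegative()) {
      throw new IllegalArgumentException("timeout must not be negative"); // assumed behaviour
    }
    try {
      if (timeout.isZero()) {
        lock.lockInterruptibly(); // 0 == infinite: wait as long as it takes
      } else if (!lock.tryLock(timeout.toNanos(), TimeUnit.NANOSECONDS)) {
        throw new TimeoutException("access not granted within " + timeout);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    try {
      return work.get();
    } finally {
      lock.unlock();
    }
  }

  /** Returns true if a read attempted while another thread holds write access times out. */
  public static boolean readTimesOutWhileWriting() {
    ReentrantReadWriteLock rw = new ReentrantReadWriteLock(true);
    boolean[] timedOut = new boolean[1];
    rw.writeLock().lock(); // the calling thread is the writer
    try {
      Thread reader = new Thread(() -> {
        try {
          withLock(rw.readLock(), Duration.ofMillis(50), () -> "value");
        } catch (TimeoutException e) {
          timedOut[0] = true;
        }
      });
      reader.start();
      reader.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      rw.writeLock().unlock();
    }
    return timedOut[0];
  }

  /** Returns the value produced under an uncontended read with an infinite (zero) timeout. */
  public static String uncontendedRead() {
    ReentrantReadWriteLock rw = new ReentrantReadWriteLock(true);
    try {
      return withLock(rw.readLock(), Duration.ZERO, () -> "value");
    } catch (TimeoutException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("read timed out while writing: " + readTimesOutWhileWriting());
    System.out.println("uncontended read: " + uncontendedRead());
  }
}
```

A per-call timeout like this lets latency-sensitive reads give up sooner than the default configured through create(Duration).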
write
<T> Promise<T> write(Promise<T> promise)
Decorates the given promise with write serialization.
Write serialized promises may not execute concurrently with read or write serialized promises.
If access is not granted within the default timeout, the promise will fail with ReadWriteAccess.TimeoutException.
Type Parameters:
T - the type of promised value
Parameters:
promise - the promise to decorate
Returns:
a decorated promise
write
<T> Promise<T> write(Promise<T> promise, Duration timeout)
Decorates the given promise with write serialization and the given timeout.
Write serialized promises may not execute concurrently with read or write serialized promises.
If access is not granted within the given timeout, the promise will fail with ReadWriteAccess.TimeoutException.
Type Parameters:
T - the type of promised value
Parameters:
promise - the promise to decorate
timeout - the maximum amount of time to wait for access (must not be negative, 0 == infinite)
Returns:
a decorated promise