Interface ReadWriteAccess


  • public interface ReadWriteAccess
    Provides read/write serialization, analogous to ReadWriteLock.

    Can be used whenever a “resource” supports some usages that are safe to run concurrently and others that must be mutually exclusive, such as reading and updating a file.

    The read(Promise) and write(Promise) methods decorate promises with serialization. Read serialized promises may execute concurrently with other read serialized promises, but not with write serialized promises. Write serialized promises may not execute concurrently with read or write serialized promises.
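    The admission rules above match those of the JDK's ReadWriteLock, which this interface is described as analogous to. The following is a minimal sketch of those rules using ReentrantReadWriteLock and plain threads; it does not use Ratpack at all, and the class and method names (ReadWriteLockAnalogy, acquirableFromOtherThread, admitted) are made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustration only: models the read/write admission rules with JDK locks,
// not with Ratpack promises.
class ReadWriteLockAnalogy {

  // Tries a non-blocking acquisition of `lock` on a separate thread and
  // reports whether it succeeded (releasing the lock again if it did).
  static boolean acquirableFromOtherThread(Lock lock) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      return executor.submit(() -> {
        boolean acquired = lock.tryLock();
        if (acquired) {
          lock.unlock();
        }
        return acquired;
      }).get();
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
  }

  // While the calling thread holds `held`, can another thread take `wanted`?
  static boolean admitted(Lock held, Lock wanted) {
    held.lock();
    try {
      return acquirableFromOtherThread(wanted);
    } finally {
      held.unlock();
    }
  }

  public static void main(String[] args) {
    ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    System.out.println(admitted(rw.readLock(), rw.readLock()));   // true: readers overlap
    System.out.println(admitted(rw.readLock(), rw.writeLock()));  // false: writer waits for readers
    System.out.println(admitted(rw.writeLock(), rw.readLock()));  // false: readers wait for the writer
    System.out.println(admitted(rw.writeLock(), rw.writeLock())); // false: writers exclude each other
  }
}
```

    The difference with ReadWriteAccess is that the waiting party is a promise rather than a thread, so waiting for access does not need to block the thread that executes it.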

    Access is generally fair. That is, access is granted in the order that promises execute (n.b. not in the order they are decorated).

    Access is not reentrant. Deadlocks are not detected or prevented.
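    Non-reentrancy means that a holder of access that asks for conflicting access again is not recognised as the current holder, so the nested request waits for a release that cannot happen. The sketch below shows the same hazard with the JDK's StampedLock, which is also documented as non-reentrant. It is an analogy rather than Ratpack code: the class and method names are hypothetical, and the bounded wait stands in for an access timeout.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;

// Illustration only: a non-reentrant JDK lock used as a stand-in.
class NonReentrantSketch {

  // Holds the write lock, then asks for it again from the same thread.
  // Returns true if the nested request was granted within `waitMillis`.
  static boolean nestedWriteGranted(long waitMillis) {
    StampedLock lock = new StampedLock();
    long outer = lock.writeLock();
    try {
      // A stamp of 0 means the lock was not acquired in time.
      long inner = lock.tryWriteLock(waitMillis, TimeUnit.MILLISECONDS);
      if (inner != 0L) {
        lock.unlockWrite(inner);
        return true;
      }
      return false;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      lock.unlockWrite(outer);
    }
  }

  public static void main(String[] args) {
    // The nested request can never succeed while the outer hold is in place.
    System.out.println(nestedWriteGranted(50)); // false
  }
}
```

    With an infinite wait the nested request would never return, which is why nesting decorated promises on the same access object is worth avoiding.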

    
     import com.google.common.io.Files;
     import io.netty.buffer.ByteBufAllocator;
     import ratpack.exec.Promise;
     import ratpack.exec.util.ParallelBatch;
     import ratpack.exec.util.ReadWriteAccess;
     import ratpack.core.file.FileIo;
     import ratpack.test.embed.EmbeddedApp;
     import ratpack.test.embed.EphemeralBaseDir;
     import ratpack.test.exec.ExecHarness;
    
     import java.nio.charset.Charset;
     import java.nio.file.Path;
     import java.util.ArrayList;
     import java.util.Collections;
     import java.util.List;
     import java.time.Duration;
    
     import static java.nio.file.StandardOpenOption.*;
     import static org.junit.jupiter.api.Assertions.assertEquals;
    
     public class Example {
    
       public static void main(String... args) throws Exception {
         EphemeralBaseDir.tmpDir().use(baseDir -> {
           ReadWriteAccess access = ReadWriteAccess.create(Duration.ofSeconds(5));
           Path f = baseDir.write("f", "foo");
    
           EmbeddedApp.of(a -> a
             .serverConfig(c -> c.baseDir(baseDir.getRoot()))
             .handlers(c -> {
               ByteBufAllocator allocator = c.getRegistry().get(ByteBufAllocator.class);
    
               c.path(ctx ->
                 ctx.byMethod(m -> m
                   .get(() ->
                     FileIo.read(FileIo.open(f, READ, CREATE), allocator, 8192)
                       .apply(access::read)
                       .map(b -> { try { return b.toString(Charset.defaultCharset()); } finally { b.release(); } })
                       .then(ctx::render)
                   )
                   .post(() ->
                     FileIo.write(ctx.getRequest().getBodyStream(), FileIo.open(f, WRITE, CREATE, TRUNCATE_EXISTING))
                       .apply(access::write)
                       .then(written -> ctx.render(written.toString()))
                   )
                 )
               );
             })
           ).test(httpClient -> {
    
             // Create a bunch of reads and writes
             List<Promise<String>> requests = new ArrayList<>();
             for (int i = 0; i < 200; ++i) {
               requests.add(Promise.sync(httpClient::getText));
             }
             for (int i = 0; i < 200; ++i) {
               requests.add(Promise.sync(() ->
                 httpClient.request(r -> r
                   .post().getBody().text("foo")
                 ).getBody().getText()
               ));
             }
    
             // Interleave
             Collections.shuffle(requests);
    
             // Execute them in parallel
             List<String> results = ExecHarness.yieldSingle(r ->
               ParallelBatch.of(requests).yield()
             ).getValueOrThrow();
    
             assertEquals("foo", Files.asCharSource(f.toFile(), Charset.defaultCharset()).read());
             assertEquals(400, results.size());
           });
         });
       }
     }
    
     
    Since:
    1.5
    • Method Detail

      • create

        static ReadWriteAccess create(Duration defaultTimeout)
        Create a new read/write access object with the given default timeout.
        Parameters:
        defaultTimeout - the default maximum amount of time to wait for access (must not be negative, 0 == infinite)
        Returns:
        a new read/write access object
      • getDefaultTimeout

        Duration getDefaultTimeout()
        The default timeout value.
        Returns:
        the default timeout value
      • read

        <T> Promise<T> read​(Promise<T> promise)
        Decorates the given promise with read serialization.

        Read serialized promises may execute concurrently with other read serialized promises, but not with write serialized promises.

        If access is not granted within the default timeout, the promise will fail with ReadWriteAccess.TimeoutException.

        Type Parameters:
        T - the type of promised value
        Parameters:
        promise - the promise to decorate
        Returns:
        a decorated promise
      • read

        <T> Promise<T> read​(Promise<T> promise,
                            Duration timeout)
        Decorates the given promise with read serialization and the given timeout.

        Read serialized promises may execute concurrently with other read serialized promises, but not with write serialized promises.

        If access is not granted within the given timeout, the promise will fail with ReadWriteAccess.TimeoutException.

        Type Parameters:
        T - the type of promised value
        Parameters:
        promise - the promise to decorate
        timeout - the maximum amount of time to wait for access (must not be negative, 0 == infinite)
        Returns:
        a decorated promise
      • write

        <T> Promise<T> write​(Promise<T> promise)
        Decorates the given promise with write serialization.

        Write serialized promises may not execute concurrently with read or write serialized promises.

        If access is not granted within the default timeout, the promise will fail with ReadWriteAccess.TimeoutException.

        Type Parameters:
        T - the type of promised value
        Parameters:
        promise - the promise to decorate
        Returns:
        a decorated promise
      • write

        <T> Promise<T> write​(Promise<T> promise,
                             Duration timeout)
        Decorates the given promise with write serialization and the given timeout.

        Write serialized promises may not execute concurrently with read or write serialized promises.

        If access is not granted within the given timeout, the promise will fail with ReadWriteAccess.TimeoutException.

        Type Parameters:
        T - the type of promised value
        Parameters:
        promise - the promise to decorate
        timeout - the maximum amount of time to wait for access (must not be negative, 0 == infinite)
        Returns:
        a decorated promise
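    The timeout rules shared by create, read and write (negative values are rejected, zero means wait indefinitely, and an expired wait ends in a timeout failure) can be modelled on a plain JDK lock. This is a sketch of the documented contract only, not of how Ratpack implements it: acquire, attemptWhileHeld and AccessTimeoutException are hypothetical names, with AccessTimeoutException standing in for ReadWriteAccess.TimeoutException.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Illustration only: models the documented timeout contract with a JDK lock.
class TimeoutContractSketch {

  // Hypothetical stand-in for ReadWriteAccess.TimeoutException.
  static class AccessTimeoutException extends RuntimeException {
    AccessTimeoutException(Duration timeout) {
      super("access not granted within " + timeout);
    }
  }

  // Acquires `lock`: negative timeouts are rejected, zero waits indefinitely,
  // and any other value bounds the wait.
  static void acquire(Lock lock, Duration timeout) {
    if (timeout.isNegative()) {
      throw new IllegalArgumentException("timeout must not be negative");
    }
    try {
      if (timeout.isZero()) {
        lock.lockInterruptibly();
      } else if (!lock.tryLock(timeout.toNanos(), TimeUnit.NANOSECONDS)) {
        throw new AccessTimeoutException(timeout);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for access", e);
    }
  }

  // Attempts acquisition while another thread holds the lock throughout.
  // Only call this with a positive timeout; zero would wait forever here.
  static String attemptWhileHeld(Duration timeout) {
    Lock lock = new ReentrantLock();
    CountDownLatch held = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    Thread holder = new Thread(() -> {
      lock.lock();
      held.countDown();
      try {
        release.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        lock.unlock();
      }
    });
    holder.start();
    try {
      held.await(); // wait until the other thread owns the lock
      acquire(lock, timeout);
      lock.unlock();
      return "granted";
    } catch (AccessTimeoutException e) {
      return "timed out";
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    } finally {
      release.countDown(); // let the holder thread finish
    }
  }

  public static void main(String[] args) {
    System.out.println(attemptWhileHeld(Duration.ofMillis(50))); // timed out
  }
}
```

    In the real API the failure is delivered through the decorated promise rather than thrown at the call site, so it would be handled with the promise's usual error handling.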