keith-turner opened a new issue #968: Develop design for asynchronous observers
URL: https://github.com/apache/fluo/issues/968

Currently, when a Fluo worker thread executes an observer, the execution flow looks something like the following.

 * Worker Thread 1 : execute Observer process method for transaction 1
 * Worker Thread 1 : process method executes `get` on transaction 1. Wait for this to return
 * Worker Thread 1 : process method executes `get` on transaction 1. Wait for this to return
 * Worker Thread 1 : process method executes `get` on transaction 1. Wait for this to return
 * Worker Thread 1 : process method returns, transaction is queued for asynchronous commit

While executing the get methods above, the worker thread is idle. I am curious whether we could design an asynchronous API for observers that allows the worker thread to do other work while a get method is executing. One possible design would be to have a new observer interface like the following.

```java
public interface AsyncObserver {
  /**
   * This method should immediately return a completable future. When the returned
   * future completes, the transaction will be committed.
   */
  CompletableFuture<Void> process(TransactionBase tx, Bytes row, Column col);
}
```

Below is an example AsyncObserver that is a rewrite of the [ContentObserver](https://gist.github.com/keith-turner/57e124c715c2542242f11eda85b3128c#file-contentobserver-java-L20) from a Fluo Tour exercise solution.
The example below uses the getAsync methods proposed in #967.

```java
public static class ContentObserver implements AsyncObserver {

  private List<String> tokenize(String document) {
    // TODO
    throw new UnsupportedOperationException("TODO: tokenize the document");
  }

  private CompletableFuture<Void> adjustCounts(TransactionBase tx, int delta, List<String> words) {
    List<CompletableFuture<Void>> futures = new ArrayList<>();

    for (String word : new HashSet<>(words)) {
      CompletableFuture<Void> future = tx.getsAsync("w:" + word, WORD_COUNT, "0")
          .thenApply(Integer::parseInt)
          .thenApply(count -> delta + count)
          .thenAccept(newCount -> {
            if (newCount == 0)
              tx.delete("w:" + word, WORD_COUNT);
            else
              tx.set("w:" + word, WORD_COUNT, newCount + "");
          });
      // Track the future; otherwise allOf below would complete immediately.
      futures.add(future);
    }

    // Return a future that is only complete after all async word operations
    // are complete. Yuck, why isn't there an allOf method that takes a
    // Collection?
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
  }

  @Override
  public CompletableFuture<Void> process(TransactionBase tx, Bytes row, Column col) {
    CompletableFuture<String> content = tx.getsAsync(row, CONTENT_COL);
    CompletableFuture<String> status = tx.getsAsync(row, REF_STATUS_COL);
    CompletableFuture<String> processed = tx.getsAsync(row, PROCESSED_COL, "false");

    // After all three of the operations complete, then the lambda passed to thenCompose
    // will either do nothing or return another async operation to compute word counts.
    // join() is used instead of get() because get() throws checked exceptions, which
    // the lambda cannot propagate. The futures are already complete here, so join()
    // does not block.
    return CompletableFuture.allOf(content, status, processed).thenCompose(v -> {
      int delta = 0;

      if (status.join().equals("referenced") && processed.join().equals("false")) {
        tx.set(row, PROCESSED_COL, "true");
        delta = 1;
      }

      if (status.join().equals("unreferenced")) {
        for (Column c : new Column[] {PROCESSED_COL, CONTENT_COL, REF_COUNT_COL, REF_STATUS_COL})
          tx.delete(row, c);

        if (processed.join().equals("true")) {
          delta = -1;
        }
      }

      if (delta == 0) {
        // nothing to do, so return a completed operation
        return CompletableFuture.completedFuture(null);
      } else {
        // return another async operation to update word counts
        return adjustCounts(tx, delta, tokenize(content.join()));
      }
    });
  }
}
```

The purpose of having this API would be to increase throughput. To realistically achieve this, Accumulo would need to support async methods as mentioned in #651.
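
To make the intended effect on the worker thread concrete, here is a minimal, self-contained sketch of how a single worker thread might drive observers that return futures. Nothing in it is Fluo API: `SimpleAsyncObserver`, `fakeGetAsync`, and `runAll` are hypothetical names invented for illustration, the "read" is just a sleep on a separate pool, and "commit" is modeled as adding the row to a list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class AsyncWorkerSketch {

  /** Hypothetical, simplified stand-in for the proposed AsyncObserver. */
  interface SimpleAsyncObserver {
    CompletableFuture<Void> process(String row);
  }

  /** Simulates a slow asynchronous read on an I/O pool; this is not a Fluo API. */
  static CompletableFuture<String> fakeGetAsync(String row, ExecutorService ioPool) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        Thread.sleep(20); // pretend to wait on a remote read
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return "content-of-" + row;
    }, ioPool);
  }

  /**
   * Starts process() for every row on one worker thread and records a row as
   * "committed" once the future returned by its observer completes.
   */
  static List<String> runAll(List<String> rows) {
    ExecutorService worker = Executors.newSingleThreadExecutor();
    ExecutorService ioPool = Executors.newFixedThreadPool(4);
    List<String> committed = new CopyOnWriteArrayList<>();
    try {
      // The observer returns immediately; the real work happens when the read completes.
      SimpleAsyncObserver observer = row ->
          fakeGetAsync(row, ioPool).thenAccept(content -> {
            // a real observer would call tx.set / tx.delete here
          });

      List<CompletableFuture<Void>> pending = rows.stream()
          .map(row -> CompletableFuture
              .supplyAsync(() -> observer.process(row), worker) // worker is freed right away
              .thenCompose(f -> f)                               // wait for the observer's future
              .thenRun(() -> committed.add(row)))                // stand-in for queuing the commit
          .collect(Collectors.toList());

      // Block only here, so the sketch can return a result.
      CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
      return committed;
    } finally {
      worker.shutdown();
      ioPool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(runAll(Arrays.asList("doc1", "doc2", "doc3")));
  }
}
```

Because `process` returns before the simulated read finishes, the single worker thread can start the next row while earlier reads are still in flight. The order in which rows are committed is therefore not guaranteed to match the order in which they were started.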
