keith-turner opened a new issue #968: Develop design for asynchronous observers
URL: https://github.com/apache/fluo/issues/968
 
 
Currently, when a Fluo worker thread executes an observer, the execution flow looks something like the following.
   
* Worker Thread 1: executes the Observer `process` method for transaction 1
* Worker Thread 1: the `process` method executes `get` on transaction 1 and waits for it to return
* Worker Thread 1: the `process` method executes `get` on transaction 1 and waits for it to return
* Worker Thread 1: the `process` method executes `get` on transaction 1 and waits for it to return
* Worker Thread 1: the `process` method returns, and the transaction is queued for asynchronous commit
   
While the `get` calls above are executing, the worker thread sits idle. I am curious whether we could design an asynchronous API for observers that lets the worker thread do other work while a `get` is in flight.
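To make the idle time concrete, here is a minimal sketch that contrasts the current pattern (three sequential blocking reads) with issuing all three reads up front as futures. `SimulatedStore`, `get`, and `getAsync` are hypothetical stand-ins, not Fluo's API; the store only sleeps to simulate read latency.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical remote store (not a Fluo API): every read takes ~latencyMs.
class SimulatedStore {
  private final ExecutorService io = Executors.newFixedThreadPool(4);
  private final long latencyMs;

  SimulatedStore(long latencyMs) {
    this.latencyMs = latencyMs;
  }

  private String read(String key) {
    try {
      TimeUnit.MILLISECONDS.sleep(latencyMs); // simulated network latency
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return "value-of-" + key;
  }

  // Blocking read: the calling (worker) thread waits out the full latency.
  String get(String key) {
    return read(key);
  }

  // Async read: returns immediately; the read runs on the I/O pool.
  CompletableFuture<String> getAsync(String key) {
    return CompletableFuture.supplyAsync(() -> read(key), io);
  }

  void shutdown() {
    io.shutdown();
  }
}

class ReadPatterns {
  // Current model: three sequential gets, ~3 x latency spent on the worker thread.
  static String blocking(SimulatedStore s) {
    return s.get("a") + "," + s.get("b") + "," + s.get("c");
  }

  // Async model: all three reads are issued up front and overlap. The worker
  // thread gets control back immediately; the combined result is ready after
  // roughly one latency period.
  static CompletableFuture<String> async(SimulatedStore s) {
    CompletableFuture<String> a = s.getAsync("a");
    CompletableFuture<String> b = s.getAsync("b");
    CompletableFuture<String> c = s.getAsync("c");
    return CompletableFuture.allOf(a, b, c)
        .thenApply(v -> a.join() + "," + b.join() + "," + c.join());
  }
}
```

Both methods produce the same string. The difference is who waits: in `blocking` the worker thread does, while in `async` it could move on to another notification. This simulation still parks threads in its own I/O pool, so the real gain depends on the underlying client being non-blocking.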
   
One possible design would be a new observer interface like the following.
   
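```java
import java.util.concurrent.CompletableFuture;

import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;

public interface AsyncObserver {
  /**
   * This method should immediately return a completable future. When the returned
   * future completes, the transaction will be committed.
   */
  CompletableFuture<Void> process(TransactionBase tx, Bytes row, Column col);
}
```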
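As a rough sketch of how a worker could drive such an interface, the dispatcher below calls `process`, then chains the commit onto the returned future instead of waiting for it. `SketchTx`, `SketchAsyncObserver`, and `SketchWorker` are hypothetical simplifications of `TransactionBase` and `AsyncObserver`, the "commit" is only logged, and aborting on exceptional completion is an assumption the proposal does not spell out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for TransactionBase.
interface SketchTx {
  void set(String row, String value);
}

// Hypothetical, simplified stand-in for the proposed AsyncObserver.
interface SketchAsyncObserver {
  CompletableFuture<Void> process(SketchTx tx, String row);
}

class SketchWorker {
  // Records commit/abort outcomes so the control flow can be inspected.
  final List<String> log = Collections.synchronizedList(new ArrayList<>());

  // process() should return immediately; commit (or abort) is chained onto the
  // returned future, so the calling thread is free to pick up other work.
  CompletableFuture<Void> dispatch(SketchAsyncObserver observer, String row) {
    List<String> writes = Collections.synchronizedList(new ArrayList<>());
    SketchTx tx = (r, v) -> writes.add(r + "=" + v); // buffer writes like a transaction

    CompletableFuture<Void> processing;
    try {
      processing = observer.process(tx, row);
    } catch (RuntimeException e) {
      // Treat a synchronous throw the same as an exceptionally completed future.
      processing = new CompletableFuture<>();
      processing.completeExceptionally(e);
    }

    return processing.<Void>handle((ignored, err) -> {
      if (err == null) {
        log.add("commit " + row + " " + writes); // stand-in for queueing the async commit
      } else {
        log.add("abort " + row); // assumption: failed processing aborts the transaction
      }
      return null;
    });
  }
}
```

If an observer returns a future that is still pending, `dispatch` returns right away and the commit only happens once that future completes, which matches the contract described in the interface's Javadoc.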
   
Below is an example AsyncObserver that rewrites the [ContentObserver](https://gist.github.com/keith-turner/57e124c715c2542242f11eda85b3128c#file-contentobserver-java-L20) from a Fluo Tour exercise solution. The example uses the async get methods (`getsAsync`) proposed in #967.
   
```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;

// Nested in an enclosing class, as in the Fluo Tour solution. WORD_COUNT,
// CONTENT_COL, REF_STATUS_COL, PROCESSED_COL and REF_COUNT_COL are the tour's
// Column constants; getsAsync is the method proposed in #967.
public static class ContentObserver implements AsyncObserver {

  private List<String> tokenize(String document) {
    // TODO: split the document into words, as in the Fluo Tour solution
    throw new UnsupportedOperationException("TODO");
  }

  private CompletableFuture<Void> adjustCounts(TransactionBase tx, int delta,
                                               List<String> words) {

    List<CompletableFuture<Void>> futures = new ArrayList<>();

    for (String word : new HashSet<>(words)) {
      // Keep every per-word future. If they are not collected, allOf() below
      // completes immediately and the transaction could commit before the
      // word counts are updated.
      futures.add(tx.getsAsync("w:" + word, WORD_COUNT, "0")
          .thenApply(Integer::parseInt)
          .thenApply(count -> delta + count)
          .thenAccept(newCount -> {
            if (newCount == 0)
              tx.delete("w:" + word, WORD_COUNT);
            else
              tx.set("w:" + word, WORD_COUNT, Integer.toString(newCount));
          }));
    }

    // Return a future that is only complete after all async word operations
    // are complete. Yuck, why isn't there an allOf method that takes a
    // Collection?
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
  }

  @Override
  public CompletableFuture<Void> process(TransactionBase tx, Bytes brow, Column col) {
    // The String-based get/set methods take a CharSequence row, so decode it once.
    String row = brow.toString();

    CompletableFuture<String> content = tx.getsAsync(row, CONTENT_COL);
    CompletableFuture<String> status = tx.getsAsync(row, REF_STATUS_COL);
    CompletableFuture<String> processed = tx.getsAsync(row, PROCESSED_COL, "false");

    // After all three operations complete, the lambda passed to thenCompose
    // either does nothing or returns another async operation to compute word
    // counts. join() is used instead of get() because get() throws checked
    // exceptions a lambda cannot; all three futures are already complete here.
    return CompletableFuture.allOf(content, status, processed).thenCompose(v -> {
      int delta = 0;
      if ("referenced".equals(status.join()) && "false".equals(processed.join())) {
        tx.set(row, PROCESSED_COL, "true");
        delta = 1;
      }

      if ("unreferenced".equals(status.join())) {
        for (Column c : new Column[] {PROCESSED_COL, CONTENT_COL,
                                      REF_COUNT_COL, REF_STATUS_COL})
          tx.delete(row, c);

        if ("true".equals(processed.join())) {
          delta = -1;
        }
      }

      if (delta == 0) {
        // nothing to do, so return a completed operation
        return CompletableFuture.completedFuture(null);
      } else {
        // return another async operation to update word counts
        return adjustCounts(tx, delta, tokenize(content.join()));
      }
    });
  }
}
```
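Regarding the "Yuck" comment in `adjustCounts`: the JDK only offers the varargs `CompletableFuture.allOf(CompletableFuture<?>...)`, but a small helper can accept a `Collection`. The sketch below is a hypothetical utility (`FutureUtils` is not part of Fluo or the JDK), plus a variant that also gathers the results in iteration order.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper class; not part of Fluo or the JDK.
class FutureUtils {
  // Collection-friendly wrapper around CompletableFuture.allOf(CompletableFuture<?>...).
  static CompletableFuture<Void> allOf(Collection<? extends CompletableFuture<?>> futures) {
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
  }

  // Completes with every result, in the collection's iteration order, once all
  // futures are done; completes exceptionally if any of them fails.
  static <T> CompletableFuture<List<T>> allAsList(
      Collection<? extends CompletableFuture<? extends T>> futures) {
    return allOf(futures).thenApply(v -> {
      List<T> results = new ArrayList<>(futures.size());
      for (CompletableFuture<? extends T> f : futures) {
        results.add(f.join()); // already complete, so join() does not block
      }
      return results;
    });
  }
}
```

With such a helper, the last statement of `adjustCounts` could become `return FutureUtils.allOf(futures);`.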
   
The purpose of this API would be to increase throughput. To realistically achieve this, Accumulo would need to support asynchronous methods, as mentioned in #651.
