Github user keith-turner commented on a diff in the pull request:

    
https://github.com/apache/incubator-fluo-recipes/pull/103#discussion_r79656908
  
    --- Diff: 
modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
 ---
    @@ -16,60 +16,58 @@
     package org.apache.fluo.recipes.accumulo.export;
     
     import java.util.ArrayList;
    +import java.util.Collection;
     import java.util.Iterator;
     
     import org.apache.accumulo.core.data.Mutation;
     import org.apache.fluo.api.config.FluoConfiguration;
    -import org.apache.fluo.api.config.SimpleConfiguration;
     import org.apache.fluo.api.observer.Observer.Context;
    +import org.apache.fluo.recipes.core.export.ExportQueue;
     import org.apache.fluo.recipes.core.export.Exporter;
     import org.apache.fluo.recipes.core.export.SequencedExport;
     
     /**
    - * An {@link Exporter} that takes {@link AccumuloExport} objects and 
writes mutations to Accumulo
    + * An Accumulo-specific {@link Exporter} that writes mutations to Accumulo 
using a
    + * {@link AccumuloWriter}
      *
    - * @param <K> Export queue key type
      * @since 1.0.0
      */
    -public class AccumuloExporter<K> extends Exporter<K, AccumuloExport<K>> {
    +public abstract class AccumuloExporter<K, V> extends Exporter<K, V> {
     
    -  private SharedBatchWriter sbw;
    +  private AccumuloWriter accumuloWriter;
     
    -  @Override
    -  public void init(String queueId, Context context) throws Exception {
    -
    -    SimpleConfiguration appConf = context.getAppConfiguration();
    -
    -    String instanceName = appConf.getString("recipes.accumuloExporter." + 
queueId + ".instance");
    -    String zookeepers = appConf.getString("recipes.accumuloExporter." + 
queueId + ".zookeepers");
    -    String user = appConf.getString("recipes.accumuloExporter." + queueId 
+ ".user");
    -    // TODO look into using delegation token
    -    String password = appConf.getString("recipes.accumuloExporter." + 
queueId + ".password");
    -    String table = appConf.getString("recipes.accumuloExporter." + queueId 
+ ".table");
    -
    -    sbw = SharedBatchWriter.getInstance(instanceName, zookeepers, user, 
password, table);
    +  /**
    +   * Configures AccumuloExporter
    +   *
    +   * @param config Fluo configuration
    +   * @param opts Export queue options
    +   * @param writerConf Accumulo writer config
    +   */
    +  public static void configure(FluoConfiguration config, 
ExportQueue.Options opts,
    +      AccumuloWriter.Config writerConf) {
    +    ExportQueue.configure(config, opts);
    +    AccumuloWriter.setConfig(config.getAppConfiguration(), 
opts.getQueueId(), writerConf);
       }
     
    -  public static void setExportTableInfo(FluoConfiguration fconf, String 
queueId, TableInfo ti) {
    -    SimpleConfiguration appConf = fconf.getAppConfiguration();
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + 
".instance", ti.instanceName);
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + 
".zookeepers", ti.zookeepers);
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + ".user", 
ti.user);
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + 
".password", ti.password);
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + ".table", 
ti.table);
    +  @Override
    +  public void init(String queueId, Context context) throws Exception {
    +    accumuloWriter = 
AccumuloWriter.getInstance(context.getAppConfiguration(), queueId);
       }
     
       @Override
    -  protected void processExports(Iterator<SequencedExport<K, 
AccumuloExport<K>>> exports) {
    +  protected void processExports(Iterator<SequencedExport<K, V>> exports) {
    +
         ArrayList<Mutation> buffer = new ArrayList<>();
     
         while (exports.hasNext()) {
    -      SequencedExport<K, AccumuloExport<K>> export = exports.next();
    -      buffer.addAll(export.getValue().toMutations(export.getKey(), 
export.getSequence()));
    +      SequencedExport<K, V> export = exports.next();
    +      buffer.addAll(processExport(export));
         }
     
         if (buffer.size() > 0) {
    -      sbw.write(buffer);
    +      accumuloWriter.write(buffer);
         }
       }
    +
    +  protected abstract Collection<Mutation> processExport(SequencedExport<K, 
V> export);
    --- End diff --
    
    Thinking out loud again, not suggesting we actually do the following at 
this point.  We could have an interface called MutationTranslator.
    
    ```java
    interface MutationTranslator<K,V> {
      Collection<Mutation> translate(SequencedExport<K,V> export);
    }
    ```
    
    Instead of extending AccumuloExporter, would configure AccumuloExporter to 
use a MutationTranslator.
    
    We could then have a DispatchingMutationTranslator that user could extend.
    
    ```java
    abstract class DispatchingMutationTranslator implements MutationTranslator {
      public void addTranslator(Class clazz, MutationTranslator mt) 
      public abstract void setupTranslators(); //user must make multiple calls 
to addTranslator for each pojo they want to handle
    }
    ```
    
    For example user could do the following.
    
    ```java
    class MyMutTrans extends DispatchingMutationTranslator {
     public abstract void setupTranslators() {
       //For type Pojo1 execute the following lambda
       addTranslator(Pojo1.class, se -> trans1(se.getKey(), se.getValue(), 
se.getSequence());
       addTranslator(Pojo2.class, se -> trans44(se.getValue(), se.getSequence())
    }
    ```
    
    Then user would configure exporter to use MyMutTrans.  This reduce boiler 
plate code,buts adds a bit to configure.   I have been thinking of how to 
minimize the code required to configure an Accumulo export queue.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to