Github user mikewalch commented on a diff in the pull request:

    
https://github.com/apache/incubator-fluo-recipes/pull/103#discussion_r79850487
  
    --- Diff: 
modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
 ---
    @@ -16,60 +16,58 @@
     package org.apache.fluo.recipes.accumulo.export;
     
     import java.util.ArrayList;
    +import java.util.Collection;
     import java.util.Iterator;
     
     import org.apache.accumulo.core.data.Mutation;
     import org.apache.fluo.api.config.FluoConfiguration;
    -import org.apache.fluo.api.config.SimpleConfiguration;
     import org.apache.fluo.api.observer.Observer.Context;
    +import org.apache.fluo.recipes.core.export.ExportQueue;
     import org.apache.fluo.recipes.core.export.Exporter;
     import org.apache.fluo.recipes.core.export.SequencedExport;
     
     /**
    - * An {@link Exporter} that takes {@link AccumuloExport} objects and 
writes mutations to Accumulo
    + * An Accumulo-specific {@link Exporter} that writes mutations to Accumulo 
using a
    + * {@link AccumuloWriter}
      *
    - * @param <K> Export queue key type
      * @since 1.0.0
      */
    -public class AccumuloExporter<K> extends Exporter<K, AccumuloExport<K>> {
    +public abstract class AccumuloExporter<K, V> extends Exporter<K, V> {
     
    -  private SharedBatchWriter sbw;
    +  private AccumuloWriter accumuloWriter;
     
    -  @Override
    -  public void init(String queueId, Context context) throws Exception {
    -
    -    SimpleConfiguration appConf = context.getAppConfiguration();
    -
    -    String instanceName = appConf.getString("recipes.accumuloExporter." + 
queueId + ".instance");
    -    String zookeepers = appConf.getString("recipes.accumuloExporter." + 
queueId + ".zookeepers");
    -    String user = appConf.getString("recipes.accumuloExporter." + queueId 
+ ".user");
    -    // TODO look into using delegation token
    -    String password = appConf.getString("recipes.accumuloExporter." + 
queueId + ".password");
    -    String table = appConf.getString("recipes.accumuloExporter." + queueId 
+ ".table");
    -
    -    sbw = SharedBatchWriter.getInstance(instanceName, zookeepers, user, 
password, table);
    +  /**
    +   * Configures AccumuloExporter
    +   *
    +   * @param config Fluo configuration
    +   * @param opts Export queue options
    +   * @param writerConf Accumulo writer config
    +   */
    +  public static void configure(FluoConfiguration config, 
ExportQueue.Options opts,
    +      AccumuloWriter.Config writerConf) {
    +    ExportQueue.configure(config, opts);
    +    AccumuloWriter.setConfig(config.getAppConfiguration(), 
opts.getQueueId(), writerConf);
       }
     
    -  public static void setExportTableInfo(FluoConfiguration fconf, String 
queueId, TableInfo ti) {
    -    SimpleConfiguration appConf = fconf.getAppConfiguration();
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + 
".instance", ti.instanceName);
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + 
".zookeepers", ti.zookeepers);
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + ".user", 
ti.user);
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + 
".password", ti.password);
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + ".table", 
ti.table);
    +  @Override
    +  public void init(String queueId, Context context) throws Exception {
    +    accumuloWriter = 
AccumuloWriter.getInstance(context.getAppConfiguration(), queueId);
       }
     
       @Override
    -  protected void processExports(Iterator<SequencedExport<K, 
AccumuloExport<K>>> exports) {
    +  protected void processExports(Iterator<SequencedExport<K, V>> exports) {
    +
         ArrayList<Mutation> buffer = new ArrayList<>();
     
         while (exports.hasNext()) {
    -      SequencedExport<K, AccumuloExport<K>> export = exports.next();
    -      buffer.addAll(export.getValue().toMutations(export.getKey(), 
export.getSequence()));
    +      SequencedExport<K, V> export = exports.next();
    +      buffer.addAll(processExport(export));
         }
     
         if (buffer.size() > 0) {
    -      sbw.write(buffer);
    +      accumuloWriter.write(buffer);
         }
       }
    +
    +  protected abstract Collection<Mutation> processExport(SequencedExport<K, 
V> export);
    --- End diff --
    
    I thought about this and concluded that we should just extend 
AccumuloExporter to do this.  I wrote some of the code below, but I don't know 
whether it makes sense or makes anything simpler. 
    ```java
    /**
     * An {@link AccumuloExporter} that dispatches each export to a
     * {@link MutationTranslator} chosen by the runtime class of the exported
     * value.
     *
     * @param <K> Export queue key type
     */
    public abstract class AccumuloTranslationExporter<K> extends
        AccumuloExporter<K, Object> {

      /**
       * Converts a single sequenced export into the mutations to write.
       *
       * @param <K> Export queue key type
       * @param <V> Export queue value type handled by this translator
       */
      public interface MutationTranslator<K, V> {
        Collection<Mutation> translate(SequencedExport<K, V> export);
      }

      /**
       * Hook for subclasses, presumably meant to register translators via
       * {@link #addTranslator(Class, MutationTranslator)}.
       *
       * NOTE(review): nothing in this class calls setup(); confirm where it
       * is expected to be invoked (e.g. from init).
       */
      public abstract void setup();

      /**
       * Registers the translator to use for exported values that are
       * instances of the given class. A later registration for the same
       * class replaces the earlier one.
       *
       * @param clazz class of exported values the translator handles
       * @param mt translator producing mutations for those values
       */
      public void addTranslator(Class clazz, MutationTranslator mt) {
        translatorMap.put(clazz, mt);
      }

      // Raw types are kept so the signature of addTranslator stays unchanged.
      private final Map<Class, MutationTranslator> translatorMap =
          new HashMap<>();

      /**
       * Translates one export using the first registered translator whose
       * class matches the exported value.
       *
       * NOTE(review): the map is a HashMap, so when the value is an instance
       * of several registered classes, which translator wins is unspecified.
       *
       * @param export the export to translate
       * @return mutations produced by the matching translator, or an empty
       *         list when no registered class matches the exported value
       */
      @Override
      protected Collection<Mutation> processExport(
          SequencedExport<K, Object> export) {
        // Match on the exported value, not on the SequencedExport wrapper:
        // the wrapper is never an instance of a registered value class.
        Object value = export.getValue();
        for (Map.Entry<Class, MutationTranslator> entry :
            translatorMap.entrySet()) {
          if (entry.getKey().isInstance(value)) {
            return entry.getValue().translate(export);
          }
        }
        return Collections.emptyList();
      }
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to