Github user keith-turner commented on a diff in the pull request:

    
https://github.com/apache/incubator-fluo-recipes/pull/103#discussion_r79858177
  
    --- Diff: 
modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
 ---
    @@ -16,60 +16,58 @@
     package org.apache.fluo.recipes.accumulo.export;
     
     import java.util.ArrayList;
    +import java.util.Collection;
     import java.util.Iterator;
     
     import org.apache.accumulo.core.data.Mutation;
     import org.apache.fluo.api.config.FluoConfiguration;
    -import org.apache.fluo.api.config.SimpleConfiguration;
     import org.apache.fluo.api.observer.Observer.Context;
    +import org.apache.fluo.recipes.core.export.ExportQueue;
     import org.apache.fluo.recipes.core.export.Exporter;
     import org.apache.fluo.recipes.core.export.SequencedExport;
     
     /**
    - * An {@link Exporter} that takes {@link AccumuloExport} objects and 
writes mutations to Accumulo
    + * An Accumulo-specific {@link Exporter} that writes mutations to Accumulo 
using a
    + * {@link AccumuloWriter}
      *
    - * @param <K> Export queue key type
      * @since 1.0.0
      */
    -public class AccumuloExporter<K> extends Exporter<K, AccumuloExport<K>> {
    +public abstract class AccumuloExporter<K, V> extends Exporter<K, V> {
     
    -  private SharedBatchWriter sbw;
    +  private AccumuloWriter accumuloWriter;
     
    -  @Override
    -  public void init(String queueId, Context context) throws Exception {
    -
    -    SimpleConfiguration appConf = context.getAppConfiguration();
    -
    -    String instanceName = appConf.getString("recipes.accumuloExporter." + 
queueId + ".instance");
    -    String zookeepers = appConf.getString("recipes.accumuloExporter." + 
queueId + ".zookeepers");
    -    String user = appConf.getString("recipes.accumuloExporter." + queueId 
+ ".user");
    -    // TODO look into using delegation token
    -    String password = appConf.getString("recipes.accumuloExporter." + 
queueId + ".password");
    -    String table = appConf.getString("recipes.accumuloExporter." + queueId 
+ ".table");
    -
    -    sbw = SharedBatchWriter.getInstance(instanceName, zookeepers, user, 
password, table);
    +  /**
    +   * Configures AccumuloExporter
    +   *
    +   * @param config Fluo configuration
    +   * @param opts Export queue options
    +   * @param writerConf Accumulo writer config
    +   */
    +  public static void configure(FluoConfiguration config, 
ExportQueue.Options opts,
    +      AccumuloWriter.Config writerConf) {
    +    ExportQueue.configure(config, opts);
    +    AccumuloWriter.setConfig(config.getAppConfiguration(), 
opts.getQueueId(), writerConf);
       }
     
    -  public static void setExportTableInfo(FluoConfiguration fconf, String 
queueId, TableInfo ti) {
    -    SimpleConfiguration appConf = fconf.getAppConfiguration();
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + 
".instance", ti.instanceName);
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + 
".zookeepers", ti.zookeepers);
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + ".user", 
ti.user);
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + 
".password", ti.password);
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + ".table", 
ti.table);
    +  @Override
    +  public void init(String queueId, Context context) throws Exception {
    +    accumuloWriter = 
AccumuloWriter.getInstance(context.getAppConfiguration(), queueId);
       }
     
       @Override
    -  protected void processExports(Iterator<SequencedExport<K, 
AccumuloExport<K>>> exports) {
    +  protected void processExports(Iterator<SequencedExport<K, V>> exports) {
    +
         ArrayList<Mutation> buffer = new ArrayList<>();
     
         while (exports.hasNext()) {
    -      SequencedExport<K, AccumuloExport<K>> export = exports.next();
    -      buffer.addAll(export.getValue().toMutations(export.getKey(), 
export.getSequence()));
    +      SequencedExport<K, V> export = exports.next();
    +      buffer.addAll(processExport(export));
         }
     
         if (buffer.size() > 0) {
    -      sbw.write(buffer);
    +      accumuloWriter.write(buffer);
         }
       }
    +
    +  protected abstract Collection<Mutation> processExport(SequencedExport<K, 
V> export);
    --- End diff --
    
    One slight advantage of configuring a MutationTranslator instance over 
extending AccumuloExporter is that its easier for users to write unit test.  If 
a user extends AccumuloExporter, they will have to out some thought into how to 
structure their export code so that it can be unit tested.  If a user 
implements a MutationTranslator, this is automatically very easy to unit test.  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to