GitHub user keith-turner commented on a diff in the pull request:

    
https://github.com/apache/incubator-fluo-recipes/pull/103#discussion_r79469462
  
    --- Diff: modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java ---
    @@ -16,60 +16,58 @@
     package org.apache.fluo.recipes.accumulo.export;
     
     import java.util.ArrayList;
    +import java.util.Collection;
     import java.util.Iterator;
     
     import org.apache.accumulo.core.data.Mutation;
     import org.apache.fluo.api.config.FluoConfiguration;
    -import org.apache.fluo.api.config.SimpleConfiguration;
     import org.apache.fluo.api.observer.Observer.Context;
    +import org.apache.fluo.recipes.core.export.ExportQueue;
     import org.apache.fluo.recipes.core.export.Exporter;
     import org.apache.fluo.recipes.core.export.SequencedExport;
     
     /**
    - * An {@link Exporter} that takes {@link AccumuloExport} objects and writes mutations to Accumulo
    + * An Accumulo-specific {@link Exporter} that writes mutations to Accumulo using a
    + * {@link AccumuloWriter}
      *
    - * @param <K> Export queue key type
      * @since 1.0.0
      */
    -public class AccumuloExporter<K> extends Exporter<K, AccumuloExport<K>> {
    +public abstract class AccumuloExporter<K, V> extends Exporter<K, V> {
     
    -  private SharedBatchWriter sbw;
    +  private AccumuloWriter accumuloWriter;
     
    -  @Override
    -  public void init(String queueId, Context context) throws Exception {
    -
    -    SimpleConfiguration appConf = context.getAppConfiguration();
    -
    -    String instanceName = appConf.getString("recipes.accumuloExporter." + queueId + ".instance");
    -    String zookeepers = appConf.getString("recipes.accumuloExporter." + queueId + ".zookeepers");
    -    String user = appConf.getString("recipes.accumuloExporter." + queueId + ".user");
    -    // TODO look into using delegation token
    -    String password = appConf.getString("recipes.accumuloExporter." + queueId + ".password");
    -    String table = appConf.getString("recipes.accumuloExporter." + queueId + ".table");
    -
    -    sbw = SharedBatchWriter.getInstance(instanceName, zookeepers, user, password, table);
    +  /**
    +   * Configures AccumuloExporter
    +   *
    +   * @param config Fluo configuration
    +   * @param opts Export queue options
    +   * @param writerConf Accumulo writer config
    +   */
    +  public static void configure(FluoConfiguration config, ExportQueue.Options opts,
    +      AccumuloWriter.Config writerConf) {
    +    ExportQueue.configure(config, opts);
    +    AccumuloWriter.setConfig(config.getAppConfiguration(), opts.getQueueId(), writerConf);
       }
     
    -  public static void setExportTableInfo(FluoConfiguration fconf, String queueId, TableInfo ti) {
    -    SimpleConfiguration appConf = fconf.getAppConfiguration();
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + ".instance", ti.instanceName);
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + ".zookeepers", ti.zookeepers);
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + ".user", ti.user);
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + ".password", ti.password);
    -    appConf.setProperty("recipes.accumuloExporter." + queueId + ".table", ti.table);
    +  @Override
    +  public void init(String queueId, Context context) throws Exception {
    +    accumuloWriter = AccumuloWriter.getInstance(context.getAppConfiguration(), queueId);
       }
     
       @Override
    -  protected void processExports(Iterator<SequencedExport<K, AccumuloExport<K>>> exports) {
    +  protected void processExports(Iterator<SequencedExport<K, V>> exports) {
    +
         ArrayList<Mutation> buffer = new ArrayList<>();
     
         while (exports.hasNext()) {
    -      SequencedExport<K, AccumuloExport<K>> export = exports.next();
    -      buffer.addAll(export.getValue().toMutations(export.getKey(), export.getSequence()));
    +      SequencedExport<K, V> export = exports.next();
    +      buffer.addAll(processExport(export));
         }
     
         if (buffer.size() > 0) {
    -      sbw.write(buffer);
    +      accumuloWriter.write(buffer);
         }
       }
    +
    +  protected abstract Collection<Mutation> processExport(SequencedExport<K, V> export);
    --- End diff --
    
    I have been thinking about this change and a way to possibly make it easier
    to use. I was thinking of the following, but I am uncertain about it. Maybe
    instead of the user extending AccumuloExporter, we allow users to configure
    classes that translate POJOs to mutations.
    
    ```java
    // When configuring the Accumulo exporter, also configure classes that translate POJOs to mutations
    AccumuloExporter.setExportTableInfo(fluoConfig, EXPORT_QUEUE_ID,...);
    AccumuloExporter.addTranslator(EXPORT_QUEUE_ID, Pojo1.class, Translator1.class);
    AccumuloExporter.addTranslator(EXPORT_QUEUE_ID, Pojo2.class, Translator2.class);
    ```
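    
    To make the idea a bit more concrete, here is a rough, self-contained
    sketch of how such a translator lookup might work. Everything in it is
    hypothetical: `Translator`, `TranslatorRegistry`, and `FakeMutation` (a
    stand-in for Accumulo's `Mutation`) are illustrative names, not existing
    Fluo Recipes APIs, and a plain map stands in for the per-queue settings
    that would presumably live in the Fluo application configuration.
    
    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical sketch only: none of these types exist in Fluo Recipes or Accumulo.
    class TranslatorSketch {

      // Stand-in for org.apache.accumulo.core.data.Mutation so this compiles on its own.
      static final class FakeMutation {
        final String row;
        final String value;

        FakeMutation(String row, String value) {
          this.row = row;
          this.value = value;
        }
      }

      // Hypothetical interface: translates one exported POJO into mutations.
      interface Translator<V> {
        Collection<FakeMutation> translate(String key, long seq, V value);
      }

      // Stand-in for per-queue configuration: POJO class name -> translator class name.
      static final class TranslatorRegistry {
        private final Map<String, String> config = new HashMap<>();

        <V> void addTranslator(Class<V> pojoType, Class<? extends Translator<V>> translatorType) {
          config.put(pojoType.getName(), translatorType.getName());
        }

        // Finds the translator for the value's runtime class and instantiates it reflectively.
        @SuppressWarnings("unchecked")
        Collection<FakeMutation> translate(String key, long seq, Object value) {
          String translatorName = config.get(value.getClass().getName());
          if (translatorName == null) {
            throw new IllegalArgumentException("No translator for " + value.getClass().getName());
          }
          try {
            Translator<Object> t = (Translator<Object>) Class.forName(translatorName)
                .getDeclaredConstructor().newInstance();
            return t.translate(key, seq, value);
          } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create " + translatorName, e);
          }
        }
      }

      static final class Pojo1 {
        final String name;

        Pojo1(String name) {
          this.name = name;
        }
      }

      static final class Pojo2 {
        final int count;

        Pojo2(int count) {
          this.count = count;
        }
      }

      static final class Translator1 implements Translator<Pojo1> {
        @Override
        public Collection<FakeMutation> translate(String key, long seq, Pojo1 p) {
          return Collections.singletonList(new FakeMutation(key, "name:" + p.name));
        }
      }

      static final class Translator2 implements Translator<Pojo2> {
        @Override
        public Collection<FakeMutation> translate(String key, long seq, Pojo2 p) {
          return Collections.singletonList(new FakeMutation(key, "count:" + p.count));
        }
      }

      // Mirrors the buffering loop in processExports, dispatching through the registry.
      static List<FakeMutation> processExports(TranslatorRegistry registry, List<Object> exports) {
        List<FakeMutation> buffer = new ArrayList<>();
        long seq = 0;
        for (Object export : exports) {
          buffer.addAll(registry.translate("row" + seq, seq, export));
          seq++;
        }
        return buffer;
      }

      public static void main(String[] args) {
        TranslatorRegistry registry = new TranslatorRegistry();
        registry.addTranslator(Pojo1.class, Translator1.class);
        registry.addTranslator(Pojo2.class, Translator2.class);
        List<Object> exports = Arrays.<Object>asList(new Pojo1("a"), new Pojo2(3));
        for (FakeMutation m : processExports(registry, exports)) {
          System.out.println(m.row + " -> " + m.value);
        }
      }
    }
    ```
    
    With something like this, one exporter could handle several POJO types on
    the same queue without the user subclassing anything, at the cost of a
    reflective lookup and a failure at export time if a type has no translator
    configured.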
    
    I am still thinking about this and am not advocating any specific change at
    the moment. I am just putting this up for discussion/consideration.


