Github user mikewalch commented on a diff in the pull request: https://github.com/apache/incubator-fluo-recipes/pull/103#discussion_r79850487 --- Diff: modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java --- @@ -16,60 +16,58 @@ package org.apache.fluo.recipes.accumulo.export; import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import org.apache.accumulo.core.data.Mutation; import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.config.SimpleConfiguration; import org.apache.fluo.api.observer.Observer.Context; +import org.apache.fluo.recipes.core.export.ExportQueue; import org.apache.fluo.recipes.core.export.Exporter; import org.apache.fluo.recipes.core.export.SequencedExport; /** - * An {@link Exporter} that takes {@link AccumuloExport} objects and writes mutations to Accumulo + * An Accumulo-specific {@link Exporter} that writes mutations to Accumulo using a + * {@link AccumuloWriter} * - * @param <K> Export queue key type * @since 1.0.0 */ -public class AccumuloExporter<K> extends Exporter<K, AccumuloExport<K>> { +public abstract class AccumuloExporter<K, V> extends Exporter<K, V> { - private SharedBatchWriter sbw; + private AccumuloWriter accumuloWriter; - @Override - public void init(String queueId, Context context) throws Exception { - - SimpleConfiguration appConf = context.getAppConfiguration(); - - String instanceName = appConf.getString("recipes.accumuloExporter." + queueId + ".instance"); - String zookeepers = appConf.getString("recipes.accumuloExporter." + queueId + ".zookeepers"); - String user = appConf.getString("recipes.accumuloExporter." + queueId + ".user"); - // TODO look into using delegation token - String password = appConf.getString("recipes.accumuloExporter." + queueId + ".password"); - String table = appConf.getString("recipes.accumuloExporter." 
+ queueId + ".table"); - - sbw = SharedBatchWriter.getInstance(instanceName, zookeepers, user, password, table); + /** + * Configures AccumuloExporter + * + * @param config Fluo configuration + * @param opts Export queue options + * @param writerConf Accumulo writer config + */ + public static void configure(FluoConfiguration config, ExportQueue.Options opts, + AccumuloWriter.Config writerConf) { + ExportQueue.configure(config, opts); + AccumuloWriter.setConfig(config.getAppConfiguration(), opts.getQueueId(), writerConf); } - public static void setExportTableInfo(FluoConfiguration fconf, String queueId, TableInfo ti) { - SimpleConfiguration appConf = fconf.getAppConfiguration(); - appConf.setProperty("recipes.accumuloExporter." + queueId + ".instance", ti.instanceName); - appConf.setProperty("recipes.accumuloExporter." + queueId + ".zookeepers", ti.zookeepers); - appConf.setProperty("recipes.accumuloExporter." + queueId + ".user", ti.user); - appConf.setProperty("recipes.accumuloExporter." + queueId + ".password", ti.password); - appConf.setProperty("recipes.accumuloExporter." 
+ queueId + ".table", ti.table); + @Override + public void init(String queueId, Context context) throws Exception { + accumuloWriter = AccumuloWriter.getInstance(context.getAppConfiguration(), queueId); } @Override - protected void processExports(Iterator<SequencedExport<K, AccumuloExport<K>>> exports) { + protected void processExports(Iterator<SequencedExport<K, V>> exports) { + ArrayList<Mutation> buffer = new ArrayList<>(); while (exports.hasNext()) { - SequencedExport<K, AccumuloExport<K>> export = exports.next(); - buffer.addAll(export.getValue().toMutations(export.getKey(), export.getSequence())); + SequencedExport<K, V> export = exports.next(); + buffer.addAll(processExport(export)); } if (buffer.size() > 0) { - sbw.write(buffer); + accumuloWriter.write(buffer); } } + + protected abstract Collection<Mutation> processExport(SequencedExport<K, V> export); --- End diff -- I thought about this and thought that we should just extend AccumuloExporter to do this. I wrote some of the code below but I don't know if it makes sense or makes anything simpler. ```java public abstract class AccumuloTranslationExporter<K> extends AccumuloExporter<K, Object> { public interface MutationTranslator<K, V> { Collection<Mutation> translate(SequencedExport<K, V> export); } public abstract void setup(); public void addTranslator(Class clazz, MutationTranslator mt) { translatorMap.put(clazz, mt); } private Map<Class, MutationTranslator> translatorMap = new HashMap<>(); @Override protected Collection<Mutation> processExport(SequencedExport<K, Object> export) { for (Map.Entry<Class, MutationTranslator> entry : translatorMap.entrySet()) { if (entry.getKey().isInstance(export)) { return entry.getValue().translate(export); } } return Collections.emptyList(); } } ```
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---