Repository: incubator-fluo-recipes
Updated Branches:
  refs/heads/master 6f5177363 -> dc765718c


Added standard way to setup per exporter configuration.


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/commit/dc765718
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/tree/dc765718
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/diff/dc765718

Branch: refs/heads/master
Commit: dc765718c59104a1643cbaa0ae81d565ae5e4146
Parents: 6f51773
Author: Keith Turner <ktur...@apache.org>
Authored: Wed Sep 21 18:02:23 2016 -0400
Committer: Keith Turner <ke...@deenlo.com>
Committed: Thu Sep 22 12:24:18 2016 -0400

----------------------------------------------------------------------
 docs/accumulo-export-queue.md                   |  24 +-
 .../accumulo/export/AccumuloExportQueue.java    | 278 -------------------
 .../accumulo/export/AccumuloExporter.java       |  84 +++++-
 .../accumulo/export/AccumuloReplicator.java     |   2 +-
 .../recipes/accumulo/export/AccumuloWriter.java | 169 +++++++++++
 .../accumulo/export/AccumuloExportTest.java     |   2 +-
 .../recipes/core/export/ExportObserver.java     |  19 +-
 .../fluo/recipes/core/export/ExportQueue.java   |  32 +++
 .../fluo/recipes/core/export/Exporter.java      |  13 +-
 .../fluo/recipes/core/export/OptionsTest.java   |  12 +-
 .../recipes/test/export/AccumuloExporterIT.java |  12 +-
 .../test/export/AccumuloReplicatorIT.java       |  13 +-
 .../recipes/test/export/SimpleExporter.java     |   2 +-
 13 files changed, 347 insertions(+), 315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/docs/accumulo-export-queue.md
----------------------------------------------------------------------
diff --git a/docs/accumulo-export-queue.md b/docs/accumulo-export-queue.md
index dde04fb..33a1c97 100644
--- a/docs/accumulo-export-queue.md
+++ b/docs/accumulo-export-queue.md
@@ -3,8 +3,8 @@
 ## Background
 
 The [Export Queue Recipe][1] provides a generic foundation for building export 
mechanism to any
-external data store. The [AccumuloExportQueue] provides an implementation of 
this recipe for
-Accumulo. The [AccumuloExportQueue] is located the 'fluo-recipes-accumulo' 
module and provides the
+external data store. The [AccumuloExporter] provides an implementation of this 
recipe for
+Accumulo. The [AccumuloExporter] is located in the `fluo-recipes-accumulo` module 
and provides the
 following functionality:
 
  * Safely batches writes to Accumulo made by multiple transactions exporting 
data.
@@ -24,7 +24,7 @@ Exporting to Accumulo is easy. Follow the steps below:
     public class SimpleExporter extends AccumuloExporter<String, String> {
 
       @Override
-      protected Collection<Mutation> processExport(SequencedExport<String, 
String> export) {
+      protected Collection<Mutation> translate(SequencedExport<String, String> 
export) {
         Mutation m = new Mutation(export.getKey());
         m.put("cf", "cq", export.getSequence(), export.getValue());
         return Collections.singleton(m);
@@ -32,7 +32,7 @@ Exporting to Accumulo is easy. Follow the steps below:
     }
     ```
 
-2. With a `SimpleExporter` created, configure a [AccumuloExportQueue] to use 
`SimpleExporter` and
+2. With a `SimpleExporter` created, configure an `ExportQueue` to use 
`SimpleExporter` and
    give it information on how to connect to Accumulo. 
 
     ```java
@@ -46,10 +46,17 @@ Exporting to Accumulo is easy. Follow the steps below:
     String password =       // Accumulo user password
     String exportTable =    // Name of table to export to
 
-    // Configure accumulo export queue
-    AccumuloExportQueue.configure(fluoConfig, new 
ExportQueue.Options(EXPORT_QUEUE_ID,
-        SimpleExporter.class.getName(), String.class.getName(), 
String.class.getName(), numMapBuckets),
-        new AccumuloExportQueue.Options(instance, zookeepers, user, password, 
exportTable));
+
+    // Create config for export table.
+    AccumuloExporter.Configuration exportTableCfg =
+        new AccumuloExporter.Configuration(instance, zookeepers, user, 
password, exportTable);
+
+    // Create config for export queue.
+    ExportQueue.Options eqOpts = new ExportQueue.Options(EXPORT_QUEUE_ID, 
SimpleExporter.class,
+        String.class, String.class, 
numMapBuckets).setExporterConfiguration(exportTableCfg);
+
+    // Configure export queue.  This will modify fluoConfig.
+    ExportQueue.configure(fluoConfig, eqOpts);
 
     // Initialize Fluo using fluoConfig
     ```
@@ -83,7 +90,6 @@ Exporting to Accumulo is easy. Follow the steps below:
 [AccumuloReplicator] is a specialized [AccumuloExporter] that replicates a 
Fluo table to Accumulo.
 
 [1]: export-queue.md
-[AccumuloExportQueue]: 
../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java
 [AccumuloExporter]: 
../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
 [AccumuloReplicator]: 
../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/DifferenceExport.java
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java
----------------------------------------------------------------------
diff --git 
a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java
 
b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java
deleted file mode 100644
index fdba5f3..0000000
--- 
a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.accumulo.export;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumn;
-import org.apache.fluo.recipes.core.export.ExportQueue;
-
-public class AccumuloExportQueue {
-
-  /**
-   * Configures AccumuloExportQueue
-   *
-   * @param fc Fluo configuration
-   * @param eqo Export queue options
-   * @param ao Accumulo export queue options
-   */
-  public static void configure(FluoConfiguration fc, ExportQueue.Options eqo, 
Options ao) {
-    ExportQueue.configure(fc, eqo);
-    AccumuloWriter.setConfig(fc.getAppConfiguration(), eqo.getQueueId(), ao);
-  }
-
-  /**
-   * Generates Accumulo mutations by comparing the differences between a 
RowColumn/Bytes map that is
-   * generated for old and new data and represents how the data should exist 
in Accumulo. When
-   * comparing each row/column/value (RCV) of old and new data, mutations are 
generated using the
-   * following rules:
-   * <ul>
-   * <li>If old and new data have the same RCV, nothing is done.
-   * <li>If old and new data have same row/column but different values, an 
update mutation is
-   * created for the row/column.
-   * <li>If old data has a row/column that is not in the new data, a delete 
mutation is generated.
-   * <li>If new data has a row/column that is not in the old data, an insert 
mutation is generated.
-   * <li>Only one mutation is generated per row.
-   * <li>The export sequence number is used for the timestamp in the mutation.
-   * </ul>
-   *
-   * @param oldData Map containing old row/column data
-   * @param newData Map containing new row/column data
-   * @param seq Export sequence number
-   */
-  public static Collection<Mutation> generateMutations(long seq, 
Map<RowColumn, Bytes> oldData,
-      Map<RowColumn, Bytes> newData) {
-    Map<Bytes, Mutation> mutationMap = new HashMap<>();
-    for (Map.Entry<RowColumn, Bytes> entry : oldData.entrySet()) {
-      RowColumn rc = entry.getKey();
-      if (!newData.containsKey(rc)) {
-        Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new 
Mutation(r.toArray()));
-        m.putDelete(rc.getColumn().getFamily().toArray(), 
rc.getColumn().getQualifier().toArray(),
-            seq);
-      }
-    }
-    for (Map.Entry<RowColumn, Bytes> entry : newData.entrySet()) {
-      RowColumn rc = entry.getKey();
-      Column col = rc.getColumn();
-      Bytes newVal = entry.getValue();
-      Bytes oldVal = oldData.get(rc);
-      if (oldVal == null || !oldVal.equals(newVal)) {
-        Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new 
Mutation(r.toArray()));
-        m.put(col.getFamily().toArray(), col.getQualifier().toArray(), seq, 
newVal.toArray());
-      }
-    }
-    return mutationMap.values();
-  }
-
-  /**
-   * Writes mutations to Accumulo using a shared batch writer
-   *
-   * @since 1.0.0
-   */
-  static class AccumuloWriter {
-
-    private static class Mutations {
-      List<Mutation> mutations;
-      CountDownLatch cdl = new CountDownLatch(1);
-
-      Mutations(Collection<Mutation> mutations) {
-        this.mutations = new ArrayList<>(mutations);
-      }
-    }
-
-    /**
-     * Sets AccumuloWriter config in app configuration
-     */
-    static void setConfig(SimpleConfiguration sc, String id, Options ac) {
-      String prefix = "recipes.accumulo.writer." + id;
-      sc.setProperty(prefix + ".instance", ac.instanceName);
-      sc.setProperty(prefix + ".zookeepers", ac.zookeepers);
-      sc.setProperty(prefix + ".user", ac.user);
-      sc.setProperty(prefix + ".password", ac.password);
-      sc.setProperty(prefix + ".table", ac.table);
-    }
-
-    /**
-     * Gets Accumulo Options from app configuration
-     */
-    static Options getConfig(SimpleConfiguration sc, String id) {
-      String prefix = "recipes.accumulo.writer." + id;
-      String instanceName = sc.getString(prefix + ".instance");
-      String zookeepers = sc.getString(prefix + ".zookeepers");
-      String user = sc.getString(prefix + ".user");
-      String password = sc.getString(prefix + ".password");
-      String table = sc.getString(prefix + ".table");
-      return new Options(instanceName, zookeepers, user, password, table);
-    }
-
-    private static class ExportTask implements Runnable {
-
-      private BatchWriter bw;
-
-      ExportTask(String instanceName, String zookeepers, String user, String 
password, String table)
-          throws TableNotFoundException, AccumuloException, 
AccumuloSecurityException {
-        ZooKeeperInstance zki =
-            new ZooKeeperInstance(new 
ClientConfiguration().withInstance(instanceName).withZkHosts(
-                zookeepers));
-
-        // TODO need to close batch writer
-        Connector conn = zki.getConnector(user, new PasswordToken(password));
-        try {
-          bw = conn.createBatchWriter(table, new BatchWriterConfig());
-        } catch (TableNotFoundException tnfe) {
-          try {
-            conn.tableOperations().create(table);
-          } catch (TableExistsException e) {
-            // nothing to do
-          }
-
-          bw = conn.createBatchWriter(table, new BatchWriterConfig());
-        }
-      }
-
-      @Override
-      public void run() {
-
-        ArrayList<Mutations> exports = new ArrayList<>();
-
-        while (true) {
-          try {
-            exports.clear();
-
-            // gather export from all threads that have placed an item on the 
queue
-            exports.add(exportQueue.take());
-            exportQueue.drainTo(exports);
-
-            for (Mutations ml : exports) {
-              bw.addMutations(ml.mutations);
-            }
-
-            bw.flush();
-
-            // notify all threads waiting after flushing
-            for (Mutations ml : exports) {
-              ml.cdl.countDown();
-            }
-
-          } catch (InterruptedException | MutationsRejectedException e) {
-            throw new RuntimeException(e);
-          }
-        }
-      }
-
-    }
-
-    private static LinkedBlockingQueue<Mutations> exportQueue = null;
-
-    private AccumuloWriter(String instanceName, String zookeepers, String 
user, String password,
-        String table) {
-
-      // TODO: fix this write to static and remove findbugs max rank override 
in pom.xml
-      exportQueue = new LinkedBlockingQueue<>(10000);
-
-      try {
-        Thread queueProcessingTask =
-            new Thread(new ExportTask(instanceName, zookeepers, user, 
password, table));
-        queueProcessingTask.setDaemon(true);
-        queueProcessingTask.start();
-      } catch (Exception e) {
-        throw new IllegalStateException(e);
-      }
-    }
-
-    private static Map<String, AccumuloWriter> exporters = new HashMap<>();
-
-
-    static AccumuloWriter getInstance(SimpleConfiguration sc, String id) {
-      return getInstance(getConfig(sc, id));
-    }
-
-    static AccumuloWriter getInstance(Options ac) {
-      return getInstance(ac.instanceName, ac.zookeepers, ac.user, ac.password, 
ac.table);
-    }
-
-    static synchronized AccumuloWriter getInstance(String instanceName, String 
zookeepers,
-        String user, String password, String table) {
-
-      String key =
-          instanceName + ":" + zookeepers + ":" + user + ":" + 
password.hashCode() + ":" + table;
-
-      AccumuloWriter ret = exporters.get(key);
-
-      if (ret == null) {
-        ret = new AccumuloWriter(instanceName, zookeepers, user, password, 
table);
-        exporters.put(key, ret);
-      }
-
-      return ret;
-    }
-
-    void write(Collection<Mutation> mutations) {
-      Mutations work = new Mutations(mutations);
-      exportQueue.add(work);
-      try {
-        work.cdl.await();
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-  }
-
-  /**
-   * Accumulo export queue options
-   *
-   * @since 1.0.0
-   */
-  public static class Options {
-    String instanceName;
-    String zookeepers;
-    String user;
-    String password;
-    String table;
-
-    public Options(String instanceName, String zookeepers, String user, String 
password,
-        String table) {
-      this.instanceName = instanceName;
-      this.zookeepers = zookeepers;
-      this.user = user;
-      this.password = password;
-      this.table = table;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
----------------------------------------------------------------------
diff --git 
a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
 
b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
index 54fc8ca..4cb0730 100644
--- 
a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
+++ 
b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
@@ -17,27 +17,50 @@ package org.apache.fluo.recipes.accumulo.export;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.fluo.api.observer.Observer.Context;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.recipes.core.export.ExportQueue;
 import org.apache.fluo.recipes.core.export.Exporter;
 import org.apache.fluo.recipes.core.export.SequencedExport;
 
 /**
- * An Accumulo-specific {@link Exporter} that writes mutations to Accumulo 
using a
- * {@link AccumuloExportQueue.AccumuloWriter}
+ * An Accumulo-specific {@link Exporter} that writes mutations to Accumulo. 
For an overview of how
+ * to use this, see the project level documentation for exporting to Accumulo.
  *
  * @since 1.0.0
  */
 public abstract class AccumuloExporter<K, V> extends Exporter<K, V> {
 
-  private AccumuloExportQueue.AccumuloWriter accumuloWriter;
+  /**
+   * Use this to configure the Accumulo table where an AccumuloExporter's 
mutations will be written.
+   * Create and pass to {@link 
ExportQueue.Options#setExporterConfiguration(SimpleConfiguration)}
+   *
+   * @since 1.0.0
+   */
+  public static class Configuration extends SimpleConfiguration {
+
+    public Configuration(String instanceName, String zookeepers, String user, 
String password,
+        String table) {
+      super.setProperty("instanceName", instanceName);
+      super.setProperty("zookeepers", zookeepers);
+      super.setProperty("user", user);
+      super.setProperty("password", password);
+      super.setProperty("table", table);
+    }
+  }
+
+  private AccumuloWriter accumuloWriter;
 
   @Override
-  public void init(String queueId, Context context) throws Exception {
-    accumuloWriter =
-        
AccumuloExportQueue.AccumuloWriter.getInstance(context.getAppConfiguration(), 
queueId);
+  public void init(Exporter.Context context) throws Exception {
+    accumuloWriter = 
AccumuloWriter.getInstance(context.getExporterConfiguration());
   }
 
   @Override
@@ -47,7 +70,7 @@ public abstract class AccumuloExporter<K, V> extends 
Exporter<K, V> {
 
     while (exports.hasNext()) {
       SequencedExport<K, V> export = exports.next();
-      buffer.addAll(processExport(export));
+      buffer.addAll(translate(export));
     }
 
     if (buffer.size() > 0) {
@@ -55,6 +78,49 @@ public abstract class AccumuloExporter<K, V> extends 
Exporter<K, V> {
     }
   }
 
-  protected abstract Collection<Mutation> processExport(SequencedExport<K, V> 
export);
+  protected abstract Collection<Mutation> translate(SequencedExport<K, V> 
export);
+
+  /**
+   * Generates Accumulo mutations by comparing the differences between a 
RowColumn/Bytes map that is
+   * generated for old and new data and represents how the data should exist 
in Accumulo. When
+   * comparing each row/column/value (RCV) of old and new data, mutations are 
generated using the
+   * following rules:
+   * <ul>
+   * <li>If old and new data have the same RCV, nothing is done.
+   * <li>If old and new data have same row/column but different values, an 
update mutation is
+   * created for the row/column.
+   * <li>If old data has a row/column that is not in the new data, a delete 
mutation is generated.
+   * <li>If new data has a row/column that is not in the old data, an insert 
mutation is generated.
+   * <li>Only one mutation is generated per row.
+   * <li>The export sequence number is used for the timestamp in the mutation.
+   * </ul>
+   *
+   * @param oldData Map containing old row/column data
+   * @param newData Map containing new row/column data
+   * @param seq Export sequence number
+   */
+  public static Collection<Mutation> generateMutations(long seq, 
Map<RowColumn, Bytes> oldData,
+      Map<RowColumn, Bytes> newData) {
+    Map<Bytes, Mutation> mutationMap = new HashMap<>();
+    for (Map.Entry<RowColumn, Bytes> entry : oldData.entrySet()) {
+      RowColumn rc = entry.getKey();
+      if (!newData.containsKey(rc)) {
+        Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new 
Mutation(r.toArray()));
+        m.putDelete(rc.getColumn().getFamily().toArray(), 
rc.getColumn().getQualifier().toArray(),
+            seq);
+      }
+    }
+    for (Map.Entry<RowColumn, Bytes> entry : newData.entrySet()) {
+      RowColumn rc = entry.getKey();
+      Column col = rc.getColumn();
+      Bytes newVal = entry.getValue();
+      Bytes oldVal = oldData.get(rc);
+      if (oldVal == null || !oldVal.equals(newVal)) {
+        Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new 
Mutation(r.toArray()));
+        m.put(col.getFamily().toArray(), col.getQualifier().toArray(), seq, 
newVal.toArray());
+      }
+    }
+    return mutationMap.values();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
----------------------------------------------------------------------
diff --git 
a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
 
b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
index aaea742..abaecfd 100644
--- 
a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
+++ 
b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
@@ -34,7 +34,7 @@ import org.apache.fluo.recipes.core.transaction.TxLog;
 public class AccumuloReplicator extends AccumuloExporter<String, TxLog> {
 
   @Override
-  protected Collection<Mutation> processExport(SequencedExport<String, TxLog> 
export) {
+  protected Collection<Mutation> translate(SequencedExport<String, TxLog> 
export) {
     return generateMutations(export.getSequence(), export.getValue());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloWriter.java
----------------------------------------------------------------------
diff --git 
a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloWriter.java
 
b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloWriter.java
new file mode 100644
index 0000000..9016a2c
--- /dev/null
+++ 
b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloWriter.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.accumulo.export;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.fluo.api.config.SimpleConfiguration;
+
+/**
+ * Writes mutations to Accumulo using a shared batch writer
+ *
+ * @since 1.0.0
+ */
+class AccumuloWriter {
+
+  private static class Mutations {
+    List<Mutation> mutations;
+    CountDownLatch cdl = new CountDownLatch(1);
+
+    Mutations(Collection<Mutation> mutations) {
+      this.mutations = new ArrayList<>(mutations);
+    }
+  }
+
+  private static class ExportTask implements Runnable {
+
+    private BatchWriter bw;
+
+    ExportTask(String instanceName, String zookeepers, String user, String 
password, String table)
+        throws TableNotFoundException, AccumuloException, 
AccumuloSecurityException {
+      ZooKeeperInstance zki =
+          new ZooKeeperInstance(new 
ClientConfiguration().withInstance(instanceName).withZkHosts(
+              zookeepers));
+
+      // TODO need to close batch writer
+      Connector conn = zki.getConnector(user, new PasswordToken(password));
+      try {
+        bw = conn.createBatchWriter(table, new BatchWriterConfig());
+      } catch (TableNotFoundException tnfe) {
+        try {
+          conn.tableOperations().create(table);
+        } catch (TableExistsException e) {
+          // nothing to do
+        }
+
+        bw = conn.createBatchWriter(table, new BatchWriterConfig());
+      }
+    }
+
+    @Override
+    public void run() {
+
+      ArrayList<AccumuloWriter.Mutations> exports = new ArrayList<>();
+
+      while (true) {
+        try {
+          exports.clear();
+
+          // gather export from all threads that have placed an item on the 
queue
+          exports.add(exportQueue.take());
+          exportQueue.drainTo(exports);
+
+          for (AccumuloWriter.Mutations ml : exports) {
+            bw.addMutations(ml.mutations);
+          }
+
+          bw.flush();
+
+          // notify all threads waiting after flushing
+          for (AccumuloWriter.Mutations ml : exports) {
+            ml.cdl.countDown();
+          }
+
+        } catch (InterruptedException | MutationsRejectedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+  }
+
+  private static LinkedBlockingQueue<AccumuloWriter.Mutations> exportQueue = 
null;
+
+  private AccumuloWriter(String instanceName, String zookeepers, String user, 
String password,
+      String table) {
+
+    // TODO: fix this write to static and remove findbugs max rank override in 
pom.xml
+    exportQueue = new LinkedBlockingQueue<>(10000);
+
+    try {
+      Thread queueProcessingTask =
+          new Thread(new ExportTask(instanceName, zookeepers, user, password, 
table));
+      queueProcessingTask.setDaemon(true);
+      queueProcessingTask.start();
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private static Map<String, AccumuloWriter> exporters = new HashMap<>();
+
+
+  static AccumuloWriter getInstance(SimpleConfiguration sc) {
+    String instanceName = sc.getString("instanceName");
+    String zookeepers = sc.getString("zookeepers");
+    String user = sc.getString("user");
+    String password = sc.getString("password");
+    String table = sc.getString("table");
+    return getInstance(instanceName, zookeepers, user, password, table);
+  }
+
+  static synchronized AccumuloWriter getInstance(String instanceName, String 
zookeepers,
+      String user, String password, String table) {
+
+    String key =
+        instanceName + ":" + zookeepers + ":" + user + ":" + 
password.hashCode() + ":" + table;
+
+    AccumuloWriter ret = exporters.get(key);
+
+    if (ret == null) {
+      ret = new AccumuloWriter(instanceName, zookeepers, user, password, 
table);
+      exporters.put(key, ret);
+    }
+
+    return ret;
+  }
+
+  void write(Collection<Mutation> mutations) {
+    AccumuloWriter.Mutations work = new Mutations(mutations);
+    exportQueue.add(work);
+    try {
+      work.cdl.await();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java
----------------------------------------------------------------------
diff --git 
a/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java
 
b/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java
index e4be08a..d0ebaee 100644
--- 
a/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java
+++ 
b/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java
@@ -45,7 +45,7 @@ public class AccumuloExportTest {
 
   public static Collection<Mutation> genMutations(String key, long seq, 
Optional<String> oldVal,
       Optional<String> newVal) {
-    return AccumuloExportQueue.generateMutations(seq, genData(key, oldVal), 
genData(key, newVal));
+    return AccumuloExporter.generateMutations(seq, genData(key, oldVal), 
genData(key, newVal));
   }
 
   public static Mutation makePut(String key, String val, long seq) {

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java
 
b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java
index b3070e0..d086577 100644
--- 
a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java
+++ 
b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java
@@ -20,6 +20,7 @@ import java.util.NoSuchElementException;
 
 import com.google.common.collect.Iterators;
 import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.observer.AbstractObserver;
@@ -98,7 +99,23 @@ public class ExportObserver<K, V> extends AbstractObserver {
 
     memLimit = opts.getBufferSize();
 
-    exporter.init(queueId, context);
+    exporter.init(new Exporter.Context() {
+
+      @Override
+      public String getQueueId() {
+        return queueId;
+      }
+
+      @Override
+      public SimpleConfiguration getExporterConfiguration() {
+        return opts.getExporterConfiguration();
+      }
+
+      @Override
+      public Context getObserverContext() {
+        return context;
+      }
+    });
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
 
b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
index fcc3a74..150503e 100644
--- 
a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
+++ 
b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java
@@ -20,6 +20,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.regex.Pattern;
 
@@ -187,6 +188,7 @@ public class ExportQueue<K, V> {
     String valueType;
     String exporterType;
     String queueId;
+    SimpleConfiguration exporterConfig;
 
     Options(String queueId, SimpleConfiguration appConfig) {
       this.queueId = queueId;
@@ -198,6 +200,8 @@ public class ExportQueue<K, V> {
       this.bufferSize = appConfig.getLong(PREFIX + queueId + ".bufferSize", DEFAULT_BUFFER_SIZE);
       this.bucketsPerTablet =
           appConfig.getInt(PREFIX + queueId + ".bucketsPerTablet", DEFAULT_BUCKETS_PER_TABLET);
+
+      this.exporterConfig = appConfig.subset(PREFIX + queueId + ".exporterCfg");
     }
 
     public Options(String queueId, String exporterType, String keyType, String valueType,
@@ -262,6 +266,24 @@ public class ExportQueue<K, V> {
       return bucketsPerTablet;
     }
 
+    /**
+     * Sets exporter specific configuration. This configuration will be made available to an
+     * {@link Exporter} via {@link Exporter.Context#getExporterConfiguration()}.
+     */
+    public Options setExporterConfiguration(SimpleConfiguration config) {
+      Objects.requireNonNull(config);
+      this.exporterConfig = config;
+      return this;
+    }
+
+    public SimpleConfiguration getExporterConfiguration() {
+      if (exporterConfig == null) {
+        return new SimpleConfiguration();
+      }
+
+      return exporterConfig;
+    }
+
     public String getQueueId() {
       return queueId;
     }
@@ -275,9 +297,19 @@ public class ExportQueue<K, V> {
       if (bufferSize != null) {
         appConfig.setProperty(PREFIX + queueId + ".bufferSize", bufferSize);
       }
+
       if (bucketsPerTablet != null) {
         appConfig.setProperty(PREFIX + queueId + ".bucketsPerTablet", bucketsPerTablet);
       }
+
+      if (exporterConfig != null) {
+        Iterator<String> keys = exporterConfig.getKeys();
+        while (keys.hasNext()) {
+          String key = keys.next();
+          appConfig.setProperty(PREFIX + queueId + ".exporterCfg." + key,
+              exporterConfig.getRawString(key));
+        }
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java
index f00cfb0..66aa7e8 100644
--- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java
@@ -17,14 +17,23 @@ package org.apache.fluo.recipes.core.export;
 
 import java.util.Iterator;
 
-import org.apache.fluo.api.observer.Observer.Context;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.observer.Observer;
 
 /**
  * @since 1.0.0
  */
 public abstract class Exporter<K, V> {
 
-  public void init(String queueId, Context observerContext) throws Exception {}
+  public interface Context {
+    String getQueueId();
+
+    SimpleConfiguration getExporterConfiguration();
+
+    Observer.Context getObserverContext();
+  }
+
+  public void init(Exporter.Context exporterContext) throws Exception {}
 
   /**
    * Must be able to handle same key being exported multiple times and key being exported out of

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java
index b07caea..60982bd 100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java
@@ -16,6 +16,7 @@
 package org.apache.fluo.recipes.core.export;
 
 import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.recipes.core.export.ExportQueue.Options;
 import org.junit.Assert;
 import org.junit.Test;
@@ -25,9 +26,13 @@ public class OptionsTest {
   public void testExportQueueOptions() {
     FluoConfiguration conf = new FluoConfiguration();
 
+    SimpleConfiguration ec1 = new SimpleConfiguration();
+    ec1.setProperty("ep1", "ev1");
+    ec1.setProperty("ep2", 3L);
+
     ExportQueue.configure(conf, new Options("Q1", "ET", "KT", "VT", 100));
     ExportQueue.configure(conf, new Options("Q2", "ET2", "KT2", "VT2", 200).setBucketsPerTablet(20)
-        .setBufferSize(1000000));
+        .setBufferSize(1000000).setExporterConfiguration(ec1));
 
     Options opts1 = new Options("Q1", conf.getAppConfiguration());
 
@@ -47,5 +52,10 @@ public class OptionsTest {
     Assert.assertEquals(opts2.bucketsPerTablet.intValue(), 20);
     Assert.assertEquals(opts2.bufferSize.intValue(), 1000000);
 
+    SimpleConfiguration ec2 = opts2.getExporterConfiguration();
+
+    Assert.assertEquals("ev1", ec2.getString("ep1"));
+    Assert.assertEquals(3, ec2.getInt("ep2"));
+
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java
----------------------------------------------------------------------
diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java
index 38985f1..d4ad914 100644
--- a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java
+++ b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java
@@ -30,7 +30,7 @@ import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.Transaction;
 import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.fluo.recipes.accumulo.export.AccumuloExportQueue;
+import org.apache.fluo.recipes.accumulo.export.AccumuloExporter;
 import org.apache.fluo.recipes.core.export.ExportQueue;
 import org.apache.fluo.recipes.test.AccumuloExportITBase;
 import org.apache.hadoop.io.Text;
@@ -51,10 +51,12 @@ public class AccumuloExporterIT extends AccumuloExportITBase {
 
     MiniAccumuloCluster miniAccumulo = getMiniAccumuloCluster();
 
-    AccumuloExportQueue.configure(getFluoConfiguration(), new ExportQueue.Options(QUEUE_ID,
-        SimpleExporter.class.getName(), String.class.getName(), String.class.getName(), 5)
-        .setBucketsPerTablet(1), new AccumuloExportQueue.Options(miniAccumulo.getInstanceName(),
-        miniAccumulo.getZooKeepers(), ACCUMULO_USER, ACCUMULO_PASSWORD, exportTable));
+    ExportQueue.configure(
+        getFluoConfiguration(),
+        new ExportQueue.Options(QUEUE_ID, SimpleExporter.class, String.class, String.class, 5)
+            .setBucketsPerTablet(1).setExporterConfiguration(
+                new AccumuloExporter.Configuration(miniAccumulo.getInstanceName(), miniAccumulo
+                    .getZooKeepers(), ACCUMULO_USER, ACCUMULO_PASSWORD, exportTable)));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java
----------------------------------------------------------------------
diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java
index e5b4e1a..635f6e9 100644
--- a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java
+++ b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java
@@ -28,7 +28,7 @@ import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.Transaction;
 import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.fluo.recipes.accumulo.export.AccumuloExportQueue;
+import org.apache.fluo.recipes.accumulo.export.AccumuloExporter;
 import org.apache.fluo.recipes.accumulo.export.AccumuloReplicator;
 import org.apache.fluo.recipes.core.export.ExportQueue;
 import org.apache.fluo.recipes.core.transaction.TxLog;
@@ -55,12 +55,11 @@ public class AccumuloReplicatorIT extends AccumuloExportITBase {
 
     MiniAccumuloCluster miniAccumulo = getMiniAccumuloCluster();
 
-    AccumuloExportQueue.configure(
-        getFluoConfiguration(),
-        new ExportQueue.Options(QUEUE_ID, AccumuloReplicator.class.getName(), String.class
-            .getName(), TxLog.class.getName(), 5),
-        new AccumuloExportQueue.Options(miniAccumulo.getInstanceName(), miniAccumulo
-            .getZooKeepers(), ACCUMULO_USER, ACCUMULO_PASSWORD, exportTable));
+    ExportQueue.configure(getFluoConfiguration(), new ExportQueue.Options(QUEUE_ID,
+        AccumuloReplicator.class.getName(), String.class.getName(), TxLog.class.getName(), 5)
+        .setExporterConfiguration(new AccumuloExporter.Configuration(
+            miniAccumulo.getInstanceName(), miniAccumulo.getZooKeepers(), ACCUMULO_USER,
+            ACCUMULO_PASSWORD, exportTable)));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java
----------------------------------------------------------------------
diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java
index 5aa5812..14c1aa4 100644
--- a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java
+++ b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java
@@ -25,7 +25,7 @@ import org.apache.fluo.recipes.core.export.SequencedExport;
 public class SimpleExporter extends AccumuloExporter<String, String> {
 
   @Override
-  protected Collection<Mutation> processExport(SequencedExport<String, String> export) {
+  protected Collection<Mutation> translate(SequencedExport<String, String> export) {
     Mutation m = new Mutation(export.getKey());
     m.put("cf", "cq", export.getSequence(), export.getValue());
     return Collections.singleton(m);


Reply via email to