Repository: incubator-fluo-recipes Updated Branches: refs/heads/master 6f5177363 -> dc765718c
Added standard way to setup per exporter configuration. Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/commit/dc765718 Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/tree/dc765718 Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/diff/dc765718 Branch: refs/heads/master Commit: dc765718c59104a1643cbaa0ae81d565ae5e4146 Parents: 6f51773 Author: Keith Turner <ktur...@apache.org> Authored: Wed Sep 21 18:02:23 2016 -0400 Committer: Keith Turner <ke...@deenlo.com> Committed: Thu Sep 22 12:24:18 2016 -0400 ---------------------------------------------------------------------- docs/accumulo-export-queue.md | 24 +- .../accumulo/export/AccumuloExportQueue.java | 278 ------------------- .../accumulo/export/AccumuloExporter.java | 84 +++++- .../accumulo/export/AccumuloReplicator.java | 2 +- .../recipes/accumulo/export/AccumuloWriter.java | 169 +++++++++++ .../accumulo/export/AccumuloExportTest.java | 2 +- .../recipes/core/export/ExportObserver.java | 19 +- .../fluo/recipes/core/export/ExportQueue.java | 32 +++ .../fluo/recipes/core/export/Exporter.java | 13 +- .../fluo/recipes/core/export/OptionsTest.java | 12 +- .../recipes/test/export/AccumuloExporterIT.java | 12 +- .../test/export/AccumuloReplicatorIT.java | 13 +- .../recipes/test/export/SimpleExporter.java | 2 +- 13 files changed, 347 insertions(+), 315 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/docs/accumulo-export-queue.md ---------------------------------------------------------------------- diff --git a/docs/accumulo-export-queue.md b/docs/accumulo-export-queue.md index dde04fb..33a1c97 100644 --- a/docs/accumulo-export-queue.md +++ b/docs/accumulo-export-queue.md @@ -3,8 +3,8 @@ ## Background The [Export Queue Recipe][1] provides a generic foundation for building 
export mechanism to any -external data store. The [AccumuloExportQueue] provides an implementation of this recipe for -Accumulo. The [AccumuloExportQueue] is located the 'fluo-recipes-accumulo' module and provides the +external data store. The [AccumuloExporter] provides an implementation of this recipe for +Accumulo. The [AccumuloExporter] is located in the `fluo-recipes-accumulo` module and provides the following functionality: * Safely batches writes to Accumulo made by multiple transactions exporting data. @@ -24,7 +24,7 @@ Exporting to Accumulo is easy. Follow the steps below: public class SimpleExporter extends AccumuloExporter<String, String> { @Override - protected Collection<Mutation> processExport(SequencedExport<String, String> export) { + protected Collection<Mutation> translate(SequencedExport<String, String> export) { Mutation m = new Mutation(export.getKey()); m.put("cf", "cq", export.getSequence(), export.getValue()); return Collections.singleton(m); @@ -32,7 +32,7 @@ Exporting to Accumulo is easy. Follow the steps below: } ``` -2. With a `SimpleExporter` created, configure a [AccumuloExportQueue] to use `SimpleExporter` and +2. With a `SimpleExporter` created, configure an `ExportQueue` to use `SimpleExporter` and give it information on how to connect to Accumulo. ```java @@ -46,10 +46,17 @@ Exporting to Accumulo is easy. Follow the steps below: String password = // Accumulo user password String exportTable = // Name of table to export to - // Configure accumulo export queue - AccumuloExportQueue.configure(fluoConfig, new ExportQueue.Options(EXPORT_QUEUE_ID, - SimpleExporter.class.getName(), String.class.getName(), String.class.getName(), numMapBuckets), - new AccumuloExportQueue.Options(instance, zookeepers, user, password, exportTable)); + + // Create config for export table.
+ AccumuloExporter.Configuration exportTableCfg = + new AccumuloExporter.Configuration(instance, zookeepers, user, password, exportTable); + + // Create config for export queue. + ExportQueue.Options eqOpts = new ExportQueue.Options(EXPORT_QUEUE_ID, SimpleExporter.class, + String.class, String.class, numMapBuckets).setExporterConfiguration(exportTableCfg); + + // Configure export queue. This will modify fluoConfig. + ExportQueue.configure(fluoConfig, eqOpts); // Initialize Fluo using fluoConfig ``` @@ -83,7 +90,6 @@ Exporting to Accumulo is easy. Follow the steps below: [AccumuloReplicator] is a specialized [AccumuloExporter] that replicates a Fluo table to Accumulo. [1]: export-queue.md -[AccumuloExportQueue]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java [AccumuloExporter]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java [AccumuloReplicator]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/DifferenceExport.java http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java ---------------------------------------------------------------------- diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java deleted file mode 100644 index fdba5f3..0000000 --- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership.
The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.accumulo.export; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.data.Mutation; -import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.config.SimpleConfiguration; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.data.RowColumn; -import org.apache.fluo.recipes.core.export.ExportQueue; - -public class AccumuloExportQueue { - - /** - * Configures AccumuloExportQueue - * - * @param fc Fluo configuration 
- * @param eqo Export queue options - * @param ao Accumulo export queue options - */ - public static void configure(FluoConfiguration fc, ExportQueue.Options eqo, Options ao) { - ExportQueue.configure(fc, eqo); - AccumuloWriter.setConfig(fc.getAppConfiguration(), eqo.getQueueId(), ao); - } - - /** - * Generates Accumulo mutations by comparing the differences between a RowColumn/Bytes map that is - * generated for old and new data and represents how the data should exist in Accumulo. When - * comparing each row/column/value (RCV) of old and new data, mutations are generated using the - * following rules: - * <ul> - * <li>If old and new data have the same RCV, nothing is done. - * <li>If old and new data have same row/column but different values, an update mutation is - * created for the row/column. - * <li>If old data has a row/column that is not in the new data, a delete mutation is generated. - * <li>If new data has a row/column that is not in the old data, an insert mutation is generated. - * <li>Only one mutation is generated per row. - * <li>The export sequence number is used for the timestamp in the mutation. 
- * </ul> - * - * @param oldData Map containing old row/column data - * @param newData Map containing new row/column data - * @param seq Export sequence number - */ - public static Collection<Mutation> generateMutations(long seq, Map<RowColumn, Bytes> oldData, - Map<RowColumn, Bytes> newData) { - Map<Bytes, Mutation> mutationMap = new HashMap<>(); - for (Map.Entry<RowColumn, Bytes> entry : oldData.entrySet()) { - RowColumn rc = entry.getKey(); - if (!newData.containsKey(rc)) { - Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new Mutation(r.toArray())); - m.putDelete(rc.getColumn().getFamily().toArray(), rc.getColumn().getQualifier().toArray(), - seq); - } - } - for (Map.Entry<RowColumn, Bytes> entry : newData.entrySet()) { - RowColumn rc = entry.getKey(); - Column col = rc.getColumn(); - Bytes newVal = entry.getValue(); - Bytes oldVal = oldData.get(rc); - if (oldVal == null || !oldVal.equals(newVal)) { - Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new Mutation(r.toArray())); - m.put(col.getFamily().toArray(), col.getQualifier().toArray(), seq, newVal.toArray()); - } - } - return mutationMap.values(); - } - - /** - * Writes mutations to Accumulo using a shared batch writer - * - * @since 1.0.0 - */ - static class AccumuloWriter { - - private static class Mutations { - List<Mutation> mutations; - CountDownLatch cdl = new CountDownLatch(1); - - Mutations(Collection<Mutation> mutations) { - this.mutations = new ArrayList<>(mutations); - } - } - - /** - * Sets AccumuloWriter config in app configuration - */ - static void setConfig(SimpleConfiguration sc, String id, Options ac) { - String prefix = "recipes.accumulo.writer." 
+ id; - sc.setProperty(prefix + ".instance", ac.instanceName); - sc.setProperty(prefix + ".zookeepers", ac.zookeepers); - sc.setProperty(prefix + ".user", ac.user); - sc.setProperty(prefix + ".password", ac.password); - sc.setProperty(prefix + ".table", ac.table); - } - - /** - * Gets Accumulo Options from app configuration - */ - static Options getConfig(SimpleConfiguration sc, String id) { - String prefix = "recipes.accumulo.writer." + id; - String instanceName = sc.getString(prefix + ".instance"); - String zookeepers = sc.getString(prefix + ".zookeepers"); - String user = sc.getString(prefix + ".user"); - String password = sc.getString(prefix + ".password"); - String table = sc.getString(prefix + ".table"); - return new Options(instanceName, zookeepers, user, password, table); - } - - private static class ExportTask implements Runnable { - - private BatchWriter bw; - - ExportTask(String instanceName, String zookeepers, String user, String password, String table) - throws TableNotFoundException, AccumuloException, AccumuloSecurityException { - ZooKeeperInstance zki = - new ZooKeeperInstance(new ClientConfiguration().withInstance(instanceName).withZkHosts( - zookeepers)); - - // TODO need to close batch writer - Connector conn = zki.getConnector(user, new PasswordToken(password)); - try { - bw = conn.createBatchWriter(table, new BatchWriterConfig()); - } catch (TableNotFoundException tnfe) { - try { - conn.tableOperations().create(table); - } catch (TableExistsException e) { - // nothing to do - } - - bw = conn.createBatchWriter(table, new BatchWriterConfig()); - } - } - - @Override - public void run() { - - ArrayList<Mutations> exports = new ArrayList<>(); - - while (true) { - try { - exports.clear(); - - // gather export from all threads that have placed an item on the queue - exports.add(exportQueue.take()); - exportQueue.drainTo(exports); - - for (Mutations ml : exports) { - bw.addMutations(ml.mutations); - } - - bw.flush(); - - // notify all threads waiting 
after flushing - for (Mutations ml : exports) { - ml.cdl.countDown(); - } - - } catch (InterruptedException | MutationsRejectedException e) { - throw new RuntimeException(e); - } - } - } - - } - - private static LinkedBlockingQueue<Mutations> exportQueue = null; - - private AccumuloWriter(String instanceName, String zookeepers, String user, String password, - String table) { - - // TODO: fix this write to static and remove findbugs max rank override in pom.xml - exportQueue = new LinkedBlockingQueue<>(10000); - - try { - Thread queueProcessingTask = - new Thread(new ExportTask(instanceName, zookeepers, user, password, table)); - queueProcessingTask.setDaemon(true); - queueProcessingTask.start(); - } catch (Exception e) { - throw new IllegalStateException(e); - } - } - - private static Map<String, AccumuloWriter> exporters = new HashMap<>(); - - - static AccumuloWriter getInstance(SimpleConfiguration sc, String id) { - return getInstance(getConfig(sc, id)); - } - - static AccumuloWriter getInstance(Options ac) { - return getInstance(ac.instanceName, ac.zookeepers, ac.user, ac.password, ac.table); - } - - static synchronized AccumuloWriter getInstance(String instanceName, String zookeepers, - String user, String password, String table) { - - String key = - instanceName + ":" + zookeepers + ":" + user + ":" + password.hashCode() + ":" + table; - - AccumuloWriter ret = exporters.get(key); - - if (ret == null) { - ret = new AccumuloWriter(instanceName, zookeepers, user, password, table); - exporters.put(key, ret); - } - - return ret; - } - - void write(Collection<Mutation> mutations) { - Mutations work = new Mutations(mutations); - exportQueue.add(work); - try { - work.cdl.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - } - - /** - * Accumulo export queue options - * - * @since 1.0.0 - */ - public static class Options { - String instanceName; - String zookeepers; - String user; - String password; - String table; - - public 
Options(String instanceName, String zookeepers, String user, String password, - String table) { - this.instanceName = instanceName; - this.zookeepers = zookeepers; - this.user = user; - this.password = password; - this.table = table; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java ---------------------------------------------------------------------- diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java index 54fc8ca..4cb0730 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java @@ -17,27 +17,50 @@ package org.apache.fluo.recipes.accumulo.export; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import org.apache.accumulo.core.data.Mutation; -import org.apache.fluo.api.observer.Observer.Context; +import org.apache.fluo.api.config.SimpleConfiguration; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.RowColumn; +import org.apache.fluo.recipes.core.export.ExportQueue; import org.apache.fluo.recipes.core.export.Exporter; import org.apache.fluo.recipes.core.export.SequencedExport; /** - * An Accumulo-specific {@link Exporter} that writes mutations to Accumulo using a - * {@link AccumuloExportQueue.AccumuloWriter} + * An Accumulo-specific {@link Exporter} that writes mutations to Accumulo. For an overview of how + * to use this, see the project level documentation for exporting to Accumulo. 
* * @since 1.0.0 */ public abstract class AccumuloExporter<K, V> extends Exporter<K, V> { - private AccumuloExportQueue.AccumuloWriter accumuloWriter; + /** + * Use this to configure the Accumulo table where an AccumuloExporter's mutations will be written. + * Create and pass to {@link ExportQueue.Options#setExporterConfiguration(SimpleConfiguration)} + * + * @since 1.0.0 + */ + public static class Configuration extends SimpleConfiguration { + + public Configuration(String instanceName, String zookeepers, String user, String password, + String table) { + super.setProperty("instanceName", instanceName); + super.setProperty("zookeepers", zookeepers); + super.setProperty("user", user); + super.setProperty("password", password); + super.setProperty("table", table); + } + } + + private AccumuloWriter accumuloWriter; @Override - public void init(String queueId, Context context) throws Exception { - accumuloWriter = - AccumuloExportQueue.AccumuloWriter.getInstance(context.getAppConfiguration(), queueId); + public void init(Exporter.Context context) throws Exception { + accumuloWriter = AccumuloWriter.getInstance(context.getExporterConfiguration()); } @Override @@ -47,7 +70,7 @@ public abstract class AccumuloExporter<K, V> extends Exporter<K, V> { while (exports.hasNext()) { SequencedExport<K, V> export = exports.next(); - buffer.addAll(processExport(export)); + buffer.addAll(translate(export)); } if (buffer.size() > 0) { @@ -55,6 +78,49 @@ public abstract class AccumuloExporter<K, V> extends Exporter<K, V> { } } - protected abstract Collection<Mutation> processExport(SequencedExport<K, V> export); + protected abstract Collection<Mutation> translate(SequencedExport<K, V> export); + + /** + * Generates Accumulo mutations by comparing the differences between a RowColumn/Bytes map that is + * generated for old and new data and represents how the data should exist in Accumulo. 
When + * comparing each row/column/value (RCV) of old and new data, mutations are generated using the + * following rules: + * <ul> + * <li>If old and new data have the same RCV, nothing is done. + * <li>If old and new data have same row/column but different values, an update mutation is + * created for the row/column. + * <li>If old data has a row/column that is not in the new data, a delete mutation is generated. + * <li>If new data has a row/column that is not in the old data, an insert mutation is generated. + * <li>Only one mutation is generated per row. + * <li>The export sequence number is used for the timestamp in the mutation. + * </ul> + * + * @param oldData Map containing old row/column data + * @param newData Map containing new row/column data + * @param seq Export sequence number + */ + public static Collection<Mutation> generateMutations(long seq, Map<RowColumn, Bytes> oldData, + Map<RowColumn, Bytes> newData) { + Map<Bytes, Mutation> mutationMap = new HashMap<>(); + for (Map.Entry<RowColumn, Bytes> entry : oldData.entrySet()) { + RowColumn rc = entry.getKey(); + if (!newData.containsKey(rc)) { + Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new Mutation(r.toArray())); + m.putDelete(rc.getColumn().getFamily().toArray(), rc.getColumn().getQualifier().toArray(), + seq); + } + } + for (Map.Entry<RowColumn, Bytes> entry : newData.entrySet()) { + RowColumn rc = entry.getKey(); + Column col = rc.getColumn(); + Bytes newVal = entry.getValue(); + Bytes oldVal = oldData.get(rc); + if (oldVal == null || !oldVal.equals(newVal)) { + Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new Mutation(r.toArray())); + m.put(col.getFamily().toArray(), col.getQualifier().toArray(), seq, newVal.toArray()); + } + } + return mutationMap.values(); + } } http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java 
---------------------------------------------------------------------- diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java index aaea742..abaecfd 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java @@ -34,7 +34,7 @@ import org.apache.fluo.recipes.core.transaction.TxLog; public class AccumuloReplicator extends AccumuloExporter<String, TxLog> { @Override - protected Collection<Mutation> processExport(SequencedExport<String, TxLog> export) { + protected Collection<Mutation> translate(SequencedExport<String, TxLog> export) { return generateMutations(export.getSequence(), export.getValue()); } http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloWriter.java ---------------------------------------------------------------------- diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloWriter.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloWriter.java new file mode 100644 index 0000000..9016a2c --- /dev/null +++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloWriter.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.accumulo.export; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Mutation; +import org.apache.fluo.api.config.SimpleConfiguration; + +/** + * Writes mutations to Accumulo using a shared batch writer + * + * @since 1.0.0 + */ +class AccumuloWriter { + + private static class Mutations { + List<Mutation> mutations; + CountDownLatch cdl = new CountDownLatch(1); + + Mutations(Collection<Mutation> mutations) { + this.mutations = new ArrayList<>(mutations); + } + } + + private static class ExportTask implements Runnable { + + private BatchWriter bw; + + ExportTask(String instanceName, String zookeepers, String user, String password, String 
table, + LinkedBlockingQueue<AccumuloWriter.Mutations> queue) + throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + this.exportQueue = queue; + + ZooKeeperInstance zki = + new ZooKeeperInstance(new ClientConfiguration().withInstance(instanceName).withZkHosts( + zookeepers)); + + // TODO need to close batch writer + Connector conn = zki.getConnector(user, new PasswordToken(password)); + try { + bw = conn.createBatchWriter(table, new BatchWriterConfig()); + } catch (TableNotFoundException tnfe) { + try { + conn.tableOperations().create(table); + } catch (TableExistsException e) { + // nothing to do + } + + bw = conn.createBatchWriter(table, new BatchWriterConfig()); + } + } + + // Queue owned by the AccumuloWriter that created this task. This task drains only that queue, + // so mutations are always written through the batch writer for the matching table. + private final LinkedBlockingQueue<AccumuloWriter.Mutations> exportQueue; + + @Override + public void run() { + + ArrayList<AccumuloWriter.Mutations> exports = new ArrayList<>(); + + while (true) { + try { + exports.clear(); + + // gather export from all threads that have placed an item on the queue + exports.add(exportQueue.take()); + exportQueue.drainTo(exports); + + for (AccumuloWriter.Mutations ml : exports) { + bw.addMutations(ml.mutations); + } + + bw.flush(); + + // notify all threads waiting after flushing + for (AccumuloWriter.Mutations ml : exports) { + ml.cdl.countDown(); + } + + } catch (InterruptedException | MutationsRejectedException e) { + throw new RuntimeException(e); + } + } + } + + } + + // Per-writer (not static) queue. A static queue was overwritten by every new AccumuloWriter, so + // the export threads of writers for different tables all drained the same queue and could + // write mutations to the wrong table. + private final LinkedBlockingQueue<AccumuloWriter.Mutations> exportQueue; + + private AccumuloWriter(String instanceName, String zookeepers, String user, String password, + String table) { + + // NOTE(review): the findbugs max rank override in pom.xml that was added for the former + // static write can likely be removed now -- confirm. + exportQueue = new LinkedBlockingQueue<>(10000); + + try { + Thread queueProcessingTask = + new Thread(new ExportTask(instanceName, zookeepers, user, password, table, + exportQueue)); + queueProcessingTask.setDaemon(true); + queueProcessingTask.start(); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + private static Map<String, AccumuloWriter> exporters = new HashMap<>(); + + + static AccumuloWriter getInstance(SimpleConfiguration
sc) { + String instanceName = sc.getString("instanceName"); + String zookeepers = sc.getString("zookeepers"); + String user = sc.getString("user"); + String password = sc.getString("password"); + String table = sc.getString("table"); + return getInstance(instanceName, zookeepers, user, password, table); + } + + static synchronized AccumuloWriter getInstance(String instanceName, String zookeepers, + String user, String password, String table) { + + String key = + instanceName + ":" + zookeepers + ":" + user + ":" + password.hashCode() + ":" + table; + + AccumuloWriter ret = exporters.get(key); + + if (ret == null) { + ret = new AccumuloWriter(instanceName, zookeepers, user, password, table); + exporters.put(key, ret); + } + + return ret; + } + + void write(Collection<Mutation> mutations) { + AccumuloWriter.Mutations work = new Mutations(mutations); + try { + // put() blocks while the bounded queue is full; add() would throw IllegalStateException. + exportQueue.put(work); + // Wait until the export thread has flushed a batch containing these mutations. + work.cdl.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java ---------------------------------------------------------------------- diff --git a/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java b/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java index e4be08a..d0ebaee 100644 --- a/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java +++ b/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java @@ -45,7 +45,7 @@ public class AccumuloExportTest { public static Collection<Mutation> genMutations(String key, long seq, Optional<String> oldVal, Optional<String> newVal) { - return AccumuloExportQueue.generateMutations(seq, genData(key, oldVal), genData(key, newVal)); + return AccumuloExporter.generateMutations(seq,
genData(key, oldVal), genData(key, newVal)); } public static Mutation makePut(String key, String val, long seq) { http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java index b3070e0..d086577 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java @@ -20,6 +20,7 @@ import java.util.NoSuchElementException; import com.google.common.collect.Iterators; import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.config.SimpleConfiguration; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.observer.AbstractObserver; @@ -98,7 +99,23 @@ public class ExportObserver<K, V> extends AbstractObserver { memLimit = opts.getBufferSize(); - exporter.init(queueId, context); + exporter.init(new Exporter.Context() { + + @Override + public String getQueueId() { + return queueId; + } + + @Override + public SimpleConfiguration getExporterConfiguration() { + return opts.getExporterConfiguration(); + } + + @Override + public Context getObserverContext() { + return context; + } + }); } @Override http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java index fcc3a74..150503e 100644 --- 
a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.regex.Pattern; @@ -187,6 +188,7 @@ public class ExportQueue<K, V> { String valueType; String exporterType; String queueId; + SimpleConfiguration exporterConfig; Options(String queueId, SimpleConfiguration appConfig) { this.queueId = queueId; @@ -198,6 +200,8 @@ public class ExportQueue<K, V> { this.bufferSize = appConfig.getLong(PREFIX + queueId + ".bufferSize", DEFAULT_BUFFER_SIZE); this.bucketsPerTablet = appConfig.getInt(PREFIX + queueId + ".bucketsPerTablet", DEFAULT_BUCKETS_PER_TABLET); + + this.exporterConfig = appConfig.subset(PREFIX + queueId + ".exporterCfg"); } public Options(String queueId, String exporterType, String keyType, String valueType, @@ -262,6 +266,24 @@ public class ExportQueue<K, V> { return bucketsPerTablet; } + /** + * Sets exporter specific configuration. This configuration will be made available to an + * {@link Exporter} via {@link Exporter.Context#getExporterConfiguration()}. 
+ */ + public Options setExporterConfiguration(SimpleConfiguration config) { + Objects.requireNonNull(config); + this.exporterConfig = config; + return this; + } + + public SimpleConfiguration getExporterConfiguration() { + if (exporterConfig == null) { + return new SimpleConfiguration(); + } + + return exporterConfig; + } + public String getQueueId() { return queueId; } @@ -275,9 +297,19 @@ public class ExportQueue<K, V> { if (bufferSize != null) { appConfig.setProperty(PREFIX + queueId + ".bufferSize", bufferSize); } + if (bucketsPerTablet != null) { appConfig.setProperty(PREFIX + queueId + ".bucketsPerTablet", bucketsPerTablet); } + + if (exporterConfig != null) { + Iterator<String> keys = exporterConfig.getKeys(); + while (keys.hasNext()) { + String key = keys.next(); + appConfig.setProperty(PREFIX + queueId + ".exporterCfg." + key, + exporterConfig.getRawString(key)); + } + } } } } http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java index f00cfb0..66aa7e8 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java @@ -17,14 +17,23 @@ package org.apache.fluo.recipes.core.export; import java.util.Iterator; -import org.apache.fluo.api.observer.Observer.Context; +import org.apache.fluo.api.config.SimpleConfiguration; +import org.apache.fluo.api.observer.Observer; /** * @since 1.0.0 */ public abstract class Exporter<K, V> { - public void init(String queueId, Context observerContext) throws Exception {} + public interface Context { + String getQueueId(); + + SimpleConfiguration getExporterConfiguration(); + + 
Observer.Context getObserverContext(); + } + + public void init(Exporter.Context exporterContext) throws Exception {} /** * Must be able to handle same key being exported multiple times and key being exported out of http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java index b07caea..60982bd 100644 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java @@ -16,6 +16,7 @@ package org.apache.fluo.recipes.core.export; import org.apache.fluo.api.config.FluoConfiguration; +import org.apache.fluo.api.config.SimpleConfiguration; import org.apache.fluo.recipes.core.export.ExportQueue.Options; import org.junit.Assert; import org.junit.Test; @@ -25,9 +26,13 @@ public class OptionsTest { public void testExportQueueOptions() { FluoConfiguration conf = new FluoConfiguration(); + SimpleConfiguration ec1 = new SimpleConfiguration(); + ec1.setProperty("ep1", "ev1"); + ec1.setProperty("ep2", 3L); + ExportQueue.configure(conf, new Options("Q1", "ET", "KT", "VT", 100)); ExportQueue.configure(conf, new Options("Q2", "ET2", "KT2", "VT2", 200).setBucketsPerTablet(20) - .setBufferSize(1000000)); + .setBufferSize(1000000).setExporterConfiguration(ec1)); Options opts1 = new Options("Q1", conf.getAppConfiguration()); @@ -47,5 +52,10 @@ public class OptionsTest { Assert.assertEquals(opts2.bucketsPerTablet.intValue(), 20); Assert.assertEquals(opts2.bufferSize.intValue(), 1000000); + SimpleConfiguration ec2 = opts2.getExporterConfiguration(); + + Assert.assertEquals("ev1", ec2.getString("ep1")); + Assert.assertEquals(3, 
ec2.getInt("ep2")); + } } http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java ---------------------------------------------------------------------- diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java index 38985f1..d4ad914 100644 --- a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java +++ b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java @@ -30,7 +30,7 @@ import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.client.Transaction; import org.apache.fluo.api.mini.MiniFluo; -import org.apache.fluo.recipes.accumulo.export.AccumuloExportQueue; +import org.apache.fluo.recipes.accumulo.export.AccumuloExporter; import org.apache.fluo.recipes.core.export.ExportQueue; import org.apache.fluo.recipes.test.AccumuloExportITBase; import org.apache.hadoop.io.Text; @@ -51,10 +51,12 @@ public class AccumuloExporterIT extends AccumuloExportITBase { MiniAccumuloCluster miniAccumulo = getMiniAccumuloCluster(); - AccumuloExportQueue.configure(getFluoConfiguration(), new ExportQueue.Options(QUEUE_ID, - SimpleExporter.class.getName(), String.class.getName(), String.class.getName(), 5) - .setBucketsPerTablet(1), new AccumuloExportQueue.Options(miniAccumulo.getInstanceName(), - miniAccumulo.getZooKeepers(), ACCUMULO_USER, ACCUMULO_PASSWORD, exportTable)); + ExportQueue.configure( + getFluoConfiguration(), + new ExportQueue.Options(QUEUE_ID, SimpleExporter.class, String.class, String.class, 5) + .setBucketsPerTablet(1).setExporterConfiguration( + new AccumuloExporter.Configuration(miniAccumulo.getInstanceName(), miniAccumulo + .getZooKeepers(), ACCUMULO_USER, ACCUMULO_PASSWORD, exportTable))); } @Test 
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java ---------------------------------------------------------------------- diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java index e5b4e1a..635f6e9 100644 --- a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java +++ b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java @@ -28,7 +28,7 @@ import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.client.Transaction; import org.apache.fluo.api.mini.MiniFluo; -import org.apache.fluo.recipes.accumulo.export.AccumuloExportQueue; +import org.apache.fluo.recipes.accumulo.export.AccumuloExporter; import org.apache.fluo.recipes.accumulo.export.AccumuloReplicator; import org.apache.fluo.recipes.core.export.ExportQueue; import org.apache.fluo.recipes.core.transaction.TxLog; @@ -55,12 +55,11 @@ public class AccumuloReplicatorIT extends AccumuloExportITBase { MiniAccumuloCluster miniAccumulo = getMiniAccumuloCluster(); - AccumuloExportQueue.configure( - getFluoConfiguration(), - new ExportQueue.Options(QUEUE_ID, AccumuloReplicator.class.getName(), String.class - .getName(), TxLog.class.getName(), 5), - new AccumuloExportQueue.Options(miniAccumulo.getInstanceName(), miniAccumulo - .getZooKeepers(), ACCUMULO_USER, ACCUMULO_PASSWORD, exportTable)); + ExportQueue.configure(getFluoConfiguration(), new ExportQueue.Options(QUEUE_ID, + AccumuloReplicator.class.getName(), String.class.getName(), TxLog.class.getName(), 5) + .setExporterConfiguration(new AccumuloExporter.Configuration( + miniAccumulo.getInstanceName(), miniAccumulo.getZooKeepers(), ACCUMULO_USER, + ACCUMULO_PASSWORD, exportTable))); } @Test 
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/dc765718/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java ---------------------------------------------------------------------- diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java index 5aa5812..14c1aa4 100644 --- a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java +++ b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java @@ -25,7 +25,7 @@ import org.apache.fluo.recipes.core.export.SequencedExport; public class SimpleExporter extends AccumuloExporter<String, String> { @Override - protected Collection<Mutation> processExport(SequencedExport<String, String> export) { + protected Collection<Mutation> translate(SequencedExport<String, String> export) { Mutation m = new Mutation(export.getKey()); m.put("cf", "cq", export.getSequence(), export.getValue()); return Collections.singleton(m);