Repository: accumulo Updated Branches: refs/heads/master a40733374 -> fa80ae12b
ACCUMULO-4229 Add breaking test to reproduce sync problem Add two tests in BatchWriterInTabletServerIT. The first passes and establishes a baseline. The second fails, proving the issue. I added an extra iterator and a utility class for the IT. These are in the test package, so they do not get packaged into normal builds. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5275f335 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5275f335 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5275f335 Branch: refs/heads/master Commit: 5275f335cf6ba22a017edaccf64e096f7d59ceb2 Parents: c044127 Author: Dylan Hutchison <dhutc...@cs.washington.edu> Authored: Sat Apr 23 01:02:01 2016 -0700 Committer: Dylan Hutchison <dhutc...@cs.washington.edu> Committed: Thu May 5 14:14:40 2016 -0700 ---------------------------------------------------------------------- .../accumulo/test/BatchWriterIterator.java | 256 ++++++++++++++++++ .../apache/accumulo/test/SerializationUtil.java | 261 +++++++++++++++++++ .../test/BatchWriterInTabletServerIT.java | 118 +++++++++ 3 files changed, 635 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/5275f335/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java b/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java new file mode 100644 index 0000000..7d44729 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.TimedOutException; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.impl.TabletLocator; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +/** + * Iterator that opens a BatchWriter and writes to another table. + * <p> + * For each entry passed to this iterator, this writes a certain number of entries with the same key to another table and passes the entry downstream of this + * iterator with its value replaced by either "{@value SUCCESS_STRING}" or a description of what failed. Success counts as all entries writing to the result + * table within a timeout period. Failure counts as one of the entries taking longer than the timeout period. + * <p> + * Configure this iterator by calling the static {@link #iteratorSetting} method. + */ +public class BatchWriterIterator extends WrappingIterator { + private static final Logger log = LoggerFactory.getLogger(BatchWriterIterator.class); + + private int sleepAfterFirstWrite = 0; + private int numEntriesToWritePerEntry = 10; + private long batchWriterTimeout = 0; + private long batchWriterMaxMemory = 0; + private boolean clearCacheAfterFirstWrite = false; + private boolean splitAfterFirstWrite = false; + + public static final String OPT_sleepAfterFirstWrite = "sleepAfterFirstWrite", OPT_numEntriesToWritePerEntry = "numEntriesToWritePerEntry", + OPT_batchWriterTimeout = "batchWriterTimeout", OPT_batchWriterMaxMemory = "batchWriterMaxMemory", + OPT_clearCacheAfterFirstWrite = "clearCacheAfterFirstWrite", OPT_splitAfterFirstWrite = "splitAfterFirstWrite"; + + private String instanceName; + private String tableName; + private String zookeeperHost; + private int zookeeperTimeout = -1; + private String username; + private AuthenticationToken auth = null; + + public static final String ZOOKEEPERHOST = "zookeeperHost", INSTANCENAME = "instanceName", TABLENAME = "tableName", USERNAME = "username", + ZOOKEEPERTIMEOUT = "zookeeperTimeout", AUTHENTICATION_TOKEN = "authenticationToken", // base64 encoding of token + AUTHENTICATION_TOKEN_CLASS = "authenticationTokenClass"; // class of token + + private BatchWriter batchWriter; + private boolean firstWrite = true; + private Value topValue = null; + private Connector connector; + + public static final String SUCCESS_STRING = "success"; + public static final Value SUCCESS_VALUE = new Value(SUCCESS_STRING.getBytes()); + + public static IteratorSetting iteratorSetting(int priority, int sleepAfterFirstWrite, long batchWriterTimeout, long batchWriterMaxMemory, + int numEntriesToWrite, String tableName, Connector connector, AuthenticationToken token, boolean clearCacheAfterFirstWrite, boolean splitAfterFirstWrite) { + return iteratorSetting(priority, sleepAfterFirstWrite, batchWriterTimeout, batchWriterMaxMemory, numEntriesToWrite, tableName, connector.getInstance() + .getZooKeepers(), connector.getInstance().getInstanceName(), connector.getInstance().getZooKeepersSessionTimeOut(), connector.whoami(), token, + clearCacheAfterFirstWrite, splitAfterFirstWrite); + } + + public static IteratorSetting iteratorSetting(int priority, int sleepAfterFirstWrite, long batchWriterTimeout, long batchWriterMaxMemory, + int numEntriesToWrite, String tableName, String zookeeperHost, String instanceName, int zookeeperTimeout, String username, AuthenticationToken token, + boolean clearCacheAfterFirstWrite, boolean splitAfterFirstWrite) { + IteratorSetting itset = new IteratorSetting(priority, BatchWriterIterator.class); + itset.addOption(OPT_sleepAfterFirstWrite, Integer.toString(sleepAfterFirstWrite)); + itset.addOption(OPT_numEntriesToWritePerEntry, Integer.toString(numEntriesToWrite)); + itset.addOption(OPT_batchWriterTimeout, Long.toString(batchWriterTimeout)); + itset.addOption(OPT_batchWriterMaxMemory, Long.toString(batchWriterMaxMemory)); + itset.addOption(OPT_clearCacheAfterFirstWrite, Boolean.toString(clearCacheAfterFirstWrite)); + itset.addOption(OPT_splitAfterFirstWrite, Boolean.toString(splitAfterFirstWrite)); + + itset.addOption(TABLENAME, tableName); + itset.addOption(ZOOKEEPERHOST, zookeeperHost); + itset.addOption(ZOOKEEPERTIMEOUT, Integer.toString(zookeeperTimeout)); + itset.addOption(INSTANCENAME, instanceName); + itset.addOption(USERNAME, username); + itset.addOption(AUTHENTICATION_TOKEN_CLASS, token.getClass().getName()); + itset.addOption(AUTHENTICATION_TOKEN, SerializationUtil.serializeWritableBase64(token)); + + return itset; + } + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { + super.init(source, options, env); + parseOptions(options); + initBatchWriter(); + } + + private void parseOptions(Map<String,String> options) { + if (options.containsKey(OPT_numEntriesToWritePerEntry)) + numEntriesToWritePerEntry = Integer.parseInt(options.get(OPT_numEntriesToWritePerEntry)); + if (options.containsKey(OPT_sleepAfterFirstWrite)) + sleepAfterFirstWrite = Integer.parseInt(options.get(OPT_sleepAfterFirstWrite)); + if (options.containsKey(OPT_batchWriterTimeout)) + batchWriterTimeout = Long.parseLong(options.get(OPT_batchWriterTimeout)); + if (options.containsKey(OPT_batchWriterMaxMemory)) + batchWriterMaxMemory = Long.parseLong(options.get(OPT_batchWriterMaxMemory)); + if (options.containsKey(OPT_clearCacheAfterFirstWrite)) + clearCacheAfterFirstWrite = Boolean.parseBoolean(options.get(OPT_clearCacheAfterFirstWrite)); + if (options.containsKey(OPT_splitAfterFirstWrite)) + splitAfterFirstWrite = Boolean.parseBoolean(options.get(OPT_splitAfterFirstWrite)); + + instanceName = options.get(INSTANCENAME); + tableName = options.get(TABLENAME); + zookeeperHost = options.get(ZOOKEEPERHOST); + zookeeperTimeout = Integer.parseInt(options.get(ZOOKEEPERTIMEOUT)); + username = options.get(USERNAME); + String authClass = options.get(AUTHENTICATION_TOKEN_CLASS); + String authString = options.get(AUTHENTICATION_TOKEN); + auth = SerializationUtil.subclassNewInstance(authClass, AuthenticationToken.class); + SerializationUtil.deserializeWritableBase64(auth, authString); + } + + private void initBatchWriter() { + ClientConfiguration cc = ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zookeeperHost).withZkTimeout(zookeeperTimeout); + Instance instance = new ZooKeeperInstance(cc); + try { + connector = instance.getConnector(username, auth); + } catch (AccumuloException e) { + log.error("failed to connect to Accumulo instance " + instanceName, e); + throw new RuntimeException(e); + } catch (AccumuloSecurityException e) { + log.error("failed to connect to Accumulo instance " + instanceName, e); + throw new RuntimeException(e); + } + + BatchWriterConfig bwc = new BatchWriterConfig(); + bwc.setMaxMemory(batchWriterMaxMemory); + bwc.setTimeout(batchWriterTimeout, TimeUnit.SECONDS); + + try { + batchWriter = connector.createBatchWriter(tableName, bwc); + } catch (TableNotFoundException e) { + log.error(tableName + " does not exist in instance " + instanceName, e); + throw new RuntimeException(e); + } + } + + /** + * Write numEntriesToWritePerEntry. Flush. Set topValue accordingly. + */ + private void processNext() { + assert hasTop(); + Key k = getTopKey(); + Text row = k.getRow(), cf = k.getColumnFamily(), cq = k.getColumnQualifier(); + Value v = super.getTopValue(); + String failure = null; + try { + for (int i = 0; i < numEntriesToWritePerEntry; i++) { + Mutation m = new Mutation(row); + m.put(cf, cq, v); + batchWriter.addMutation(m); + + if (firstWrite) { + batchWriter.flush(); + if (clearCacheAfterFirstWrite) + TabletLocator.clearLocators(); + if (splitAfterFirstWrite) { + SortedSet<Text> splits = new TreeSet<Text>(); + splits.add(new Text(row)); + connector.tableOperations().addSplits(tableName, splits); + } + if (sleepAfterFirstWrite > 0) + try { + Thread.sleep(sleepAfterFirstWrite); + } catch (InterruptedException ignored) {} + firstWrite = false; + } + } + + batchWriter.flush(); + } catch (MutationsRejectedException e) { + log.error("", e); + failure = e.getClass().getSimpleName() + ": " + e.getMessage(); + } catch (TimedOutException e) { + log.error("", e); + failure = e.getClass().getSimpleName() + ": " + e.getMessage(); + } catch (AccumuloSecurityException e) { + log.error("", e); + failure = e.getClass().getSimpleName() + ": " + e.getMessage(); + } catch (TableNotFoundException e) { + log.error("", e); + failure = e.getClass().getSimpleName() + ": " + e.getMessage(); + } catch (AccumuloException e) { + log.error("", e); + failure = e.getClass().getSimpleName() + ": " + e.getMessage(); + } + topValue = failure == null ? SUCCESS_VALUE : new Value(failure.getBytes()); + } + + @Override + protected void finalize() throws Throwable { + super.finalize(); + batchWriter.close(); + } + + @Override + public void next() throws IOException { + super.next(); + if (hasTop()) + processNext(); + } + + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + super.seek(range, columnFamilies, inclusive); + if (hasTop()) + processNext(); + } + + @Override + public Value getTopValue() { + return topValue; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/5275f335/test/src/main/java/org/apache/accumulo/test/SerializationUtil.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/SerializationUtil.java b/test/src/main/java/org/apache/accumulo/test/SerializationUtil.java new file mode 100644 index 0000000..a093f44 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/SerializationUtil.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import com.google.common.base.Preconditions; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.io.Writable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.io.Serializable; + +/** + * Partially based from {@link org.apache.commons.lang3.SerializationUtils}. + * + * <p> + * For serializing and de-serializing objects. + * </p> + */ +public class SerializationUtil { + private static final Logger log = LoggerFactory.getLogger(SerializationUtil.class); + + private SerializationUtil() {} + + /** + * Create a new instance of a class whose name is given, as a descendent of a given subclass. + */ + public static <E> E subclassNewInstance(String classname, Class<E> parentClass) { + Class<?> c; + try { + c = Class.forName(classname); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("Can't find class: " + classname, e); + } + Class<? extends E> cm; + try { + cm = c.asSubclass(parentClass); + } catch (ClassCastException e) { + throw new IllegalArgumentException(classname + " is not a subclass of " + parentClass.getName(), e); + } + try { + return cm.newInstance(); + } catch (InstantiationException e) { + throw new IllegalArgumentException("can't instantiate new instance of " + cm.getName(), e); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException("can't instantiate new instance of " + cm.getName(), e); + } + } + + public static String serializeWritableBase64(Writable writable) { + byte[] b = serializeWritable(writable); + return org.apache.accumulo.core.util.Base64.encodeBase64String(b); + } + + public static void deserializeWritableBase64(Writable writable, String str) { + byte[] b = Base64.decodeBase64(str); + deserializeWritable(writable, b); + } + + public static String serializeBase64(Serializable obj) { + byte[] b = serialize(obj); + return org.apache.accumulo.core.util.Base64.encodeBase64String(b); + } + + public static Object deserializeBase64(String str) { + byte[] b = Base64.decodeBase64(str); + return deserialize(b); + } + + // Interop with Hadoop Writable + // ----------------------------------------------------------------------- + + public static byte[] serializeWritable(Writable writable) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(512); + serializeWritable(writable, baos); + return baos.toByteArray(); + } + + public static void serializeWritable(Writable obj, OutputStream outputStream) { + Preconditions.checkNotNull(obj); + Preconditions.checkNotNull(outputStream); + DataOutputStream out = null; + try { + out = new DataOutputStream(outputStream); + obj.write(out); + } catch (IOException ex) { + throw new RuntimeException(ex); + } finally { + if (out != null) + try { + out.close(); + } catch (IOException e) { + log.error("cannot close", e); + } + } + } + + public static void deserializeWritable(Writable writable, InputStream inputStream) { + Preconditions.checkNotNull(writable); + Preconditions.checkNotNull(inputStream); + DataInputStream in = null; + try { + in = new DataInputStream(inputStream); + writable.readFields(in); + } catch (IOException ex) { + throw new RuntimeException(ex); + } finally { + if (in != null) + try { + in.close(); + } catch (IOException e) { + log.error("cannot close", e); + } + } + } + + public static void deserializeWritable(Writable writable, byte[] objectData) { + Preconditions.checkNotNull(objectData); + deserializeWritable(writable, new ByteArrayInputStream(objectData)); + } + + // Serialize + // ----------------------------------------------------------------------- + + /** + * <p> + * Serializes an {@code Object} to the specified stream. + * </p> + * <p/> + * <p> + * The stream will be closed once the object is written. This avoids the need for a finally clause, and maybe also exception handling, in the application + * code. + * </p> + * <p/> + * <p> + * The stream passed in is not buffered internally within this method. This is the responsibility of your application if desired. + * </p> + * + * @param obj + * the object to serialize to bytes, may be null + * @param outputStream + * the stream to write to, must not be null + * @throws IllegalArgumentException + * if {@code outputStream} is {@code null} + */ + public static void serialize(Serializable obj, OutputStream outputStream) { + Preconditions.checkNotNull(outputStream); + ObjectOutputStream out = null; + try { + out = new ObjectOutputStream(outputStream); + out.writeObject(obj); + } catch (IOException ex) { + throw new RuntimeException(ex); + } finally { + if (out != null) + try { + out.close(); + } catch (IOException e) { + log.error("cannot close", e); + } + } + } + + /** + * <p> + * Serializes an {@code Object} to a byte array for storage/serialization. + * </p> + * + * @param obj + * the object to serialize to bytes + * @return a byte[] with the converted Serializable + */ + public static byte[] serialize(Serializable obj) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(512); + serialize(obj, baos); + return baos.toByteArray(); + } + + // Deserialize + // ----------------------------------------------------------------------- + + /** + * <p> + * Deserializes an {@code Object} from the specified stream. + * </p> + * <p/> + * <p> + * The stream will be closed once the object is written. This avoids the need for a finally clause, and maybe also exception handling, in the application + * code. + * </p> + * <p/> + * <p> + * The stream passed in is not buffered internally within this method. This is the responsibility of your application if desired. + * </p> + * + * @param inputStream + * the serialized object input stream, must not be null + * @return the deserialized object + * @throws IllegalArgumentException + * if {@code inputStream} is {@code null} + */ + public static Object deserialize(InputStream inputStream) { + Preconditions.checkNotNull(inputStream); + ObjectInputStream in = null; + try { + in = new ObjectInputStream(inputStream); + return in.readObject(); + } catch (ClassNotFoundException ex) { + throw new RuntimeException(ex); + } catch (IOException ex) { + throw new RuntimeException(ex); + } finally { + if (in != null) + try { + in.close(); + } catch (IOException e) { + log.error("cannot close", e); + } + } + } + + /** + * <p> + * Deserializes a single {@code Object} from an array of bytes. + * </p> + * + * @param objectData + * the serialized object, must not be null + * @return the deserialized object + * @throws IllegalArgumentException + * if {@code objectData} is {@code null} + */ + public static Object deserialize(byte[] objectData) { + Preconditions.checkNotNull(objectData); + return deserialize(new ByteArrayInputStream(objectData)); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/5275f335/test/src/test/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java b/test/src/test/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java new file mode 100644 index 0000000..2551c8e --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import com.google.common.collect.Iterators; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.LongCombiner; +import org.apache.accumulo.core.iterators.user.SummingCombiner; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterIT; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +/** + * Test writing to another table from inside an iterator. + * + * @see BatchWriterIterator + */ +public class BatchWriterInTabletServerIT extends AccumuloClusterIT { + + /** + * This test should succeed. + */ + @Test + public void testNormalWrite() throws Exception { + String[] uniqueNames = getUniqueNames(2); + String t1 = uniqueNames[0], t2 = uniqueNames[1]; + Connector c = getConnector(); + int numEntriesToWritePerEntry = 50; + IteratorSetting itset = BatchWriterIterator.iteratorSetting(6, 0, 15, 1000, numEntriesToWritePerEntry, t2, c, getToken(), false, false); + test(t1, t2, c, itset, numEntriesToWritePerEntry); + } + + /** + * ACCUMULO-4229 + * <p> + * This test should fail because the client shares a LocatorCache with the tablet server. Adding a split after the Locator cache falls out of sync causes the + * BatchWriter to continuously attempt to write to an old, closed tablet. It will only do so for 15 seconds because we set a timeout on the BatchWriter. + */ + @Test + public void testClearLocatorAndSplitWrite() throws Exception { + String[] uniqueNames = getUniqueNames(2); + String t1 = uniqueNames[0], t2 = uniqueNames[1]; + Connector c = getConnector(); + int numEntriesToWritePerEntry = 50; + IteratorSetting itset = BatchWriterIterator.iteratorSetting(6, 0, 15, 1000, numEntriesToWritePerEntry, t2, c, getToken(), true, true); + test(t1, t2, c, itset, numEntriesToWritePerEntry); + } + + private void test(String t1, String t2, Connector c, IteratorSetting itset, int numEntriesToWritePerEntry) throws Exception { + // Write an entry to t1 + c.tableOperations().create(t1); + Key k = new Key(new Text("row"), new Text("cf"), new Text("cq")); + Value v = new Value("1".getBytes()); + { + BatchWriterConfig config = new BatchWriterConfig(); + config.setMaxMemory(0); + BatchWriter writer = c.createBatchWriter(t1, config); + Mutation m = new Mutation(k.getRow()); + m.put(k.getColumnFamily(), k.getColumnQualifier(), v); + writer.addMutation(m); + writer.close(); + } + + // Create t2 with a combiner to count entries written to it + c.tableOperations().create(t2); + IteratorSetting summer = new IteratorSetting(2, "summer", SummingCombiner.class); + LongCombiner.setEncodingType(summer, LongCombiner.Type.STRING); + LongCombiner.setCombineAllColumns(summer, true); + c.tableOperations().attachIterator(t2, summer); + + Map.Entry<Key,Value> actual; + // Scan t1 with an iterator that writes to table t2 + Scanner scanner = c.createScanner(t1, Authorizations.EMPTY); + scanner.addScanIterator(itset); + actual = Iterators.getOnlyElement(scanner.iterator()); + Assert.assertTrue(actual.getKey().equals(k, PartialKey.ROW_COLFAM_COLQUAL)); + Assert.assertEquals(BatchWriterIterator.SUCCESS_VALUE, actual.getValue()); + scanner.close(); + + // ensure entries correctly wrote to table t2 + scanner = c.createScanner(t2, Authorizations.EMPTY); + actual = Iterators.getOnlyElement(scanner.iterator()); + // System.out.println("t2 entry is " + actual.getKey().toStringNoTime() + " -> " + actual.getValue()); + Assert.assertTrue(actual.getKey().equals(k, PartialKey.ROW_COLFAM_COLQUAL)); + Assert.assertEquals(numEntriesToWritePerEntry, Integer.parseInt(actual.getValue().toString())); + scanner.close(); + + c.tableOperations().delete(t1); + c.tableOperations().delete(t2); + } + +}