Repository: accumulo
Updated Branches:
  refs/heads/master a40733374 -> fa80ae12b


ACCUMULO-4229 Add failing test to reproduce sync problem

Add two tests to BatchWriterInTabletServerIT.
The first passes and establishes a baseline.
The second fails, demonstrating the bug.

The IT uses a new iterator and a serialization utility class.
Both live in the test module, so they are not packaged into normal builds.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5275f335
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5275f335
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5275f335

Branch: refs/heads/master
Commit: 5275f335cf6ba22a017edaccf64e096f7d59ceb2
Parents: c044127
Author: Dylan Hutchison <dhutc...@cs.washington.edu>
Authored: Sat Apr 23 01:02:01 2016 -0700
Committer: Dylan Hutchison <dhutc...@cs.washington.edu>
Committed: Thu May 5 14:14:40 2016 -0700

----------------------------------------------------------------------
 .../accumulo/test/BatchWriterIterator.java      | 256 ++++++++++++++++++
 .../apache/accumulo/test/SerializationUtil.java | 261 +++++++++++++++++++
 .../test/BatchWriterInTabletServerIT.java       | 118 +++++++++
 3 files changed, 635 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5275f335/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java b/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java
new file mode 100644
index 0000000..7d44729
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TimedOutException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.impl.TabletLocator;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Iterator that opens a BatchWriter and writes to another table.
+ * <p>
+ * For each entry passed to this iterator, this writes a certain number of entries with the same key to another table and passes the entry downstream of this
+ * iterator with its value replaced by either "{@value #SUCCESS_STRING}" or a description of what failed. Success counts as all entries writing to the result
+ * table within a timeout period. Failure counts as one of the entries taking longer than the timeout period.
+ * <p>
+ * Configure this iterator by calling the static {@link #iteratorSetting} method.
+ */
+public class BatchWriterIterator extends WrappingIterator {
+  private static final Logger log = LoggerFactory.getLogger(BatchWriterIterator.class);
+
+  private int sleepAfterFirstWrite = 0;
+  private int numEntriesToWritePerEntry = 10;
+  private long batchWriterTimeout = 0;
+  private long batchWriterMaxMemory = 0;
+  private boolean clearCacheAfterFirstWrite = false;
+  private boolean splitAfterFirstWrite = false;
+
+  public static final String OPT_sleepAfterFirstWrite = "sleepAfterFirstWrite", OPT_numEntriesToWritePerEntry = "numEntriesToWritePerEntry",
+      OPT_batchWriterTimeout = "batchWriterTimeout", OPT_batchWriterMaxMemory = "batchWriterMaxMemory",
+      OPT_clearCacheAfterFirstWrite = "clearCacheAfterFirstWrite", OPT_splitAfterFirstWrite = "splitAfterFirstWrite";
+
+  private String instanceName;
+  private String tableName;
+  private String zookeeperHost;
+  private int zookeeperTimeout = -1;
+  private String username;
+  private AuthenticationToken auth = null;
+
+  public static final String ZOOKEEPERHOST = "zookeeperHost", INSTANCENAME = "instanceName", TABLENAME = "tableName", USERNAME = "username",
+      ZOOKEEPERTIMEOUT = "zookeeperTimeout", AUTHENTICATION_TOKEN = "authenticationToken", // base64 encoding of token
+      AUTHENTICATION_TOKEN_CLASS = "authenticationTokenClass"; // class of token
+
+  private BatchWriter batchWriter;
+  private boolean firstWrite = true;
+  private Value topValue = null;
+  private Connector connector;
+
+  public static final String SUCCESS_STRING = "success";
+  public static final Value SUCCESS_VALUE = new Value(SUCCESS_STRING.getBytes());
+
+  public static IteratorSetting iteratorSetting(int priority, int sleepAfterFirstWrite, long batchWriterTimeout, long batchWriterMaxMemory,
+      int numEntriesToWrite, String tableName, Connector connector, AuthenticationToken token, boolean clearCacheAfterFirstWrite, boolean splitAfterFirstWrite) {
+    return iteratorSetting(priority, sleepAfterFirstWrite, batchWriterTimeout, batchWriterMaxMemory, numEntriesToWrite, tableName, connector.getInstance()
+        .getZooKeepers(), connector.getInstance().getInstanceName(), connector.getInstance().getZooKeepersSessionTimeOut(), connector.whoami(), token,
+        clearCacheAfterFirstWrite, splitAfterFirstWrite);
+  }
+
+  public static IteratorSetting iteratorSetting(int priority, int sleepAfterFirstWrite, long batchWriterTimeout, long batchWriterMaxMemory,
+      int numEntriesToWrite, String tableName, String zookeeperHost, String instanceName, int zookeeperTimeout, String username, AuthenticationToken token,
+      boolean clearCacheAfterFirstWrite, boolean splitAfterFirstWrite) {
+    IteratorSetting itset = new IteratorSetting(priority, BatchWriterIterator.class);
+    itset.addOption(OPT_sleepAfterFirstWrite, Integer.toString(sleepAfterFirstWrite));
+    itset.addOption(OPT_numEntriesToWritePerEntry, Integer.toString(numEntriesToWrite));
+    itset.addOption(OPT_batchWriterTimeout, Long.toString(batchWriterTimeout));
+    itset.addOption(OPT_batchWriterMaxMemory, Long.toString(batchWriterMaxMemory));
+    itset.addOption(OPT_clearCacheAfterFirstWrite, Boolean.toString(clearCacheAfterFirstWrite));
+    itset.addOption(OPT_splitAfterFirstWrite, Boolean.toString(splitAfterFirstWrite));
+
+    itset.addOption(TABLENAME, tableName);
+    itset.addOption(ZOOKEEPERHOST, zookeeperHost);
+    itset.addOption(ZOOKEEPERTIMEOUT, Integer.toString(zookeeperTimeout));
+    itset.addOption(INSTANCENAME, instanceName);
+    itset.addOption(USERNAME, username);
+    itset.addOption(AUTHENTICATION_TOKEN_CLASS, token.getClass().getName());
+    itset.addOption(AUTHENTICATION_TOKEN, SerializationUtil.serializeWritableBase64(token));
+
+    return itset;
+  }
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    parseOptions(options);
+    initBatchWriter();
+  }
+
+  private void parseOptions(Map<String,String> options) {
+    if (options.containsKey(OPT_numEntriesToWritePerEntry))
+      numEntriesToWritePerEntry = Integer.parseInt(options.get(OPT_numEntriesToWritePerEntry));
+    if (options.containsKey(OPT_sleepAfterFirstWrite))
+      sleepAfterFirstWrite = Integer.parseInt(options.get(OPT_sleepAfterFirstWrite));
+    if (options.containsKey(OPT_batchWriterTimeout))
+      batchWriterTimeout = Long.parseLong(options.get(OPT_batchWriterTimeout));
+    if (options.containsKey(OPT_batchWriterMaxMemory))
+      batchWriterMaxMemory = Long.parseLong(options.get(OPT_batchWriterMaxMemory));
+    if (options.containsKey(OPT_clearCacheAfterFirstWrite))
+      clearCacheAfterFirstWrite = Boolean.parseBoolean(options.get(OPT_clearCacheAfterFirstWrite));
+    if (options.containsKey(OPT_splitAfterFirstWrite))
+      splitAfterFirstWrite = Boolean.parseBoolean(options.get(OPT_splitAfterFirstWrite));
+
+    instanceName = options.get(INSTANCENAME);
+    tableName = options.get(TABLENAME);
+    zookeeperHost = options.get(ZOOKEEPERHOST);
+    zookeeperTimeout = Integer.parseInt(options.get(ZOOKEEPERTIMEOUT));
+    username = options.get(USERNAME);
+    String authClass = options.get(AUTHENTICATION_TOKEN_CLASS);
+    String authString = options.get(AUTHENTICATION_TOKEN);
+    auth = SerializationUtil.subclassNewInstance(authClass, AuthenticationToken.class);
+    SerializationUtil.deserializeWritableBase64(auth, authString);
+  }
+
+  private void initBatchWriter() {
+    ClientConfiguration cc = ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zookeeperHost).withZkTimeout(zookeeperTimeout);
+    Instance instance = new ZooKeeperInstance(cc);
+    try {
+      connector = instance.getConnector(username, auth);
+    } catch (AccumuloException e) {
+      log.error("failed to connect to Accumulo instance " + instanceName, e);
+      throw new RuntimeException(e);
+    } catch (AccumuloSecurityException e) {
+      log.error("failed to connect to Accumulo instance " + instanceName, e);
+      throw new RuntimeException(e);
+    }
+
+    BatchWriterConfig bwc = new BatchWriterConfig();
+    bwc.setMaxMemory(batchWriterMaxMemory);
+    bwc.setTimeout(batchWriterTimeout, TimeUnit.SECONDS);
+
+    try {
+      batchWriter = connector.createBatchWriter(tableName, bwc);
+    } catch (TableNotFoundException e) {
+      log.error(tableName + " does not exist in instance " + instanceName, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Write numEntriesToWritePerEntry entries. Flush. Set topValue accordingly.
+   */
+  private void processNext() {
+    assert hasTop();
+    Key k = getTopKey();
+    Text row = k.getRow(), cf = k.getColumnFamily(), cq = k.getColumnQualifier();
+    Value v = super.getTopValue();
+    String failure = null;
+    try {
+      for (int i = 0; i < numEntriesToWritePerEntry; i++) {
+        Mutation m = new Mutation(row);
+        m.put(cf, cq, v);
+        batchWriter.addMutation(m);
+
+        if (firstWrite) {
+          batchWriter.flush();
+          if (clearCacheAfterFirstWrite)
+            TabletLocator.clearLocators();
+          if (splitAfterFirstWrite) {
+            SortedSet<Text> splits = new TreeSet<Text>();
+            splits.add(new Text(row));
+            connector.tableOperations().addSplits(tableName, splits);
+          }
+          if (sleepAfterFirstWrite > 0)
+            try {
+              Thread.sleep(sleepAfterFirstWrite);
+            } catch (InterruptedException ignored) {}
+          firstWrite = false;
+        }
+      }
+
+      batchWriter.flush();
+    } catch (MutationsRejectedException e) {
+      log.error("", e);
+      failure = e.getClass().getSimpleName() + ": " + e.getMessage();
+    } catch (TimedOutException e) {
+      log.error("", e);
+      failure = e.getClass().getSimpleName() + ": " + e.getMessage();
+    } catch (AccumuloSecurityException e) {
+      log.error("", e);
+      failure = e.getClass().getSimpleName() + ": " + e.getMessage();
+    } catch (TableNotFoundException e) {
+      log.error("", e);
+      failure = e.getClass().getSimpleName() + ": " + e.getMessage();
+    } catch (AccumuloException e) {
+      log.error("", e);
+      failure = e.getClass().getSimpleName() + ": " + e.getMessage();
+    }
+    topValue = failure == null ? SUCCESS_VALUE : new Value(failure.getBytes());
+  }
+
+  @Override
+  protected void finalize() throws Throwable {
+    super.finalize();
+    batchWriter.close();
+  }
+
+  @Override
+  public void next() throws IOException {
+    super.next();
+    if (hasTop())
+      processNext();
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    super.seek(range, columnFamilies, inclusive);
+    if (hasTop())
+      processNext();
+  }
+
+  @Override
+  public Value getTopValue() {
+    return topValue;
+  }
+}
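
For reviewers, a minimal usage sketch (not part of this commit) of attaching the iterator above to a scan, assuming a Connector "c", an AuthenticationToken "token", and illustrative table names "t1" and "t2":

    // Each entry scanned from t1 fans out 50 writes of the same key to t2;
    // the value returned by the scan reports success or a failure description.
    IteratorSetting itset = BatchWriterIterator.iteratorSetting(6, 0, 15, 1000, 50, "t2", c, token, false, false);
    Scanner scanner = c.createScanner("t1", Authorizations.EMPTY);
    scanner.addScanIterator(itset);
    for (Map.Entry<Key,Value> entry : scanner)
      System.out.println(entry.getKey() + " -> " + entry.getValue());
    scanner.close();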

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5275f335/test/src/main/java/org/apache/accumulo/test/SerializationUtil.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/SerializationUtil.java b/test/src/main/java/org/apache/accumulo/test/SerializationUtil.java
new file mode 100644
index 0000000..a093f44
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/SerializationUtil.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Partially based on {@link org.apache.commons.lang3.SerializationUtils}.
+ *
+ * <p>
+ * For serializing and de-serializing objects.
+ * </p>
+ */
+public class SerializationUtil {
+  private static final Logger log = LoggerFactory.getLogger(SerializationUtil.class);
+
+  private SerializationUtil() {}
+
+  /**
+   * Create a new instance of a class whose name is given, as a descendant of a given parent class.
+   */
+  public static <E> E subclassNewInstance(String classname, Class<E> parentClass) {
+    Class<?> c;
+    try {
+      c = Class.forName(classname);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException("Can't find class: " + classname, e);
+    }
+    Class<? extends E> cm;
+    try {
+      cm = c.asSubclass(parentClass);
+    } catch (ClassCastException e) {
+      throw new IllegalArgumentException(classname + " is not a subclass of " + parentClass.getName(), e);
+    }
+    try {
+      return cm.newInstance();
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException("can't instantiate new instance of " 
+ cm.getName(), e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException("can't instantiate new instance of " 
+ cm.getName(), e);
+    }
+  }
+
+  public static String serializeWritableBase64(Writable writable) {
+    byte[] b = serializeWritable(writable);
+    return org.apache.accumulo.core.util.Base64.encodeBase64String(b);
+  }
+
+  public static void deserializeWritableBase64(Writable writable, String str) {
+    byte[] b = Base64.decodeBase64(str);
+    deserializeWritable(writable, b);
+  }
+
+  public static String serializeBase64(Serializable obj) {
+    byte[] b = serialize(obj);
+    return org.apache.accumulo.core.util.Base64.encodeBase64String(b);
+  }
+
+  public static Object deserializeBase64(String str) {
+    byte[] b = Base64.decodeBase64(str);
+    return deserialize(b);
+  }
+
+  // Interop with Hadoop Writable
+  // -----------------------------------------------------------------------
+
+  public static byte[] serializeWritable(Writable writable) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
+    serializeWritable(writable, baos);
+    return baos.toByteArray();
+  }
+
+  public static void serializeWritable(Writable obj, OutputStream outputStream) {
+    Preconditions.checkNotNull(obj);
+    Preconditions.checkNotNull(outputStream);
+    DataOutputStream out = null;
+    try {
+      out = new DataOutputStream(outputStream);
+      obj.write(out);
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    } finally {
+      if (out != null)
+        try {
+          out.close();
+        } catch (IOException e) {
+          log.error("cannot close", e);
+        }
+    }
+  }
+
+  public static void deserializeWritable(Writable writable, InputStream inputStream) {
+    Preconditions.checkNotNull(writable);
+    Preconditions.checkNotNull(inputStream);
+    DataInputStream in = null;
+    try {
+      in = new DataInputStream(inputStream);
+      writable.readFields(in);
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    } finally {
+      if (in != null)
+        try {
+          in.close();
+        } catch (IOException e) {
+          log.error("cannot close", e);
+        }
+    }
+  }
+
+  public static void deserializeWritable(Writable writable, byte[] objectData) {
+    Preconditions.checkNotNull(objectData);
+    deserializeWritable(writable, new ByteArrayInputStream(objectData));
+  }
+
+  // Serialize
+  // -----------------------------------------------------------------------
+
+  /**
+   * <p>
+   * Serializes an {@code Object} to the specified stream.
+   * </p>
+   * <p>
+   * The stream will be closed once the object is written. This avoids the need for a finally clause, and maybe also exception handling, in the application
+   * code.
+   * </p>
+   * <p>
+   * The stream passed in is not buffered internally within this method. This is the responsibility of your application if desired.
+   * </p>
+   *
+   * @param obj
+   *          the object to serialize to bytes, may be null
+   * @param outputStream
+   *          the stream to write to, must not be null
+   * @throws IllegalArgumentException
+   *           if {@code outputStream} is {@code null}
+   */
+  public static void serialize(Serializable obj, OutputStream outputStream) {
+    Preconditions.checkNotNull(outputStream);
+    ObjectOutputStream out = null;
+    try {
+      out = new ObjectOutputStream(outputStream);
+      out.writeObject(obj);
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    } finally {
+      if (out != null)
+        try {
+          out.close();
+        } catch (IOException e) {
+          log.error("cannot close", e);
+        }
+    }
+  }
+
+  /**
+   * <p>
+   * Serializes an {@code Object} to a byte array for storage/serialization.
+   * </p>
+   *
+   * @param obj
+   *          the object to serialize to bytes
+   * @return a byte[] with the converted Serializable
+   */
+  public static byte[] serialize(Serializable obj) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
+    serialize(obj, baos);
+    return baos.toByteArray();
+  }
+
+  // Deserialize
+  // -----------------------------------------------------------------------
+
+  /**
+   * <p>
+   * Deserializes an {@code Object} from the specified stream.
+   * </p>
+   * <p>
+   * The stream will be closed once the object is read. This avoids the need for a finally clause, and maybe also exception handling, in the application
+   * code.
+   * </p>
+   * <p>
+   * The stream passed in is not buffered internally within this method. This is the responsibility of your application if desired.
+   * </p>
+   *
+   * @param inputStream
+   *          the serialized object input stream, must not be null
+   * @return the deserialized object
+   * @throws IllegalArgumentException
+   *           if {@code inputStream} is {@code null}
+   */
+  public static Object deserialize(InputStream inputStream) {
+    Preconditions.checkNotNull(inputStream);
+    ObjectInputStream in = null;
+    try {
+      in = new ObjectInputStream(inputStream);
+      return in.readObject();
+    } catch (ClassNotFoundException ex) {
+      throw new RuntimeException(ex);
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    } finally {
+      if (in != null)
+        try {
+          in.close();
+        } catch (IOException e) {
+          log.error("cannot close", e);
+        }
+    }
+  }
+
+  /**
+   * <p>
+   * Deserializes a single {@code Object} from an array of bytes.
+   * </p>
+   *
+   * @param objectData
+   *          the serialized object, must not be null
+   * @return the deserialized object
+   * @throws IllegalArgumentException
+   *           if {@code objectData} is {@code null}
+   */
+  public static Object deserialize(byte[] objectData) {
+    Preconditions.checkNotNull(objectData);
+    return deserialize(new ByteArrayInputStream(objectData));
+  }
+
+}
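
A quick round-trip sketch for the Writable helpers above (illustrative only; PasswordToken is one AuthenticationToken that implements Hadoop's Writable):

    PasswordToken token = new PasswordToken("secret");
    String encoded = SerializationUtil.serializeWritableBase64(token);
    PasswordToken copy = SerializationUtil.subclassNewInstance(token.getClass().getName(), PasswordToken.class);
    SerializationUtil.deserializeWritableBase64(copy, encoded);
    assert token.equals(copy);

This is the same pattern BatchWriterIterator uses to pass its AuthenticationToken through iterator options.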

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5275f335/test/src/test/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java b/test/src/test/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java
new file mode 100644
index 0000000..2551c8e
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import com.google.common.collect.Iterators;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.LongCombiner;
+import org.apache.accumulo.core.iterators.user.SummingCombiner;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * Test writing to another table from inside an iterator.
+ *
+ * @see BatchWriterIterator
+ */
+public class BatchWriterInTabletServerIT extends AccumuloClusterIT {
+
+  /**
+   * This test should succeed.
+   */
+  @Test
+  public void testNormalWrite() throws Exception {
+    String[] uniqueNames = getUniqueNames(2);
+    String t1 = uniqueNames[0], t2 = uniqueNames[1];
+    Connector c = getConnector();
+    int numEntriesToWritePerEntry = 50;
+    IteratorSetting itset = BatchWriterIterator.iteratorSetting(6, 0, 15, 1000, numEntriesToWritePerEntry, t2, c, getToken(), false, false);
+    test(t1, t2, c, itset, numEntriesToWritePerEntry);
+  }
+
+  /**
+   * ACCUMULO-4229
+   * <p>
+   * This test should fail because the client shares a LocatorCache with the tablet server. Adding a split after the Locator cache falls out of sync causes the
+   * BatchWriter to continuously attempt to write to an old, closed tablet. It will only do so for 15 seconds because we set a timeout on the BatchWriter.
+   */
+  @Test
+  public void testClearLocatorAndSplitWrite() throws Exception {
+    String[] uniqueNames = getUniqueNames(2);
+    String t1 = uniqueNames[0], t2 = uniqueNames[1];
+    Connector c = getConnector();
+    int numEntriesToWritePerEntry = 50;
+    IteratorSetting itset = BatchWriterIterator.iteratorSetting(6, 0, 15, 1000, numEntriesToWritePerEntry, t2, c, getToken(), true, true);
+    test(t1, t2, c, itset, numEntriesToWritePerEntry);
+  }
+
+  private void test(String t1, String t2, Connector c, IteratorSetting itset, int numEntriesToWritePerEntry) throws Exception {
+    // Write an entry to t1
+    c.tableOperations().create(t1);
+    Key k = new Key(new Text("row"), new Text("cf"), new Text("cq"));
+    Value v = new Value("1".getBytes());
+    {
+      BatchWriterConfig config = new BatchWriterConfig();
+      config.setMaxMemory(0);
+      BatchWriter writer = c.createBatchWriter(t1, config);
+      Mutation m = new Mutation(k.getRow());
+      m.put(k.getColumnFamily(), k.getColumnQualifier(), v);
+      writer.addMutation(m);
+      writer.close();
+    }
+
+    // Create t2 with a combiner to count entries written to it
+    c.tableOperations().create(t2);
+    IteratorSetting summer = new IteratorSetting(2, "summer", SummingCombiner.class);
+    LongCombiner.setEncodingType(summer, LongCombiner.Type.STRING);
+    LongCombiner.setCombineAllColumns(summer, true);
+    c.tableOperations().attachIterator(t2, summer);
+
+    Map.Entry<Key,Value> actual;
+    // Scan t1 with an iterator that writes to table t2
+    Scanner scanner = c.createScanner(t1, Authorizations.EMPTY);
+    scanner.addScanIterator(itset);
+    actual = Iterators.getOnlyElement(scanner.iterator());
+    Assert.assertTrue(actual.getKey().equals(k, PartialKey.ROW_COLFAM_COLQUAL));
+    Assert.assertEquals(BatchWriterIterator.SUCCESS_VALUE, actual.getValue());
+    scanner.close();
+
+    // ensure entries correctly wrote to table t2
+    scanner = c.createScanner(t2, Authorizations.EMPTY);
+    actual = Iterators.getOnlyElement(scanner.iterator());
+    // System.out.println("t2 entry is " + actual.getKey().toStringNoTime() + " -> " + actual.getValue());
+    Assert.assertTrue(actual.getKey().equals(k, PartialKey.ROW_COLFAM_COLQUAL));
+    Assert.assertEquals(numEntriesToWritePerEntry, Integer.parseInt(actual.getValue().toString()));
+    scanner.close();
+
+    c.tableOperations().delete(t1);
+    c.tableOperations().delete(t2);
+  }
+
+}
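
The failure mode exercised by testClearLocatorAndSplitWrite, distilled from BatchWriterIterator.processNext() above (bw, m, tableName, and splits as in that method):

    bw.addMutation(m);
    bw.flush();                          // first write succeeds
    TabletLocator.clearLocators();       // drop the cached tablet locations
    connector.tableOperations().addSplits(tableName, splits);  // split t2 at the written row
    bw.addMutation(m);
    bw.flush();                          // later writes chase the old, closed tablet and time out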
