keith-turner closed pull request #1053: Accumulo 2.0 and Hadoop 3.0 updates
URL: https://github.com/apache/fluo/pull/1053
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/.travis.yml b/.travis.yml
index 5a97b504..b6850d15 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -27,6 +27,5 @@ before_script:
   - unset _JAVA_OPTIONS
 env:
   - BUILD_ARGS="clean verify javadoc:jar"
-  - BUILD_ARGS="clean verify javadoc:jar -Daccumulo.version=1.9.2"
 script:
   - mvn $BUILD_ARGS
diff --git a/modules/accumulo/pom.xml b/modules/accumulo/pom.xml
index c423d859..e43759c2 100644
--- a/modules/accumulo/pom.xml
+++ b/modules/accumulo/pom.xml
@@ -42,7 +42,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
+      <artifactId>hadoop-client-api</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.zookeeper</groupId>
diff --git 
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
 
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
index 75ba96f4..3279ec2f 100644
--- 
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
+++ 
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -16,11 +16,9 @@
 package org.apache.fluo.accumulo.iterators;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.util.Collection;
 import java.util.Map;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
@@ -28,14 +26,7 @@
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.iterators.SortedMapIterator;
-import org.apache.accumulo.core.iterators.WrappingIterator;
-import org.apache.accumulo.core.iterators.system.DeletingIterator;
-import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
-import org.apache.accumulo.core.iterators.system.SynchronizedIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * The purpose of this iterator is to make seeking within a columns timestamp 
range efficient.
@@ -49,12 +40,6 @@
   private Range range;
   private Collection<ByteSequence> fams;
   private boolean inclusive;
-  private boolean hasSeeked = false;
-
-  private boolean removedDeletingIterator = false;
-  private int removalFailures = 0;
-
-  private static final Logger log = 
LoggerFactory.getLogger(TimestampSkippingIterator.class);
 
   public TimestampSkippingIterator(SortedKeyValueIterator<Key, Value> source) {
     this.source = source;
@@ -82,7 +67,7 @@ public void skipToTimestamp(Key curCol, long timestamp) 
throws IOException {
     while (source.hasTop()
         && curCol.equals(source.getTopKey(), 
PartialKey.ROW_COLFAM_COLQUAL_COLVIS)
         && timestamp < source.getTopKey().getTimestamp()) {
-      if (count == 10 && shouldSeek()) {
+      if (count == 10) {
         // seek to prefix
         Key seekKey = new Key(curCol);
         seekKey.setTimestamp(timestamp);
@@ -124,93 +109,8 @@ public void skipColumn(Key curCol) throws IOException {
     }
   }
 
-  @SuppressWarnings("unchecked")
-  private static SortedKeyValueIterator<Key, Value> getParent(
-      SortedKeyValueIterator<Key, Value> iter) {
-    try {
-      if (iter instanceof WrappingIterator) {
-        Field field = WrappingIterator.class.getDeclaredField("source");
-        field.setAccessible(true);
-        return (SortedKeyValueIterator<Key, Value>) field.get(iter);
-      } else if (iter instanceof SourceSwitchingIterator) {
-        // TODO could sync on SSI
-        Field field = SourceSwitchingIterator.class.getDeclaredField("iter");
-        field.setAccessible(true);
-        return (SortedKeyValueIterator<Key, Value>) field.get(iter);
-      } else if (iter instanceof SynchronizedIterator) {
-        Field field = SynchronizedIterator.class.getDeclaredField("source");
-        field.setAccessible(true);
-        return (SortedKeyValueIterator<Key, Value>) field.get(iter);
-      } else if (iter instanceof SortedMapIterator) {
-        return null;
-      } else {
-        return null;
-      }
-    } catch (NoSuchFieldException | IllegalArgumentException | 
IllegalAccessException e) {
-      log.debug(e.getMessage(), e);
-      return null;
-    }
-  }
-
-  private static boolean setParent(SortedKeyValueIterator<Key, Value> iter,
-      SortedKeyValueIterator<Key, Value> newParent) {
-    try {
-      if (iter instanceof WrappingIterator) {
-        Field field = WrappingIterator.class.getDeclaredField("source");
-        field.setAccessible(true);
-        field.set(iter, newParent);
-        return true;
-      }
-    } catch (NoSuchFieldException | IllegalArgumentException | 
IllegalAccessException e) {
-      log.debug(e.getMessage(), e);
-    }
-
-    return false;
-  }
-
-  private static boolean removeDeletingIterator(SortedKeyValueIterator<Key, 
Value> source) {
-
-    SortedKeyValueIterator<Key, Value> prev = source;
-    SortedKeyValueIterator<Key, Value> parent = getParent(source);
-
-    while (parent != null && !(parent instanceof DeletingIterator)) {
-      prev = parent;
-      parent = getParent(parent);
-    }
-
-    if (parent != null && parent instanceof DeletingIterator) {
-      SortedKeyValueIterator<Key, Value> delParent = getParent(parent);
-      if (delParent != null) {
-        return setParent(prev, delParent);
-      }
-    }
-
-    return false;
-  }
-
-  @VisibleForTesting
-  public final boolean shouldSeek() {
-    /*
-     * This method is a safety check to ensure the deleting iterator was 
removed. If this iterator
-     * was not removed for some reason, then the performance of seeking will 
be O(N^2). In the case
-     * where its not removed, it would be better to just scan forward.
-     */
-    return !hasSeeked || removedDeletingIterator || removalFailures < 3;
-  }
-
   private void seek(Range range) throws IOException {
-    if (hasSeeked) {
-      // Making assumptions based on how Accumulo currently works. Currently 
Accumulo does not set
-      // up iterators until the 1st seek. Therefore can only remove the 
deleting iter after the 1st
-      // seek. Also, Accumulo may switch data sources and re-setup the 
deleting iterator, that's why
-      // this iterator keeps trying to remove it.
-      removedDeletingIterator |= removeDeletingIterator(source);
-      if (!removedDeletingIterator) {
-        removalFailures++;
-      }
-    }
     source.seek(range, fams, inclusive);
-    hasSeeked = true;
   }
 
   @Override
diff --git 
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
 
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
index cc63d6f3..9e9c84e1 100644
--- 
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
+++ 
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
@@ -19,11 +19,9 @@
 
   public static final String TABLE_BLOCKCACHE_ENABLED = 
"table.cache.block.enable";
   public static final String TABLE_CLASSPATH = "table.classpath.context";
+  public static final String TABLE_DELETE_BEHAVIOR = "table.delete.behavior";
+  public static final String TABLE_DELETE_BEHAVIOR_VALUE = "fail";
   public static final String TABLE_FORMATTER_CLASS = "table.formatter";
-  public static final String TABLE_GROUPS_ENABLED = "table.groups.enabled";
-  public static final String TABLE_GROUP_PREFIX = "table.group.";
-  public static final String TABLE_ITERATOR_PREFIX = "table.iterator.";
   public static final String TABLE_MAJC_RATIO = "table.compaction.major.ratio";
   public static final String VFS_CONTEXT_CLASSPATH_PROPERTY = 
"general.vfs.context.classpath.";
-
 }
diff --git a/modules/cluster/pom.xml b/modules/cluster/pom.xml
index 4fe810e8..197e412e 100644
--- a/modules/cluster/pom.xml
+++ b/modules/cluster/pom.xml
@@ -74,7 +74,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
+      <artifactId>hadoop-client-api</artifactId>
       <exclusions>
         <exclusion>
           <groupId>org.slf4j</groupId>
@@ -82,17 +82,6 @@
         </exclusion>
       </exclusions>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-client</artifactId>
-      <exclusions>
-        <!-- Excluded so we use asm jar included by twill-yarn.  See FLUO-409 
-->
-        <exclusion>
-          <groupId>asm</groupId>
-          <artifactId>asm</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
     <dependency>
       <groupId>org.apache.twill</groupId>
       <artifactId>twill-api</artifactId>
diff --git 
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java 
b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
index 8621a907..78d301bb 100644
--- 
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
+++ 
b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
@@ -101,7 +101,7 @@ private long calculateSleep(long notifyCount, long 
numWorkers) {
   public long countNotifications(Environment env) {
     Scanner scanner = null;
     try {
-      scanner = env.getConnector().createScanner(env.getTable(), 
env.getAuthorizations());
+      scanner = env.getAccumuloClient().createScanner(env.getTable(), 
env.getAuthorizations());
     } catch (TableNotFoundException e) {
       log.error("An exception was thrown -", e);
       throw new FluoException(e);
diff --git a/modules/command/pom.xml b/modules/command/pom.xml
index d34af393..84f3348c 100644
--- a/modules/command/pom.xml
+++ b/modules/command/pom.xml
@@ -58,6 +58,10 @@
       <groupId>org.apache.fluo</groupId>
       <artifactId>fluo-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-api</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
diff --git 
a/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java 
b/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
index 9c0b191b..ce6d4599 100644
--- a/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
@@ -42,7 +42,7 @@
   private static List<TableRange> getRanges(Environment env)
       throws TableNotFoundException, AccumuloSecurityException, 
AccumuloException {
     List<TableRange> ranges =
-        
TableRange.fromTexts(env.getConnector().tableOperations().listSplits(env.getTable()));
+        
TableRange.fromTexts(env.getAccumuloClient().tableOperations().listSplits(env.getTable()));
     Collections.shuffle(ranges);
     return ranges;
   }
@@ -51,7 +51,7 @@ private static boolean hasNotifications(Environment env, 
TableRange range)
       throws TableNotFoundException {
     Scanner scanner = null;
     try {
-      scanner = env.getConnector().createScanner(env.getTable(), 
env.getAuthorizations());
+      scanner = env.getAccumuloClient().createScanner(env.getTable(), 
env.getAuthorizations());
       scanner.setRange(range.getRange());
       Notification.configureScanner(scanner);
 
diff --git a/modules/core/pom.xml b/modules/core/pom.xml
index 9bcb4bbf..7406acf0 100644
--- a/modules/core/pom.xml
+++ b/modules/core/pom.xml
@@ -33,10 +33,6 @@
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
-    <dependency>
-      <groupId>commons-configuration</groupId>
-      <artifactId>commons-configuration</artifactId>
-    </dependency>
     <dependency>
       <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-core</artifactId>
@@ -71,7 +67,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
+      <artifactId>hadoop-client-api</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.thrift</groupId>
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java 
b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
index 40563665..95a8892c 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
@@ -23,18 +23,18 @@
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.regex.Pattern;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.iterators.IteratorUtil;
@@ -49,7 +49,6 @@
 import org.apache.fluo.api.client.FluoAdmin;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.exceptions.FluoException;
 import org.apache.fluo.core.impl.FluoConfigurationImpl;
 import org.apache.fluo.core.observer.ObserverUtil;
@@ -60,6 +59,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.slf4j.Logger;
@@ -117,7 +117,7 @@ public void initialize(InitializationOptions opts)
           "Fluo application already initialized at " + 
config.getAppZookeepers());
     }
 
-    Connector conn = AccumuloUtil.getConnector(config);
+    AccumuloClient conn = AccumuloUtil.getClient(config);
 
     boolean tableExists = 
conn.tableOperations().exists(config.getAccumuloTable());
     if (tableExists && !opts.getClearTable()) {
@@ -152,7 +152,6 @@ public void initialize(InitializationOptions opts)
 
     try {
       initializeApplicationInZooKeeper(conn);
-      Map<String, String> ntcProps = initializeApplicationTableProps();
 
       String accumuloJars;
       if (!config.getAccumuloJars().trim().isEmpty()) {
@@ -177,6 +176,8 @@ public void initialize(InitializationOptions opts)
         accumuloClasspath = tmpCP;
       }
 
+      Map<String, String> ntcProps = new HashMap<>();
+
       if (!accumuloClasspath.isEmpty()) {
         String contextName = "fluo-" + config.getApplicationName();
         conn.instanceOperations().setProperty(
@@ -190,8 +191,15 @@ public void initialize(InitializationOptions opts)
       }
 
       ntcProps.put(AccumuloProps.TABLE_BLOCKCACHE_ENABLED, "true");
+      ntcProps.put(AccumuloProps.TABLE_DELETE_BEHAVIOR, 
AccumuloProps.TABLE_DELETE_BEHAVIOR_VALUE);
 
       NewTableConfiguration ntc = new 
NewTableConfiguration().withoutDefaultIterators();
+
+      
ntc.setLocalityGroups(Collections.singletonMap(ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME,
+          Collections.singleton(new 
Text(ColumnConstants.NOTIFY_CF.toArray()))));
+
+      configureIterators(ntc);
+
       ntc.setProperties(ntcProps);
       conn.tableOperations().create(config.getAccumuloTable(), ntc);
 
@@ -206,6 +214,20 @@ public void initialize(InitializationOptions opts)
     }
   }
 
+  private void configureIterators(NewTableConfiguration ntc) {
+    IteratorSetting gcIter =
+        new IteratorSetting(10, ColumnConstants.GC_CF.toString(), 
GarbageCollectionIterator.class);
+    GarbageCollectionIterator.setZookeepers(gcIter, config.getAppZookeepers());
+    // the order relative to gc iter should not matter
+    IteratorSetting ntfyIter =
+        new IteratorSetting(11, ColumnConstants.NOTIFY_CF.toString(), 
NotificationIterator.class);
+
+    EnumSet<IteratorScope> scopes =
+        EnumSet.of(IteratorUtil.IteratorScope.majc, 
IteratorUtil.IteratorScope.minc);
+    ntc.attachIterator(gcIter, scopes);
+    ntc.attachIterator(ntfyIter, scopes);
+  }
+
   @Override
   public void remove() {
     if (applicationRunning()) {
@@ -224,7 +246,7 @@ public void remove() {
       throw new FluoException("Must stop the oracle server to remove an 
application");
     }
 
-    Connector conn = AccumuloUtil.getConnector(config);
+    AccumuloClient conn = AccumuloUtil.getClient(config);
 
     boolean tableExists = 
conn.tableOperations().exists(config.getAccumuloTable());
     // With preconditions met, it's now OK to delete table & zookeeper root 
(if they exist)
@@ -252,10 +274,10 @@ public void remove() {
     }
   }
 
-  private void initializeApplicationInZooKeeper(Connector conn) throws 
Exception {
+  private void initializeApplicationInZooKeeper(AccumuloClient client) throws 
Exception {
 
-    final String accumuloInstanceName = conn.getInstance().getInstanceName();
-    final String accumuloInstanceID = conn.getInstance().getInstanceID();
+    final String accumuloInstanceName = client.info().getInstanceName();
+    final String accumuloInstanceID = client.getInstanceID();
     final String fluoApplicationID = UUID.randomUUID().toString();
 
     // Create node specified by chroot suffix of Zookeeper connection string 
(if it doesn't exist)
@@ -285,49 +307,6 @@ private void initializeApplicationInZooKeeper(Connector 
conn) throws Exception {
         CuratorUtil.NodeExistsPolicy.FAIL);
   }
 
-  private String encodeColumnFamily(Bytes cf) {
-    StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < cf.length(); i++) {
-      int c = 0xff & cf.byteAt(i);
-      if (c == '\\') {
-        sb.append("\\\\");
-      } else if (c >= 32 && c <= 126 && c != ',') {
-        sb.append((char) c);
-      } else {
-        sb.append("\\x").append(String.format("%02X", c));
-      }
-    }
-    return sb.toString();
-  }
-
-  private Map<String, String> initializeApplicationTableProps() {
-    Map<String, String> ntcProps = new HashMap<>();
-    ntcProps.put(AccumuloProps.TABLE_GROUP_PREFIX + 
ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME,
-        encodeColumnFamily(ColumnConstants.NOTIFY_CF));
-    ntcProps.put(AccumuloProps.TABLE_GROUPS_ENABLED, 
ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME);
-
-    IteratorSetting gcIter =
-        new IteratorSetting(10, ColumnConstants.GC_CF.toString(), 
GarbageCollectionIterator.class);
-    GarbageCollectionIterator.setZookeepers(gcIter, config.getAppZookeepers());
-    // the order relative to gc iter should not matter
-    IteratorSetting ntfyIter =
-        new IteratorSetting(11, ColumnConstants.NOTIFY_CF.toString(), 
NotificationIterator.class);
-
-    for (IteratorSetting setting : new IteratorSetting[] {gcIter, ntfyIter}) {
-      for (IteratorScope scope : EnumSet.of(IteratorUtil.IteratorScope.majc,
-          IteratorUtil.IteratorScope.minc)) {
-        String root = String.format("%s%s.%s", 
AccumuloProps.TABLE_ITERATOR_PREFIX,
-            scope.name().toLowerCase(), setting.getName());
-        for (Entry<String, String> prop : setting.getOptions().entrySet()) {
-          ntcProps.put(root + ".opt." + prop.getKey(), prop.getValue());
-        }
-        ntcProps.put(root, setting.getPriority() + "," + 
setting.getIteratorClass());
-      }
-    }
-
-    return ntcProps;
-  }
-
   @Override
   public void updateSharedConfig() {
     if (!config.hasRequiredAdminProps()) {
@@ -566,7 +545,7 @@ public boolean accumuloTableExists() {
     if (!config.hasRequiredAdminProps()) {
       throw new IllegalArgumentException("Admin configuration is missing 
required properties");
     }
-    Connector conn = AccumuloUtil.getConnector(config);
-    return conn.tableOperations().exists(config.getAccumuloTable());
+    AccumuloClient client = AccumuloUtil.getClient(config);
+    return client.tableOperations().exists(config.getAccumuloTable());
   }
 }
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
 
b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
index c7117da1..87f9e3fa 100644
--- 
a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
+++ 
b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -176,7 +176,7 @@ private LoaderExecutorAsyncImpl(FluoConfiguration config, 
int numThreads, int qu
     this.semaphoreSize = queueSize == 0 ? 1 : queueSize;
     this.semaphore = new Semaphore(semaphoreSize);
     if (numThreads == 0) {
-      this.executor = MoreExecutors.sameThreadExecutor();
+      this.executor = MoreExecutors.newDirectExecutorService();
     } else {
       this.executor = FluoExecutors.newFixedThreadPool(numThreads, "loader");
     }
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java 
b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
index f8aac725..766a4a92 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -18,12 +18,16 @@
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
+import java.util.Map.Entry;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.accumulo.core.client.Connector;
+import com.google.common.base.Preconditions;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.fluo.accumulo.util.AccumuloProps;
 import org.apache.fluo.accumulo.util.ZookeeperPath;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.SimpleConfiguration;
@@ -45,7 +49,7 @@
   private Authorizations auths = new Authorizations();
   private String accumuloInstance;
   private RegisteredObservers observers;
-  private Connector conn;
+  private AccumuloClient client;
   private String accumuloInstanceID;
   private String fluoApplicationID;
   private FluoConfiguration config;
@@ -54,6 +58,26 @@
   private SimpleConfiguration appConfig;
   private String metricsReporterID;
 
+  private void ensureDeletesAreDisabled() {
+    String value = null;
+    Iterable<Entry<String, String>> props;
+    try {
+      props = client.tableOperations().getProperties(table);
+    } catch (AccumuloException | TableNotFoundException e) {
+      throw new IllegalStateException(e);
+    }
+
+    for (Entry<String, String> entry : props) {
+      if (entry.getKey().equals(AccumuloProps.TABLE_DELETE_BEHAVIOR)) {
+        value = entry.getValue();
+      }
+    }
+
+    
Preconditions.checkState(AccumuloProps.TABLE_DELETE_BEHAVIOR_VALUE.equals(value),
+        "The Accumulo table %s is not configured correctly.  Please set %s=%s 
for this table in Accumulo.",
+        table, AccumuloProps.TABLE_DELETE_BEHAVIOR, 
AccumuloProps.TABLE_DELETE_BEHAVIOR_VALUE);
+  }
+
   /**
    * Constructs an environment from given FluoConfiguration
    *
@@ -61,18 +85,20 @@
    */
   public Environment(FluoConfiguration configuration) {
     config = configuration;
-    conn = AccumuloUtil.getConnector(config);
+    client = AccumuloUtil.getClient(config);
 
     readZookeeperConfig();
 
-    if (!conn.getInstance().getInstanceName().equals(accumuloInstance)) {
+    ensureDeletesAreDisabled();
+
+    if (!client.info().getInstanceName().equals(accumuloInstance)) {
       throw new IllegalArgumentException("unexpected accumulo instance name "
-          + conn.getInstance().getInstanceName() + " != " + accumuloInstance);
+          + client.info().getInstanceName() + " != " + accumuloInstance);
     }
 
-    if (!conn.getInstance().getInstanceID().equals(accumuloInstanceID)) {
-      throw new IllegalArgumentException("unexpected accumulo instance id "
-          + conn.getInstance().getInstanceID() + " != " + accumuloInstanceID);
+    if (!client.getInstanceID().equals(accumuloInstanceID)) {
+      throw new IllegalArgumentException("unexpected accumulo instance id " + 
client.getInstanceID()
+          + " != " + accumuloInstanceID);
     }
 
     try {
@@ -93,7 +119,7 @@ public Environment(Environment env) throws Exception {
     this.auths = env.auths;
     this.accumuloInstance = env.accumuloInstance;
     this.observers = env.observers;
-    this.conn = env.conn;
+    this.client = env.client;
     this.accumuloInstanceID = env.accumuloInstanceID;
     this.fluoApplicationID = env.fluoApplicationID;
     this.config = env.config;
@@ -169,8 +195,8 @@ public String getTable() {
     return table;
   }
 
-  public Connector getConnector() {
-    return conn;
+  public AccumuloClient getAccumuloClient() {
+    return client;
   }
 
   public SharedResources getSharedResources() {
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java 
b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
index ea00c736..ea47bbfb 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
@@ -319,7 +319,8 @@ private static boolean isPrimary(PrimaryRowColumn prc, Key 
k) {
 
     BatchScanner bscanner = null;
     try {
-      bscanner = env.getConnector().createBatchScanner(env.getTable(), 
env.getAuthorizations(), 1);
+      bscanner =
+          env.getAccumuloClient().createBatchScanner(env.getTable(), 
env.getAuthorizations(), 1);
 
       bscanner.setRanges(ranges);
       IteratorSetting iterCfg = new IteratorSetting(10, 
OpenReadLockIterator.class);
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
 
b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
index 48250c02..f9f1e85a 100644
--- 
a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
+++ 
b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
@@ -95,7 +95,8 @@ private BatchScanner setupBatchScanner() {
     try {
       // TODO hardcoded number of threads!
       // one thread is probably good.. going for throughput
-      scanner = env.getConnector().createBatchScanner(env.getTable(), 
env.getAuthorizations(), 1);
+      scanner =
+          env.getAccumuloClient().createBatchScanner(env.getTable(), 
env.getAuthorizations(), 1);
     } catch (TableNotFoundException e) {
       throw new RuntimeException(e);
     }
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/impl/SharedResources.java 
b/modules/core/src/main/java/org/apache/fluo/core/impl/SharedResources.java
index 10c79b81..308e8321 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/SharedResources.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/SharedResources.java
@@ -68,18 +68,19 @@ public SharedResources(Environment env) throws 
TableNotFoundException {
     curator = CuratorUtil.newAppCurator(env.getConfiguration());
     curator.start();
 
-    int numTservers = 
env.getConnector().instanceOperations().getTabletServers().size();
+    int numTservers = 
env.getAccumuloClient().instanceOperations().getTabletServers().size();
     int numBWThreads = 
FluoConfigurationImpl.getNumBWThreads(env.getConfiguration(), numTservers);
-    bw = env.getConnector().createBatchWriter(env.getTable(),
+    bw = env.getAccumuloClient().createBatchWriter(env.getTable(),
         new BatchWriterConfig().setMaxWriteThreads(numBWThreads));
     sbw = new SharedBatchWriter(bw);
 
     int numCWThreads = 
FluoConfigurationImpl.getNumCWThreads(env.getConfiguration(), numTservers);
-    cw = env.getConnector().createConditionalWriter(env.getTable(), new 
ConditionalWriterConfig()
-        
.setAuthorizations(env.getAuthorizations()).setMaxWriteThreads(numCWThreads));
-    bulkCw =
-        env.getConnector().createConditionalWriter(env.getTable(), new 
ConditionalWriterConfig()
-            
.setAuthorizations(env.getAuthorizations()).setMaxWriteThreads(numCWThreads));
+    cw = env.getAccumuloClient().createConditionalWriter(env.getTable(),
+        new 
ConditionalWriterConfig().setAuthorizations(env.getAuthorizations())
+            .setMaxWriteThreads(numCWThreads));
+    bulkCw = env.getAccumuloClient().createConditionalWriter(env.getTable(),
+        new 
ConditionalWriterConfig().setAuthorizations(env.getAuthorizations())
+            .setMaxWriteThreads(numCWThreads));
 
     txInfoCache = new TxInfoCache(env);
     visCache = new VisibilityCache(env.getConfiguration());
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java 
b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
index a1db35b5..5db76025 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
@@ -113,7 +113,7 @@ static void setupScanner(ScannerBase scanner, 
Collection<Column> columns, long s
     private void setUpIterator() {
       Scanner scanner;
       try {
-        scanner = env.getConnector().createScanner(env.getTable(), 
env.getAuthorizations());
+        scanner = env.getAccumuloClient().createScanner(env.getTable(), 
env.getAuthorizations());
       } catch (TableNotFoundException e) {
         throw new RuntimeException(e);
       }
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java 
b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index d7a541fc..ae87a24e 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -679,7 +679,8 @@ private boolean checkForAckCollision(ConditionalMutation 
cm) {
           Scanner scanner;
           try {
             // TODO reuse or share scanner
-            scanner = env.getConnector().createScanner(env.getTable(), 
env.getAuthorizations());
+            scanner =
+                env.getAccumuloClient().createScanner(env.getTable(), 
env.getAuthorizations());
           } catch (TableNotFoundException e) {
             // TODO proper exception handling
             throw new RuntimeException(e);
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java 
b/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
index 1ec15916..3257bd5f 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -15,13 +15,11 @@
 
 package org.apache.fluo.core.util;
 
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.client.ClientInfo;
 import org.apache.fluo.api.config.FluoConfiguration;
 
 /**
@@ -29,25 +27,23 @@
  */
 public class AccumuloUtil {
 
-  /**
-   * Creates Accumulo instance given FluoConfiguration
-   */
-  public static Instance getInstance(FluoConfiguration config) {
-    ClientConfiguration clientConfig = new ClientConfiguration()
-        
.withInstance(config.getAccumuloInstance()).withZkHosts(config.getAccumuloZookeepers())
-        .withZkTimeout(config.getZookeeperTimeout() / 1000);
-    return new ZooKeeperInstance(clientConfig);
-  }
 
   /**
    * Creates Accumulo connector given FluoConfiguration
    */
-  public static Connector getConnector(FluoConfiguration config) {
+  public static AccumuloClient getClient(FluoConfiguration config) {
     try {
-      return getInstance(config).getConnector(config.getAccumuloUser(),
-          new PasswordToken(config.getAccumuloPassword()));
+      return Accumulo.newClient()
+          .forInstance(config.getAccumuloInstance(), 
config.getAccumuloZookeepers())
+          .usingPassword(config.getAccumuloUser(), 
config.getAccumuloPassword()).build();
     } catch (AccumuloException | AccumuloSecurityException e) {
       throw new IllegalStateException(e);
     }
   }
+
+  public static ClientInfo getClientInfo(FluoConfiguration config) {
+    return Accumulo.newClient()
+        .forInstance(config.getAccumuloInstance(), 
config.getAccumuloZookeepers())
+        .usingPassword(config.getAccumuloUser(), 
config.getAccumuloPassword()).info();
+  }
 }
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java 
b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
index 7d488ca5..bbf1d83d 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
@@ -79,7 +79,7 @@ public static void commitColumn(Environment env, boolean 
isTrigger, boolean isPr
     Scanner scanner;
     try {
       // TODO reuse or share scanner
-      scanner = env.getConnector().createScanner(env.getTable(), 
env.getAuthorizations());
+      scanner = env.getAccumuloClient().createScanner(env.getTable(), 
env.getAuthorizations());
     } catch (TableNotFoundException e) {
       // TODO proper exception handling
       throw new RuntimeException(e);
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java 
b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
index aa12c162..2c66cbf0 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
@@ -32,7 +32,7 @@
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonIOException;
-import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.security.Authorizations;
@@ -157,14 +157,14 @@ public static void scanFluo(ScanOpts options, 
FluoConfiguration sConfig, PrintSt
   public static void scanNotifications(ScanOpts options, FluoConfiguration 
sConfig, PrintStream out)
       throws IOException {
 
-    Connector conn = AccumuloUtil.getConnector(sConfig);
+    AccumuloClient client = AccumuloUtil.getClient(sConfig);
 
     Span span = getSpan(options);
     Collection<Column> columns = getColumns(options);
 
     Scanner scanner = null;
     try {
-      scanner = conn.createScanner(sConfig.getAccumuloTable(), 
Authorizations.EMPTY);
+      scanner = client.createScanner(sConfig.getAccumuloTable(), 
Authorizations.EMPTY);
 
       scanner.setRange(SpanUtil.toRange(span));
 
@@ -210,13 +210,13 @@ private static void generateJson(CellScanner cellScanner, 
Function<Bytes, String
 
   public static void scanAccumulo(ScanOpts options, FluoConfiguration sConfig, 
PrintStream out) {
 
-    Connector conn = AccumuloUtil.getConnector(sConfig);
+    AccumuloClient client = AccumuloUtil.getClient(sConfig);
 
     Span span = getSpan(options);
     Collection<Column> columns = getColumns(options);
 
     try {
-      Scanner scanner = conn.createScanner(sConfig.getAccumuloTable(), 
Authorizations.EMPTY);
+      Scanner scanner = client.createScanner(sConfig.getAccumuloTable(), 
Authorizations.EMPTY);
       scanner.setRange(SpanUtil.toRange(span));
       for (Column col : columns) {
         if (col.isQualifierSet()) {
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
 
b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
index 9f04ad6c..acea7108 100644
--- 
a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
+++ 
b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
@@ -179,7 +179,8 @@ private boolean isInterruptedException(Exception e) {
 
   private ScanCounts scan(Session session, PartitionInfo pi, Range range)
       throws TableNotFoundException {
-    Scanner scanner = env.getConnector().createScanner(env.getTable(), 
env.getAuthorizations());
+    Scanner scanner =
+        env.getAccumuloClient().createScanner(env.getTable(), 
env.getAuthorizations());
 
     scanner.setRange(range);
 
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/SerializedSplits.java
 
b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/SerializedSplits.java
index fc44a64c..3ce5f757 100644
--- 
a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/SerializedSplits.java
+++ 
b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/SerializedSplits.java
@@ -108,7 +108,7 @@ public static void deserialize(Consumer<Bytes> 
splitConsumer, byte[] serializedS
   public static byte[] serializeTableSplits(Environment env) {
     List<Bytes> splits;
     try {
-      splits = 
env.getConnector().tableOperations().listSplits(env.getTable()).stream()
+      splits = 
env.getAccumuloClient().tableOperations().listSplits(env.getTable()).stream()
           .map(ByteUtil::toBytes).collect(Collectors.toList());
     } catch (TableNotFoundException | AccumuloSecurityException | 
AccumuloException e) {
       throw new RuntimeException(e);
diff --git a/modules/distribution/src/main/lib/ahz/pom.xml 
b/modules/distribution/src/main/lib/ahz/pom.xml
index ef8a6bd9..f2d609b0 100644
--- a/modules/distribution/src/main/lib/ahz/pom.xml
+++ b/modules/distribution/src/main/lib/ahz/pom.xml
@@ -33,7 +33,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
+      <artifactId>hadoop-client-api</artifactId>
       <version>${hadoop.version}</version>
     </dependency>
     <dependency>
diff --git a/modules/integration-tests/pom.xml 
b/modules/integration-tests/pom.xml
index 73714717..8a0adb94 100644
--- a/modules/integration-tests/pom.xml
+++ b/modules/integration-tests/pom.xml
@@ -45,6 +45,10 @@
       <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-minicluster</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-math3</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-framework</artifactId>
@@ -67,7 +71,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
+      <artifactId>hadoop-client-api</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.zookeeper</groupId>
diff --git 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java
 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java
index d5af1c09..92436f41 100644
--- 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java
+++ 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java
@@ -19,12 +19,11 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.ClientInfo;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.accumulo.minicluster.MiniAccumuloConfig;
-import org.apache.accumulo.minicluster.MiniAccumuloInstance;
 import org.apache.commons.io.FileUtils;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.Snapshot;
@@ -48,8 +47,8 @@
       FluoConfiguration.FLUO_PREFIX + ".it.instance.clear";
 
   protected static String instanceName;
-  protected static Connector conn;
-  protected static Instance miniAccumulo;
+  protected static AccumuloClient aClient;
+  protected static ClientInfo clientInfo;
   private static MiniAccumuloCluster cluster;
   private static boolean startedCluster = false;
 
@@ -88,8 +87,8 @@ public static void setUpAccumulo() throws Exception {
       cluster.start();
       startedCluster = true;
     }
-    miniAccumulo = new MiniAccumuloInstance(instanceName, instanceDir);
-    conn = miniAccumulo.getConnector(USER, new PasswordToken(PASSWORD));
+    clientInfo = MiniAccumuloCluster.getClientInfo(instanceDir);
+    aClient = Accumulo.newClient().usingClientInfo(clientInfo).build();
   }
 
   protected Class<? extends ObserverProvider> getObserverProviderClass() {
diff --git 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseImpl.java
 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseImpl.java
index 481e6e89..43bc869c 100644
--- 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseImpl.java
+++ 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseImpl.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -68,12 +68,12 @@ public void setUpFluo() throws Exception {
 
     config = new FluoConfiguration();
     config.setApplicationName("impl-test" + testCounter.getAndIncrement());
-    config.setAccumuloInstance(miniAccumulo.getInstanceName());
+    config.setAccumuloInstance(clientInfo.getInstanceName());
     config.setAccumuloUser(USER);
     config.setAccumuloPassword(PASSWORD);
     config.setAccumuloTable(table);
-    config.setAccumuloZookeepers(miniAccumulo.getZooKeepers());
-    config.setInstanceZookeepers(miniAccumulo.getZooKeepers() + "/fluo");
+    config.setAccumuloZookeepers(clientInfo.getZooKeepers());
+    config.setInstanceZookeepers(clientInfo.getZooKeepers() + "/fluo");
     config.setTransactionRollbackTime(1, TimeUnit.SECONDS);
     setupObservers(config);
     config.setProperty(FluoConfigurationImpl.ZK_UPDATE_PERIOD_PROP, "1000");
@@ -107,6 +107,6 @@ public void tearDownFluo() throws Exception {
       oserver.stop();
     }
     env.close();
-    conn.tableOperations().delete(table);
+    aClient.tableOperations().delete(table);
   }
 }
diff --git 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseMini.java
 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseMini.java
index bcef827c..d31bf9f9 100644
--- 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseMini.java
+++ 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseMini.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -42,11 +42,11 @@ public void setUpFluo() throws Exception {
 
     config = new FluoConfiguration();
     config.setApplicationName("mini-test" + testCounter.getAndIncrement());
-    config.setAccumuloInstance(miniAccumulo.getInstanceName());
+    config.setAccumuloInstance(clientInfo.getInstanceName());
     config.setAccumuloUser(USER);
     config.setAccumuloPassword(PASSWORD);
-    config.setAccumuloZookeepers(miniAccumulo.getZooKeepers());
-    config.setInstanceZookeepers(miniAccumulo.getZooKeepers() + "/fluo");
+    config.setAccumuloZookeepers(clientInfo.getZooKeepers());
+    config.setInstanceZookeepers(clientInfo.getZooKeepers() + "/fluo");
     config.setAccumuloTable(getNextTableName());
     config.setWorkerThreads(5);
     setupObservers(config);
diff --git 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/TestTransaction.java
 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/TestTransaction.java
index 02ed35d6..ebb02d17 100644
--- 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/TestTransaction.java
+++ 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/TestTransaction.java
@@ -57,7 +57,7 @@
   public static long getNotificationTS(Environment env, String row, Column 
col) {
     Scanner scanner;
     try {
-      scanner = env.getConnector().createScanner(env.getTable(), 
env.getAuthorizations());
+      scanner = env.getAccumuloClient().createScanner(env.getTable(), 
env.getAuthorizations());
     } catch (TableNotFoundException e) {
       throw new RuntimeException(e);
     }
diff --git 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/accumulo/Skip100StampsIterator.java
 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/accumulo/Skip100StampsIterator.java
index aa8b2ef5..484f7c3f 100644
--- 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/accumulo/Skip100StampsIterator.java
+++ 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/accumulo/Skip100StampsIterator.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -32,14 +32,12 @@
   private TimestampSkippingIterator source;
   private boolean hasTop;
   private int goodVal;
-  private int goodSeek;
 
   @Override
   public void init(SortedKeyValueIterator<Key, Value> source, Map<String, 
String> options,
       IteratorEnvironment env) throws IOException {
 
     this.source = new TimestampSkippingIterator(source);
-
   }
 
   @Override
@@ -62,15 +60,6 @@ public void seek(Range range, Collection<ByteSequence> 
columnFamilies, boolean i
 
     long ts = 99999;
     goodVal = 0;
-    hasTop = true;
-
-    /*
-     * If the TimestampSkippingIterator is not able to remove the 
DeletingIterator, then the
-     * following loop will have O(N^2) performance. This happens because every 
time the following
-     * loop seeks forward a little bit, the DeletingIterator scans from the 
start of the column
-     * looking for deletes. I manually commented out the code that removes the 
DeletingIterator and
-     * the following code took 50 to 100 times longer to run.
-     */
 
     while (source.hasTop() && ts > 0) {
       source.skipToTimestamp(k, ts);
@@ -78,14 +67,12 @@ public void seek(Range range, Collection<ByteSequence> 
columnFamilies, boolean i
         if (source.getTopValue().toString().equals("v" + ts)) {
           goodVal++;
         }
-
-        if (source.shouldSeek()) {
-          goodSeek++;
-        }
       }
 
       ts -= 100;
     }
+
+    hasTop = goodVal > 0;
   }
 
   @Override
@@ -95,7 +82,7 @@ public Key getTopKey() {
 
   @Override
   public Value getTopValue() {
-    return new Value(("" + goodVal + " " + goodSeek).getBytes());
+    return new Value("" + goodVal);
   }
 
   @Override
diff --git 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/accumulo/TimeskippingIT.java
 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/accumulo/TimeskippingIT.java
index 9aeca79f..387eed1f 100644
--- 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/accumulo/TimeskippingIT.java
+++ 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/accumulo/TimeskippingIT.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -15,6 +15,8 @@
 
 package org.apache.fluo.integration.accumulo;
 
+import java.util.Collections;
+
 import com.google.common.collect.Iterables;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
@@ -23,6 +25,7 @@
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.fluo.accumulo.util.AccumuloProps;
 import org.apache.fluo.integration.ITBase;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -40,9 +43,12 @@
   @Test
   public void testTimestampSkippingIterPerformance() throws Exception {
 
-    conn.tableOperations().create("ttsi", new 
NewTableConfiguration().withoutDefaultIterators());
+    aClient.tableOperations().create("ttsi",
+        new NewTableConfiguration().withoutDefaultIterators()
+            
.setProperties(Collections.singletonMap(AccumuloProps.TABLE_DELETE_BEHAVIOR,
+                AccumuloProps.TABLE_DELETE_BEHAVIOR_VALUE)));
 
-    BatchWriter bw = conn.createBatchWriter("ttsi", new BatchWriterConfig());
+    BatchWriter bw = aClient.createBatchWriter("ttsi", new 
BatchWriterConfig());
     Mutation m = new Mutation("r1");
     for (int i = 0; i < 100000; i++) {
       m.put("f1", "q1", i, "v" + i);
@@ -53,24 +59,24 @@ public void testTimestampSkippingIterPerformance() throws 
Exception {
 
     long t2 = System.currentTimeMillis();
 
-    Scanner scanner = conn.createScanner("ttsi", Authorizations.EMPTY);
+    Scanner scanner = aClient.createScanner("ttsi", Authorizations.EMPTY);
     scanner.addScanIterator(new IteratorSetting(10, 
Skip100StampsIterator.class));
 
-    Assert.assertEquals("999 1000", 
Iterables.getOnlyElement(scanner).getValue().toString());
+    Assert.assertEquals("999", 
Iterables.getOnlyElement(scanner).getValue().toString());
     long t3 = System.currentTimeMillis();
 
     if (t3 - t2 > 3000) {
-      log.warn("Timestamp skipping iterator took longer than expected " + (t3 
- t2));
+      log.error("Timestamp skipping iterator took longer than expected " + (t3 
- t2));
     }
 
-    conn.tableOperations().flush("ttsi", null, null, true);
+    aClient.tableOperations().flush("ttsi", null, null, true);
 
     long t4 = System.currentTimeMillis();
-    Assert.assertEquals("999 1000", 
Iterables.getOnlyElement(scanner).getValue().toString());
+    Assert.assertEquals("999", 
Iterables.getOnlyElement(scanner).getValue().toString());
     long t5 = System.currentTimeMillis();
 
     if (t5 - t4 > 3000) {
-      log.warn("Timestamp skipping iterator took longer than expected " + (t5 
- t4));
+      log.error("Timestamp skipping iterator took longer than expected " + (t5 
- t4));
     }
   }
 }
diff --git 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
index 63d64721..a9222b0f 100644
--- 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
+++ 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
@@ -20,10 +20,7 @@
 import java.util.Set;
 
 import com.google.common.collect.Iterables;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.fluo.accumulo.util.ColumnConstants;
 import org.apache.fluo.accumulo.util.ZookeeperUtil;
@@ -42,6 +39,7 @@
 import org.apache.fluo.core.client.FluoClientImpl;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.oracle.OracleServer;
+import org.apache.fluo.core.util.AccumuloUtil;
 import org.apache.fluo.core.util.CuratorUtil;
 import org.apache.fluo.integration.ITBaseImpl;
 import org.apache.hadoop.io.Text;
@@ -96,7 +94,7 @@ public void testInitializeTwiceFails() throws Exception {
       }
     }
 
-    assertTrue(conn.tableOperations().exists(config.getAccumuloTable()));
+    assertTrue(aClient.tableOperations().exists(config.getAccumuloTable()));
   }
 
   @Test
@@ -116,12 +114,9 @@ public void testInitializeConfig() throws Exception {
       admin.initialize(opts);
 
       // verify locality groups were set on the table
-      Instance inst =
-          new ZooKeeperInstance(config.getAccumuloInstance(), 
config.getAccumuloZookeepers());
-      Connector conn = inst.getConnector(config.getAccumuloUser(),
-          new PasswordToken(config.getAccumuloPassword()));
+      AccumuloClient client = AccumuloUtil.getClient(config);
       Map<String, Set<Text>> localityGroups =
-          conn.tableOperations().getLocalityGroups(config.getAccumuloTable());
+          
client.tableOperations().getLocalityGroups(config.getAccumuloTable());
       Assert.assertEquals("Unexpected locality group count.", 1, 
localityGroups.size());
       Entry<String, Set<Text>> localityGroup = 
localityGroups.entrySet().iterator().next();
       Assert.assertEquals("'notify' locality group not found.",
diff --git 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/CollisionIT.java
 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/CollisionIT.java
index 951f6a22..e64a669f 100644
--- 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/CollisionIT.java
+++ 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/CollisionIT.java
@@ -154,9 +154,9 @@ public void testLotsOfCollisions() throws Exception {
       oldestTS = ZookeeperUtil.getGcTimestamp(config.getAppZookeepers());
     }
 
-    conn.tableOperations().compact(getCurTableName(), null, null, true, true);
+    aClient.tableOperations().compact(getCurTableName(), null, null, true, 
true);
 
-    Scanner scanner = conn.createScanner(getCurTableName(), 
Authorizations.EMPTY);
+    Scanner scanner = aClient.createScanner(getCurTableName(), 
Authorizations.EMPTY);
 
     HashSet<String> rowCols = new HashSet<>();
 
diff --git 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java
 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java
index 05b5ebb0..546cf35c 100644
--- 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java
+++ 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java
@@ -407,7 +407,7 @@ public void testAcks() throws Exception {
     Assert.assertNull(tx3.gets("idx:def", docUrl));
     Assert.assertEquals("3", tx3.gets("url0000", lastUpdate));
 
-    Scanner scanner = env.getConnector().createScanner(env.getTable(), 
Authorizations.EMPTY);
+    Scanner scanner = env.getAccumuloClient().createScanner(env.getTable(), 
Authorizations.EMPTY);
     Notification.configureScanner(scanner);
     Iterator<Entry<Key, Value>> iter = scanner.iterator();
     Assert.assertTrue(iter.hasNext());
@@ -469,7 +469,7 @@ public void testStaleScanPrevention() throws Exception {
     BankUtil.transfer(env, "bob", "joe", 2);
     BankUtil.transfer(env, "jill", "joe", 2);
 
-    conn.tableOperations().flush(table, null, null, true);
+    aClient.tableOperations().flush(table, null, null, true);
 
     Assert.assertEquals("20", tx2.gets("joe", BALANCE));
 
@@ -523,7 +523,7 @@ public void testForcedStaleScan() throws Exception {
     }
 
     // GC iterator will clear data that tx2 wants to scan
-    conn.tableOperations().flush(table, null, null, true);
+    aClient.tableOperations().flush(table, null, null, true);
 
     // this data should have been GCed, but the problem is not detected here
     Assert.assertNull(tx2.gets("joe", BALANCE));
@@ -635,7 +635,7 @@ public void testRollbackSelf() throws Exception {
   private boolean wasRolledBackPrimary(long startTs, String rolledBackRow)
       throws TableNotFoundException {
     boolean sawExpected = false;
-    Scanner scanner = conn.createScanner(getCurTableName(), 
Authorizations.EMPTY);
+    Scanner scanner = aClient.createScanner(getCurTableName(), 
Authorizations.EMPTY);
 
     for (Entry<Key, Value> entry : scanner) {
       long colType = entry.getKey().getTimestamp() & 
ColumnConstants.PREFIX_MASK;
diff --git 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FaultyConfig.java
 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FaultyConfig.java
index 62c48dd1..c5892b67 100644
--- 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FaultyConfig.java
+++ 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FaultyConfig.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -20,8 +20,8 @@
 import java.util.Iterator;
 import java.util.Random;
 
+import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.ConditionalWriter;
-import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.ConditionalMutation;
 import org.apache.fluo.core.impl.Environment;
@@ -101,8 +101,8 @@ public void close() {
   }
 
   @Override
-  public Connector getConnector() {
-    return super.getConnector();
+  public AccumuloClient getAccumuloClient() {
+    return super.getAccumuloClient();
   }
 
   public ConditionalWriter createConditionalWriter() throws 
TableNotFoundException {
diff --git 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FluoIT.java
 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FluoIT.java
index 696497e2..87106007 100644
--- 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FluoIT.java
+++ 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FluoIT.java
@@ -397,7 +397,7 @@ public void testWriteObserved() throws Exception {
   @Test
   public void testVisibility() throws Exception {
 
-    conn.securityOperations().changeUserAuthorizations(USER, new 
Authorizations("A", "B", "C"));
+    aClient.securityOperations().changeUserAuthorizations(USER, new 
Authorizations("A", "B", "C"));
 
     env.setAuthorizations(new Authorizations("A", "B", "C"));
 
diff --git 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
index c250de1c..b99b16d6 100644
--- 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
+++ 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
@@ -80,7 +80,7 @@ public void testVerifyAfterGC() throws Exception {
     Assert.assertEquals(tx2.getStartTs(), oldestTs);
 
     // Force a garbage collection
-    conn.tableOperations().flush(table, null, null, true);
+    aClient.tableOperations().flush(table, null, null, true);
 
     verify(oldestTs);
 
@@ -112,7 +112,7 @@ public void testDeletedDataIsDropped() throws Exception {
     waitForGcTime(tx2.getStartTimestamp());
 
     // Force a garbage collection
-    conn.tableOperations().compact(table, null, null, true, true);
+    aClient.tableOperations().compact(table, null, null, true, true);
 
     Assert.assertEquals("file:///abc.txt", tx2.gets("001", docUri));
 
@@ -121,11 +121,11 @@ public void testDeletedDataIsDropped() throws Exception {
     Assert.assertNull(tx4.gets("001", docUri));
 
     waitForGcTime(tx4.getStartTimestamp());
-    conn.tableOperations().compact(table, null, null, true, true);
+    aClient.tableOperations().compact(table, null, null, true, true);
 
     Assert.assertNull(tx4.gets("001", docUri));
 
-    Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
+    Scanner scanner = aClient.createScanner(table, Authorizations.EMPTY);
     Assert.assertEquals(0, Iterables.size(scanner));
 
     tx4.done();
@@ -163,14 +163,14 @@ public void testRolledBackDataIsDropped() throws 
Exception {
     Assert.assertEquals(20, countInTable("-DATA"));
 
     // flush should drop locks and data
-    conn.tableOperations().flush(table, null, null, true);
+    aClient.tableOperations().flush(table, null, null, true);
 
     Assert.assertEquals(0, countInTable("-LOCK"));
     Assert.assertEquals(20, countInTable("-DEL_LOCK"));
     Assert.assertEquals(0, countInTable("-DATA"));
 
     // compact should drop all del locks except for primary
-    conn.tableOperations().compact(table, null, null, true, true);
+    aClient.tableOperations().compact(table, null, null, true, true);
 
     Assert.assertEquals(0, countInTable("-LOCK"));
     Assert.assertEquals(1, countInTable("-DEL_LOCK"));
@@ -225,7 +225,7 @@ public void testReadLocks() throws Exception {
     tx2.done();
 
     // all read locks should be garbage collected because of the writes after 
the read locks
-    conn.tableOperations().compact(table, null, null, true, true);
+    aClient.tableOperations().compact(table, null, null, true, true);
 
     Assert.assertEquals(0, countInTable("-DEL_RLOCK"));
     Assert.assertEquals(0, countInTable("-RLOCK"));
@@ -251,7 +251,7 @@ public void testReadLocks() throws Exception {
     tx3.done();
 
     waitForGcTime(tx3.getStartTimestamp());
-    conn.tableOperations().compact(table, null, null, true, true);
+    aClient.tableOperations().compact(table, null, null, true, true);
 
 
     // all read locks older than GC time should be dropped
@@ -261,7 +261,7 @@ public void testReadLocks() throws Exception {
 
   private int countInTable(String str) throws TableNotFoundException {
     int count = 0;
-    Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
+    Scanner scanner = aClient.createScanner(table, Authorizations.EMPTY);
     for (String e : Iterables.transform(scanner, FluoFormatter::toString)) {
       if (e.contains(str)) {
         count++;
@@ -294,7 +294,7 @@ public void testGetOldestTimestamp() throws Exception {
    *
    */
   private void verify(long oldestTs) throws TableNotFoundException {
-    Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
+    Scanner scanner = aClient.createScanner(table, Authorizations.EMPTY);
 
     Iterator<Entry<Key, Value>> iter = scanner.iterator();
 
diff --git 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/NotificationGcIT.java
 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/NotificationGcIT.java
index b308197e..c8654fed 100644
--- 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/NotificationGcIT.java
+++ 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/NotificationGcIT.java
@@ -43,7 +43,8 @@
   private static final Logger log = 
LoggerFactory.getLogger(NotificationGcIT.class);
 
   private static void assertRawNotifications(int expected, Environment env) 
throws Exception {
-    Scanner scanner = env.getConnector().createScanner(env.getTable(), 
env.getAuthorizations());
+    Scanner scanner =
+        env.getAccumuloClient().createScanner(env.getTable(), 
env.getAuthorizations());
     scanner.fetchColumnFamily(ByteUtil.toText(ColumnConstants.NOTIFY_CF));
     int size = Iterables.size(scanner);
     if (size != expected) {
@@ -55,7 +56,8 @@ private static void assertRawNotifications(int expected, 
Environment env) throws
   }
 
   private static int countNotifications(Environment env) throws Exception {
-    Scanner scanner = env.getConnector().createScanner(env.getTable(), 
env.getAuthorizations());
+    Scanner scanner =
+        env.getAccumuloClient().createScanner(env.getTable(), 
env.getAuthorizations());
     Notification.configureScanner(scanner);
     return Iterables.size(scanner);
   }
@@ -100,7 +102,7 @@ public void testNotificationGC() throws Exception {
     assertRawNotifications(4, env);
     Assert.assertEquals(0, countNotifications(env));
 
-    env.getConnector().tableOperations().flush(env.getTable(), null, null, 
true);
+    env.getAccumuloClient().tableOperations().flush(env.getTable(), null, 
null, true);
 
     assertRawNotifications(0, env);
     Assert.assertEquals(0, countNotifications(env));
diff --git 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/ReadLockFailureIT.java
 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/ReadLockFailureIT.java
index ba93b6da..7a8b8d29 100644
--- 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/ReadLockFailureIT.java
+++ 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/ReadLockFailureIT.java
@@ -348,7 +348,7 @@ public void testWriteWoRead4() throws Exception {
 
   private int countInTable(String str) throws TableNotFoundException {
     int count = 0;
-    Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
+    Scanner scanner = aClient.createScanner(table, Authorizations.EMPTY);
     for (String e : Iterables.transform(scanner, FluoFormatter::toString)) {
       if (e.contains(str)) {
         count++;
diff --git 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/ReadLockIT.java
 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/ReadLockIT.java
index d93d3985..d6dc7175 100644
--- 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/ReadLockIT.java
+++ 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/ReadLockIT.java
@@ -199,7 +199,7 @@ public void testWriteCausesReadLockToFail() throws 
Exception {
   }
 
   private void dumpRow(String row, Consumer<String> out) throws 
TableNotFoundException {
-    Scanner scanner = conn.createScanner(getCurTableName(), 
Authorizations.EMPTY);
+    Scanner scanner = aClient.createScanner(getCurTableName(), 
Authorizations.EMPTY);
     scanner.setRange(Range.exact(row));
     for (Entry<Key, Value> entry : scanner) {
       out.accept(FluoFormatter.toString(entry));
@@ -207,7 +207,7 @@ private void dumpRow(String row, Consumer<String> out) 
throws TableNotFoundExcep
   }
 
   private void dumpTable(Consumer<String> out) throws TableNotFoundException {
-    Scanner scanner = conn.createScanner(getCurTableName(), 
Authorizations.EMPTY);
+    Scanner scanner = aClient.createScanner(getCurTableName(), 
Authorizations.EMPTY);
     for (Entry<Key, Value> entry : scanner) {
       out.accept(FluoFormatter.toString(entry));
     }
diff --git 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/StochasticBankIT.java
 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/StochasticBankIT.java
index b0d6bd13..192ca014 100644
--- 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/StochasticBankIT.java
+++ 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/StochasticBankIT.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -31,6 +31,7 @@
 
 import com.google.common.collect.Iterables;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
 import org.apache.fluo.accumulo.format.FluoFormatter;
 import org.apache.fluo.accumulo.util.AccumuloProps;
 import org.apache.fluo.api.data.Column;
@@ -61,8 +62,8 @@
   @Test
   public void testConcurrency() throws Exception {
 
-    conn.tableOperations().setProperty(table, AccumuloProps.TABLE_MAJC_RATIO, 
"1");
-    conn.tableOperations().setProperty(table, 
AccumuloProps.TABLE_BLOCKCACHE_ENABLED, "true");
+    aClient.tableOperations().setProperty(table, 
AccumuloProps.TABLE_MAJC_RATIO, "1");
+    aClient.tableOperations().setProperty(table, 
AccumuloProps.TABLE_BLOCKCACHE_ENABLED, "true");
 
     int numAccounts = 5000;
 
@@ -71,7 +72,7 @@ public void testConcurrency() throws Exception {
     splits.add(new Text(fmtAcct(numAccounts / 2)));
     splits.add(new Text(fmtAcct(3 * numAccounts / 4)));
 
-    conn.tableOperations().addSplits(table, splits);
+    aClient.tableOperations().addSplits(table, splits);
 
     AtomicBoolean runFlag = new AtomicBoolean(true);
 
@@ -188,23 +189,24 @@ private static void runVerifier(Environment env, int 
numAccounts, int num) {
       for (int i = 0; i < num; i++) {
 
         if (i == num / 2) {
-          env.getConnector().tableOperations().compact(env.getTable(), null, 
null, true, false);
+          env.getAccumuloClient().tableOperations().compact(env.getTable(), 
null, null, true,
+              false);
         }
 
         long t1 = System.currentTimeMillis();
 
         TestTransaction tx = new TestTransaction(env);
-        org.apache.accumulo.core.util.Stat stat = new 
org.apache.accumulo.core.util.Stat();
+        SummaryStatistics stat = new SummaryStatistics();
 
         for (RowColumnValue rcv : tx.scanner().build()) {
           int amt = Integer.parseInt(rcv.getValue().toString());
-          stat.addStat(amt);
+          stat.addValue(amt);
         }
 
         long t2 = System.currentTimeMillis();
 
         log.debug("avg : %,9.2f  min : %,6d  max : %,6d  stddev : %1.2f  rate 
: %,6.2f\n",
-            stat.getAverage(), stat.getMin(), stat.getMax(), stat.getStdDev(),
+            stat.getMean(), stat.getMin(), stat.getMax(), 
stat.getStandardDeviation(),
             numAccounts / ((t2 - t1) / 1000.0));
 
         if (stat.getSum() != numAccounts * 1000) {
@@ -213,7 +215,7 @@ private static void runVerifier(Environment env, int 
numAccounts, int num) {
           }
         }
 
-        Assert.assertEquals(numAccounts * 1000, stat.getSum());
+        Assert.assertEquals(numAccounts * 1000, (int) stat.getSum());
 
         lastTx = tx;
       }
@@ -253,7 +255,8 @@ private static void printDiffs(Environment env, 
TestTransaction lastTx, TestTran
     File tmpFile = File.createTempFile("sb_dump", ".txt");
     Writer fw = new BufferedWriter(new FileWriter(tmpFile));
 
-    Scanner scanner = env.getConnector().createScanner(env.getTable(), 
env.getAuthorizations());
+    Scanner scanner =
+        env.getAccumuloClient().createScanner(env.getTable(), 
env.getAuthorizations());
 
     for (String cell : Iterables.transform(scanner, FluoFormatter::toString)) {
       fw.append(cell);
diff --git 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
index 1b5cf1a0..d41323be 100644
--- 
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
+++ 
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
@@ -201,7 +201,7 @@ private int countNotifications() throws Exception {
     // deletes of notifications are queued async at end of transaction
     env.getSharedResources().getBatchWriter().waitForAsyncFlush();
 
-    Scanner scanner = conn.createScanner(getCurTableName(), 
Authorizations.EMPTY);
+    Scanner scanner = aClient.createScanner(getCurTableName(), 
Authorizations.EMPTY);
     Notification.configureScanner(scanner);
 
     int count = 0;
diff --git a/modules/mapreduce/pom.xml b/modules/mapreduce/pom.xml
index 034fa4f6..a6d95d73 100644
--- a/modules/mapreduce/pom.xml
+++ b/modules/mapreduce/pom.xml
@@ -26,6 +26,10 @@
   <description>This module provides utility code for MapReduce jobs that read 
from or write to a
     Apache Fluo table.</description>
   <dependencies>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-client-mapreduce</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-core</artifactId>
@@ -44,7 +48,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
+      <artifactId>hadoop-client-api</artifactId>
     </dependency>
     <dependency>
       <groupId>junit</groupId>
diff --git 
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
 
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
index 56418ce0..e09b86f6 100644
--- 
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
+++ 
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -25,7 +25,6 @@
 
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.api.data.Bytes;
@@ -35,6 +34,7 @@
 import org.apache.fluo.api.data.Span;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.TransactionImpl;
+import org.apache.fluo.core.util.AccumuloUtil;
 import org.apache.fluo.core.util.SpanUtil;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -141,7 +141,6 @@ public boolean nextKeyValue() throws IOException, 
InterruptedException {
    * @param conf Job configuration
    * @param config use {@link FluoConfiguration} to configure programmatically
    */
-  @SuppressWarnings("deprecation")
   public static void configure(Job conf, SimpleConfiguration config) {
     try {
       FluoConfiguration fconfig = new FluoConfiguration(config);
@@ -155,10 +154,7 @@ public static void configure(Job conf, SimpleConfiguration 
config) {
         conf.getConfiguration().set(PROPS_CONF_KEY,
             new String(baos.toByteArray(), StandardCharsets.UTF_8));
 
-        AccumuloInputFormat.setZooKeeperInstance(conf, 
fconfig.getAccumuloInstance(),
-            fconfig.getAccumuloZookeepers());
-        AccumuloInputFormat.setConnectorInfo(conf, fconfig.getAccumuloUser(),
-            new PasswordToken(fconfig.getAccumuloPassword()));
+        AccumuloInputFormat.setClientInfo(conf, 
AccumuloUtil.getClientInfo(fconfig));
         AccumuloInputFormat.setInputTableName(conf, env.getTable());
         AccumuloInputFormat.setScanAuthorizations(conf, 
env.getAuthorizations());
       }
diff --git 
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
 
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
index 37d7c588..8993f38c 100644
--- 
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
+++ 
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -25,7 +25,6 @@
 
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.fluo.api.client.scanner.ColumnScanner;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.SimpleConfiguration;
@@ -35,6 +34,7 @@
 import org.apache.fluo.api.data.Span;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.TransactionImpl;
+import org.apache.fluo.core.util.AccumuloUtil;
 import org.apache.fluo.core.util.SpanUtil;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -142,7 +142,6 @@ public boolean nextKeyValue() throws IOException, 
InterruptedException {
    * @param config use {@link org.apache.fluo.api.config.FluoConfiguration} to 
configure
    *        programmatically
    */
-  @SuppressWarnings("deprecation")
   public static void configure(Job conf, SimpleConfiguration config) {
     try {
       FluoConfiguration fconfig = new FluoConfiguration(config);
@@ -156,10 +155,7 @@ public static void configure(Job conf, SimpleConfiguration 
config) {
         conf.getConfiguration().set(PROPS_CONF_KEY,
             new String(baos.toByteArray(), StandardCharsets.UTF_8));
 
-        AccumuloInputFormat.setZooKeeperInstance(conf, 
fconfig.getAccumuloInstance(),
-            fconfig.getAccumuloZookeepers());
-        AccumuloInputFormat.setConnectorInfo(conf, fconfig.getAccumuloUser(),
-            new PasswordToken(fconfig.getAccumuloPassword()));
+        AccumuloInputFormat.setClientInfo(conf, 
AccumuloUtil.getClientInfo(fconfig));
         AccumuloInputFormat.setInputTableName(conf, env.getTable());
         AccumuloInputFormat.setScanAuthorizations(conf, 
env.getAuthorizations());
       }
diff --git 
a/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
 
b/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
index 05b163e4..c103567d 100644
--- 
a/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
+++ 
b/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
@@ -96,7 +96,7 @@ public void testImportFile() throws Exception {
     Assert.assertTrue(job.waitForCompletion(false));
 
     // bulk import rfiles
-    conn.tableOperations().importDirectory(table, outDir.toString(), 
failDir.toString(), false);
+    aClient.tableOperations().importDirectory(table, outDir.toString(), 
failDir.toString(), false);
 
     // read and update data using transactions
     TestTransaction tx1 = new TestTransaction(env);
diff --git 
a/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/MutationBuilderIT.java
 
b/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/MutationBuilderIT.java
index 7f06bf71..79a223d5 100644
--- 
a/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/MutationBuilderIT.java
+++ 
b/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/MutationBuilderIT.java
@@ -32,7 +32,7 @@ public void testBatchWrite() throws Exception {
     // test initializing a Fluo table by batch writing to it
 
     // use a batch writer to test this because its easier than using 
AccumuloOutputFormat
-    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+    BatchWriter bw = aClient.createBatchWriter(table, new BatchWriterConfig());
     try {
 
       FluoMutationGenerator mb1 = new FluoMutationGenerator(Bytes.of("row1"));
diff --git a/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java 
b/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
index 96513b84..dc490fac 100644
--- a/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
+++ b/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
@@ -167,7 +167,8 @@ public void close() {
   @Override
   public void waitForObservers() {
     try {
-      Scanner scanner = env.getConnector().createScanner(env.getTable(), 
env.getAuthorizations());
+      Scanner scanner =
+          env.getAccumuloClient().createScanner(env.getTable(), 
env.getAuthorizations());
       Notification.configureScanner(scanner);
 
       while (true) {
diff --git a/pom.xml b/pom.xml
index f4073b78..a230ecb6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,11 +54,11 @@
     <url>https://github.com/apache/fluo/issues</url>
   </issueManagement>
   <properties>
-    <accumulo.version>1.7.3</accumulo.version>
+    <accumulo.version>2.0.0-alpha-1</accumulo.version>
     <curator.version>2.12.0</curator.version>
     <dropwizard.version>0.8.1</dropwizard.version>
     <findbugs.maxRank>11</findbugs.maxRank>
-    <hadoop.version>2.6.3</hadoop.version>
+    <hadoop.version>3.0.2</hadoop.version>
     <logback.version>1.1.3</logback.version>
     <releaseProfiles>fluo-release</releaseProfiles>
     <slf4j.version>1.7.12</slf4j.version>
@@ -88,10 +88,9 @@
         <version>2.8.0</version>
       </dependency>
       <dependency>
-        <!-- Guava 13.0.1 is required by Twill (due to beta method usage) -->
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
-        <version>13.0.1</version>
+        <version>26.0-jre</version>
       </dependency>
       <dependency>
         <groupId>commons-collections</groupId>
@@ -123,6 +122,11 @@
         <artifactId>junit</artifactId>
         <version>4.12</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.accumulo</groupId>
+        <artifactId>accumulo-client-mapreduce</artifactId>
+        <version>${accumulo.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.accumulo</groupId>
         <artifactId>accumulo-core</artifactId>
@@ -138,6 +142,11 @@
         <artifactId>accumulo-test</artifactId>
         <version>${accumulo.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-math3</artifactId>
+        <version>3.6.1</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-client</artifactId>
@@ -195,17 +204,12 @@
       </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-client</artifactId>
-        <version>${hadoop.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-minicluster</artifactId>
+        <artifactId>hadoop-client-api</artifactId>
         <version>${hadoop.version}</version>
       </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-yarn-client</artifactId>
+        <artifactId>hadoop-client-runtime</artifactId>
         <version>${hadoop.version}</version>
       </dependency>
       <dependency>
@@ -250,17 +254,53 @@
       </dependency>
     </dependencies>
   </dependencyManagement>
+  <pluginRepositories>
+    <!--
+    needed for accumulo-maven-plugin SNAPSHOT version
+    see PR apache/maven-apache-parent#1
+    -->
+    <pluginRepository>
+      <releases>
+        <enabled>false</enabled>
+      </releases>
+      <id>apache.snapshots</id>
+      <name>Apache Snapshot Repository</name>
+      <url>https://repository.apache.org/snapshots</url>
+    </pluginRepository>
+  </pluginRepositories>
   <build>
     <pluginManagement>
       <plugins>
         <plugin>
           <groupId>org.apache.accumulo</groupId>
           <artifactId>accumulo-maven-plugin</artifactId>
-          <version>${accumulo.version}</version>
+          <version>2.0.0-SNAPSHOT</version>
           <configuration>
             <instanceName>it-instance-maven</instanceName>
             <rootPassword>ITSecret</rootPassword>
           </configuration>
+          <dependencies>
+            <dependency>
+              <groupId>org.apache.accumulo</groupId>
+              <artifactId>accumulo-minicluster</artifactId>
+              <version>${accumulo.version}</version>
+            </dependency>
+            <dependency>
+              <groupId>org.apache.hadoop</groupId>
+              <artifactId>hadoop-client-api</artifactId>
+              <version>${hadoop.version}</version>
+            </dependency>
+            <dependency>
+              <groupId>org.apache.hadoop</groupId>
+              <artifactId>hadoop-client-runtime</artifactId>
+              <version>${hadoop.version}</version>
+            </dependency>
+            <dependency>
+              <groupId>org.apache.zookeeper</groupId>
+              <artifactId>zookeeper</artifactId>
+              <version>${zookeeper.version}</version>
+            </dependency>
+          </dependencies>
           <executions>
             <execution>
               <id>run-plugin</id>
@@ -315,11 +355,6 @@
               <failOnWarning>true</failOnWarning>
               <ignoredDependencies>
                 <ignoredDependency>log4j:log4j:jar:*</ignoredDependency>
-                
<ignoredDependency>org.apache.hadoop:hadoop-common:jar:${hadoop.version}</ignoredDependency>
-                
<ignoredDependency>org.apache.hadoop:hadoop-client:jar:${hadoop.version}</ignoredDependency>
-                
<ignoredDependency>org.apache.hadoop:hadoop-mapreduce-client-core:jar:${hadoop.version}</ignoredDependency>
-                
<ignoredDependency>org.apache.hadoop:hadoop-yarn-api:jar:${hadoop.version}</ignoredDependency>
-                
<ignoredDependency>org.apache.hadoop:hadoop-yarn-client:jar:${hadoop.version}</ignoredDependency>
                 
<ignoredDependency>org.slf4j:slf4j-log4j12:jar:${slf4j.version}</ignoredDependency>
               </ignoredDependencies>
             </configuration>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to