http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 0e02f05,0000000..7158ea8
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@@ -1,598 -1,0 +1,598 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.log;
 +
++import static com.google.common.base.Charsets.UTF_8;
 +import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
 +import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
 +import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
 +import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
 +import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
 +
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.EOFException;
 +import java.io.IOException;
 +import java.io.OutputStream;
 +import java.lang.reflect.InvocationTargetException;
 +import java.lang.reflect.Method;
 +import java.nio.channels.ClosedChannelException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.LinkedBlockingQueue;
 +
- import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.security.crypto.CryptoModule;
 +import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
 +import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
 +import org.apache.accumulo.core.security.crypto.DefaultCryptoModule;
 +import org.apache.accumulo.core.security.crypto.NoFlushOutputStream;
 +import org.apache.accumulo.core.util.Daemon;
 +import org.apache.accumulo.core.util.LoggingRunnable;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.StringUtil;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.tserver.TabletMutations;
 +import org.apache.accumulo.tserver.logger.LogFileKey;
 +import org.apache.accumulo.tserver.logger.LogFileValue;
 +import org.apache.hadoop.fs.FSDataInputStream;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.log4j.Logger;
 +
 +/**
 + * Wrap a connection to a logger.
 + *
 + */
 +public class DfsLogger {
 +  public static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
 +  public static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
 +
 +  private static Logger log = Logger.getLogger(DfsLogger.class);
 +
 +  public static class LogClosedException extends IOException {
 +    private static final long serialVersionUID = 1L;
 +
 +    public LogClosedException() {
 +      super("LogClosed");
 +    }
 +  }
 +
 +  /**
 +   * A well-timed tabletserver failure could result in an incomplete header written to a write-ahead log. This exception is thrown when the header cannot be
 +   * read from a WAL which should only happen when the tserver dies as described.
 +   */
 +  public static class LogHeaderIncompleteException extends IOException {
 +    private static final long serialVersionUID = 1l;
 +
 +    public LogHeaderIncompleteException(String msg) {
 +      super(msg);
 +    }
 +
 +    public LogHeaderIncompleteException(String msg, Throwable cause) {
 +      super(msg, cause);
 +    }
 +
 +    public LogHeaderIncompleteException(Throwable cause) {
 +      super(cause);
 +    }
 +  }
 +
 +  public static class DFSLoggerInputStreams {
 +
 +    private FSDataInputStream originalInput;
 +    private DataInputStream decryptingInputStream;
 +
 +    public DFSLoggerInputStreams(FSDataInputStream originalInput, DataInputStream decryptingInputStream) {
 +      this.originalInput = originalInput;
 +      this.decryptingInputStream = decryptingInputStream;
 +    }
 +
 +    public FSDataInputStream getOriginalInput() {
 +      return originalInput;
 +    }
 +
 +    public void setOriginalInput(FSDataInputStream originalInput) {
 +      this.originalInput = originalInput;
 +    }
 +
 +    public DataInputStream getDecryptingInputStream() {
 +      return decryptingInputStream;
 +    }
 +
 +    public void setDecryptingInputStream(DataInputStream decryptingInputStream) {
 +      this.decryptingInputStream = decryptingInputStream;
 +    }
 +  }
 +
 +  public interface ServerResources {
 +    AccumuloConfiguration getConfiguration();
 +
 +    VolumeManager getFileSystem();
 +
 +    Set<TServerInstance> getCurrentTServers();
 +  }
 +
 +  private final LinkedBlockingQueue<DfsLogger.LogWork> workQueue = new LinkedBlockingQueue<DfsLogger.LogWork>();
 +
 +  private final Object closeLock = new Object();
 +
 +  private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null);
 +
 +  private static final LogFileValue EMPTY = new LogFileValue();
 +
 +  private boolean closed = false;
 +
 +  private class LogSyncingTask implements Runnable {
 +
 +    @Override
 +    public void run() {
 +      ArrayList<DfsLogger.LogWork> work = new ArrayList<DfsLogger.LogWork>();
 +      boolean sawClosedMarker = false;
 +      while (!sawClosedMarker) {
 +        work.clear();
 +
 +        try {
 +          work.add(workQueue.take());
 +        } catch (InterruptedException ex) {
 +          continue;
 +        }
 +        workQueue.drainTo(work);
 +
 +        try {
 +          sync.invoke(logFile);
 +        } catch (Exception ex) {
 +          log.warn("Exception syncing " + ex);
 +          for (DfsLogger.LogWork logWork : work) {
 +            logWork.exception = ex;
 +          }
 +        }
 +
 +        for (DfsLogger.LogWork logWork : work)
 +          if (logWork == CLOSED_MARKER)
 +            sawClosedMarker = true;
 +          else
 +            logWork.latch.countDown();
 +      }
 +    }
 +  }
 +
 +  static class LogWork {
 +    CountDownLatch latch;
 +    volatile Exception exception;
 +
 +    public LogWork(CountDownLatch latch) {
 +      this.latch = latch;
 +    }
 +  }
 +
 +  public static class LoggerOperation {
 +    private final LogWork work;
 +
 +    public LoggerOperation(LogWork work) {
 +      this.work = work;
 +    }
 +
 +    public void await() throws IOException {
 +      try {
 +        work.latch.await();
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +
 +      if (work.exception != null) {
 +        if (work.exception instanceof IOException)
 +          throw (IOException) work.exception;
 +        else if (work.exception instanceof RuntimeException)
 +          throw (RuntimeException) work.exception;
 +        else
 +          throw new RuntimeException(work.exception);
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public boolean equals(Object obj) {
 +    // filename is unique
 +    if (obj == null)
 +      return false;
 +    if (obj instanceof DfsLogger)
 +      return getFileName().equals(((DfsLogger) obj).getFileName());
 +    return false;
 +  }
 +
 +  @Override
 +  public int hashCode() {
 +    // filename is unique
 +    return getFileName().hashCode();
 +  }
 +
 +  private final ServerResources conf;
 +  private FSDataOutputStream logFile;
 +  private DataOutputStream encryptingLogFile = null;
 +  private Method sync;
 +  private String logPath;
 +  private Daemon syncThread;
 +
 +  /* Track what's actually in +r/!0 for this logger ref */
 +  private String metaReference;
 +
 +  public DfsLogger(ServerResources conf) throws IOException {
 +    this.conf = conf;
 +  }
 +
 +  /**
 +   * Reference a pre-existing log file.
 +   *
 +   * @param meta
 +   *          the cq for the "log" entry in +r/!0
 +   */
 +  public DfsLogger(ServerResources conf, String filename, String meta) throws IOException {
 +    this.conf = conf;
 +    this.logPath = filename;
 +    metaReference = meta;
 +  }
 +
 +  public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs, Path path, AccumuloConfiguration conf) throws IOException {
 +    FSDataInputStream input = fs.open(path);
 +    DataInputStream decryptingInput = null;
 +
 +    byte[] magic = DfsLogger.LOG_FILE_HEADER_V3.getBytes();
 +    byte[] magicBuffer = new byte[magic.length];
 +    try {
 +      input.readFully(magicBuffer);
 +      if (Arrays.equals(magicBuffer, magic)) {
 +        // We read the crypto module class name here because we need to bootstrap the class. The class itself will read any
 +        // additional parameters it needs from the underlying stream.
 +        String cryptoModuleClassname = input.readUTF();
 +        CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoModuleClassname);
 +
 +        // Create the parameters and set the input stream into those parameters
 +        CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
 +        params.setEncryptedInputStream(input);
 +
 +        // Create the plaintext input stream from the encrypted one
 +        params = cryptoModule.getDecryptingInputStream(params);
 +
 +        if (params.getPlaintextInputStream() instanceof DataInputStream) {
 +          decryptingInput = (DataInputStream) params.getPlaintextInputStream();
 +        } else {
 +          decryptingInput = new DataInputStream(params.getPlaintextInputStream());
 +        }
 +      } else {
 +        input.seek(0);
 +        byte[] magicV2 = DfsLogger.LOG_FILE_HEADER_V2.getBytes();
 +        byte[] magicBufferV2 = new byte[magicV2.length];
 +        input.readFully(magicBufferV2);
 +
 +        if (Arrays.equals(magicBufferV2, magicV2)) {
 +          // Log files from 1.5 dump their options in raw to the logger files. Since we don't know the class
 +          // that needs to read those files, we can make a couple of basic assumptions. Either it's going to be
 +          // the NullCryptoModule (no crypto) or the DefaultCryptoModule.
 +
 +          // If it's null, we won't have any parameters whatsoever. First, let's attempt to read
 +          // parameters
 +          Map<String,String> opts = new HashMap<String,String>();
 +          int count = input.readInt();
 +          for (int i = 0; i < count; i++) {
 +            String key = input.readUTF();
 +            String value = input.readUTF();
 +            opts.put(key, value);
 +          }
 +
 +          if (opts.size() == 0) {
 +            // NullCryptoModule, we're done
 +            decryptingInput = input;
 +          } else {
 +
 +            // The DefaultCryptoModule will want to read the parameters from the underlying file, so we will put the file back to that spot.
 +            org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory
 +                .getCryptoModule(DefaultCryptoModule.class.getName());
 +
 +            CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
 +
 +            input.seek(0);
 +            input.readFully(magicBufferV2);
 +            params.setEncryptedInputStream(input);
 +
 +            params = cryptoModule.getDecryptingInputStream(params);
 +            if (params.getPlaintextInputStream() instanceof DataInputStream) {
 +              decryptingInput = (DataInputStream) params.getPlaintextInputStream();
 +            } else {
 +              decryptingInput = new DataInputStream(params.getPlaintextInputStream());
 +            }
 +          }
 +
 +        } else {
 +
 +          input.seek(0);
 +          decryptingInput = input;
 +        }
 +
 +      }
 +    } catch (EOFException e) {
 +      log.warn("Got EOFException trying to read WAL header information, 
assuming the rest of the file (" + path + ") has no data.");
 +      // A TabletServer might have died before the (complete) header was 
written
 +      throw new LogHeaderIncompleteException(e);
 +    }
 +
 +    return new DFSLoggerInputStreams(input, decryptingInput);
 +  }
 +
 +  public synchronized void open(String address) throws IOException {
 +    String filename = UUID.randomUUID().toString();
 +    String logger = StringUtil.join(Arrays.asList(address.split(":")), "+");
 +
 +    log.debug("DfsLogger.open() begin");
 +    VolumeManager fs = conf.getFileSystem();
 +
 +    logPath = fs.choose(ServerConstants.getBaseUris()) + Path.SEPARATOR + ServerConstants.WAL_DIR + Path.SEPARATOR + logger + Path.SEPARATOR + filename;
 +
 +    metaReference = toString();
 +    try {
 +      short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
 +      if (replication == 0)
 +        replication = fs.getDefaultReplication(new Path(logPath));
 +      long blockSize = conf.getConfiguration().getMemoryInBytes(Property.TSERV_WAL_BLOCKSIZE);
 +      if (blockSize == 0)
 +        blockSize = (long) (conf.getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
 +      if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC))
 +        logFile = fs.createSyncable(new Path(logPath), 0, replication, blockSize);
 +      else
 +        logFile = fs.create(new Path(logPath), true, 0, replication, blockSize);
 +
 +      String syncMethod = conf.getConfiguration().get(Property.TSERV_WAL_SYNC_METHOD);
 +      try {
 +        // hsync: send data to datanodes and sync the data to disk
 +        sync = logFile.getClass().getMethod(syncMethod);
 +      } catch (Exception ex) {
 +        log.warn("Could not find configured " + syncMethod + " method, trying 
to fall back to old Hadoop sync method", ex);
 +
 +        try {
 +          // sync: send data to datanodes
 +          sync = logFile.getClass().getMethod("sync");
 +        } catch (Exception e) {
 +          throw new RuntimeException(e);
 +        }
 +      }
 +
 +      // Initialize the crypto operations.
 +      org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory.getCryptoModule(conf
 +          .getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
 +
 +      // Initialize the log file with a header and the crypto params used to set up this log file.
-       logFile.write(LOG_FILE_HEADER_V3.getBytes(Constants.UTF8));
++      logFile.write(LOG_FILE_HEADER_V3.getBytes(UTF_8));
 +
 +      CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf.getConfiguration());
 +
 +      NoFlushOutputStream nfos = new NoFlushOutputStream(logFile);
 +      params.setPlaintextOutputStream(nfos);
 +
 +      // In order to bootstrap the reading of this file later, we have to record the CryptoModule that was used to encipher it here,
 +      // so that that crypto module can re-read its own parameters.
 +
 +      logFile.writeUTF(conf.getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
 +
 +      params = cryptoModule.getEncryptingOutputStream(params);
 +      OutputStream encipheringOutputStream = params.getEncryptedOutputStream();
 +
 +      // If the module just kicks back our original stream, then just use it, don't wrap it in
 +      // another data OutputStream.
 +      if (encipheringOutputStream == nfos) {
 +        log.debug("No enciphering, using raw output stream");
 +        encryptingLogFile = nfos;
 +      } else {
 +        log.debug("Enciphering found, wrapping in DataOutputStream");
 +        encryptingLogFile = new DataOutputStream(encipheringOutputStream);
 +      }
 +
 +      LogFileKey key = new LogFileKey();
 +      key.event = OPEN;
 +      key.tserverSession = filename;
 +      key.filename = filename;
 +      write(key, EMPTY);
 +      sync.invoke(logFile);
 +      log.debug("Got new write-ahead log: " + this);
 +    } catch (Exception ex) {
 +      if (logFile != null)
 +        logFile.close();
 +      logFile = null;
 +      encryptingLogFile = null;
 +      throw new IOException(ex);
 +    }
 +
 +    syncThread = new Daemon(new LoggingRunnable(log, new LogSyncingTask()));
 +    syncThread.setName("Accumulo WALog thread " + toString());
 +    syncThread.start();
 +  }
 +
 +  @Override
 +  public String toString() {
 +    String fileName = getFileName();
 +    if (fileName.contains(":"))
 +      return getLogger() + "/" + getFileName();
 +    return fileName;
 +  }
 +
 +  /**
 +   * get the cq needed to reference this logger's entry in +r/!0
 +   */
 +  public String getMeta() {
 +    if (null == metaReference) {
 +      throw new IllegalStateException("logger doesn't have meta reference. " 
+ this);
 +    }
 +    return metaReference;
 +  }
 +
 +  public String getFileName() {
 +    return logPath.toString();
 +  }
 +
 +  public void close() throws IOException {
 +
 +    synchronized (closeLock) {
 +      if (closed)
 +        return;
 +      // after closed is set to true, nothing else should be added to the queue
 +      // CLOSED_MARKER should be the last thing on the queue, therefore when the
 +      // background thread sees the marker and exits there should be nothing else
 +      // to process... so nothing should be left waiting for the background
 +      // thread to do work
 +      closed = true;
 +      workQueue.add(CLOSED_MARKER);
 +    }
 +
 +    // wait for background thread to finish before closing log file
 +    if (syncThread != null) {
 +      try {
 +        syncThread.join();
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +
 +    // expect the work queue to be empty at this point
 +    if (workQueue.size() != 0) {
 +      log.error("WAL work queue not empty after sync thread exited");
 +      throw new IllegalStateException("WAL work queue not empty after sync thread exited");
 +    }
 +
 +    if (encryptingLogFile != null)
 +      try {
 +        logFile.close();
 +      } catch (IOException ex) {
 +        log.error(ex);
 +        throw new LogClosedException();
 +      }
 +  }
 +
 +  public synchronized void defineTablet(int seq, int tid, KeyExtent tablet) throws IOException {
 +    // write this log to the METADATA table
 +    final LogFileKey key = new LogFileKey();
 +    key.event = DEFINE_TABLET;
 +    key.seq = seq;
 +    key.tid = tid;
 +    key.tablet = tablet;
 +    try {
 +      write(key, EMPTY);
 +      sync.invoke(logFile);
 +    } catch (IllegalArgumentException e) {
 +      log.error("Signature of sync method changed. Accumulo is likely 
incompatible with this version of Hadoop.");
 +      throw new RuntimeException(e);
 +    } catch (IllegalAccessException e) {
 +      log.error("Could not invoke sync method due to permission error.");
 +      throw new RuntimeException(e);
 +    } catch (InvocationTargetException e) {
 +      Throwable cause = e.getCause();
 +      if (cause instanceof IOException) {
 +        throw (IOException) cause;
 +      } else if (cause instanceof RuntimeException) {
 +        throw (RuntimeException) cause;
 +      } else if (cause instanceof Error) {
 +        throw (Error) cause;
 +      } else {
 +        // Cause is null, or some other checked exception that was added later.
 +        throw new RuntimeException(e);
 +      }
 +    }
 +  }
 +
 +  private synchronized void write(LogFileKey key, LogFileValue value) throws IOException {
 +    key.write(encryptingLogFile);
 +    value.write(encryptingLogFile);
 +    encryptingLogFile.flush();
 +  }
 +
 +  public LoggerOperation log(int seq, int tid, Mutation mutation) throws IOException {
 +    return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation))));
 +  }
 +
 +  private LoggerOperation logFileData(List<Pair<LogFileKey,LogFileValue>> keys) throws IOException {
 +    DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1));
 +    synchronized (DfsLogger.this) {
 +      try {
 +        for (Pair<LogFileKey,LogFileValue> pair : keys) {
 +          write(pair.getFirst(), pair.getSecond());
 +        }
 +      } catch (ClosedChannelException ex) {
 +        throw new LogClosedException();
 +      } catch (Exception e) {
 +        log.error(e, e);
 +        work.exception = e;
 +      }
 +    }
 +
 +    synchronized (closeLock) {
 +      // use a different lock for close check so that adding to work queue does not need
 +      // to wait on walog I/O operations
 +
 +      if (closed)
 +        throw new LogClosedException();
 +      workQueue.add(work);
 +    }
 +
 +    return new LoggerOperation(work);
 +  }
 +
 +  public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException {
 +    List<Pair<LogFileKey,LogFileValue>> data = new ArrayList<Pair<LogFileKey,LogFileValue>>();
 +    for (TabletMutations tabletMutations : mutations) {
 +      LogFileKey key = new LogFileKey();
 +      key.event = MANY_MUTATIONS;
 +      key.seq = tabletMutations.getSeq();
 +      key.tid = tabletMutations.getTid();
 +      LogFileValue value = new LogFileValue();
 +      value.mutations = tabletMutations.getMutations();
 +      data.add(new Pair<LogFileKey,LogFileValue>(key, value));
 +    }
 +    return logFileData(data);
 +  }
 +
 +  public LoggerOperation minorCompactionFinished(int seq, int tid, String fqfn) throws IOException {
 +    LogFileKey key = new LogFileKey();
 +    key.event = COMPACTION_FINISH;
 +    key.seq = seq;
 +    key.tid = tid;
 +    return logFileData(Collections.singletonList(new Pair<LogFileKey,LogFileValue>(key, EMPTY)));
 +  }
 +
 +  public LoggerOperation minorCompactionStarted(int seq, int tid, String fqfn) throws IOException {
 +    LogFileKey key = new LogFileKey();
 +    key.event = COMPACTION_START;
 +    key.seq = seq;
 +    key.tid = tid;
 +    key.filename = fqfn;
 +    return logFileData(Collections.singletonList(new Pair<LogFileKey,LogFileValue>(key, EMPTY)));
 +  }
 +
 +  public String getLogger() {
 +    String parts[] = logPath.split("/");
 +    return StringUtil.join(Arrays.asList(parts[parts.length - 2].split("[+]")), ":");
 +  }
 +
 +}

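For reference, a minimal sketch of the v3 header layout that open() writes above and readHeaderAndReturnStream() parses: the raw magic bytes in UTF-8, then the crypto module class name via DataOutput.writeUTF, with module-specific crypto parameters following. The class and method names below are illustrative, not part of this patch.

import static com.google.common.base.Charsets.UTF_8;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative sketch of the v3 WAL header produced by DfsLogger.open():
// raw magic bytes (no length prefix), then the crypto module class name,
// which readHeaderAndReturnStream() reads back with readUTF() to bootstrap
// the matching CryptoModule.
public class WalHeaderSketch {
  static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";

  public static byte[] buildHeader(String cryptoModuleClass) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.write(LOG_FILE_HEADER_V3.getBytes(UTF_8)); // matched via readFully + Arrays.equals
    out.writeUTF(cryptoModuleClass);               // e.g. the Property.CRYPTO_MODULE_CLASS value
    out.flush();
    return bytes.toByteArray();
  }
}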
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
index 997f71b,0000000..2c807fd
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
@@@ -1,96 -1,0 +1,97 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.logger;
 +
++import static com.google.common.base.Charsets.UTF_8;
++
 +import java.io.DataInput;
 +import java.io.DataOutput;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +
- import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.data.ColumnUpdate;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.server.data.ServerMutation;
 +import org.apache.hadoop.io.Writable;
 +
 +public class LogFileValue implements Writable {
 +  
 +  private static final List<Mutation> empty = Collections.emptyList();
 +  
 +  public List<Mutation> mutations = empty;
 +  
 +  @Override
 +  public void readFields(DataInput in) throws IOException {
 +    int count = in.readInt();
 +    mutations = new ArrayList<Mutation>(count);
 +    for (int i = 0; i < count; i++) {
 +      ServerMutation mutation = new ServerMutation();
 +      mutation.readFields(in);
 +      mutations.add(mutation);
 +    }
 +  }
 +  
 +  @Override
 +  public void write(DataOutput out) throws IOException {
 +    out.writeInt(mutations.size());
 +    for (Mutation m : mutations) {
 +      m.write(out);
 +    }
 +  }
 +  
 +  public static void print(LogFileValue value) {
 +    System.out.println(value.toString());
 +  }
 +  
 +  private static String displayLabels(byte[] labels) {
-     String s = new String(labels, Constants.UTF8);
++    String s = new String(labels, UTF_8);
 +    s = s.replace("&", " & ");
 +    s = s.replace("|", " | ");
 +    return s;
 +  }
 +  
 +  public static String format(LogFileValue lfv, int maxMutations) {
 +    if (lfv.mutations.size() == 0)
 +      return "";
 +    StringBuilder builder = new StringBuilder();
 +    builder.append(lfv.mutations.size() + " mutations:\n");
 +    int i = 0;
 +    for (Mutation m : lfv.mutations) {
 +      if (i++ >= maxMutations) {
 +        builder.append("...");
 +        break;
 +      }
-       builder.append("  ").append(new String(m.getRow(), Constants.UTF8)).append("\n");
++      builder.append("  ").append(new String(m.getRow(), UTF_8)).append("\n");
 +      for (ColumnUpdate update : m.getUpdates()) {
 +        String value = new String(update.getValue());
-         builder.append("      ").append(new String(update.getColumnFamily(), Constants.UTF8)).append(":")
-                 .append(new String(update.getColumnQualifier(), Constants.UTF8)).append(" ").append(update.hasTimestamp() ? "[user]:" : "[system]:")
++        builder.append("      ").append(new String(update.getColumnFamily(), UTF_8)).append(":")
++                .append(new String(update.getColumnQualifier(), UTF_8)).append(" ").append(update.hasTimestamp() ? "[user]:" : "[system]:")
 +                .append(update.getTimestamp()).append(" [").append(displayLabels(update.getColumnVisibility())).append("] ")
 +                .append(update.isDeleted() ? "<deleted>" : value).append("\n");
 +      }
 +    }
 +    return builder.toString();
 +  }
 +  
 +  @Override
 +  public String toString() {
 +    return format(this, 5);
 +  }
 +  
 +}

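A minimal, untested round-trip sketch of the Writable contract above: an int count followed by each mutation, read back as ServerMutation instances (the WAL stores ServerMutations, which carry a system timestamp a plain Mutation would not serialize). The class name is illustrative.

import static com.google.common.base.Charsets.UTF_8;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;

import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.server.data.ServerMutation;
import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.hadoop.io.Text;

public class LogFileValueRoundTrip {
  public static void main(String[] args) throws IOException {
    ServerMutation m = new ServerMutation(new Text("row1"));
    m.put(new Text("cf"), new Text("cq"), new Value("v".getBytes(UTF_8)));

    LogFileValue out = new LogFileValue();
    out.mutations = Collections.<Mutation> singletonList(m);

    // write(): the mutation count, then each mutation's own serialization
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    out.write(new DataOutputStream(bytes));

    // readFields(): rebuilds the list, always as ServerMutation instances
    LogFileValue in = new LogFileValue();
    in.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.print(LogFileValue.format(in, 5));
  }
}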
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index c3f4fd0,0000000..29eefc8
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@@ -1,178 -1,0 +1,179 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.logger;
 +
++import static com.google.common.base.Charsets.UTF_8;
++
 +import java.io.DataInputStream;
 +import java.io.EOFException;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +
- import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.Help;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.accumulo.tserver.log.DfsLogger;
 +import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
 +import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
 +import org.apache.accumulo.tserver.log.MultiReader;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +
 +import com.beust.jcommander.JCommander;
 +import com.beust.jcommander.Parameter;
 +
 +public class LogReader {
 +  private static final Logger log = Logger.getLogger(LogReader.class);
 +
 +  static class Opts extends Help {
 +    @Parameter(names = "-r", description = "print only mutations associated 
with the given row")
 +    String row;
 +    @Parameter(names = "-m", description = "limit the number of mutations 
printed per row")
 +    int maxMutations = 5;
 +    @Parameter(names = "-t", description = "print only mutations that fall 
within the given key extent")
 +    String extent;
 +    @Parameter(names = "-p", description = "search for a row that matches the 
given regex")
 +    String regexp;
 +    @Parameter(description = "<logfile> { <logfile> ... }")
 +    List<String> files = new ArrayList<String>();
 +  }
 +
 +  /**
 +   * Dump a Log File (Map or Sequence) to stdout. Will read from HDFS or local file system.
 +   *
 +   * @param args
 +   *          - first argument is the file to print
 +   */
 +  public static void main(String[] args) throws IOException {
 +    Opts opts = new Opts();
 +    opts.parseArgs(LogReader.class.getName(), args);
 +    VolumeManager fs = VolumeManagerImpl.get();
 +
 +    Matcher rowMatcher = null;
 +    KeyExtent ke = null;
 +    Text row = null;
 +    if (opts.files.isEmpty()) {
 +      new JCommander(opts).usage();
 +      return;
 +    }
 +    if (opts.row != null)
 +      row = new Text(opts.row);
 +    if (opts.extent != null) {
 +      String sa[] = opts.extent.split(";");
 +      ke = new KeyExtent(new Text(sa[0]), new Text(sa[1]), new Text(sa[2]));
 +    }
 +    if (opts.regexp != null) {
 +      Pattern pattern = Pattern.compile(opts.regexp);
 +      rowMatcher = pattern.matcher("");
 +    }
 +
 +    Set<Integer> tabletIds = new HashSet<Integer>();
 +
 +    for (String file : opts.files) {
 +
 +      Path path = new Path(file);
 +      LogFileKey key = new LogFileKey();
 +      LogFileValue value = new LogFileValue();
 +
 +      if (fs.isFile(path)) {
 +        // read log entries from a simple hdfs file
 +        DFSLoggerInputStreams streams;
 +        try {
 +          streams = DfsLogger.readHeaderAndReturnStream(fs, path, ServerConfiguration.getSiteConfiguration());
 +        } catch (LogHeaderIncompleteException e) {
 +          log.warn("Could not read header for " + path + ". Ignoring...");
 +          continue;
 +        }
 +        DataInputStream input = streams.getDecryptingInputStream();
 +
 +        try {
 +          while (true) {
 +            try {
 +              key.readFields(input);
 +              value.readFields(input);
 +            } catch (EOFException ex) {
 +              break;
 +            }
 +            printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
 +          }
 +        } finally {
 +          input.close();
 +        }
 +      } else {
 +        // read the log entries sorted in a map file
 +        MultiReader input = new MultiReader(fs, path);
 +        while (input.next(key, value)) {
 +          printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
 +        }
 +      }
 +    }
 +  }
 +
 +  public static void printLogEvent(LogFileKey key, LogFileValue value, Text row, Matcher rowMatcher, KeyExtent ke, Set<Integer> tabletIds, int maxMutations) {
 +
 +    if (ke != null) {
 +      if (key.event == LogEvents.DEFINE_TABLET) {
 +        if (key.tablet.equals(ke)) {
 +          tabletIds.add(key.tid);
 +        } else {
 +          return;
 +        }
 +      } else if (!tabletIds.contains(key.tid)) {
 +        return;
 +      }
 +    }
 +
 +    if (row != null || rowMatcher != null) {
 +      if (key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS) {
 +        boolean found = false;
 +        for (Mutation m : value.mutations) {
 +          if (row != null && new Text(m.getRow()).equals(row)) {
 +            found = true;
 +            break;
 +          }
 +
 +          if (rowMatcher != null) {
-             rowMatcher.reset(new String(m.getRow(), Constants.UTF8));
++            rowMatcher.reset(new String(m.getRow(), UTF_8));
 +            if (rowMatcher.matches()) {
 +              found = true;
 +              break;
 +            }
 +          }
 +        }
 +
 +        if (!found)
 +          return;
 +      } else {
 +        return;
 +      }
 +
 +    }
 +
 +    System.out.println(key);
 +    System.out.println(LogFileValue.format(value, maxMutations));
 +  }
 +
 +}

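Once the header is consumed, an unsorted WAL is just alternating LogFileKey/LogFileValue records until EOF, exactly as in the loop above. A standalone sketch of that loop (the path argument and the omission of the sorted MultiReader branch are simplifications):

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.tserver.log.DfsLogger;
import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.hadoop.fs.Path;

public class WalDumpSketch {
  public static void main(String[] args) throws IOException {
    VolumeManager fs = VolumeManagerImpl.get();
    Path path = new Path(args[0]); // an unsorted WAL file, not a sorted map file

    DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, path, ServerConfiguration.getSiteConfiguration());
    DataInputStream input = streams.getDecryptingInputStream();

    LogFileKey key = new LogFileKey();
    LogFileValue value = new LogFileValue();
    try {
      while (true) {
        try {
          key.readFields(input);   // event, seq, tid, tablet, ...
          value.readFields(input); // mutations; empty for non-mutation events
        } catch (EOFException ex) {
          break; // WALs have no trailer; EOF ends the record stream
        }
        System.out.println(key);
        System.out.println(LogFileValue.format(value, 5));
      }
    } finally {
      input.close();
    }
  }
}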
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/NativeMapConcurrencyTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/NativeMapPerformanceTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/NativeMapStressTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java
index a679c48,ef6311b..3c20858
--- a/test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java
@@@ -71,98 -72,71 +72,98 @@@ public class TestBinaryRows 
      return l;
    }
    
 -  static class Opts extends ClientOnRequiredTable {
 +  public static class Opts extends ClientOnRequiredTable {
      @Parameter(names="--mode", description="either 'ingest', 'delete', 
'randomLookups', 'split', 'verify', 'verifyDeleted'", required=true)
 -    String mode;
 +    public String mode;
      @Parameter(names="--start", description="the lowest numbered row")
 -    long start = 0;
 +    public long start = 0;
      @Parameter(names="--count", description="number of rows to ingest", 
required=true)
 -    long num = 0;
 +    public long num = 0;
    }
    
 -  public static void main(String[] args) {
 -    Opts opts = new Opts();
 -    BatchWriterOpts bwOpts = new BatchWriterOpts();
 -    ScannerOpts scanOpts = new ScannerOpts();
 -    opts.parseArgs(TestBinaryRows.class.getName(), args, scanOpts, bwOpts);
 +  public static void runTest(Connector connector, Opts opts, BatchWriterOpts bwOpts, ScannerOpts scanOpts) throws Exception {
      
 -    try {
 -      Connector connector = opts.getConnector();
 -      
 -      final Text CF = new Text("cf"), CQ = new Text("cq");
 -      final byte[] CF_BYTES = "cf".getBytes(UTF_8), CQ_BYTES = "cq".getBytes(UTF_8);
 +    final Text CF = new Text("cf"), CQ = new Text("cq");
-     final byte[] CF_BYTES = "cf".getBytes(Constants.UTF8), CQ_BYTES = "cq".getBytes(Constants.UTF8);
++    final byte[] CF_BYTES = "cf".getBytes(UTF_8), CQ_BYTES = "cq".getBytes(UTF_8);
 +    if (opts.mode.equals("ingest") || opts.mode.equals("delete")) {
 +      BatchWriter bw = connector.createBatchWriter(opts.tableName, bwOpts.getBatchWriterConfig());
 +      boolean delete = opts.mode.equals("delete");
        
 -      if (opts.mode.equals("ingest") || opts.mode.equals("delete")) {
 -        BatchWriter bw = connector.createBatchWriter(opts.tableName, bwOpts.getBatchWriterConfig());
 -        boolean delete = opts.mode.equals("delete");
 -        
 -        for (long i = 0; i < opts.num; i++) {
 -          byte[] row = encodeLong(i + opts.start);
 -          String value = "" + (i + opts.start);
 -          
 -          Mutation m = new Mutation(new Text(row));
 -          if (delete) {
 -            m.putDelete(CF, CQ);
 -          } else {
 -            m.put(CF, CQ, new Value(value.getBytes(UTF_8)));
 -          }
 -          bw.addMutation(m);
 +      for (long i = 0; i < opts.num; i++) {
 +        byte[] row = encodeLong(i + opts.start);
 +        String value = "" + (i + opts.start);
 +        
 +        Mutation m = new Mutation(new Text(row));
 +        if (delete) {
 +          m.putDelete(CF, CQ);
 +        } else {
-           m.put(CF, CQ, new Value(value.getBytes(Constants.UTF8)));
++          m.put(CF, CQ, new Value(value.getBytes(UTF_8)));
          }
 -        
 -        bw.close();
 -      } else if (opts.mode.equals("verifyDeleted")) {
 -        Scanner s = connector.createScanner(opts.tableName, opts.auths);
 -        s.setBatchSize(scanOpts.scanBatchSize);
 -        Key startKey = new Key(encodeLong(opts.start), CF_BYTES, CQ_BYTES, new byte[0], Long.MAX_VALUE);
 -        Key stopKey = new Key(encodeLong(opts.start + opts.num - 1), CF_BYTES, CQ_BYTES, new byte[0], 0);
 -        s.setBatchSize(50000);
 -        s.setRange(new Range(startKey, stopKey));
 -        
 -        for (Entry<Key,Value> entry : s) {
 -          System.err.println("ERROR : saw entries in range that should be 
deleted ( first value : " + entry.getValue().toString() + ")");
 -          System.err.println("exiting...");
 -          System.exit(1);
 -        }
 -        
 -      } else if (opts.mode.equals("verify")) {
 -        long t1 = System.currentTimeMillis();
 +        bw.addMutation(m);
 +      }
 +      
 +      bw.close();
 +    } else if (opts.mode.equals("verifyDeleted")) {
 +      Scanner s = connector.createScanner(opts.tableName, opts.auths);
 +      s.setBatchSize(scanOpts.scanBatchSize);
 +      Key startKey = new Key(encodeLong(opts.start), CF_BYTES, CQ_BYTES, new byte[0], Long.MAX_VALUE);
 +      Key stopKey = new Key(encodeLong(opts.start + opts.num - 1), CF_BYTES, CQ_BYTES, new byte[0], 0);
 +      s.setBatchSize(50000);
 +      s.setRange(new Range(startKey, stopKey));
 +      
 +      for (Entry<Key,Value> entry : s) {
 +        throw new Exception("ERROR : saw entries in range that should be 
deleted ( first value : " + entry.getValue().toString() + ")");
 +      }
 +      
 +    } else if (opts.mode.equals("verify")) {
 +      long t1 = System.currentTimeMillis();
 +      
 +      Scanner s = connector.createScanner(opts.tableName, opts.auths);
 +      Key startKey = new Key(encodeLong(opts.start), CF_BYTES, CQ_BYTES, new byte[0], Long.MAX_VALUE);
 +      Key stopKey = new Key(encodeLong(opts.start + opts.num - 1), CF_BYTES, CQ_BYTES, new byte[0], 0);
 +      s.setBatchSize(scanOpts.scanBatchSize);
 +      s.setRange(new Range(startKey, stopKey));
 +      
 +      long i = opts.start;
 +      
 +      for (Entry<Key,Value> e : s) {
 +        Key k = e.getKey();
 +        Value v = e.getValue();
 +
 +        checkKeyValue(i, k, v);
 +
 +        i++;
 +      }
 +      
 +      if (i != opts.start + opts.num) {
 +        throw new Exception("ERROR : did not see expected number of rows, saw 
" + (i - opts.start) + " expected " + opts.num);
 +      }
 +      
 +      long t2 = System.currentTimeMillis();
 +      
 +      System.out.printf("time : %9.2f secs%n", ((t2 - t1) / 1000.0));
 +      System.out.printf("rate : %9.2f entries/sec%n", opts.num / ((t2 - t1) / 
1000.0));
 +      
 +    } else if (opts.mode.equals("randomLookups")) {
 +      int numLookups = 1000;
 +      
 +      Random r = new Random();
 +      
 +      long t1 = System.currentTimeMillis();
 +      
 +      for (int i = 0; i < numLookups; i++) {
 +        long row = ((r.nextLong() & 0x7fffffffffffffffl) % opts.num) + opts.start;
          
          Scanner s = connector.createScanner(opts.tableName, opts.auths);
 -        Key startKey = new Key(encodeLong(opts.start), CF_BYTES, CQ_BYTES, new byte[0], Long.MAX_VALUE);
 -        Key stopKey = new Key(encodeLong(opts.start + opts.num - 1), CF_BYTES, CQ_BYTES, new byte[0], 0);
          s.setBatchSize(scanOpts.scanBatchSize);
 +        Key startKey = new Key(encodeLong(row), CF_BYTES, CQ_BYTES, new byte[0], Long.MAX_VALUE);
 +        Key stopKey = new Key(encodeLong(row), CF_BYTES, CQ_BYTES, new byte[0], 0);
          s.setRange(new Range(startKey, stopKey));
          
 -        long i = opts.start;
 +        Iterator<Entry<Key,Value>> si = s.iterator();
          
 -        for (Entry<Key,Value> e : s) {
 +        if (si.hasNext()) {
 +          Entry<Key,Value> e = si.next();
            Key k = e.getKey();
            Value v = e.getValue();
            

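encodeLong's body is not shown in this excerpt; a plausible reconstruction (an assumption, not the patch's code) is a fixed 8-byte big-endian encoding, which is what makes the lexicographic row order of the range scans above match numeric order:

public class BinaryRowCodecSketch {
  // Hypothetical stand-ins for TestBinaryRows.encodeLong/decodeLong.
  static byte[] encodeLong(long l) {
    byte[] ba = new byte[8];
    for (int i = 0; i < 8; i++)
      ba[i] = (byte) (l >>> (56 - 8 * i)); // most significant byte first
    return ba;
  }

  static long decodeLong(byte[] ba) {
    long l = 0;
    for (int i = 0; i < 8; i++)
      l = (l << 8) | (ba[i] & 0xffL); // reassemble in the same order
    return l;
  }

  public static void main(String[] args) {
    System.out.println(decodeLong(encodeLong(42L)) == 42L); // true
  }
}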
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/TestIngest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/TestIngest.java
index bc3ff05,5d8e268..c3e77af
--- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
@@@ -16,16 -16,14 +16,17 @@@
   */
  package org.apache.accumulo.test;
  
+ import static com.google.common.base.Charsets.UTF_8;
+ 
 +import java.io.IOException;
  import java.util.Map.Entry;
  import java.util.Random;
  import java.util.Set;
  import java.util.TreeSet;
  
- import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.cli.BatchWriterOpts;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
  import org.apache.accumulo.core.client.BatchWriter;
  import org.apache.accumulo.core.client.Connector;
  import org.apache.accumulo.core.client.Instance;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java
index 7a848a3,125ef5f..16f0b3f
--- a/test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java
@@@ -16,11 -16,11 +16,12 @@@
   */
  package org.apache.accumulo.test;
  
+ import static com.google.common.base.Charsets.UTF_8;
+ 
  import java.util.ArrayList;
 +import java.util.List;
  import java.util.Map.Entry;
  
- import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.cli.BatchWriterOpts;
  import org.apache.accumulo.core.cli.ScannerOpts;
  import org.apache.accumulo.core.client.AccumuloException;
@@@ -101,8 -97,8 +102,8 @@@ public class TestMultiTableIngest 
        
        // populate
        for (int i = 0; i < opts.count; i++) {
 -        Mutation m = new Mutation(new Text(String.format("%05d", i)));
 +        Mutation m = new Mutation(new Text(String.format("%06d", i)));
-         m.put(new Text("col" + Integer.toString((i % 3) + 1)), new 
Text("qual"), new Value("junk".getBytes(Constants.UTF8)));
+         m.put(new Text("col" + Integer.toString((i % 3) + 1)), new 
Text("qual"), new Value("junk".getBytes(UTF_8)));
          b.getBatchWriter(tableNames.get(i % 
tableNames.size())).addMutation(m);
        }
        try {

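The test hunks in this commit repeat one mechanical substitution: Constants.UTF8 becomes Guava's statically imported Charsets.UTF_8. Distilled into a sketch (the class name is illustrative):

import static com.google.common.base.Charsets.UTF_8;

public class CharsetMigrationSketch {
  public static void main(String[] args) {
    // Before: "junk".getBytes(Constants.UTF8), which pulled in
    // org.apache.accumulo.core.Constants just for a charset.
    // After: Guava's java.nio.charset.Charset constant.
    byte[] bytes = "junk".getBytes(UTF_8);
    System.out.println(bytes.length + " bytes");
  }
}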
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
index 6082ebe,5962220..4f88c1b
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
@@@ -26,7 -28,7 +28,6 @@@ import java.util.Map.Entry
  import java.util.Random;
  import java.util.zip.CRC32;
  
--import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.client.Connector;
  import org.apache.accumulo.core.client.Scanner;
  import org.apache.accumulo.core.data.Key;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/continuous/UndefinedAnalyzer.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/UndefinedAnalyzer.java
index 4bd07ca,6124a3d..11d19c5
--- a/test/src/main/java/org/apache/accumulo/test/continuous/UndefinedAnalyzer.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/UndefinedAnalyzer.java
@@@ -82,9 -82,9 +83,9 @@@ public class UndefinedAnalyzer 
          parseLog(log);
        }
      }
 -    
 +
      private void parseLog(File log) throws Exception {
-       BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(log), Constants.UTF8));
+       BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(log), UTF_8));
        String line;
        TreeMap<Long,Long> tm = null;
        try {
@@@ -173,10 -172,10 +174,10 @@@
        SimpleDateFormat sdf = new SimpleDateFormat("dd HH:mm:ss,SSS yyyy MM");
        String currentYear = (Calendar.getInstance().get(Calendar.YEAR)) + "";
      String currentMonth = (Calendar.getInstance().get(Calendar.MONTH) + 1) + "";
 -      
 +
        for (File masterLog : masterLogs) {
 -        
 +
-         BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(masterLog), Constants.UTF8));
+         BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(masterLog), UTF_8));
          String line;
          try {
            while ((line = reader.readLine()) != null) {
@@@ -256,11 -252,11 +257,11 @@@
    public static void main(String[] args) throws Exception {
      Opts opts = new Opts();
      BatchScannerOpts bsOpts = new BatchScannerOpts();
 -    opts.parseArgs(UndefinedAnalyzer.class.getName(), args, opts);
 -    
 +    opts.parseArgs(UndefinedAnalyzer.class.getName(), args, bsOpts);
 +
      List<UndefinedNode> undefs = new ArrayList<UndefinedNode>();
 -    
 +
-     BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, Constants.UTF8));
+     BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, UTF_8));
      String line;
      while ((line = reader.readLine()) != null) {
        String[] tokens = line.split("\\s");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
index f26c8d7,166e2ad..949fc52
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
@@@ -16,6 -16,10 +16,8 @@@
   */
  package org.apache.accumulo.test.functional;
  
+ import static com.google.common.base.Charsets.UTF_8;
+ 
 -import java.net.InetAddress;
 -import java.net.InetSocketAddress;
  import java.util.HashMap;
  import java.util.Random;
  

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java
index cb17340,33ef0b5..72efee3
--- a/test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java
@@@ -24,7 -26,7 +26,6 @@@ import java.util.Random
  import java.util.TreeSet;
  import java.util.UUID;
  
--import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.client.BatchScanner;
  import org.apache.accumulo.core.client.BatchWriter;
  import org.apache.accumulo.core.client.BatchWriterConfig;
@@@ -104,10 -100,10 +105,10 @@@ public class MetadataBatchScanTest 
          
          String dir = "/t-" + UUID.randomUUID();
          
-         TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(mut, new Value(dir.getBytes(Constants.UTF8)));
 -        Constants.METADATA_DIRECTORY_COLUMN.put(mut, new Value(dir.getBytes(UTF_8)));
++        TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(mut, new Value(dir.getBytes(UTF_8)));
          
          for (int i = 0; i < 5; i++) {
-           mut.put(DataFileColumnFamily.NAME, new Text(dir + "/00000_0000" + i + ".map"), new Value("10000,1000000".getBytes(Constants.UTF8)));
 -          mut.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(dir + "/00000_0000" + i + ".map"), new Value("10000,1000000".getBytes(UTF_8)));
++          mut.put(DataFileColumnFamily.NAME, new Text(dir + "/00000_0000" + i + ".map"), new Value("10000,1000000".getBytes(UTF_8)));
          }
          
          bw.addMutation(mut);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
index 5756934,0191b50..e4ce3f3
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
@@@ -41,9 -37,7 +43,8 @@@ import javax.xml.parsers.DocumentBuilde
  import javax.xml.validation.Schema;
  import javax.xml.validation.SchemaFactory;
  
- import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.util.SimpleThreadPool;
  import org.apache.log4j.Level;
  import org.w3c.dom.Document;
  import org.w3c.dom.Element;
@@@ -201,163 -193,109 +202,163 @@@ public class Module extends Node 
      if (fixture != null) {
        fixture.setUp(state);
      }
 -    
 -    Node initNode = getNode(initNodeId);
 -    
 -    boolean test = false;
 -    if (initNode instanceof Test) {
 -      startTimer(initNode);
 -      test = true;
 -    }
 -    initNode.visit(state, getProps(initNodeId));
 -    if (test)
 -      stopTimer(initNode);
 -    
 -    state.visitedNode();
 -    // update aliases
 -    Set<String> aliases;
 -    if ((aliases = aliasMap.get(initNodeId)) != null)
 -      for (String alias : aliases) {
 -        ((Alias) nodes.get(alias)).update(initNodeId);
 -      }
 -    
 -    String curNodeId = initNodeId;
 -    int numHops = 0;
 -    long startTime = System.currentTimeMillis() / 1000;
 -    while (true) {
 -      // check if END state was reached
 -      if (curNodeId.equalsIgnoreCase("END")) {
 -        log.debug("reached END state");
 -        break;
 -      }
 -      // check if maxSec was reached
 -      long curTime = System.currentTimeMillis() / 1000;
 -      if ((curTime - startTime) > maxSec) {
 -        log.debug("reached maxSec(" + maxSec + ")");
 -        break;
 -      }
 -      // check if maxHops was reached
 -      if (numHops > maxHops) {
 -        log.debug("reached maxHops(" + maxHops + ")");
 -        break;
 -      }
 -      numHops++;
 -      
 -      if (!adjMap.containsKey(curNodeId) && !curNodeId.startsWith("alias.")) {
 -        throw new Exception("Reached node(" + curNodeId + ") without outgoing 
edges in module(" + this + ")");
 -      }
 -      AdjList adj = adjMap.get(curNodeId);
 -      String nextNodeId = adj.randomNeighbor();
 -      Node nextNode = getNode(nextNodeId);
 -      if (nextNode instanceof Alias) {
 -        nextNodeId = ((Alias) nextNode).getTargetId();
 -        nextNode = ((Alias) nextNode).get();
 +
 +    ExecutorService service = new SimpleThreadPool(1, "RandomWalk Runner");
 +
 +    try {
 +      Node initNode = getNode(initNodeId);
 +
 +      boolean test = false;
 +      if (initNode instanceof Test) {
 +        startTimer(initNode);
 +        test = true;
        }
 -      Properties nodeProps = getProps(nextNodeId);
 -      try {
 -        test = false;
 -        if (nextNode instanceof Test) {
 -          startTimer(nextNode);
 -          test = true;
 +      initNode.visit(state, getProps(initNodeId));
 +      if (test)
 +        stopTimer(initNode);
 +
 +      state.visitedNode();
 +      // update aliases
 +      Set<String> aliases;
 +      if ((aliases = aliasMap.get(initNodeId)) != null)
 +        for (String alias : aliases) {
 +          ((Alias) nodes.get(alias)).update(initNodeId);
          }
 -        nextNode.visit(state, nodeProps);
 -        if (test)
 -          stopTimer(nextNode);
 -      } catch (Exception e) {
 -        log.debug("Connector belongs to user: " + 
state.getConnector().whoami());
 -        log.debug("Exception occured at: " + System.currentTimeMillis());
 -        log.debug("Properties for node: " + nextNodeId);
 -        for (Entry<Object,Object> entry : nodeProps.entrySet()) {
 -          log.debug("  " + entry.getKey() + ": " + entry.getValue());
 +
 +      String curNodeId = initNodeId;
 +      int numHops = 0;
 +      long startTime = System.currentTimeMillis() / 1000;
 +      while (true) {
 +        // check if END state was reached
 +        if (curNodeId.equalsIgnoreCase("END")) {
 +          log.debug("reached END state");
 +          break;
          }
 -        log.debug("Overall Properties");
 -        for (Entry<Object,Object> entry : state.getProperties().entrySet()) {
 -          log.debug("  " + entry.getKey() + ": " + entry.getValue());
 +        // check if maxSec was reached
 +        long curTime = System.currentTimeMillis() / 1000;
 +        if ((curTime - startTime) > maxSec) {
 +          log.debug("reached maxSec(" + maxSec + ")");
 +          break;
          }
 -        log.debug("State information");
 -        for (String key : new TreeSet<String>(state.getMap().keySet()))  {
 -          Object value = state.getMap().get(key);
 -          String logMsg = "  " + key + ": ";
 -          if (value == null)
 -            logMsg += "null";
 -          else if (value instanceof String || value instanceof Map || value 
instanceof Collection || value instanceof Number)
 -            logMsg += value;
 -          else if (value instanceof byte[])
 -            logMsg += new String((byte[])value, UTF_8);
 -          else if (value instanceof PasswordToken)
 -            logMsg += new String(((PasswordToken) value).getPassword(), UTF_8);
 -          else
 -            logMsg += value.getClass()+ " - " + value;
 -          
 -          log.debug(logMsg);
 +
 +        // The number of seconds before the test should exit
 +        long secondsRemaining = maxSec - (curTime - startTime);
 +
 +        // check if maxHops was reached
 +        if (numHops > maxHops) {
 +          log.debug("reached maxHops(" + maxHops + ")");
 +          break;
          }
 -        throw new Exception("Error running node " + nextNodeId, e);
 -      }
 -      state.visitedNode();
 -      
 -      // update aliases
 -      if ((aliases = aliasMap.get(curNodeId)) != null)
 -        for (String alias : aliases) {
 -          ((Alias) nodes.get(alias)).update(curNodeId);
 +        numHops++;
 +
 +        if (!adjMap.containsKey(curNodeId) && !curNodeId.startsWith("alias.")) {
 +          throw new Exception("Reached node(" + curNodeId + ") without outgoing edges in module(" + this + ")");
 +        }
 +        AdjList adj = adjMap.get(curNodeId);
 +        String nextNodeId = adj.randomNeighbor();
 +        final Node nextNode;
 +        Node nextNodeOrAlias = getNode(nextNodeId);
 +        if (nextNodeOrAlias instanceof Alias) {
 +          nextNodeId = ((Alias) nextNodeOrAlias).getTargetId();
 +          nextNode = ((Alias) nextNodeOrAlias).get();
 +        } else {
 +          nextNode = nextNodeOrAlias;
 +        }
 +        final Properties nodeProps = getProps(nextNodeId);
 +        try {
 +          test = false;
 +          if (nextNode instanceof Test) {
 +            startTimer(nextNode);
 +            test = true;
 +          }
 +
 +          // Wrap the visit of the next node in the module in a callable that returns a thrown exception
 +          FutureTask<Exception> task = new FutureTask<Exception>(new Callable<Exception>() {
 +
 +            @Override
 +            public Exception call() throws Exception {
 +              try {
 +                nextNode.visit(state, nodeProps);
 +                return null;
 +              } catch (Exception e) {
 +                return e;
 +              }
 +            }
 +
 +          });
 +
 +          // Run the task (should execute immediately)
 +          service.submit(task);
 +
 +          Exception nodeException;
 +          try {
 +            // Bound the time we'll wait for the node to complete
 +            nodeException = task.get(secondsRemaining, TimeUnit.SECONDS);
 +          } catch (InterruptedException e) {
 +            log.warn("Interrupted waiting for " + 
nextNode.getClass().getSimpleName() + " to complete. Exiting.", e);
 +            break;
 +          } catch (ExecutionException e) {
 +            log.error("Caught error executing " + 
nextNode.getClass().getSimpleName(), e);
 +            throw e;
 +          } catch (TimeoutException e) {
 +            log.info("Timed out waiting for " + 
nextNode.getClass().getSimpleName() + " to complete (waited " + 
secondsRemaining + " seconds). Exiting.", e);
 +            break;
 +          }
 +
 +          // The RandomWalk node threw an Exception that the Callable handed back
 +          // Throw it and let the Module perform cleanup
 +          if (null != nodeException) {
 +            throw nodeException;
 +          }
 +
 +          if (test)
 +            stopTimer(nextNode);
 +        } catch (Exception e) {
 +          log.debug("Connector belongs to user: " + 
state.getConnector().whoami());
 +          log.debug("Exception occured at: " + System.currentTimeMillis());
 +          log.debug("Properties for node: " + nextNodeId);
 +          for (Entry<Object,Object> entry : nodeProps.entrySet()) {
 +            log.debug("  " + entry.getKey() + ": " + entry.getValue());
 +          }
 +          log.debug("Overall Properties");
 +          for (Entry<Object,Object> entry : state.getProperties().entrySet()) {
 +            log.debug("  " + entry.getKey() + ": " + entry.getValue());
 +          }
 +          log.debug("State information");
 +          for (String key : new TreeSet<String>(state.getMap().keySet())) {
 +            Object value = state.getMap().get(key);
 +            String logMsg = "  " + key + ": ";
 +            if (value == null)
 +              logMsg += "null";
 +            else if (value instanceof String || value instanceof Map || value instanceof Collection || value instanceof Number)
 +              logMsg += value;
 +            else if (value instanceof byte[])
-               logMsg += new String((byte[]) value, Constants.UTF8);
++              logMsg += new String((byte[]) value, UTF_8);
 +            else if (value instanceof PasswordToken)
-               logMsg += new String(((PasswordToken) value).getPassword(), Constants.UTF8);
++              logMsg += new String(((PasswordToken) value).getPassword(), UTF_8);
 +            else
 +              logMsg += value.getClass() + " - " + value;
 +
 +            log.debug(logMsg);
 +          }
 +          throw new Exception("Error running node " + nextNodeId, e);
          }
 -      
 -      curNodeId = nextNodeId;
 +        state.visitedNode();
 +
 +        // update aliases
 +        if ((aliases = aliasMap.get(curNodeId)) != null)
 +          for (String alias : aliases) {
 +            ((Alias) nodes.get(alias)).update(curNodeId);
 +          }
 +
 +        curNodeId = nextNodeId;
 +      }
 +    } finally {
 +      if (null != service) {
 +        service.shutdownNow();
 +      }
      }
 -    
 +
      if (teardown && (fixture != null)) {
        log.debug("tearing down module");
        fixture.tearDown(state);

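The substance of the Module.java change above: each node visit now runs on a single-thread executor and is bounded by the walk's remaining time budget, so a hung node can no longer stall the test past maxSec. Below is a minimal, self-contained sketch of the same pattern; the class name, the 5-second budget, and the sleep are illustrative stand-ins (the sleep plays the role of nextNode.visit(state, nodeProps)), and a plain JDK executor substitutes for Accumulo's SimpleThreadPool.

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class BoundedVisitSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
          long secondsRemaining = 5; // in the patch: maxSec minus elapsed time, recomputed each hop
          // The Callable returns the exception instead of throwing it, so a failure
          // inside the node is distinguishable from a timeout or an interrupted wait.
          FutureTask<Exception> task = new FutureTask<Exception>(new Callable<Exception>() {
            @Override
            public Exception call() {
              try {
                Thread.sleep(1000); // stand-in for nextNode.visit(state, nodeProps)
                return null;
              } catch (Exception e) {
                return e;
              }
            }
          });
          service.submit(task);
          try {
            Exception nodeException = task.get(secondsRemaining, TimeUnit.SECONDS);
            if (nodeException != null)
              throw nodeException; // let the caller log state and tear down
          } catch (TimeoutException e) {
            System.out.println("node exceeded the time budget; exiting the walk");
          }
        } finally {
          service.shutdownNow(); // interrupts a node thread still running at teardown
        }
      }
    }

Note that task.get(timeout) only stops the wait; it is the shutdownNow() in the finally block, retained in the patch, that actually interrupts a runaway node thread.
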
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
index 1704e49,05cc4f0..67438d9
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
@@@ -20,10 -21,10 +21,10 @@@ import static com.google.common.base.Ch
  import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.test.randomwalk.State;
  
 -public class BulkMinusOne extends BulkTest {
 -  
 +public class BulkMinusOne extends BulkImportTest {
 +
-   private static final Value negOne = new Value("-1".getBytes(Constants.UTF8));
+   private static final Value negOne = new Value("-1".getBytes(UTF_8));
 -  
 +
    @Override
    protected void runLater(State state) throws Exception {
      log.info("Decrementing");

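This hunk and most of those below share a second, mechanical substitution: the project-local Constants.UTF8 constant is replaced by a static import of Guava's com.google.common.base.Charsets.UTF_8 (the import is just visible, truncated, in the hunk header above). A small before/after sketch, hedged in that the class name is illustrative and only the charset swap is taken from the diff:

    import static com.google.common.base.Charsets.UTF_8;

    import org.apache.accumulo.core.data.Value;

    public class CharsetSwapSketch {
      // before: new Value("-1".getBytes(Constants.UTF8))
      // after:  identical bytes, but the Charset constant now comes from Guava
      static final Value negOne = new Value("-1".getBytes(UTF_8));

      public static void main(String[] args) {
        System.out.println(new String(negOne.get(), UTF_8)); // prints: -1
      }
    }
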
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BatchWrite.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BatchWrite.java
index f002274,7f9a218..3dcff6e
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BatchWrite.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BatchWrite.java
@@@ -51,10 -52,10 +52,10 @@@ public class BatchWrite extends Test 
        try {
          int numRows = rand.nextInt(100000);
          for (int i = 0; i < numRows; i++) {
 -          Mutation m = new Mutation(String.format("%016x", (rand.nextLong() & 0x7fffffffffffffffl)));
 -          long val = (rand.nextLong() & 0x7fffffffffffffffl);
 +          Mutation m = new Mutation(String.format("%016x", rand.nextLong() & 0x7fffffffffffffffl));
 +          long val = rand.nextLong() & 0x7fffffffffffffffl;
            for (int j = 0; j < 10; j++) {
-             m.put("cf", "cq" + j, new Value(String.format("%016x", 
val).getBytes(Constants.UTF8)));
+             m.put("cf", "cq" + j, new Value(String.format("%016x", 
val).getBytes(UTF_8)));
            }
            
            bw.addMutation(m);

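The BatchWrite hunk above only drops redundant parentheses, but the idiom it touches is worth spelling out: rand.nextLong() ranges over all longs, and masking with 0x7fffffffffffffffL clears the sign bit, leaving a value uniform over [0, Long.MAX_VALUE] that %016x renders as a zero-padded 16-digit hex row key. A quick illustration (class name illustrative):

    import java.util.Random;

    public class RowKeySketch {
      public static void main(String[] args) {
        Random rand = new Random();
        // Mask off the sign bit so the value is non-negative.
        long val = rand.nextLong() & 0x7fffffffffffffffL;
        // Fixed-width, zero-padded hex keeps lexicographic and numeric order aligned.
        System.out.println(String.format("%016x", val));
      }
    }
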
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BulkImport.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/StopTabletServer.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/StopTabletServer.java
index c8da3d8,c227a11..b3519b6
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/StopTabletServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/StopTabletServer.java
@@@ -47,8 -50,8 +49,8 @@@ public class StopTabletServer extends T
            Collections.sort(children);
            Stat stat = new Stat();
          byte[] data = rdr.getData(base + "/" + child + "/" + children.get(0), stat);
-           if (!"master".equals(new String(data, Constants.UTF8))) {
+           if (!"master".equals(new String(data, UTF_8))) {
 -            result.add(new TServerInstance(AddressUtil.parseAddress(child, Property.TSERV_CLIENTPORT), stat.getEphemeralOwner()));
 +            result.add(new TServerInstance(AddressUtil.parseAddress(child, false), stat.getEphemeralOwner()));
            }
          }
        } catch (KeeperException.NoNodeException ex) {

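For readers unfamiliar with the lock layout the StopTabletServer hunk walks: each live tablet server holds an ephemeral sequential ZooKeeper node under its lock path, so sorting the children and reading the first identifies the current holder, and Stat.getEphemeralOwner() yields the owning session id. A rough sketch against the plain ZooKeeper client follows; the path handling and the "master" marker are inferred from the hunk rather than from a documented contract:

    import static com.google.common.base.Charsets.UTF_8;

    import java.util.Collections;
    import java.util.List;

    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;

    public class LockHolderSketch {
      // Returns the session id owning the lowest-sequenced lock node under
      // lockPath, or -1 when the holder's data marks the master process.
      static long lockOwner(ZooKeeper zk, String lockPath) throws Exception {
        List<String> children = zk.getChildren(lockPath, false);
        Collections.sort(children); // sequential suffixes sort into acquisition order
        Stat stat = new Stat();
        byte[] data = zk.getData(lockPath + "/" + children.get(0), false, stat);
        if ("master".equals(new String(data, UTF_8)))
          return -1;
        return stat.getEphemeralOwner();
      }
    }
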
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/image/Write.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/security/SecurityHelper.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/security/TableOp.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/Write.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/shard/BulkInsert.java
----------------------------------------------------------------------
