http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
new file mode 100644
index 0000000..d2a394a
--- /dev/null
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -0,0 +1,714 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import 
org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
+
+/**
+ * WAL implementation of the ProcedureStore.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class WALProcedureStore implements ProcedureStore {
+  private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);
+
+  public interface LeaseRecovery {
+    void recoverFileLease(FileSystem fs, Path path) throws IOException;
+  }
+
+  private static final int MAX_RETRIES_BEFORE_ABORT = 3;
+
+  private static final String SYNC_WAIT_MSEC_CONF_KEY = 
"hbase.procedure.store.wal.sync.wait.msec";
+  private static final int DEFAULT_SYNC_WAIT_MSEC = 50;
+
+  private final CopyOnWriteArrayList<ProcedureStoreListener> listeners =
+    new CopyOnWriteArrayList<ProcedureStoreListener>();
+
+  private final LinkedList<ProcedureWALFile> logs = new 
LinkedList<ProcedureWALFile>();
+  private final ProcedureStoreTracker storeTracker = new 
ProcedureStoreTracker();
+  private final AtomicBoolean running = new AtomicBoolean(false);
+  private final ReentrantLock lock = new ReentrantLock();
+  private final Condition waitCond = lock.newCondition();
+  private final Condition slotCond = lock.newCondition();
+  private final Condition syncCond = lock.newCondition();
+
+  private final LeaseRecovery leaseRecovery;
+  private final Configuration conf;
+  private final FileSystem fs;
+  private final Path logDir;
+
+  private AtomicBoolean inSync = new AtomicBoolean(false);
+  private ArrayBlockingQueue<ByteSlot> slotsCache = null;
+  private Set<ProcedureWALFile> corruptedLogs = null;
+  private FSDataOutputStream stream = null;
+  private long totalSynced = 0;
+  private long flushLogId = 0;
+  private int slotIndex = 0;
+  private Thread syncThread;
+  private ByteSlot[] slots;
+  private int syncWaitMsec;
+
+  public WALProcedureStore(final Configuration conf, final FileSystem fs, 
final Path logDir,
+      final LeaseRecovery leaseRecovery) {
+    this.fs = fs;
+    this.conf = conf;
+    this.logDir = logDir;
+    this.leaseRecovery = leaseRecovery;
+  }
+
+  @Override
+  public void start(int numSlots) throws IOException {
+    if (running.getAndSet(true)) {
+      return;
+    }
+
+    // Init buffer slots
+    slots = new ByteSlot[numSlots];
+    slotsCache = new ArrayBlockingQueue(numSlots, true);
+    while (slotsCache.remainingCapacity() > 0) {
+      slotsCache.offer(new ByteSlot());
+    }
+
+    // Tunings
+    syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, 
DEFAULT_SYNC_WAIT_MSEC);
+
+    // Init sync thread
+    syncThread = new Thread() {
+      @Override
+      public void run() {
+        while (running.get()) {
+          try {
+            syncLoop();
+          } catch (IOException e) {
+            LOG.error("got an exception from the sync-loop", e);
+            sendAbortProcessSignal();
+          }
+        }
+      }
+    };
+    syncThread.start();
+  }
+
+  @Override
+  public void stop(boolean abort) {
+    if (!running.getAndSet(false)) {
+      return;
+    }
+
+    LOG.info("Stopping the WAL Procedure Store");
+    if (lock.tryLock()) {
+      try {
+        waitCond.signalAll();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    if (!abort) {
+      try {
+        syncThread.join();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    // Close the writer
+    closeStream();
+
+    // Close the old logs
+    // they should be already closed, this is just in case the load fails
+    // and we call start() and then stop()
+    for (ProcedureWALFile log: logs) {
+      log.close();
+    }
+    logs.clear();
+  }
+
+  @Override
+  public boolean isRunning() {
+    return running.get();
+  }
+
+  public ProcedureStoreTracker getStoreTracker() {
+    return storeTracker;
+  }
+
+  @Override
+  public void registerListener(ProcedureStoreListener listener) {
+    this.listeners.add(listener);
+  }
+
+  @Override
+  public boolean unregisterListener(ProcedureStoreListener listener) {
+    return this.listeners.remove(listener);
+  }
+
+  @Override
+  public void recoverLease() throws IOException {
+    LOG.info("Starting WAL Procedure Store lease recovery");
+    FileStatus[] oldLogs = getLogFiles();
+    while (running.get()) {
+      // Get Log-MaxID and recover lease on old logs
+      flushLogId = initOldLogs(oldLogs) + 1;
+
+      // Create new state-log
+      if (!rollWriter(flushLogId)) {
+        // someone else has already created this log
+        LOG.debug("someone else has already created log " + flushLogId);
+        continue;
+      }
+
+      // We have the lease on the log
+      oldLogs = getLogFiles();
+      if (getMaxLogId(oldLogs) > flushLogId) {
+        // Someone else created new logs
+        LOG.debug("someone else created new logs. expected maxLogId < " + 
flushLogId);
+        logs.getLast().removeFile();
+        continue;
+      }
+
+      LOG.info("lease acquired flushLogId=" + flushLogId);
+      break;
+    }
+  }
+
+  @Override
+  public Iterator<Procedure> load() throws IOException {
+    if (logs.isEmpty()) {
+      throw new RuntimeException("recoverLease() must be called before loading 
data");
+    }
+
+    // Nothing to do, If we have only the current log.
+    if (logs.size() == 1) {
+      LOG.debug("No state logs to replay");
+      return null;
+    }
+
+    // Load the old logs
+    final ArrayList<ProcedureWALFile> toRemove = new 
ArrayList<ProcedureWALFile>();
+    Iterator<ProcedureWALFile> it = logs.descendingIterator();
+    it.next(); // Skip the current log
+    try {
+      return ProcedureWALFormat.load(it, storeTracker, new 
ProcedureWALFormat.Loader() {
+        @Override
+        public void removeLog(ProcedureWALFile log) {
+          toRemove.add(log);
+        }
+
+        @Override
+        public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
+          if (corruptedLogs == null) {
+            corruptedLogs = new HashSet<ProcedureWALFile>();
+          }
+          corruptedLogs.add(log);
+          // TODO: sideline corrupted log
+        }
+      });
+    } finally {
+      if (!toRemove.isEmpty()) {
+        for (ProcedureWALFile log: toRemove) {
+          removeLogFile(log);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void insert(final Procedure proc, final Procedure[] subprocs) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("insert " + proc + " subproc=" + Arrays.toString(subprocs));
+    }
+
+    ByteSlot slot = acquireSlot();
+    long logId = -1;
+    try {
+      // Serialize the insert
+      if (subprocs != null) {
+        ProcedureWALFormat.writeInsert(slot, proc, subprocs);
+      } else {
+        assert !proc.hasParent();
+        ProcedureWALFormat.writeInsert(slot, proc);
+      }
+
+      // Push the transaction data and wait until it is persisted
+      logId = pushData(slot);
+    } catch (IOException e) {
+      // We are not able to serialize the procedure.
+      // this is a code error, and we are not able to go on.
+      LOG.fatal("Unable to serialize one of the procedure: proc=" + proc +
+                " subprocs=" + Arrays.toString(subprocs), e);
+      throw new RuntimeException(e);
+    } finally {
+      releaseSlot(slot);
+    }
+
+    // Update the store tracker
+    synchronized (storeTracker) {
+      if (logId == flushLogId) {
+        storeTracker.insert(proc, subprocs);
+      }
+    }
+  }
+
+  @Override
+  public void update(final Procedure proc) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("update " + proc);
+    }
+
+    ByteSlot slot = acquireSlot();
+    long logId = -1;
+    try {
+      // Serialize the update
+      ProcedureWALFormat.writeUpdate(slot, proc);
+
+      // Push the transaction data and wait until it is persisted
+      logId = pushData(slot);
+    } catch (IOException e) {
+      // We are not able to serialize the procedure.
+      // this is a code error, and we are not able to go on.
+      LOG.fatal("Unable to serialize the procedure: " + proc, e);
+      throw new RuntimeException(e);
+    } finally {
+      releaseSlot(slot);
+    }
+
+    // Update the store tracker
+    boolean removeOldLogs = false;
+    synchronized (storeTracker) {
+      if (logId == flushLogId) {
+        storeTracker.update(proc);
+        removeOldLogs = storeTracker.isUpdated();
+      }
+    }
+
+    if (removeOldLogs) {
+      removeAllLogs(logId - 1);
+    }
+  }
+
+  @Override
+  public void delete(final long procId) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("delete " + procId);
+    }
+
+    ByteSlot slot = acquireSlot();
+    long logId = -1;
+    try {
+      // Serialize the delete
+      ProcedureWALFormat.writeDelete(slot, procId);
+
+      // Push the transaction data and wait until it is persisted
+      logId = pushData(slot);
+    } catch (IOException e) {
+      // We are not able to serialize the procedure.
+      // this is a code error, and we are not able to go on.
+      LOG.fatal("Unable to serialize the procedure: " + procId, e);
+      throw new RuntimeException(e);
+    } finally {
+      releaseSlot(slot);
+    }
+
+    boolean removeOldLogs = false;
+    synchronized (storeTracker) {
+      if (logId == flushLogId) {
+        storeTracker.delete(procId);
+        if (storeTracker.isEmpty()) {
+          removeOldLogs = rollWriterOrDie(logId + 1);
+        }
+      }
+    }
+
+    if (removeOldLogs) {
+      removeAllLogs(logId);
+    }
+  }
+
+  private ByteSlot acquireSlot() {
+    ByteSlot slot = slotsCache.poll();
+    return slot != null ? slot : new ByteSlot();
+  }
+
+  private void releaseSlot(final ByteSlot slot) {
+    slot.reset();
+    slotsCache.offer(slot);
+  }
+
+  private long pushData(final ByteSlot slot) {
+    assert !logs.isEmpty() : "recoverLease() must be called before inserting 
data";
+    long logId = -1;
+
+    lock.lock();
+    try {
+      // Wait for the sync to be completed
+      while (true) {
+        if (inSync.get()) {
+          syncCond.await();
+        } else if (slotIndex == slots.length) {
+          slotCond.signal();
+          syncCond.await();
+        } else {
+          break;
+        }
+      }
+
+      slots[slotIndex++] = slot;
+      logId = flushLogId;
+
+      // Notify that there is new data
+      if (slotIndex == 1) {
+        waitCond.signal();
+      }
+
+      // Notify that the slots are full
+      if (slotIndex == slots.length) {
+        slotCond.signal();
+      }
+      syncCond.await();
+    } catch (InterruptedException e) {
+      sendAbortProcessSignal();
+    } finally {
+      lock.unlock();
+    }
+    return logId;
+  }
+
+  private void syncLoop() throws IOException {
+    inSync.set(false);
+    while (running.get()) {
+      lock.lock();
+      try {
+        // Wait until new data is available
+        if (slotIndex == 0) {
+          LOG.debug("Waiting for data. flushed=" + 
StringUtils.humanSize(totalSynced));
+          waitCond.await();
+          if (slotIndex == 0) {
+            // no data.. probably a stop()
+            continue;
+          }
+        }
+
+        // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
+        slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
+
+        inSync.set(true);
+        totalSynced += syncSlots();
+        slotIndex = 0;
+        inSync.set(false);
+        syncCond.signalAll();
+      } catch (InterruptedException e) {
+        sendAbortProcessSignal();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    try {
+      syncCond.signalAll();
+    } catch (Throwable e) {
+      LOG.debug("unable to send a signal to ones waiting in pushData(): " + 
e.getMessage(), e);
+    }
+  }
+
+  private long syncSlots() {
+    int retry = 0;
+    long totalSynced = 0;
+    do {
+      try {
+        totalSynced = syncSlots(stream, slots, 0, slotIndex);
+        break;
+      } catch (IOException e) {
+        if (++retry == MAX_RETRIES_BEFORE_ABORT) {
+          LOG.error("sync slot failed, abort.", e);
+          sendAbortProcessSignal();
+        }
+      }
+    } while (running.get());
+    return totalSynced;
+  }
+
+  protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int 
offset, int count)
+      throws IOException {
+    long totalSynced = 0;
+    for (int i = 0; i < count; ++i) {
+      ByteSlot data = slots[offset + i];
+      data.writeTo(stream);
+      totalSynced += data.size();
+    }
+    stream.hsync();
+    return totalSynced;
+  }
+
+  private void sendAbortProcessSignal() {
+    if (!this.listeners.isEmpty()) {
+      for (ProcedureStoreListener listener : this.listeners) {
+        listener.abortProcess();
+      }
+    }
+  }
+
+  private boolean rollWriterOrDie(final long logId) {
+    try {
+      return rollWriter(logId);
+    } catch (IOException e) {
+      LOG.warn("Unable to roll the log", e);
+      sendAbortProcessSignal();
+      return false;
+    }
+  }
+
+  private boolean rollWriter(final long logId) throws IOException {
+    ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
+      .setVersion(ProcedureWALFormat.HEADER_VERSION)
+      .setType(ProcedureWALFormat.LOG_TYPE_STREAM)
+      .setMinProcId(storeTracker.getMinProcId())
+      .setLogId(logId)
+      .build();
+
+    FSDataOutputStream newStream = null;
+    Path newLogFile = null;
+    long startPos = -1;
+    try {
+      newLogFile = getLogFilePath(logId);
+      newStream = fs.create(newLogFile, false);
+      ProcedureWALFormat.writeHeader(newStream, header);
+      startPos = newStream.getPos();
+    } catch (FileAlreadyExistsException e) {
+      LOG.error("Log file with id=" + logId + " already exists", e);
+      return false;
+    }
+    lock.lock();
+    try {
+      closeStream();
+      synchronized (storeTracker) {
+        storeTracker.resetUpdates();
+      }
+      stream = newStream;
+      flushLogId = logId;
+      totalSynced = 0;
+      logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
+    } finally {
+      lock.unlock();
+    }
+    LOG.info("Roll new state log: " + logId);
+    return true;
+  }
+
+  private void closeStream() {
+    try {
+      if (stream != null) {
+        try {
+          ProcedureWALFormat.writeTrailer(stream, storeTracker);
+        } catch (IOException e) {
+          LOG.error("Unable to write the trailer", e);
+        }
+        stream.close();
+      }
+    } catch (IOException e) {
+      LOG.error("Unable to close the stream", e);
+    } finally {
+      stream = null;
+    }
+  }
+
+  private void removeAllLogs(long lastLogId) {
+    LOG.info("Remove all state logs with ID less then " + lastLogId);
+    while (!logs.isEmpty()) {
+      ProcedureWALFile log = logs.getFirst();
+      if (lastLogId < log.getLogId()) {
+        break;
+      }
+
+      removeLogFile(log);
+    }
+  }
+
+  private boolean removeLogFile(final ProcedureWALFile log) {
+    try {
+      LOG.debug("remove log: " + log);
+      log.removeFile();
+      logs.remove(log);
+    } catch (IOException e) {
+      LOG.error("unable to remove log " + log, e);
+      return false;
+    }
+    return true;
+  }
+
+  public Set<ProcedureWALFile> getCorruptedLogs() {
+    return corruptedLogs;
+  }
+
+  // ==========================================================================
+  //  FileSystem Log Files helpers
+  // ==========================================================================
+  public Path getLogDir() {
+    return this.logDir;
+  }
+
+  public FileSystem getFileSystem() {
+    return this.fs;
+  }
+
+  protected Path getLogFilePath(final long logId) throws IOException {
+    return new Path(logDir, String.format("state-%020d.log", logId));
+  }
+
+  private static long getLogIdFromName(final String name) {
+    int end = name.lastIndexOf(".log");
+    int start = name.lastIndexOf('-') + 1;
+    while (start < end) {
+      if (name.charAt(start) != '0')
+        break;
+      start++;
+    }
+    return Long.parseLong(name.substring(start, end));
+  }
+
+  private FileStatus[] getLogFiles() throws IOException {
+    try {
+      return fs.listStatus(logDir, new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+          String name = path.getName();
+          return name.startsWith("state-") && name.endsWith(".log");
+        }
+      });
+    } catch (FileNotFoundException e) {
+      LOG.warn("log directory not found: " + e.getMessage());
+      return null;
+    }
+  }
+
+  private long getMaxLogId(final FileStatus[] logFiles) {
+    long maxLogId = 0;
+    if (logFiles != null && logFiles.length > 0) {
+      for (int i = 0; i < logFiles.length; ++i) {
+        maxLogId = Math.max(maxLogId, 
getLogIdFromName(logFiles[i].getPath().getName()));
+      }
+    }
+    return maxLogId;
+  }
+
+  /**
+   * @return Max-LogID of the specified log file set
+   */
+  private long initOldLogs(final FileStatus[] logFiles) throws IOException {
+    this.logs.clear();
+
+    long maxLogId = 0;
+    if (logFiles != null && logFiles.length > 0) {
+      for (int i = 0; i < logFiles.length; ++i) {
+        final Path logPath = logFiles[i].getPath();
+        leaseRecovery.recoverFileLease(fs, logPath);
+        maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
+
+        ProcedureWALFile log = initOldLog(logFiles[i]);
+        if (log != null) {
+          this.logs.add(log);
+        }
+      }
+      Collections.sort(this.logs);
+      initTrackerFromOldLogs();
+    }
+    return maxLogId;
+  }
+
+  private void initTrackerFromOldLogs() {
+    // TODO: Load the most recent tracker available
+    if (!logs.isEmpty()) {
+      ProcedureWALFile log = logs.getLast();
+      try {
+        log.readTracker(storeTracker);
+      } catch (IOException e) {
+        LOG.error("Unable to read tracker for " + log, e);
+        // try the next one...
+        storeTracker.clear();
+        storeTracker.setPartialFlag(true);
+      }
+    }
+  }
+
+  private ProcedureWALFile initOldLog(final FileStatus logFile) throws 
IOException {
+    ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
+    if (logFile.getLen() == 0) {
+      LOG.warn("Remove uninitialized log " + logFile);
+      log.removeFile();
+      return null;
+    }
+
+    LOG.debug("opening state-log: " + logFile);
+    try {
+      log.open();
+    } catch (ProcedureWALFormat.InvalidWALDataException e) {
+      LOG.warn("Remove uninitialized log " + logFile, e);
+      log.removeFile();
+      return null;
+    } catch (IOException e) {
+      String msg = "Unable to read state log: " + logFile;
+      LOG.error(msg, e);
+      throw new IOException(msg, e);
+    }
+
+    if (log.isCompacted()) {
+      try {
+        log.readTrailer();
+      } catch (IOException e) {
+        // unfinished compacted log throw it away
+        LOG.warn("Unfinished compacted log " + logFile, e);
+        log.removeFile();
+        return null;
+      }
+    }
+    return log;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java
new file mode 100644
index 0000000..8904116
--- /dev/null
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Similar to the ByteArrayOutputStream, with the exception that we can 
prepend an header.
+ * e.g. you write some data and you want to prepend an header that contains 
the data len or cksum.
+ * <code>
+ * ByteSlot slot = new ByteSlot();
+ * // write data
+ * slot.write(...);
+ * slot.write(...);
+ * // write header with the size of the written data
+ * slot.markHead();
+ * slot.write(Bytes.toBytes(slot.size()));
+ * // flush to stream as [header, data]
+ * slot.writeTo(stream);
+ * </code>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ByteSlot extends OutputStream {
+  private static final int DOUBLE_GROW_LIMIT = 1 << 20;
+  private static final int GROW_ALIGN = 128;
+
+  private byte[] buf;
+  private int head;
+  private int size;
+
+  public void reset() {
+    head = 0;
+    size = 0;
+  }
+
+  public void markHead() {
+    head = size;
+  }
+
+  public int getHead() {
+    return head;
+  }
+
+  public int size() {
+    return size;
+  }
+
+  public byte[] getBuffer() {
+    return buf;
+  }
+
+  public void writeAt(int offset, int b) {
+    head = Math.min(head, offset);
+    buf[offset] = (byte)b;
+  }
+
+  public void write(int b) {
+    ensureCapacity(size + 1);
+    buf[size++] = (byte)b;
+  }
+
+  public void write(byte[] b, int off, int len) {
+    ensureCapacity(size + len);
+    System.arraycopy(b, off, buf, size, len);
+    size += len;
+  }
+
+  public void writeTo(final OutputStream stream) throws IOException {
+    if (head != 0) {
+      stream.write(buf, head, size - head);
+      stream.write(buf, 0, head);
+    } else {
+      stream.write(buf, 0, size);
+    }
+  }
+
+  private void ensureCapacity(int minCapacity) {
+    minCapacity = (minCapacity + (GROW_ALIGN - 1)) & -GROW_ALIGN;
+    if (buf == null) {
+      buf = new byte[minCapacity];
+    } else if (minCapacity > buf.length) {
+      int newCapacity = buf.length << 1;
+      if (minCapacity > newCapacity || newCapacity > DOUBLE_GROW_LIMIT) {
+        newCapacity = minCapacity;
+      }
+      buf = Arrays.copyOf(buf, newCapacity);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java
new file mode 100644
index 0000000..97134c2
--- /dev/null
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.util;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class StringUtils {
+  private StringUtils() {}
+
+  public static String humanTimeDiff(long timeDiff) {
+    StringBuilder buf = new StringBuilder();
+    long hours = timeDiff / (60*60*1000);
+    long rem = (timeDiff % (60*60*1000));
+    long minutes =  rem / (60*1000);
+    rem = rem % (60*1000);
+    float seconds = rem / 1000.0f;
+
+    if (hours != 0){
+      buf.append(hours);
+      buf.append("hrs, ");
+    }
+    if (minutes != 0){
+      buf.append(minutes);
+      buf.append("mins, ");
+    }
+    if (hours > 0 || minutes > 0) {
+      buf.append(seconds);
+      buf.append("sec");
+    } else {
+      buf.append(String.format("%.4fsec", seconds));
+    }
+    return buf.toString();
+  }
+
+  public static String humanSize(double size) {
+    if (size >= (1L << 40)) return String.format("%.1fT", size / (1L << 40));
+    if (size >= (1L << 30)) return String.format("%.1fG", size / (1L << 30));
+    if (size >= (1L << 20)) return String.format("%.1fM", size / (1L << 20));
+    if (size >= (1L << 10)) return String.format("%.1fK", size / (1L << 10));
+    return String.format("%.0f", size);
+  }
+
+  public static boolean isEmpty(final String input) {
+    return input == null || input.length() == 0;
+  }
+
+  public static String buildString(final String... parts) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < parts.length; ++i) {
+      sb.append(parts[i]);
+    }
+    return sb.toString();
+  }
+
+  public static StringBuilder appendStrings(final StringBuilder sb, final 
String... parts) {
+    for (int i = 0; i < parts.length; ++i) {
+      sb.append(parts[i]);
+    }
+    return sb;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
new file mode 100644
index 0000000..368d26f
--- /dev/null
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.util;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class TimeoutBlockingQueue<E> {
+  public static interface TimeoutRetriever<T> {
+    long getTimeout(T object);
+    TimeUnit getTimeUnit(T object);
+  }
+
+  private final ReentrantLock lock = new ReentrantLock();
+  private final Condition waitCond = lock.newCondition();
+  private final TimeoutRetriever<? super E> timeoutRetriever;
+
+  private E[] objects;
+  private int head = 0;
+  private int tail = 0;
+
+  public TimeoutBlockingQueue(TimeoutRetriever<? super E> timeoutRetriever) {
+    this(32, timeoutRetriever);
+  }
+
+  @SuppressWarnings("unchecked")
+  public TimeoutBlockingQueue(int capacity, TimeoutRetriever<? super E> 
timeoutRetriever) {
+    this.objects = (E[])new Object[capacity];
+    this.timeoutRetriever = timeoutRetriever;
+  }
+
+  public void dump() {
+    for (int i = 0; i < objects.length; ++i) {
+      if (i == head) {
+        System.out.print("[" + objects[i] + "] ");
+      } else if (i == tail) {
+        System.out.print("]" + objects[i] + "[ ");
+      } else {
+        System.out.print(objects[i] + " ");
+      }
+    }
+    System.out.println();
+  }
+
+  public void clear() {
+    lock.lock();
+    try {
+      if (head != tail) {
+        for (int i = head; i < tail; ++i) {
+          objects[i] = null;
+        }
+        head = 0;
+        tail = 0;
+        waitCond.signal();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  public void add(E e) {
+    if (e == null) throw new NullPointerException();
+
+    lock.lock();
+    try {
+      addElement(e);
+      waitCond.signal();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
+  public E poll() {
+    lock.lock();
+    try {
+      if (isEmpty()) {
+        waitCond.await();
+        return null;
+      }
+
+      E elem = objects[head];
+      long nanos = getNanosTimeout(elem);
+      nanos = waitCond.awaitNanos(nanos);
+      return nanos > 0 ? null : removeFirst();
+    } catch (InterruptedException e) {
+      return null;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  public int size() {
+    return tail - head;
+  }
+
+  public boolean isEmpty() {
+    return (tail - head) == 0;
+  }
+
+  public void signalAll() {
+    lock.lock();
+    try {
+      waitCond.signalAll();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private void addElement(E elem) {
+    int size = (tail - head);
+    if ((objects.length - size) == 0) {
+      int capacity = size + ((size < 64) ? (size + 2) : (size >> 1));
+      E[] newObjects = (E[])new Object[capacity];
+
+      if (compareTimeouts(objects[tail - 1], elem) <= 0) {
+        // Append
+        System.arraycopy(objects, head, newObjects, 0, tail);
+        tail -= head;
+        newObjects[tail++] = elem;
+      } else if (compareTimeouts(objects[head], elem) > 0) {
+        // Prepend
+        System.arraycopy(objects, head, newObjects, 1, tail);
+        newObjects[0] = elem;
+        tail -= (head - 1);
+      } else {
+        // Insert in the middle
+        int index = upperBound(head, tail - 1, elem);
+        int newIndex = (index - head);
+        System.arraycopy(objects, head, newObjects, 0, newIndex);
+        newObjects[newIndex] = elem;
+        System.arraycopy(objects, index, newObjects, newIndex + 1, tail - 
index);
+        tail -= (head - 1);
+      }
+      head = 0;
+      objects = newObjects;
+    } else {
+      if (tail == objects.length) {
+        // shift down |-----AAAAAAA|
+        tail -= head;
+        System.arraycopy(objects, head, objects, 0, tail);
+        head = 0;
+      }
+
+      if (tail == head || compareTimeouts(objects[tail - 1], elem) <= 0) {
+        // Append
+        objects[tail++] = elem;
+      } else if (head > 0 && compareTimeouts(objects[head], elem) > 0) {
+        // Prepend
+        objects[--head] = elem;
+      } else {
+        // Insert in the middle
+        int index = upperBound(head, tail - 1, elem);
+        System.arraycopy(objects, index, objects, index + 1, tail - index);
+        objects[index] = elem;
+        tail++;
+      }
+    }
+  }
+
+  private E removeFirst() {
+    E elem = objects[head];
+    objects[head] = null;
+    head = (head + 1) % objects.length;
+    if (head == 0) tail = 0;
+    return elem;
+  }
+
+  private int upperBound(int start, int end, E key) {
+    while (start < end) {
+      int mid = (start + end) >>> 1;
+      E mitem = objects[mid];
+      int cmp = compareTimeouts(mitem, key);
+      if (cmp > 0) {
+        end = mid;
+      } else {
+        start = mid + 1;
+      }
+    }
+    return start;
+  }
+
+  private int compareTimeouts(final E a, final E b) {
+    long t1 = getNanosTimeout(a);
+    long t2 = getNanosTimeout(b);
+    return (t1 < t2) ? -1 : (t1 > t2) ? 1 : 0;
+  }
+
+  private long getNanosTimeout(final E obj) {
+    TimeUnit unit = timeoutRetriever.getTimeUnit(obj);
+    long timeout = timeoutRetriever.getTimeout(obj);
+    return unit.toNanos(timeout);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
new file mode 100644
index 0000000..aba28f9
--- /dev/null
+++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ProcedureTestingUtility {
+  private static final Log LOG = 
LogFactory.getLog(ProcedureTestingUtility.class);
+
+  private ProcedureTestingUtility() {
+  }
+
+  public static ProcedureStore createStore(final Configuration conf, final 
FileSystem fs,
+      final Path baseDir) throws IOException {
+    return createWalStore(conf, fs, baseDir);
+  }
+
+  public static WALProcedureStore createWalStore(final Configuration conf, 
final FileSystem fs,
+      final Path logDir) throws IOException {
+    return new WALProcedureStore(conf, fs, logDir, new 
WALProcedureStore.LeaseRecovery() {
+      @Override
+      public void recoverFileLease(FileSystem fs, Path path) throws 
IOException {
+        // no-op
+      }
+    });
+  }
+
+  public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor)
+      throws Exception {
+    ProcedureStore procStore = procExecutor.getStore();
+    int nthreads = procExecutor.getNumThreads();
+    procExecutor.stop();
+    procStore.stop(false);
+    procExecutor.join();
+    procStore.start(nthreads);
+    procExecutor.start(nthreads);
+  }
+
+  public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> 
procExecutor,
+      boolean value) {
+    if (procExecutor.testing == null) {
+      procExecutor.testing = new ProcedureExecutor.Testing();
+    }
+    procExecutor.testing.killBeforeStoreUpdate = value;
+    LOG.warn("Set Kill before store update to: " + 
procExecutor.testing.killBeforeStoreUpdate);
+  }
+
+  public static <TEnv> void 
setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
+      boolean value) {
+    if (procExecutor.testing == null) {
+      procExecutor.testing = new ProcedureExecutor.Testing();
+    }
+    procExecutor.testing.toggleKillBeforeStoreUpdate = value;
+  }
+
+  public static <TEnv> void 
toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
+    if (procExecutor.testing == null) {
+      procExecutor.testing = new ProcedureExecutor.Testing();
+    }
+    procExecutor.testing.killBeforeStoreUpdate = 
!procExecutor.testing.killBeforeStoreUpdate;
+    LOG.warn("Set Kill before store update to: " + 
procExecutor.testing.killBeforeStoreUpdate);
+  }
+
+  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> 
procExecutor, Procedure proc) {
+    long procId = procExecutor.submitProcedure(proc);
+    waitProcedure(procExecutor, procId);
+    return procId;
+  }
+
+  public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> 
procExecutor, long procId) {
+    while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) {
+      Threads.sleepWithoutInterrupt(250);
+    }
+  }
+
+  public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> 
procExecutor,
+      long procId) {
+    assertFalse(procExecutor.isFinished(procId));
+    assertEquals(null, procExecutor.getResult(procId));
+  }
+
+  public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> 
procExecutor,
+      long procId) {
+    ProcedureResult result = procExecutor.getResult(procId);
+    assertTrue(result != null);
+    assertProcNotFailed(result);
+  }
+
+  public static void assertProcNotFailed(final ProcedureResult result) {
+    Exception exception = result.getException();
+    String msg = exception != null ? exception.toString() : "no exception 
found";
+    assertFalse(msg, result.isFailed());
+  }
+
+  public static void assertIsAbortException(final ProcedureResult result) {
+    LOG.info(result.getException());
+    assertTrue(result.isFailed());
+    assertTrue(result.getException().getCause() instanceof 
ProcedureAbortedException);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
new file mode 100644
index 0000000..9ff8eb7
--- /dev/null
+++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import 
org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureExecution {
+  private static final Log LOG = 
LogFactory.getLog(TestProcedureExecution.class);
+
+  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
+  private static final Procedure NULL_PROC = null;
+
+  private ProcedureExecutor<Void> procExecutor;
+  private ProcedureStore procStore;
+
+  private HBaseCommonTestingUtility htu;
+  private FileSystem fs;
+  private Path testDir;
+  private Path logDir;
+
+  @Before
+  public void setUp() throws IOException {
+    htu = new HBaseCommonTestingUtility();
+    testDir = htu.getDataTestDir();
+    fs = testDir.getFileSystem(htu.getConfiguration());
+    assertTrue(testDir.depth() > 1);
+
+    logDir = new Path(testDir, "proc-logs");
+    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), 
fs, logDir);
+    procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, 
procStore);
+    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
+    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    procExecutor.stop();
+    procStore.stop(false);
+    fs.delete(logDir, true);
+  }
+
+  private static class TestProcedureException extends Exception {
+    public TestProcedureException(String msg) { super(msg); }
+  }
+
+  public static class TestSequentialProcedure extends 
SequentialProcedure<Void> {
+    private final Procedure[] subProcs;
+    private final List<String> state;
+    private final Exception failure;
+    private final String name;
+
+    public TestSequentialProcedure() {
+      throw new UnsupportedOperationException("recovery should not be 
triggered here");
+    }
+
+    public TestSequentialProcedure(String name, List<String> state, 
Procedure... subProcs) {
+      this.state = state;
+      this.subProcs = subProcs;
+      this.name = name;
+      this.failure = null;
+    }
+
+    public TestSequentialProcedure(String name, List<String> state, Exception 
failure) {
+      this.state = state;
+      this.subProcs = null;
+      this.name = name;
+      this.failure = failure;
+    }
+
+    @Override
+    protected Procedure[] execute(Void env) {
+      state.add(name + "-execute");
+      if (failure != null) {
+        setFailure(new RemoteProcedureException(name + "-failure", failure));
+        return null;
+      }
+      return subProcs;
+    }
+
+    @Override
+    protected void rollback(Void env) {
+      state.add(name + "-rollback");
+    }
+
+    @Override
+    protected boolean abort(Void env) {
+      state.add(name + "-abort");
+      return true;
+    }
+  }
+
+  @Test(timeout=30000)
+  public void testBadSubprocList() {
+    List<String> state = new ArrayList<String>();
+    Procedure subProc2 = new TestSequentialProcedure("subProc2", state);
+    Procedure subProc1 = new TestSequentialProcedure("subProc1", state, 
subProc2, NULL_PROC);
+    Procedure rootProc = new TestSequentialProcedure("rootProc", state, 
subProc1);
+    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, 
rootProc);
+
+    // subProc1 has a "null" subprocedure which is catched as InvalidArgument
+    // failed state with 2 execute and 2 rollback
+    LOG.info(state);
+    ProcedureResult result = procExecutor.getResult(rootId);
+    LOG.info(result.getException());
+    assertTrue(state.toString(), result.isFailed());
+    assertTrue(result.getException().toString(),
+      result.getException().getCause() instanceof IllegalArgumentException);
+
+    assertEquals(state.toString(), 4, state.size());
+    assertEquals("rootProc-execute", state.get(0));
+    assertEquals("subProc1-execute", state.get(1));
+    assertEquals("subProc1-rollback", state.get(2));
+    assertEquals("rootProc-rollback", state.get(3));
+  }
+
+  @Test(timeout=30000)
+  public void testSingleSequentialProc() {
+    List<String> state = new ArrayList<String>();
+    Procedure subProc2 = new TestSequentialProcedure("subProc2", state);
+    Procedure subProc1 = new TestSequentialProcedure("subProc1", state, 
subProc2);
+    Procedure rootProc = new TestSequentialProcedure("rootProc", state, 
subProc1);
+    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, 
rootProc);
+
+    // successful state, with 3 execute
+    LOG.info(state);
+    ProcedureResult result = procExecutor.getResult(rootId);
+    ProcedureTestingUtility.assertProcNotFailed(result);
+    assertEquals(state.toString(), 3, state.size());
+  }
+
+  @Test(timeout=30000)
+  public void testSingleSequentialProcRollback() {
+    List<String> state = new ArrayList<String>();
+    Procedure subProc2 = new TestSequentialProcedure("subProc2", state,
+                                                     new 
TestProcedureException("fail test"));
+    Procedure subProc1 = new TestSequentialProcedure("subProc1", state, 
subProc2);
+    Procedure rootProc = new TestSequentialProcedure("rootProc", state, 
subProc1);
+    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, 
rootProc);
+
+    // the 3rd proc fail, rollback after 2 successful execution
+    LOG.info(state);
+    ProcedureResult result = procExecutor.getResult(rootId);
+    LOG.info(result.getException());
+    assertTrue(state.toString(), result.isFailed());
+    assertTrue(result.getException().toString(),
+      result.getException().getCause() instanceof TestProcedureException);
+
+    assertEquals(state.toString(), 6, state.size());
+    assertEquals("rootProc-execute", state.get(0));
+    assertEquals("subProc1-execute", state.get(1));
+    assertEquals("subProc2-execute", state.get(2));
+    assertEquals("subProc2-rollback", state.get(3));
+    assertEquals("subProc1-rollback", state.get(4));
+    assertEquals("rootProc-rollback", state.get(5));
+  }
+
+  public static class TestWaitingProcedure extends SequentialProcedure<Void> {
+    private final List<String> state;
+    private final boolean hasChild;
+    private final String name;
+
+    public TestWaitingProcedure() {
+      throw new UnsupportedOperationException("recovery should not be 
triggered here");
+    }
+
+    public TestWaitingProcedure(String name, List<String> state, boolean 
hasChild) {
+      this.hasChild = hasChild;
+      this.state = state;
+      this.name = name;
+    }
+
+    @Override
+    protected Procedure[] execute(Void env) {
+      state.add(name + "-execute");
+      setState(ProcedureState.WAITING_TIMEOUT);
+      return hasChild ? new Procedure[] { new TestWaitChild(name, state) } : 
null;
+    }
+
+    @Override
+    protected void rollback(Void env) {
+      state.add(name + "-rollback");
+    }
+
+    @Override
+    protected boolean abort(Void env) {
+      state.add(name + "-abort");
+      return true;
+    }
+
+    public static class TestWaitChild extends SequentialProcedure<Void> {
+      private final List<String> state;
+      private final String name;
+
+      public TestWaitChild() {
+        throw new UnsupportedOperationException("recovery should not be 
triggered here");
+      }
+
+      public TestWaitChild(String name, List<String> state) {
+        this.name = name;
+        this.state = state;
+      }
+
+      @Override
+      protected Procedure[] execute(Void env) {
+        state.add(name + "-child-execute");
+        return null;
+      }
+
+      @Override
+      protected void rollback(Void env) {
+        state.add(name + "-child-rollback");
+      }
+
+      @Override
+      protected boolean abort(Void env) {
+        state.add(name + "-child-abort");
+        return true;
+      }
+    }
+  }
+
+  @Test(timeout=30000)
+  public void testAbortTimeout() {
+    final int PROC_TIMEOUT_MSEC = 2500;
+    List<String> state = new ArrayList<String>();
+    Procedure proc = new TestWaitingProcedure("wproc", state, false);
+    proc.setTimeout(PROC_TIMEOUT_MSEC);
+    long startTime = EnvironmentEdgeManager.currentTime();
+    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+    long execTime = EnvironmentEdgeManager.currentTime() - startTime;
+    LOG.info(state);
+    assertTrue("we didn't wait enough execTime=" + execTime, execTime >= 
PROC_TIMEOUT_MSEC);
+    ProcedureResult result = procExecutor.getResult(rootId);
+    LOG.info(result.getException());
+    assertTrue(state.toString(), result.isFailed());
+    assertTrue(result.getException().toString(),
+               result.getException().getCause() instanceof TimeoutException);
+    assertEquals(state.toString(), 2, state.size());
+    assertEquals("wproc-execute", state.get(0));
+    assertEquals("wproc-rollback", state.get(1));
+  }
+
+  @Test(timeout=30000)
+  public void testAbortTimeoutWithChildren() {
+    List<String> state = new ArrayList<String>();
+    Procedure proc = new TestWaitingProcedure("wproc", state, true);
+    proc.setTimeout(2500);
+    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+    LOG.info(state);
+    ProcedureResult result = procExecutor.getResult(rootId);
+    LOG.info(result.getException());
+    assertTrue(state.toString(), result.isFailed());
+    assertTrue(result.getException().toString(),
+               result.getException().getCause() instanceof TimeoutException);
+    assertEquals(state.toString(), 4, state.size());
+    assertEquals("wproc-execute", state.get(0));
+    assertEquals("wproc-child-execute", state.get(1));
+    assertEquals("wproc-child-rollback", state.get(2));
+    assertEquals("wproc-rollback", state.get(3));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
new file mode 100644
index 0000000..d654910
--- /dev/null
+++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
@@ -0,0 +1,488 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureRecovery {
+  private static final Log LOG = 
LogFactory.getLog(TestProcedureRecovery.class);
+
+  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
+  private static final Procedure NULL_PROC = null;
+
+  private static ProcedureExecutor<Void> procExecutor;
+  private static ProcedureStore procStore;
+  private static int procSleepInterval;
+
+  private HBaseCommonTestingUtility htu;
+  private FileSystem fs;
+  private Path testDir;
+  private Path logDir;
+
+  @Before
+  public void setUp() throws IOException {
+    htu = new HBaseCommonTestingUtility();
+    testDir = htu.getDataTestDir();
+    fs = testDir.getFileSystem(htu.getConfiguration());
+    assertTrue(testDir.depth() > 1);
+
+    logDir = new Path(testDir, "proc-logs");
+    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), 
fs, logDir);
+    procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, 
procStore);
+    procExecutor.testing = new ProcedureExecutor.Testing();
+    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
+    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS);
+    procSleepInterval = 0;
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    procExecutor.stop();
+    procStore.stop(false);
+    fs.delete(logDir, true);
+  }
+
+  private void restart() throws Exception {
+    dumpLogDirState();
+    ProcedureTestingUtility.restart(procExecutor);
+    dumpLogDirState();
+  }
+
+  public static class TestSingleStepProcedure extends 
SequentialProcedure<Void> {
+    private int step = 0;
+
+    public TestSingleStepProcedure() { }
+
+    @Override
+    protected Procedure[] execute(Void env) {
+      LOG.debug("execute procedure " + this + " step=" + step);
+      step++;
+      setResult(Bytes.toBytes(step));
+      return null;
+    }
+
+    @Override
+    protected void rollback(Void env) { }
+
+    @Override
+    protected boolean abort(Void env) { return true; }
+  }
+
+  public static class BaseTestStepProcedure extends SequentialProcedure<Void> {
+    private AtomicBoolean abort = new AtomicBoolean(false);
+    private int step = 0;
+
+    @Override
+    protected Procedure[] execute(Void env) {
+      LOG.debug("execute procedure " + this + " step=" + step);
+      ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
+      step++;
+      Threads.sleepWithoutInterrupt(procSleepInterval);
+      if (isAborted()) {
+        setFailure(new RemoteProcedureException(getClass().getName(),
+          new ProcedureAbortedException(
+            "got an abort at " + getClass().getName() + " step=" + step)));
+        return null;
+      }
+      return null;
+    }
+
+    @Override
+    protected void rollback(Void env) {
+      LOG.debug("rollback procedure " + this + " step=" + step);
+      ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
+      step++;
+    }
+
+    @Override
+    protected boolean abort(Void env) {
+      abort.set(true);
+      return true;
+    }
+
+    private boolean isAborted() {
+      boolean aborted = abort.get();
+      BaseTestStepProcedure proc = this;
+      while (proc.hasParent() && !aborted) {
+        proc = 
(BaseTestStepProcedure)procExecutor.getProcedure(proc.getParentProcId());
+        aborted = proc.isAborted();
+      }
+      return aborted;
+    }
+  }
+
+  public static class TestMultiStepProcedure extends BaseTestStepProcedure {
+    public TestMultiStepProcedure() { }
+
+    @Override
+    public Procedure[] execute(Void env) {
+      super.execute(env);
+      return isFailed() ? null : new Procedure[] { new Step1Procedure() };
+    }
+
+    public static class Step1Procedure extends BaseTestStepProcedure {
+      public Step1Procedure() { }
+
+      @Override
+      protected Procedure[] execute(Void env) {
+        super.execute(env);
+        return isFailed() ? null : new Procedure[] { new Step2Procedure() };
+      }
+    }
+
+    public static class Step2Procedure extends BaseTestStepProcedure {
+      public Step2Procedure() { }
+    }
+  }
+
+  @Test
+  public void testNoopLoad() throws Exception {
+    restart();
+  }
+
+  @Test(timeout=30000)
+  public void testSingleStepProcRecovery() throws Exception {
+    Procedure proc = new TestSingleStepProcedure();
+    procExecutor.testing.killBeforeStoreUpdate = true;
+    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+    assertFalse(procExecutor.isRunning());
+    procExecutor.testing.killBeforeStoreUpdate = false;
+
+    // Restart and verify that the procedures restart
+    long restartTs = EnvironmentEdgeManager.currentTime();
+    restart();
+    waitProcedure(procId);
+    ProcedureResult result = procExecutor.getResult(procId);
+    assertTrue(result.getLastUpdate() > restartTs);
+    ProcedureTestingUtility.assertProcNotFailed(result);
+    assertEquals(1, Bytes.toInt(result.getResult()));
+    long resultTs = result.getLastUpdate();
+
+    // Verify that after another restart the result is still there
+    restart();
+    result = procExecutor.getResult(procId);
+    ProcedureTestingUtility.assertProcNotFailed(result);
+    assertEquals(resultTs, result.getLastUpdate());
+    assertEquals(1, Bytes.toInt(result.getResult()));
+  }
+
+  @Test(timeout=30000)
+  public void testMultiStepProcRecovery() throws Exception {
+    // Step 0 - kill
+    Procedure proc = new TestMultiStepProcedure();
+    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 0 exec && Step 1 - kill
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 1 exec && step 2 - kill
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 2 exec
+    restart();
+    waitProcedure(procId);
+    assertTrue(procExecutor.isRunning());
+
+    // The procedure is completed
+    ProcedureResult result = procExecutor.getResult(procId);
+    ProcedureTestingUtility.assertProcNotFailed(result);
+  }
+
+  @Test(timeout=30000)
+  public void testMultiStepRollbackRecovery() throws Exception {
+    // Step 0 - kill
+    Procedure proc = new TestMultiStepProcedure();
+    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 0 exec && Step 1 - kill
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 1 exec && step 2 - kill
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 2 exec - rollback - kill
+    procSleepInterval = 2500;
+    restart();
+    assertTrue(procExecutor.abort(procId));
+    waitProcedure(procId);
+    assertFalse(procExecutor.isRunning());
+
+    // rollback - kill
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // rollback - complete
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Restart the executor and get the result
+    restart();
+    waitProcedure(procId);
+
+    // The procedure is completed
+    ProcedureResult result = procExecutor.getResult(procId);
+    ProcedureTestingUtility.assertIsAbortException(result);
+  }
+
+  public static class TestStateMachineProcedure
+      extends StateMachineProcedure<Void, TestStateMachineProcedure.State> {
+    enum State { STATE_1, STATE_2, STATE_3, DONE }
+
+    public TestStateMachineProcedure() {}
+
+    private AtomicBoolean aborted = new AtomicBoolean(false);
+    private int iResult = 0;
+
+    @Override
+    protected StateMachineProcedure.Flow executeFromState(Void env, State 
state) {
+      if (state == null) {
+        LOG.info("Initializing " + this);
+        state = State.STATE_1;
+        setNextState(state);
+      }
+
+      switch (state) {
+        case STATE_1:
+          LOG.info("execute step 1 " + this);
+          setNextState(State.STATE_2);
+          iResult += 3;
+          break;
+        case STATE_2:
+          LOG.info("execute step 2 " + this);
+          setNextState(State.STATE_3);
+          iResult += 5;
+          break;
+        case STATE_3:
+          LOG.info("execute step 3 " + this);
+          Threads.sleepWithoutInterrupt(procSleepInterval);
+          if (aborted.get()) {
+            LOG.info("aborted step 3 " + this);
+            setAbortFailure("test", "aborted");
+            break;
+          }
+          setNextState(State.DONE);
+          iResult += 7;
+          setResult(Bytes.toBytes(iResult));
+          return Flow.NO_MORE_STATE;
+        default:
+          throw new UnsupportedOperationException();
+      }
+      return Flow.HAS_MORE_STATE;
+    }
+
+    @Override
+    protected void rollbackState(Void env, final State state) {
+      switch (state) {
+        case STATE_1:
+          LOG.info("rollback step 1 " + this);
+          break;
+        case STATE_2:
+          LOG.info("rollback step 2 " + this);
+          break;
+        case STATE_3:
+          LOG.info("rollback step 3 " + this);
+          break;
+        default:
+          throw new UnsupportedOperationException();
+      }
+    }
+
+    @Override
+    protected State getState(final int stateId) {
+      return State.values()[stateId];
+    }
+
+    private void setNextState(final State state) {
+      setNextState(state.ordinal());
+    }
+
+    @Override
+    protected boolean abort(Void env) {
+      aborted.set(true);
+      return true;
+    }
+
+    @Override
+    protected void serializeStateData(final OutputStream stream) throws 
IOException {
+      super.serializeStateData(stream);
+      stream.write(Bytes.toBytes(iResult));
+    }
+
+    @Override
+    protected void deserializeStateData(final InputStream stream) throws 
IOException {
+      super.deserializeStateData(stream);
+      byte[] data = new byte[4];
+      stream.read(data);
+      iResult = Bytes.toInt(data);
+    }
+  }
+
+  @Test(timeout=30000)
+  public void testStateMachineRecovery() throws Exception {
+    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
+    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
+
+    // Step 1 - kill
+    Procedure proc = new TestStateMachineProcedure();
+    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 1 exec && Step 2 - kill
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 2 exec && step 3 - kill
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 3 exec
+    restart();
+    waitProcedure(procId);
+    assertTrue(procExecutor.isRunning());
+
+    // The procedure is completed
+    ProcedureResult result = procExecutor.getResult(procId);
+    ProcedureTestingUtility.assertProcNotFailed(result);
+    assertEquals(15, Bytes.toInt(result.getResult()));
+  }
+
+  @Test(timeout=30000)
+  public void testStateMachineRollbackRecovery() throws Exception {
+    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
+    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
+
+    // Step 1 - kill
+    Procedure proc = new TestStateMachineProcedure();
+    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 1 exec && Step 2 - kill
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 2 exec && step 3 - kill
+    restart();
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Step 3 exec - rollback step 3 - kill
+    procSleepInterval = 2500;
+    restart();
+    assertTrue(procExecutor.abort(procId));
+    waitProcedure(procId);
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+    assertFalse(procExecutor.isRunning());
+
+    // Rollback step 3 - rollback step 2 - kill
+    restart();
+    waitProcedure(procId);
+    assertFalse(procExecutor.isRunning());
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+
+    // Rollback step 2 - step 1 - kill
+    restart();
+    waitProcedure(procId);
+    assertFalse(procExecutor.isRunning());
+    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
+
+    // Rollback step 1 - complete
+    restart();
+    waitProcedure(procId);
+    assertTrue(procExecutor.isRunning());
+
+    // The procedure is completed
+    ProcedureResult result = procExecutor.getResult(procId);
+    ProcedureTestingUtility.assertIsAbortException(result);
+  }
+
+  private void waitProcedure(final long procId) {
+    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
+    dumpLogDirState();
+  }
+
+  private void dumpLogDirState() {
+    try {
+      FileStatus[] files = fs.listStatus(logDir);
+      if (files != null && files.length > 0) {
+        for (FileStatus file: files) {
+          assertTrue(file.toString(), file.isFile());
+          LOG.debug("log file " + file.getPath() + " size=" + file.getLen());
+        }
+      } else {
+        LOG.debug("no files under: " + logDir);
+      }
+    } catch (IOException e) {
+      LOG.warn("Unable to dump " + logDir, e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRunQueues.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRunQueues.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRunQueues.java
new file mode 100644
index 0000000..c040be6
--- /dev/null
+++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRunQueues.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureRunQueues {
+  class TestRunQueue implements ProcedureFairRunQueues.FairObject {
+    private final String name;
+    private final int priority;
+
+    public TestRunQueue(String name, int priority) {
+      this.name = name;
+      this.priority = priority;
+    }
+
+    @Override
+    public String toString() {
+      return name;
+    }
+
+    @Override
+    public boolean isAvailable() {
+      return true;
+    }
+
+    @Override
+    public int getPriority() {
+      return priority;
+    }
+  }
+
+  @Test
+  public void testFairQueues() throws Exception {
+    ProcedureFairRunQueues<String, TestRunQueue> fairq
+      = new ProcedureFairRunQueues<String, TestRunQueue>(1);
+    TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
+    TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1));
+    TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
+
+    for (int i = 0; i < 3; ++i) {
+      assertEquals(a, fairq.poll());
+      assertEquals(b, fairq.poll());
+      assertEquals(m, fairq.poll());
+      assertEquals(m, fairq.poll());
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
new file mode 100644
index 0000000..0669549
--- /dev/null
+++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureStoreTracker {
+  private static final Log LOG = 
LogFactory.getLog(TestProcedureStoreTracker.class);
+
+  static class TestProcedure extends Procedure<Void> {
+    public TestProcedure(long procId) {
+      setProcId(procId);
+    }
+
+    @Override
+    protected Procedure[] execute(Void env) { return null; }
+
+    @Override
+    protected void rollback(Void env) { /* no-op */ }
+
+    @Override
+    protected boolean abort(Void env) { return false; }
+
+    @Override
+    protected void serializeStateData(final OutputStream stream) { /* no-op */ 
}
+
+    @Override
+    protected void deserializeStateData(final InputStream stream) { /* no-op 
*/ }
+  }
+
+  @Test
+  public void testSeqInsertAndDelete() {
+    ProcedureStoreTracker tracker = new ProcedureStoreTracker();
+    assertTrue(tracker.isEmpty());
+
+    final int MIN_PROC = 1;
+    final int MAX_PROC = 1 << 10;
+
+    // sequential insert
+    for (int i = MIN_PROC; i < MAX_PROC; ++i) {
+      tracker.insert(i);
+
+      // All the proc that we inserted should not be deleted
+      for (int j = MIN_PROC; j <= i; ++j) {
+        assertEquals(ProcedureStoreTracker.DeleteState.NO, 
tracker.isDeleted(j));
+      }
+      // All the proc that are not yet inserted should be result as deleted
+      for (int j = i + 1; j < MAX_PROC; ++j) {
+        assertTrue(tracker.isDeleted(j) != 
ProcedureStoreTracker.DeleteState.NO);
+      }
+    }
+
+    // sequential delete
+    for (int i = MIN_PROC; i < MAX_PROC; ++i) {
+      tracker.delete(i);
+
+      // All the proc that we deleted should be deleted
+      for (int j = MIN_PROC; j <= i; ++j) {
+        assertEquals(ProcedureStoreTracker.DeleteState.YES, 
tracker.isDeleted(j));
+      }
+      // All the proc that are not yet deleted should be result as not deleted
+      for (int j = i + 1; j < MAX_PROC; ++j) {
+        assertEquals(ProcedureStoreTracker.DeleteState.NO, 
tracker.isDeleted(j));
+      }
+    }
+    assertTrue(tracker.isEmpty());
+  }
+
+  @Test
+  public void testPartialTracker() {
+    ProcedureStoreTracker tracker = new ProcedureStoreTracker();
+    tracker.setPartialFlag(true);
+
+    // nothing in the tracker, the state is unknown
+    assertTrue(tracker.isEmpty());
+    assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, 
tracker.isDeleted(1));
+    assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, 
tracker.isDeleted(579));
+
+    // Mark 1 as deleted, now that is a known state
+    tracker.setDeleted(1, true);
+    tracker.dump();
+    assertEquals(ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(1));
+    assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, 
tracker.isDeleted(2));
+    assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, 
tracker.isDeleted(579));
+
+    // Mark 579 as non-deleted, now that is a known state
+    tracker.setDeleted(579, false);
+    assertEquals(ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(1));
+    assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, 
tracker.isDeleted(2));
+    assertEquals(ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(579));
+    assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, 
tracker.isDeleted(577));
+    assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, 
tracker.isDeleted(580));
+  }
+
+  @Test
+  public void testBasicCRUD() {
+    ProcedureStoreTracker tracker = new ProcedureStoreTracker();
+    assertTrue(tracker.isEmpty());
+
+    Procedure[] procs = new TestProcedure[] {
+      new TestProcedure(1), new TestProcedure(2), new TestProcedure(3),
+      new TestProcedure(4), new TestProcedure(5), new TestProcedure(6),
+    };
+
+    tracker.insert(procs[0], null);
+    tracker.insert(procs[1], new Procedure[] { procs[2], procs[3], procs[4] });
+    assertFalse(tracker.isEmpty());
+    assertTrue(tracker.isUpdated());
+
+    tracker.resetUpdates();
+    assertFalse(tracker.isUpdated());
+
+    for (int i = 0; i < 4; ++i) {
+      tracker.update(procs[i]);
+      assertFalse(tracker.isEmpty());
+      assertFalse(tracker.isUpdated());
+    }
+
+    tracker.update(procs[4]);
+    assertFalse(tracker.isEmpty());
+    assertTrue(tracker.isUpdated());
+
+    tracker.update(procs[5]);
+    assertFalse(tracker.isEmpty());
+    assertTrue(tracker.isUpdated());
+
+    for (int i = 0; i < 5; ++i) {
+      tracker.delete(procs[i].getProcId());
+      assertFalse(tracker.isEmpty());
+      assertTrue(tracker.isUpdated());
+    }
+    tracker.delete(procs[5].getProcId());
+    assertTrue(tracker.isEmpty());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
new file mode 100644
index 0000000..344b28b
--- /dev/null
+++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestWALProcedureStore {
+  private static final Log LOG = 
LogFactory.getLog(TestWALProcedureStore.class);
+
+  private static final int PROCEDURE_STORE_SLOTS = 1;
+  private static final Procedure NULL_PROC = null;
+
+  private WALProcedureStore procStore;
+
+  private HBaseCommonTestingUtility htu;
+  private FileSystem fs;
+  private Path testDir;
+  private Path logDir;
+
+  @Before
+  public void setUp() throws IOException {
+    htu = new HBaseCommonTestingUtility();
+    testDir = htu.getDataTestDir();
+    fs = testDir.getFileSystem(htu.getConfiguration());
+    assertTrue(testDir.depth() > 1);
+
+    logDir = new Path(testDir, "proc-logs");
+    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), 
fs, logDir);
+    procStore.start(PROCEDURE_STORE_SLOTS);
+    procStore.recoverLease();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    procStore.stop(false);
+    fs.delete(logDir, true);
+  }
+
+  private Iterator<Procedure> storeRestart() throws Exception {
+    procStore.stop(false);
+    procStore.start(PROCEDURE_STORE_SLOTS);
+    procStore.recoverLease();
+    return procStore.load();
+  }
+
+  @Test
+  public void testEmptyLogLoad() throws Exception {
+    Iterator<Procedure> loader = storeRestart();
+    assertEquals(0, countProcedures(loader));
+  }
+
+  @Test
+  public void testLoad() throws Exception {
+    Set<Long> procIds = new HashSet<>();
+
+    // Insert something in the log
+    Procedure proc1 = new TestSequentialProcedure();
+    procIds.add(proc1.getProcId());
+    procStore.insert(proc1, null);
+
+    Procedure proc2 = new TestSequentialProcedure();
+    Procedure[] child2 = new Procedure[2];
+    child2[0] = new TestSequentialProcedure();
+    child2[1] = new TestSequentialProcedure();
+
+    procIds.add(proc2.getProcId());
+    procIds.add(child2[0].getProcId());
+    procIds.add(child2[1].getProcId());
+    procStore.insert(proc2, child2);
+
+    // Verify that everything is there
+    verifyProcIdsOnRestart(procIds);
+
+    // Update and delete something
+    procStore.update(proc1);
+    procStore.update(child2[1]);
+    procStore.delete(child2[1].getProcId());
+    procIds.remove(child2[1].getProcId());
+
+    // Verify that everything is there
+    verifyProcIdsOnRestart(procIds);
+
+    // Remove 4 byte from the trailers
+    procStore.stop(false);
+    FileStatus[] logs = fs.listStatus(logDir);
+    assertEquals(3, logs.length);
+    for (int i = 0; i < logs.length; ++i) {
+      corruptLog(logs[i], 4);
+    }
+    verifyProcIdsOnRestart(procIds);
+  }
+
+  @Test
+  public void testCorruptedTrailer() throws Exception {
+    // Insert something
+    for (int i = 0; i < 100; ++i) {
+      procStore.insert(new TestSequentialProcedure(), null);
+    }
+
+    // Stop the store
+    procStore.stop(false);
+
+    // Remove 4 byte from the trailer
+    FileStatus[] logs = fs.listStatus(logDir);
+    assertEquals(1, logs.length);
+    corruptLog(logs[0], 4);
+
+    int count = countProcedures(storeRestart());
+    assertEquals(100, count);
+  }
+
+  @Test
+  public void testCorruptedEntries() throws Exception {
+    // Insert something
+    for (int i = 0; i < 100; ++i) {
+      procStore.insert(new TestSequentialProcedure(), null);
+    }
+
+    // Stop the store
+    procStore.stop(false);
+
+    // Remove some byte from the log
+    // (enough to cut the trailer and corrupt some entries)
+    FileStatus[] logs = fs.listStatus(logDir);
+    assertEquals(1, logs.length);
+    corruptLog(logs[0], 1823);
+
+    int count = countProcedures(storeRestart());
+    assertTrue(procStore.getCorruptedLogs() != null);
+    assertEquals(1, procStore.getCorruptedLogs().size());
+    assertEquals(85, count);
+  }
+
+  private void corruptLog(final FileStatus logFile, final long dropBytes)
+      throws IOException {
+    assertTrue(logFile.getLen() > dropBytes);
+    LOG.debug("corrupt log " + logFile.getPath() +
+              " size=" + logFile.getLen() + " drop=" + dropBytes);
+    Path tmpPath = new Path(testDir, "corrupted.log");
+    InputStream in = fs.open(logFile.getPath());
+    OutputStream out =  fs.create(tmpPath);
+    IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, true);
+    fs.rename(tmpPath, logFile.getPath());
+  }
+
+  private void verifyProcIdsOnRestart(final Set<Long> procIds) throws 
Exception {
+    int count = 0;
+    Iterator<Procedure> loader = storeRestart();
+    while (loader.hasNext()) {
+      Procedure proc = loader.next();
+      LOG.debug("loading procId=" + proc.getProcId());
+      assertTrue("procId=" + proc.getProcId() + " unexpected", 
procIds.contains(proc.getProcId()));
+      count++;
+    }
+    assertEquals(procIds.size(), count);
+  }
+
+  private void assertIsEmpty(Iterator<Procedure> iterator) {
+    assertEquals(0, countProcedures(iterator));
+  }
+
+  private int countProcedures(Iterator<Procedure> iterator) {
+    int count = 0;
+    while (iterator.hasNext()) {
+      Procedure proc = iterator.next();
+      LOG.trace("loading procId=" + proc.getProcId());
+      count++;
+    }
+    return count;
+  }
+
+  private void assertEmptyLogDir() {
+    try {
+      FileStatus[] status = fs.listStatus(logDir);
+      assertTrue("expected empty state-log dir", status == null || 
status.length == 0);
+    } catch (FileNotFoundException e) {
+      fail("expected the state-log dir to be present: " + logDir);
+    } catch (IOException e) {
+      fail("got en exception on state-log dir list: " + e.getMessage());
+    }
+  }
+
+  public static class TestSequentialProcedure extends 
SequentialProcedure<Void> {
+    private static long seqid = 0;
+
+    public TestSequentialProcedure() {
+      setProcId(++seqid);
+    }
+
+    @Override
+    protected Procedure[] execute(Void env) { return null; }
+
+    @Override
+    protected void rollback(Void env) { }
+
+    @Override
+    protected boolean abort(Void env) { return false; }
+
+    @Override
+    protected void serializeStateData(final OutputStream stream) throws 
IOException {
+      long procId = getProcId();
+      if (procId % 2 == 0) {
+        stream.write(Bytes.toBytes(procId));
+      }
+    }
+
+    @Override
+    protected void deserializeStateData(InputStream stream) throws IOException 
{
+      long procId = getProcId();
+      if (procId % 2 == 0) {
+        byte[] bProcId = new byte[8];
+        assertEquals(8, stream.read(bProcId));
+        assertEquals(procId, Bytes.toLong(bProcId));
+      } else {
+        assertEquals(0, stream.available());
+      }
+    }
+  }
+}
\ No newline at end of file

Reply via email to