http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java new file mode 100644 index 0000000..d2a394a --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -0,0 +1,714 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.procedure2.store.wal; + +import java.io.IOException; +import java.io.FileNotFoundException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; +import org.apache.hadoop.hbase.procedure2.util.ByteSlot; +import org.apache.hadoop.hbase.procedure2.util.StringUtils; +import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader; + +/** + * WAL implementation of the ProcedureStore. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class WALProcedureStore implements ProcedureStore { + private static final Log LOG = LogFactory.getLog(WALProcedureStore.class); + + public interface LeaseRecovery { + void recoverFileLease(FileSystem fs, Path path) throws IOException; + } + + private static final int MAX_RETRIES_BEFORE_ABORT = 3; + + private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec"; + private static final int DEFAULT_SYNC_WAIT_MSEC = 50; + + private final CopyOnWriteArrayList<ProcedureStoreListener> listeners = + new CopyOnWriteArrayList<ProcedureStoreListener>(); + + private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>(); + private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker(); + private final AtomicBoolean running = new AtomicBoolean(false); + private final ReentrantLock lock = new ReentrantLock(); + private final Condition waitCond = lock.newCondition(); + private final Condition slotCond = lock.newCondition(); + private final Condition syncCond = lock.newCondition(); + + private final LeaseRecovery leaseRecovery; + private final Configuration conf; + private final FileSystem fs; + private final Path logDir; + + private AtomicBoolean inSync = new AtomicBoolean(false); + private ArrayBlockingQueue<ByteSlot> slotsCache = null; + private Set<ProcedureWALFile> corruptedLogs = null; + private FSDataOutputStream stream = null; + private long totalSynced = 0; + private long flushLogId = 0; + private int slotIndex = 0; + private Thread syncThread; + private ByteSlot[] slots; + private int syncWaitMsec; + + public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir, + final LeaseRecovery leaseRecovery) { + this.fs = fs; + this.conf = conf; + this.logDir = logDir; + this.leaseRecovery = leaseRecovery; + } + + @Override + public void start(int numSlots) throws IOException { + if (running.getAndSet(true)) 
{ + return; + } + + // Init buffer slots + slots = new ByteSlot[numSlots]; + slotsCache = new ArrayBlockingQueue(numSlots, true); + while (slotsCache.remainingCapacity() > 0) { + slotsCache.offer(new ByteSlot()); + } + + // Tunings + syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC); + + // Init sync thread + syncThread = new Thread() { + @Override + public void run() { + while (running.get()) { + try { + syncLoop(); + } catch (IOException e) { + LOG.error("got an exception from the sync-loop", e); + sendAbortProcessSignal(); + } + } + } + }; + syncThread.start(); + } + + @Override + public void stop(boolean abort) { + if (!running.getAndSet(false)) { + return; + } + + LOG.info("Stopping the WAL Procedure Store"); + if (lock.tryLock()) { + try { + waitCond.signalAll(); + } finally { + lock.unlock(); + } + } + + if (!abort) { + try { + syncThread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + // Close the writer + closeStream(); + + // Close the old logs + // they should be already closed, this is just in case the load fails + // and we call start() and then stop() + for (ProcedureWALFile log: logs) { + log.close(); + } + logs.clear(); + } + + @Override + public boolean isRunning() { + return running.get(); + } + + public ProcedureStoreTracker getStoreTracker() { + return storeTracker; + } + + @Override + public void registerListener(ProcedureStoreListener listener) { + this.listeners.add(listener); + } + + @Override + public boolean unregisterListener(ProcedureStoreListener listener) { + return this.listeners.remove(listener); + } + + @Override + public void recoverLease() throws IOException { + LOG.info("Starting WAL Procedure Store lease recovery"); + FileStatus[] oldLogs = getLogFiles(); + while (running.get()) { + // Get Log-MaxID and recover lease on old logs + flushLogId = initOldLogs(oldLogs) + 1; + + // Create new state-log + if (!rollWriter(flushLogId)) { + // someone else has 
already created this log + LOG.debug("someone else has already created log " + flushLogId); + continue; + } + + // We have the lease on the log + oldLogs = getLogFiles(); + if (getMaxLogId(oldLogs) > flushLogId) { + // Someone else created new logs + LOG.debug("someone else created new logs. expected maxLogId < " + flushLogId); + logs.getLast().removeFile(); + continue; + } + + LOG.info("lease acquired flushLogId=" + flushLogId); + break; + } + } + + @Override + public Iterator<Procedure> load() throws IOException { + if (logs.isEmpty()) { + throw new RuntimeException("recoverLease() must be called before loading data"); + } + + // Nothing to do, If we have only the current log. + if (logs.size() == 1) { + LOG.debug("No state logs to replay"); + return null; + } + + // Load the old logs + final ArrayList<ProcedureWALFile> toRemove = new ArrayList<ProcedureWALFile>(); + Iterator<ProcedureWALFile> it = logs.descendingIterator(); + it.next(); // Skip the current log + try { + return ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() { + @Override + public void removeLog(ProcedureWALFile log) { + toRemove.add(log); + } + + @Override + public void markCorruptedWAL(ProcedureWALFile log, IOException e) { + if (corruptedLogs == null) { + corruptedLogs = new HashSet<ProcedureWALFile>(); + } + corruptedLogs.add(log); + // TODO: sideline corrupted log + } + }); + } finally { + if (!toRemove.isEmpty()) { + for (ProcedureWALFile log: toRemove) { + removeLogFile(log); + } + } + } + } + + @Override + public void insert(final Procedure proc, final Procedure[] subprocs) { + if (LOG.isTraceEnabled()) { + LOG.trace("insert " + proc + " subproc=" + Arrays.toString(subprocs)); + } + + ByteSlot slot = acquireSlot(); + long logId = -1; + try { + // Serialize the insert + if (subprocs != null) { + ProcedureWALFormat.writeInsert(slot, proc, subprocs); + } else { + assert !proc.hasParent(); + ProcedureWALFormat.writeInsert(slot, proc); + } + + // Push the transaction 
data and wait until it is persisted + logId = pushData(slot); + } catch (IOException e) { + // We are not able to serialize the procedure. + // this is a code error, and we are not able to go on. + LOG.fatal("Unable to serialize one of the procedure: proc=" + proc + + " subprocs=" + Arrays.toString(subprocs), e); + throw new RuntimeException(e); + } finally { + releaseSlot(slot); + } + + // Update the store tracker + synchronized (storeTracker) { + if (logId == flushLogId) { + storeTracker.insert(proc, subprocs); + } + } + } + + @Override + public void update(final Procedure proc) { + if (LOG.isTraceEnabled()) { + LOG.trace("update " + proc); + } + + ByteSlot slot = acquireSlot(); + long logId = -1; + try { + // Serialize the update + ProcedureWALFormat.writeUpdate(slot, proc); + + // Push the transaction data and wait until it is persisted + logId = pushData(slot); + } catch (IOException e) { + // We are not able to serialize the procedure. + // this is a code error, and we are not able to go on. + LOG.fatal("Unable to serialize the procedure: " + proc, e); + throw new RuntimeException(e); + } finally { + releaseSlot(slot); + } + + // Update the store tracker + boolean removeOldLogs = false; + synchronized (storeTracker) { + if (logId == flushLogId) { + storeTracker.update(proc); + removeOldLogs = storeTracker.isUpdated(); + } + } + + if (removeOldLogs) { + removeAllLogs(logId - 1); + } + } + + @Override + public void delete(final long procId) { + if (LOG.isTraceEnabled()) { + LOG.trace("delete " + procId); + } + + ByteSlot slot = acquireSlot(); + long logId = -1; + try { + // Serialize the delete + ProcedureWALFormat.writeDelete(slot, procId); + + // Push the transaction data and wait until it is persisted + logId = pushData(slot); + } catch (IOException e) { + // We are not able to serialize the procedure. + // this is a code error, and we are not able to go on. 
+ LOG.fatal("Unable to serialize the procedure: " + procId, e); + throw new RuntimeException(e); + } finally { + releaseSlot(slot); + } + + boolean removeOldLogs = false; + synchronized (storeTracker) { + if (logId == flushLogId) { + storeTracker.delete(procId); + if (storeTracker.isEmpty()) { + removeOldLogs = rollWriterOrDie(logId + 1); + } + } + } + + if (removeOldLogs) { + removeAllLogs(logId); + } + } + + private ByteSlot acquireSlot() { + ByteSlot slot = slotsCache.poll(); + return slot != null ? slot : new ByteSlot(); + } + + private void releaseSlot(final ByteSlot slot) { + slot.reset(); + slotsCache.offer(slot); + } + + private long pushData(final ByteSlot slot) { + assert !logs.isEmpty() : "recoverLease() must be called before inserting data"; + long logId = -1; + + lock.lock(); + try { + // Wait for the sync to be completed + while (true) { + if (inSync.get()) { + syncCond.await(); + } else if (slotIndex == slots.length) { + slotCond.signal(); + syncCond.await(); + } else { + break; + } + } + + slots[slotIndex++] = slot; + logId = flushLogId; + + // Notify that there is new data + if (slotIndex == 1) { + waitCond.signal(); + } + + // Notify that the slots are full + if (slotIndex == slots.length) { + slotCond.signal(); + } + syncCond.await(); + } catch (InterruptedException e) { + sendAbortProcessSignal(); + } finally { + lock.unlock(); + } + return logId; + } + + private void syncLoop() throws IOException { + inSync.set(false); + while (running.get()) { + lock.lock(); + try { + // Wait until new data is available + if (slotIndex == 0) { + LOG.debug("Waiting for data. flushed=" + StringUtils.humanSize(totalSynced)); + waitCond.await(); + if (slotIndex == 0) { + // no data.. 
probably a stop() + continue; + } + } + + // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing + slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS); + + inSync.set(true); + totalSynced += syncSlots(); + slotIndex = 0; + inSync.set(false); + syncCond.signalAll(); + } catch (InterruptedException e) { + sendAbortProcessSignal(); + } finally { + lock.unlock(); + } + } + + try { + syncCond.signalAll(); + } catch (Throwable e) { + LOG.debug("unable to send a signal to ones waiting in pushData(): " + e.getMessage(), e); + } + } + + private long syncSlots() { + int retry = 0; + long totalSynced = 0; + do { + try { + totalSynced = syncSlots(stream, slots, 0, slotIndex); + break; + } catch (IOException e) { + if (++retry == MAX_RETRIES_BEFORE_ABORT) { + LOG.error("sync slot failed, abort.", e); + sendAbortProcessSignal(); + } + } + } while (running.get()); + return totalSynced; + } + + protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count) + throws IOException { + long totalSynced = 0; + for (int i = 0; i < count; ++i) { + ByteSlot data = slots[offset + i]; + data.writeTo(stream); + totalSynced += data.size(); + } + stream.hsync(); + return totalSynced; + } + + private void sendAbortProcessSignal() { + if (!this.listeners.isEmpty()) { + for (ProcedureStoreListener listener : this.listeners) { + listener.abortProcess(); + } + } + } + + private boolean rollWriterOrDie(final long logId) { + try { + return rollWriter(logId); + } catch (IOException e) { + LOG.warn("Unable to roll the log", e); + sendAbortProcessSignal(); + return false; + } + } + + private boolean rollWriter(final long logId) throws IOException { + ProcedureWALHeader header = ProcedureWALHeader.newBuilder() + .setVersion(ProcedureWALFormat.HEADER_VERSION) + .setType(ProcedureWALFormat.LOG_TYPE_STREAM) + .setMinProcId(storeTracker.getMinProcId()) + .setLogId(logId) + .build(); + + FSDataOutputStream newStream = null; + Path newLogFile = null; + long startPos = 
-1; + try { + newLogFile = getLogFilePath(logId); + newStream = fs.create(newLogFile, false); + ProcedureWALFormat.writeHeader(newStream, header); + startPos = newStream.getPos(); + } catch (FileAlreadyExistsException e) { + LOG.error("Log file with id=" + logId + " already exists", e); + return false; + } + lock.lock(); + try { + closeStream(); + synchronized (storeTracker) { + storeTracker.resetUpdates(); + } + stream = newStream; + flushLogId = logId; + totalSynced = 0; + logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos)); + } finally { + lock.unlock(); + } + LOG.info("Roll new state log: " + logId); + return true; + } + + private void closeStream() { + try { + if (stream != null) { + try { + ProcedureWALFormat.writeTrailer(stream, storeTracker); + } catch (IOException e) { + LOG.error("Unable to write the trailer", e); + } + stream.close(); + } + } catch (IOException e) { + LOG.error("Unable to close the stream", e); + } finally { + stream = null; + } + } + + private void removeAllLogs(long lastLogId) { + LOG.info("Remove all state logs with ID less then " + lastLogId); + while (!logs.isEmpty()) { + ProcedureWALFile log = logs.getFirst(); + if (lastLogId < log.getLogId()) { + break; + } + + removeLogFile(log); + } + } + + private boolean removeLogFile(final ProcedureWALFile log) { + try { + LOG.debug("remove log: " + log); + log.removeFile(); + logs.remove(log); + } catch (IOException e) { + LOG.error("unable to remove log " + log, e); + return false; + } + return true; + } + + public Set<ProcedureWALFile> getCorruptedLogs() { + return corruptedLogs; + } + + // ========================================================================== + // FileSystem Log Files helpers + // ========================================================================== + public Path getLogDir() { + return this.logDir; + } + + public FileSystem getFileSystem() { + return this.fs; + } + + protected Path getLogFilePath(final long logId) throws IOException { + return new 
Path(logDir, String.format("state-%020d.log", logId)); + } + + private static long getLogIdFromName(final String name) { + int end = name.lastIndexOf(".log"); + int start = name.lastIndexOf('-') + 1; + while (start < end) { + if (name.charAt(start) != '0') + break; + start++; + } + return Long.parseLong(name.substring(start, end)); + } + + private FileStatus[] getLogFiles() throws IOException { + try { + return fs.listStatus(logDir, new PathFilter() { + @Override + public boolean accept(Path path) { + String name = path.getName(); + return name.startsWith("state-") && name.endsWith(".log"); + } + }); + } catch (FileNotFoundException e) { + LOG.warn("log directory not found: " + e.getMessage()); + return null; + } + } + + private long getMaxLogId(final FileStatus[] logFiles) { + long maxLogId = 0; + if (logFiles != null && logFiles.length > 0) { + for (int i = 0; i < logFiles.length; ++i) { + maxLogId = Math.max(maxLogId, getLogIdFromName(logFiles[i].getPath().getName())); + } + } + return maxLogId; + } + + /** + * @return Max-LogID of the specified log file set + */ + private long initOldLogs(final FileStatus[] logFiles) throws IOException { + this.logs.clear(); + + long maxLogId = 0; + if (logFiles != null && logFiles.length > 0) { + for (int i = 0; i < logFiles.length; ++i) { + final Path logPath = logFiles[i].getPath(); + leaseRecovery.recoverFileLease(fs, logPath); + maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName())); + + ProcedureWALFile log = initOldLog(logFiles[i]); + if (log != null) { + this.logs.add(log); + } + } + Collections.sort(this.logs); + initTrackerFromOldLogs(); + } + return maxLogId; + } + + private void initTrackerFromOldLogs() { + // TODO: Load the most recent tracker available + if (!logs.isEmpty()) { + ProcedureWALFile log = logs.getLast(); + try { + log.readTracker(storeTracker); + } catch (IOException e) { + LOG.error("Unable to read tracker for " + log, e); + // try the next one... 
+ storeTracker.clear(); + storeTracker.setPartialFlag(true); + } + } + } + + private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException { + ProcedureWALFile log = new ProcedureWALFile(fs, logFile); + if (logFile.getLen() == 0) { + LOG.warn("Remove uninitialized log " + logFile); + log.removeFile(); + return null; + } + + LOG.debug("opening state-log: " + logFile); + try { + log.open(); + } catch (ProcedureWALFormat.InvalidWALDataException e) { + LOG.warn("Remove uninitialized log " + logFile, e); + log.removeFile(); + return null; + } catch (IOException e) { + String msg = "Unable to read state log: " + logFile; + LOG.error(msg, e); + throw new IOException(msg, e); + } + + if (log.isCompacted()) { + try { + log.readTrailer(); + } catch (IOException e) { + // unfinished compacted log throw it away + LOG.warn("Unfinished compacted log " + logFile, e); + log.removeFile(); + return null; + } + } + return log; + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java new file mode 100644 index 0000000..8904116 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2.util; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Similar to the ByteArrayOutputStream, with the exception that we can prepend an header. + * e.g. you write some data and you want to prepend an header that contains the data len or cksum. 
+ * <code> + * ByteSlot slot = new ByteSlot(); + * // write data + * slot.write(...); + * slot.write(...); + * // write header with the size of the written data + * slot.markHead(); + * slot.write(Bytes.toBytes(slot.size())); + * // flush to stream as [header, data] + * slot.writeTo(stream); + * </code> + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ByteSlot extends OutputStream { + private static final int DOUBLE_GROW_LIMIT = 1 << 20; + private static final int GROW_ALIGN = 128; + + private byte[] buf; + private int head; + private int size; + + public void reset() { + head = 0; + size = 0; + } + + public void markHead() { + head = size; + } + + public int getHead() { + return head; + } + + public int size() { + return size; + } + + public byte[] getBuffer() { + return buf; + } + + public void writeAt(int offset, int b) { + head = Math.min(head, offset); + buf[offset] = (byte)b; + } + + public void write(int b) { + ensureCapacity(size + 1); + buf[size++] = (byte)b; + } + + public void write(byte[] b, int off, int len) { + ensureCapacity(size + len); + System.arraycopy(b, off, buf, size, len); + size += len; + } + + public void writeTo(final OutputStream stream) throws IOException { + if (head != 0) { + stream.write(buf, head, size - head); + stream.write(buf, 0, head); + } else { + stream.write(buf, 0, size); + } + } + + private void ensureCapacity(int minCapacity) { + minCapacity = (minCapacity + (GROW_ALIGN - 1)) & -GROW_ALIGN; + if (buf == null) { + buf = new byte[minCapacity]; + } else if (minCapacity > buf.length) { + int newCapacity = buf.length << 1; + if (minCapacity > newCapacity || newCapacity > DOUBLE_GROW_LIMIT) { + newCapacity = minCapacity; + } + buf = Arrays.copyOf(buf, newCapacity); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java 
---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java new file mode 100644 index 0000000..97134c2 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.procedure2.util; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class StringUtils { + private StringUtils() {} + + public static String humanTimeDiff(long timeDiff) { + StringBuilder buf = new StringBuilder(); + long hours = timeDiff / (60*60*1000); + long rem = (timeDiff % (60*60*1000)); + long minutes = rem / (60*1000); + rem = rem % (60*1000); + float seconds = rem / 1000.0f; + + if (hours != 0){ + buf.append(hours); + buf.append("hrs, "); + } + if (minutes != 0){ + buf.append(minutes); + buf.append("mins, "); + } + if (hours > 0 || minutes > 0) { + buf.append(seconds); + buf.append("sec"); + } else { + buf.append(String.format("%.4fsec", seconds)); + } + return buf.toString(); + } + + public static String humanSize(double size) { + if (size >= (1L << 40)) return String.format("%.1fT", size / (1L << 40)); + if (size >= (1L << 30)) return String.format("%.1fG", size / (1L << 30)); + if (size >= (1L << 20)) return String.format("%.1fM", size / (1L << 20)); + if (size >= (1L << 10)) return String.format("%.1fK", size / (1L << 10)); + return String.format("%.0f", size); + } + + public static boolean isEmpty(final String input) { + return input == null || input.length() == 0; + } + + public static String buildString(final String... parts) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < parts.length; ++i) { + sb.append(parts[i]); + } + return sb.toString(); + } + + public static StringBuilder appendStrings(final StringBuilder sb, final String... 
parts) { + for (int i = 0; i < parts.length; ++i) { + sb.append(parts[i]); + } + return sb; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java new file mode 100644 index 0000000..368d26f --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.procedure2.util; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class TimeoutBlockingQueue<E> { + public static interface TimeoutRetriever<T> { + long getTimeout(T object); + TimeUnit getTimeUnit(T object); + } + + private final ReentrantLock lock = new ReentrantLock(); + private final Condition waitCond = lock.newCondition(); + private final TimeoutRetriever<? super E> timeoutRetriever; + + private E[] objects; + private int head = 0; + private int tail = 0; + + public TimeoutBlockingQueue(TimeoutRetriever<? super E> timeoutRetriever) { + this(32, timeoutRetriever); + } + + @SuppressWarnings("unchecked") + public TimeoutBlockingQueue(int capacity, TimeoutRetriever<? 
super E> timeoutRetriever) { + this.objects = (E[])new Object[capacity]; + this.timeoutRetriever = timeoutRetriever; + } + + public void dump() { + for (int i = 0; i < objects.length; ++i) { + if (i == head) { + System.out.print("[" + objects[i] + "] "); + } else if (i == tail) { + System.out.print("]" + objects[i] + "[ "); + } else { + System.out.print(objects[i] + " "); + } + } + System.out.println(); + } + + public void clear() { + lock.lock(); + try { + if (head != tail) { + for (int i = head; i < tail; ++i) { + objects[i] = null; + } + head = 0; + tail = 0; + waitCond.signal(); + } + } finally { + lock.unlock(); + } + } + + public void add(E e) { + if (e == null) throw new NullPointerException(); + + lock.lock(); + try { + addElement(e); + waitCond.signal(); + } finally { + lock.unlock(); + } + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") + public E poll() { + lock.lock(); + try { + if (isEmpty()) { + waitCond.await(); + return null; + } + + E elem = objects[head]; + long nanos = getNanosTimeout(elem); + nanos = waitCond.awaitNanos(nanos); + return nanos > 0 ? null : removeFirst(); + } catch (InterruptedException e) { + return null; + } finally { + lock.unlock(); + } + } + + public int size() { + return tail - head; + } + + public boolean isEmpty() { + return (tail - head) == 0; + } + + public void signalAll() { + lock.lock(); + try { + waitCond.signalAll(); + } finally { + lock.unlock(); + } + } + + private void addElement(E elem) { + int size = (tail - head); + if ((objects.length - size) == 0) { + int capacity = size + ((size < 64) ? 
(size + 2) : (size >> 1)); + E[] newObjects = (E[])new Object[capacity]; + + if (compareTimeouts(objects[tail - 1], elem) <= 0) { + // Append + System.arraycopy(objects, head, newObjects, 0, tail); + tail -= head; + newObjects[tail++] = elem; + } else if (compareTimeouts(objects[head], elem) > 0) { + // Prepend + System.arraycopy(objects, head, newObjects, 1, tail); + newObjects[0] = elem; + tail -= (head - 1); + } else { + // Insert in the middle + int index = upperBound(head, tail - 1, elem); + int newIndex = (index - head); + System.arraycopy(objects, head, newObjects, 0, newIndex); + newObjects[newIndex] = elem; + System.arraycopy(objects, index, newObjects, newIndex + 1, tail - index); + tail -= (head - 1); + } + head = 0; + objects = newObjects; + } else { + if (tail == objects.length) { + // shift down |-----AAAAAAA| + tail -= head; + System.arraycopy(objects, head, objects, 0, tail); + head = 0; + } + + if (tail == head || compareTimeouts(objects[tail - 1], elem) <= 0) { + // Append + objects[tail++] = elem; + } else if (head > 0 && compareTimeouts(objects[head], elem) > 0) { + // Prepend + objects[--head] = elem; + } else { + // Insert in the middle + int index = upperBound(head, tail - 1, elem); + System.arraycopy(objects, index, objects, index + 1, tail - index); + objects[index] = elem; + tail++; + } + } + } + + private E removeFirst() { + E elem = objects[head]; + objects[head] = null; + head = (head + 1) % objects.length; + if (head == 0) tail = 0; + return elem; + } + + private int upperBound(int start, int end, E key) { + while (start < end) { + int mid = (start + end) >>> 1; + E mitem = objects[mid]; + int cmp = compareTimeouts(mitem, key); + if (cmp > 0) { + end = mid; + } else { + start = mid + 1; + } + } + return start; + } + + private int compareTimeouts(final E a, final E b) { + long t1 = getNanosTimeout(a); + long t2 = getNanosTimeout(b); + return (t1 < t2) ? -1 : (t1 > t2) ? 
1 : 0; + } + + private long getNanosTimeout(final E obj) { + TimeUnit unit = timeoutRetriever.getTimeUnit(obj); + long timeout = timeoutRetriever.getTimeout(obj); + return unit.toNanos(timeout); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java new file mode 100644 index 0000000..aba28f9 --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class ProcedureTestingUtility { + private static final Log LOG = LogFactory.getLog(ProcedureTestingUtility.class); + + private ProcedureTestingUtility() { + } + + public static ProcedureStore createStore(final Configuration conf, final FileSystem fs, + final Path baseDir) throws IOException { + return createWalStore(conf, fs, baseDir); + } + + public static WALProcedureStore createWalStore(final Configuration conf, final FileSystem fs, + final Path logDir) throws IOException { + return new WALProcedureStore(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() { + @Override + public void recoverFileLease(FileSystem fs, Path path) throws IOException { + // no-op + } + }); + } + + public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor) + throws Exception { + ProcedureStore procStore = procExecutor.getStore(); + int nthreads = procExecutor.getNumThreads(); + procExecutor.stop(); + procStore.stop(false); + procExecutor.join(); + procStore.start(nthreads); + procExecutor.start(nthreads); + } + + public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor, + boolean value) { + if (procExecutor.testing == null) { + procExecutor.testing = new ProcedureExecutor.Testing(); + } + procExecutor.testing.killBeforeStoreUpdate = value; + LOG.warn("Set Kill before store update to: " + 
procExecutor.testing.killBeforeStoreUpdate); + } + + public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor, + boolean value) { + if (procExecutor.testing == null) { + procExecutor.testing = new ProcedureExecutor.Testing(); + } + procExecutor.testing.toggleKillBeforeStoreUpdate = value; + } + + public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) { + if (procExecutor.testing == null) { + procExecutor.testing = new ProcedureExecutor.Testing(); + } + procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate; + LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate); + } + + public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) { + long procId = procExecutor.submitProcedure(proc); + waitProcedure(procExecutor, procId); + return procId; + } + + public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) { + while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) { + Threads.sleepWithoutInterrupt(250); + } + } + + public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor, + long procId) { + assertFalse(procExecutor.isFinished(procId)); + assertEquals(null, procExecutor.getResult(procId)); + } + + public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor, + long procId) { + ProcedureResult result = procExecutor.getResult(procId); + assertTrue(result != null); + assertProcNotFailed(result); + } + + public static void assertProcNotFailed(final ProcedureResult result) { + Exception exception = result.getException(); + String msg = exception != null ? 
exception.toString() : "no exception found"; + assertFalse(msg, result.isFailed()); + } + + public static void assertIsAbortException(final ProcedureResult result) { + LOG.info(result.getException()); + assertTrue(result.isFailed()); + assertTrue(result.getException().getCause() instanceof ProcedureAbortedException); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java new file mode 100644 index 0000000..9ff8eb7 --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import org.junit.After; +import org.junit.Before; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category({MasterTests.class, SmallTests.class}) +public class TestProcedureExecution { + private static final Log LOG = LogFactory.getLog(TestProcedureExecution.class); + + private static final int PROCEDURE_EXECUTOR_SLOTS = 1; + private static final Procedure NULL_PROC = null; + + private ProcedureExecutor<Void> procExecutor; + private ProcedureStore procStore; + + private HBaseCommonTestingUtility htu; + private FileSystem fs; + private Path testDir; + private Path logDir; + + @Before + public void setUp() throws IOException { + htu = new HBaseCommonTestingUtility(); + testDir = htu.getDataTestDir(); + fs = testDir.getFileSystem(htu.getConfiguration()); + assertTrue(testDir.depth() > 1); + + logDir = new Path(testDir, "proc-logs"); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore); + 
procStore.start(PROCEDURE_EXECUTOR_SLOTS); + procExecutor.start(PROCEDURE_EXECUTOR_SLOTS); + } + + @After + public void tearDown() throws IOException { + procExecutor.stop(); + procStore.stop(false); + fs.delete(logDir, true); + } + + private static class TestProcedureException extends Exception { + public TestProcedureException(String msg) { super(msg); } + } + + public static class TestSequentialProcedure extends SequentialProcedure<Void> { + private final Procedure[] subProcs; + private final List<String> state; + private final Exception failure; + private final String name; + + public TestSequentialProcedure() { + throw new UnsupportedOperationException("recovery should not be triggered here"); + } + + public TestSequentialProcedure(String name, List<String> state, Procedure... subProcs) { + this.state = state; + this.subProcs = subProcs; + this.name = name; + this.failure = null; + } + + public TestSequentialProcedure(String name, List<String> state, Exception failure) { + this.state = state; + this.subProcs = null; + this.name = name; + this.failure = failure; + } + + @Override + protected Procedure[] execute(Void env) { + state.add(name + "-execute"); + if (failure != null) { + setFailure(new RemoteProcedureException(name + "-failure", failure)); + return null; + } + return subProcs; + } + + @Override + protected void rollback(Void env) { + state.add(name + "-rollback"); + } + + @Override + protected boolean abort(Void env) { + state.add(name + "-abort"); + return true; + } + } + + @Test(timeout=30000) + public void testBadSubprocList() { + List<String> state = new ArrayList<String>(); + Procedure subProc2 = new TestSequentialProcedure("subProc2", state); + Procedure subProc1 = new TestSequentialProcedure("subProc1", state, subProc2, NULL_PROC); + Procedure rootProc = new TestSequentialProcedure("rootProc", state, subProc1); + long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc); + + // subProc1 has a "null" subprocedure which is 
catched as InvalidArgument + // failed state with 2 execute and 2 rollback + LOG.info(state); + ProcedureResult result = procExecutor.getResult(rootId); + LOG.info(result.getException()); + assertTrue(state.toString(), result.isFailed()); + assertTrue(result.getException().toString(), + result.getException().getCause() instanceof IllegalArgumentException); + + assertEquals(state.toString(), 4, state.size()); + assertEquals("rootProc-execute", state.get(0)); + assertEquals("subProc1-execute", state.get(1)); + assertEquals("subProc1-rollback", state.get(2)); + assertEquals("rootProc-rollback", state.get(3)); + } + + @Test(timeout=30000) + public void testSingleSequentialProc() { + List<String> state = new ArrayList<String>(); + Procedure subProc2 = new TestSequentialProcedure("subProc2", state); + Procedure subProc1 = new TestSequentialProcedure("subProc1", state, subProc2); + Procedure rootProc = new TestSequentialProcedure("rootProc", state, subProc1); + long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc); + + // successful state, with 3 execute + LOG.info(state); + ProcedureResult result = procExecutor.getResult(rootId); + ProcedureTestingUtility.assertProcNotFailed(result); + assertEquals(state.toString(), 3, state.size()); + } + + @Test(timeout=30000) + public void testSingleSequentialProcRollback() { + List<String> state = new ArrayList<String>(); + Procedure subProc2 = new TestSequentialProcedure("subProc2", state, + new TestProcedureException("fail test")); + Procedure subProc1 = new TestSequentialProcedure("subProc1", state, subProc2); + Procedure rootProc = new TestSequentialProcedure("rootProc", state, subProc1); + long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc); + + // the 3rd proc fail, rollback after 2 successful execution + LOG.info(state); + ProcedureResult result = procExecutor.getResult(rootId); + LOG.info(result.getException()); + assertTrue(state.toString(), result.isFailed()); + 
assertTrue(result.getException().toString(), + result.getException().getCause() instanceof TestProcedureException); + + assertEquals(state.toString(), 6, state.size()); + assertEquals("rootProc-execute", state.get(0)); + assertEquals("subProc1-execute", state.get(1)); + assertEquals("subProc2-execute", state.get(2)); + assertEquals("subProc2-rollback", state.get(3)); + assertEquals("subProc1-rollback", state.get(4)); + assertEquals("rootProc-rollback", state.get(5)); + } + + public static class TestWaitingProcedure extends SequentialProcedure<Void> { + private final List<String> state; + private final boolean hasChild; + private final String name; + + public TestWaitingProcedure() { + throw new UnsupportedOperationException("recovery should not be triggered here"); + } + + public TestWaitingProcedure(String name, List<String> state, boolean hasChild) { + this.hasChild = hasChild; + this.state = state; + this.name = name; + } + + @Override + protected Procedure[] execute(Void env) { + state.add(name + "-execute"); + setState(ProcedureState.WAITING_TIMEOUT); + return hasChild ? 
new Procedure[] { new TestWaitChild(name, state) } : null; + } + + @Override + protected void rollback(Void env) { + state.add(name + "-rollback"); + } + + @Override + protected boolean abort(Void env) { + state.add(name + "-abort"); + return true; + } + + public static class TestWaitChild extends SequentialProcedure<Void> { + private final List<String> state; + private final String name; + + public TestWaitChild() { + throw new UnsupportedOperationException("recovery should not be triggered here"); + } + + public TestWaitChild(String name, List<String> state) { + this.name = name; + this.state = state; + } + + @Override + protected Procedure[] execute(Void env) { + state.add(name + "-child-execute"); + return null; + } + + @Override + protected void rollback(Void env) { + state.add(name + "-child-rollback"); + } + + @Override + protected boolean abort(Void env) { + state.add(name + "-child-abort"); + return true; + } + } + } + + @Test(timeout=30000) + public void testAbortTimeout() { + final int PROC_TIMEOUT_MSEC = 2500; + List<String> state = new ArrayList<String>(); + Procedure proc = new TestWaitingProcedure("wproc", state, false); + proc.setTimeout(PROC_TIMEOUT_MSEC); + long startTime = EnvironmentEdgeManager.currentTime(); + long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); + long execTime = EnvironmentEdgeManager.currentTime() - startTime; + LOG.info(state); + assertTrue("we didn't wait enough execTime=" + execTime, execTime >= PROC_TIMEOUT_MSEC); + ProcedureResult result = procExecutor.getResult(rootId); + LOG.info(result.getException()); + assertTrue(state.toString(), result.isFailed()); + assertTrue(result.getException().toString(), + result.getException().getCause() instanceof TimeoutException); + assertEquals(state.toString(), 2, state.size()); + assertEquals("wproc-execute", state.get(0)); + assertEquals("wproc-rollback", state.get(1)); + } + + @Test(timeout=30000) + public void testAbortTimeoutWithChildren() { + List<String> 
state = new ArrayList<String>(); + Procedure proc = new TestWaitingProcedure("wproc", state, true); + proc.setTimeout(2500); + long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); + LOG.info(state); + ProcedureResult result = procExecutor.getResult(rootId); + LOG.info(result.getException()); + assertTrue(state.toString(), result.isFailed()); + assertTrue(result.getException().toString(), + result.getException().getCause() instanceof TimeoutException); + assertEquals(state.toString(), 4, state.size()); + assertEquals("wproc-execute", state.get(0)); + assertEquals("wproc-child-execute", state.get(1)); + assertEquals("wproc-child-rollback", state.get(2)); + assertEquals("wproc-rollback", state.get(3)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java new file mode 100644 index 0000000..d654910 --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java @@ -0,0 +1,488 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; + +import org.junit.After; +import org.junit.Before; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category({MasterTests.class, SmallTests.class}) +public class TestProcedureRecovery { + private static final Log LOG = LogFactory.getLog(TestProcedureRecovery.class); + + private static final int PROCEDURE_EXECUTOR_SLOTS = 1; + private static final Procedure NULL_PROC = null; + + private static ProcedureExecutor<Void> procExecutor; + private static ProcedureStore procStore; + private static int procSleepInterval; + + private 
HBaseCommonTestingUtility htu; + private FileSystem fs; + private Path testDir; + private Path logDir; + + @Before + public void setUp() throws IOException { + htu = new HBaseCommonTestingUtility(); + testDir = htu.getDataTestDir(); + fs = testDir.getFileSystem(htu.getConfiguration()); + assertTrue(testDir.depth() > 1); + + logDir = new Path(testDir, "proc-logs"); + procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); + procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore); + procExecutor.testing = new ProcedureExecutor.Testing(); + procStore.start(PROCEDURE_EXECUTOR_SLOTS); + procExecutor.start(PROCEDURE_EXECUTOR_SLOTS); + procSleepInterval = 0; + } + + @After + public void tearDown() throws IOException { + procExecutor.stop(); + procStore.stop(false); + fs.delete(logDir, true); + } + + private void restart() throws Exception { + dumpLogDirState(); + ProcedureTestingUtility.restart(procExecutor); + dumpLogDirState(); + } + + public static class TestSingleStepProcedure extends SequentialProcedure<Void> { + private int step = 0; + + public TestSingleStepProcedure() { } + + @Override + protected Procedure[] execute(Void env) { + LOG.debug("execute procedure " + this + " step=" + step); + step++; + setResult(Bytes.toBytes(step)); + return null; + } + + @Override + protected void rollback(Void env) { } + + @Override + protected boolean abort(Void env) { return true; } + } + + public static class BaseTestStepProcedure extends SequentialProcedure<Void> { + private AtomicBoolean abort = new AtomicBoolean(false); + private int step = 0; + + @Override + protected Procedure[] execute(Void env) { + LOG.debug("execute procedure " + this + " step=" + step); + ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); + step++; + Threads.sleepWithoutInterrupt(procSleepInterval); + if (isAborted()) { + setFailure(new RemoteProcedureException(getClass().getName(), + new ProcedureAbortedException( + "got an abort at " 
+ getClass().getName() + " step=" + step))); + return null; + } + return null; + } + + @Override + protected void rollback(Void env) { + LOG.debug("rollback procedure " + this + " step=" + step); + ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); + step++; + } + + @Override + protected boolean abort(Void env) { + abort.set(true); + return true; + } + + private boolean isAborted() { + boolean aborted = abort.get(); + BaseTestStepProcedure proc = this; + while (proc.hasParent() && !aborted) { + proc = (BaseTestStepProcedure)procExecutor.getProcedure(proc.getParentProcId()); + aborted = proc.isAborted(); + } + return aborted; + } + } + + public static class TestMultiStepProcedure extends BaseTestStepProcedure { + public TestMultiStepProcedure() { } + + @Override + public Procedure[] execute(Void env) { + super.execute(env); + return isFailed() ? null : new Procedure[] { new Step1Procedure() }; + } + + public static class Step1Procedure extends BaseTestStepProcedure { + public Step1Procedure() { } + + @Override + protected Procedure[] execute(Void env) { + super.execute(env); + return isFailed() ? 
null : new Procedure[] { new Step2Procedure() }; + } + } + + public static class Step2Procedure extends BaseTestStepProcedure { + public Step2Procedure() { } + } + } + + @Test + public void testNoopLoad() throws Exception { + restart(); + } + + @Test(timeout=30000) + public void testSingleStepProcRecovery() throws Exception { + Procedure proc = new TestSingleStepProcedure(); + procExecutor.testing.killBeforeStoreUpdate = true; + long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); + assertFalse(procExecutor.isRunning()); + procExecutor.testing.killBeforeStoreUpdate = false; + + // Restart and verify that the procedures restart + long restartTs = EnvironmentEdgeManager.currentTime(); + restart(); + waitProcedure(procId); + ProcedureResult result = procExecutor.getResult(procId); + assertTrue(result.getLastUpdate() > restartTs); + ProcedureTestingUtility.assertProcNotFailed(result); + assertEquals(1, Bytes.toInt(result.getResult())); + long resultTs = result.getLastUpdate(); + + // Verify that after another restart the result is still there + restart(); + result = procExecutor.getResult(procId); + ProcedureTestingUtility.assertProcNotFailed(result); + assertEquals(resultTs, result.getLastUpdate()); + assertEquals(1, Bytes.toInt(result.getResult())); + } + + @Test(timeout=30000) + public void testMultiStepProcRecovery() throws Exception { + // Step 0 - kill + Procedure proc = new TestMultiStepProcedure(); + long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); + assertFalse(procExecutor.isRunning()); + + // Step 0 exec && Step 1 - kill + restart(); + waitProcedure(procId); + ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); + assertFalse(procExecutor.isRunning()); + + // Step 1 exec && step 2 - kill + restart(); + waitProcedure(procId); + ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); + assertFalse(procExecutor.isRunning()); + + // Step 2 exec + restart(); + waitProcedure(procId); 
+ assertTrue(procExecutor.isRunning()); + + // The procedure is completed + ProcedureResult result = procExecutor.getResult(procId); + ProcedureTestingUtility.assertProcNotFailed(result); + } + + @Test(timeout=30000) + public void testMultiStepRollbackRecovery() throws Exception { + // Step 0 - kill + Procedure proc = new TestMultiStepProcedure(); + long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); + assertFalse(procExecutor.isRunning()); + + // Step 0 exec && Step 1 - kill + restart(); + waitProcedure(procId); + ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); + assertFalse(procExecutor.isRunning()); + + // Step 1 exec && step 2 - kill + restart(); + waitProcedure(procId); + ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); + assertFalse(procExecutor.isRunning()); + + // Step 2 exec - rollback - kill + procSleepInterval = 2500; + restart(); + assertTrue(procExecutor.abort(procId)); + waitProcedure(procId); + assertFalse(procExecutor.isRunning()); + + // rollback - kill + restart(); + waitProcedure(procId); + ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); + assertFalse(procExecutor.isRunning()); + + // rollback - complete + restart(); + waitProcedure(procId); + ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); + assertFalse(procExecutor.isRunning()); + + // Restart the executor and get the result + restart(); + waitProcedure(procId); + + // The procedure is completed + ProcedureResult result = procExecutor.getResult(procId); + ProcedureTestingUtility.assertIsAbortException(result); + } + + public static class TestStateMachineProcedure + extends StateMachineProcedure<Void, TestStateMachineProcedure.State> { + enum State { STATE_1, STATE_2, STATE_3, DONE } + + public TestStateMachineProcedure() {} + + private AtomicBoolean aborted = new AtomicBoolean(false); + private int iResult = 0; + + @Override + protected StateMachineProcedure.Flow 
executeFromState(Void env, State state) { + if (state == null) { + LOG.info("Initializing " + this); + state = State.STATE_1; + setNextState(state); + } + + switch (state) { + case STATE_1: + LOG.info("execute step 1 " + this); + setNextState(State.STATE_2); + iResult += 3; + break; + case STATE_2: + LOG.info("execute step 2 " + this); + setNextState(State.STATE_3); + iResult += 5; + break; + case STATE_3: + LOG.info("execute step 3 " + this); + Threads.sleepWithoutInterrupt(procSleepInterval); + if (aborted.get()) { + LOG.info("aborted step 3 " + this); + setAbortFailure("test", "aborted"); + break; + } + setNextState(State.DONE); + iResult += 7; + setResult(Bytes.toBytes(iResult)); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException(); + } + return Flow.HAS_MORE_STATE; + } + + @Override + protected void rollbackState(Void env, final State state) { + switch (state) { + case STATE_1: + LOG.info("rollback step 1 " + this); + break; + case STATE_2: + LOG.info("rollback step 2 " + this); + break; + case STATE_3: + LOG.info("rollback step 3 " + this); + break; + default: + throw new UnsupportedOperationException(); + } + } + + @Override + protected State getState(final int stateId) { + return State.values()[stateId]; + } + + private void setNextState(final State state) { + setNextState(state.ordinal()); + } + + @Override + protected boolean abort(Void env) { + aborted.set(true); + return true; + } + + @Override + protected void serializeStateData(final OutputStream stream) throws IOException { + super.serializeStateData(stream); + stream.write(Bytes.toBytes(iResult)); + } + + @Override + protected void deserializeStateData(final InputStream stream) throws IOException { + super.deserializeStateData(stream); + byte[] data = new byte[4]; + stream.read(data); + iResult = Bytes.toInt(data); + } + } + + @Test(timeout=30000) + public void testStateMachineRecovery() throws Exception { + 
ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true); + ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true); + + // Step 1 - kill + Procedure proc = new TestStateMachineProcedure(); + long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); + assertFalse(procExecutor.isRunning()); + + // Step 1 exec && Step 2 - kill + restart(); + waitProcedure(procId); + ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); + assertFalse(procExecutor.isRunning()); + + // Step 2 exec && step 3 - kill + restart(); + waitProcedure(procId); + ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); + assertFalse(procExecutor.isRunning()); + + // Step 3 exec + restart(); + waitProcedure(procId); + assertTrue(procExecutor.isRunning()); + + // The procedure is completed + ProcedureResult result = procExecutor.getResult(procId); + ProcedureTestingUtility.assertProcNotFailed(result); + assertEquals(15, Bytes.toInt(result.getResult())); + } + + @Test(timeout=30000) + public void testStateMachineRollbackRecovery() throws Exception { + ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true); + ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true); + + // Step 1 - kill + Procedure proc = new TestStateMachineProcedure(); + long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); + ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); + assertFalse(procExecutor.isRunning()); + + // Step 1 exec && Step 2 - kill + restart(); + waitProcedure(procId); + ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); + assertFalse(procExecutor.isRunning()); + + // Step 2 exec && step 3 - kill + restart(); + waitProcedure(procId); + ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); + assertFalse(procExecutor.isRunning()); + + // Step 3 exec - rollback step 3 - kill + procSleepInterval = 2500; + restart(); + 
assertTrue(procExecutor.abort(procId)); + waitProcedure(procId); + ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); + assertFalse(procExecutor.isRunning()); + + // Rollback step 3 - rollback step 2 - kill + restart(); + waitProcedure(procId); + assertFalse(procExecutor.isRunning()); + ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); + + // Rollback step 2 - step 1 - kill + restart(); + waitProcedure(procId); + assertFalse(procExecutor.isRunning()); + ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); + + // Rollback step 1 - complete + restart(); + waitProcedure(procId); + assertTrue(procExecutor.isRunning()); + + // The procedure is completed + ProcedureResult result = procExecutor.getResult(procId); + ProcedureTestingUtility.assertIsAbortException(result); + } + + private void waitProcedure(final long procId) { + ProcedureTestingUtility.waitProcedure(procExecutor, procId); + dumpLogDirState(); + } + + private void dumpLogDirState() { + try { + FileStatus[] files = fs.listStatus(logDir); + if (files != null && files.length > 0) { + for (FileStatus file: files) { + assertTrue(file.toString(), file.isFile()); + LOG.debug("log file " + file.getPath() + " size=" + file.getLen()); + } + } else { + LOG.debug("no files under: " + logDir); + } + } catch (IOException e) { + LOG.warn("Unable to dump " + logDir, e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRunQueues.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRunQueues.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRunQueues.java new file mode 100644 index 0000000..c040be6 --- /dev/null +++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRunQueues.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; + +@Category({MasterTests.class, SmallTests.class}) +public class TestProcedureRunQueues { + class TestRunQueue implements ProcedureFairRunQueues.FairObject { + private final String name; + private final int priority; + + public TestRunQueue(String name, int priority) { + this.name = name; + this.priority = priority; + } + + @Override + public String toString() { + return name; + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return priority; + } + } + + @Test + public void testFairQueues() throws Exception { + ProcedureFairRunQueues<String, TestRunQueue> fairq + = new ProcedureFairRunQueues<String, TestRunQueue>(1); + TestRunQueue a = fairq.add("A", new 
TestRunQueue("A", 1)); + TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1)); + TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2)); + + for (int i = 0; i < 3; ++i) { + assertEquals(a, fairq.poll()); + assertEquals(b, fairq.poll()); + assertEquals(m, fairq.poll()); + assertEquals(m, fairq.poll()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java new file mode 100644 index 0000000..0669549 --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.procedure2.store;

import java.io.InputStream;
import java.io.OutputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;

import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Tests for {@link ProcedureStoreTracker}: the delete state it reports per
 * procedure id, and its empty/updated flags.
 */
@Category({MasterTests.class, SmallTests.class})
public class TestProcedureStoreTracker {
  private static final Log LOG = LogFactory.getLog(TestProcedureStoreTracker.class);

  /** Procedure stub that only carries a procedure id; every callback is a no-op. */
  static class TestProcedure extends Procedure<Void> {
    public TestProcedure(long procId) {
      setProcId(procId);
    }

    @Override
    protected Procedure[] execute(Void env) {
      return null;
    }

    @Override
    protected void rollback(Void env) {
      // nothing to undo
    }

    @Override
    protected boolean abort(Void env) {
      return false;
    }

    @Override
    protected void serializeStateData(final OutputStream stream) {
      // no state to write
    }

    @Override
    protected void deserializeStateData(final InputStream stream) {
      // no state to read
    }
  }

  /** Asserts that the tracker reports {@code DeleteState.NO} for {@code procId}. */
  private static void assertKnownLive(ProcedureStoreTracker tracker, int procId) {
    assertEquals(ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(procId));
  }

  /** Asserts that the tracker reports {@code DeleteState.YES} for {@code procId}. */
  private static void assertKnownDeleted(ProcedureStoreTracker tracker, int procId) {
    assertEquals(ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(procId));
  }

  /** Asserts that the tracker reports {@code DeleteState.MAYBE} for {@code procId}. */
  private static void assertUnknown(ProcedureStoreTracker tracker, int procId) {
    assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, tracker.isDeleted(procId));
  }

  /** Applies {@link #assertKnownLive} to every id in {@code [first, last]}. */
  private static void assertRangeLive(ProcedureStoreTracker tracker, int first, int last) {
    int id = first;
    while (id <= last) {
      assertKnownLive(tracker, id);
      ++id;
    }
  }

  /** Applies {@link #assertKnownDeleted} to every id in {@code [first, last]}. */
  private static void assertRangeDeleted(ProcedureStoreTracker tracker, int first, int last) {
    int id = first;
    while (id <= last) {
      assertKnownDeleted(tracker, id);
      ++id;
    }
  }

  /** Asserts that no id in {@code [first, last]} is reported as {@code DeleteState.NO}. */
  private static void assertRangeNotLive(ProcedureStoreTracker tracker, int first, int last) {
    int id = first;
    while (id <= last) {
      assertTrue(tracker.isDeleted(id) != ProcedureStoreTracker.DeleteState.NO);
      ++id;
    }
  }

  /**
   * Inserts ids 1..1023 one by one and then deletes them in the same order,
   * re-checking the reported state of the whole id range after every step.
   */
  @Test
  public void testSeqInsertAndDelete() {
    final ProcedureStoreTracker tracker = new ProcedureStoreTracker();
    assertTrue(tracker.isEmpty());

    final int firstId = 1;
    final int endId = 1 << 10;   // exclusive upper bound
    final int lastId = endId - 1;

    // sequential insert
    for (int inserted = firstId; inserted < endId; ++inserted) {
      tracker.insert(inserted);

      // everything inserted so far must be reported as not deleted
      assertRangeLive(tracker, firstId, inserted);
      // everything not inserted yet must not be reported as live
      assertRangeNotLive(tracker, inserted + 1, lastId);
    }

    // sequential delete
    for (int removed = firstId; removed < endId; ++removed) {
      tracker.delete(removed);

      // everything deleted so far must be reported as deleted
      assertRangeDeleted(tracker, firstId, removed);
      // everything still present must be reported as not deleted
      assertRangeLive(tracker, removed + 1, lastId);
    }
    assertTrue(tracker.isEmpty());
  }

  /**
   * With the partial flag set, ids the tracker has no information about are
   * expected to be reported as MAYBE; only ids marked through setDeleted()
   * get a definite YES/NO answer.
   */
  @Test
  public void testPartialTracker() {
    final ProcedureStoreTracker tracker = new ProcedureStoreTracker();
    tracker.setPartialFlag(true);

    // nothing recorded yet: every state is unknown
    assertTrue(tracker.isEmpty());
    assertUnknown(tracker, 1);
    assertUnknown(tracker, 579);

    // id 1 becomes a known, deleted id
    tracker.setDeleted(1, true);
    tracker.dump();
    assertKnownDeleted(tracker, 1);
    assertUnknown(tracker, 2);
    assertUnknown(tracker, 579);

    // id 579 becomes a known, non-deleted id
    tracker.setDeleted(579, false);
    assertKnownDeleted(tracker, 1);
    assertUnknown(tracker, 2);
    assertKnownLive(tracker, 579);
    assertUnknown(tracker, 577);
    assertUnknown(tracker, 580);
  }

  /**
   * Walks six procedures through insert, update and delete while checking the
   * tracker's isEmpty()/isUpdated() flags after each call.
   */
  @Test
  public void testBasicCRUD() {
    final ProcedureStoreTracker tracker = new ProcedureStoreTracker();
    assertTrue(tracker.isEmpty());

    // procedures with ids 1..6, stored at indexes 0..5
    final Procedure[] procs = new TestProcedure[6];
    for (int k = 0; k < procs.length; ++k) {
      procs[k] = new TestProcedure(k + 1);
    }

    tracker.insert(procs[0], null);
    tracker.insert(procs[1], new Procedure[] { procs[2], procs[3], procs[4] });
    assertFalse(tracker.isEmpty());
    assertTrue(tracker.isUpdated());

    tracker.resetUpdates();
    assertFalse(tracker.isUpdated());

    // the first four updates are expected to leave the updated flag cleared
    int idx = 0;
    while (idx < 4) {
      tracker.update(procs[idx]);
      assertFalse(tracker.isEmpty());
      assertFalse(tracker.isUpdated());
      ++idx;
    }

    // updating the last two procedures is expected to raise the flag
    for (int k = 4; k <= 5; ++k) {
      tracker.update(procs[k]);
      assertFalse(tracker.isEmpty());
      assertTrue(tracker.isUpdated());
    }

    final int lastIdx = procs.length - 1;
    for (int k = 0; k < lastIdx; ++k) {
      tracker.delete(procs[k].getProcId());
      assertFalse(tracker.isEmpty());
      assertTrue(tracker.isUpdated());
    }
    tracker.delete(procs[lastIdx].getProcId());
    assertTrue(tracker.isEmpty());
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.procedure2.store.wal;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.HashSet;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;

import org.junit.After;
import org.junit.Before;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Tests for {@link WALProcedureStore}: procedures written to the store must
 * be returned by load() after a restart, including when the log files have
 * been truncated.
 */
@Category({MasterTests.class, SmallTests.class})
public class TestWALProcedureStore {
  private static final Log LOG = LogFactory.getLog(TestWALProcedureStore.class);

  private static final int PROCEDURE_STORE_SLOTS = 1;
  private static final Procedure NULL_PROC = null;

  private WALProcedureStore procStore;

  private HBaseCommonTestingUtility htu;
  private FileSystem fs;
  private Path testDir;
  private Path logDir;

  /** Creates a store over a "proc-logs" directory under the test data dir and starts it. */
  @Before
  public void setUp() throws IOException {
    htu = new HBaseCommonTestingUtility();
    testDir = htu.getDataTestDir();
    fs = testDir.getFileSystem(htu.getConfiguration());
    assertTrue(testDir.depth() > 1);

    logDir = new Path(testDir, "proc-logs");
    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
    procStore.start(PROCEDURE_STORE_SLOTS);
    procStore.recoverLease();
  }

  /** Stops the store and recursively deletes the log directory. */
  @After
  public void tearDown() throws IOException {
    procStore.stop(false);
    fs.delete(logDir, true);
  }

  /**
   * Stops and restarts the store, recovers the lease and loads its content.
   *
   * @return the iterator returned by {@code procStore.load()}
   */
  private Iterator<Procedure> storeRestart() throws Exception {
    procStore.stop(false);
    procStore.start(PROCEDURE_STORE_SLOTS);
    procStore.recoverLease();
    return procStore.load();
  }

  /** A store that never received a write must load zero procedures. */
  @Test
  public void testEmptyLogLoad() throws Exception {
    Iterator<Procedure> loader = storeRestart();
    assertEquals(0, countProcedures(loader));
  }

  /**
   * Inserts, updates and deletes procedures, checking after each phase that a
   * restart loads exactly the expected ids; the last phase repeats the check
   * after dropping 4 bytes from the end of every log file.
   */
  @Test
  public void testLoad() throws Exception {
    Set<Long> procIds = new HashSet<>();

    // Insert something in the log
    Procedure proc1 = new TestSequentialProcedure();
    procIds.add(proc1.getProcId());
    procStore.insert(proc1, null);

    Procedure proc2 = new TestSequentialProcedure();
    Procedure[] child2 = new Procedure[2];
    child2[0] = new TestSequentialProcedure();
    child2[1] = new TestSequentialProcedure();

    procIds.add(proc2.getProcId());
    procIds.add(child2[0].getProcId());
    procIds.add(child2[1].getProcId());
    procStore.insert(proc2, child2);

    // Verify that everything is there
    verifyProcIdsOnRestart(procIds);

    // Update and delete something
    procStore.update(proc1);
    procStore.update(child2[1]);
    procStore.delete(child2[1].getProcId());
    procIds.remove(child2[1].getProcId());

    // Verify that everything is there
    verifyProcIdsOnRestart(procIds);

    // Remove 4 byte from the trailers
    procStore.stop(false);
    FileStatus[] logs = fs.listStatus(logDir);
    // The test expects three log files at this point.
    assertEquals(3, logs.length);
    for (int i = 0; i < logs.length; ++i) {
      corruptLog(logs[i], 4);
    }
    verifyProcIdsOnRestart(procIds);
  }

  /** Dropping the last 4 bytes of the only log must still load all 100 procedures. */
  @Test
  public void testCorruptedTrailer() throws Exception {
    // Insert something
    for (int i = 0; i < 100; ++i) {
      procStore.insert(new TestSequentialProcedure(), null);
    }

    // Stop the store
    procStore.stop(false);

    // Remove 4 byte from the trailer
    FileStatus[] logs = fs.listStatus(logDir);
    assertEquals(1, logs.length);
    corruptLog(logs[0], 4);

    int count = countProcedures(storeRestart());
    assertEquals(100, count);
  }

  /**
   * Dropping the last 1823 bytes of the only log is expected to leave 85 of
   * the 100 procedures loadable and exactly one log reported as corrupted.
   */
  @Test
  public void testCorruptedEntries() throws Exception {
    // Insert something
    for (int i = 0; i < 100; ++i) {
      procStore.insert(new TestSequentialProcedure(), null);
    }

    // Stop the store
    procStore.stop(false);

    // Remove some byte from the log
    // (enough to cut the trailer and corrupt some entries)
    FileStatus[] logs = fs.listStatus(logDir);
    assertEquals(1, logs.length);
    corruptLog(logs[0], 1823);

    int count = countProcedures(storeRestart());
    assertTrue(procStore.getCorruptedLogs() != null);
    assertEquals(1, procStore.getCorruptedLogs().size());
    assertEquals(85, count);
  }

  /**
   * Truncates {@code logFile} by copying all but its last {@code dropBytes}
   * bytes to a temporary file and renaming that file over the original.
   *
   * @param logFile status of the log to truncate; must be longer than {@code dropBytes}
   * @param dropBytes number of bytes to cut from the end of the file
   * @throws IOException if reading, writing or renaming fails
   */
  private void corruptLog(final FileStatus logFile, final long dropBytes)
      throws IOException {
    assertTrue(logFile.getLen() > dropBytes);
    LOG.debug("corrupt log " + logFile.getPath() +
              " size=" + logFile.getLen() + " drop=" + dropBytes);
    Path tmpPath = new Path(testDir, "corrupted.log");
    // try-with-resources closes both streams even when create() or the copy
    // fails, and guarantees the copy is closed before the rename below.
    try (InputStream in = fs.open(logFile.getPath());
         OutputStream out = fs.create(tmpPath)) {
      IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, false);
    }
    // rename() reports failure through its return value; ignoring it would
    // leave the log intact and make the caller test an uncorrupted file.
    assertTrue("unable to rename " + tmpPath + " to " + logFile.getPath(),
      fs.rename(tmpPath, logFile.getPath()));
  }

  /**
   * Restarts the store and asserts that the loaded procedures are exactly the
   * ones whose ids are in {@code procIds}.
   */
  private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
    int count = 0;
    Iterator<Procedure> loader = storeRestart();
    while (loader.hasNext()) {
      Procedure proc = loader.next();
      LOG.debug("loading procId=" + proc.getProcId());
      assertTrue("procId=" + proc.getProcId() + " unexpected", procIds.contains(proc.getProcId()));
      count++;
    }
    assertEquals(procIds.size(), count);
  }

  /** Asserts that {@code iterator} yields no procedure. */
  private void assertIsEmpty(Iterator<Procedure> iterator) {
    assertEquals(0, countProcedures(iterator));
  }

  /**
   * Drains {@code iterator}, logging each procedure id at trace level.
   *
   * @return the number of procedures the iterator produced
   */
  private int countProcedures(Iterator<Procedure> iterator) {
    int count = 0;
    while (iterator.hasNext()) {
      Procedure proc = iterator.next();
      LOG.trace("loading procId=" + proc.getProcId());
      count++;
    }
    return count;
  }

  /** Fails unless the log directory exists and has no entries. */
  private void assertEmptyLogDir() {
    try {
      FileStatus[] status = fs.listStatus(logDir);
      assertTrue("expected empty state-log dir", status == null || status.length == 0);
    } catch (FileNotFoundException e) {
      fail("expected the state-log dir to be present: " + logDir);
    } catch (IOException e) {
      fail("got en exception on state-log dir list: " + e.getMessage());
    }
  }

  /**
   * Procedure that takes its id from a class-wide counter. Procedures with an
   * even id serialize that id as their state, odd ones serialize nothing;
   * deserialization asserts the same layout.
   */
  public static class TestSequentialProcedure extends SequentialProcedure<Void> {
    private static long seqid = 0;

    public TestSequentialProcedure() {
      setProcId(++seqid);
    }

    @Override
    protected Procedure[] execute(Void env) { return null; }

    @Override
    protected void rollback(Void env) { }

    @Override
    protected boolean abort(Void env) { return false; }

    @Override
    protected void serializeStateData(final OutputStream stream) throws IOException {
      long procId = getProcId();
      if (procId % 2 == 0) {
        stream.write(Bytes.toBytes(procId));
      }
    }

    @Override
    protected void deserializeStateData(InputStream stream) throws IOException {
      long procId = getProcId();
      if (procId % 2 == 0) {
        byte[] bProcId = new byte[8];
        // A single read() may legally return fewer bytes than requested, so
        // keep reading until the buffer is full or the stream ends.
        assertEquals(8, readUntilFullOrEof(stream, bProcId));
        assertEquals(procId, Bytes.toLong(bProcId));
      } else {
        assertEquals(0, stream.available());
      }
    }

    /**
     * Reads from {@code stream} until {@code buf} is full or end of stream.
     *
     * @return the number of bytes stored in {@code buf}
     */
    private static int readUntilFullOrEof(InputStream stream, byte[] buf) throws IOException {
      int total = 0;
      while (total < buf.length) {
        int n = stream.read(buf, total, buf.length - total);
        if (n < 0) {
          break;
        }
        total += n;
      }
      return total;
    }
  }
}