http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java
new file mode 100644
index 0000000..8cadc01
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Thread-safe implementation of a RingBuffer.
+ *
+ * <p>
+ * The buffer has a fixed capacity. Once every slot has been written, each call
+ * to {@link #add(Object)} overwrites the oldest slot. {@code null} values are
+ * rejected by {@link #add(Object)}; a {@code null} slot therefore means "never
+ * written" or "cleared by {@link #removeSelectedElements(Filter)}". All access
+ * to the backing array is guarded by a {@link ReentrantReadWriteLock}.
+ * </p>
+ *
+ * <p>
+ * Filters and evaluators passed to this class are invoked while the lock is
+ * held.
+ * </p>
+ *
+ * @param <T> the type of element held in the buffer
+ */
+public class RingBuffer<T> {
+
+    private final Object[] buffer;
+    // Index of the slot that the next add() will write to.
+    private int insertionPointer = 0;
+    // Becomes true once the last slot of the array has been written at least once.
+    private boolean filled = false;
+
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final Lock readLock = rwLock.readLock();
+    private final Lock writeLock = rwLock.writeLock();
+
+    /**
+     * Creates a RingBuffer with the given number of slots.
+     *
+     * @param size the capacity of the buffer
+     */
+    public RingBuffer(final int size) {
+        buffer = new Object[size];
+    }
+
+    /**
+     * Adds the given value to the RingBuffer and returns the value that was removed in order to make room.
+     *
+     * @param value the new value to add; must not be {@code null}
+     * @return value previously in the slot that was overwritten, or {@code null} if that slot was empty
+     * @throws NullPointerException if {@code value} is {@code null}
+     */
+    @SuppressWarnings("unchecked")
+    public T add(final T value) {
+        Objects.requireNonNull(value);
+
+        writeLock.lock();
+        try {
+            final Object removed = buffer[insertionPointer];
+
+            buffer[insertionPointer] = value;
+
+            if (insertionPointer == buffer.length - 1) {
+                filled = true;
+            }
+
+            insertionPointer = (insertionPointer + 1) % buffer.length;
+            return (T) removed;
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Returns the number of slots that have been written so far: the capacity
+     * once the buffer has wrapped around, otherwise the current insertion
+     * index. Slots cleared by {@link #removeSelectedElements(Filter)} are still
+     * counted.
+     *
+     * @return the number of written slots
+     */
+    public int getSize() {
+        readLock.lock();
+        try {
+            return filled ? buffer.length : insertionPointer;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Returns every element accepted by the given filter, ordered from oldest
+     * to newest.
+     *
+     * @param filter decides which elements are returned
+     * @return the selected elements
+     */
+    public List<T> getSelectedElements(final Filter<T> filter) {
+        return getSelectedElements(filter, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Returns up to {@code maxElements} elements accepted by the given filter,
+     * ordered from oldest to newest. Empty slots are skipped.
+     *
+     * @param filter decides which elements are returned
+     * @param maxElements the maximum number of elements to return
+     * @return the selected elements
+     */
+    public List<T> getSelectedElements(final Filter<T> filter, final int maxElements) {
+        // Default-sized list: a fixed 1000-slot preallocation is wasteful for small buffers.
+        final List<T> selected = new ArrayList<>();
+        int numSelected = 0;
+        readLock.lock();
+        try {
+            for (int i = 0; i < buffer.length && numSelected < maxElements; i++) {
+                final int idx = (insertionPointer + i) % buffer.length;
+                final Object val = buffer[idx];
+                if (val == null) {
+                    continue;
+                }
+
+                @SuppressWarnings("unchecked")
+                final T element = (T) val;
+                if (filter.select(element)) {
+                    selected.add(element);
+                    numSelected++;
+                }
+            }
+        } finally {
+            readLock.unlock();
+        }
+        return selected;
+    }
+
+    /**
+     * Counts the elements accepted by the given filter. Empty slots are
+     * skipped.
+     *
+     * @param filter decides which elements are counted
+     * @return the number of elements for which the filter returned {@code true}
+     */
+    public int countSelectedElements(final Filter<T> filter) {
+        int numSelected = 0;
+        readLock.lock();
+        try {
+            for (int i = 0; i < buffer.length; i++) {
+                final int idx = (insertionPointer + i) % buffer.length;
+                final Object val = buffer[idx];
+                if (val == null) {
+                    continue;
+                }
+
+                @SuppressWarnings("unchecked")
+                final T element = (T) val;
+                if (filter.select(element)) {
+                    numSelected++;
+                }
+            }
+        } finally {
+            readLock.unlock();
+        }
+
+        return numSelected;
+    }
+
+    /**
+     * Removes all elements from the RingBuffer that match the given filter.
+     * Removed slots are set to {@code null}; the insertion position is not
+     * changed.
+     *
+     * @param filter to use for deciding what is removed
+     * @return the number of elements that were removed
+     */
+    public int removeSelectedElements(final Filter<T> filter) {
+        int count = 0;
+
+        writeLock.lock();
+        try {
+            for (int i = 0; i < buffer.length; i++) {
+                final int idx = (insertionPointer + i + 1) % buffer.length;
+                final Object val = buffer[idx];
+                if (val == null) {
+                    continue;
+                }
+
+                @SuppressWarnings("unchecked")
+                final T element = (T) val;
+
+                if (filter.select(element)) {
+                    buffer[idx] = null;
+                    // Previously the counter was never incremented, so 0 was always returned.
+                    count++;
+                }
+            }
+        } finally {
+            writeLock.unlock();
+        }
+
+        return count;
+    }
+
+    /**
+     * Returns all elements currently in the buffer, ordered from oldest to
+     * newest.
+     *
+     * @return a new list containing every non-empty slot
+     */
+    public List<T> asList() {
+        return getSelectedElements(new Filter<T>() {
+            @Override
+            public boolean select(final T value) {
+                return true;
+            }
+        });
+    }
+
+    /**
+     * Returns the content of the oldest written slot.
+     *
+     * @return the oldest element, or {@code null} if the buffer is empty or
+     *         that slot was cleared by {@link #removeSelectedElements(Filter)}
+     */
+    public T getOldestElement() {
+        readLock.lock();
+        try {
+            // Until the buffer has wrapped, the slot at insertionPointer has never been
+            // written; the oldest value then lives at index 0.
+            final int oldestIndex = filled ? insertionPointer : 0;
+            return getElementData(oldestIndex);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Returns the content of the most recently written slot.
+     *
+     * @return the newest element, or {@code null} if the buffer is empty or
+     *         that slot was cleared by {@link #removeSelectedElements(Filter)}
+     */
+    public T getNewestElement() {
+        readLock.lock();
+        try {
+            int index = (insertionPointer == 0) ? buffer.length - 1 : insertionPointer - 1;
+            return getElementData(index);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Reads the slot at the given index. Callers must already hold the read
+     * lock.
+     */
+    @SuppressWarnings("unchecked")
+    private T getElementData(final int index) {
+        return (T) buffer[index];
+    }
+
+    /**
+     * Iterates over each element in the RingBuffer, calling the {@link ForEachEvaluator#evaluate(Object) evaluate} method on each element in the RingBuffer. If the Evaluator returns {@code false},
+     * the method will skip all remaining elements in the RingBuffer; otherwise, the next element will be evaluated until all elements have been evaluated.
+     *
+     * @param evaluator used to evaluate each item in the ring buffer
+     */
+    public void forEach(final ForEachEvaluator<T> evaluator) {
+        forEach(evaluator, IterationDirection.FORWARD);
+    }
+
+    /**
+     * Iterates over each element in the RingBuffer, calling the {@link ForEachEvaluator#evaluate(Object) evaluate} method on each element in the RingBuffer. If the Evaluator returns {@code false},
+     * the method will skip all remaining elements in the RingBuffer; otherwise, the next element will be evaluated until all elements have been evaluated.
+     *
+     * @param evaluator the evaluator
+     * @param iterationDirection the order in which to iterate over the elements in the RingBuffer;
+     *            {@link IterationDirection#FORWARD} visits oldest to newest, any other value visits newest to oldest
+     */
+    public void forEach(final ForEachEvaluator<T> evaluator, final IterationDirection iterationDirection) {
+        readLock.lock();
+        try {
+            final boolean forward = iterationDirection == IterationDirection.FORWARD;
+
+            for (int step = 0; step < buffer.length; step++) {
+                // Offset from the insertion pointer: 0..length-1 going forward, length-1..0 going backward.
+                final int offset = forward ? step : buffer.length - 1 - step;
+                final int idx = (insertionPointer + offset) % buffer.length;
+                final Object val = buffer[idx];
+                if (val == null) {
+                    continue;
+                }
+
+                @SuppressWarnings("unchecked")
+                final T element = (T) val;
+                if (!evaluator.evaluate(element)) {
+                    return;
+                }
+            }
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Decides whether an element of the RingBuffer is selected by a query or
+     * removal operation.
+     *
+     * @param <S> the type to select on
+     */
+    public static interface Filter<S> {
+
+        /**
+         * @param value the element to test
+         * @return true if the element is selected; false otherwise
+         */
+        boolean select(S value);
+    }
+
+    /**
+     * Defines an interface that can be used to iterate over all of the elements in the RingBuffer via the {@link #forEach} method
+     *
+     * @param <S> the type to evaluate
+     */
+    public static interface ForEachEvaluator<S> {
+
+        /**
+         * Evaluates the given element and returns {@code true} if the next element should be evaluated, {@code false} otherwise
+         *
+         * @param value the value to evaluate
+         * @return true if should continue evaluating; false otherwise
+         */
+        boolean evaluate(S value);
+    }
+
+    /**
+     * The order in which {@link #forEach(ForEachEvaluator, IterationDirection)}
+     * visits elements.
+     */
+    public static enum IterationDirection {
+
+        FORWARD,
+        BACKWARD;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java
new file mode 100644
index 0000000..cffe49c
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Measures elapsed wall-clock time using {@link System#nanoTime()}.
+ *
+ * <p>
+ * This class performs no synchronization.
+ * </p>
+ */
+public final class StopWatch {
+
+    private static final long NANOS_PER_MILLI = 1000000L;
+    private static final long NANOS_PER_SECOND = 1000000000L;
+    private static final long NANOS_PER_MINUTE = 60000000000L;
+
+    private long startNanos = -1L;
+    // Length of the last completed start/stop interval in nanoseconds; -1 until stop() has been called.
+    private long duration = -1L;
+    // System.nanoTime() may legitimately return negative values, so the running
+    // state is tracked explicitly instead of being inferred from startNanos < 0.
+    private boolean running = false;
+
+    /**
+     * Creates a StopWatch but does not start it
+     */
+    public StopWatch() {
+        this(false);
+    }
+
+    /**
+     * @param autoStart whether or not the timer should be started automatically
+     */
+    public StopWatch(final boolean autoStart) {
+        if (autoStart) {
+            start();
+        }
+    }
+
+    /**
+     * Starts (or restarts) the timer and discards any previously recorded
+     * duration.
+     */
+    public void start() {
+        this.startNanos = System.nanoTime();
+        this.duration = -1L;
+        this.running = true;
+    }
+
+    /**
+     * Stops the timer and records the time that passed since {@link #start()}.
+     *
+     * @throws IllegalStateException if the StopWatch is not currently running
+     */
+    public void stop() {
+        if (!running) {
+            throw new IllegalStateException("StopWatch has not been started");
+        }
+        this.duration = System.nanoTime() - startNanos;
+        this.startNanos = -1L;
+        this.running = false;
+    }
+
+    /**
+     * Returns the amount of time that the StopWatch was running.
+     *
+     * @param timeUnit the unit for which the duration should be reported
+     * @return the duration of the stopwatch in the specified unit
+     *
+     * @throws IllegalStateException if the StopWatch has not been stopped via {@link #stop()}
+     */
+    public long getDuration(final TimeUnit timeUnit) {
+        if (duration < 0) {
+            throw new IllegalStateException("Cannot get duration until StopWatch has been stopped");
+        }
+        return timeUnit.convert(duration, TimeUnit.NANOSECONDS);
+    }
+
+    /**
+     * Returns the amount of time that has elapsed since the timer was started.
+     *
+     * <p>
+     * NOTE(review): no check is made that the timer is running; when it is not,
+     * the value is computed against the reset start marker and is not
+     * meaningful.
+     * </p>
+     *
+     * @param timeUnit the unit for which the elapsed time should be computed
+     * @return the elapsed time in the specified unit
+     */
+    public long getElapsed(final TimeUnit timeUnit) {
+        return timeUnit.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
+    }
+
+    /**
+     * Formats the throughput achieved when {@code bytes} bytes were processed
+     * during the recorded duration.
+     *
+     * <p>
+     * NOTE(review): the recorded duration is used as-is, so this is only
+     * meaningful after {@link #stop()} has been called.
+     * </p>
+     *
+     * @param bytes the number of bytes processed
+     * @return the bytes-per-second figure formatted by {@code FormatUtils.formatDataSize}, followed by "/sec"
+     */
+    public String calculateDataRate(final long bytes) {
+        final double seconds = (double) duration / 1000000000.0D;
+        final long dataSize = (long) (bytes / seconds);
+        return FormatUtils.formatDataSize(dataSize) + "/sec";
+    }
+
+    /**
+     * Renders the recorded duration as a comma-separated list of its non-zero
+     * minute, second and millisecond parts, for example
+     * {@code "1 minutes, 30 seconds"}. When the duration is shorter than one
+     * millisecond, the nanosecond count is rendered instead.
+     *
+     * @return a human-readable representation of the recorded duration
+     */
+    public String getDuration() {
+        long remaining = this.duration;
+
+        // Plain division makes exact multiples roll over correctly
+        // (e.g. exactly 60 seconds is reported as 1 minute).
+        final long minutes = remaining / NANOS_PER_MINUTE;
+        remaining -= minutes * NANOS_PER_MINUTE;
+
+        final long seconds = remaining / NANOS_PER_SECOND;
+        remaining -= seconds * NANOS_PER_SECOND;
+
+        final long millis = remaining / NANOS_PER_MILLI;
+        remaining -= millis * NANOS_PER_MILLI;
+
+        final long nanos = remaining;
+
+        final StringBuilder sb = new StringBuilder();
+        if (minutes > 0) {
+            appendPart(sb, minutes, " minutes");
+        }
+        if (seconds > 0) {
+            appendPart(sb, seconds, " seconds");
+        }
+        if (millis > 0) {
+            appendPart(sb, millis, " millis");
+        }
+
+        // Nanoseconds are only shown when no larger unit was rendered.
+        if (sb.length() == 0) {
+            sb.append(nanos).append(" nanos");
+        }
+
+        return sb.toString();
+    }
+
+    /**
+     * Appends {@code amount} followed by {@code unitSuffix}, preceded by a
+     * separator when the builder already holds an earlier part.
+     */
+    private static void appendPart(final StringBuilder sb, final long amount, final String unitSuffix) {
+        if (sb.length() > 0) {
+            sb.append(", ");
+        }
+        sb.append(amount).append(unitSuffix);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java
new file mode 100644
index 0000000..c797c7f
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+/**
+ * An immutable pair of a key and a value. Either component may be
+ * {@code null}.
+ *
+ * @param <A> the type of the key
+ * @param <B> the type of the value
+ */
+public class Tuple<A, B> {
+
+    final A key;
+    final B value;
+
+    /**
+     * @param key the first component of the pair
+     * @param value the second component of the pair
+     */
+    public Tuple(A key, B value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    /**
+     * @return the first component of the pair
+     */
+    public A getKey() {
+        return key;
+    }
+
+    /**
+     * @return the second component of the pair
+     */
+    public B getValue() {
+        return value;
+    }
+
+    /**
+     * Two tuples are equal when both their keys and their values are equal
+     * (or both {@code null}).
+     */
+    @Override
+    public boolean equals(final Object other) {
+        if (this == other) {
+            return true;
+        }
+        // instanceof is false for null, so this also rejects a null argument.
+        if (!(other instanceof Tuple)) {
+            return false;
+        }
+
+        final Tuple<?, ?> that = (Tuple<?, ?>) other;
+
+        // Keys are compared first; values are only consulted when the keys match.
+        final boolean keysDiffer = (key == null) ? that.key != null : !key.equals(that.key);
+        if (keysDiffer) {
+            return false;
+        }
+
+        return (value == null) ? that.value == null : value.equals(that.value);
+    }
+
+    /**
+     * Sums a fixed seed with the hash codes of the key and the value, treating
+     * {@code null} as 0.
+     */
+    @Override
+    public int hashCode() {
+        int hash = 581;
+        hash += (this.key == null) ? 0 : this.key.hashCode();
+        hash += (this.value == null) ? 0 : this.value.hashCode();
+        return hash;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java
new file mode 100644
index 0000000..8faf3ba
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.concurrency;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * A {@link DebuggableTimedLock} that delegates directly to the wrapped
+ * {@link Lock} without collecting timing information or logging.
+ */
+public class DebugDisabledTimedLock implements DebuggableTimedLock {
+
+    private final Lock lock;
+
+    /**
+     * @param lock the lock to delegate to
+     */
+    public DebugDisabledTimedLock(final Lock lock) {
+        this.lock = lock;
+    }
+
+    /**
+     *
+     * @return true if lock obtained; false otherwise
+     */
+    @Override
+    public boolean tryLock() {
+        return lock.tryLock();
+    }
+
+    /**
+     * Attempts to obtain the lock within the given time. If the calling thread
+     * is interrupted while waiting, the interrupt status is restored and
+     * {@code false} is returned.
+     *
+     * @param timeout the duration of time to wait for the lock
+     * @param timeUnit the unit which provides meaning to the duration
+     * @return true if obtained lock in time; false otherwise
+     */
+    @Override
+    public boolean tryLock(final long timeout, final TimeUnit timeUnit) {
+        try {
+            return lock.tryLock(timeout, timeUnit);
+        } catch (final InterruptedException e) {
+            // The interface cannot propagate the exception, so preserve the
+            // interrupt for the caller instead of silently discarding it.
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
+
+    @Override
+    public void lock() {
+        lock.lock();
+    }
+
+    /**
+     * Releases the lock.
+     *
+     * @param task ignored by this implementation
+     */
+    @Override
+    public void unlock(final String task) {
+        lock.unlock();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java
new file mode 100644
index 0000000..e7d599e
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.concurrency;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link DebuggableTimedLock} that wraps a {@link Lock}, traces every
+ * acquisition and release, and accumulates per-task hold times for locks
+ * obtained through {@link #lock()}.
+ *
+ * <p>
+ * NOTE(review): acquisitions made through either {@code tryLock} variant are
+ * traced but not timed, so the matching {@link #unlock(String)} records no
+ * statistics. Confirm that this is intended.
+ * </p>
+ */
+public class DebugEnabledTimedLock implements DebuggableTimedLock {
+
+    private final Lock lock;
+    private final Logger logger;
+    private long lockTime = 0L;
+    // True while lockTime holds the acquisition time of a lock() call that has not been
+    // released yet. A flag is used because System.nanoTime() may return zero or negative
+    // values, which makes "lockTime <= 0" unreliable as a marker.
+    private boolean lockTimeRecorded = false;
+
+    // Number of timed lock/unlock cycles, keyed by task.
+    private final Map<String, Long> lockIterations = new HashMap<>();
+    // Total nanoseconds the lock was held, keyed by task.
+    private final Map<String, Long> lockNanos = new HashMap<>();
+
+    private final String name;
+    private final int iterationFrequency;
+
+    /**
+     * @param lock the lock to delegate to
+     * @param name the name used in log messages and as suffix of the logger name
+     * @param iterationFrequency a debug summary is logged every time a task's
+     *            iteration count is a multiple of this value
+     */
+    public DebugEnabledTimedLock(final Lock lock, final String name, final int iterationFrequency) {
+        this.lock = lock;
+        this.name = name;
+        this.iterationFrequency = iterationFrequency;
+        logger = LoggerFactory.getLogger(TimedLock.class.getName() + "." + name);
+    }
+
+    /**
+     * @return true if lock obtained; false otherwise
+     */
+    @Override
+    public boolean tryLock() {
+        logger.trace("Trying to obtain Lock: {}", name);
+        final boolean success = lock.tryLock();
+        if (!success) {
+            logger.trace("TryLock failed for Lock: {}", name);
+            return false;
+        }
+        logger.trace("TryLock successful");
+
+        return true;
+    }
+
+    /**
+     * Attempts to obtain the lock within the given time. If the calling thread
+     * is interrupted while waiting, the interrupt status is restored and
+     * {@code false} is returned.
+     *
+     * @param timeout duration to wait for lock
+     * @param timeUnit unit to understand given duration
+     * @return true if lock obtained in time; false otherwise
+     */
+    @Override
+    public boolean tryLock(final long timeout, final TimeUnit timeUnit) {
+        logger.trace("Trying to obtain Lock {} with a timeout of {} {}", name, timeout, timeUnit);
+        final boolean success;
+        try {
+            success = lock.tryLock(timeout, timeUnit);
+        } catch (final InterruptedException ie) {
+            // The interface cannot propagate the exception, so preserve the
+            // interrupt for the caller instead of silently discarding it.
+            Thread.currentThread().interrupt();
+            return false;
+        }
+
+        if (!success) {
+            logger.trace("TryLock failed for Lock {} with a timeout of {} {}", name, timeout, timeUnit);
+            return false;
+        }
+        logger.trace("TryLock successful");
+        return true;
+    }
+
+    /**
+     * Obtains the lock and records the time of acquisition.
+     */
+    @Override
+    public void lock() {
+        logger.trace("Obtaining Lock {}", name);
+        lock.lock();
+        lockTime = System.nanoTime();
+        lockTimeRecorded = true;
+        logger.trace("Obtained Lock {}", name);
+    }
+
+    /**
+     * Releases the lock. If the lock was obtained through {@link #lock()}, the
+     * hold time is added to the statistics kept for the given task.
+     *
+     * @param task to release the lock for
+     */
+    @Override
+    public void unlock(final String task) {
+        if (!lockTimeRecorded) {
+            // No acquisition time is available, so there is nothing to account for.
+            lock.unlock();
+            return;
+        }
+
+        logger.trace("Releasing Lock {}", name);
+        final long nanosLocked = System.nanoTime() - lockTime;
+
+        Long startIterations = lockIterations.get(task);
+        if (startIterations == null) {
+            startIterations = 0L;
+        }
+        final long iterations = startIterations + 1L;
+        lockIterations.put(task, iterations);
+
+        Long startNanos = lockNanos.get(task);
+        if (startNanos == null) {
+            startNanos = 0L;
+        }
+        final long totalNanos = startNanos + nanosLocked;
+        lockNanos.put(task, totalNanos);
+
+        lockTime = -1L;
+        lockTimeRecorded = false;
+
+        lock.unlock();
+        logger.trace("Released Lock {}", name);
+
+        // A frequency of 0 would otherwise raise ArithmeticException after the lock was released.
+        if (iterationFrequency != 0 && iterations % iterationFrequency == 0) {
+            logger.debug("Lock {} held for {} nanos for task: {}; total lock iterations: {}; total lock nanos: {}", name, nanosLocked, task, iterations, totalNanos);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java
new file mode 100644
index 0000000..69da6e8
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.concurrency;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Lock operations shared by the debug-enabled and debug-disabled lock
+ * wrappers.
+ */
+public interface DebuggableTimedLock {
+
+    /**
+     * Attempts to obtain the lock.
+     *
+     * @return true if lock obtained; false otherwise
+     */
+    boolean tryLock();
+
+    /**
+     * Attempts to obtain the lock within the given time.
+     *
+     * @param timePeriod the duration of time to wait for the lock
+     * @param timeUnit the unit which provides meaning to the duration
+     * @return true if lock obtained in time; false otherwise
+     */
+    boolean tryLock(long timePeriod, TimeUnit timeUnit);
+
+    /**
+     * Obtains the lock.
+     */
+    void lock();
+
+    /**
+     * Releases the lock.
+     *
+     * @param task the task the lock is released for
+     */
+    void unlock(String task);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java
new file mode 100644
index 0000000..532d3c3
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.concurrency;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wraps a {@link Lock} and routes each operation to either a debug-enabled or
+ * a debug-disabled delegate, depending on whether debug logging is enabled for
+ * this lock's logger at the time of the call.
+ */
+public class TimedLock {
+
+    private final DebugEnabledTimedLock debugDelegate;
+    private final DebugDisabledTimedLock plainDelegate;
+
+    private final Logger logger;
+
+    /**
+     * @param lock the lock to wrap
+     * @param name the name of the lock, appended to the logger name
+     * @param iterationFrequency passed through to the debug-enabled delegate
+     */
+    public TimedLock(final Lock lock, final String name, final int iterationFrequency) {
+        this.debugDelegate = new DebugEnabledTimedLock(lock, name, iterationFrequency);
+        this.plainDelegate = new DebugDisabledTimedLock(lock);
+
+        final String loggerName = TimedLock.class.getName() + "." + name;
+        this.logger = LoggerFactory.getLogger(loggerName);
+    }
+
+    /**
+     * Chooses the delegate for the current call; the debug level is checked on
+     * every invocation.
+     */
+    private DebuggableTimedLock currentDelegate() {
+        if (logger.isDebugEnabled()) {
+            return debugDelegate;
+        }
+        return plainDelegate;
+    }
+
+    /**
+     * @return true if lock obtained; false otherwise
+     */
+    public boolean tryLock() {
+        final DebuggableTimedLock delegate = currentDelegate();
+        return delegate.tryLock();
+    }
+
+    /**
+     * @param timeout the duration of time to wait for the lock
+     * @param timeUnit the unit which provides meaning to the duration
+     * @return true if lock obtained in time; false otherwise
+     */
+    public boolean tryLock(final long timeout, final TimeUnit timeUnit) {
+        final DebuggableTimedLock delegate = currentDelegate();
+        return delegate.tryLock(timeout, timeUnit);
+    }
+
+    /**
+     * Obtains the lock through the currently selected delegate.
+     */
+    public void lock() {
+        final DebuggableTimedLock delegate = currentDelegate();
+        delegate.lock();
+    }
+
+    /**
+     * Releases the lock through the currently selected delegate.
+     *
+     * @param task the task the lock is released for
+     */
+    public void unlock(final String task) {
+        final DebuggableTimedLock delegate = currentDelegate();
+        delegate.unlock(task);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
new file mode 100644
index 0000000..ff4da8e
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.file;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+import org.slf4j.Logger;
+
+/**
+ * A utility class containing a few useful static methods to do typical IO 
operations.
+ *
+ */
+public class FileUtils {
+
+    public static final long TRANSFER_CHUNK_SIZE_BYTES = 1024 * 1024 * 8; //8 
MB chunks
+    public static final long MILLIS_BETWEEN_ATTEMPTS = 50L;
+
+    /**
+     * Closes the supplied resource and suppresses any {@link IOException} raised while doing so. Nothing is logged.
+     *
+     * @param closeable the resource to close; a null argument is ignored
+     */
+    public static void closeQuietly(final Closeable closeable) {
+        if (closeable == null) {
+            return;
+        }
+        try {
+            closeable.close();
+        } catch (final IOException ignored) {
+            // deliberately swallowed: this helper is a best-effort close
+        }
+    }
+
+    /**
+     * Releases the supplied file lock and suppresses any {@link IOException} raised while doing so. Nothing is logged.
+     *
+     * @param lock the lock to release; a null argument is ignored
+     */
+    public static void releaseQuietly(final FileLock lock) {
+        if (lock == null) {
+            return;
+        }
+        try {
+            lock.release();
+        } catch (final IOException ignored) {
+            // deliberately swallowed: this helper is a best-effort release
+        }
+    }
+
+    /**
+     * Ensures that the given path is a directory that can be both read and written, creating it (and any missing parents) if it does not exist.
+     *
+     * @param dir the directory to verify or create
+     * @throws IOException if the path exists but is not a directory, if it cannot be created, or if it is not readable and writable
+     */
+    public static void ensureDirectoryExistAndCanAccess(final File dir) throws IOException {
+        if (dir.exists()) {
+            if (!dir.isDirectory()) {
+                throw new IOException(dir.getAbsolutePath() + " is not a directory");
+            }
+        } else if (!dir.mkdirs() && !dir.isDirectory()) {
+            // mkdirs() also returns false when somebody else created the directory in the meantime, so only fail if it is still missing
+            throw new IOException(dir.getAbsolutePath() + " could not be created");
+        }
+        if (!(dir.canRead() && dir.canWrite())) {
+            throw new IOException(dir.getAbsolutePath() + " directory does not have read/write privilege");
+        }
+    }
+
+    /**
+     * Deletes the given file, making a single attempt. Delegates to {@link #deleteFile(File, Logger, int)} with an attempt count of 1.
+     *
+     * @param file the file to delete
+     * @param logger the logger handed to the delegate; see that overload for what is logged
+     * @return the value returned by the delegate
+     */
+    public static boolean deleteFile(final File file, final Logger logger) {
+        return FileUtils.deleteFile(file, logger, 1);
+    }
+
+    /**
+     * Attempts to delete the given file, retrying up to the requested number of times and pausing {@link #MILLIS_BETWEEN_ATTEMPTS} milliseconds between tries.
+     * A file that still exists after the last try, or any failure raised along the way, is reported at warn level when a logger is supplied.
+     *
+     * @param file the file to delete
+     * @param logger receives the warnings; may be null, in which case nothing is logged
+     * @param attempts how many times deletion should be tried; values below 1 are treated as 1
+     * @return true if the file existed when this method was called and is gone afterwards; false if the file is null, did not exist to begin with, or could not be removed
+     */
+    public static boolean deleteFile(final File file, final Logger logger, final int attempts) {
+        if (file == null) {
+            return false;
+        }
+        boolean deleted = false;
+        try {
+            if (!file.exists()) {
+                return false;
+            }
+            final int maxTries = Math.max(1, attempts);
+            int tryIndex = 0;
+            while (tryIndex < maxTries && !deleted) {
+                deleted = file.delete() || !file.exists();
+                final boolean retriesRemain = (maxTries - tryIndex) > 1;
+                if (!deleted && retriesRemain) {
+                    FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS);
+                }
+                tryIndex++;
+            }
+            if (!deleted && logger != null) {
+                logger.warn("File appears to exist but unable to delete file: " + file.getAbsolutePath());
+            }
+        } catch (final Throwable t) {
+            if (logger != null) {
+                logger.warn("Unable to delete file: '" + file.getAbsolutePath() + "' due to " + t);
+            }
+        }
+        return deleted;
+    }
+
+    /**
+     * Deletes all of the given files, making a single attempt per file. Delegates to {@link #deleteFile(List, Logger, int)} with an attempt count of 1.
+     *
+     * @param files the files to delete; can be null
+     * @param logger the logger handed to the delegate; can be null
+     */
+    public static void deleteFile(final List<File> files, final Logger logger) 
{
+        FileUtils.deleteFile(files, logger, 1);
+    }
+
+    /**
+     * Deletes all of the given files. Any file that still exists after the last attempt, or whose deletion raises an error, is reported at warn level to the given logger.
+     * Null entries in the list are skipped.
+     *
+     * @param files the files to delete; can be null or empty, in which case nothing happens
+     * @param logger receives the warnings; can be null, in which case nothing is logged
+     * @param attempts indicates how many times an attempt should be made to delete each file; values below 1 are treated as 1
+     */
+    public static void deleteFile(final List<File> files, final Logger logger, final int attempts) {
+        if (null == files || files.isEmpty()) {
+            return;
+        }
+        final int effectiveAttempts = Math.max(1, attempts);
+        for (final File file : files) {
+            if (file == null) {
+                // a null entry used to raise an NPE that then escaped from the catch block below whenever a logger was supplied
+                continue;
+            }
+            try {
+                boolean isGone = false;
+                for (int i = 0; i < effectiveAttempts && !isGone; i++) {
+                    isGone = file.delete() || !file.exists();
+                    // only pause when another attempt will follow
+                    if (!isGone && (effectiveAttempts - i) > 1) {
+                        FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS);
+                    }
+                }
+                if (!isGone && logger != null) {
+                    logger.warn("File appears to exist but unable to delete file: " + file.getAbsolutePath());
+                }
+            } catch (final Throwable t) {
+                if (null != logger) {
+                    logger.warn("Unable to delete file given from path: '" + file.getPath() + "' due to " + t);
+                }
+            }
+        }
+    }
+
+    /**
+     * Deletes the files (not directories) directly inside the given directory that match the given filename filter, without descending into subdirectories.
+     * Delegates to {@link #deleteFilesInDir(File, FilenameFilter, Logger, boolean)} with recursion switched off.
+     *
+     * @param directory the directory to scan for files to delete
+     * @param filter if null then no filter is used
+     * @param logger the logger handed to the delegate
+     */
+    public static void deleteFilesInDir(final File directory, final 
FilenameFilter filter, final Logger logger) {
+        FileUtils.deleteFilesInDir(directory, filter, logger, false);
+    }
+
+    /**
+     * Deletes the files (not directories) in the given directory that match the given filename filter, optionally descending into subdirectories.
+     * Delegates to {@link #deleteFilesInDir(File, FilenameFilter, Logger, boolean, boolean)} with deletion of empty directories switched off.
+     *
+     * @param directory the directory to scan
+     * @param filter if null then no filter is used
+     * @param logger the logger handed to the delegate
+     * @param recurse indicates whether to recurse subdirectories
+     */
+    public static void deleteFilesInDir(final File directory, final 
FilenameFilter filter, final Logger logger, final boolean recurse) {
+        FileUtils.deleteFilesInDir(directory, filter, logger, recurse, false);
+    }
+
+    /**
+     * Deletes the files in the given directory that match the given filename filter, optionally descending into subdirectories and optionally removing
+     * subdirectories that are empty once their content has been processed. Each deletion is tried up to 3 times; failures are printed at warn to the given logger.
+     * Nothing happens if the given directory is null, is not a directory, or cannot be listed.
+     *
+     * @param directory the directory to scan
+     * @param filter if null then no filter is used; it is applied to files only, subdirectories are visited regardless of it
+     * @param logger the logger; may be null
+     * @param recurse whether to recurse subdirectories or not
+     * @param deleteEmptyDirectories if true, a visited subdirectory that is empty after processing is deleted too; only has an effect when recursing
+     */
+    public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse, final boolean deleteEmptyDirectories) {
+        // ensure the specified directory is actually a directory and that it exists
+        if (null == directory || !directory.isDirectory()) {
+            return;
+        }
+        final File[] children = directory.listFiles();
+        if (children == null) {
+            // listFiles() yields null rather than an empty array when the directory cannot be read
+            if (logger != null) {
+                logger.warn("Unable to list contents of directory: " + directory.getAbsolutePath());
+            }
+            return;
+        }
+        for (final File child : children) {
+            final boolean process = (filter == null) || filter.accept(directory, child.getName());
+            if (child.isFile() && process) {
+                FileUtils.deleteFile(child, logger, 3);
+            }
+            if (child.isDirectory() && recurse) {
+                FileUtils.deleteFilesInDir(child, filter, logger, recurse, deleteEmptyDirectories);
+                if (deleteEmptyDirectories) {
+                    // list() may also yield null; treat an unreadable directory as "not known to be empty"
+                    final String[] remaining = child.list();
+                    if (remaining != null && remaining.length == 0) {
+                        FileUtils.deleteFile(child, logger, 3);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Deletes each of the given files by calling {@link #deleteFile(File, boolean)} on it, in iteration order. Processing stops at the first file that fails.
+     *
+     * @param files the files to delete
+     * @param recurse passed through to {@link #deleteFile(File, boolean)} for every file
+     * @throws IOException if any issues deleting specified files
+     */
+    public static void deleteFiles(final Collection<File> files, final boolean 
recurse) throws IOException {
+        for (final File file : files) {
+            FileUtils.deleteFile(file, recurse);
+        }
+    }
+
+    public static void deleteFile(final File file, final boolean recurse) 
throws IOException {
+        if (file.isDirectory() && recurse) {
+            FileUtils.deleteFiles(Arrays.asList(file.listFiles()), recurse);
+        }
+        //now delete the file itself regardless of whether it is plain file or 
a directory
+        if (!FileUtils.deleteFile(file, null, 5)) {
+            throw new IOException("Unable to delete " + 
file.getAbsolutePath());
+        }
+    }
+
+    /**
+     * Overwrites the content of the file with random bytes a number of times, then with zeros, and finally deletes the file.
+     * Each pass is forced to the storage device before the next one starts.
+     *
+     * NOTE: the file is opened with a {@link FileOutputStream}, which truncates it before the first pass is written.
+     *
+     * @param file File to be overwritten a number of times and, ultimately, deleted
+     * @param passes Number of times file should be overwritten with random bytes
+     * @throws IOException if something makes shredding or deleting a problem
+     */
+    public static void shredFile(final File file, final int passes)
+            throws IOException {
+        final Random generator = new Random();
+        final long fileLength = file.length();
+        // never smaller than one byte: an empty file would otherwise give a zero-length buffer and a division by zero below
+        final int byteArraySize = (int) Math.max(1L, Math.min(fileLength, 1048576L)); // at most 1MB
+        final byte[] b = new byte[byteArraySize];
+        // enough whole buffers to cover the original length of the file
+        final long numOfWrites = (fileLength / b.length) + 1;
+        final FileOutputStream fos = new FileOutputStream(file);
+        try {
+            // Over write file contents (passes) times
+            final FileChannel channel = fos.getChannel();
+            for (int i = 0; i < passes; i++) {
+                generator.nextBytes(b);
+                for (long j = 0; j < numOfWrites; j++) {
+                    fos.write(b);
+                }
+                fos.flush();
+                // make sure this pass reaches the device instead of being overwritten while still cached
+                channel.force(false);
+                channel.position(0);
+            }
+            // Write out "0" for each byte, covering the same extent as the random passes
+            Arrays.fill(b, (byte) 0);
+            for (long j = 0; j < numOfWrites; j++) {
+                fos.write(b);
+            }
+            fos.flush();
+            channel.force(false);
+            fos.close();
+            // Try to delete the file a few times
+            if (!FileUtils.deleteFile(file, null, 5)) {
+                throw new IOException("Failed to delete file after shredding");
+            }
+
+        } finally {
+            FileUtils.closeQuietly(fos);
+        }
+    }
+
+    /**
+     * Copies everything that can be read from the given input stream to the given output stream. Neither stream is flushed or closed.
+     *
+     * @param in the stream to read from
+     * @param out the stream to write to
+     * @return the number of bytes copied
+     * @throws IOException if reading or writing fails
+     */
+    public static long copy(final InputStream in, final OutputStream out) throws IOException {
+        final byte[] buffer = new byte[65536];
+        long copied = 0L;
+        int len;
+        // only -1 marks the end of the stream; a read of zero bytes must not cut the copy short
+        while ((len = in.read(buffer)) != -1) {
+            if (len > 0) {
+                out.write(buffer, 0, len);
+                copied += len;
+            }
+        }
+
+        return copied;
+    }
+
+    /**
+     * Writes the given bytes to the given file, replacing whatever the file held before. The lock (if any) is released and the stream closed before returning,
+     * whether or not the write succeeded.
+     *
+     * @param bytes the content to write
+     * @param destination the file to write to
+     * @param lockOutputFile if true an exclusive lock on the destination is acquired before writing
+     * @return the number of bytes written
+     * @throws FileNotFoundException if the destination cannot be opened for writing
+     * @throws IOException if the lock cannot be obtained or the write fails
+     */
+    public static long copyBytes(final byte[] bytes, final File destination, final boolean lockOutputFile) throws FileNotFoundException, IOException {
+        FileOutputStream output = null;
+        FileLock exclusiveLock = null;
+        try {
+            output = new FileOutputStream(destination);
+            final FileChannel channel = output.getChannel();
+            if (lockOutputFile) {
+                exclusiveLock = channel.tryLock(0, Long.MAX_VALUE, false);
+                if (exclusiveLock == null) {
+                    throw new IOException("Unable to obtain exclusive file lock for: " + destination.getAbsolutePath());
+                }
+            }
+            output.write(bytes);
+            output.flush();
+            return bytes.length;
+        } finally {
+            FileUtils.releaseQuietly(exclusiveLock);
+            FileUtils.closeQuietly(output);
+        }
+    }
+
+    /**
+     * Copies the given source file to the given destination file. The given destination will be overwritten if it already exists.
+     *
+     * When a move is requested a plain rename is tried first; no lock is taken for that. If the rename fails (or no move was requested) the content is
+     * transferred channel-to-channel in chunks of {@link #TRANSFER_CHUNK_SIZE_BYTES} and, for a move, the source is deleted afterwards.
+     *
+     * @param source the file to copy
+     * @param destination the file to copy to
+     * @param lockInputFile if true a shared lock on the input file is taken for the copy; if false it is not
+     * @param lockOutputFile if true an exclusive lock on the output file is taken for the copy; if false it is not
+     * @param move if true will perform what is effectively a move operation rather than a pure copy. If false, the file is copied and the source file is retained.
+     * @param logger only consulted when a move had to fall back to copying and the source could not be deleted afterwards: if non-null a warning is logged and both
+     * files are left in place; if null the destination is deleted again and an IOException is thrown
+     * @return the length of the destination after a successful rename; otherwise the size of the source as last read from its channel during the transfer
+     * @throws FileNotFoundException if the source or destination could not be opened
+     * @throws IOException if the source is not readable, a requested lock cannot be obtained, the transfer fails, or the source cannot be removed and no logger was given
+     * @throws SecurityException if a security manager denies the needed file operations
+     */
+    public static long copyFile(final File source, final File destination, 
final boolean lockInputFile, final boolean lockOutputFile, final boolean move, 
final Logger logger)
+            throws FileNotFoundException, IOException {
+
+        FileInputStream fis = null;
+        FileOutputStream fos = null;
+        FileLock inLock = null;
+        FileLock outLock = null;
+        long fileSize = 0L;
+        if (!source.canRead()) {
+            throw new IOException("Must at least have read permission");
+
+        }
+        // cheap path: for a move, a successful rename is all that is needed
+        if (move && source.renameTo(destination)) {
+            fileSize = destination.length();
+        } else {
+            try {
+                fis = new FileInputStream(source);
+                fos = new FileOutputStream(destination);
+                final FileChannel in = fis.getChannel();
+                final FileChannel out = fos.getChannel();
+                // tryLock returns null instead of blocking when the lock is held elsewhere
+                if (lockInputFile) {
+                    inLock = in.tryLock(0, Long.MAX_VALUE, true);
+                    if (null == inLock) {
+                        throw new IOException("Unable to obtain shared file 
lock for: " + source.getAbsolutePath());
+                    }
+                }
+                if (lockOutputFile) {
+                    outLock = out.tryLock(0, Long.MAX_VALUE, false);
+                    if (null == outLock) {
+                        throw new IOException("Unable to obtain exclusive file 
lock for: " + destination.getAbsolutePath());
+                    }
+                }
+                // transfer chunk by chunk; the source size is re-read on every iteration
+                long bytesWritten = 0;
+                do {
+                    bytesWritten += out.transferFrom(in, bytesWritten, 
TRANSFER_CHUNK_SIZE_BYTES);
+                    fileSize = in.size();
+                } while (bytesWritten < fileSize);
+                out.force(false);
+                // close both streams before the source is deleted; nulling them keeps the finally block from closing them again
+                FileUtils.closeQuietly(fos);
+                FileUtils.closeQuietly(fis);
+                fos = null;
+                fis = null;
+                // reached with move == true only when the rename above failed: finish the move by removing the source
+                if (move && !FileUtils.deleteFile(source, null, 5)) {
+                    if (logger == null) {
+                        // nobody to tell, so undo the copy and fail
+                        FileUtils.deleteFile(destination, null, 5);
+                        throw new IOException("Could not remove file " + 
source.getAbsolutePath());
+                    } else {
+                        logger.warn("Configured to delete source file when 
renaming/move not successful.  However, unable to delete file at: " + 
source.getAbsolutePath());
+                    }
+                }
+            } finally {
+                // all four helpers ignore null arguments and swallow IOExceptions
+                FileUtils.releaseQuietly(inLock);
+                FileUtils.releaseQuietly(outLock);
+                FileUtils.closeQuietly(fos);
+                FileUtils.closeQuietly(fis);
+            }
+        }
+        return fileSize;
+    }
+
+    /**
+     * Copies the given source file to the given destination file, retaining the source. The given destination will be overwritten if it already exists.
+     * Delegates to {@link #copyFile(File, File, boolean, boolean, boolean, Logger)} with {@code move} set to false.
+     *
+     * @param source the file to copy from
+     * @param destination the file to copy to
+     * @param lockInputFile if true will lock input file during copy; if false will not
+     * @param lockOutputFile if true will lock output file during copy; if false will not
+     * @param logger the logger handed to the delegate
+     * @return the value returned by the delegate
+     * @throws FileNotFoundException if the source file could not be found
+     * @throws IOException if unable to read or write to file
+     * @throws SecurityException if a security manager denies the needed file operations
+     */
+    public static long copyFile(final File source, final File destination, 
final boolean lockInputFile, final boolean lockOutputFile, final Logger logger) 
throws FileNotFoundException, IOException {
+        return FileUtils.copyFile(source, destination, lockInputFile, 
lockOutputFile, false, logger);
+    }
+
+    /**
+     * Copies the content of the given file to the given stream and flushes the stream once everything has been written.
+     *
+     * @param source the file to copy from
+     * @param stream the stream to copy to
+     * @param closeOutputStream if true the given stream is closed (quietly) before returning, whether or not the copy succeeded
+     * @param lockInputFile if true a shared lock on the source file is held for the duration of the copy
+     * @return the size of the source file as reported by its channel after the copy
+     * @throws FileNotFoundException if the source file could not be opened
+     * @throws IOException if the lock cannot be obtained, or reading or writing fails
+     */
+    public static long copyFile(final File source, final OutputStream stream, final boolean closeOutputStream, final boolean lockInputFile) throws FileNotFoundException, IOException {
+        FileInputStream fis = null;
+        FileLock inLock = null;
+        long fileSize = 0L;
+        try {
+            fis = new FileInputStream(source);
+            final FileChannel in = fis.getChannel();
+            if (lockInputFile) {
+                inLock = in.tryLock(0, Long.MAX_VALUE, true);
+                if (inLock == null) {
+                    // the lock requested above is a shared one, so the message must not claim "exclusive"
+                    throw new IOException("Unable to obtain shared file lock for: " + source.getAbsolutePath());
+                }
+            }
+
+            final byte[] buffer = new byte[1 << 18]; //256 KB
+            int bytesRead;
+            while ((bytesRead = fis.read(buffer)) != -1) {
+                stream.write(buffer, 0, bytesRead);
+            }
+            in.force(false);
+            stream.flush();
+            fileSize = in.size();
+        } finally {
+            FileUtils.releaseQuietly(inLock);
+            FileUtils.closeQuietly(fis);
+            if (closeOutputStream) {
+                FileUtils.closeQuietly(stream);
+            }
+        }
+        return fileSize;
+    }
+
+    /**
+     * Copies everything that can be read from the given stream into the given file using {@link Files#copy(InputStream, Path, java.nio.file.CopyOption...)}.
+     * No copy options are passed, so the copy fails if the destination already exists.
+     *
+     * @param stream the stream to read from
+     * @param destination the file to create
+     * @param closeInputStream if true the given stream is closed before returning, whether or not the copy succeeded
+     * @param lockOutputFile currently ignored; no lock is taken on the destination
+     * @return the number of bytes copied
+     * @throws FileNotFoundException declared for callers; see {@link Files#copy(InputStream, Path, java.nio.file.CopyOption...)} for the failures actually raised
+     * @throws IOException if the copy fails, or if closing the stream fails after a successful copy
+     */
+    public static long copyFile(final InputStream stream, final File destination, final boolean closeInputStream, final boolean lockOutputFile) throws FileNotFoundException, IOException {
+        final Path destPath = destination.toPath();
+        final long size;
+        try {
+            size = Files.copy(stream, destPath);
+        } catch (final IOException | RuntimeException copyFailure) {
+            // honor closeInputStream on the failure path too, without hiding the original problem
+            if (closeInputStream) {
+                try {
+                    stream.close();
+                } catch (final IOException closeFailure) {
+                    copyFailure.addSuppressed(closeFailure);
+                }
+            }
+            throw copyFailure;
+        }
+        if (closeInputStream) {
+            stream.close();
+        }
+        return size;
+    }
+
+    /**
+     * Renames the given file from the source path to the destination path, making multiple attempts if needed. Delegates to
+     * {@link #renameFile(File, File, int, boolean)} with {@code replace} set to false. This should only be used to rename within a given directory.
+     * Renaming across directories might not work well. See the <code>File.renameTo</code> for more information.
+     *
+     * @param source the file to rename
+     * @param destination the file path to rename to
+     * @param maxAttempts the max number of attempts to attempt the rename
+     * @throws IOException if rename isn't successful
+     */
+    public static void renameFile(final File source, final File destination, 
final int maxAttempts) throws IOException {
+        FileUtils.renameFile(source, destination, maxAttempts, false);
+    }
+
+    /**
+     * Renames the given file from the source path to the destination path. This handles multiple attempts. This should only be used to rename within a given
+     * directory. Renaming across directories might not work well. See the <code>File.renameTo</code> for more information.
+     *
+     * @param source the file to rename
+     * @param destination the file path to rename to
+     * @param maxAttempts the max number of attempts to attempt the rename; when replace is true, or when this value is below 1, at least 2 attempts are made
+     * @param replace if true and a rename attempt fails, any file at the destination path is deleted before the next attempt. If false, any conflicting files will
+     * be left as they were and the rename attempts will fail if conflicting.
+     * @throws IOException if rename isn't successful
+     */
+    public static void renameFile(final File source, final File destination, final int maxAttempts, final boolean replace) throws IOException {
+        final int attempts = (replace || maxAttempts < 1) ? Math.max(2, maxAttempts) : maxAttempts;
+        boolean renamed = false;
+        for (int i = 0; i < attempts && !renamed; i++) {
+            renamed = source.renameTo(destination);
+            if (!renamed && replace) {
+                // only clear the destination when the caller asked for replacement; otherwise a conflicting file must survive
+                FileUtils.deleteFile(destination, null, 5);
+            }
+        }
+        if (!renamed) {
+            // report the number of attempts actually made, which can differ from maxAttempts
+            throw new IOException("Attempted " + attempts + " times but unable to rename from \'" + source.getPath() + "\' to \'" + destination.getPath() + "\'");
+        }
+    }
+
+    /**
+     * Sleeps for the given number of milliseconds without throwing {@link InterruptedException}. If the sleep is interrupted this method returns early
+     * and leaves the thread's interrupt flag set, so that callers can still notice the interruption.
+     *
+     * @param millis how long to sleep, in milliseconds
+     */
+    public static void sleepQuietly(final long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (final InterruptedException ex) {
+            // Thread.sleep cleared the flag when it threw; restore it rather than losing the interruption
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Brings a primary file and its counterpart in the restore directory in line with each other:
+     * <ul>
+     * <li>only the primary exists: the primary is copied to the restore location</li>
+     * <li>only the restore copy exists: the restore copy is copied to the primary location</li>
+     * <li>both exist but differ: an IllegalStateException is thrown</li>
+     * <li>both exist and are the same, or neither exists: nothing is done</li>
+     * </ul>
+     *
+     * @param primaryFile the primary file
+     * @param restoreFile the restore file
+     * @param logger a logger, handed to the copy operation
+     * @throws IOException if an I/O problem was encountered during syncing
+     * @throws IllegalStateException if the primary and restore copies exist but are different
+     */
+    public static void syncWithRestore(final File primaryFile, final File restoreFile, final Logger logger)
+            throws IOException {
+
+        if (primaryFile.exists() && !restoreFile.exists()) {
+            // restore copy is missing: seed it from the primary
+            copyFile(primaryFile, restoreFile, false, false, logger);
+            return;
+        }
+
+        if (restoreFile.exists() && !primaryFile.exists()) {
+            // primary is missing: bring it back from the restore copy
+            copyFile(restoreFile, primaryFile, false, false, logger);
+            return;
+        }
+
+        if (primaryFile.exists() && restoreFile.exists() && !isSame(primaryFile, restoreFile)) {
+            final String problem = String.format("Primary file '%s' is different than restore file '%s'",
+                    primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath());
+            throw new IllegalStateException(problem);
+        }
+    }
+
+    /**
+     * Returns true if the given files are the same according to their MD5 hash. Both files are read completely, via {@link #computeMd5Digest(File)}.
+     *
+     * @param file1 a file
+     * @param file2 a file
+     * @return true if the two MD5 digests are equal; false otherwise
+     * @throws IOException if the MD5 hash could not be computed
+     */
+    public static boolean isSame(final File file1, final File file2) throws 
IOException {
+        return Arrays.equals(computeMd5Digest(file1), computeMd5Digest(file2));
+    }
+
+    /**
+     * Returns the MD5 hash of the given file. The file is opened for reading, hashed via {@link #computeMd5Digest(InputStream)} and closed again.
+     *
+     * @param file a file
+     * @return the MD5 hash
+     * @throws IOException if the file cannot be opened or the MD5 hash could not be computed
+     */
+    public static byte[] computeMd5Digest(final File file) throws IOException {
+        try (final FileInputStream fis = new FileInputStream(file)) {
+            return computeMd5Digest(fis);
+        }
+    }
+
+    /**
+     * Computes the MD5 hash over everything that can be read from the given stream. The stream is read to its end but is not closed.
+     *
+     * @param stream an input stream
+     * @return the MD5 hash
+     * @throws IOException if reading fails, or if the MD5 algorithm is not available (the cause is then a NoSuchAlgorithmException)
+     */
+    public static byte[] computeMd5Digest(final InputStream stream) throws IOException {
+        final MessageDigest md5;
+        try {
+            md5 = MessageDigest.getInstance("MD5");
+        } catch (final NoSuchAlgorithmException unavailable) {
+            throw new IOException(unavailable);
+        }
+
+        final byte[] chunk = new byte[8192];
+        for (int read = stream.read(chunk); read > -1; read = stream.read(chunk)) {
+            if (read > 0) {
+                md5.update(chunk, 0, read);
+            }
+        }
+
+        return md5.digest();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java
new file mode 100644
index 0000000..dc60318
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/CompoundUpdateMonitor.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.file.monitor;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An {@link UpdateMonitor} that combines multiple <code>UpdateMonitor</code>s such that it will indicate a change in a file only if ALL sub-monitors indicate
+ * a change. The sub-monitors will be applied in the order given and if any indicates that the state has not changed, the subsequent sub-monitors may not be
+ * given a chance to run
+ */
+public class CompoundUpdateMonitor implements UpdateMonitor {
+
+    private final List<UpdateMonitor> monitors;
+
+    /**
+     * @param first the first sub-monitor to consult
+     * @param others further sub-monitors, consulted in the order given
+     */
+    public CompoundUpdateMonitor(final UpdateMonitor first, final UpdateMonitor... others) {
+        monitors = new ArrayList<>(1 + others.length);
+        monitors.add(first);
+        for (final UpdateMonitor monitor : others) {
+            monitors.add(monitor);
+        }
+    }
+
+    /**
+     * Returns a state object that does not query any sub-monitor yet; the sub-monitors are only consulted when the returned object is compared to another
+     * state via <code>equals</code>.
+     *
+     * @param path the path whose state is wanted
+     * @return the deferred state for the given path
+     */
+    @Override
+    public Object getCurrentState(final Path path) throws IOException {
+        return new DeferredMonitorAction(monitors, path);
+    }
+
+    /**
+     * State object that computes each sub-monitor's state lazily, at most once, and considers itself equal to another state as soon as one sub-monitor
+     * reports the same state for both.
+     */
+    private static class DeferredMonitorAction {
+
+        // marks slots whose sub-monitor has not been asked yet; a distinct sentinel is needed because null is a legal state
+        private static final Object NON_COMPUTED_VALUE = new Object();
+
+        private final List<UpdateMonitor> monitors;
+        private final Path path;
+
+        private final Object[] preCalculated;
+
+        public DeferredMonitorAction(final List<UpdateMonitor> monitors, final Path path) {
+            this.monitors = monitors;
+            this.path = path;
+            preCalculated = new Object[monitors.size()];
+
+            for (int i = 0; i < preCalculated.length; i++) {
+                preCalculated[i] = NON_COMPUTED_VALUE;
+            }
+        }
+
+        /**
+         * @param i index of the sub-monitor
+         * @return the state reported by that sub-monitor, computed on first use and cached afterwards
+         * @throws IOException if the sub-monitor fails to compute its state
+         */
+        private Object getCalculatedValue(final int i) throws IOException {
+            if (preCalculated[i] == NON_COMPUTED_VALUE) {
+                preCalculated[i] = monitors.get(i).getCurrentState(path);
+            }
+
+            return preCalculated[i];
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            // must return true unless ALL DeferredMonitorAction's indicate that they are different
+            if (!(obj instanceof DeferredMonitorAction)) {
+                // also covers obj == null
+                return false;
+            }
+
+            final DeferredMonitorAction other = (DeferredMonitorAction) obj;
+            try {
+                // Go through each UpdateMonitor's value and check if the value has changed.
+                for (int i = 0; i < preCalculated.length; i++) {
+                    final Object mine = getCalculatedValue(i);
+                    final Object theirs = other.getCalculatedValue(i);
+
+                    // null-safe comparison: a null state on one side only simply counts as "different"
+                    if (mine == theirs || (mine != null && mine.equals(theirs))) {
+                        return true;
+                    }
+                }
+            } catch (final IOException e) {
+                // a state that cannot be computed is treated as a change
+                return false;
+            }
+
+            // No DeferredMonitorAction was the same as last time. Therefore, it's not equal
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            // equals() can only be decided by consulting the sub-monitors of both sides, so a constant is the only value that is
+            // guaranteed to be the same for any two instances that compare equal
+            return 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java
new file mode 100644
index 0000000..e6be558
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/LastModifiedMonitor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.file.monitor;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+/**
+ * {@link UpdateMonitor} whose state is the last-modified time of the file.
+ */
+public class LastModifiedMonitor implements UpdateMonitor {
+
+    /**
+     * @param path the file to inspect
+     * @return the value of {@link Files#getLastModifiedTime} for the path
+     * @throws IOException if the last-modified time cannot be read
+     */
+    @Override
+    public Object getCurrentState(final Path path) throws IOException {
+        final Object lastModified = Files.getLastModifiedTime(path);
+        return lastModified;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java
new file mode 100644
index 0000000..8dea4bf
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/MD5SumMonitor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.file.monitor;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * {@link UpdateMonitor} whose state is the MD5 digest of the file content.
+ */
+public class MD5SumMonitor implements UpdateMonitor {
+
+    /**
+     * Reads the whole file and computes its MD5 digest.
+     *
+     * @param path the file to digest
+     * @return the digest bytes wrapped in a {@link ByteBuffer}
+     * @throws IOException if the file cannot be opened or read
+     */
+    @Override
+    public Object getCurrentState(final Path path) throws IOException {
+        final MessageDigest md5 = newMd5Digest();
+        final byte[] chunk = new byte[8192];
+
+        try (final FileInputStream in = new FileInputStream(path.toFile())) {
+            for (int n = in.read(chunk); n > -1; n = in.read(chunk)) {
+                if (n > 0) {
+                    md5.update(chunk, 0, n);
+                }
+            }
+        }
+
+        // Wrap in a ByteBuffer rather than returning byte[]: its equals()
+        // compares content, which is what callers comparing states need
+        return ByteBuffer.wrap(md5.digest());
+    }
+
+    /**
+     * Obtains an MD5 digest, turning the checked "no such algorithm"
+     * exception into an AssertionError.
+     */
+    private static MessageDigest newMd5Digest() {
+        try {
+            return MessageDigest.getInstance("MD5");
+        } catch (final NoSuchAlgorithmException nsae) {
+            throw new AssertionError(nsae);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java
new file mode 100644
index 0000000..0040037
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/SynchronousFileWatcher.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.file.monitor;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Allows the user to configure a {@link java.nio.file.Path Path} to watch for 
modifications and periodically poll to check if the file has been modified
+ */
+public class SynchronousFileWatcher {
+
+    private final Path path;
+    private final long checkUpdateMillis;
+    private final UpdateMonitor monitor;
+    private final AtomicReference<StateWrapper> lastState;
+    private final Lock resourceLock = new ReentrantLock();
+
+    public SynchronousFileWatcher(final Path path, final UpdateMonitor 
monitor) {
+        this(path, monitor, 0L);
+    }
+
+    public SynchronousFileWatcher(final Path path, final UpdateMonitor 
monitor, final long checkMillis) {
+        if (checkMillis < 0) {
+            throw new IllegalArgumentException();
+        }
+
+        this.path = path;
+        checkUpdateMillis = checkMillis;
+        this.monitor = monitor;
+
+        Object currentState;
+        try {
+            currentState = monitor.getCurrentState(path);
+        } catch (final IOException e) {
+            currentState = null;
+        }
+
+        this.lastState = new AtomicReference<>(new StateWrapper(currentState));
+    }
+
+    /**
+     * Checks if the file has been updated according to the configured {@link 
UpdateMonitor} and resets the state
+     *
+     * @return true if updated; false otherwise
+     * @throws IOException if failure occurs checking for changes
+     */
+    public boolean checkAndReset() throws IOException {
+        if (checkUpdateMillis <= 0) { // if checkUpdateMillis <= 0, always 
check
+            return checkForUpdate();
+        } else {
+            final StateWrapper stateWrapper = lastState.get();
+            if (stateWrapper.getTimestamp() < System.currentTimeMillis() - 
checkUpdateMillis) {
+                return checkForUpdate();
+            }
+            return false;
+        }
+    }
+
+    private boolean checkForUpdate() throws IOException {
+        if (resourceLock.tryLock()) {
+            try {
+                final StateWrapper wrapper = lastState.get();
+                final Object newState = monitor.getCurrentState(path);
+                if (newState == null && wrapper.getState() == null) {
+                    return false;
+                }
+                if (newState == null || wrapper.getState() == null) {
+                    lastState.set(new StateWrapper(newState));
+                    return true;
+                }
+
+                final boolean unmodified = newState.equals(wrapper.getState());
+                if (!unmodified) {
+                    lastState.set(new StateWrapper(newState));
+                }
+                return !unmodified;
+            } finally {
+                resourceLock.unlock();
+            }
+        } else {
+            return false;
+        }
+    }
+
+    private static class StateWrapper {
+
+        private final Object state;
+        private final long timestamp;
+
+        public StateWrapper(final Object state) {
+            this.state = state;
+            this.timestamp = System.currentTimeMillis();
+        }
+
+        public Object getState() {
+            return state;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java
new file mode 100644
index 0000000..20ed1dd
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/monitor/UpdateMonitor.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.file.monitor;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+/**
+ * Strategy for capturing the state of a file so that two captures can be
+ * compared to decide whether the file has changed.
+ */
+public interface UpdateMonitor {
+
+    /**
+     * Captures the current state of the given file. SynchronousFileWatcher
+     * compares successive return values with equals(), so the returned
+     * object should implement equals() by value.
+     *
+     * @param path the file whose state is to be captured
+     * @return an object representing the current state of the file
+     * @throws IOException if the state cannot be determined
+     */
+    Object getCurrentState(Path path) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/Search.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/Search.java 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/Search.java
new file mode 100644
index 0000000..b407c4d
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/Search.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Set;
+
+import org.apache.nifi.util.search.ahocorasick.SearchState;
+
+/**
+ * Defines an interface to search for content given a set of search terms.
+ * Any implementation of search must be thread safe.
+ *
+ * @param <T> type parameter of the {@link SearchTerm}s in the dictionary
+ */
+public interface Search<T> {
+
+    /**
+     * Establishes the dictionary of terms which will be searched in
+     * subsequent search calls. This can be called only once.
+     *
+     * @param terms the terms to create a dictionary of
+     */
+    void initializeDictionary(Set<SearchTerm<T>> terms);
+
+    /**
+     * Searches the given input stream for matches between the already
+     * specified dictionary and the contents scanned.
+     *
+     * @param haystack the source data to scan for hits
+     * @param findAll if true will find all matches; if false will find only
+     *            the first match
+     * @return SearchState containing the results. The results Map might be
+     *         empty, which indicates no matches were found, but it will not
+     *         be null
+     * @throws IOException Thrown for any exceptions occurring while
+     *             searching.
+     * @throws IllegalStateException if the dictionary has not yet been
+     *             initialized
+     */
+    SearchState<T> search(InputStream haystack, boolean findAll) throws 
IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java
new file mode 100644
index 0000000..48f8678
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+/**
+ * This is an immutable thread safe object representing a search term
+ *
+ */
+public class SearchTerm<T> {
+
+    private final byte[] bytes;
+    private final int hashCode;
+    private final T reference;
+
+    /**
+     * Constructs a SearchTerm. Defensively copies the given byte array
+     *
+     * @param bytes the bytes of the search term
+     * @throws IllegalArgumentException if given bytes are null or 0 length
+     */
+    public SearchTerm(final byte[] bytes) {
+        this(bytes, true, null);
+    }
+
+    /**
+     * Constructs a search term. Optionally performs a defensive copy of the 
given byte array. If the caller indicates a defensive copy is not necessary 
then they must not change the given arrays
+     * state any longer
+     *
+     * @param bytes the bytes of the new search term
+     * @param defensiveCopy if true will make a defensive copy; false otherwise
+     * @param reference a holder for an object which can be retrieved when 
this search term hits
+     */
+    public SearchTerm(final byte[] bytes, final boolean defensiveCopy, final T 
reference) {
+        if (bytes == null || bytes.length == 0) {
+            throw new IllegalArgumentException();
+        }
+        if (defensiveCopy) {
+            this.bytes = Arrays.copyOf(bytes, bytes.length);
+        } else {
+            this.bytes = bytes;
+        }
+        this.hashCode = Arrays.hashCode(this.bytes);
+        this.reference = reference;
+    }
+
+    public int get(final int index) {
+        return bytes[index] & 0xff;
+    }
+
+    /**
+     * @return size in of search term in bytes
+     */
+    public int size() {
+        return bytes.length;
+    }
+
+    /**
+     * @return reference object for this given search term
+     */
+    public T getReference() {
+        return reference;
+    }
+
+    /**
+     * Determines if the given window starts with the same bytes as this term
+     *
+     * @param window bytes from the haystack being evaluated
+     * @param windowLength The length of the window to consider
+     * @return true if this term starts with the same bytes of the given window
+     */
+    public boolean startsWith(byte[] window, int windowLength) {
+        if (windowLength > window.length) {
+            throw new IndexOutOfBoundsException();
+        }
+        if (bytes.length < windowLength) {
+            return false;
+        }
+        for (int i = 0; i < bytes.length && i < windowLength; i++) {
+            if (bytes[i] != window[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * @return a defensive copy of the internal byte structure
+     */
+    public byte[] getBytes() {
+        return Arrays.copyOf(bytes, bytes.length);
+    }
+
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        final SearchTerm other = (SearchTerm) obj;
+        if (this.hashCode != other.hashCode) {
+            return false;
+        }
+        return Arrays.equals(this.bytes, other.bytes);
+    }
+
+    @Override
+    public String toString() {
+        return new String(bytes);
+    }
+
+    public String toString(final Charset charset) {
+        return new String(bytes, charset);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
new file mode 100644
index 0000000..3b8afaf
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search.ahocorasick;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.nifi.util.search.Search;
+import org.apache.nifi.util.search.SearchTerm;
+
+/**
+ * {@link Search} implementation that organizes the dictionary terms in a
+ * byte-wise trie whose nodes carry failure links, and scans an input
+ * stream one byte at a time.
+ *
+ * @param <T> type parameter of the {@link SearchTerm}s in the dictionary
+ */
+public class AhoCorasick<T> implements Search<T> {
+
+    private Node root = null;
+
+    /**
+     * Constructs a new search object. The dictionary must be supplied via
+     * {@link #initializeDictionary(Set)} before searching.
+     */
+    public AhoCorasick() {
+    }
+
+    /**
+     * Builds the trie and its failure links from the given terms.
+     *
+     * @param terms the terms to create a dictionary of
+     * @throws IllegalStateException if the dictionary was already
+     *             initialized
+     * @throws IllegalArgumentException if given terms are null or empty
+     */
+    @Override
+    public void initializeDictionary(final Set<SearchTerm<T>> terms) {
+        if (root != null) {
+            throw new IllegalStateException();
+        }
+        // Validate BEFORE touching any state: a rejected call must leave
+        // this instance uninitialized so that it can be initialized later
+        if (terms == null || terms.isEmpty()) {
+            throw new IllegalArgumentException();
+        }
+
+        // Build on a local node and publish it only once it is complete
+        final Node newRoot = new Node();
+        for (final SearchTerm<T> term : terms) {
+            int i = 0;
+            Node nextNode = newRoot;
+            while (true) {
+                nextNode = addMatch(term, i, nextNode);
+                if (nextNode == null) {
+                    break; //we're done
+                }
+                i++;
+            }
+        }
+        initialize(newRoot);
+        root = newRoot;
+    }
+
+    /**
+     * Adds the byte at the given offset of the term below the given node.
+     *
+     * @return the node to continue from, or null once the last byte of the
+     *         term has been added
+     */
+    private Node addMatch(final SearchTerm<T> term, final int offset,
+            final Node current) {
+        final int index = term.get(offset);
+        final boolean atEnd = (offset == (term.size() - 1));
+        if (current.getNeighbor(index) == null) {
+            if (atEnd) {
+                current.setNeighbor(new Node(term), index);
+                return null;
+            }
+            current.setNeighbor(new Node(), index);
+        } else if (atEnd) {
+            // The path already exists; mark its last node as a match
+            current.getNeighbor(index).setMatchingTerm(term);
+            return null;
+        }
+        return current.getNeighbor(index);
+    }
+
+    /**
+     * Sets the failure link of every node reachable from the given root.
+     */
+    private void initialize(final Node trieRoot) {
+        //perform bfs to build failure links
+        final Queue<Node> queue = new LinkedList<>();
+        queue.add(trieRoot);
+        trieRoot.setFailureNode(null);
+        while (!queue.isEmpty()) {
+            final Node current = queue.poll();
+            for (int i = 0; i < 256; i++) {
+                final Node next = current.getNeighbor(i);
+                if (next != null) {
+                    //traverse failure to get state
+                    Node fail = current.getFailureNode();
+                    while ((fail != null) && fail.getNeighbor(i) == null) {
+                        fail = fail.getFailureNode();
+                    }
+                    if (fail != null) {
+                        next.setFailureNode(fail.getNeighbor(i));
+                    } else {
+                        next.setFailureNode(trieRoot);
+                    }
+                    queue.add(next);
+                }
+            }
+        }
+    }
+
+    /**
+     * Scans the stream for dictionary terms.
+     *
+     * @param stream the source data to scan for hits
+     * @param findAll if true scans the whole stream; if false stops after
+     *            the first byte that produces at least one match
+     * @return the state holding the results of the scan
+     * @throws IOException if reading from the stream fails
+     * @throws IllegalStateException if the dictionary has not yet been
+     *             initialized
+     */
+    @Override
+    public SearchState<T> search(final InputStream stream,
+            final boolean findAll) throws IOException {
+        return search(stream, findAll, null);
+    }
+
+    /**
+     * Scans the stream, starting from the given state or, if it is null,
+     * from a fresh state positioned at the root.
+     */
+    private SearchState<T> search(final InputStream stream,
+            final boolean findAll, final SearchState<T> state)
+            throws IOException {
+        if (root == null) {
+            throw new IllegalStateException();
+        }
+        final SearchState<T> currentState;
+        if (state == null) {
+            currentState = new SearchState<T>(root);
+        } else {
+            currentState = state;
+        }
+        if (!findAll && currentState.foundMatch()) {
+            throw new IllegalStateException("A match has already been found "
+                    + "yet we're being asked to keep searching");
+        }
+        Node current = currentState.getCurrentNode();
+        int currentChar;
+        while ((currentChar = stream.read()) >= 0) {
+            currentState.incrementBytesRead(1L);
+            Node next = current.getNeighbor(currentChar);
+            if (next == null) {
+                // No direct transition: follow failure links until a node
+                // with a transition for this byte is found
+                next = current.getFailureNode();
+                while ((next != null)
+                        && next.getNeighbor(currentChar) == null) {
+                    next = next.getFailureNode();
+                }
+                if (next != null) {
+                    next = next.getNeighbor(currentChar);
+                } else {
+                    next = root;
+                }
+            }
+            if (next == null) {
+                throw new IllegalStateException("tree out of sync");
+            }
+            //Accept condition
+            if (next.hasMatch()) {
+                currentState.addResult(next.getMatchingTerm());
+            }
+            // Terms ending at any node on the failure chain match as well
+            for (Node failNode = next.getFailureNode(); failNode != null;
+                    failNode = failNode.getFailureNode()) {
+                if (failNode.hasMatch()) {
+                    currentState.addResult(failNode.getMatchingTerm());
+                }
+            }
+            current = next;
+            if (currentState.foundMatch() && !findAll) {
+                break;//give up as soon as we have at least one match
+            }
+        }
+        currentState.setCurrentNode(current);
+        return currentState;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
new file mode 100644
index 0000000..d61ae6f
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search.ahocorasick;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.util.search.SearchTerm;
+
+/**
+ * A node of the search trie. It maps byte values to child nodes and may
+ * hold a failure link and the search term that ends at this node.
+ */
+public class Node {
+
+    private final Map<Integer, Node> neighborMap;
+    private Node failureNode;
+    private SearchTerm<?> term;
+
+    /**
+     * Creates a node without a matching term.
+     */
+    Node() {
+        this(null);
+    }
+
+    /**
+     * Creates a node at which the given term ends.
+     *
+     * @param term the matching term; null if no term ends here
+     */
+    Node(final SearchTerm<?> term) {
+        this.neighborMap = new HashMap<>();
+        this.term = term;
+    }
+
+    void setFailureNode(final Node fail) {
+        this.failureNode = fail;
+    }
+
+    /**
+     * @return the failure link of this node, or null if none has been set
+     */
+    public Node getFailureNode() {
+        return this.failureNode;
+    }
+
+    /**
+     * @return true if a search term ends at this node
+     */
+    public boolean hasMatch() {
+        return getMatchingTerm() != null;
+    }
+
+    void setMatchingTerm(final SearchTerm<?> term) {
+        this.term = term;
+    }
+
+    /**
+     * @return the term ending at this node, or null if there is none
+     */
+    public SearchTerm<?> getMatchingTerm() {
+        return this.term;
+    }
+
+    /**
+     * @param index the value identifying the child
+     * @return the child registered for the index, or null if there is none
+     */
+    public Node getNeighbor(final int index) {
+        final Integer key = Integer.valueOf(index);
+        return this.neighborMap.get(key);
+    }
+
+    void setNeighbor(final Node neighbor, final int index) {
+        final Integer key = Integer.valueOf(index);
+        this.neighborMap.put(key, neighbor);
+    }
+
+}

Reply via email to