http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java
new file mode 100644
index 0000000..e205996
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.DataOutput;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UTFDataFormatException;
+
+/**
+ * This class is different from java.io.DataOutputStream in that it does not synchronize on its methods.
+ */
+public class DataOutputStream extends FilterOutputStream implements DataOutput 
{
+
+    /**
+     * The number of bytes written to the data output stream so far. If this 
counter overflows, it will be wrapped to Integer.MAX_VALUE.
+     */
+    protected int written;
+
+    /**
+     * bytearr is initialized on demand by writeUTF
+     */
+    private byte[] bytearr = null;
+
+    /**
+     * Creates a new data output stream to write data to the specified 
underlying output stream. The counter <code>written</code> is set to zero.
+     *
+     * @param out the underlying output stream, to be saved for later use.
+     * @see java.io.FilterOutputStream#out
+     */
+    public DataOutputStream(OutputStream out) {
+        // The wrapped stream is handed to FilterOutputStream; every write method in this class goes through 'out'.
+        // The 'written' counter is not touched here and so starts at its default of zero.
+        super(out);
+    }
+
+    /**
+     * Increases the written counter by the specified value until it reaches 
Integer.MAX_VALUE.
+     */
+    private void incCount(int value) {
+        // A negative sum means the int counter overflowed; pin it at Integer.MAX_VALUE instead of letting it wrap.
+        final int sum = written + value;
+        written = (sum < 0) ? Integer.MAX_VALUE : sum;
+    }
+
+    /**
+     * Writes the specified byte (the low eight bits of the argument 
<code>b</code>) to the underlying output stream. If no exception is thrown, the 
counter <code>written</code> is incremented by
+     * <code>1</code>.
+     * <p>
+     * Implements the <code>write</code> method of <code>OutputStream</code>.
+     *
+     * @param b the <code>byte</code> to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public void write(int b) throws IOException {
+        // Forward to the wrapped stream first; the counter only advances if that call returns normally.
+        out.write(b);
+        incCount(1);
+    }
+
+    /**
+     * Writes <code>len</code> bytes from the specified byte array starting at 
offset <code>off</code> to the underlying output stream. If no exception is 
thrown, the counter <code>written</code> is
+     * incremented by <code>len</code>.
+     *
+     * @param b the data.
+     * @param off the start offset in the data.
+     * @param len the number of bytes to write.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public void write(byte b[], int off, int len) throws IOException {
+        // The slice is passed to the wrapped stream unchanged; 'len' is added to the counter only after that call returns.
+        out.write(b, off, len);
+        incCount(len);
+    }
+
+    /**
+     * Flushes this data output stream. This forces any buffered output bytes 
to be written out to the stream.
+     * <p>
+     * The <code>flush</code> method of <code>DataOutputStream</code> calls 
the <code>flush</code> method of its underlying output stream.
+     *
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     * @see java.io.OutputStream#flush()
+     */
+    @Override
+    public void flush() throws IOException {
+        // Pure delegation to the wrapped stream's flush().
+        out.flush();
+    }
+
+    /**
+     * Writes a <code>boolean</code> to the underlying output stream as a 
1-byte value. The value <code>true</code> is written out as the value 
<code>(byte)1</code>; the value <code>false</code> is
+     * written out as the value <code>(byte)0</code>. If no exception is 
thrown, the counter <code>written</code> is incremented by <code>1</code>.
+     *
+     * @param v a <code>boolean</code> value to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public final void writeBoolean(boolean v) throws IOException {
+        // true is encoded as 1 and false as 0, written as a single byte.
+        final int encoded;
+        if (v) {
+            encoded = 1;
+        } else {
+            encoded = 0;
+        }
+        out.write(encoded);
+        incCount(1);
+    }
+
+    /**
+     * Writes out a <code>byte</code> to the underlying output stream as a 
1-byte value. If no exception is thrown, the counter <code>written</code> is 
incremented by <code>1</code>.
+     *
+     * @param v a <code>byte</code> value to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public final void writeByte(int v) throws IOException {
+        // Only the low eight bits of 'v' are written (see the write(int) contract of OutputStream).
+        out.write(v);
+        incCount(1);
+    }
+
+    /**
+     * Writes a <code>short</code> to the underlying output stream as two 
bytes, high byte first. If no exception is thrown, the counter 
<code>written</code> is incremented by <code>2</code>.
+     *
+     * @param v a <code>short</code> to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public final void writeShort(int v) throws IOException {
+        // Big-endian: bits 15..8 go out first, then bits 7..0.
+        final int highByte = (v >>> 8) & 0xFF;
+        final int lowByte = v & 0xFF;
+        out.write(highByte);
+        out.write(lowByte);
+        incCount(2);
+    }
+
+    /**
+     * Writes a <code>char</code> to the underlying output stream as a 2-byte 
value, high byte first. If no exception is thrown, the counter 
<code>written</code> is incremented by <code>2</code>.
+     *
+     * @param v a <code>char</code> value to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public final void writeChar(int v) throws IOException {
+        // A char is emitted as two bytes, most significant first.
+        final int highByte = (v >>> 8) & 0xFF;
+        final int lowByte = v & 0xFF;
+        out.write(highByte);
+        out.write(lowByte);
+        incCount(2);
+    }
+
+    /**
+     * Writes an <code>int</code> to the underlying output stream as four 
bytes, high byte first. If no exception is thrown, the counter 
<code>written</code> is incremented by <code>4</code>.
+     *
+     * @param v an <code>int</code> to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public final void writeInt(int v) throws IOException {
+        // Walk the four bytes from the most significant (shift 24) down to the least significant (shift 0).
+        for (int shift = 24; shift >= 0; shift -= 8) {
+            out.write((v >>> shift) & 0xFF);
+        }
+        incCount(4);
+    }
+
+    private final byte writeBuffer[] = new byte[8];
+
+    /**
+     * Writes a <code>long</code> to the underlying output stream as eight bytes, high byte first. If no exception is thrown, the counter
+     * <code>written</code> is incremented by <code>8</code>.
+     *
+     * @param v a <code>long</code> to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public final void writeLong(long v) throws IOException {
+        // Fill the shared scratch buffer most-significant byte first, then emit it with a single write call.
+        for (int index = 0, shift = 56; index < 8; index++, shift -= 8) {
+            writeBuffer[index] = (byte) (v >>> shift);
+        }
+        out.write(writeBuffer, 0, 8);
+        incCount(8);
+    }
+
+    /**
+     * Converts the float argument to an <code>int</code> using the 
<code>floatToIntBits</code> method in class <code>Float</code>, and then writes 
that <code>int</code> value to the underlying output
+     * stream as a 4-byte quantity, high byte first. If no exception is 
thrown, the counter <code>written</code> is incremented by <code>4</code>.
+     *
+     * @param v a <code>float</code> value to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     * @see java.lang.Float#floatToIntBits(float)
+     */
+    @Override
+    public final void writeFloat(float v) throws IOException {
+        // The float travels as its 32-bit pattern; writeInt takes care of byte order and the counter.
+        final int bits = Float.floatToIntBits(v);
+        writeInt(bits);
+    }
+
+    /**
+     * Converts the double argument to a <code>long</code> using the 
<code>doubleToLongBits</code> method in class <code>Double</code>, and then 
writes that <code>long</code> value to the underlying
+     * output stream as an 8-byte quantity, high byte first. If no exception 
is thrown, the counter <code>written</code> is incremented by <code>8</code>.
+     *
+     * @param v a <code>double</code> value to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     * @see java.lang.Double#doubleToLongBits(double)
+     */
+    @Override
+    public final void writeDouble(double v) throws IOException {
+        // The double travels as its 64-bit pattern; writeLong takes care of byte order and the counter.
+        final long bits = Double.doubleToLongBits(v);
+        writeLong(bits);
+    }
+
+    /**
+     * Writes out the string to the underlying output stream as a sequence of 
bytes. Each character in the string is written out, in sequence, by discarding 
its high eight bits. If no exception is
+     * thrown, the counter <code>written</code> is incremented by the length 
of <code>s</code>.
+     *
+     * @param s a string of bytes to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public final void writeBytes(String s) throws IOException {
+        final int length = s.length();
+        int index = 0;
+        while (index < length) {
+            // The narrowing cast keeps only the low eight bits of each char.
+            out.write((byte) s.charAt(index));
+            index++;
+        }
+        // The counter is updated once, after every character has been written.
+        incCount(length);
+    }
+
+    /**
+     * Writes a string to the underlying output stream as a sequence of 
characters. Each character is written to the data output stream as if by the 
<code>writeChar</code> method. If no exception is
+     * thrown, the counter <code>written</code> is incremented by twice the 
length of <code>s</code>.
+     *
+     * @param s a <code>String</code> value to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.DataOutputStream#writeChar(int)
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public final void writeChars(String s) throws IOException {
+        final int length = s.length();
+        for (int index = 0; index < length; index++) {
+            // Two bytes per char, most significant byte first.
+            final char ch = s.charAt(index);
+            out.write((ch >>> 8) & 0xFF);
+            out.write(ch & 0xFF);
+        }
+        // The counter is updated once, after the whole string has been written.
+        incCount(length * 2);
+    }
+
+    /**
+     * Writes a string to the underlying output stream using
+     * <a href="DataInput.html#modified-utf-8">modified UTF-8</a>
+     * encoding in a machine-independent manner.
+     * <p>
+     * First, two bytes are written to the output stream as if by the 
<code>writeShort</code> method giving the number of bytes to follow. This value 
is the number of bytes actually written out, not
+     * the length of the string. Following the length, each character of the 
string is output, in sequence, using the modified UTF-8 encoding for the 
character. If no exception is thrown, the counter
+     * <code>written</code> is incremented by the total number of bytes 
written to the output stream. This will be at least two plus the length of 
<code>str</code>, and at most two plus thrice the
+     * length of <code>str</code>.
+     *
+     * @param str a string to be written.
+     * @exception IOException if an I/O error occurs.
+     */
+    @Override
+    public final void writeUTF(String str) throws IOException {
+        // Passing 'this' as the DataOutput lets the static overload reuse this instance's cached 'bytearr' scratch buffer.
+        // The byte count returned by the static overload is not needed here and is discarded.
+        writeUTF(str, this);
+    }
+
+    /**
+     * Writes a string to the specified DataOutput using
+     * <a href="DataInput.html#modified-utf-8">modified UTF-8</a>
+     * encoding in a machine-independent manner.
+     * <p>
+     * First, two bytes are written to out as if by the 
<code>writeShort</code> method giving the number of bytes to follow. This value 
is the number of bytes actually written out, not the length of
+     * the string. Following the length, each character of the string is 
output, in sequence, using the modified UTF-8 encoding for the character. If no 
exception is thrown, the counter
+     * <code>written</code> is incremented by the total number of bytes 
written to the output stream. This will be at least two plus the length of 
<code>str</code>, and at most two plus thrice the
+     * length of <code>str</code>.
+     *
+     * @param str a string to be written.
+     * @param out destination to write to
+     * @return The number of bytes written out.
+     * @exception IOException if an I/O error occurs.
+     */
+    static int writeUTF(String str, DataOutput out) throws IOException {
+        final int strlen = str.length();
+
+        // Tally the encoded size in a long: at up to three bytes per char an int total could overflow for an
+        // extremely long string, turn negative and slip past the 65535-byte limit check below.
+        long encodedLength = 0;
+        for (int i = 0; i < strlen; i++) {
+            final int c = str.charAt(i);
+            if ((c >= 0x0001) && (c <= 0x007F)) {
+                encodedLength++;
+            } else if (c > 0x07FF) {
+                encodedLength += 3;
+            } else {
+                encodedLength += 2;
+            }
+        }
+
+        // The length prefix is an unsigned 16-bit value, so longer encodings cannot be represented.
+        if (encodedLength > 65535) {
+            throw new UTFDataFormatException("encoded string too long: " + encodedLength + " bytes");
+        }
+        final int utflen = (int) encodedLength;
+
+        final byte[] bytearr;
+        if (out instanceof DataOutputStream) {
+            // Reuse the per-instance scratch buffer, growing it (with headroom) when it is too small.
+            final DataOutputStream dos = (DataOutputStream) out;
+            if (dos.bytearr == null || (dos.bytearr.length < (utflen + 2))) {
+                dos.bytearr = new byte[(utflen * 2) + 2];
+            }
+            bytearr = dos.bytearr;
+        } else {
+            bytearr = new byte[utflen + 2];
+        }
+
+        // Two-byte length prefix, high byte first.
+        int count = 0;
+        bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+        bytearr[count++] = (byte) ((utflen) & 0xFF);
+
+        // Fast path: copy the leading run of single-byte characters without the three-way branch.
+        int i = 0;
+        for (; i < strlen; i++) {
+            final int c = str.charAt(i);
+            if (!((c >= 0x0001) && (c <= 0x007F))) {
+                break;
+            }
+            bytearr[count++] = (byte) c;
+        }
+
+        // General path: one, two or three bytes per char. Note that 0x0000 takes the two-byte form.
+        for (; i < strlen; i++) {
+            final int c = str.charAt(i);
+            if ((c >= 0x0001) && (c <= 0x007F)) {
+                bytearr[count++] = (byte) c;
+            } else if (c > 0x07FF) {
+                bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+                bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+                bytearr[count++] = (byte) (0x80 | ((c) & 0x3F));
+            } else {
+                bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+                bytearr[count++] = (byte) (0x80 | ((c) & 0x3F));
+            }
+        }
+
+        // A single write of prefix plus payload; when 'out' is this class, that write also updates the counter.
+        out.write(bytearr, 0, utflen + 2);
+        return utflen + 2;
+    }
+
+    /**
+     * Returns the current value of the counter <code>written</code>, the 
number of bytes written to this data output stream so far. If the counter 
overflows, it will be wrapped to Integer.MAX_VALUE.
+     *
+     * @return the value of the <code>written</code> field.
+     * @see java.io.DataOutputStream#written
+     */
+    public final int size() {
+        // Plain read of the counter maintained by incCount, which stops growing at Integer.MAX_VALUE.
+        return written;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java
new file mode 100644
index 0000000..1e2f3c7
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * <p>
+ * This class extends the {@link java.util.zip.GZIPOutputStream} by allowing the constructor to provide a compression level, and uses a default value of 1,
+ * rather than 5.
+ * </p>
+ * <p>
+ * NOTE(review): the "5" above could not be confirmed from this file; the JDK stream is believed to use
+ * java.util.zip.Deflater#DEFAULT_COMPRESSION when no level is given - verify and correct the number if needed.
+ * </p>
+ */
+public class GZIPOutputStream extends java.util.zip.GZIPOutputStream {
+
+    /** Compression level applied by the single-argument constructor. */
+    public static final int DEFAULT_COMPRESSION_LEVEL = 1;
+
+    /**
+     * Wraps the given stream using {@link #DEFAULT_COMPRESSION_LEVEL}.
+     *
+     * @param out the stream that receives the compressed output
+     * @throws IOException if the superclass constructor fails
+     */
+    public GZIPOutputStream(final OutputStream out) throws IOException {
+        this(out, DEFAULT_COMPRESSION_LEVEL);
+    }
+
+    /**
+     * Wraps the given stream and applies the requested compression level.
+     *
+     * @param out the stream that receives the compressed output
+     * @param compressionLevel the level handed to the deflater's setLevel method
+     * @throws IOException if the superclass constructor fails
+     */
+    public GZIPOutputStream(final OutputStream out, final int compressionLevel) throws IOException {
+        super(out);
+        // 'def' is not declared in this class; it is inherited from the superclass hierarchy. The level is
+        // changed only after the superclass constructor has completed.
+        def.setLevel(compressionLevel);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
new file mode 100644
index 0000000..5153db5
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class LeakyBucketStreamThrottler implements StreamThrottler {
+
+    private final int maxBytesPerSecond;
+    private final BlockingQueue<Request> requestQueue = new 
LinkedBlockingQueue<Request>();
+    private final ScheduledExecutorService executorService;
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
+
+    public LeakyBucketStreamThrottler(final int maxBytesPerSecond) {
+        this.maxBytesPerSecond = maxBytesPerSecond;
+        this.executorService = Executors.newSingleThreadScheduledExecutor();
+
+        // Start a Drain pass immediately and repeat it every 1000 ms on the single worker thread.
+        this.executorService.scheduleAtFixedRate(new Drain(), 0, 1000, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Marks the throttler as shut down, stops scheduling further drain passes and waits briefly for the
+     * pass that is currently running. Callers blocked in copy(...) observe the shutdown flag and fail with
+     * an IOException.
+     */
+    @Override
+    public void close() {
+        this.shutdown.set(true);
+
+        executorService.shutdown();
+        try {
+            // Should not take more than 2 seconds because we run every second. If it takes more than
+            // 2 seconds, it is because the Runnable thread is blocking on a write; in this case,
+            // we will just ignore it and return
+            executorService.awaitTermination(2, TimeUnit.SECONDS);
+        } catch (final InterruptedException e) {
+            // Stop waiting, but keep the interrupt visible to the caller instead of silently clearing it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public OutputStream newThrottledOutputStream(final OutputStream toWrap) {
+        // Every write is routed through this throttler's copy(...); flush and close go straight to toWrap.
+        return new OutputStream() {
+            @Override
+            public void write(final int b) throws IOException {
+                final byte[] single = {(byte) b};
+                write(single, 0, 1);
+            }
+
+            @Override
+            public void write(byte[] b) throws IOException {
+                write(b, 0, b.length);
+            }
+
+            @Override
+            public void write(byte[] b, int off, int len) throws IOException {
+                // Present the slice as a stream so it can be handed to the throttled copy.
+                LeakyBucketStreamThrottler.this.copy(new ByteArrayInputStream(b, off, len), toWrap);
+            }
+
+            @Override
+            public void flush() throws IOException {
+                toWrap.flush();
+            }
+
+            @Override
+            public void close() throws IOException {
+                toWrap.close();
+            }
+        };
+    }
+
+    @Override
+    public InputStream newThrottledInputStream(final InputStream toWrap) {
+        // Reads pull data from toWrap through this throttler's copy(...); available and close go straight to toWrap.
+        return new InputStream() {
+            // Scratch buffer reused by the bulk read; it is reset before every use.
+            final ByteArrayOutputStream chunk = new ByteArrayOutputStream();
+
+            @Override
+            public int read() throws IOException {
+                // Pull at most one byte through the throttler into a fresh buffer of its own.
+                final ByteArrayOutputStream single = new ByteArrayOutputStream(1);
+                LeakyBucketStreamThrottler.this.copy(toWrap, single, 1L);
+                return (single.getBufferLength() < 1) ? -1 : (single.getUnderlyingBuffer()[0] & 0xFF);
+            }
+
+            @Override
+            public int read(final byte[] b) throws IOException {
+                return (b.length == 0) ? 0 : read(b, 0, b.length);
+            }
+
+            @Override
+            public int read(byte[] b, int off, int len) throws IOException {
+                if (len < 0) {
+                    throw new IllegalArgumentException();
+                }
+                if (len == 0) {
+                    return 0;
+                }
+
+                chunk.reset();
+                final long moved = LeakyBucketStreamThrottler.this.copy(toWrap, chunk, len);
+                final int copied = (int) moved;
+                if (copied == 0) {
+                    // Nothing was copied, which is reported to the caller as end of stream.
+                    return -1;
+                }
+                System.arraycopy(chunk.getUnderlyingBuffer(), 0, b, off, copied);
+                return copied;
+            }
+
+            @Override
+            public int available() throws IOException {
+                return toWrap.available();
+            }
+
+            @Override
+            public void close() throws IOException {
+                toWrap.close();
+            }
+        };
+    }
+
+    @Override
+    public long copy(final InputStream in, final OutputStream out) throws IOException {
+        // A negative limit makes the three-argument overload keep going until a request moves zero bytes.
+        return copy(in, out, -1);
+    }
+
+    /**
+     * Copies data from <code>in</code> to <code>out</code> by queueing requests that the drain task serves,
+     * waiting for the response to each request before issuing the next one.
+     *
+     * @param in the stream to read from
+     * @param out the stream to write to
+     * @param maxBytes the maximum number of bytes to copy; a negative value means no limit, zero copies nothing
+     * @return the total number of bytes copied
+     * @throws IOException if the throttler is shut down, the calling thread is interrupted, or the drain
+     *             task reports an I/O failure
+     */
+    @Override
+    public long copy(final InputStream in, final OutputStream out, final long maxBytes) throws IOException {
+        // A zero limit must not reach the queue: the drain task rounds a zero-byte request up to one byte,
+        // and the finish test below only honours limits greater than zero, so the copy would run to the end
+        // of the stream instead of copying nothing.
+        if (maxBytes == 0) {
+            return 0L;
+        }
+
+        long totalBytesCopied = 0;
+        boolean finished = false;
+        while (!finished) {
+            final long requestMax = (maxBytes < 0) ? Long.MAX_VALUE : maxBytes - totalBytesCopied;
+            final Request request = new Request(in, out, requestMax);
+
+            // Re-check the shutdown flag once per second while trying to enqueue the request.
+            boolean transferred = false;
+            while (!transferred) {
+                if (shutdown.get()) {
+                    throw new IOException("Throttler shutdown");
+                }
+
+                try {
+                    transferred = requestQueue.offer(request, 1000, TimeUnit.MILLISECONDS);
+                } catch (final InterruptedException e) {
+                    // Keep the interrupt visible to callers that catch the IOException.
+                    Thread.currentThread().interrupt();
+                    throw new IOException("Interrupted", e);
+                }
+            }
+
+            // Same pattern while waiting for the drain task to answer.
+            final BlockingQueue<Response> responseQueue = request.getResponseQueue();
+            Response response = null;
+            while (response == null) {
+                try {
+                    if (shutdown.get()) {
+                        throw new IOException("Throttler shutdown");
+                    }
+                    response = responseQueue.poll(1000L, TimeUnit.MILLISECONDS);
+                } catch (final InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException("Interrupted", e);
+                }
+            }
+
+            if (!response.isSuccess()) {
+                throw response.getError();
+            }
+
+            totalBytesCopied += response.getBytesCopied();
+            // Done when a request moved nothing, or when a positive limit has been reached.
+            finished = (response.getBytesCopied() == 0) || (totalBytesCopied >= maxBytes && maxBytes > 0);
+        }
+
+        return totalBytesCopied;
+    }
+
+    /**
+     * This class is responsible for draining water from the leaky bucket. I.e., it actually moves the data.
+     * Each run serves queued requests until maxBytesPerSecond bytes have been moved, no request arrives in
+     * time, or about one second has passed since the run started.
+     */
+    private class Drain implements Runnable {
+
+        private final byte[] buffer;
+
+        public Drain() {
+            // Never buffer more than one second's allowance, capped at 4096 bytes.
+            final int bufferSize = Math.min(4096, maxBytesPerSecond);
+            buffer = new byte[bufferSize];
+        }
+
+        @Override
+        public void run() {
+            final long start = System.currentTimeMillis();
+
+            int bytesTransferred = 0;
+            while (bytesTransferred < maxBytesPerSecond) {
+                final long maxMillisToWait = 1000 - (System.currentTimeMillis() - start);
+                if (maxMillisToWait < 1) {
+                    return;
+                }
+
+                try {
+                    final Request request = requestQueue.poll(maxMillisToWait, TimeUnit.MILLISECONDS);
+                    if (request == null) {
+                        return;
+                    }
+                    bytesTransferred += serve(request, maxBytesPerSecond - bytesTransferred);
+                } catch (final InterruptedException e) {
+                    // End this pass and leave the interrupt flag set rather than swallowing it.
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        }
+
+        /**
+         * Moves one chunk for the given request and posts the outcome on the request's response queue.
+         *
+         * @param request the request to serve
+         * @param allowance bytes that may still be moved during the current pass
+         * @return the number of bytes moved, or 0 if the transfer failed
+         * @throws InterruptedException if interrupted while posting the response
+         */
+        private int serve(final Request request, final int allowance) throws InterruptedException {
+            final BlockingQueue<Response> responseQueue = request.getResponseQueue();
+            try {
+                final long requestMax = request.getMaxBytesToCopy();
+                long maxBytesToTransfer = Math.min(buffer.length, allowance);
+                if (requestMax >= 0) {
+                    maxBytesToTransfer = Math.min(requestMax, maxBytesToTransfer);
+                }
+                maxBytesToTransfer = Math.max(1L, maxBytesToTransfer);
+
+                final int bytesCopied = fillBuffer(request.getInputStream(), maxBytesToTransfer);
+                request.getOutputStream().write(buffer, 0, bytesCopied);
+
+                responseQueue.put(new Response(true, bytesCopied));
+                return bytesCopied;
+            } catch (final IOException e) {
+                responseQueue.put(new Response(e));
+            } catch (final RuntimeException e) {
+                // An exception escaping run() would cancel every later scheduled pass and leave the caller
+                // waiting for a response, so report it to the caller as a failed request instead.
+                responseQueue.put(new Response(new IOException("Unexpected failure while copying throttled data", e)));
+            }
+            return 0;
+        }
+
+        /**
+         * Reads into the shared buffer until maxBytes bytes are present or the stream returns no more data.
+         *
+         * @param in the stream to read from
+         * @param maxBytes the maximum number of bytes to place in the buffer
+         * @return the number of bytes now held at the start of the buffer
+         * @throws IOException if reading fails
+         */
+        private int fillBuffer(final InputStream in, final long maxBytes) throws IOException {
+            int bytesRead = 0;
+            int len;
+            while (bytesRead < maxBytes && (len = in.read(buffer, bytesRead, (int) Math.min(maxBytes - bytesRead, buffer.length - bytesRead))) > 0) {
+                bytesRead += len;
+            }
+
+            return bytesRead;
+        }
+    }
+
+    /**
+     * Outcome of a single request: either a success flag with a byte count, or a failure carrying the
+     * IOException that caused it (byte count -1).
+     */
+    private static class Response {
+
+        private final boolean success;
+        private final IOException error;
+        private final int bytesCopied;
+
+        public Response(final boolean success, final int bytesCopied) {
+            this(success, bytesCopied, null);
+        }
+
+        public Response(final IOException error) {
+            this(false, -1, error);
+        }
+
+        // Single place where all three fields are assigned.
+        private Response(final boolean success, final int bytesCopied, final IOException error) {
+            this.success = success;
+            this.bytesCopied = bytesCopied;
+            this.error = error;
+        }
+
+        public boolean isSuccess() {
+            return this.success;
+        }
+
+        public IOException getError() {
+            return this.error;
+        }
+
+        public int getBytesCopied() {
+            return this.bytesCopied;
+        }
+    }
+
+    /**
+     * One unit of work for the drain task: where to read from, where to write to, how many bytes at most,
+     * and a single-slot queue on which the outcome is delivered.
+     */
+    private static class Request {
+
+        private final InputStream in;
+        private final OutputStream out;
+        private final long maxBytesToCopy;
+        // Capacity 1: each request receives exactly one response slot.
+        private final BlockingQueue<Response> responseQueue = new LinkedBlockingQueue<Response>(1);
+
+        public Request(final InputStream in, final OutputStream out, final long maxBytesToCopy) {
+            this.in = in;
+            this.out = out;
+            this.maxBytesToCopy = maxBytesToCopy;
+        }
+
+        public InputStream getInputStream() {
+            return this.in;
+        }
+
+        public OutputStream getOutputStream() {
+            return this.out;
+        }
+
+        public long getMaxBytesToCopy() {
+            return this.maxBytesToCopy;
+        }
+
+        public BlockingQueue<Response> getResponseQueue() {
+            return this.responseQueue;
+        }
+
+        @Override
+        public String toString() {
+            final StringBuilder text = new StringBuilder("Request[maxBytes=");
+            text.append(maxBytesToCopy).append("]");
+            return text.toString();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
new file mode 100644
index 0000000..a657030
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An InputStream that delivers at most <code>limit</code> bytes from the wrapped InputStream. Once <code>limit</code> bytes have been read or
+ * skipped, every read reports end-of-stream (-1) even if the wrapped stream still has data.
+ */
+public class LimitingInputStream extends InputStream {
+
+    private final InputStream in;
+    private final long limit;
+    private long bytesRead = 0;
+    // value of bytesRead at the time of the most recent mark(), or -1 if mark() has never been called
+    private long markOffset = -1L;
+
+    /**
+     * @param in the stream to read from
+     * @param limit the maximum number of bytes that may be read or skipped through this stream
+     */
+    public LimitingInputStream(final InputStream in, final long limit) {
+        this.in = in;
+        this.limit = limit;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        final int val = in.read();
+        if (val > -1) {
+            bytesRead++;
+        }
+        return val;
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        // never ask the wrapped stream for more than what is left of the limit
+        final int maxToRead = (int) Math.min(len, limit - bytesRead);
+
+        final int val = in.read(b, off, maxToRead);
+        if (val > 0) {
+            bytesRead += val;
+        }
+        return val;
+    }
+
+    @Override
+    public long skip(final long n) throws IOException {
+        final long skipped = in.skip(Math.min(n, limit - bytesRead));
+        bytesRead += skipped;
+        return skipped;
+    }
+
+    @Override
+    public int available() throws IOException {
+        return in.available();
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+
+    @Override
+    public void mark(int readlimit) {
+        in.mark(readlimit);
+        // remember how much of the limit had been used so that reset() can restore it
+        markOffset = bytesRead;
+    }
+
+    @Override
+    public boolean markSupported() {
+        return in.markSupported();
+    }
+
+    /**
+     * Resets the wrapped stream and, if {@link #mark(int)} was called on this stream, restores the consumed-byte count recorded at that
+     * point so that re-read bytes are not counted against the limit twice.
+     *
+     * @throws IOException if the wrapped stream cannot be reset
+     */
+    @Override
+    public void reset() throws IOException {
+        in.reset();
+        if (markOffset >= 0) {
+            bytesRead = markOffset;
+        }
+    }
+
+    public long getLimit() {
+        return limit;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java
new file mode 100644
index 0000000..44e9c2e
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.EOFException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An InputStream that will throw EOFException if the underlying InputStream runs out of data before reaching the configured minimum amount of data
+ */
+public class MinimumLengthInputStream extends FilterInputStream {
+
+    private final long minLength;
+    private long consumedCount = 0L;
+
+    /**
+     * @param in the stream to read from
+     * @param minLength the number of bytes that must be consumed before end-of-stream is considered legitimate
+     */
+    public MinimumLengthInputStream(final InputStream in, final long minLength) {
+        super(in);
+        this.minLength = minLength;
+    }
+
+    @Override
+    public int read() throws IOException {
+        final int b = super.read();
+        if (b < 0 && consumedCount < minLength) {
+            throw new EOFException();
+        }
+
+        if (b >= 0) {
+            consumedCount++;
+        }
+
+        return b;
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        final int num = super.read(b, off, len);
+
+        if (num < 0 && consumedCount < minLength) {
+            throw new EOFException();
+        }
+
+        if (num >= 0) {
+            consumedCount += num;
+        }
+
+        return num;
+    }
+
+    /**
+     * Skips up to <code>n</code> bytes. If the underlying stream skips nothing, a single byte is read to distinguish "nothing skipped right
+     * now" from end-of-stream.
+     *
+     * @param n the number of bytes to skip; zero or negative values skip nothing and consume nothing
+     * @return the number of bytes skipped, or 0 if the end of the stream has been reached after the minimum length was satisfied
+     * @throws EOFException if the end of the stream is reached before the minimum length has been consumed
+     * @throws IOException if the underlying stream fails
+     */
+    @Override
+    public long skip(final long n) throws IOException {
+        if (n <= 0) {
+            // nothing was requested, so do not probe (and thereby consume) a byte
+            return 0L;
+        }
+
+        long skipped = super.skip(n);
+        if (skipped < 1) {
+            // skip() is allowed to return 0 without being at end-of-stream; a single-byte read tells the two cases apart
+            final int b = super.read();
+            if (b < 0) {
+                if (consumedCount < minLength) {
+                    throw new EOFException();
+                }
+                return 0L;
+            }
+            skipped = 1;
+        }
+
+        consumedCount += skipped;
+        return skipped;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java
new file mode 100644
index 0000000..27a0c47
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Wraps an InputStream so that calling {@link #close()} on the wrapper leaves the wrapped InputStream open. This allows the stream to be
+ * handed to yet another wrapping InputStream without that outer layer being able to close the inner InputStream.
+ */
+public class NonCloseableInputStream extends FilterInputStream {
+
+    public NonCloseableInputStream(final InputStream toWrap) {
+        // FilterInputStream keeps the wrapped stream in its protected 'in' field
+        super(toWrap);
+    }
+
+    @Override
+    public int read() throws IOException {
+        return in.read();
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        return in.read(b);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        return in.read(b, off, len);
+    }
+
+    /**
+     * Intentionally a no-op: the wrapped stream stays open.
+     */
+    @Override
+    public void close() throws IOException {
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java
new file mode 100644
index 0000000..9c77637
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Wraps an OutputStream so that calling {@link #close()} on the wrapper only flushes the wrapped OutputStream and leaves it open.
+ */
+public class NonCloseableOutputStream extends FilterOutputStream {
+
+    public NonCloseableOutputStream(final OutputStream out) {
+        // FilterOutputStream keeps the wrapped stream in its protected 'out' field
+        super(out);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        this.out.write(b);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        this.out.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        this.out.write(b, off, len);
+    }
+
+    /**
+     * Flushes the wrapped stream but does not close it.
+     */
+    @Override
+    public void close() throws IOException {
+        this.out.flush();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java
new file mode 100644
index 0000000..8452761
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An OutputStream that silently discards everything written to it, in the same way that writing to /dev/null does. Flushing and closing
+ * are no-ops as well.
+ */
+public class NullOutputStream extends OutputStream {
+
+    @Override
+    public void write(final byte[] b, int off, int len) throws IOException {
+        // data is intentionally discarded
+    }
+
+    @Override
+    public void write(final byte[] b) throws IOException {
+        // data is intentionally discarded
+    }
+
+    @Override
+    public void write(final int b) throws IOException {
+        // data is intentionally discarded
+    }
+
+    @Override
+    public void flush() throws IOException {
+        // nothing is buffered, so there is nothing to flush
+    }
+
+    @Override
+    public void close() throws IOException {
+        // no resources are held, so there is nothing to release
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
new file mode 100644
index 0000000..9158050
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Contract for a component that copies between, or wraps, streams. Judging by the name, implementations are expected to limit the rate at
+ * which data moves; the exact policy is defined by the implementation (confirm there).
+ */
+public interface StreamThrottler extends Closeable {
+
+    /**
+     * @param toWrap the stream to wrap
+     * @return an InputStream backed by <code>toWrap</code>
+     */
+    InputStream newThrottledInputStream(InputStream toWrap);
+
+    /**
+     * @param toWrap the stream to wrap
+     * @return an OutputStream backed by <code>toWrap</code>
+     */
+    OutputStream newThrottledOutputStream(OutputStream toWrap);
+
+    /**
+     * @param in the stream to copy from
+     * @param out the stream to copy to
+     * @return a byte count, presumably the number of bytes copied
+     * @throws IOException if the copy fails
+     */
+    long copy(InputStream in, OutputStream out) throws IOException;
+
+    /**
+     * @param in the stream to copy from
+     * @param out the stream to copy to
+     * @param maxBytes presumably an upper bound on the number of bytes to copy
+     * @return a byte count, presumably the number of bytes copied
+     * @throws IOException if the copy fails
+     */
+    long copy(InputStream in, OutputStream out, long maxBytes) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java
new file mode 100644
index 0000000..64f6eaa
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.stream.io.exception.BytePatternNotFoundException;
+import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
+
+public class StreamUtils {
+
+    /**
+     * Copies bytes from <code>source</code> to <code>destination</code> until a read returns no bytes
+     *
+     * @param source the source of bytes to copy
+     * @param destination the destination to copy bytes to
+     * @return the number of bytes copied
+     * @throws IOException if any issues occur while copying
+     */
+    public static long copy(final InputStream source, final OutputStream destination) throws IOException {
+        final byte[] buffer = new byte[8192];
+        int len;
+        long totalCount = 0L;
+        while ((len = source.read(buffer)) > 0) {
+            destination.write(buffer, 0, len);
+            totalCount += len;
+        }
+        return totalCount;
+    }
+
+    /**
+     * Copies <code>numBytes</code> from <code>source</code> to <code>destination</code>. If <code>numBytes</code> are not available from
+     * <code>source</code>, throws EOFException
+     *
+     * @param source the source of bytes to copy
+     * @param destination the destination to copy bytes to
+     * @param numBytes the number of bytes to copy
+     * @throws IOException if any issues occur while copying
+     */
+    public static void copy(final InputStream source, final OutputStream destination, final long numBytes) throws IOException {
+        final byte[] buffer = new byte[8192];
+        int len;
+        long bytesLeft = numBytes;
+        while ((len = source.read(buffer, 0, (int) Math.min(bytesLeft, buffer.length))) > 0) {
+            destination.write(buffer, 0, len);
+            bytesLeft -= len;
+        }
+
+        if (bytesLeft > 0) {
+            throw new EOFException("Attempted to copy " + numBytes + " bytes but only " + (numBytes - bytesLeft) + " bytes were available");
+        }
+    }
+
+    /**
+     * Reads data from the given input stream, copying it to the destination byte array. If the InputStream has less data than the given byte
+     * array, throws an EOFException
+     *
+     * @param source the source to copy bytes from
+     * @param destination the destination to fill
+     * @throws IOException if any issues occur reading bytes
+     */
+    public static void fillBuffer(final InputStream source, final byte[] destination) throws IOException {
+        fillBuffer(source, destination, true);
+    }
+
+    /**
+     * Reads data from the given input stream, copying it to the destination byte array. If the InputStream has less data than the given byte
+     * array, throws an EOFException if <code>ensureCapacity</code> is true and otherwise returns the number of bytes copied
+     *
+     * @param source the source to read bytes from
+     * @param destination the destination to fill
+     * @param ensureCapacity whether or not to enforce that the InputStream have at least as much data as the capacity of the destination byte array
+     * @return the number of bytes actually filled
+     * @throws IOException if unable to read from the underlying stream
+     */
+    public static int fillBuffer(final InputStream source, final byte[] destination, final boolean ensureCapacity) throws IOException {
+        int bytesRead = 0;
+        int len;
+        while (bytesRead < destination.length) {
+            len = source.read(destination, bytesRead, destination.length - bytesRead);
+            if (len < 0) {
+                if (ensureCapacity) {
+                    throw new EOFException();
+                } else {
+                    break;
+                }
+            }
+
+            bytesRead += len;
+        }
+
+        return bytesRead;
+    }
+
+    /**
+     * Copies data from in to out until either we are out of data (returns null) or we hit one of the byte patterns identified by the
+     * <code>stoppers</code> parameter (returns the byte pattern matched). The bytes in the stopper will be copied.
+     *
+     * @param in the source to read bytes from
+     * @param out the destination to write bytes to
+     * @param maxBytes if positive, the copy is abandoned with a BytePatternNotFoundException once this many bytes have been read without a
+     *            match (the byte that reaches the limit is not written); zero or a negative value disables the limit
+     * @param stoppers patterns of bytes which if seen will cause the copy to stop; if none are given, nothing is read and null is returned
+     * @return the byte array matched, or null if end of stream was reached
+     * @throws IOException if issues occur reading or writing bytes to the underlying streams
+     */
+    public static byte[] copyInclusive(final InputStream in, final OutputStream out, final int maxBytes, final byte[]... stoppers) throws IOException {
+        if (stoppers.length == 0) {
+            return null;
+        }
+
+        final List<NonThreadSafeCircularBuffer> circularBuffers = new ArrayList<>();
+        for (final byte[] stopper : stoppers) {
+            circularBuffers.add(new NonThreadSafeCircularBuffer(stopper));
+        }
+
+        long bytesRead = 0;
+        while (true) {
+            final int next = in.read();
+            if (next == -1) {
+                return null;
+            } else if (maxBytes > 0 && ++bytesRead >= maxBytes) {
+                throw new BytePatternNotFoundException("Did not encounter any byte pattern that was expected; data does not appear to be in the expected format");
+            }
+
+            out.write(next);
+
+            for (final NonThreadSafeCircularBuffer circ : circularBuffers) {
+                if (circ.addAndCompare((byte) next)) {
+                    return circ.getByteArray();
+                }
+            }
+        }
+    }
+
+    /**
+     * Copies data from in to out until either we are out of data (returns null) or we hit one of the byte patterns identified by the
+     * <code>stoppers</code> parameter (returns the byte pattern matched). The byte pattern matched will NOT be copied to the output; it is,
+     * however, consumed from the input. Every byte read before the matched pattern, and every byte read when the end of the stream is
+     * reached without a match, is written to the output.
+     *
+     * @param in the source to read bytes from
+     * @param out the destination to write bytes to
+     * @param maxBytes if positive, bounds how many bytes are read before the copy is abandoned with a BytePatternNotFoundException; zero or
+     *            a negative value disables the limit
+     * @param stoppers byte patterns which will cause the copy to stop if found; if none are given, nothing is read and null is returned
+     * @return the byte array matched, or null if end of stream was reached
+     * @throws IOException for issues reading or writing to underlying streams
+     */
+    public static byte[] copyExclusive(final InputStream in, final OutputStream out, final int maxBytes, final byte[]... stoppers) throws IOException {
+        if (stoppers.length == 0) {
+            return null;
+        }
+
+        int longest = 0;
+        NonThreadSafeCircularBuffer longestBuffer = null;
+        final List<NonThreadSafeCircularBuffer> circularBuffers = new ArrayList<>();
+        for (final byte[] stopper : stoppers) {
+            final NonThreadSafeCircularBuffer circularBuffer = new NonThreadSafeCircularBuffer(stopper);
+            if (stopper.length > longest) {
+                longest = stopper.length;
+                longestBuffer = circularBuffer;
+                // the longest buffer goes first so that it has always seen the current byte before any other buffer can report a match
+                circularBuffers.add(0, circularBuffer);
+            } else {
+                circularBuffers.add(circularBuffer);
+            }
+        }
+
+        long bytesRead = 0;
+        // number of bytes handed to the circular buffers so far; tracked separately from bytesRead, which only advances when maxBytes is positive
+        long bytesBuffered = 0;
+        while (true) {
+            final int next = in.read();
+            if (next == -1) {
+                // End of stream without a match: the longest buffer still holds bytes that were being held back in case they turned out to
+                // be part of a stopper. Once the buffer is full, its oldest byte has already been written by the previous iteration.
+                final int slotsToSkip = bytesBuffered >= longest ? 1 : longest - (int) bytesBuffered;
+                drainOldest(longestBuffer, out, longest, slotsToSkip);
+                return null;
+            } else if (maxBytes > 0 && bytesRead++ > maxBytes) {
+                throw new BytePatternNotFoundException("Did not encounter any byte pattern that was expected; data does not appear to be in the expected format");
+            }
+
+            bytesBuffered++;
+            for (final NonThreadSafeCircularBuffer circ : circularBuffers) {
+                if (circ.addAndCompare((byte) next)) {
+                    // The longest buffer has some data that may not have been written out yet; we need to make sure
+                    // that we copy out those bytes. Everything it holds except the matched pattern itself is unwritten.
+                    final int bytesToCopy = longest - circ.getByteArray().length;
+                    // slots that have never been filled come first in oldest-to-newest order and must not be written
+                    final int emptySlots = longest - (int) Math.min(bytesBuffered, longest);
+                    drainOldest(longestBuffer, out, bytesToCopy, emptySlots);
+
+                    return circ.getByteArray();
+                }
+            }
+
+            if (longestBuffer.isFilled()) {
+                out.write(longestBuffer.getOldestByte());
+            }
+        }
+    }
+
+    /**
+     * Walks <code>slotsToVisit</code> slots of the given buffer from oldest to newest, writing each slot's byte to <code>out</code> except
+     * for the first <code>slotsToSkip</code> slots. Slots are counted rather than tested against a sentinel value so that a buffered 0xFF
+     * byte (held as -1) is still written. The buffer is advanced by overwriting each visited slot, so it must not be used for matching
+     * afterwards.
+     */
+    private static void drainOldest(final NonThreadSafeCircularBuffer buffer, final OutputStream out, final int slotsToVisit, final int slotsToSkip) throws IOException {
+        for (int i = 0; i < slotsToVisit; i++) {
+            if (i >= slotsToSkip) {
+                out.write(buffer.getOldestByte());
+            }
+            // the only way to advance to the next-oldest slot is to add a byte; its value is irrelevant here
+            buffer.addAndCompare((byte) 0);
+        }
+    }
+
+    /**
+     * Skips the specified number of bytes from the InputStream
+     *
+     * If unable to skip that number of bytes, throws EOFException
+     *
+     * @param stream the stream to skip over
+     * @param bytesToSkip the number of bytes to skip
+     * @throws IOException if any issues reading or skipping underlying stream
+     */
+    public static void skip(final InputStream stream, final long bytesToSkip) throws IOException {
+        if (bytesToSkip <= 0) {
+            return;
+        }
+        long totalSkipped = 0L;
+
+        // If we have a FileInputStream, calling skip(1000000) will return 1000000 even if the file is only
+        // 3 bytes. As a result, we will skip 1 less than the number requested, and then read the last
+        // byte in order to make sure that we've consumed the number of bytes requested. We then check that
+        // the final byte, which we read, is not -1.
+        final long actualBytesToSkip = bytesToSkip - 1;
+        while (totalSkipped < actualBytesToSkip) {
+            final long skippedThisIteration = stream.skip(actualBytesToSkip - totalSkipped);
+            if (skippedThisIteration == 0) {
+                final int nextByte = stream.read();
+                if (nextByte == -1) {
+                    throw new EOFException();
+                } else {
+                    totalSkipped++;
+                }
+            }
+
+            totalSkipped += skippedThisIteration;
+        }
+
+        final int lastByte = stream.read();
+        if (lastByte == -1) {
+            throw new EOFException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java
new file mode 100644
index 0000000..d30af76
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.OutputStream;
+
+/**
+ * A {@link java.util.zip.ZipOutputStream} whose compression level can be chosen at construction time. When no level is given,
+ * {@link #DEFAULT_COMPRESSION_LEVEL} (1) is used instead of the level the JDK class would otherwise start with.
+ */
+public class ZipOutputStream extends java.util.zip.ZipOutputStream {
+
+    public static final int DEFAULT_COMPRESSION_LEVEL = 1;
+
+    /**
+     * @param out the stream that receives the zip data
+     * @param compressionLevel the level to apply to the underlying deflater
+     */
+    public ZipOutputStream(final OutputStream out, final int compressionLevel) {
+        super(out);
+        // 'def' is the deflater inherited from DeflaterOutputStream
+        this.def.setLevel(compressionLevel);
+    }
+
+    /**
+     * Creates a stream that compresses with {@link #DEFAULT_COMPRESSION_LEVEL}.
+     *
+     * @param out the stream that receives the zip data
+     */
+    public ZipOutputStream(final OutputStream out) {
+        this(out, DEFAULT_COMPRESSION_LEVEL);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java
new file mode 100644
index 0000000..5d08616
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.exception;
+
+import java.io.IOException;
+
+/**
+ * An IOException that carries an explanation of why an expected byte pattern was not found.
+ */
+public class BytePatternNotFoundException extends IOException {
+
+    private static final long serialVersionUID = -4128911284318513973L;
+
+    /**
+     * @param explanation the detail message for this exception
+     */
+    public BytePatternNotFoundException(final String explanation) {
+        super(explanation);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java
new file mode 100644
index 0000000..f18d824
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import java.util.Arrays;
+
+/**
+ * A fixed-size ring buffer that remembers the most recent bytes passed to
+ * {@link #addAndCompare(byte)} and reports whether they equal a target byte sequence.
+ * Nothing in this class synchronizes access to its state.
+ */
+public class NonThreadSafeCircularBuffer {
+
+    private final byte[] lookingFor;
+    private final int[] buffer;
+    private int insertionPointer = 0;
+    // Number of bytes added so far, saturating at buffer.length (see addAndCompare).
+    private int bufferSize = 0;
+
+    /**
+     * @param lookingFor the byte sequence to search for; the array is kept by reference, not copied
+     */
+    public NonThreadSafeCircularBuffer(final byte[] lookingFor) {
+        this.lookingFor = lookingFor;
+        buffer = new int[lookingFor.length];
+        // -1 marks a slot that has not been written yet.
+        Arrays.fill(buffer, -1);
+    }
+
+    /**
+     * @return the byte sequence given to the constructor (the same array instance)
+     */
+    public byte[] getByteArray() {
+        return lookingFor;
+    }
+
+    /**
+     * Returns the value in the slot that the next added byte will overwrite. Once the buffer is
+     * filled this is the oldest retained byte; before that the slot is still unwritten and -1 is
+     * returned. Bytes are stored sign-extended, so a stored byte 0xFF also reads back as -1.
+     *
+     * @return the oldest byte, or -1 if that slot has not been written
+     */
+    public int getOldestByte() {
+        return buffer[insertionPointer];
+    }
+
+    /**
+     * @return <code>true</code> once at least as many bytes have been added as the sequence is long
+     */
+    public boolean isFilled() {
+        return bufferSize >= buffer.length;
+    }
+
+    /**
+     * Adds the given byte, overwriting the oldest one, and checks whether the retained bytes now
+     * equal the target sequence.
+     *
+     * @param data the byte to add
+     * @return <code>true</code> if this byte completes the sequence, <code>false</code> otherwise
+     */
+    public boolean addAndCompare(final byte data) {
+        buffer[insertionPointer] = data;
+        insertionPointer = (insertionPointer + 1) % lookingFor.length;
+
+        // Saturate rather than count forever: an unbounded int counter wraps negative after
+        // 2^31 bytes, which would make the buffer look unfilled and suppress every later match.
+        if (bufferSize < lookingFor.length) {
+            bufferSize++;
+        }
+        if (bufferSize < lookingFor.length) {
+            return false;
+        }
+
+        // After the increment above, insertionPointer addresses the oldest retained byte.
+        for (int i = 0; i < lookingFor.length; i++) {
+            final byte compare = (byte) buffer[(insertionPointer + i) % lookingFor.length];
+            if (compare != lookingFor[i]) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java
new file mode 100644
index 0000000..92061e0
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+/**
+ * An {@link ObjectHolder} whose held value is a Boolean.
+ */
+public class BooleanHolder extends ObjectHolder<Boolean> {
+
+    /**
+     * @param initialValue the value handed to the {@link ObjectHolder} constructor
+     */
+    public BooleanHolder(final boolean initialValue) {
+        super(initialValue);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/EscapeUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/EscapeUtils.java 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/EscapeUtils.java
new file mode 100644
index 0000000..46739e3
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/EscapeUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+public class EscapeUtils {
+
+    /**
+     * Returns a copy of the given text in which each of the characters &amp;, &lt;, &gt;, &quot;, &#39; and &#x2f;
+     * is replaced by its html entity. Every other character is copied unchanged. A null argument yields null.
+     *
+     * @param html to escape
+     * @return escaped html
+     */
+    public static String escapeHtml(String html) {
+        if (html == null) {
+            return null;
+        }
+
+        // One pass over the input: each character is escaped exactly once, so the ampersands
+        // that belong to emitted entities are never escaped again.
+        final StringBuilder escaped = new StringBuilder(html.length());
+        for (int i = 0; i < html.length(); i++) {
+            final char current = html.charAt(i);
+            switch (current) {
+                case '&':
+                    escaped.append("&amp;");
+                    break;
+                case '<':
+                    escaped.append("&lt;");
+                    break;
+                case '>':
+                    escaped.append("&gt;");
+                    break;
+                case '"':
+                    escaped.append("&quot;");
+                    break;
+                case '\'':
+                    escaped.append("&#39;");
+                    break;
+                case '/':
+                    escaped.append("&#x2f;");
+                    break;
+                default:
+                    escaped.append(current);
+                    break;
+            }
+        }
+
+        return escaped.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
new file mode 100644
index 0000000..03afec0
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Locale;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Static helpers that render counts, durations and data sizes as text and that parse
+ * time-duration strings such as "5 mins".
+ */
+public class FormatUtils {
+
+    private static final String UNION = "|";
+
+    // for Data Sizes
+    private static final double BYTES_IN_KILOBYTE = 1024;
+    private static final double BYTES_IN_MEGABYTE = BYTES_IN_KILOBYTE * 1024;
+    private static final double BYTES_IN_GIGABYTE = BYTES_IN_MEGABYTE * 1024;
+    private static final double BYTES_IN_TERABYTE = BYTES_IN_GIGABYTE * 1024;
+
+    // for Time Durations
+    private static final String NANOS = join(UNION, "ns", "nano", "nanos", "nanoseconds");
+    private static final String MILLIS = join(UNION, "ms", "milli", "millis", "milliseconds");
+    private static final String SECS = join(UNION, "s", "sec", "secs", "second", "seconds");
+    private static final String MINS = join(UNION, "m", "min", "mins", "minute", "minutes");
+    private static final String HOURS = join(UNION, "h", "hr", "hrs", "hour", "hours");
+    private static final String DAYS = join(UNION, "d", "day", "days");
+
+    private static final String VALID_TIME_UNITS = join(UNION, NANOS, MILLIS, SECS, MINS, HOURS, DAYS);
+    public static final String TIME_DURATION_REGEX = "(\\d+)\\s*(" + VALID_TIME_UNITS + ")";
+    public static final Pattern TIME_DURATION_PATTERN = Pattern.compile(TIME_DURATION_REGEX);
+
+    /**
+     * Formats the specified count with the default locale's integer format, which adds
+     * grouping separators (commas in US-style locales).
+     *
+     * @param count the value to add commas to
+     * @return the string representation of the given value with commas included
+     */
+    public static String formatCount(final long count) {
+        return NumberFormat.getIntegerInstance().format(count);
+    }
+
+    /**
+     * Formats the specified duration in 'mm:ss.SSS' format. The minutes field is the
+     * minute-of-hour, so whole hours are not shown.
+     *
+     * @param sourceDuration the duration to format
+     * @param sourceUnit the unit to interpret the duration
+     * @return representation of the given time data in minutes/seconds
+     */
+    public static String formatMinutesSeconds(final long sourceDuration, final TimeUnit sourceUnit) {
+        final long millis = TimeUnit.MILLISECONDS.convert(sourceDuration, sourceUnit);
+        return newMinutesSecondsFormat().format(new Date(millis));
+    }
+
+    /**
+     * Formats the specified duration in 'HH:mm:ss.SSS' format. The hour count is not capped
+     * at 24 and is zero-padded to at least two digits.
+     *
+     * @param sourceDuration the duration to format
+     * @param sourceUnit the unit to interpret the duration
+     * @return representation of the given time data in hours/minutes/seconds
+     */
+    public static String formatHoursMinutesSeconds(final long sourceDuration, final TimeUnit sourceUnit) {
+        final long millis = TimeUnit.MILLISECONDS.convert(sourceDuration, sourceUnit);
+        final long millisInHour = TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
+        // Kept as a long: an int cast would truncate the hour count of very large durations.
+        final long hours = millis / millisInHour;
+        final long whatsLeft = millis - hours * millisInHour;
+
+        return pad(hours) + ":" + newMinutesSecondsFormat().format(new Date(whatsLeft));
+    }
+
+    /**
+     * Builds the formatter used for the minutes/seconds part of a duration. It is pinned to UTC
+     * because the value being formatted is an elapsed time, not a wall-clock instant: with the
+     * JVM default zone, any zone whose offset is not a whole number of hours shifts the minutes.
+     * A new instance is created per call because SimpleDateFormat is not thread-safe.
+     */
+    private static SimpleDateFormat newMinutesSecondsFormat() {
+        final SimpleDateFormat formatter = new SimpleDateFormat("mm:ss.SSS", Locale.US);
+        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
+        return formatter;
+    }
+
+    /**
+     * Left-pads values below 10 with a single zero.
+     */
+    private static String pad(final long val) {
+        return (val < 10) ? "0" + val : String.valueOf(val);
+    }
+
+    /**
+     * Formats the specified data size in human readable format. The largest unit whose value is
+     * strictly greater than 1 is used, so a size of exactly one unit is reported in the next
+     * smaller unit. At most two fraction digits are shown, formatted for the default locale.
+     *
+     * @param dataSize Data size in bytes
+     * @return Human readable format
+     */
+    public static String formatDataSize(final double dataSize) {
+        // initialize the formatter
+        final NumberFormat format = NumberFormat.getNumberInstance();
+        format.setMaximumFractionDigits(2);
+
+        // check terabytes
+        double dataSizeToFormat = dataSize / BYTES_IN_TERABYTE;
+        if (dataSizeToFormat > 1) {
+            return format.format(dataSizeToFormat) + " TB";
+        }
+
+        // check gigabytes
+        dataSizeToFormat = dataSize / BYTES_IN_GIGABYTE;
+        if (dataSizeToFormat > 1) {
+            return format.format(dataSizeToFormat) + " GB";
+        }
+
+        // check megabytes
+        dataSizeToFormat = dataSize / BYTES_IN_MEGABYTE;
+        if (dataSizeToFormat > 1) {
+            return format.format(dataSizeToFormat) + " MB";
+        }
+
+        // check kilobytes
+        dataSizeToFormat = dataSize / BYTES_IN_KILOBYTE;
+        if (dataSizeToFormat > 1) {
+            return format.format(dataSizeToFormat) + " KB";
+        }
+
+        // default to bytes
+        return format.format(dataSize) + " bytes";
+    }
+
+    /**
+     * Parses a time duration such as "30 secs" or "5ms" and converts it to the desired unit.
+     * The value must consist of digits, optional whitespace and one of the unit names accepted
+     * by {@link #TIME_DURATION_PATTERN}; unit names are matched case-insensitively. Converting
+     * to a coarser unit truncates, as {@link TimeUnit#convert(long, TimeUnit)} does.
+     *
+     * @param value the duration text to parse
+     * @param desiredUnit the unit in which to express the result
+     * @return the duration expressed in the desired unit
+     * @throws IllegalArgumentException if the value is not a valid time duration
+     */
+    public static long getTimeDuration(final String value, final TimeUnit desiredUnit) {
+        // Locale.ROOT keeps the case mapping independent of the JVM default locale; under a
+        // Turkish locale "MIN" would otherwise lower-case to a dotless i and fail to match.
+        final Matcher matcher = TIME_DURATION_PATTERN.matcher(value.toLowerCase(Locale.ROOT));
+        if (!matcher.matches()) {
+            throw new IllegalArgumentException("Value '" + value + "' is not a valid Time Duration");
+        }
+
+        final String duration = matcher.group(1);
+        final String units = matcher.group(2);
+        final TimeUnit specifiedTimeUnit;
+        switch (units) {
+            case "ns":
+            case "nano":
+            case "nanos":
+            case "nanoseconds":
+                specifiedTimeUnit = TimeUnit.NANOSECONDS;
+                break;
+            case "ms":
+            case "milli":
+            case "millis":
+            case "milliseconds":
+                specifiedTimeUnit = TimeUnit.MILLISECONDS;
+                break;
+            case "s":
+            case "sec":
+            case "secs":
+            case "second":
+            case "seconds":
+                specifiedTimeUnit = TimeUnit.SECONDS;
+                break;
+            case "m":
+            case "min":
+            case "mins":
+            case "minute":
+            case "minutes":
+                specifiedTimeUnit = TimeUnit.MINUTES;
+                break;
+            case "h":
+            case "hr":
+            case "hrs":
+            case "hour":
+            case "hours":
+                specifiedTimeUnit = TimeUnit.HOURS;
+                break;
+            case "d":
+            case "day":
+            case "days":
+                specifiedTimeUnit = TimeUnit.DAYS;
+                break;
+            default:
+                // Not reachable while the pattern and this switch list the same unit names.
+                throw new IllegalArgumentException("Value '" + value + "' is not a valid Time Duration");
+        }
+
+        final long durationVal = Long.parseLong(duration);
+        return desiredUnit.convert(durationVal, specifiedTimeUnit);
+    }
+
+    /**
+     * Appends a percent sign to the given number's default string form.
+     *
+     * @param utilization the value to render
+     * @return the value followed by "%"
+     */
+    public static String formatUtilization(final double utilization) {
+        return utilization + "%";
+    }
+
+    /**
+     * Concatenates the values, placing the delimiter between consecutive entries.
+     */
+    private static String join(final String delimiter, final String... values) {
+        if (values.length == 0) {
+            return "";
+        } else if (values.length == 1) {
+            return values[0];
+        }
+
+        final StringBuilder sb = new StringBuilder();
+        sb.append(values[0]);
+        for (int i = 1; i < values.length; i++) {
+            sb.append(delimiter).append(values[i]);
+        }
+
+        return sb.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java
new file mode 100644
index 0000000..213bbc0
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+/**
+ * An {@link ObjectHolder} for Integer values that adds counter-style helpers. Each helper is
+ * a plain get() followed by a set(); nothing in this class makes the two steps atomic.
+ */
+public class IntegerHolder extends ObjectHolder<Integer> {
+
+    public IntegerHolder(final int initialValue) {
+        super(initialValue);
+    }
+
+    /**
+     * Adds the delta to the held value.
+     *
+     * @param delta the amount to add
+     * @return the value after the addition
+     */
+    public int addAndGet(final int delta) {
+        final int updated = get() + delta;
+        set(updated);
+        return updated;
+    }
+
+    /**
+     * Adds the delta to the held value.
+     *
+     * @param delta the amount to add
+     * @return the value before the addition
+     */
+    public int getAndAdd(final int delta) {
+        final int previous = get();
+        set(previous + delta);
+        return previous;
+    }
+
+    /**
+     * @return the value after adding one
+     */
+    public int incrementAndGet() {
+        return addAndGet(1);
+    }
+
+    /**
+     * @return the value before adding one
+     */
+    public int getAndIncrement() {
+        return getAndAdd(1);
+    }
+
+    /**
+     * @return the value after subtracting one
+     */
+    public int decrementAndGet() {
+        return addAndGet(-1);
+    }
+
+    /**
+     * @return the value before subtracting one
+     */
+    public int getAndDecrement() {
+        return getAndAdd(-1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java
new file mode 100644
index 0000000..fa2d063
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+/**
+ * Wraps a Long value so that it can be declared <code>final</code> and still be updated from
+ * within inner classes. The methods mirror those of an AtomicLong, but operations on this class
+ * are not atomic: each one is a plain get() followed by a set(). It is meant for cases where
+ * atomicity is not needed.
+ */
+public class LongHolder extends ObjectHolder<Long> {
+
+    public LongHolder(final long initialValue) {
+        super(initialValue);
+    }
+
+    /**
+     * Adds the delta to the held value.
+     *
+     * @param delta the amount to add
+     * @return the value after the addition
+     */
+    public long addAndGet(final long delta) {
+        final long updated = get() + delta;
+        set(updated);
+        return updated;
+    }
+
+    /**
+     * Adds the delta to the held value.
+     *
+     * @param delta the amount to add
+     * @return the value before the addition
+     */
+    public long getAndAdd(final long delta) {
+        final long previous = get();
+        set(previous + delta);
+        return previous;
+    }
+
+    /**
+     * @return the value after adding one
+     */
+    public long incrementAndGet() {
+        return addAndGet(1);
+    }
+
+    /**
+     * @return the value before adding one
+     */
+    public long getAndIncrement() {
+        return getAndAdd(1);
+    }
+
+    /**
+     * @return the value after subtracting one
+     */
+    public long decrementAndGet() {
+        return addAndGet(-1L);
+    }
+
+    /**
+     * @return the value before subtracting one
+     */
+    public long getAndDecrement() {
+        return getAndAdd(-1L);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java
new file mode 100644
index 0000000..0c6c575
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.util.Arrays;
+
+/**
+ * <p>
+ * A RingBuffer that can be used to scan byte sequences for subsequences.
+ * </p>
+ *
+ * <p>
+ * This class implements a naive search: it retains the last N bytes added, where N is the length of the
+ * sequence being searched for, and compares them with that sequence after every added byte. This allows
+ * the user of the library to identify byte sequences in a stream on-the-fly so that the stream can be
+ * segmented without having to buffer the data.
+ * </p>
+ *
+ * <p>
+ * The intended usage paradigm is:
+ * </p>
+ *
+ * <pre>
+ * final byte[] searchSequence = ...;
+ * final NaiveSearchRingBuffer buffer = new NaiveSearchRingBuffer(searchSequence);
+ * int nextByte;
+ * while ((nextByte = in.read()) != -1) {
+ *      if ( buffer.addAndCompare((byte) nextByte) ) {
+ *          // This byte is the last byte in the given sequence
+ *      } else {
+ *          // This byte does not complete the given sequence
+ *      }
+ * }
+ * </pre>
+ */
+public class NaiveSearchRingBuffer {
+
+    private final byte[] lookingFor;
+    private final int[] buffer;
+    private int insertionPointer = 0;
+    // Number of bytes added since construction or the last clear(), saturating at buffer.length.
+    private int bufferSize = 0;
+
+    /**
+     * @param lookingFor the byte sequence to search for; the array is kept by reference, not copied
+     */
+    public NaiveSearchRingBuffer(final byte[] lookingFor) {
+        this.lookingFor = lookingFor;
+        this.buffer = new int[lookingFor.length];
+        // -1 marks a slot that has not been written yet.
+        Arrays.fill(buffer, -1);
+    }
+
+    /**
+     * @return the contents of the internal buffer, oldest byte first, which represents the last X bytes added to
+     * the buffer, where X is the minimum of the number of bytes added to the buffer or the length of the byte
+     * sequence for which we are looking
+     */
+    public byte[] getBufferContents() {
+        final int contentLength = Math.min(lookingFor.length, bufferSize);
+        // Until the ring has wrapped, the bytes added so far occupy slots 0..bufferSize-1. Only once
+        // it is full does insertionPointer address the oldest byte rather than an unwritten slot.
+        final int start = (bufferSize >= buffer.length) ? insertionPointer : 0;
+
+        final byte[] contents = new byte[contentLength];
+        for (int i = 0; i < contentLength; i++) {
+            contents[i] = (byte) buffer[(start + i) % lookingFor.length];
+        }
+        return contents;
+    }
+
+    /**
+     * @return the value in the slot that the next added byte will overwrite: the oldest byte once the buffer is
+     * filled, or -1 while that slot is still unwritten. Bytes are stored sign-extended, so a stored byte 0xFF
+     * also reads back as -1.
+     */
+    public int getOldestByte() {
+        return buffer[insertionPointer];
+    }
+
+    /**
+     * @return <code>true</code> if the number of bytes that have been added to the buffer is at least equal to
+     * the length of the byte sequence for which we are searching
+     */
+    public boolean isFilled() {
+        return bufferSize >= buffer.length;
+    }
+
+    /**
+     * Clears the internal buffer so that a new search may begin
+     */
+    public void clear() {
+        Arrays.fill(buffer, -1);
+        insertionPointer = 0;
+        bufferSize = 0;
+    }
+
+    /**
+     * Add the given byte to the buffer and notify whether or not the byte completes the desired byte sequence.
+     *
+     * @param data the data to add to the buffer
+     * @return <code>true</code> if this byte completes the byte sequence, <code>false</code> otherwise.
+     */
+    public boolean addAndCompare(final byte data) {
+        buffer[insertionPointer] = data;
+        insertionPointer = (insertionPointer + 1) % lookingFor.length;
+
+        // Saturate rather than count forever: an unbounded int counter wraps negative after
+        // 2^31 bytes, which would make the buffer look unfilled and suppress every later match.
+        if (bufferSize < lookingFor.length) {
+            bufferSize++;
+        }
+        if (bufferSize < lookingFor.length) {
+            return false;
+        }
+
+        // The buffer is full here, so insertionPointer addresses the oldest retained byte.
+        for (int i = 0; i < lookingFor.length; i++) {
+            final byte compare = (byte) buffer[(insertionPointer + i) % lookingFor.length];
+            if (compare != lookingFor[i]) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java
new file mode 100644
index 0000000..12a887c
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+/**
+ * A mutable bean that holds a single value of type T. The value is stored in a plain field,
+ * with no synchronization.
+ *
+ * @param <T> the type of the held value
+ */
+public class ObjectHolder<T> {
+
+    private T value;
+
+    /**
+     * @param initialValue the value returned by {@link #get()} until {@link #set(Object)} is called
+     */
+    public ObjectHolder(final T initialValue) {
+        this.value = initialValue;
+    }
+
+    /**
+     * @return the value currently held
+     */
+    public T get() {
+        return value;
+    }
+
+    /**
+     * Replaces the held value.
+     *
+     * @param value the new value to hold
+     */
+    public void set(T value) {
+        this.value = value;
+    }
+}

Reply via email to