http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java new file mode 100644 index 0000000..6d36ad0 --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.search.ahocorasick; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.util.search.SearchTerm; + +public class SearchState<T> { + + private Node currentNode; + private final Map<SearchTerm<T>, List<Long>> resultMap; + private long bytesRead; + + SearchState(final Node rootNode) { + resultMap = new HashMap<>(5); + currentNode = rootNode; + bytesRead = 0L; + } + + void incrementBytesRead(final long increment) { + bytesRead += increment; + } + + void setCurrentNode(final Node curr) { + currentNode = curr; + } + + public Node getCurrentNode() { + return currentNode; + } + + public Map<SearchTerm<T>, List<Long>> getResults() { + return new HashMap<>(resultMap); + } + + void addResult(final SearchTerm matchingTerm) { + final List<Long> indexes = (resultMap.containsKey(matchingTerm)) ? resultMap.get(matchingTerm) : new ArrayList<Long>(5); + indexes.add(bytesRead); + resultMap.put(matchingTerm, indexes); + } + + public boolean foundMatch() { + return !resultMap.isEmpty(); + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java new file mode 100644 index 0000000..2b95897 --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.timebuffer; + +public interface EntityAccess<T> { + + T aggregate(T oldValue, T toAdd); + + T createNew(); + + long getTimestamp(T entity); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java new file mode 100644 index 0000000..193abc6 --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.timebuffer; + +public class LongEntityAccess implements EntityAccess<TimestampedLong> { + + @Override + public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) { + if (oldValue == null && toAdd == null) { + return new TimestampedLong(0L); + } else if (oldValue == null) { + return toAdd; + } else if (toAdd == null) { + return oldValue; + } + + return new TimestampedLong(oldValue.getValue() + toAdd.getValue()); + } + + @Override + public TimestampedLong createNew() { + return new TimestampedLong(0L); + } + + @Override + public long getTimestamp(TimestampedLong entity) { + return entity == null ? 0L : entity.getTimestamp(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java new file mode 100644 index 0000000..dd8e523 --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.timebuffer; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class TimedBuffer<T> { + + private final int numBins; + private final EntitySum<T>[] bins; + private final EntityAccess<T> entityAccess; + private final TimeUnit binPrecision; + + @SuppressWarnings("unchecked") + public TimedBuffer(final TimeUnit binPrecision, final int numBins, final EntityAccess<T> accessor) { + this.binPrecision = binPrecision; + this.numBins = numBins + 1; + this.bins = new EntitySum[this.numBins]; + for (int i = 0; i < this.numBins; i++) { + this.bins[i] = new EntitySum<>(binPrecision, numBins, accessor); + } + this.entityAccess = accessor; + } + + public T add(final T entity) { + final int binIdx = (int) (binPrecision.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS) % numBins); + final EntitySum<T> sum = bins[binIdx]; + + return sum.addOrReset(entity); + } + + public T getAggregateValue(final long sinceEpochMillis) { + final int startBinIdx = (int) (binPrecision.convert(sinceEpochMillis, TimeUnit.MILLISECONDS) % numBins); + + T total = null; + for (int i = 0; i < numBins; i++) { + int binIdx = (startBinIdx + i) % numBins; + final EntitySum<T> bin = bins[binIdx]; + + if (!bin.isExpired()) { + total = entityAccess.aggregate(total, bin.getValue()); + } + } + + return total; + } + + private static class EntitySum<S> { + + private final EntityAccess<S> entityAccess; + private final AtomicReference<S> ref = new AtomicReference<>(); + private final TimeUnit binPrecision; + private final int numConfiguredBins; + + public EntitySum(final TimeUnit binPrecision, final int numConfiguredBins, final EntityAccess<S> aggregator) { + this.binPrecision = binPrecision; + this.entityAccess = aggregator; + this.numConfiguredBins = numConfiguredBins; + } + + private S add(final S event) { + S newValue; + S value; + do { + value = ref.get(); + newValue = entityAccess.aggregate(value, event); + } while (!ref.compareAndSet(value, newValue)); + + return newValue; + } + + public S getValue() { + return ref.get(); + } + + public boolean isExpired() { + // entityAccess.getTimestamp(curValue) represents the time at which the current value + // was last updated. If the last value is less than current time - 1 binPrecision, then it + // means that we've rolled over and need to reset the value. + final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(numConfiguredBins, binPrecision); + + final S curValue = ref.get(); + return (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod); + } + + public S addOrReset(final S event) { + // entityAccess.getTimestamp(curValue) represents the time at which the current value + // was last updated. If the last value is less than current time - 1 binPrecision, then it + // means that we've rolled over and need to reset the value. + final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(1, binPrecision); + + final S curValue = ref.get(); + if (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod) { + ref.compareAndSet(curValue, entityAccess.createNew()); + } + return add(event); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java new file mode 100644 index 0000000..07d31ea --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.timebuffer; + +public class TimestampedLong { + + private final Long value; + private final long timestamp = System.currentTimeMillis(); + + public TimestampedLong(final Long value) { + this.value = value; + } + + public Long getValue() { + return value; + } + + public long getTimestamp() { + return timestamp; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java new file mode 100644 index 0000000..bd30a96 --- /dev/null +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io; + +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; + +import org.apache.nifi.remote.io.CompressionInputStream; +import org.apache.nifi.remote.io.CompressionOutputStream; + +import org.junit.Test; + +public class TestCompressionInputOutputStreams { + + @Test + public void testSimple() throws IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final byte[] data = "Hello, World!".getBytes("UTF-8"); + + final CompressionOutputStream cos = new CompressionOutputStream(baos); + cos.write(data); + cos.flush(); + cos.close(); + + final byte[] compressedBytes = baos.toByteArray(); + final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes)); + final byte[] decompressed = readFully(cis); + + assertTrue(Arrays.equals(data, decompressed)); + } + + @Test + public void testDataLargerThanBuffer() throws IOException { + final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r"; + + final StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 100; i++) { + sb.append(str); + } + final byte[] data = sb.toString().getBytes("UTF-8"); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192); + cos.write(data); + cos.flush(); + cos.close(); + + final byte[] compressedBytes = baos.toByteArray(); + final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes)); + final byte[] decompressed = readFully(cis); + + assertTrue(Arrays.equals(data, decompressed)); + } + + @Test + public void testDataLargerThanBufferWhileFlushing() throws IOException { + final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r"; + final byte[] data = str.getBytes("UTF-8"); + + final StringBuilder sb = new StringBuilder(); + final byte[] data1024; + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192); + for (int i = 0; i < 1024; i++) { + cos.write(data); + cos.flush(); + sb.append(str); + } + cos.close(); + data1024 = sb.toString().getBytes("UTF-8"); + + final byte[] compressedBytes = baos.toByteArray(); + final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes)); + final byte[] decompressed = readFully(cis); + + assertTrue(Arrays.equals(data1024, decompressed)); + } + + @Test + public void testSendingMultipleFilesBackToBackOnSameStream() throws IOException { + final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r"; + final byte[] data = str.getBytes("UTF-8"); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192); + for (int i = 0; i < 512; i++) { + cos.write(data); + cos.flush(); + } + cos.close(); + + final CompressionOutputStream cos2 = new CompressionOutputStream(baos, 8192); + for (int i = 0; i < 512; i++) { + cos2.write(data); + cos2.flush(); + } + cos2.close(); + + final byte[] data512; + final StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 512; i++) { + sb.append(str); + } + data512 = sb.toString().getBytes("UTF-8"); + + final byte[] compressedBytes = baos.toByteArray(); + final ByteArrayInputStream bais = new ByteArrayInputStream(compressedBytes); + + final CompressionInputStream cis = new CompressionInputStream(bais); + final byte[] decompressed = readFully(cis); + assertTrue(Arrays.equals(data512, decompressed)); + + final CompressionInputStream cis2 = new CompressionInputStream(bais); + final byte[] decompressed2 = readFully(cis2); + assertTrue(Arrays.equals(data512, decompressed2)); + } + + private byte[] readFully(final InputStream in) throws IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final byte[] buffer = new byte[65536]; + int len; + while ((len = in.read(buffer)) >= 0) { + baos.write(buffer, 0, len); + } + + return baos.toByteArray(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/ByteCountingInputStreamTest.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/ByteCountingInputStreamTest.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/ByteCountingInputStreamTest.java new file mode 100644 index 0000000..27b1493 --- /dev/null +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/ByteCountingInputStreamTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stream.io; + +import junit.framework.TestCase; + +public class ByteCountingInputStreamTest extends TestCase { + + final ByteArrayInputStream reader = new ByteArrayInputStream("abcdefghijklmnopqrstuvwxyz".getBytes()); + + public void testReset() throws Exception { + + final ByteArrayInputStream reader = new ByteArrayInputStream("abcdefghijklmnopqrstuvwxyz".getBytes()); + final ByteCountingInputStream bcis = new ByteCountingInputStream(reader); + int tmp; + + /* verify first 2 bytes */ + tmp = bcis.read(); + assertEquals(tmp, 97); + tmp = bcis.read(); + assertEquals(tmp, 98); + + /* save bytes read and place mark */ + final long bytesAtMark = bcis.getBytesRead(); + bcis.mark(0); + + /* verify next 2 bytes */ + tmp = bcis.read(); + assertEquals(tmp, 99); + tmp = bcis.read(); + assertEquals(tmp, 100); + + /* verify reset returns to position when mark was placed */ + bcis.reset(); + assertEquals(bytesAtMark, bcis.getBytesRead()); + + /* verify that the reset bug has been fixed (bug would reduce bytes read count) */ + bcis.reset(); + assertEquals(bytesAtMark, bcis.getBytesRead()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java new file mode 100644 index 0000000..52bd8de --- /dev/null +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stream.io; + +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.stream.io.ByteArrayOutputStream; +import org.apache.nifi.stream.io.LeakyBucketStreamThrottler; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +import org.junit.Ignore; +import org.junit.Test; + +@Ignore("Tests are time-based") +public class TestLeakyBucketThrottler { + + @Test(timeout = 10000) + public void testOutputStreamInterface() throws IOException { + // throttle rate at 1 MB/sec + final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024); + + final byte[] data = new byte[1024 * 1024 * 4]; + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final OutputStream throttledOut = throttler.newThrottledOutputStream(baos); + + final long start = System.currentTimeMillis(); + throttledOut.write(data); + throttler.close(); + final long millis = System.currentTimeMillis() - start; + // should take 4 sec give or take + assertTrue(millis > 3000); + assertTrue(millis < 6000); + } + + @Test(timeout = 10000) + public void testInputStreamInterface() throws IOException { + // throttle rate at 1 MB/sec + final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024); + + final byte[] data = new byte[1024 * 1024 * 4]; + final ByteArrayInputStream bais = new ByteArrayInputStream(data); + final InputStream throttledIn = throttler.newThrottledInputStream(bais); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final byte[] buffer = new byte[4096]; + final long start = System.currentTimeMillis(); + int len; + while ((len = throttledIn.read(buffer)) > 0) { + baos.write(buffer, 0, len); + } + throttler.close(); + final long millis = System.currentTimeMillis() - start; + // should take 4 sec give or take + assertTrue(millis > 3000); + assertTrue(millis < 6000); + baos.close(); + } + + @Test(timeout = 10000) + public void testDirectInterface() throws IOException, InterruptedException { + // throttle rate at 1 MB/sec + final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024); + + // create 3 threads, each sending ~2 MB + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final List<Thread> threads = new ArrayList<Thread>(); + for (int i = 0; i < 3; i++) { + final Thread t = new WriterThread(i, throttler, baos); + threads.add(t); + } + + final long start = System.currentTimeMillis(); + for (final Thread t : threads) { + t.start(); + } + + for (final Thread t : threads) { + t.join(); + } + final long elapsed = System.currentTimeMillis() - start; + + throttler.close(); + + // To send 15 MB, it should have taken at least 5 seconds and no more than 7 seconds, to + // allow for busy-ness and the fact that we could write a tiny bit more than the limit. + assertTrue(elapsed > 5000); + assertTrue(elapsed < 7000); + + // ensure bytes were copied out appropriately + assertEquals(3 * (2 * 1024 * 1024 + 1), baos.getBufferLength()); + assertEquals((byte) 'A', baos.getUnderlyingBuffer()[baos.getBufferLength() - 1]); + } + + private static class WriterThread extends Thread { + + private final int idx; + private final byte[] data = new byte[1024 * 1024 * 2 + 1]; + private final LeakyBucketStreamThrottler throttler; + private final OutputStream out; + + public WriterThread(final int idx, final LeakyBucketStreamThrottler throttler, final OutputStream out) { + this.idx = idx; + this.throttler = throttler; + this.out = out; + this.data[this.data.length - 1] = (byte) 'A'; + } + + @Override + public void run() { + long startMillis = System.currentTimeMillis(); + long bytesWritten = 0L; + try { + throttler.copy(new ByteArrayInputStream(data), out); + } catch (IOException e) { + e.printStackTrace(); + return; + } + long now = System.currentTimeMillis(); + long millisElapsed = now - startMillis; + bytesWritten += data.length; + float bytesPerSec = (float) bytesWritten / (float) millisElapsed * 1000F; + System.out.println(idx + " : copied data at a rate of " + bytesPerSec + " bytes/sec"); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java new file mode 100644 index 0000000..0838e96 --- /dev/null +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +public class TestNaiveSearchRingBuffer { + + @Test + public void testAddAndCompare() { + final byte[] pattern = new byte[]{ + '\r', '0', 38, 48 + }; + + final byte[] search = new byte[]{ + '\r', '0', 38, 58, 58, 83, 78, '\r', '0', 38, 48, 83, 92, 78, 4, 38 + }; + + final NaiveSearchRingBuffer circ = new NaiveSearchRingBuffer(pattern); + int counter = -1; + for (final byte b : search) { + counter++; + final boolean matched = circ.addAndCompare(b); + if (counter == 10) { + assertTrue(matched); + } else { + assertFalse(matched); + } + } + } + + @Test + public void testGetOldestByte() { + final byte[] pattern = new byte[]{ + '\r', '0', 38, 48 + }; + + final byte[] search = new byte[]{ + '\r', '0', 38, 58, 58, 83, 78, (byte) 223, (byte) 227, (byte) 250, '\r', '0', 38, 48, 83, 92, 78, 4, 38 + }; + + final NaiveSearchRingBuffer circ = new NaiveSearchRingBuffer(pattern); + int counter = -1; + for (final byte b : search) { + counter++; + final boolean matched = circ.addAndCompare(b); + if (counter == 13) { + assertTrue(matched); + } else { + assertFalse(matched); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestCompoundUpdateMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestCompoundUpdateMonitor.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestCompoundUpdateMonitor.java new file mode 100644 index 0000000..ec04efb --- /dev/null +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestCompoundUpdateMonitor.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.file.monitor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Path; +import java.util.UUID; + +import org.junit.Test; + +public class TestCompoundUpdateMonitor { + + @Test + public void test() throws IOException { + final UpdateMonitor lastModified = new LastModifiedMonitor(); + final MD5SumMonitor md5 = new MD5SumMonitor(); + final CompoundUpdateMonitor compound = new CompoundUpdateMonitor(lastModified, md5); + + final File file = new File("target/" + UUID.randomUUID().toString()); + if (file.exists()) { + assertTrue(file.delete()); + } + assertTrue(file.createNewFile()); + + final Path path = file.toPath(); + + final Object curState = compound.getCurrentState(path); + final Object state2 = compound.getCurrentState(path); + + assertEquals(curState, state2); + file.setLastModified(System.currentTimeMillis() + 1000L); + final Object state3 = compound.getCurrentState(path); + assertEquals(state2, state3); + + final Object state4 = compound.getCurrentState(path); + assertEquals(state3, state4); + + final long lastModifiedDate = file.lastModified(); + try (final OutputStream out = new FileOutputStream(file)) { + out.write("Hello".getBytes("UTF-8")); + } + + file.setLastModified(lastModifiedDate); + + final Object state5 = compound.getCurrentState(path); + assertNotSame(state4, state5); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestSynchronousFileWatcher.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestSynchronousFileWatcher.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestSynchronousFileWatcher.java new file mode 100644 index 0000000..3440c16 --- /dev/null +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestSynchronousFileWatcher.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.file.monitor; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; + +import org.junit.Test; + +public class TestSynchronousFileWatcher { + + @Test + public void testIt() throws UnsupportedEncodingException, IOException, InterruptedException { + final Path path = Paths.get("target/1.txt"); + Files.copy(new ByteArrayInputStream("Hello, World!".getBytes("UTF-8")), path, StandardCopyOption.REPLACE_EXISTING); + final UpdateMonitor monitor = new MD5SumMonitor(); + + final SynchronousFileWatcher watcher = new SynchronousFileWatcher(path, monitor, 10L); + assertFalse(watcher.checkAndReset()); + Thread.sleep(30L); + assertFalse(watcher.checkAndReset()); + + final FileOutputStream fos = new FileOutputStream(path.toFile()); + try { + fos.write("Good-bye, World!".getBytes("UTF-8")); + fos.getFD().sync(); + } finally { + fos.close(); + } + + assertTrue(watcher.checkAndReset()); + assertFalse(watcher.checkAndReset()); + + Thread.sleep(30L); + assertFalse(watcher.checkAndReset()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java new file mode 100644 index 0000000..b01b495 --- /dev/null +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.timebuffer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.nifi.util.RingBuffer; +import org.apache.nifi.util.RingBuffer.ForEachEvaluator; +import org.apache.nifi.util.RingBuffer.IterationDirection; + +import org.junit.Test; + +/** + * + */ +public class TestRingBuffer { + + @Test + public void testGetNewestElement() { + final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10); + + for (int i = 0; i < 11; i++) { + ringBuffer.add(i); + assertEquals(i, ringBuffer.getNewestElement().intValue()); + } + } + + @Test + public void testAsList() { + final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10); + + final List<Integer> emptyList = ringBuffer.asList(); + assertTrue(emptyList.isEmpty()); + + for (int i = 0; i < 3; i++) { + ringBuffer.add(i); + } + + List<Integer> list = ringBuffer.asList(); + assertEquals(3, list.size()); + for (int i = 0; i < 3; i++) { + assertEquals(Integer.valueOf(i), list.get(i)); + } + + for (int i = 3; i < 10; i++) { + ringBuffer.add(i); + } + + list = ringBuffer.asList(); + assertEquals(10, list.size()); + for (int i = 0; i < 10; i++) { + assertEquals(Integer.valueOf(i), list.get(i)); + } + } + + @Test + public void testIterateForwards() { + final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10); + + final int[] values = new int[]{3, 5, 20, 7}; + for (final int v : values) { + ringBuffer.add(v); + } + + final AtomicInteger countHolder = new AtomicInteger(0); + ringBuffer.forEach(new ForEachEvaluator<Integer>() { + int counter = 0; + + @Override + public boolean evaluate(final Integer value) { + final int expected = values[counter++]; + countHolder.incrementAndGet(); + assertEquals(expected, value.intValue()); + return true; + } + + }, IterationDirection.FORWARD); + + assertEquals(4, countHolder.get()); + } + + @Test + public void testIterateForwardsAfterFull() { + final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10); + + for (int i = 0; i < 12; i++) { + ringBuffer.add(i); + } + + final int[] values = new int[]{3, 5, 20, 7}; + for (final int v : values) { + ringBuffer.add(v); + } + + ringBuffer.forEach(new ForEachEvaluator<Integer>() { + int counter = 0; + + @Override + public boolean evaluate(final Integer value) { + if (counter < 6) { + assertEquals(counter + 6, value.intValue()); + } else { + final int expected = values[counter - 6]; + assertEquals(expected, value.intValue()); + } + + counter++; + return true; + } + + }, IterationDirection.FORWARD); + } + + @Test + public void testIterateBackwards() { + final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10); + + final int[] values = new int[]{3, 5, 20, 7}; + for (final int v : values) { + ringBuffer.add(v); + } + + final AtomicInteger countHolder = new AtomicInteger(0); + ringBuffer.forEach(new ForEachEvaluator<Integer>() { + int counter = 0; + + @Override + public boolean evaluate(final Integer value) { + final int index = values.length - 1 - counter; + final int expected = values[index]; + countHolder.incrementAndGet(); + + assertEquals(expected, value.intValue()); + counter++; + return true; + } + + }, IterationDirection.BACKWARD); + + assertEquals(4, countHolder.get()); + } + + @Test + public void testIterateBackwardsAfterFull() { + final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10); + + for (int i = 0; i < 12; i++) { + ringBuffer.add(i); + } + + final int[] values = new int[]{3, 5, 20, 7}; + for (final int v : values) { + ringBuffer.add(v); + } + + ringBuffer.forEach(new ForEachEvaluator<Integer>() { + int counter = 0; + + @Override + public boolean evaluate(final Integer value) { + if (counter < values.length) { + final int index = values.length - 1 - counter; + final int expected = values[index]; + + assertEquals(expected, value.intValue()); + counter++; + } + + return true; + } + + }, IterationDirection.BACKWARD); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestTimedBuffer.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestTimedBuffer.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestTimedBuffer.java new file mode 100644 index 0000000..39ca330 --- /dev/null +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestTimedBuffer.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.timebuffer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +public class TestTimedBuffer { + + @Test + public void testAgesOff() throws InterruptedException { + final LongEntityAccess access = new LongEntityAccess(); + final TimedBuffer<TimestampedLong> buffer = new TimedBuffer<>(TimeUnit.SECONDS, 2, access); + + buffer.add(new TimestampedLong(1000000L)); + TimestampedLong aggregate = buffer.getAggregateValue(System.currentTimeMillis() - 30000L); + assertEquals(1000000L, aggregate.getValue().longValue()); + Thread.sleep(1000L); + aggregate = buffer.getAggregateValue(System.currentTimeMillis() - 30000L); + assertEquals(1000000L, aggregate.getValue().longValue()); + Thread.sleep(1500L); + aggregate = buffer.getAggregateValue(System.currentTimeMillis() - 30000L); + assertNull(aggregate); + } + + @Test + public void testAggregation() throws InterruptedException { + final LongEntityAccess access = new LongEntityAccess(); + final TimedBuffer<TimestampedLong> buffer = new TimedBuffer<>(TimeUnit.SECONDS, 2, access); + + buffer.add(new TimestampedLong(1000000L)); + buffer.add(new TimestampedLong(1000000L)); + buffer.add(new TimestampedLong(25000L)); + + TimestampedLong aggregate = buffer.getAggregateValue(System.currentTimeMillis() - 30000L); + assertEquals(2025000L, aggregate.getValue().longValue()); + Thread.sleep(1000L); + aggregate = buffer.getAggregateValue(System.currentTimeMillis() - 30000L); + assertEquals(2025000L, aggregate.getValue().longValue()); + Thread.sleep(1500L); + aggregate = buffer.getAggregateValue(System.currentTimeMillis() - 30000L); + assertNull(aggregate); + } + + private static class TimestampedLong { + + private final Long value; + private final long timestamp = System.currentTimeMillis(); + + public TimestampedLong(final Long value) { + this.value = value; + } + + public Long getValue() { + return value; + } + + public long getTimestamp() { + return timestamp; + } + } + + private static class LongEntityAccess implements EntityAccess<TimestampedLong> { + + @Override + public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) { + if (oldValue == null && toAdd == null) { + return new TimestampedLong(0L); + } else if (oldValue == null) { + return toAdd; + } else if (toAdd == null) { + return oldValue; + } + + return new TimestampedLong(oldValue.getValue().longValue() + toAdd.getValue().longValue()); + } + + @Override + public TimestampedLong createNew() { + return new TimestampedLong(0L); + } + + @Override + public long getTimestamp(TimestampedLong entity) { + return entity == null ? 0L : entity.getTimestamp(); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/test/resources/logback-test.xml b/nifi-commons/nifi-utils/src/test/resources/logback-test.xml new file mode 100644 index 0000000..8651d47 --- /dev/null +++ b/nifi-commons/nifi-utils/src/test/resources/logback-test.xml @@ -0,0 +1,32 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<configuration scan="true" scanPeriod="30 seconds"> + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> + <pattern>%-4r [%t] %-5p %c - %m%n</pattern> + </encoder> + </appender> + + <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR --> + <logger name="org.apache.nifi" level="DEBUG"/> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + </root> + +</configuration> + http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-web-utils/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-web-utils/pom.xml b/nifi-commons/nifi-web-utils/pom.xml new file mode 100644 index 0000000..8c51d7b --- /dev/null +++ b/nifi-commons/nifi-web-utils/pom.xml @@ -0,0 +1,51 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-commons</artifactId> + <version>0.3.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-web-utils</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-security-utils</artifactId> + </dependency> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + </dependency> + <dependency> + <groupId>javax.servlet</groupId> + <artifactId>javax.servlet-api</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ClientUtils.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ClientUtils.java b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ClientUtils.java new file mode 100644 index 0000000..1eaf366 --- /dev/null +++ b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ClientUtils.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.util; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.UniformInterfaceException; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import java.net.URI; +import java.util.Map; +import javax.ws.rs.core.MediaType; + +/** + * + */ +public class ClientUtils { + + private final Client client; + + public ClientUtils(Client client) { + this.client = client; + } + + /** + * Gets the content at the specified URI. + * + * @param uri the URI to get the content of + * @return the client response resulting from getting the content of the URI + * @throws ClientHandlerException if issues occur handling the request + * @throws UniformInterfaceException if any interface violations occur + */ + public ClientResponse get(final URI uri) throws ClientHandlerException, UniformInterfaceException { + return get(uri, null); + } + + /** + * Gets the content at the specified URI using the given query parameters. + * + * @param uri the URI to get the content of + * @param queryParams the query parameters to use in the request + * @return the client response resulting from getting the content of the URI + * @throws ClientHandlerException if issues occur handling the request + * @throws UniformInterfaceException if any interface violations occur + */ + public ClientResponse get(final URI uri, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException { + // perform the request + WebResource webResource = client.resource(uri); + if (queryParams != null) { + for (final Map.Entry<String, String> queryEntry : queryParams.entrySet()) { + webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue()); + } + } + + return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + } + + /** + * Performs a POST using the specified url and entity body. + * + * @param uri the URI to post to + * @param entity the item to post + * @return the client response of the request + */ + public ClientResponse post(URI uri, Object entity) throws ClientHandlerException, UniformInterfaceException { + // get the resource + WebResource.Builder resourceBuilder = client.resource(uri).accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON); + + // include the request entity + if (entity != null) { + resourceBuilder = resourceBuilder.entity(entity); + } + + // perform the request + return resourceBuilder.post(ClientResponse.class); + } + + /** + * Performs a POST using the specified url and form data. + * + * @param uri the uri to post to + * @param formData the data to post + * @return the client reponse of the post + */ + public ClientResponse post(URI uri, Map<String, String> formData) throws ClientHandlerException, UniformInterfaceException { + // convert the form data + MultivaluedMapImpl entity = new MultivaluedMapImpl(); + for (String key : formData.keySet()) { + entity.add(key, formData.get(key)); + } + + // get the resource + WebResource.Builder resourceBuilder = client.resource(uri).accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED); + + // add the form data if necessary + if (!entity.isEmpty()) { + resourceBuilder = resourceBuilder.entity(entity); + } + + // perform the request + return resourceBuilder.post(ClientResponse.class); + } + + /** + * Performs a HEAD request to the specified URI. + * + * @param uri the uri to request the head of + * @return the client response of the request + * @throws ClientHandlerException for issues handling the request + * @throws UniformInterfaceException for issues with the request + */ + public ClientResponse head(final URI uri) throws ClientHandlerException, UniformInterfaceException { + // perform the request + WebResource webResource = client.resource(uri); + return webResource.head(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ObjectMapperResolver.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ObjectMapperResolver.java b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ObjectMapperResolver.java new file mode 100644 index 0000000..4e7f5b6 --- /dev/null +++ b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ObjectMapperResolver.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.util; + +import javax.ws.rs.ext.ContextResolver; +import javax.ws.rs.ext.Provider; +import org.codehaus.jackson.map.AnnotationIntrospector; +import org.codehaus.jackson.map.DeserializationConfig; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; +import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion; +import org.codehaus.jackson.xc.JaxbAnnotationIntrospector; + +@Provider +public class ObjectMapperResolver implements ContextResolver<ObjectMapper> { + + private final ObjectMapper mapper; + + public ObjectMapperResolver() throws Exception { + mapper = new ObjectMapper(); + + final AnnotationIntrospector jaxbIntrospector = new JaxbAnnotationIntrospector(); + final SerializationConfig serializationConfig = mapper.getSerializationConfig(); + final DeserializationConfig deserializationConfig = mapper.getDeserializationConfig(); + + mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)); + mapper.setDeserializationConfig(deserializationConfig.withAnnotationIntrospector(jaxbIntrospector)); + } + + @Override + public ObjectMapper getContext(Class<?> objectType) { + return mapper; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java new file mode 100644 index 0000000..e27f91c --- /dev/null +++ b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.util; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.security.cert.Certificate; +import java.security.cert.CertificateParsingException; +import java.security.cert.X509Certificate; +import java.util.List; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; + +import org.apache.nifi.security.util.CertificateUtils; + +import org.apache.commons.codec.DecoderException; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.api.json.JSONConfiguration; +import com.sun.jersey.client.urlconnection.HTTPSProperties; + +/** + * Common utilities related to web development. + * + */ +public final class WebUtils { + + private static Logger logger = LoggerFactory.getLogger(WebUtils.class); + + final static ReadWriteLock lock = new ReentrantReadWriteLock(); + + private WebUtils() { + } + + /** + * Creates a client for non-secure requests. The client will be created + * using the given configuration. Additionally, the client will be + * automatically configured for JSON serialization/deserialization. + * + * @param config client configuration + * + * @return a Client instance + */ + public static Client createClient(final ClientConfig config) { + return createClientHelper(config, null); + } + + /** + * Creates a client for secure requests. The client will be created using + * the given configuration and security context. Additionally, the client + * will be automatically configured for JSON serialization/deserialization. + * + * @param config client configuration + * @param ctx security context + * + * @return a Client instance + */ + public static Client createClient(final ClientConfig config, final SSLContext ctx) { + return createClientHelper(config, ctx); + } + + /** + * A helper method for creating clients. The client will be created using + * the given configuration and security context. Additionally, the client + * will be automatically configured for JSON serialization/deserialization. + * + * @param config client configuration + * @param ctx security context, which may be null for non-secure client + * creation + * + * @return a Client instance + */ + private static Client createClientHelper(final ClientConfig config, final SSLContext ctx) { + + final ClientConfig finalConfig = (config == null) ? new DefaultClientConfig() : config; + + if (ctx != null && StringUtils.isBlank((String) finalConfig.getProperty(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES))) { + + // custom hostname verifier that checks subject alternative names against the hostname of the URI + final HostnameVerifier hostnameVerifier = new HostnameVerifier() { + @Override + public boolean verify(final String hostname, final SSLSession ssls) { + + try { + for (final Certificate peerCertificate : ssls.getPeerCertificates()) { + if (peerCertificate instanceof X509Certificate) { + final X509Certificate x509Cert = (X509Certificate) peerCertificate; + final List<String> subjectAltNames = CertificateUtils.getSubjectAlternativeNames(x509Cert); + if (subjectAltNames.contains(hostname.toLowerCase())) { + return true; + } + } + } + } catch (final SSLPeerUnverifiedException | CertificateParsingException ex) { + logger.warn("Hostname Verification encountered exception verifying hostname due to: " + ex, ex); + } + + return false; + } + }; + + finalConfig.getProperties().put(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES, new HTTPSProperties(hostnameVerifier, ctx)); + } + + finalConfig.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, Boolean.TRUE); + finalConfig.getClasses().add(ObjectMapperResolver.class); + + // web client for restful request + return Client.create(finalConfig); + + } + + /** + * Serializes the given object to hexadecimal. Serialization uses Java's + * native serialization mechanism, the ObjectOutputStream. + * + * @param obj an object + * @return the serialized object as hex + */ + public static String serializeObjectToHex(final Serializable obj) { + + final ByteArrayOutputStream serializedObj = new ByteArrayOutputStream(); + + // IOException can never be thrown because we are serializing to an in memory byte array + try { + final ObjectOutputStream oos = new ObjectOutputStream(serializedObj); + oos.writeObject(obj); + oos.close(); + } catch (final IOException ioe) { + throw new RuntimeException(ioe); + } + + logger.debug(String.format("Serialized object '%s' size: %d", obj, serializedObj.size())); + + // hex encode the binary + return new String(Hex.encodeHex(serializedObj.toByteArray(), /* tolowercase */ true)); + } + + /** + * Deserializes a Java serialized, hex-encoded string into a Java object. + * This method is the inverse of the serializeObjectToHex method in this + * class. + * + * @param hexEncodedObject a string + * @return the object + * @throws ClassNotFoundException if the class could not be found + */ + public static Serializable deserializeHexToObject(final String hexEncodedObject) throws ClassNotFoundException { + + // decode the hex encoded object + byte[] serializedObj; + try { + serializedObj = Hex.decodeHex(hexEncodedObject.toCharArray()); + } catch (final DecoderException de) { + throw new IllegalArgumentException(de); + } + + // IOException can never be thrown because we are deserializing from an in memory byte array + try { + // deserialize bytes into object + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(serializedObj)); + return (Serializable) ois.readObject(); + } catch (final IOException ioe) { + throw new RuntimeException(ioe); + } + + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-write-ahead-log/.gitignore ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/.gitignore b/nifi-commons/nifi-write-ahead-log/.gitignore new file mode 100755 index 0000000..19f2e00 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/.gitignore @@ -0,0 +1,2 @@ +/target +/target http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-write-ahead-log/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-write-ahead-log/pom.xml b/nifi-commons/nifi-write-ahead-log/pom.xml new file mode 100644 index 0000000..7e514a7 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/pom.xml @@ -0,0 +1,35 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-commons</artifactId> + <version>0.3.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-write-ahead-log</artifactId> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + </dependencies> +</project>