Repository: nifi
Updated Branches:
  refs/heads/master 0bcb241db -> 9f95a10df


NIFI-4794: Updated event writers to avoid allocating many byte[] instances by
reusing buffers. Also removed the synchronization on the EventWriter during
rollover; the header is now written before the new writer is made available
to any other threads. This reduces thread contention during rollover.

Signed-off-by: Matthew Burgess <mattyb...@apache.org>

This closes #2437


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9f95a10d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9f95a10d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9f95a10d

Branch: refs/heads/master
Commit: 9f95a10df93f4e1cb5c80b62e06afe8b1d64314f
Parents: 0bcb241
Author: Mark Payne <marka...@hotmail.com>
Authored: Thu Jan 25 12:16:56 2018 -0500
Committer: Matthew Burgess <mattyb...@apache.org>
Committed: Mon Feb 19 09:31:11 2018 -0500

----------------------------------------------------------------------
 .../nifi/repository/schema/ByteArrayCache.java  | 48 +++++++++++++
 .../repository/schema/SchemaRecordWriter.java   | 71 +++++++++++++++-----
 .../provenance/EncryptedSchemaRecordReader.java | 17 +----
 .../EventIdFirstSchemaRecordWriter.java         | 65 ++++++++++--------
 .../serialization/CompressableRecordWriter.java | 11 ++-
 .../store/WriteAheadStorePartition.java         | 60 ++++++++---------
 .../util/ByteArrayDataOutputStream.java         | 39 +++++++++++
 .../util/ByteArrayDataOutputStreamCache.java    | 52 ++++++++++++++
 8 files changed, 264 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9f95a10d/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/ByteArrayCache.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/ByteArrayCache.java
 
b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/ByteArrayCache.java
new file mode 100644
index 0000000..32e851a
--- /dev/null
+++ 
b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/ByteArrayCache.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.repository.schema;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class ByteArrayCache {
+    private final BlockingQueue<byte[]> queue;
+    private final int bufferSize;
+
+    public ByteArrayCache(final int maxCapacity, final int bufferSize) {
+        this.queue = new LinkedBlockingQueue<>(maxCapacity);
+        this.bufferSize = bufferSize;
+    }
+
+    public byte[] checkOut() {
+        final byte[] array = queue.poll();
+        if (array != null) {
+            return array;
+        }
+
+        return new byte[bufferSize];
+    }
+
+    public void checkIn(final byte[] array) {
+        if (array.length != bufferSize) {
+            return;
+        }
+
+        queue.offer(array);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f95a10d/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
 
b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
index 5305e5b..67d558a 100644
--- 
a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
+++ 
b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
@@ -34,39 +34,47 @@ public class SchemaRecordWriter {
     public static final int MAX_ALLOWED_UTF_LENGTH = 65_535;
 
     private static final Logger logger = 
LoggerFactory.getLogger(SchemaRecordWriter.class);
+    private static final int CACHE_BUFFER_SIZE = 65536;
+    private static final ByteArrayCache byteArrayCache = new 
ByteArrayCache(32, CACHE_BUFFER_SIZE);
 
     public void writeRecord(final Record record, final OutputStream out) 
throws IOException {
         // write sentinel value to indicate that there is a record. This 
allows the reader to then read one
         // byte and check if -1. If so, the reader knows there are no more 
records. If not, then the reader
         // knows that it should be able to continue reading.
         out.write(1);
-        writeRecordFields(record, out);
+
+        final byte[] buffer = byteArrayCache.checkOut();
+        try {
+            writeRecordFields(record, out, buffer);
+        } finally {
+            byteArrayCache.checkIn(buffer);
+        }
     }
 
-    private void writeRecordFields(final Record record, final OutputStream 
out) throws IOException {
-        writeRecordFields(record, record.getSchema(), out);
+    private void writeRecordFields(final Record record, final OutputStream 
out, final byte[] buffer) throws IOException {
+        writeRecordFields(record, record.getSchema(), out, buffer);
     }
 
-    private void writeRecordFields(final Record record, final RecordSchema 
schema, final OutputStream out) throws IOException {
+    private void writeRecordFields(final Record record, final RecordSchema 
schema, final OutputStream out, final byte[] buffer) throws IOException {
         final DataOutputStream dos = out instanceof DataOutputStream ? 
(DataOutputStream) out : new DataOutputStream(out);
         for (final RecordField field : schema.getFields()) {
             final Object value = record.getFieldValue(field);
 
             try {
-                writeFieldRepetitionAndValue(field, value, dos);
+                writeFieldRepetitionAndValue(field, value, dos, buffer);
             } catch (final Exception e) {
                 throw new IOException("Failed to write field '" + 
field.getFieldName() + "'", e);
             }
         }
     }
 
-    private void writeFieldRepetitionAndValue(final RecordField field, final 
Object value, final DataOutputStream dos) throws IOException {
+    private void writeFieldRepetitionAndValue(final RecordField field, final 
Object value, final DataOutputStream dos, final byte[] buffer) throws 
IOException {
         switch (field.getRepetition()) {
             case EXACTLY_ONE: {
                 if (value == null) {
                     throw new IllegalArgumentException("Record does not have a 
value for the '" + field.getFieldName() + "' but the field is required");
                 }
-                writeFieldValue(field, value, dos);
+                writeFieldValue(field, value, dos, buffer);
                 break;
             }
             case ZERO_OR_MORE: {
@@ -83,7 +91,7 @@ public class SchemaRecordWriter {
                 final Collection<?> collection = (Collection<?>) value;
                 dos.writeInt(collection.size());
                 for (final Object fieldValue : collection) {
-                    writeFieldValue(field, fieldValue, dos);
+                    writeFieldValue(field, fieldValue, dos, buffer);
                 }
                 break;
             }
@@ -93,14 +101,25 @@ public class SchemaRecordWriter {
                     break;
                 }
                 dos.write(1);
-                writeFieldValue(field, value, dos);
+                writeFieldValue(field, value, dos, buffer);
                 break;
             }
         }
     }
 
+    private boolean allSingleByteInUtf8(final String value) {
+        for (int i = 0; i < value.length(); i++) {
+            final char ch = value.charAt(i);
+            if (ch < 1 || ch > 127) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
     @SuppressWarnings("unchecked")
-    private void writeFieldValue(final RecordField field, final Object value, 
final DataOutputStream out) throws IOException {
+    private void writeFieldValue(final RecordField field, final Object value, 
final DataOutputStream out, final byte[] buffer) throws IOException {
         switch (field.getFieldType()) {
             case BOOLEAN:
                 out.writeBoolean((boolean) value);
@@ -120,9 +139,27 @@ public class SchemaRecordWriter {
                 writeUTFLimited(out, (String) value, field.getFieldName());
                 break;
             case LONG_STRING:
-                final byte[] charArray = ((String) 
value).getBytes(StandardCharsets.UTF_8);
-                out.writeInt(charArray.length);
-                out.write(charArray);
+                // In many cases, we will see a String value that consists 
solely of values in the range of
+                // 1-127, which means that in UTF-8 they will translate into a 
single byte each. If all characters
+                // in the string adhere to this, then we can skip calling 
String.getBytes() because that will allocate
+                // a new byte[] every time, which results in a lot of pressure 
on the garbage collector.
+                final String string = (String) value;
+                final int length = string.length();
+
+                if (length <= buffer.length && allSingleByteInUtf8(string)) {
+                    out.writeInt(length);
+
+                    for (int i = 0; i < length; i++) {
+                        final char ch = string.charAt(i);
+                        buffer[i] = (byte) ch;
+                    }
+
+                    out.write(buffer, 0, length);
+                } else {
+                    final byte[] charArray = ((String) 
value).getBytes(StandardCharsets.UTF_8);
+                    out.writeInt(charArray.length);
+                    out.write(charArray);
+                }
                 break;
             case MAP:
                 final Map<Object, Object> map = (Map<Object, Object>) value;
@@ -132,19 +169,19 @@ public class SchemaRecordWriter {
                 final RecordField valueField = subFields.get(1);
 
                 for (final Map.Entry<Object, Object> entry : map.entrySet()) {
-                    writeFieldRepetitionAndValue(keyField, entry.getKey(), 
out);
-                    writeFieldRepetitionAndValue(valueField, entry.getValue(), 
out);
+                    writeFieldRepetitionAndValue(keyField, entry.getKey(), 
out, buffer);
+                    writeFieldRepetitionAndValue(valueField, entry.getValue(), 
out, buffer);
                 }
                 break;
             case UNION:
                 final NamedValue namedValue = (NamedValue) value;
                 writeUTFLimited(out, namedValue.getName(), 
field.getFieldName());
                 final Record childRecord = (Record) namedValue.getValue();
-                writeRecordFields(childRecord, out);
+                writeRecordFields(childRecord, out, buffer);
                 break;
             case COMPLEX:
                 final Record record = (Record) value;
-                writeRecordFields(record, out);
+                writeRecordFields(record, out, buffer);
                 break;
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f95a10d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordReader.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordReader.java
index fcd7fee..e83ce20 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordReader.java
@@ -23,42 +23,29 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collection;
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
+
 import org.apache.nifi.provenance.schema.LookupTableEventRecord;
 import org.apache.nifi.provenance.toc.TocReader;
 import org.apache.nifi.repository.schema.Record;
 import org.apache.nifi.stream.io.LimitingInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.timebuffer.LongEntityAccess;
-import org.apache.nifi.util.timebuffer.TimedBuffer;
-import org.apache.nifi.util.timebuffer.TimestampedLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class EncryptedSchemaRecordReader extends 
EventIdFirstSchemaRecordReader {
     private static final Logger logger = 
LoggerFactory.getLogger(EncryptedSchemaRecordReader.class);
 
-    private static final int DEFAULT_DEBUG_FREQUENCY = 1_000_000;
-
     private ProvenanceEventEncryptor provenanceEventEncryptor;
 
-    private static final TimedBuffer<TimestampedLong> decryptTimes = new 
TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
-
-    private int debugFrequency = DEFAULT_DEBUG_FREQUENCY;
     public static final int SERIALIZATION_VERSION = 1;
 
     public static final String SERIALIZATION_NAME = 
"EncryptedSchemaRecordWriter";
 
-    public EncryptedSchemaRecordReader(final InputStream inputStream, final 
String filename, final TocReader tocReader, final int maxAttributeChars,
-                                       ProvenanceEventEncryptor 
provenanceEventEncryptor) throws IOException {
-        this(inputStream, filename, tocReader, maxAttributeChars, 
provenanceEventEncryptor, DEFAULT_DEBUG_FREQUENCY);
-    }
 
     public EncryptedSchemaRecordReader(final InputStream inputStream, final 
String filename, final TocReader tocReader, final int maxAttributeChars,
-                                       ProvenanceEventEncryptor 
provenanceEventEncryptor, int debugFrequency) throws IOException {
+                                       ProvenanceEventEncryptor 
provenanceEventEncryptor) throws IOException {
         super(inputStream, filename, tocReader, maxAttributeChars);
         this.provenanceEventEncryptor = provenanceEventEncryptor;
-        this.debugFrequency = debugFrequency;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f95a10d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java
index 8f5b2b2..05e6736 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.nifi.provenance.schema.EventFieldNames;
 import org.apache.nifi.provenance.schema.EventIdFirstHeaderSchema;
 import org.apache.nifi.provenance.schema.LookupTableEventRecord;
@@ -36,6 +37,8 @@ import 
org.apache.nifi.provenance.schema.LookupTableEventSchema;
 import org.apache.nifi.provenance.serialization.CompressableRecordWriter;
 import org.apache.nifi.provenance.serialization.StorageSummary;
 import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.provenance.util.ByteArrayDataOutputStream;
+import org.apache.nifi.provenance.util.ByteArrayDataOutputStreamCache;
 import org.apache.nifi.repository.schema.FieldMapRecord;
 import org.apache.nifi.repository.schema.Record;
 import org.apache.nifi.repository.schema.RecordSchema;
@@ -73,6 +76,8 @@ public class EventIdFirstSchemaRecordWriter extends 
CompressableRecordWriter {
     private static final TimedBuffer<TimestampedLong> bytesWritten = new 
TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
     private static final AtomicLong totalRecordCount = new AtomicLong(0L);
 
+    private static final ByteArrayDataOutputStreamCache streamCache = new 
ByteArrayDataOutputStreamCache(32, 8 * 1024, 256 * 1024);
+
     private long firstEventId;
     private long systemTimeOffset;
 
@@ -113,39 +118,43 @@ public class EventIdFirstSchemaRecordWriter extends 
CompressableRecordWriter {
             throw new IOException("Cannot update Provenance Repository because 
this Record Writer has already failed to write to the Repository");
         }
 
-        final long serializeStart = System.nanoTime();
-        final byte[] serialized;
-        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
-            final DataOutputStream dos = new DataOutputStream(baos)) {
-            writeRecord(record, 0L, dos);
-            serialized = baos.toByteArray();
-        }
-
-        final long lockStart = System.nanoTime();
+        final long lockStart;
         final long writeStart;
         final long startBytes;
         final long endBytes;
         final long recordIdentifier;
-        synchronized (this) {
-            writeStart = System.nanoTime();
-            try {
-                recordIdentifier = record.getEventId() == -1L ? 
getIdGenerator().getAndIncrement() : record.getEventId();
-                startBytes = getBytesWritten();
-
-                ensureStreamState(recordIdentifier, startBytes);
-
-                final DataOutputStream out = getBufferedOutputStream();
-                final int recordIdOffset = (int) (recordIdentifier - 
firstEventId);
-                out.writeInt(recordIdOffset);
-                out.writeInt(serialized.length);
-                out.write(serialized);
-
-                recordCount.incrementAndGet();
-                endBytes = getBytesWritten();
-            } catch (final IOException ioe) {
-                markDirty();
-                throw ioe;
+
+        final long serializeStart = System.nanoTime();
+        final ByteArrayDataOutputStream bados = streamCache.checkOut();
+        try {
+            writeRecord(record, 0L, bados.getDataOutputStream());
+
+            lockStart = System.nanoTime();
+            synchronized (this) {
+                writeStart = System.nanoTime();
+                try {
+                    recordIdentifier = record.getEventId() == -1L ? 
getIdGenerator().getAndIncrement() : record.getEventId();
+                    startBytes = getBytesWritten();
+
+                    ensureStreamState(recordIdentifier, startBytes);
+
+                    final DataOutputStream out = getBufferedOutputStream();
+                    final int recordIdOffset = (int) (recordIdentifier - 
firstEventId);
+                    out.writeInt(recordIdOffset);
+
+                    final ByteArrayOutputStream baos = 
bados.getByteArrayOutputStream();
+                    out.writeInt(baos.size());
+                    baos.writeTo(out);
+
+                    recordCount.incrementAndGet();
+                    endBytes = getBytesWritten();
+                } catch (final IOException ioe) {
+                    markDirty();
+                    throw ioe;
+                }
             }
+        } finally {
+            streamCache.checkIn(bados);
         }
 
         if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f95a10d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
index b564600..21e2c07 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
@@ -56,7 +56,7 @@ public abstract class CompressableRecordWriter extends 
AbstractRecordWriter {
 
         this.compressed = compressed;
         this.fos = new FileOutputStream(file);
-        rawOutStream = new ByteCountingOutputStream(fos);
+        rawOutStream = new ByteCountingOutputStream(new 
BufferedOutputStream(fos));
         this.uncompressedBlockSize = uncompressedBlockSize;
         this.idGenerator = idGenerator;
     }
@@ -68,7 +68,7 @@ public abstract class CompressableRecordWriter extends 
AbstractRecordWriter {
 
         this.compressed = compressed;
         this.uncompressedBlockSize = uncompressedBlockSize;
-        this.rawOutStream = new ByteCountingOutputStream(out);
+        this.rawOutStream = new ByteCountingOutputStream(new 
BufferedOutputStream(out));
         this.idGenerator = idGenerator;
     }
 
@@ -114,7 +114,6 @@ public abstract class CompressableRecordWriter extends 
AbstractRecordWriter {
             final long byteOffset = (byteCountingOut == null) ? 
rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten();
             final TocWriter tocWriter = getTocWriter();
 
-            final OutputStream writableStream;
             if (compressed) {
                 // because of the way that GZIPOutputStream works, we need to 
call close() on it in order for it
                 // to write its trailing bytes. But we don't want to close the 
underlying OutputStream, so we wrap
@@ -128,16 +127,16 @@ public abstract class CompressableRecordWriter extends 
AbstractRecordWriter {
                     tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), 
eventId);
                 }
 
-                writableStream = new BufferedOutputStream(new 
GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
+                final OutputStream writableStream = new 
BufferedOutputStream(new GZIPOutputStream(new 
NonCloseableOutputStream(rawOutStream), 1), 65536);
+                this.byteCountingOut = new 
ByteCountingOutputStream(writableStream, byteOffset);
             } else {
                 if (tocWriter != null && eventId != null) {
                     tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), 
eventId);
                 }
 
-                writableStream = new BufferedOutputStream(rawOutStream, 65536);
+                this.byteCountingOut = rawOutStream;
             }
 
-            this.byteCountingOut = new 
ByteCountingOutputStream(writableStream, byteOffset);
             this.out = new DataOutputStream(byteCountingOut);
             resetDirtyFlag();
         } catch (final IOException ioe) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f95a10d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
index 22d2a5f..2de78f8 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
@@ -64,7 +64,6 @@ import org.slf4j.LoggerFactory;
 public class WriteAheadStorePartition implements EventStorePartition {
     private static final Logger logger = 
LoggerFactory.getLogger(WriteAheadStorePartition.class);
 
-
     private final RepositoryConfiguration config;
     private final File partitionDirectory;
     private final String partitionName;
@@ -253,48 +252,43 @@ public class WriteAheadStorePartition implements 
EventStorePartition {
         final long nextEventId = idGenerator.get();
         final File updatedEventFile = new File(partitionDirectory, nextEventId 
+ ".prov");
         final RecordWriter updatedWriter = 
recordWriterFactory.createWriter(updatedEventFile, idGenerator, false, true);
+        updatedWriter.writeHeader(nextEventId);
 
-        // Synchronize on the writer to ensure that no other thread is able to 
obtain the writer and start writing events to it until after it has
-        // been fully initialized (i.e., the header has been written, etc.)
-        synchronized (updatedWriter) {
-            final RecordWriterLease updatedLease = new 
RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(), 
config.getMaxEventFileCount());
-            final boolean updated = eventWriterLeaseRef.compareAndSet(lease, 
updatedLease);
+        final RecordWriterLease updatedLease = new 
RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(), 
config.getMaxEventFileCount());
+        final boolean updated = eventWriterLeaseRef.compareAndSet(lease, 
updatedLease);
 
-            if (updated) {
-                if (lease != null) {
-                    lease.close();
-                }
+        if (!updated) {
+            try {
+                updatedWriter.close();
+            } catch (final Exception e) {
+                logger.warn("Failed to close Record Writer {}; some resources 
may not be cleaned up properly.", updatedWriter, e);
+            }
 
-                updatedWriter.writeHeader(nextEventId);
+            updatedEventFile.delete();
+            return false;
+        }
 
-                synchronized (minEventIdToPathMap) {
-                    minEventIdToPathMap.put(nextEventId, updatedEventFile);
-                }
+        if (lease != null) {
+            lease.close();
+        }
 
-                if (config.isCompressOnRollover() && lease != null && 
lease.getWriter() != null) {
-                    boolean offered = false;
-                    while (!offered && !closed) {
-                        try {
-                            offered = 
filesToCompress.offer(lease.getWriter().getFile(), 1, TimeUnit.SECONDS);
-                        } catch (final InterruptedException ie) {
-                            Thread.currentThread().interrupt();
-                            throw new IOException("Interrupted while waiting 
to enqueue " + lease.getWriter().getFile() + " for compression");
-                        }
-                    }
-                }
+        synchronized (minEventIdToPathMap) {
+            minEventIdToPathMap.put(nextEventId, updatedEventFile);
+        }
 
-                return true;
-            } else {
+        if (config.isCompressOnRollover() && lease != null && 
lease.getWriter() != null) {
+            boolean offered = false;
+            while (!offered && !closed) {
                 try {
-                    updatedWriter.close();
-                } catch (final Exception e) {
-                    logger.warn("Failed to close Record Writer {}; some 
resources may not be cleaned up properly.", updatedWriter, e);
+                    offered = 
filesToCompress.offer(lease.getWriter().getFile(), 1, TimeUnit.SECONDS);
+                } catch (final InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException("Interrupted while waiting to 
enqueue " + lease.getWriter().getFile() + " for compression");
                 }
-
-                updatedEventFile.delete();
-                return false;
             }
         }
+
+        return true;
     }
 
     private Map<ProvenanceEventRecord, StorageSummary> addEvents(final 
Iterable<ProvenanceEventRecord> events, final RecordWriter writer) throws 
IOException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f95a10d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java
new file mode 100644
index 0000000..23aefb3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+
+public class ByteArrayDataOutputStream {
+    private final ByteArrayOutputStream baos;
+    private final DataOutputStream dos;
+
+    public ByteArrayDataOutputStream(final int initialCapacity) {
+        baos = new ByteArrayOutputStream(initialCapacity);
+        dos = new DataOutputStream(baos);
+    }
+
+    public ByteArrayOutputStream getByteArrayOutputStream() {
+        return baos;
+    }
+
+    public DataOutputStream getDataOutputStream() {
+        return dos;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9f95a10d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java
new file mode 100644
index 0000000..9535590
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.util;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class ByteArrayDataOutputStreamCache {
+    private final BlockingQueue<ByteArrayDataOutputStream> queue;
+    private final int initialBufferSize;
+    private final int maxBufferSize;
+
+    public ByteArrayDataOutputStreamCache(final int maxCapacity, final int 
initialBufferSize, final int maxBufferSize) {
+        this.queue = new LinkedBlockingQueue<>(maxCapacity);
+        this.initialBufferSize = initialBufferSize;
+        this.maxBufferSize = maxBufferSize;
+    }
+
+    public ByteArrayDataOutputStream checkOut() {
+        final ByteArrayDataOutputStream stream = queue.poll();
+        if (stream != null) {
+            return stream;
+        }
+
+        return new ByteArrayDataOutputStream(initialBufferSize);
+    }
+
+    public void checkIn(final ByteArrayDataOutputStream bados) {
+        final int size = bados.getByteArrayOutputStream().size();
+        if (size > maxBufferSize) {
+            return;
+        }
+
+        bados.getByteArrayOutputStream().reset();
+        queue.offer(bados);
+    }
+}

Reply via email to