[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

2016-11-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/1116


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

2016-10-17 Thread olegz
Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1116#discussion_r83643595
  
--- Diff: 
nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java
 ---
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Implementation of demarcator of text lines in the provided
+ * {@link InputStream}. It works similar to the {@link BufferedReader} and its
+ * {@link BufferedReader#readLine()} methods except that it does not create a
+ * String representing the text line and instead returns the offset info for the
+ * computed text line. See {@link #nextOffsetInfo()} and
+ * {@link #nextOffsetInfo(byte[])} for more details.
+ * 
+ * This class is NOT thread-safe.
+ * 
+ */
+public class TextLineDemarcator {
+
+    private final static int INIT_BUFFER_SIZE = 8192;
+
+    private final InputStream is;
+
+    private final int initialBufferSize;
+
+    private byte[] buffer;
+
+    private int index;
+
+    private int mark;
+
+    private long offset;
+
+    private int bufferLength;
+
+    /**
+     * Constructs an instance of demarcator with provided {@link InputStream}
+     * and default buffer size.
+     */
+    public TextLineDemarcator(InputStream is) {
+        this(is, INIT_BUFFER_SIZE);
+    }
+
+    /**
+     * Constructs an instance of demarcator with provided {@link InputStream}
+     * and initial buffer size.
+     */
+    public TextLineDemarcator(InputStream is, int initialBufferSize) {
+        if (is == null) {
+            throw new IllegalArgumentException("'is' must not be null.");
+        }
+        if (initialBufferSize < 1) {
+            throw new IllegalArgumentException("'initialBufferSize' must be > 0.");
+        }
+        this.is = is;
+        this.initialBufferSize = initialBufferSize;
+        this.buffer = new byte[initialBufferSize];
+    }
+
+    /**
+     * Will compute the next offset info for a
+     * text line (line terminated by either '\r', '\n' or '\r\n').
+     *
+     * The offset info computed and returned as long[] consisting of
+     * 4 elements {startOffset, length, crlfLength, startsWithMatch}.
+     *
+     *    startOffset - the offset in the overall stream which represents the beginning of the text line
+     *    length - length of the text line including CRLF characters
+     *    crlfLength - the length of the CRLF. Could be either 1 (if line ends with '\n' or '\r')
+     *                 or 2 (if line ends with '\r\n').
+     *    startsWithMatch - value is always 1. See {@link #nextOffsetInfo(byte[])} for more info.
+     *
+     * @return offset info as long[]
+     */
+    public long[] nextOffsetInfo() {
--- End diff --

Yes, it would be easier to read, but based on some performance tests I ran
there is also a price to pay for it, although not a very significant one.
Will change.
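
The trade-off discussed here (a bare long[] versus something easier to read) can be illustrated with a minimal sketch. It is not the TextLineDemarcator code from the pull request: the class LineOffsetSketch, the OffsetInfo holder, and the scan method are hypothetical names, and reporting a terminator length of 0 for a final line that has no terminator is an assumption of this sketch. It only shows the idea from the Javadoc above of recording startOffset, length, and crlfLength for each line instead of building a String.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not the class from the pull request.
class LineOffsetSketch {

    // Small immutable holder: the readable alternative to a bare long[].
    static final class OffsetInfo {
        final long startOffset; // offset of the line's first byte in the stream
        final long length;      // line length including its terminator
        final int crlfLength;   // 1 for '\r' or '\n', 2 for "\r\n", 0 if unterminated (assumption)

        OffsetInfo(long startOffset, long length, int crlfLength) {
            this.startOffset = startOffset;
            this.length = length;
            this.crlfLength = crlfLength;
        }

        @Override
        public String toString() {
            return "{" + startOffset + ", " + length + ", " + crlfLength + "}";
        }
    }

    // Reads the stream in chunks and records where each line starts and ends.
    static List<OffsetInfo> scan(InputStream in) throws IOException {
        final List<OffsetInfo> result = new ArrayList<>();
        final byte[] buffer = new byte[8192];
        long position = 0;         // number of bytes consumed so far
        long lineStart = 0;        // offset where the current line began
        boolean pendingCr = false; // previous byte was '\r'; a '\n' may still follow
        int read;
        while ((read = in.read(buffer)) != -1) {
            for (int i = 0; i < read; i++) {
                final byte b = buffer[i];
                if (pendingCr) {
                    pendingCr = false;
                    if (b == '\n') {
                        // "\r\n": the '\n' completes a two-byte terminator
                        position++;
                        result.add(new OffsetInfo(lineStart, position - lineStart, 2));
                        lineStart = position;
                        continue;
                    }
                    // lone '\r' closed the previous line; b starts the next one
                    result.add(new OffsetInfo(lineStart, position - lineStart, 1));
                    lineStart = position;
                }
                position++;
                if (b == '\n') {
                    result.add(new OffsetInfo(lineStart, position - lineStart, 1));
                    lineStart = position;
                } else if (b == '\r') {
                    pendingCr = true;
                }
            }
        }
        if (pendingCr) {
            // stream ended right after a lone '\r'
            result.add(new OffsetInfo(lineStart, position - lineStart, 1));
            lineStart = position;
        }
        if (position > lineStart) {
            // trailing bytes without a terminator
            result.add(new OffsetInfo(lineStart, position - lineStart, 0));
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        final byte[] data = "a\r\nbc\nd\re".getBytes(StandardCharsets.US_ASCII);
        for (OffsetInfo info : scan(new ByteArrayInputStream(data))) {
            System.out.println(info);
        }
    }
}
```

A holder object like this costs one allocation per line, which is the kind of overhead the comment above weighs against readability; how large that cost is in practice would have to be measured.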




[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

2016-10-17 Thread olegz
Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1116#discussion_r83639515
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---
@@ -150,548 +145,320 @@
             .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
             .build();
 
-    private List<PropertyDescriptor> properties;
-    private Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> properties;
+    private static final Set<Relationship> relationships;
 
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
+    static {
+        properties = new ArrayList<>();
         properties.add(LINE_SPLIT_COUNT);
         properties.add(FRAGMENT_MAX_SIZE);
         properties.add(HEADER_LINE_COUNT);
         properties.add(HEADER_MARKER);
         properties.add(REMOVE_TRAILING_NEWLINES);
-        this.properties = Collections.unmodifiableList(properties);
 
-        final Set<Relationship> relationships = new HashSet<>();
+        relationships = new HashSet<>();
         relationships.add(REL_ORIGINAL);
         relationships.add(REL_SPLITS);
         relationships.add(REL_FAILURE);
-        this.relationships = Collections.unmodifiableSet(relationships);
     }
 
-    @Override
-    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
-        List<ValidationResult> results = new ArrayList<>();
-
-        final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
-                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
-
-        results.add(new ValidationResult.Builder()
-            .subject("Maximum Fragment Size")
-            .valid(!invalidState)
-            .explanation("Property must be specified when Line Split Count is 0")
-            .build()
-        );
-        return results;
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
-                          final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
-        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
-
-        byte[] leadingBytes = leadingNewLineBytes;
-        int numLines = 0;
-        long totalBytes = 0L;
-        for (int i = 0; i < maxNumLines; i++) {
-            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
-            final long bytes = eolMarker.getBytesConsumed();
-            leadingBytes = eolMarker.getLeadingNewLineBytes();
-
-            if (includeLineDelimiter && out != null) {
-                if (leadingBytes != null) {
-                    out.write(leadingBytes);
-                    leadingBytes = null;
-                }
-                eolBuffer.drainTo(out);
-            }
-            totalBytes += bytes;
-            if (bytes <= 0) {
-                return numLines;
-            }
-            numLines++;
-            if (totalBytes >= maxByteCount) {
-                break;
-            }
-        }
-        return numLines;
-    }
-
-    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
-                                                   final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
-        long bytesRead = 0L;
-        final ByteArrayOutputStream buffer;
-        if (out != null) {
-            buffer = new ByteArrayOutputStream();
-        } else {
-            buffer = null;
-        }
-        byte[] bytesToWriteFirst = leadingNewLineBytes;
-
-        in.mark(Integer.MAX_VALUE);
-        while (true) {
-            final int nextByte = in.read();
-
-            // if we hit end of stream we're done
-            if (nextByte == -1) {
-                if (buffer != null) {
-                    buffer.writeTo(out);
-                    buffer.close();
-                }
-                return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst);  //
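
The hunk above moves the property and relationship collections into a static initializer and removes the Collections.unmodifiableList and Collections.unmodifiableSet wrappers. A minimal sketch of how static initialization and read-only views could be combined follows. Plain strings stand in for NiFi's PropertyDescriptor and Relationship objects, and the class name StaticDescriptorsSketch is hypothetical; nothing here is taken from the final revision of the pull request.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a processor class; not NiFi code.
class StaticDescriptorsSketch {

    // Strings replace the real descriptor types (assumption made for self-containment).
    static final List<String> PROPERTIES;
    static final Set<String> RELATIONSHIPS;

    static {
        // Build mutable collections locally, then publish only read-only views.
        final List<String> properties = new ArrayList<>();
        properties.add("LINE_SPLIT_COUNT");
        properties.add("FRAGMENT_MAX_SIZE");
        properties.add("HEADER_LINE_COUNT");
        properties.add("HEADER_MARKER");
        properties.add("REMOVE_TRAILING_NEWLINES");
        PROPERTIES = Collections.unmodifiableList(properties);

        final Set<String> relationships = new HashSet<>();
        relationships.add("REL_ORIGINAL");
        relationships.add("REL_SPLITS");
        relationships.add("REL_FAILURE");
        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    }

    public static void main(String[] args) {
        System.out.println(PROPERTIES);
        try {
            PROPERTIES.add("EXTRA");
        } catch (UnsupportedOperationException e) {
            System.out.println("read-only view rejected the modification");
        }
    }
}
```

Because the mutable lists never escape the static block, callers that receive PROPERTIES or RELATIONSHIPS cannot alter the shared state, while the collections are still built only once per class load.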

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

2016-10-13 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1116#discussion_r83256735
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

2016-10-13 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1116#discussion_r83255847
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

2016-10-13 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1116#discussion_r83261038
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

2016-10-13 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1116#discussion_r83260562
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

2016-10-13 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1116#discussion_r83260605
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

2016-10-13 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1116#discussion_r83258027
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

2016-10-13 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1116#discussion_r83262537
  
--- Diff: 
nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java
 ---
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Implementation of demarcator of text lines in the provided
+ * {@link InputStream}. It works similar to the {@link BufferedReader} and its
+ * {@link BufferedReader#readLine()} methods except that it does not create a
+ * String representing the text line and instead returns the offset info for the
+ * computed text line. See {@link #nextOffsetInfo()} and
+ * {@link #nextOffsetInfo(byte[])} for more details.
+ * 
+ * This class is NOT thread-safe.
+ * 
+ */
+public class TextLineDemarcator {
+
+private final static int INIT_BUFFER_SIZE = 8192;
+
+private final InputStream is;
+
+private final int initialBufferSize;
+
+private byte[] buffer;
+
+private int index;
+
+private int mark;
+
+private long offset;
+
+private int bufferLength;
+
+/**
+ * Constructs an instance of demarcator with provided {@link InputStream}
+ * and default buffer size.
+ */
+public TextLineDemarcator(InputStream is) {
+this(is, INIT_BUFFER_SIZE);
+}
+
+/**
+ * Constructs an instance of demarcator with provided {@link InputStream}
+ * and initial buffer size.
+ */
+public TextLineDemarcator(InputStream is, int initialBufferSize) {
+if (is == null) {
+throw new IllegalArgumentException("'is' must not be null.");
+}
+if (initialBufferSize < 1) {
+throw new IllegalArgumentException("'initialBufferSize' must be > 0.");
+}
+this.is = is;
+this.initialBufferSize = initialBufferSize;
+this.buffer = new byte[initialBufferSize];
+}
+
+/**
+ * Will compute the next offset info for a
+ * text line (line terminated by either '\r', '\n' or '\r\n').
+ * 
+ * The offset info computed and returned as long[] consisting of
+ * 4 elements {startOffset, length, crlfLength, startsWithMatch}.
+ *  
+ *startOffset - the offset in the overall stream which represents the beginning of the text line
+ *length - length of the text line including CRLF characters
+ *crlfLength - the length of the CRLF. Could be either 1 (if line ends with '\n' or '\r')
+ *  or 2 (if line ends with '\r\n').
+ *startsWithMatch - value is always 1. See {@link #nextOffsetInfo(byte[])} for more info.
+ *  
+ *
+ * @return offset info as long[]
+ */
+public long[] nextOffsetInfo() {
--- End diff --

Why are we returning a long[] here instead of a POJO? This makes the code 
more difficult to follow.
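
To make the suggestion concrete, here is a rough sketch of what such a POJO might look like. The class name `OffsetInfo`, its accessors, and the `fromArray` adapter are all hypothetical and not taken from the PR; the only thing borrowed from the diff is the documented array layout {startOffset, length, crlfLength, startsWithMatch}, with startsWithMatch treated as true when the value is 1.

```java
// Hypothetical value object, not part of the code under review.
// It names the four slots of the long[] described in the javadoc above.
final class OffsetInfo {
    private final long startOffset;       // offset of the line start in the overall stream
    private final long length;            // line length, including the line terminator
    private final int crlfLength;         // 1 for '\n' or '\r', 2 for '\r\n'
    private final boolean startsWithMatch;

    OffsetInfo(long startOffset, long length, int crlfLength, boolean startsWithMatch) {
        this.startOffset = startOffset;
        this.length = length;
        this.crlfLength = crlfLength;
        this.startsWithMatch = startsWithMatch;
    }

    // Assumed adapter from the 4-element array layout documented in the diff.
    static OffsetInfo fromArray(long[] raw) {
        if (raw == null || raw.length != 4) {
            throw new IllegalArgumentException(
                    "expected {startOffset, length, crlfLength, startsWithMatch}");
        }
        return new OffsetInfo(raw[0], raw[1], (int) raw[2], raw[3] == 1L);
    }

    long getStartOffset() {
        return startOffset;
    }

    long getLength() {
        return length;
    }

    int getCrlfLength() {
        return crlfLength;
    }

    boolean isStartsWithMatch() {
        return startsWithMatch;
    }

    @Override
    public String toString() {
        return "OffsetInfo[startOffset=" + startOffset + ", length=" + length
                + ", crlfLength=" + crlfLength + ", startsWithMatch=" + startsWithMatch + "]";
    }
}
```

With something along these lines, a caller would write `info.getCrlfLength()` rather than indexing into the array, so the meaning of each value is visible at the call site. Whether the extra object allocation per line matters for the perf goal of this PR is a separate question that this sketch does not address.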




[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

2016-10-13 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1116#discussion_r83260752
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---
@@ -150,548 +145,320 @@
 .description("If a file cannot be split for some reason, the 
original file will be routed to this destination and nothing will be routed 
elsewhere")
 .build();
 
-private List properties;
-private Set relationships;
+private static final List properties;
+private static final Set relationships;
 
-@Override
-protected void init(final ProcessorInitializationContext context) {
-final List properties = new ArrayList<>();
+static {
+properties = new ArrayList<>();
 properties.add(LINE_SPLIT_COUNT);
 properties.add(FRAGMENT_MAX_SIZE);
 properties.add(HEADER_LINE_COUNT);
 properties.add(HEADER_MARKER);
 properties.add(REMOVE_TRAILING_NEWLINES);
-this.properties = Collections.unmodifiableList(properties);
 
-final Set relationships = new HashSet<>();
+relationships = new HashSet<>();
 relationships.add(REL_ORIGINAL);
 relationships.add(REL_SPLITS);
 relationships.add(REL_FAILURE);
-this.relationships = Collections.unmodifiableSet(relationships);
 }
 
-@Override
-protected Collection 
customValidate(ValidationContext validationContext) {
-List results = new ArrayList<>();
-
-final boolean invalidState = 
(validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
-&& 
!validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
-
-results.add(new ValidationResult.Builder()
-.subject("Maximum Fragment Size")
-.valid(!invalidState)
-.explanation("Property must be specified when Line Split Count 
is 0")
-.build()
-);
-return results;
-}
-
-@Override
-public Set getRelationships() {
-return relationships;
-}
-
-@Override
-protected List getSupportedPropertyDescriptors() {
-return properties;
-}
-
-private int readLines(final InputStream in, final int maxNumLines, 
final long maxByteCount, final OutputStream out,
-  final boolean includeLineDelimiter, final byte[] 
leadingNewLineBytes) throws IOException {
-final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
-
-byte[] leadingBytes = leadingNewLineBytes;
-int numLines = 0;
-long totalBytes = 0L;
-for (int i = 0; i < maxNumLines; i++) {
-final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, 
out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
-final long bytes = eolMarker.getBytesConsumed();
-leadingBytes = eolMarker.getLeadingNewLineBytes();
-
-if (includeLineDelimiter && out != null) {
-if (leadingBytes != null) {
-out.write(leadingBytes);
-leadingBytes = null;
-}
-eolBuffer.drainTo(out);
-}
-totalBytes += bytes;
-if (bytes <= 0) {
-return numLines;
-}
-numLines++;
-if (totalBytes >= maxByteCount) {
-break;
-}
-}
-return numLines;
-}
-
-private EndOfLineMarker countBytesToSplitPoint(final InputStream in, 
final OutputStream out, final long bytesReadSoFar, final long maxSize,
-   final boolean 
includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] 
leadingNewLineBytes) throws IOException {
-long bytesRead = 0L;
-final ByteArrayOutputStream buffer;
-if (out != null) {
-buffer = new ByteArrayOutputStream();
-} else {
-buffer = null;
-}
-byte[] bytesToWriteFirst = leadingNewLineBytes;
-
-in.mark(Integer.MAX_VALUE);
-while (true) {
-final int nextByte = in.read();
-
-// if we hit end of stream we're done
-if (nextByte == -1) {
-if (buffer != null) {
-buffer.writeTo(out);
-buffer.close();
-}
-return new EndOfLineMarker(bytesRead, eolBuffer, true, 
bytesToWriteFirst);  // 

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

2016-10-13 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1116#discussion_r83260548
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

2016-10-13 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1116#discussion_r83263078
  
--- Diff: 
nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java
 ---
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Implementation of demarcator of text lines in the provided
+ * {@link InputStream}. It works similar to the {@link BufferedReader} and 
its
+ * {@link BufferedReader#readLine()} methods except that it does not 
create a
+ * String representing the text line and instead returns the offset info 
for the
+ * computed text line. See {@link #nextOffsetInfo()} and
+ * {@link #nextOffsetInfo(byte[])} for more details.
+ * 
+ * This class is NOT thread-safe.
+ * 
+ */
+public class TextLineDemarcator {
+
+private final static int INIT_BUFFER_SIZE = 8192;
+
+private final InputStream is;
+
+private final int initialBufferSize;
+
+private byte[] buffer;
+
+private int index;
+
+private int mark;
+
+private long offset;
+
+private int bufferLength;
+
+/**
+ * Constructs an instance of demarcator with provided {@link 
InputStream}
+ * and default buffer size.
+ */
+public TextLineDemarcator(InputStream is) {
+this(is, INIT_BUFFER_SIZE);
+}
+
+/**
+ * Constructs an instance of demarcator with provided {@link 
InputStream}
+ * and initial buffer size.
+ */
+public TextLineDemarcator(InputStream is, int initialBufferSize) {
+if (is == null) {
+throw new IllegalArgumentException("'is' must not be null.");
+}
+if (initialBufferSize < 1) {
+throw new IllegalArgumentException("'initialBufferSize' must 
be > 0.");
+}
+this.is = is;
+this.initialBufferSize = initialBufferSize;
+this.buffer = new byte[initialBufferSize];
+}
+
+/**
+ * Will compute the next offset info for a
+ * text line (line terminated by either '\r', '\n' or '\r\n').
+ * 
+ * The offset info computed and returned as long[] 
consisting of
+ * 4 elements {startOffset, length, crlfLength, 
startsWithMatch}.
+ *  
+ *startOffset - the offset in the overall stream which 
represents the beginning of the text line
+ *length - length of the text line including CRLF 
characters
+ *crlfLength - the length of the CRLF. Could be either 
1 (if line ends with '\n' or '\r')
+ *  or 2 (if line ends with 
'\r\n').
+ *startsWithMatch - value is always 1. See {@link 
#nextOffsetInfo(byte[])} for more info.
+ *  
+ *
+ * @return offset info as long[]
+ */
+public long[] nextOffsetInfo() {
+return this.nextOffsetInfo(null);
+}
+
+/**
+ * Will compute the next offset info for a
+ * text line (line terminated by either '\r', '\n' or '\r\n').
+ * 
+ * The offset info computed and returned as long[] 
consisting of
+ * 4 elements {startOffset, length, crlfLength, 
startsWithMatch}.
+ *  
+ *startOffset - the offset in the overall stream which 
represents the beginning of the text line
+ *length - length of the text line including CRLF 
characters
+ *crlfLength - the length of the CRLF. Could be either 
1 (if line ends with '\n' or '\r')
+ *  or 2 (if line ends with 
'\r\n').
+ *startsWithMatch - value is always 1 unless 
'startsWith' is provided. If 'startsWith' is provided it will
+ *  be compared to the 

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

2016-10-13 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1116#discussion_r83256378
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

2016-10-13 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1116#discussion_r83260348
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

2016-10-13 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1116#discussion_r83257640
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

2016-10-13 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1116#discussion_r83260507
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---