[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15663176#comment-15663176 ]

ASF GitHub Bot commented on NIFI-2851:
--------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/1215

> Improve performance of SplitText
> --------------------------------
>
>                 Key: NIFI-2851
>                 URL: https://issues.apache.org/jira/browse/NIFI-2851
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Oleg Zhurakousky
>             Fix For: 1.1.0
>
> SplitText is fairly CPU-intensive and quite slow. A simple flow that splits a
> 1.4 million line text file into 5k line chunks and then splits those 5k line
> chunks into 1 line chunks is only capable of pushing through about 10k lines
> per second. This equates to about 10 MB/sec. JVisualVM shows that the
> majority of the time is spent in the locateSplitPoint() method. Isolating
> this code and inspecting how it works, and using some micro-benchmarking, it
> appears that if we refactor the calls to InputStream.read() to instead read
> into a byte array, we can improve performance.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
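The description's core claim, that replacing per-byte InputStream.read() calls with reads into a byte array improves throughput, can be sketched as below. This is a hypothetical illustration, not the NiFi code: both variants scan a stream for line boundaries, but the buffered one amortizes the per-call overhead across 8 KB chunks.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Illustrative sketch (not NiFi's locateSplitPoint): per-byte reads vs.
// reading into a byte[] buffer. Both produce the same line count; the
// buffered variant makes far fewer InputStream.read() calls.
public class SplitPointSketch {

    // Per-byte variant: one read() call (and its bookkeeping) per byte.
    static int countLinesPerByte(InputStream in) throws IOException {
        int lines = 0;
        int b;
        while ((b = in.read()) != -1) {
            if (b == '\n') {
                lines++;
            }
        }
        return lines;
    }

    // Buffered variant: one read() call per 8 KB chunk, then scan the array.
    static int countLinesBuffered(InputStream in) throws IOException {
        final byte[] buffer = new byte[8192];
        int lines = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            for (int i = 0; i < read; i++) {
                if (buffer[i] == '\n') {
                    lines++;
                }
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "a\nbb\nccc\n".getBytes(StandardCharsets.UTF_8);
        int perByte = countLinesPerByte(new ByteArrayInputStream(data));
        int buffered = countLinesBuffered(new ByteArrayInputStream(data));
        if (perByte != 3 || buffered != 3) {
            throw new AssertionError("expected 3 lines, got " + perByte + "/" + buffered);
        }
        System.out.println("both variants counted " + perByte + " lines");
    }
}
```

The class names and methods here are made up for the sketch; the actual refactoring landed as org.apache.nifi.stream.io.util.TextLineDemarcator in the commits below.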
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15663175#comment-15663175 ]

ASF subversion and git services commented on NIFI-2851:
-------------------------------------------------------

Commit 13ea909122a39f36b7c63effabca0fe8a43298aa in nifi's branch refs/heads/master from [~ijokarumawak]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=13ea909 ]

NIFI-2851: Fixed CheckStyle error.

This closes #1215.

Signed-off-by: Andy LoPresto
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15663166#comment-15663166 ]

ASF GitHub Bot commented on NIFI-2851:
--------------------------------------

Github user alopresto commented on the issue:

    https://github.com/apache/nifi/pull/1215

    Found this as well during my PR. Checked that Koji's fix resolves the checkstyle error. Merging.
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15662836#comment-15662836 ]

ASF GitHub Bot commented on NIFI-2851:
--------------------------------------

GitHub user ijokarumawak opened a pull request:

    https://github.com/apache/nifi/pull/1215

    NIFI-2851: Fixed CheckStyle error.

    The latest master branch fails with CheckStyle by a test class introduced by NIFI-2851.
    https://travis-ci.org/apache/nifi/builds/175185419

    Confirmed this PR fixes the CheckStyle error locally.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ijokarumawak/nifi nifi-2851

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/1215.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1215

----
commit 15f314c0f07df898aafb08788859a6ba4f05f177
Author: Koji Kawamura
Date:   2016-11-14T05:28:05Z

    NIFI-2851: Fixed CheckStyle error.
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658142#comment-15658142 ]

ASF GitHub Bot commented on NIFI-2851:
--------------------------------------

Github user markap14 commented on the issue:

    https://github.com/apache/nifi/pull/1116

    @olegz thanks for jumping on this. Sorry it's taken me so long to get back to it. I verified the changes are good now. I added an additional unit test to verify a corner case that was problematic in the StreamDemarcator and all is looking good. +1 merged to master!
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658137#comment-15658137 ]

ASF subversion and git services commented on NIFI-2851:
-------------------------------------------------------

Commit ad924745933c3e018e0d78282d9d085978b8cd2b in nifi's branch refs/heads/master from [~markap14]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=ad92474 ]

NIFI-2851: Added additional unit test to ensure correctness of demarcation when demarcator falls between buffered data

This closes #1116.
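The corner case named in the commit above, a line terminator that falls between two buffered reads, is worth spelling out. The sketch below is hypothetical (it is not the NiFi test or the StreamDemarcator): a "\r\n" split across two read() chunks must still count as one line ending, which forces the scanner to carry state across buffer refills.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the buffer-boundary corner case: a "\r\n" that
// straddles two buffered reads must be treated as one terminator, not two.
public class StraddleSketch {

    static int countLines(InputStream in, int bufferSize) throws IOException {
        final byte[] buffer = new byte[bufferSize];
        int lines = 0;
        boolean pendingCr = false; // saw '\r' as the last byte of a chunk
        int read;
        while ((read = in.read(buffer)) != -1) {
            for (int i = 0; i < read; i++) {
                byte b = buffer[i];
                if (pendingCr) {
                    lines++;          // the carried '\r' ended a line...
                    pendingCr = false;
                    if (b == '\n') {
                        continue;     // ...and this '\n' belongs to it
                    }
                }
                if (b == '\r') {
                    pendingCr = true; // decide once we see the next byte
                } else if (b == '\n') {
                    lines++;
                }
            }
        }
        if (pendingCr) {
            lines++; // trailing '\r' at end of stream
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "ab\r\ncd\r\n".getBytes(StandardCharsets.UTF_8);
        // A buffer of 3 bytes forces the first "\r\n" to split across reads.
        int small = countLines(new ByteArrayInputStream(data), 3);
        int large = countLines(new ByteArrayInputStream(data), 8192);
        if (small != 2 || large != 2) {
            throw new AssertionError("expected 2 lines, got " + small + "/" + large);
        }
        System.out.println("ok: " + small + " lines either way");
    }
}
```

Without the pendingCr flag, the small-buffer run would miscount the split "\r\n" as two terminators, which is exactly the kind of bug the added unit test guards against.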
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658138#comment-15658138 ]

ASF GitHub Bot commented on NIFI-2851:
--------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/1116
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658136#comment-15658136 ]

ASF subversion and git services commented on NIFI-2851:
-------------------------------------------------------

Commit 41f519e84cb5d0bf8be4ce93e26a042d16cda273 in nifi's branch refs/heads/master from [~ozhurakousky]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=41f519e ]

NIFI-2851 initial commit of perf improvements on SplitText

- introduced org.apache.nifi.stream.io.util.TextLineDemarcator
- refactored SplitText to use org.apache.nifi.stream.io.util.TextLineDemarcator
- updated SplitText's capability description to provide more clarity around splits with headers.
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582434#comment-15582434 ]

ASF GitHub Bot commented on NIFI-2851:
--------------------------------------

Github user olegz commented on the issue:

    https://github.com/apache/nifi/pull/1116

    @markap14 PR comments are addressed. Thanks for reviewing. Back at ya!
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582286#comment-15582286 ]

ASF GitHub Bot commented on NIFI-2851:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83643595

    --- Diff: nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java ---
    @@ -0,0 +1,227 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.stream.io.util;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStream;
    +
    +/**
    + * Implementation of demarcator of text lines in the provided
    + * {@link InputStream}. It works similar to the {@link BufferedReader} and its
    + * {@link BufferedReader#readLine()} methods except that it does not create a
    + * String representing the text line and instead returns the offset info for the
    + * computed text line. See {@link #nextOffsetInfo()} and
    + * {@link #nextOffsetInfo(byte[])} for more details.
    + *
    + * This class is NOT thread-safe.
    + */
    +public class TextLineDemarcator {
    +
    +    private final static int INIT_BUFFER_SIZE = 8192;
    +
    +    private final InputStream is;
    +
    +    private final int initialBufferSize;
    +
    +    private byte[] buffer;
    +
    +    private int index;
    +
    +    private int mark;
    +
    +    private long offset;
    +
    +    private int bufferLength;
    +
    +    /**
    +     * Constructs an instance of demarcator with provided {@link InputStream}
    +     * and default buffer size.
    +     */
    +    public TextLineDemarcator(InputStream is) {
    +        this(is, INIT_BUFFER_SIZE);
    +    }
    +
    +    /**
    +     * Constructs an instance of demarcator with provided {@link InputStream}
    +     * and initial buffer size.
    +     */
    +    public TextLineDemarcator(InputStream is, int initialBufferSize) {
    +        if (is == null) {
    +            throw new IllegalArgumentException("'is' must not be null.");
    +        }
    +        if (initialBufferSize < 1) {
    +            throw new IllegalArgumentException("'initialBufferSize' must be > 0.");
    +        }
    +        this.is = is;
    +        this.initialBufferSize = initialBufferSize;
    +        this.buffer = new byte[initialBufferSize];
    +    }
    +
    +    /**
    +     * Will compute the next offset info for a
    +     * text line (line terminated by either '\r', '\n' or '\r\n').
    +     *
    +     * The offset info computed and returned as long[] consisting of
    +     * 4 elements {startOffset, length, crlfLength, startsWithMatch}.
    +     *
    +     *   startOffset - the offset in the overall stream which represents the beginning of the text line
    +     *   length - length of the text line including CRLF characters
    +     *   crlfLength - the length of the CRLF. Could be either 1 (if line ends with '\n' or '\r')
    +     *                or 2 (if line ends with '\r\n').
    +     *   startsWithMatch - value is always 1. See {@link #nextOffsetInfo(byte[])} for more info.
    +     *
    +     * @return offset info as long[]
    +     */
    +    public long[] nextOffsetInfo() {
    --- End diff --

    Yes, it would be easier to read, but based on running some performance tests there is also a price to pay for it although not very significant.
    Will change
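The offset-info contract quoted in the diff's javadoc can be illustrated with a standalone sketch. This is not the TextLineDemarcator implementation; it is a minimal, assumed reimplementation of just the documented contract: for each line, emit {startOffset, length, crlfLength}, where length includes the terminator and crlfLength is 1 for '\n' or '\r', 2 for "\r\n" (0 here for a final unterminated line).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the documented offset-info contract (not the NiFi class):
// each element is {startOffset, length, crlfLength}. No String per line is
// created, which is the point of returning offsets instead of readLine().
public class OffsetInfoSketch {

    static List<long[]> offsets(byte[] data) {
        List<long[]> result = new ArrayList<>();
        int start = 0;
        int i = 0;
        while (i < data.length) {
            if (data[i] == '\n' || data[i] == '\r') {
                // "\r\n" counts as a single two-byte terminator
                int crlf = (data[i] == '\r' && i + 1 < data.length && data[i + 1] == '\n') ? 2 : 1;
                i += crlf;
                result.add(new long[] {start, i - start, crlf});
                start = i;
            } else {
                i++;
            }
        }
        if (start < data.length) {
            result.add(new long[] {start, data.length - start, 0}); // no terminator
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] data = "ab\r\ncd\nef".getBytes(StandardCharsets.UTF_8);
        List<long[]> lines = offsets(data);
        // "ab\r\n" -> {0, 4, 2}; "cd\n" -> {4, 3, 1}; "ef" -> {7, 2, 0}
        if (lines.size() != 3 || lines.get(0)[1] != 4 || lines.get(0)[2] != 2
                || lines.get(1)[0] != 4 || lines.get(2)[2] != 0) {
            throw new AssertionError("unexpected offset info");
        }
        System.out.println("ok");
    }
}
```

Returning offsets rather than Strings lets a caller such as SplitText copy line ranges straight from the buffered bytes, avoiding per-line String allocation.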
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582224#comment-15582224 ]

ASF GitHub Bot commented on NIFI-2851:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83639515

    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
                 .build();
     
    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);
     
    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }
     
    -    @Override
    -    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -                .subject("Maximum Fragment Size")
    -                .valid(!invalidState)
    -                .explanation("Property must be specified when Line Split Count is 0")
    -                .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572537#comment-15572537 ]

ASF GitHub Bot commented on NIFI-2851:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83261038

    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    @@ -150,548 +145,320 @@
    ...
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572527#comment-15572527 ]

ASF GitHub Bot commented on NIFI-2851:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83260348

    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    @@ -150,548 +145,320 @@
    ...
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572530#comment-15572530 ]

ASF GitHub Bot commented on NIFI-2851:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83260548

    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    @@ -150,548 +145,320 @@
    ...
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572524#comment-15572524 ] ASF GitHub Bot commented on NIFI-2851: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83257640
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572528#comment-15572528 ] ASF GitHub Bot commented on NIFI-2851: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83260752
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572532#comment-15572532 ] ASF GitHub Bot commented on NIFI-2851: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83262537

--- Diff: nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java ---
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Implementation of demarcator of text lines in the provided
+ * {@link InputStream}. It works similar to the {@link BufferedReader} and its
+ * {@link BufferedReader#readLine()} methods except that it does not create a
+ * String representing the text line and instead returns the offset info for the
+ * computed text line. See {@link #nextOffsetInfo()} and
+ * {@link #nextOffsetInfo(byte[])} for more details.
+ *
+ * This class is NOT thread-safe.
+ */
+public class TextLineDemarcator {
+
+    private final static int INIT_BUFFER_SIZE = 8192;
+
+    private final InputStream is;
+
+    private final int initialBufferSize;
+
+    private byte[] buffer;
+
+    private int index;
+
+    private int mark;
+
+    private long offset;
+
+    private int bufferLength;
+
+    /**
+     * Constructs an instance of demarcator with provided {@link InputStream}
+     * and default buffer size.
+     */
+    public TextLineDemarcator(InputStream is) {
+        this(is, INIT_BUFFER_SIZE);
+    }
+
+    /**
+     * Constructs an instance of demarcator with provided {@link InputStream}
+     * and initial buffer size.
+     */
+    public TextLineDemarcator(InputStream is, int initialBufferSize) {
+        if (is == null) {
+            throw new IllegalArgumentException("'is' must not be null.");
+        }
+        if (initialBufferSize < 1) {
+            throw new IllegalArgumentException("'initialBufferSize' must be > 0.");
+        }
+        this.is = is;
+        this.initialBufferSize = initialBufferSize;
+        this.buffer = new byte[initialBufferSize];
+    }
+
+    /**
+     * Will compute the next offset info for a
+     * text line (line terminated by either '\r', '\n' or '\r\n').
+     *
+     * The offset info computed and returned as long[] consisting of
+     * 4 elements {startOffset, length, crlfLength, startsWithMatch}:
+     *   startOffset - the offset in the overall stream which represents the beginning of the text line
+     *   length - length of the text line including CRLF characters
+     *   crlfLength - the length of the CRLF. Could be either 1 (if line ends with '\n' or '\r')
+     *                or 2 (if line ends with '\r\n').
+     *   startsWithMatch - value is always 1. See {@link #nextOffsetInfo(byte[])} for more info.
+     *
+     * @return offset info as long[]
+     */
+    public long[] nextOffsetInfo() {
--- End diff --

Why are we returning a long[] here instead of a POJO? This makes the code more difficult to follow.
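The long[] offset-info contract described in the javadoc above can be illustrated with a small standalone sketch. This is not the NiFi class itself: the names are illustrative, lone '\r' terminators are omitted for brevity (only '\n' and '\r\n' are handled), and a trailing line without a terminator is reported with crlfLength 0.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for the offset-info idea: return {startOffset, length,
// crlfLength} per text line instead of materializing each line as a String.
public class OffsetInfoSketch {

    static List<long[]> lineOffsets(InputStream in) throws IOException {
        final List<long[]> result = new ArrayList<>();
        long start = 0;  // stream offset where the current line begins
        long pos = 0;    // bytes consumed so far
        int prev = -1;
        int b;
        while ((b = in.read()) != -1) {
            pos++;
            if (b == '\n') {
                // '\r\n' counts as a 2-byte terminator, bare '\n' as 1 byte
                final int crlfLength = (prev == '\r') ? 2 : 1;
                result.add(new long[] { start, pos - start, crlfLength });
                start = pos;
            }
            prev = b;
        }
        if (pos > start) {
            // final line with no terminator at all
            result.add(new long[] { start, pos - start, 0 });
        }
        return result;
    }
}
```

For the input "ab\ncd\r\ne" this yields {0, 3, 1}, {3, 4, 2}, and {7, 1, 0}; the caller can then copy line bytes by offset, which is the point of the demarcator design.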
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572536#comment-15572536 ] ASF GitHub Bot commented on NIFI-2851: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83260605
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572534#comment-15572534 ] ASF GitHub Bot commented on NIFI-2851: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83258027
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572526#comment-15572526 ] ASF GitHub Bot commented on NIFI-2851: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83260507
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572533#comment-15572533 ] ASF GitHub Bot commented on NIFI-2851: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83255847
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572535#comment-15572535 ] ASF GitHub Bot commented on NIFI-2851: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83260562
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572531#comment-15572531 ] ASF GitHub Bot commented on NIFI-2851: -- Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83256735

    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
                 .build();

    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;

    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);

    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }

    -    @Override
    -    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -                .subject("Maximum Fragment Size")
    -                .valid(!invalidState)
    -                .explanation("Property must be specified when Line Split Count is 0")
    -                .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
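The hunk above moves the property and relationship lists out of the per-instance init() method into a static initializer, so they are built once per class load. A sketch of that pattern in isolation, using String as a stand-in for NiFi's PropertyDescriptor type (hypothetical class, for illustration only; it also keeps the unmodifiable wrapper that the removed init() code used):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in: String replaces PropertyDescriptor purely to
// illustrate the static-initializer pattern from the diff above.
public class StaticDescriptors {

    private static final List<String> properties;

    static {
        // Built once per class load instead of once per processor instance;
        // the unmodifiable wrapper keeps the shared list safe to expose.
        List<String> props = new ArrayList<>();
        props.add("Line Split Count");
        props.add("Maximum Fragment Size");
        properties = Collections.unmodifiableList(props);
    }

    public static List<String> getProperties() {
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(getProperties().size()); // prints 2
    }
}
```

Since descriptors are immutable and identical for every processor instance, sharing one static list trades a small amount of class-load work for less per-instance allocation.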
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572525#comment-15572525 ] ASF GitHub Bot commented on NIFI-2851: -- Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83263078

    --- Diff: nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java ---
    @@ -0,0 +1,227 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.stream.io.util;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStream;
    +
    +/**
    + * Implementation of demarcator of text lines in the provided
    + * {@link InputStream}. It works similar to the {@link BufferedReader} and its
    + * {@link BufferedReader#readLine()} methods except that it does not create a
    + * String representing the text line and instead returns the offset info for the
    + * computed text line. See {@link #nextOffsetInfo()} and
    + * {@link #nextOffsetInfo(byte[])} for more details.
    + *
    + * This class is NOT thread-safe.
    + */
    +public class TextLineDemarcator {
    +
    +    private final static int INIT_BUFFER_SIZE = 8192;
    +
    +    private final InputStream is;
    +
    +    private final int initialBufferSize;
    +
    +    private byte[] buffer;
    +
    +    private int index;
    +
    +    private int mark;
    +
    +    private long offset;
    +
    +    private int bufferLength;
    +
    +    /**
    +     * Constructs an instance of demarcator with provided {@link InputStream}
    +     * and default buffer size.
    +     */
    +    public TextLineDemarcator(InputStream is) {
    +        this(is, INIT_BUFFER_SIZE);
    +    }
    +
    +    /**
    +     * Constructs an instance of demarcator with provided {@link InputStream}
    +     * and initial buffer size.
    +     */
    +    public TextLineDemarcator(InputStream is, int initialBufferSize) {
    +        if (is == null) {
    +            throw new IllegalArgumentException("'is' must not be null.");
    +        }
    +        if (initialBufferSize < 1) {
    +            throw new IllegalArgumentException("'initialBufferSize' must be > 0.");
    +        }
    +        this.is = is;
    +        this.initialBufferSize = initialBufferSize;
    +        this.buffer = new byte[initialBufferSize];
    +    }
    +
    +    /**
    +     * Will compute the next offset info for a
    +     * text line (line terminated by either '\r', '\n' or '\r\n').
    +     *
    +     * The offset info computed and returned as long[] consisting of
    +     * 4 elements {startOffset, length, crlfLength, startsWithMatch}.
    +     *
    +     * startOffset - the offset in the overall stream which represents the beginning of the text line
    +     * length - length of the text line including CRLF characters
    +     * crlfLength - the length of the CRLF. Could be either 1 (if line ends with '\n' or '\r')
    +     *              or 2 (if line ends with '\r\n').
    +     * startsWithMatch - value is always 1. See {@link #nextOffsetInfo(byte[])} for more info.
    +     *
    +     * @return offset info as long[]
    +     */
    +    public long[] nextOffsetInfo() {
    +        return this.nextOffsetInfo(null);
    +    }
    +
    +    /**
    +     * Will compute the next offset info for a
    +     * text line (line terminated by either '\r', '\n' or '\r\n').
    +     *
    +     * The offset info computed and returned as long[] consisting of
    +     * 4 elements {startOffset, length, crlfLength, startsWithMatch}.
    +     *
    +     * startOffset - the offset in the overall stream which represents the beginning of the text line
    +     * length - length of the text line including CRLF characters
    +     * crlfLength - the length of the CRLF. Could be either 1 (if line ends with '\n' or '\r')
    +     *
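The javadoc quoted above defines the offset-info contract: a long[] of {startOffset, length, crlfLength, startsWithMatch}, with length including the CRLF bytes. A simplified, self-contained model of that line scan over a byte[] (the real TextLineDemarcator reads incrementally from an InputStream into a growable buffer and also tracks startsWithMatch; both are omitted in this sketch):

```java
import java.nio.charset.StandardCharsets;

public class OffsetInfoDemo {

    // Simplified model of the offset-info computation: scan a byte[] and
    // return {startOffset, length, crlfLength} for the line starting at
    // 'start'. A line is terminated by '\n', '\r', or '\r\n'.
    static long[] nextOffsetInfo(byte[] data, int start) {
        if (start >= data.length) {
            return null; // no more lines
        }
        int i = start;
        while (i < data.length && data[i] != '\n' && data[i] != '\r') {
            i++;
        }
        int crlfLength = 0;
        if (i < data.length) {
            boolean crlf = data[i] == '\r' && i + 1 < data.length && data[i + 1] == '\n';
            crlfLength = crlf ? 2 : 1;
        }
        // length includes the CRLF characters, matching the javadoc above
        return new long[] { start, (i - start) + crlfLength, crlfLength };
    }

    public static void main(String[] args) {
        byte[] data = "abc\r\nde\n".getBytes(StandardCharsets.UTF_8);
        long[] first = nextOffsetInfo(data, 0);
        System.out.println(first[0] + "," + first[1] + "," + first[2]);   // prints 0,5,2
        long[] second = nextOffsetInfo(data, (int) (first[0] + first[1]));
        System.out.println(second[0] + "," + second[1] + "," + second[2]); // prints 5,3,1
    }
}
```

Returning offsets instead of String objects is the point of the design: the caller can locate split boundaries without allocating a String per line, which matters at millions of lines.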
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572529#comment-15572529 ] ASF GitHub Bot commented on NIFI-2851: -- Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83256378

    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    [same SplitText.java diff hunk as quoted in the review comment above]
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572440#comment-15572440 ] Oleg Zhurakousky commented on NIFI-2851: So the test has been fixed and CapabilityDescription was updated > Improve performance of SplitText > > > Key: NIFI-2851 > URL: https://issues.apache.org/jira/browse/NIFI-2851 > Project: Apache NiFi > Issue Type: Improvement > Components: Core Framework >Reporter: Mark Payne >Assignee: Oleg Zhurakousky > Fix For: 1.1.0 > > > SplitText is fairly CPU-intensive and quite slow. A simple flow that splits a > 1.4 million line text file into 5k line chunks and then splits those 5k line > chunks into 1 line chunks is only capable of pushing through about 10k lines > per second. This equates to about 10 MB/sec. JVisualVM shows that the > majority of the time is spent in the locateSplitPoint() method. Isolating > this code and inspecting how it works, and using some micro-benchmarking, it > appears that if we refactor the calls to InputStream.read() to instead read > into a byte array, we can improve performance. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
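The issue description above attributes the slowdown to byte-at-a-time InputStream.read() calls inside locateSplitPoint(). A minimal sketch contrasting the two read strategies (illustrative only; the method names are hypothetical, not NiFi's actual code):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class SplitPointScan {

    // Slow strategy: one InputStream.read() call per byte, which is what the
    // ticket says dominates the JVisualVM profile.
    static long countLinesByteAtATime(InputStream in) throws IOException {
        long lines = 0;
        int b;
        while ((b = != -1) {
            if (b == '\n') {
                lines++;
            }
        }
        return lines;
    }

    // Proposed strategy: bulk-read into a byte array and scan it in memory,
    // amortizing the per-call overhead across the whole buffer.
    static long countLinesBuffered(InputStream in) throws IOException {
        final byte[] buf = new byte[8192];
        long lines = 0;
        int n;
        while ((n = != -1) {
            for (int i = 0; i < n; i++) {
                if (buf[i] == '\n') {
                    lines++;
                }
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "a\nb\nc\n".getBytes(StandardCharsets.UTF_8);
        long slow = countLinesByteAtATime(new ByteArrayInputStream(data));
        long fast = countLinesBuffered(new ByteArrayInputStream(data));
        if (slow != fast) {
            throw new AssertionError("strategies disagree: " + slow + " vs " + fast);
        }
        System.out.println("lines=" + slow); // prints lines=3
    }
}
```

Both variants find the same line boundaries; on a 1.4 million line file the buffered scan replaces millions of per-byte read() calls with a few thousand bulk reads, which is the refactoring the ticket proposes.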
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1434#comment-1434 ] ASF GitHub Bot commented on NIFI-2851: -- Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r82415774

    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java ---
    @@ -798,6 +798,7 @@ public void testWithSplitThatStartsWithNewLine() {
         }

         @Test
    +    @Ignore // temporary, fixing it
    --- End diff --

    No, I didn't want to hold the PR, as this is an edge case which I hope to address after thinking about it a bit. Basically the assertion fails here, ```splits.get(1).assertContentEquals("");```, since it now comes in as a new line. But I also wonder why this is a split to begin with, when it has no data. So I wanted to look some more and see whether it is actually a bug that needs to be fixed, but didn't want to hold the PR, since this is an edge case regardless.
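The edge case discussed here is a split that contains no data because of where the newlines fall. A simplified one-line-per-split splitter (illustrative only, not the SplitText implementation) shows how an empty fragment arises:

```java
import java.util.ArrayList;
import java.util.List;

public class EmptySplitDemo {

    // Simplified one-line-per-split splitter: every '\n' terminates a split,
    // so a newline at the start of the input (or two adjacent newlines)
    // produces a split with no data, the situation the failing assertion hits.
    static List<String> splitLines(String text) {
        List<String> splits = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < text.length(); i++) {
            if (text.charAt(i) == '\n') {
                splits.add(text.substring(start, i));
                start = i + 1;
            }
        }
        if (start < text.length()) {
            splits.add(text.substring(start));
        }
        return splits;
    }

    public static void main(String[] args) {
        System.out.println(splitLines("\nfoo")); // prints [, foo]
    }
}
```

Whether such an empty fragment should be emitted at all is exactly the design question the comment raises; this sketch only shows why one appears.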
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1414#comment-1414 ] ASF GitHub Bot commented on NIFI-2851: -- GitHub user olegz opened a pull request:

    https://github.com/apache/nifi/pull/1116

    NIFI-2851 initial comit of perf improvements on SplitText

    Thank you for submitting a contribution to Apache NiFi.

    In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
    - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    - [ ] Is your initial contribution a single, squashed commit?

    ### For code changes:
    - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [ ] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/olegz/nifi NIFI-2851

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/1116.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1116

commit 0818d8689cb474bb1ad81dcd672cb0e31078825e
Author: Oleg Zhurakousky
Date: 2016-10-07T15:37:32Z

    NIFI-2851 initial comit of perf improvements on SplitText
[jira] [Commented] (NIFI-2851) Improve performance of SplitText
[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1425#comment-1425 ] ASF GitHub Bot commented on NIFI-2851: -- Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r82414967

    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java ---
    @@ -798,6 +798,7 @@ public void testWithSplitThatStartsWithNewLine() {
         }

         @Test
    +    @Ignore // temporary, fixing it
    --- End diff --

    @olegz did you overlook this?