[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...
Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/1116 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...
Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83643595 --- Diff: nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java --- @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stream.io.util; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; + +/** + * Implementation of demarcator of text lines in the provided + * {@link InputStream}. It works similar to the {@link BufferedReader} and its + * {@link BufferedReader#readLine()} methods except that it does not create a + * String representing the text line and instead returns the offset info for the + * computed text line. See {@link #nextOffsetInfo()} and + * {@link #nextOffsetInfo(byte[])} for more details. + * + * This class is NOT thread-safe. 
+ * + */ +public class TextLineDemarcator { + +private final static int INIT_BUFFER_SIZE = 8192; + +private final InputStream is; + +private final int initialBufferSize; + +private byte[] buffer; + +private int index; + +private int mark; + +private long offset; + +private int bufferLength; + +/** + * Constructs an instance of demarcator with provided {@link InputStream} + * and default buffer size. + */ +public TextLineDemarcator(InputStream is) { +this(is, INIT_BUFFER_SIZE); +} + +/** + * Constructs an instance of demarcator with provided {@link InputStream} + * and initial buffer size. + */ +public TextLineDemarcator(InputStream is, int initialBufferSize) { +if (is == null) { +throw new IllegalArgumentException("'is' must not be null."); +} +if (initialBufferSize < 1) { +throw new IllegalArgumentException("'initialBufferSize' must be > 0."); +} +this.is = is; +this.initialBufferSize = initialBufferSize; +this.buffer = new byte[initialBufferSize]; +} + +/** + * Will compute the next offset info for a + * text line (line terminated by either '\r', '\n' or '\r\n'). + * + * The offset info computed and returned as long[] consisting of + * 4 elements {startOffset, length, crlfLength, startsWithMatch}. + * + *startOffset - the offset in the overall stream which represents the beginning of the text line + *length - length of the text line including CRLF characters + *crlfLength - the length of the CRLF. Could be either 1 (if line ends with '\n' or '\r') + * or 2 (if line ends with '\r\n'). + *startsWithMatch - value is always 1. See {@link #nextOffsetInfo(byte[])} for more info. + * + * + * @return offset info as long[] + */ +public long[] nextOffsetInfo() { --- End diff -- Yes, it would be easier to read. However, based on some performance tests, there is also a price to pay for it, although not a very significant one. Will change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...
Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83639515 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java --- @@ -150,548 +145,320 @@ .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") .build(); -private List properties; -private Set relationships; +private static final List properties; +private static final Set relationships; -@Override -protected void init(final ProcessorInitializationContext context) { -final List properties = new ArrayList<>(); +static { +properties = new ArrayList<>(); properties.add(LINE_SPLIT_COUNT); properties.add(FRAGMENT_MAX_SIZE); properties.add(HEADER_LINE_COUNT); properties.add(HEADER_MARKER); properties.add(REMOVE_TRAILING_NEWLINES); -this.properties = Collections.unmodifiableList(properties); -final Set relationships = new HashSet<>(); +relationships = new HashSet<>(); relationships.add(REL_ORIGINAL); relationships.add(REL_SPLITS); relationships.add(REL_FAILURE); -this.relationships = Collections.unmodifiableSet(relationships); } -@Override -protected Collection customValidate(ValidationContext validationContext) { -List results = new ArrayList<>(); - -final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0 -&& !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet()); - -results.add(new ValidationResult.Builder() -.subject("Maximum Fragment Size") -.valid(!invalidState) -.explanation("Property must be specified when Line Split Count is 0") -.build() -); -return results; -} - -@Override -public Set getRelationships() { -return relationships; -} - -@Override -protected List getSupportedPropertyDescriptors() { -return properties; -} - -private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out, 
- final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException { -final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); - -byte[] leadingBytes = leadingNewLineBytes; -int numLines = 0; -long totalBytes = 0L; -for (int i = 0; i < maxNumLines; i++) { -final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes); -final long bytes = eolMarker.getBytesConsumed(); -leadingBytes = eolMarker.getLeadingNewLineBytes(); - -if (includeLineDelimiter && out != null) { -if (leadingBytes != null) { -out.write(leadingBytes); -leadingBytes = null; -} -eolBuffer.drainTo(out); -} -totalBytes += bytes; -if (bytes <= 0) { -return numLines; -} -numLines++; -if (totalBytes >= maxByteCount) { -break; -} -} -return numLines; -} - -private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize, - final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException { -long bytesRead = 0L; -final ByteArrayOutputStream buffer; -if (out != null) { -buffer = new ByteArrayOutputStream(); -} else { -buffer = null; -} -byte[] bytesToWriteFirst = leadingNewLineBytes; - -in.mark(Integer.MAX_VALUE); -while (true) { -final int nextByte = in.read(); - -// if we hit end of stream we're done -if (nextByte == -1) { -if (buffer != null) { -buffer.writeTo(out); -buffer.close(); -} -return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); //
[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83256735 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java --- @@ -150,548 +145,320 @@ .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") .build(); -private List properties; -private Set relationships; +private static final List properties; +private static final Set relationships; -@Override -protected void init(final ProcessorInitializationContext context) { -final List properties = new ArrayList<>(); +static { +properties = new ArrayList<>(); properties.add(LINE_SPLIT_COUNT); properties.add(FRAGMENT_MAX_SIZE); properties.add(HEADER_LINE_COUNT); properties.add(HEADER_MARKER); properties.add(REMOVE_TRAILING_NEWLINES); -this.properties = Collections.unmodifiableList(properties); -final Set relationships = new HashSet<>(); +relationships = new HashSet<>(); relationships.add(REL_ORIGINAL); relationships.add(REL_SPLITS); relationships.add(REL_FAILURE); -this.relationships = Collections.unmodifiableSet(relationships); } -@Override -protected Collection customValidate(ValidationContext validationContext) { -List results = new ArrayList<>(); - -final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0 -&& !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet()); - -results.add(new ValidationResult.Builder() -.subject("Maximum Fragment Size") -.valid(!invalidState) -.explanation("Property must be specified when Line Split Count is 0") -.build() -); -return results; -} - -@Override -public Set getRelationships() { -return relationships; -} - -@Override -protected List getSupportedPropertyDescriptors() { -return properties; -} - -private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream 
out, - final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException { -final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); - -byte[] leadingBytes = leadingNewLineBytes; -int numLines = 0; -long totalBytes = 0L; -for (int i = 0; i < maxNumLines; i++) { -final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes); -final long bytes = eolMarker.getBytesConsumed(); -leadingBytes = eolMarker.getLeadingNewLineBytes(); - -if (includeLineDelimiter && out != null) { -if (leadingBytes != null) { -out.write(leadingBytes); -leadingBytes = null; -} -eolBuffer.drainTo(out); -} -totalBytes += bytes; -if (bytes <= 0) { -return numLines; -} -numLines++; -if (totalBytes >= maxByteCount) { -break; -} -} -return numLines; -} - -private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize, - final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException { -long bytesRead = 0L; -final ByteArrayOutputStream buffer; -if (out != null) { -buffer = new ByteArrayOutputStream(); -} else { -buffer = null; -} -byte[] bytesToWriteFirst = leadingNewLineBytes; - -in.mark(Integer.MAX_VALUE); -while (true) { -final int nextByte = in.read(); - -// if we hit end of stream we're done -if (nextByte == -1) { -if (buffer != null) { -buffer.writeTo(out); -buffer.close(); -} -return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); //
[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83255847 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java --- @@ -150,548 +145,320 @@ .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") .build(); -private List properties; -private Set relationships; +private static final List properties; +private static final Set relationships; -@Override -protected void init(final ProcessorInitializationContext context) { -final List properties = new ArrayList<>(); +static { +properties = new ArrayList<>(); properties.add(LINE_SPLIT_COUNT); properties.add(FRAGMENT_MAX_SIZE); properties.add(HEADER_LINE_COUNT); properties.add(HEADER_MARKER); properties.add(REMOVE_TRAILING_NEWLINES); -this.properties = Collections.unmodifiableList(properties); -final Set relationships = new HashSet<>(); +relationships = new HashSet<>(); relationships.add(REL_ORIGINAL); relationships.add(REL_SPLITS); relationships.add(REL_FAILURE); -this.relationships = Collections.unmodifiableSet(relationships); } -@Override -protected Collection customValidate(ValidationContext validationContext) { -List results = new ArrayList<>(); - -final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0 -&& !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet()); - -results.add(new ValidationResult.Builder() -.subject("Maximum Fragment Size") -.valid(!invalidState) -.explanation("Property must be specified when Line Split Count is 0") -.build() -); -return results; -} - -@Override -public Set getRelationships() { -return relationships; -} - -@Override -protected List getSupportedPropertyDescriptors() { -return properties; -} - -private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream 
out, - final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException { -final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); - -byte[] leadingBytes = leadingNewLineBytes; -int numLines = 0; -long totalBytes = 0L; -for (int i = 0; i < maxNumLines; i++) { -final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes); -final long bytes = eolMarker.getBytesConsumed(); -leadingBytes = eolMarker.getLeadingNewLineBytes(); - -if (includeLineDelimiter && out != null) { -if (leadingBytes != null) { -out.write(leadingBytes); -leadingBytes = null; -} -eolBuffer.drainTo(out); -} -totalBytes += bytes; -if (bytes <= 0) { -return numLines; -} -numLines++; -if (totalBytes >= maxByteCount) { -break; -} -} -return numLines; -} - -private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize, - final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException { -long bytesRead = 0L; -final ByteArrayOutputStream buffer; -if (out != null) { -buffer = new ByteArrayOutputStream(); -} else { -buffer = null; -} -byte[] bytesToWriteFirst = leadingNewLineBytes; - -in.mark(Integer.MAX_VALUE); -while (true) { -final int nextByte = in.read(); - -// if we hit end of stream we're done -if (nextByte == -1) { -if (buffer != null) { -buffer.writeTo(out); -buffer.close(); -} -return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); //
[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83261038 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java --- @@ -150,548 +145,320 @@ .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") .build(); -private List properties; -private Set relationships; +private static final List properties; +private static final Set relationships; -@Override -protected void init(final ProcessorInitializationContext context) { -final List properties = new ArrayList<>(); +static { +properties = new ArrayList<>(); properties.add(LINE_SPLIT_COUNT); properties.add(FRAGMENT_MAX_SIZE); properties.add(HEADER_LINE_COUNT); properties.add(HEADER_MARKER); properties.add(REMOVE_TRAILING_NEWLINES); -this.properties = Collections.unmodifiableList(properties); -final Set relationships = new HashSet<>(); +relationships = new HashSet<>(); relationships.add(REL_ORIGINAL); relationships.add(REL_SPLITS); relationships.add(REL_FAILURE); -this.relationships = Collections.unmodifiableSet(relationships); } -@Override -protected Collection customValidate(ValidationContext validationContext) { -List results = new ArrayList<>(); - -final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0 -&& !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet()); - -results.add(new ValidationResult.Builder() -.subject("Maximum Fragment Size") -.valid(!invalidState) -.explanation("Property must be specified when Line Split Count is 0") -.build() -); -return results; -} - -@Override -public Set getRelationships() { -return relationships; -} - -@Override -protected List getSupportedPropertyDescriptors() { -return properties; -} - -private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream 
out, - final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException { -final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); - -byte[] leadingBytes = leadingNewLineBytes; -int numLines = 0; -long totalBytes = 0L; -for (int i = 0; i < maxNumLines; i++) { -final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes); -final long bytes = eolMarker.getBytesConsumed(); -leadingBytes = eolMarker.getLeadingNewLineBytes(); - -if (includeLineDelimiter && out != null) { -if (leadingBytes != null) { -out.write(leadingBytes); -leadingBytes = null; -} -eolBuffer.drainTo(out); -} -totalBytes += bytes; -if (bytes <= 0) { -return numLines; -} -numLines++; -if (totalBytes >= maxByteCount) { -break; -} -} -return numLines; -} - -private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize, - final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException { -long bytesRead = 0L; -final ByteArrayOutputStream buffer; -if (out != null) { -buffer = new ByteArrayOutputStream(); -} else { -buffer = null; -} -byte[] bytesToWriteFirst = leadingNewLineBytes; - -in.mark(Integer.MAX_VALUE); -while (true) { -final int nextByte = in.read(); - -// if we hit end of stream we're done -if (nextByte == -1) { -if (buffer != null) { -buffer.writeTo(out); -buffer.close(); -} -return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); //
[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83260562 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java --- @@ -150,548 +145,320 @@ .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") .build(); -private List properties; -private Set relationships; +private static final List properties; +private static final Set relationships; -@Override -protected void init(final ProcessorInitializationContext context) { -final List properties = new ArrayList<>(); +static { +properties = new ArrayList<>(); properties.add(LINE_SPLIT_COUNT); properties.add(FRAGMENT_MAX_SIZE); properties.add(HEADER_LINE_COUNT); properties.add(HEADER_MARKER); properties.add(REMOVE_TRAILING_NEWLINES); -this.properties = Collections.unmodifiableList(properties); -final Set relationships = new HashSet<>(); +relationships = new HashSet<>(); relationships.add(REL_ORIGINAL); relationships.add(REL_SPLITS); relationships.add(REL_FAILURE); -this.relationships = Collections.unmodifiableSet(relationships); } -@Override -protected Collection customValidate(ValidationContext validationContext) { -List results = new ArrayList<>(); - -final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0 -&& !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet()); - -results.add(new ValidationResult.Builder() -.subject("Maximum Fragment Size") -.valid(!invalidState) -.explanation("Property must be specified when Line Split Count is 0") -.build() -); -return results; -} - -@Override -public Set getRelationships() { -return relationships; -} - -@Override -protected List getSupportedPropertyDescriptors() { -return properties; -} - -private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream 
out, - final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException { -final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); - -byte[] leadingBytes = leadingNewLineBytes; -int numLines = 0; -long totalBytes = 0L; -for (int i = 0; i < maxNumLines; i++) { -final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes); -final long bytes = eolMarker.getBytesConsumed(); -leadingBytes = eolMarker.getLeadingNewLineBytes(); - -if (includeLineDelimiter && out != null) { -if (leadingBytes != null) { -out.write(leadingBytes); -leadingBytes = null; -} -eolBuffer.drainTo(out); -} -totalBytes += bytes; -if (bytes <= 0) { -return numLines; -} -numLines++; -if (totalBytes >= maxByteCount) { -break; -} -} -return numLines; -} - -private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize, - final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException { -long bytesRead = 0L; -final ByteArrayOutputStream buffer; -if (out != null) { -buffer = new ByteArrayOutputStream(); -} else { -buffer = null; -} -byte[] bytesToWriteFirst = leadingNewLineBytes; - -in.mark(Integer.MAX_VALUE); -while (true) { -final int nextByte = in.read(); - -// if we hit end of stream we're done -if (nextByte == -1) { -if (buffer != null) { -buffer.writeTo(out); -buffer.close(); -} -return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); //
[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83260605 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java --- @@ -150,548 +145,320 @@ .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") .build(); -private List properties; -private Set relationships; +private static final List properties; +private static final Set relationships; -@Override -protected void init(final ProcessorInitializationContext context) { -final List properties = new ArrayList<>(); +static { +properties = new ArrayList<>(); properties.add(LINE_SPLIT_COUNT); properties.add(FRAGMENT_MAX_SIZE); properties.add(HEADER_LINE_COUNT); properties.add(HEADER_MARKER); properties.add(REMOVE_TRAILING_NEWLINES); -this.properties = Collections.unmodifiableList(properties); -final Set relationships = new HashSet<>(); +relationships = new HashSet<>(); relationships.add(REL_ORIGINAL); relationships.add(REL_SPLITS); relationships.add(REL_FAILURE); -this.relationships = Collections.unmodifiableSet(relationships); } -@Override -protected Collection customValidate(ValidationContext validationContext) { -List results = new ArrayList<>(); - -final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0 -&& !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet()); - -results.add(new ValidationResult.Builder() -.subject("Maximum Fragment Size") -.valid(!invalidState) -.explanation("Property must be specified when Line Split Count is 0") -.build() -); -return results; -} - -@Override -public Set getRelationships() { -return relationships; -} - -@Override -protected List getSupportedPropertyDescriptors() { -return properties; -} - -private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream 
out, - final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException { -final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); - -byte[] leadingBytes = leadingNewLineBytes; -int numLines = 0; -long totalBytes = 0L; -for (int i = 0; i < maxNumLines; i++) { -final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes); -final long bytes = eolMarker.getBytesConsumed(); -leadingBytes = eolMarker.getLeadingNewLineBytes(); - -if (includeLineDelimiter && out != null) { -if (leadingBytes != null) { -out.write(leadingBytes); -leadingBytes = null; -} -eolBuffer.drainTo(out); -} -totalBytes += bytes; -if (bytes <= 0) { -return numLines; -} -numLines++; -if (totalBytes >= maxByteCount) { -break; -} -} -return numLines; -} - -private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize, - final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException { -long bytesRead = 0L; -final ByteArrayOutputStream buffer; -if (out != null) { -buffer = new ByteArrayOutputStream(); -} else { -buffer = null; -} -byte[] bytesToWriteFirst = leadingNewLineBytes; - -in.mark(Integer.MAX_VALUE); -while (true) { -final int nextByte = in.read(); - -// if we hit end of stream we're done -if (nextByte == -1) { -if (buffer != null) { -buffer.writeTo(out); -buffer.close(); -} -return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); //
[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83258027 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java --- @@ -150,548 +145,320 @@ .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") .build(); -private List properties; -private Set relationships; +private static final List properties; +private static final Set relationships; -@Override -protected void init(final ProcessorInitializationContext context) { -final List properties = new ArrayList<>(); +static { +properties = new ArrayList<>(); properties.add(LINE_SPLIT_COUNT); properties.add(FRAGMENT_MAX_SIZE); properties.add(HEADER_LINE_COUNT); properties.add(HEADER_MARKER); properties.add(REMOVE_TRAILING_NEWLINES); -this.properties = Collections.unmodifiableList(properties); -final Set relationships = new HashSet<>(); +relationships = new HashSet<>(); relationships.add(REL_ORIGINAL); relationships.add(REL_SPLITS); relationships.add(REL_FAILURE); -this.relationships = Collections.unmodifiableSet(relationships); } -@Override -protected Collection customValidate(ValidationContext validationContext) { -List results = new ArrayList<>(); - -final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0 -&& !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet()); - -results.add(new ValidationResult.Builder() -.subject("Maximum Fragment Size") -.valid(!invalidState) -.explanation("Property must be specified when Line Split Count is 0") -.build() -); -return results; -} - -@Override -public Set getRelationships() { -return relationships; -} - -@Override -protected List getSupportedPropertyDescriptors() { -return properties; -} - -private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream 
out, - final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException { -final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); - -byte[] leadingBytes = leadingNewLineBytes; -int numLines = 0; -long totalBytes = 0L; -for (int i = 0; i < maxNumLines; i++) { -final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes); -final long bytes = eolMarker.getBytesConsumed(); -leadingBytes = eolMarker.getLeadingNewLineBytes(); - -if (includeLineDelimiter && out != null) { -if (leadingBytes != null) { -out.write(leadingBytes); -leadingBytes = null; -} -eolBuffer.drainTo(out); -} -totalBytes += bytes; -if (bytes <= 0) { -return numLines; -} -numLines++; -if (totalBytes >= maxByteCount) { -break; -} -} -return numLines; -} - -private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize, - final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException { -long bytesRead = 0L; -final ByteArrayOutputStream buffer; -if (out != null) { -buffer = new ByteArrayOutputStream(); -} else { -buffer = null; -} -byte[] bytesToWriteFirst = leadingNewLineBytes; - -in.mark(Integer.MAX_VALUE); -while (true) { -final int nextByte = in.read(); - -// if we hit end of stream we're done -if (nextByte == -1) { -if (buffer != null) { -buffer.writeTo(out); -buffer.close(); -} -return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); //
[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83262537 --- Diff: nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java --- @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stream.io.util; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; + +/** + * Implementation of demarcator of text lines in the provided + * {@link InputStream}. It works similar to the {@link BufferedReader} and its + * {@link BufferedReader#readLine()} methods except that it does not create a + * String representing the text line and instead returns the offset info for the + * computed text line. See {@link #nextOffsetInfo()} and + * {@link #nextOffsetInfo(byte[])} for more details. + * + * This class is NOT thread-safe. 
+ * + */ +public class TextLineDemarcator { + +private final static int INIT_BUFFER_SIZE = 8192; + +private final InputStream is; + +private final int initialBufferSize; + +private byte[] buffer; + +private int index; + +private int mark; + +private long offset; + +private int bufferLength; + +/** + * Constructs an instance of demarcator with provided {@link InputStream} + * and default buffer size. + */ +public TextLineDemarcator(InputStream is) { +this(is, INIT_BUFFER_SIZE); +} + +/** + * Constructs an instance of demarcator with provided {@link InputStream} + * and initial buffer size. + */ +public TextLineDemarcator(InputStream is, int initialBufferSize) { +if (is == null) { +throw new IllegalArgumentException("'is' must not be null."); +} +if (initialBufferSize < 1) { +throw new IllegalArgumentException("'initialBufferSize' must be > 0."); +} +this.is = is; +this.initialBufferSize = initialBufferSize; +this.buffer = new byte[initialBufferSize]; +} + +/** + * Will compute the next offset info for a + * text line (line terminated by either '\r', '\n' or '\r\n'). + * + * The offset info computed and returned as long[] consisting of + * 4 elements {startOffset, length, crlfLength, startsWithMatch}. + * + *startOffset - the offset in the overall stream which represents the beginning of the text line + *length - length of the text line including CRLF characters + *crlfLength - the length of the CRLF. Could be either 1 (if line ends with '\n' or '\r') + * or 2 (if line ends with '\r\n'). + *startsWithMatch - value is always 1. See {@link #nextOffsetInfo(byte[])} for more info. + * + * + * @return offset info as long[] + */ +public long[] nextOffsetInfo() { --- End diff -- Why are we returning a long[] here instead of a POJO? This makes the code more difficult to follow. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83260752 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java --- @@ -150,548 +145,320 @@ .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") .build(); -private List properties; -private Set relationships; +private static final List properties; +private static final Set relationships; -@Override -protected void init(final ProcessorInitializationContext context) { -final List properties = new ArrayList<>(); +static { +properties = new ArrayList<>(); properties.add(LINE_SPLIT_COUNT); properties.add(FRAGMENT_MAX_SIZE); properties.add(HEADER_LINE_COUNT); properties.add(HEADER_MARKER); properties.add(REMOVE_TRAILING_NEWLINES); -this.properties = Collections.unmodifiableList(properties); -final Set relationships = new HashSet<>(); +relationships = new HashSet<>(); relationships.add(REL_ORIGINAL); relationships.add(REL_SPLITS); relationships.add(REL_FAILURE); -this.relationships = Collections.unmodifiableSet(relationships); } -@Override -protected Collection customValidate(ValidationContext validationContext) { -List results = new ArrayList<>(); - -final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0 -&& !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet()); - -results.add(new ValidationResult.Builder() -.subject("Maximum Fragment Size") -.valid(!invalidState) -.explanation("Property must be specified when Line Split Count is 0") -.build() -); -return results; -} - -@Override -public Set getRelationships() { -return relationships; -} - -@Override -protected List getSupportedPropertyDescriptors() { -return properties; -} - -private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream 
out, - final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException { -final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); - -byte[] leadingBytes = leadingNewLineBytes; -int numLines = 0; -long totalBytes = 0L; -for (int i = 0; i < maxNumLines; i++) { -final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes); -final long bytes = eolMarker.getBytesConsumed(); -leadingBytes = eolMarker.getLeadingNewLineBytes(); - -if (includeLineDelimiter && out != null) { -if (leadingBytes != null) { -out.write(leadingBytes); -leadingBytes = null; -} -eolBuffer.drainTo(out); -} -totalBytes += bytes; -if (bytes <= 0) { -return numLines; -} -numLines++; -if (totalBytes >= maxByteCount) { -break; -} -} -return numLines; -} - -private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize, - final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException { -long bytesRead = 0L; -final ByteArrayOutputStream buffer; -if (out != null) { -buffer = new ByteArrayOutputStream(); -} else { -buffer = null; -} -byte[] bytesToWriteFirst = leadingNewLineBytes; - -in.mark(Integer.MAX_VALUE); -while (true) { -final int nextByte = in.read(); - -// if we hit end of stream we're done -if (nextByte == -1) { -if (buffer != null) { -buffer.writeTo(out); -buffer.close(); -} -return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); //
[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83260548 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java --- @@ -150,548 +145,320 @@ .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") .build(); -private List properties; -private Set relationships; +private static final List properties; +private static final Set relationships; -@Override -protected void init(final ProcessorInitializationContext context) { -final List properties = new ArrayList<>(); +static { +properties = new ArrayList<>(); properties.add(LINE_SPLIT_COUNT); properties.add(FRAGMENT_MAX_SIZE); properties.add(HEADER_LINE_COUNT); properties.add(HEADER_MARKER); properties.add(REMOVE_TRAILING_NEWLINES); -this.properties = Collections.unmodifiableList(properties); -final Set relationships = new HashSet<>(); +relationships = new HashSet<>(); relationships.add(REL_ORIGINAL); relationships.add(REL_SPLITS); relationships.add(REL_FAILURE); -this.relationships = Collections.unmodifiableSet(relationships); } -@Override -protected Collection customValidate(ValidationContext validationContext) { -List results = new ArrayList<>(); - -final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0 -&& !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet()); - -results.add(new ValidationResult.Builder() -.subject("Maximum Fragment Size") -.valid(!invalidState) -.explanation("Property must be specified when Line Split Count is 0") -.build() -); -return results; -} - -@Override -public Set getRelationships() { -return relationships; -} - -@Override -protected List getSupportedPropertyDescriptors() { -return properties; -} - -private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream 
out, - final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException { -final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); - -byte[] leadingBytes = leadingNewLineBytes; -int numLines = 0; -long totalBytes = 0L; -for (int i = 0; i < maxNumLines; i++) { -final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes); -final long bytes = eolMarker.getBytesConsumed(); -leadingBytes = eolMarker.getLeadingNewLineBytes(); - -if (includeLineDelimiter && out != null) { -if (leadingBytes != null) { -out.write(leadingBytes); -leadingBytes = null; -} -eolBuffer.drainTo(out); -} -totalBytes += bytes; -if (bytes <= 0) { -return numLines; -} -numLines++; -if (totalBytes >= maxByteCount) { -break; -} -} -return numLines; -} - -private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize, - final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException { -long bytesRead = 0L; -final ByteArrayOutputStream buffer; -if (out != null) { -buffer = new ByteArrayOutputStream(); -} else { -buffer = null; -} -byte[] bytesToWriteFirst = leadingNewLineBytes; - -in.mark(Integer.MAX_VALUE); -while (true) { -final int nextByte = in.read(); - -// if we hit end of stream we're done -if (nextByte == -1) { -if (buffer != null) { -buffer.writeTo(out); -buffer.close(); -} -return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); //
[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83263078 --- Diff: nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java --- @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stream.io.util; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; + +/** + * Implementation of demarcator of text lines in the provided + * {@link InputStream}. It works similar to the {@link BufferedReader} and its + * {@link BufferedReader#readLine()} methods except that it does not create a + * String representing the text line and instead returns the offset info for the + * computed text line. See {@link #nextOffsetInfo()} and + * {@link #nextOffsetInfo(byte[])} for more details. + * + * This class is NOT thread-safe. 
+ * + */ +public class TextLineDemarcator { + +private final static int INIT_BUFFER_SIZE = 8192; + +private final InputStream is; + +private final int initialBufferSize; + +private byte[] buffer; + +private int index; + +private int mark; + +private long offset; + +private int bufferLength; + +/** + * Constructs an instance of demarcator with provided {@link InputStream} + * and default buffer size. + */ +public TextLineDemarcator(InputStream is) { +this(is, INIT_BUFFER_SIZE); +} + +/** + * Constructs an instance of demarcator with provided {@link InputStream} + * and initial buffer size. + */ +public TextLineDemarcator(InputStream is, int initialBufferSize) { +if (is == null) { +throw new IllegalArgumentException("'is' must not be null."); +} +if (initialBufferSize < 1) { +throw new IllegalArgumentException("'initialBufferSize' must be > 0."); +} +this.is = is; +this.initialBufferSize = initialBufferSize; +this.buffer = new byte[initialBufferSize]; +} + +/** + * Will compute the next offset info for a + * text line (line terminated by either '\r', '\n' or '\r\n'). + * + * The offset info computed and returned as long[] consisting of + * 4 elements {startOffset, length, crlfLength, startsWithMatch}. + * + *startOffset - the offset in the overall stream which represents the beginning of the text line + *length - length of the text line including CRLF characters + *crlfLength - the length of the CRLF. Could be either 1 (if line ends with '\n' or '\r') + * or 2 (if line ends with '\r\n'). + *startsWithMatch - value is always 1. See {@link #nextOffsetInfo(byte[])} for more info. + * + * + * @return offset info as long[] + */ +public long[] nextOffsetInfo() { +return this.nextOffsetInfo(null); +} + +/** + * Will compute the next offset info for a + * text line (line terminated by either '\r', '\n' or '\r\n'). + * + * The offset info computed and returned as long[] consisting of + * 4 elements {startOffset, length, crlfLength, startsWithMatch}. 
+ * + *startOffset - the offset in the overall stream which represents the beginning of the text line + *length - length of the text line including CRLF characters + *crlfLength - the length of the CRLF. Could be either 1 (if line ends with '\n' or '\r') + * or 2 (if line ends with '\r\n'). + *startsWithMatch - value is always 1 unless 'startsWith' is provided. If 'startsWith' is provided it will + * be compared to the
[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83256378 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java --- @@ -150,548 +145,320 @@ .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") .build(); -private List properties; -private Set relationships; +private static final List properties; +private static final Set relationships; -@Override -protected void init(final ProcessorInitializationContext context) { -final List properties = new ArrayList<>(); +static { +properties = new ArrayList<>(); properties.add(LINE_SPLIT_COUNT); properties.add(FRAGMENT_MAX_SIZE); properties.add(HEADER_LINE_COUNT); properties.add(HEADER_MARKER); properties.add(REMOVE_TRAILING_NEWLINES); -this.properties = Collections.unmodifiableList(properties); -final Set relationships = new HashSet<>(); +relationships = new HashSet<>(); relationships.add(REL_ORIGINAL); relationships.add(REL_SPLITS); relationships.add(REL_FAILURE); -this.relationships = Collections.unmodifiableSet(relationships); } -@Override -protected Collection customValidate(ValidationContext validationContext) { -List results = new ArrayList<>(); - -final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0 -&& !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet()); - -results.add(new ValidationResult.Builder() -.subject("Maximum Fragment Size") -.valid(!invalidState) -.explanation("Property must be specified when Line Split Count is 0") -.build() -); -return results; -} - -@Override -public Set getRelationships() { -return relationships; -} - -@Override -protected List getSupportedPropertyDescriptors() { -return properties; -} - -private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream 
out, - final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException { -final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); - -byte[] leadingBytes = leadingNewLineBytes; -int numLines = 0; -long totalBytes = 0L; -for (int i = 0; i < maxNumLines; i++) { -final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes); -final long bytes = eolMarker.getBytesConsumed(); -leadingBytes = eolMarker.getLeadingNewLineBytes(); - -if (includeLineDelimiter && out != null) { -if (leadingBytes != null) { -out.write(leadingBytes); -leadingBytes = null; -} -eolBuffer.drainTo(out); -} -totalBytes += bytes; -if (bytes <= 0) { -return numLines; -} -numLines++; -if (totalBytes >= maxByteCount) { -break; -} -} -return numLines; -} - -private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize, - final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException { -long bytesRead = 0L; -final ByteArrayOutputStream buffer; -if (out != null) { -buffer = new ByteArrayOutputStream(); -} else { -buffer = null; -} -byte[] bytesToWriteFirst = leadingNewLineBytes; - -in.mark(Integer.MAX_VALUE); -while (true) { -final int nextByte = in.read(); - -// if we hit end of stream we're done -if (nextByte == -1) { -if (buffer != null) { -buffer.writeTo(out); -buffer.close(); -} -return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); //
[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83260348 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java --- @@ -150,548 +145,320 @@ .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") .build(); -private List properties; -private Set relationships; +private static final List properties; +private static final Set relationships; -@Override -protected void init(final ProcessorInitializationContext context) { -final List properties = new ArrayList<>(); +static { +properties = new ArrayList<>(); properties.add(LINE_SPLIT_COUNT); properties.add(FRAGMENT_MAX_SIZE); properties.add(HEADER_LINE_COUNT); properties.add(HEADER_MARKER); properties.add(REMOVE_TRAILING_NEWLINES); -this.properties = Collections.unmodifiableList(properties); -final Set relationships = new HashSet<>(); +relationships = new HashSet<>(); relationships.add(REL_ORIGINAL); relationships.add(REL_SPLITS); relationships.add(REL_FAILURE); -this.relationships = Collections.unmodifiableSet(relationships); } -@Override -protected Collection customValidate(ValidationContext validationContext) { -List results = new ArrayList<>(); - -final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0 -&& !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet()); - -results.add(new ValidationResult.Builder() -.subject("Maximum Fragment Size") -.valid(!invalidState) -.explanation("Property must be specified when Line Split Count is 0") -.build() -); -return results; -} - -@Override -public Set getRelationships() { -return relationships; -} - -@Override -protected List getSupportedPropertyDescriptors() { -return properties; -} - -private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream 
out, - final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException { -final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); - -byte[] leadingBytes = leadingNewLineBytes; -int numLines = 0; -long totalBytes = 0L; -for (int i = 0; i < maxNumLines; i++) { -final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes); -final long bytes = eolMarker.getBytesConsumed(); -leadingBytes = eolMarker.getLeadingNewLineBytes(); - -if (includeLineDelimiter && out != null) { -if (leadingBytes != null) { -out.write(leadingBytes); -leadingBytes = null; -} -eolBuffer.drainTo(out); -} -totalBytes += bytes; -if (bytes <= 0) { -return numLines; -} -numLines++; -if (totalBytes >= maxByteCount) { -break; -} -} -return numLines; -} - -private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize, - final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException { -long bytesRead = 0L; -final ByteArrayOutputStream buffer; -if (out != null) { -buffer = new ByteArrayOutputStream(); -} else { -buffer = null; -} -byte[] bytesToWriteFirst = leadingNewLineBytes; - -in.mark(Integer.MAX_VALUE); -while (true) { -final int nextByte = in.read(); - -// if we hit end of stream we're done -if (nextByte == -1) { -if (buffer != null) { -buffer.writeTo(out); -buffer.close(); -} -return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); //
[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83257640 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java --- @@ -150,548 +145,320 @@ .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") .build(); -private List properties; -private Set relationships; +private static final List properties; +private static final Set relationships; -@Override -protected void init(final ProcessorInitializationContext context) { -final List properties = new ArrayList<>(); +static { +properties = new ArrayList<>(); properties.add(LINE_SPLIT_COUNT); properties.add(FRAGMENT_MAX_SIZE); properties.add(HEADER_LINE_COUNT); properties.add(HEADER_MARKER); properties.add(REMOVE_TRAILING_NEWLINES); -this.properties = Collections.unmodifiableList(properties); -final Set relationships = new HashSet<>(); +relationships = new HashSet<>(); relationships.add(REL_ORIGINAL); relationships.add(REL_SPLITS); relationships.add(REL_FAILURE); -this.relationships = Collections.unmodifiableSet(relationships); } -@Override -protected Collection customValidate(ValidationContext validationContext) { -List results = new ArrayList<>(); - -final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0 -&& !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet()); - -results.add(new ValidationResult.Builder() -.subject("Maximum Fragment Size") -.valid(!invalidState) -.explanation("Property must be specified when Line Split Count is 0") -.build() -); -return results; -} - -@Override -public Set getRelationships() { -return relationships; -} - -@Override -protected List getSupportedPropertyDescriptors() { -return properties; -} - -private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream 
out, - final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException { -final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); - -byte[] leadingBytes = leadingNewLineBytes; -int numLines = 0; -long totalBytes = 0L; -for (int i = 0; i < maxNumLines; i++) { -final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes); -final long bytes = eolMarker.getBytesConsumed(); -leadingBytes = eolMarker.getLeadingNewLineBytes(); - -if (includeLineDelimiter && out != null) { -if (leadingBytes != null) { -out.write(leadingBytes); -leadingBytes = null; -} -eolBuffer.drainTo(out); -} -totalBytes += bytes; -if (bytes <= 0) { -return numLines; -} -numLines++; -if (totalBytes >= maxByteCount) { -break; -} -} -return numLines; -} - -private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize, - final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException { -long bytesRead = 0L; -final ByteArrayOutputStream buffer; -if (out != null) { -buffer = new ByteArrayOutputStream(); -} else { -buffer = null; -} -byte[] bytesToWriteFirst = leadingNewLineBytes; - -in.mark(Integer.MAX_VALUE); -while (true) { -final int nextByte = in.read(); - -// if we hit end of stream we're done -if (nextByte == -1) { -if (buffer != null) { -buffer.writeTo(out); -buffer.close(); -} -return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); //
[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83260507 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java --- @@ -150,548 +145,320 @@ .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") .build(); -private List properties; -private Set relationships; +private static final List properties; +private static final Set relationships; -@Override -protected void init(final ProcessorInitializationContext context) { -final List properties = new ArrayList<>(); +static { +properties = new ArrayList<>(); properties.add(LINE_SPLIT_COUNT); properties.add(FRAGMENT_MAX_SIZE); properties.add(HEADER_LINE_COUNT); properties.add(HEADER_MARKER); properties.add(REMOVE_TRAILING_NEWLINES); -this.properties = Collections.unmodifiableList(properties); -final Set relationships = new HashSet<>(); +relationships = new HashSet<>(); relationships.add(REL_ORIGINAL); relationships.add(REL_SPLITS); relationships.add(REL_FAILURE); -this.relationships = Collections.unmodifiableSet(relationships); } -@Override -protected Collection customValidate(ValidationContext validationContext) { -List results = new ArrayList<>(); - -final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0 -&& !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet()); - -results.add(new ValidationResult.Builder() -.subject("Maximum Fragment Size") -.valid(!invalidState) -.explanation("Property must be specified when Line Split Count is 0") -.build() -); -return results; -} - -@Override -public Set getRelationships() { -return relationships; -} - -@Override -protected List getSupportedPropertyDescriptors() { -return properties; -} - -private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream 
out, - final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException { -final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); - -byte[] leadingBytes = leadingNewLineBytes; -int numLines = 0; -long totalBytes = 0L; -for (int i = 0; i < maxNumLines; i++) { -final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes); -final long bytes = eolMarker.getBytesConsumed(); -leadingBytes = eolMarker.getLeadingNewLineBytes(); - -if (includeLineDelimiter && out != null) { -if (leadingBytes != null) { -out.write(leadingBytes); -leadingBytes = null; -} -eolBuffer.drainTo(out); -} -totalBytes += bytes; -if (bytes <= 0) { -return numLines; -} -numLines++; -if (totalBytes >= maxByteCount) { -break; -} -} -return numLines; -} - -private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize, - final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException { -long bytesRead = 0L; -final ByteArrayOutputStream buffer; -if (out != null) { -buffer = new ByteArrayOutputStream(); -} else { -buffer = null; -} -byte[] bytesToWriteFirst = leadingNewLineBytes; - -in.mark(Integer.MAX_VALUE); -while (true) { -final int nextByte = in.read(); - -// if we hit end of stream we're done -if (nextByte == -1) { -if (buffer != null) { -buffer.writeTo(out); -buffer.close(); -} -return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); //