This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0e9830d227b433f8975ce93a4882e7fa04725f02 Author: Guillaume Nodet <gno...@gmail.com> AuthorDate: Sat Jun 23 22:58:15 2018 +0200 [CAMEL-12688] Improve Scanner performances --- .../apache/camel/builder/ExpressionBuilder.java | 11 +- .../camel/component/dataset/FileDataSet.java | 8 +- .../org/apache/camel/impl/FileStateRepository.java | 15 +- .../java/org/apache/camel/processor/Splitter.java | 15 +- .../idempotent/FileIdempotentRepository.java | 42 +-- .../camel/support/TokenPairExpressionIterator.java | 4 +- .../support/TokenXMLPairExpressionIterator.java | 4 +- .../java/org/apache/camel/util/GroupIterator.java | 13 +- .../org/apache/camel/util/GroupTokenIterator.java | 14 +- .../main/java/org/apache/camel/util/IOHelper.java | 23 ++ .../java/org/apache/camel/util/ObjectHelper.java | 59 ++-- .../main/java/org/apache/camel/util/Scanner.java | 303 +++++++++++++++++++++ .../java/org/apache/camel/util/SkipIterator.java | 14 +- .../apache/camel/util/GroupTokenIteratorTest.java | 11 +- 14 files changed, 390 insertions(+), 146 deletions(-) diff --git a/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java b/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java index 3641d0f..11f640b 100644 --- a/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java +++ b/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java @@ -27,7 +27,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; -import java.util.Scanner; +import org.apache.camel.util.Scanner; import java.util.Set; import java.util.TimeZone; import java.util.concurrent.atomic.AtomicReference; @@ -1558,8 +1558,7 @@ public final class ExpressionBuilder { public Object evaluate(Exchange exchange) { String text = simpleExpression(token).evaluate(exchange, String.class); Object value = expression.evaluate(exchange, Object.class); - Scanner scanner = ObjectHelper.getScanner(exchange, value); - scanner.useDelimiter(text); + Scanner scanner = ObjectHelper.getScanner(exchange, value, text); return scanner; } @@ -1625,18 +1624,16 @@ public final class ExpressionBuilder { */ public static Expression regexTokenizeExpression(final Expression expression, final String regexTokenizer) { - final Pattern pattern = Pattern.compile(regexTokenizer); return new ExpressionAdapter() { public Object evaluate(Exchange exchange) { Object value = expression.evaluate(exchange, Object.class); - Scanner scanner = ObjectHelper.getScanner(exchange, value); - scanner.useDelimiter(pattern); + Scanner scanner = ObjectHelper.getScanner(exchange, value, regexTokenizer); return scanner; } @Override public String toString() { - return "regexTokenize(" + expression + ", " + pattern.pattern() + ")"; + return "regexTokenize(" + expression + ", " + regexTokenizer + ")"; } }; } diff --git a/camel-core/src/main/java/org/apache/camel/component/dataset/FileDataSet.java b/camel-core/src/main/java/org/apache/camel/component/dataset/FileDataSet.java index 24f263d..1551035 100644 --- a/camel-core/src/main/java/org/apache/camel/component/dataset/FileDataSet.java +++ b/camel-core/src/main/java/org/apache/camel/component/dataset/FileDataSet.java @@ -22,7 +22,9 @@ import java.io.FileReader; import java.io.IOException; import java.util.LinkedList; import java.util.List; -import java.util.Scanner; + +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.Scanner; /** * A DataSet that reads payloads from a file that are used to create each message exchange @@ -80,9 +82,7 @@ public class FileDataSet extends ListDataSet { private void readSourceFile() throws IOException { List<Object> bodies = new LinkedList<>(); - try (BufferedReader br = new BufferedReader(new FileReader(sourceFile))) { - Scanner scanner = new Scanner(br); - scanner.useDelimiter(delimiter); + try (Scanner scanner = new Scanner(sourceFile, null, delimiter)) { while (scanner.hasNext()) { String nextPayload = scanner.next(); if ((nextPayload != null) && (nextPayload.length() > 0)) { diff --git a/camel-core/src/main/java/org/apache/camel/impl/FileStateRepository.java b/camel-core/src/main/java/org/apache/camel/impl/FileStateRepository.java index efbd068..bac7f95 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/FileStateRepository.java +++ b/camel-core/src/main/java/org/apache/camel/impl/FileStateRepository.java @@ -21,7 +21,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.Scanner; +import org.apache.camel.util.Scanner; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.camel.api.management.ManagedAttribute; @@ -217,12 +217,9 @@ public class FileStateRepository extends ServiceSupport implements StateReposito LOG.trace("Loading to 1st level cache from state filestore: {}", fileStore); cache.clear(); - Scanner scanner = null; - try { - scanner = new Scanner(fileStore); - scanner.useDelimiter(STORE_DELIMITER); - while (scanner.hasNextLine()) { - String line = scanner.nextLine(); + try (Scanner scanner = new Scanner(fileStore, null, STORE_DELIMITER)) { + while (scanner.hasNext()) { + String line = scanner.next(); int separatorIndex = line.indexOf(KEY_VALUE_DELIMITER); String key = line.substring(0, separatorIndex); String value = line.substring(separatorIndex + KEY_VALUE_DELIMITER.length()); @@ -230,10 +227,6 @@ public class FileStateRepository extends ServiceSupport implements StateReposito } } catch (IOException e) { throw ObjectHelper.wrapRuntimeCamelException(e); - } finally { - if (scanner != null) { - scanner.close(); - } } LOG.debug("Loaded {} to the 1st level cache from state filestore: {}", cache.size(), fileStore); diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java index 2a50bc6..6eb6833 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java @@ -23,7 +23,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.Scanner; +import org.apache.camel.util.Scanner; import java.util.concurrent.ExecutorService; import org.apache.camel.AsyncCallback; @@ -230,18 +230,7 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac @Override public void close() throws IOException { - if (value instanceof Scanner) { - // special for Scanner which implement the Closeable since JDK7 - Scanner scanner = (Scanner) value; - scanner.close(); - IOException ioException = scanner.ioException(); - if (ioException != null) { - throw ioException; - } - } else if (value instanceof Closeable) { - // we should throw out the exception here - IOHelper.closeWithException((Closeable) value); - } + IOHelper.closeIterator(value); } } diff --git a/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java b/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java index e441977..eef5c6d 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java +++ b/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java @@ -22,7 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Scanner; +import org.apache.camel.util.Scanner; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.camel.api.management.ManagedAttribute; @@ -289,22 +289,15 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote return false; } - Scanner scanner = null; - try { - scanner = new Scanner(fileStore); - scanner.useDelimiter(STORE_DELIMITER); - while (scanner.hasNextLine()) { - String line = scanner.nextLine(); + try (Scanner scanner = new Scanner(fileStore, null, STORE_DELIMITER)) { + while (scanner.hasNext()) { + String line = scanner.next(); if (line.equals(key)) { return true; } } } catch (IOException e) { throw ObjectHelper.wrapRuntimeCamelException(e); - } finally { - if (scanner != null) { - scanner.close(); - } } return false; } @@ -352,10 +345,9 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote boolean found = false; Scanner scanner = null; try { - scanner = new Scanner(fileStore); - scanner.useDelimiter(STORE_DELIMITER); - while (scanner.hasNextLine()) { - String line = scanner.nextLine(); + scanner = new Scanner(fileStore, null, STORE_DELIMITER); + while (scanner.hasNext()) { + String line = scanner.next(); if (key.equals(line)) { found = true; } else { @@ -416,10 +408,9 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote Scanner scanner = null; int count = 0; try { - scanner = new Scanner(fileStore); - scanner.useDelimiter(STORE_DELIMITER); - while (scanner.hasNextLine()) { - String line = scanner.nextLine(); + scanner = new Scanner(fileStore, null, STORE_DELIMITER); + while (scanner.hasNext()) { + String line = scanner.next(); count++; if (count > dropOldestFileStore) { lines.add(line); @@ -485,20 +476,13 @@ public class FileIdempotentRepository extends ServiceSupport implements Idempote LOG.trace("Loading to 1st level cache from idempotent filestore: {}", fileStore); cache.clear(); - Scanner scanner = null; - try { - scanner = new Scanner(fileStore); - scanner.useDelimiter(STORE_DELIMITER); - while (scanner.hasNextLine()) { - String line = scanner.nextLine(); + try (Scanner scanner = new Scanner(fileStore, null, STORE_DELIMITER)) { + while (scanner.hasNext()) { + String line = scanner.next(); cache.put(line, line); } } catch (IOException e) { throw ObjectHelper.wrapRuntimeCamelException(e); - } finally { - if (scanner != null) { - scanner.close(); - } } LOG.debug("Loaded {} to the 1st level cache from idempotent filestore: {}", cache.size(), fileStore); diff --git a/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java b/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java index 39c9a10..0ccfdd3 100644 --- a/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java +++ b/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java @@ -20,7 +20,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.util.Iterator; -import java.util.Scanner; +import org.apache.camel.util.Scanner; import org.apache.camel.Exchange; import org.apache.camel.InvalidPayloadException; @@ -160,7 +160,7 @@ public class TokenPairExpressionIterator extends ExpressionAdapter { void init() { // use end token as delimiter - this.scanner = new Scanner(in, charset).useDelimiter(scanEndToken); + this.scanner = new Scanner(in, charset, scanEndToken); // this iterator will do look ahead as we may have data // after the last end token, which the scanner would find // so we need to be one step ahead of the scanner diff --git a/camel-core/src/main/java/org/apache/camel/support/TokenXMLPairExpressionIterator.java b/camel-core/src/main/java/org/apache/camel/support/TokenXMLPairExpressionIterator.java index 6e838fb..39664c9 100644 --- a/camel-core/src/main/java/org/apache/camel/support/TokenXMLPairExpressionIterator.java +++ b/camel-core/src/main/java/org/apache/camel/support/TokenXMLPairExpressionIterator.java @@ -20,7 +20,7 @@ import java.io.InputStream; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; -import java.util.Scanner; +import org.apache.camel.util.Scanner; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -122,7 +122,7 @@ public class TokenXMLPairExpressionIterator extends TokenPairExpressionIterator @Override void init() { // use scan end token as delimiter which supports attributes/namespaces - this.scanner = new Scanner(in, charset).useDelimiter(scanEndToken); + this.scanner = new Scanner(in, charset, scanEndToken); // this iterator will do look ahead as we may have data // after the last end token, which the scanner would find // so we need to be one step ahead of the scanner diff --git a/camel-core/src/main/java/org/apache/camel/util/GroupIterator.java b/camel-core/src/main/java/org/apache/camel/util/GroupIterator.java index e5c6ab2..b56c421 100644 --- a/camel-core/src/main/java/org/apache/camel/util/GroupIterator.java +++ b/camel-core/src/main/java/org/apache/camel/util/GroupIterator.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Scanner; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; @@ -77,17 +76,7 @@ public final class GroupIterator implements Iterator<Object>, Closeable { @Override public void close() throws IOException { try { - if (it instanceof Scanner) { - // special for Scanner which implement the Closeable since JDK7 - Scanner scanner = (Scanner) it; - scanner.close(); - IOException ioException = scanner.ioException(); - if (ioException != null) { - throw ioException; - } - } else if (it instanceof Closeable) { - IOHelper.closeWithException((Closeable) it); - } + IOHelper.closeIterator(it); } finally { // we are now closed closed = true; diff --git a/camel-core/src/main/java/org/apache/camel/util/GroupTokenIterator.java b/camel-core/src/main/java/org/apache/camel/util/GroupTokenIterator.java index 2f05116..0fa0a5c 100644 --- a/camel-core/src/main/java/org/apache/camel/util/GroupTokenIterator.java +++ b/camel-core/src/main/java/org/apache/camel/util/GroupTokenIterator.java @@ -21,7 +21,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.util.Iterator; -import java.util.Scanner; +import org.apache.camel.util.Scanner; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.camel.CamelContext; @@ -102,17 +102,7 @@ public final class GroupTokenIterator implements Iterator<Object>, Closeable { @Override public void close() throws IOException { try { - if (it instanceof Scanner) { - // special for Scanner which implement the Closeable since JDK7 - Scanner scanner = (Scanner) it; - scanner.close(); - IOException ioException = scanner.ioException(); - if (ioException != null) { - throw ioException; - } - } else if (it instanceof Closeable) { - IOHelper.closeWithException((Closeable) it); - } + IOHelper.closeIterator(it); } finally { // close the buffer as well bos.close(); diff --git a/camel-core/src/main/java/org/apache/camel/util/IOHelper.java b/camel-core/src/main/java/org/apache/camel/util/IOHelper.java index 602c07b..e74cf44 100644 --- a/camel-core/src/main/java/org/apache/camel/util/IOHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/IOHelper.java @@ -417,6 +417,29 @@ public final class IOHelper { } } + public static void closeIterator(Object it) throws IOException { + if (it instanceof java.util.Scanner) { + // special for Scanner which implement the Closeable since JDK7 + java.util.Scanner scanner = (java.util.Scanner) it; + scanner.close(); + IOException ioException = scanner.ioException(); + if (ioException != null) { + throw ioException; + } + } else if (it instanceof Scanner) { + // special for Scanner which implement the Closeable since JDK7 + Scanner scanner = (Scanner) it; + scanner.close(); + IOException ioException = scanner.ioException(); + if (ioException != null) { + throw ioException; + } + + } else if (it instanceof Closeable) { + IOHelper.closeWithException((Closeable) it); + } + } + public static void validateCharset(String charset) throws UnsupportedCharsetException { if (charset != null) { if (Charset.isSupported(charset)) { diff --git a/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java b/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java index ab98900..949a253 100644 --- a/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java @@ -18,7 +18,6 @@ package org.apache.camel.util; import java.io.Closeable; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.lang.annotation.Annotation; @@ -44,7 +43,6 @@ import java.util.NoSuchElementException; import java.util.Objects; import java.util.Optional; import java.util.Properties; -import java.util.Scanner; import java.util.concurrent.Callable; import java.util.function.Consumer; import java.util.function.Function; @@ -932,9 +930,6 @@ public final class ObjectHelper { // this code is optimized to only use a Scanner if needed, eg there is a delimiter if (delimiter != null && (pattern || s.contains(delimiter))) { - // use a scanner if it contains the delimiter or is a pattern - final Scanner scanner = new Scanner((String)value); - if (DEFAULT_DELIMITER.equals(delimiter)) { // we use the default delimiter which is a comma, then cater for bean expressions with OGNL // which may have balanced parentheses pairs as well. @@ -948,7 +943,8 @@ public final class ObjectHelper { // http://stackoverflow.com/questions/1516090/splitting-a-title-into-separate-parts delimiter = ",(?!(?:[^\\(,]|[^\\)],[^\\)])+\\))"; } - scanner.useDelimiter(delimiter); + // use a scanner if it contains the delimiter or is a pattern + final Scanner scanner = new Scanner((String) value, delimiter); return new Iterable<Object>() { @Override @@ -1938,9 +1934,10 @@ public final class ObjectHelper { * * @param exchange the current exchange * @param value the value, typically the message IN body + * @param delimiter the delimiter pattern to use * @return the scanner, is newer <tt>null</tt> */ - public static Scanner getScanner(Exchange exchange, Object value) { + public static Scanner getScanner(Exchange exchange, Object value, String delimiter) { if (value instanceof WrappedFile) { WrappedFile<?> gf = (WrappedFile<?>) value; Object body = gf.getBody(); @@ -1949,41 +1946,33 @@ public final class ObjectHelper { value = body; } else { // generic file is just a wrapper for the real file so call again with the real file - return getScanner(exchange, gf.getFile()); + return getScanner(exchange, gf.getFile(), delimiter); } } - String charset = exchange.getProperty(Exchange.CHARSET_NAME, String.class); - - Scanner scanner = null; + Scanner scanner; if (value instanceof Readable) { - scanner = new Scanner((Readable)value); - } else if (value instanceof InputStream) { - scanner = charset == null ? new Scanner((InputStream)value) : new Scanner((InputStream)value, charset); - } else if (value instanceof File) { - try { - scanner = charset == null ? new Scanner((File)value) : new Scanner((File)value, charset); - } catch (FileNotFoundException e) { - throw new RuntimeCamelException(e); - } + scanner = new Scanner((Readable) value, delimiter); } else if (value instanceof String) { - scanner = new Scanner((String)value); - } else if (value instanceof ReadableByteChannel) { - scanner = charset == null ? new Scanner((ReadableByteChannel)value) : new Scanner((ReadableByteChannel)value, charset); - } - - if (scanner == null) { - // value is not a suitable type, try to convert value to a string - String text = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, value); - if (text != null) { - scanner = new Scanner(text); + scanner = new Scanner((String) value, delimiter); + } else { + String charset = exchange.getProperty(Exchange.CHARSET_NAME, String.class); + if (value instanceof File) { + try { + scanner = new Scanner((File) value, charset, delimiter); + } catch (IOException e) { + throw new RuntimeCamelException(e); + } + } else if (value instanceof InputStream) { + scanner = new Scanner((InputStream) value, charset, delimiter); + } else if (value instanceof ReadableByteChannel) { + scanner = new Scanner((ReadableByteChannel) value, charset, delimiter); + } else { + // value is not a suitable type, try to convert value to a string + String text = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, value); + scanner = new Scanner(text, delimiter); } } - - if (scanner == null) { - scanner = new Scanner(""); - } - return scanner; } diff --git a/camel-core/src/main/java/org/apache/camel/util/Scanner.java b/camel-core/src/main/java/org/apache/camel/util/Scanner.java new file mode 100644 index 0000000..a042c57 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/util/Scanner.java @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util; + +import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.StringReader; +import java.nio.CharBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.IllegalCharsetNameException; +import java.nio.charset.UnsupportedCharsetException; +import java.util.InputMismatchException; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public final class Scanner implements Iterator<String>, Closeable { + + private static final Map<String, Pattern> CACHE = LRUCacheFactory.newLRUCache(7); + + private static final String WHITESPACE_PATTERN = "\\s+"; + + private static final String FIND_ANY_PATTERN = "(?s).*"; + + private static final int BUFFER_SIZE = 1024; + + private Readable source; + private Pattern delimPattern; + private Matcher matcher; + private CharBuffer buf; + private int position; + private boolean inputExhausted = false; + private boolean needInput = false; + private boolean skipped = false; + private int savedPosition = -1; + private boolean closed = false; + private IOException lastIOException; + + public Scanner(InputStream source, String charsetName, String pattern) { + this(new InputStreamReader(Objects.requireNonNull(source, "source"), toDecoder(charsetName)), cachePattern(pattern)); + } + + public Scanner(File source, String charsetName, String pattern) throws FileNotFoundException { + this(new FileInputStream(Objects.requireNonNull(source, "source")).getChannel(), charsetName, pattern); + } + + public Scanner(String source, String pattern) { + this(new StringReader(Objects.requireNonNull(source, "source")), cachePattern(pattern)); + } + + public Scanner(ReadableByteChannel source, String charsetName, String pattern) { + this(Channels.newReader(Objects.requireNonNull(source, "source"), toDecoder(charsetName), -1), cachePattern(pattern)); + } + + public Scanner(Readable source, String pattern) { + this(Objects.requireNonNull(source, "source"), cachePattern(pattern)); + } + + private Scanner(Readable source, Pattern pattern) { + this.source = source; + delimPattern = pattern != null ? pattern : cachePattern(WHITESPACE_PATTERN); + buf = CharBuffer.allocate(BUFFER_SIZE); + buf.limit(0); + matcher = delimPattern.matcher(buf); + matcher.useTransparentBounds(true); + matcher.useAnchoringBounds(false); + } + + private static CharsetDecoder toDecoder(String charsetName) { + try { + Charset cs = charsetName != null ? Charset.forName(charsetName) : Charset.defaultCharset(); + return cs.newDecoder(); + } catch (IllegalCharsetNameException | UnsupportedCharsetException e) { + throw new IllegalArgumentException(e); + } + } + + public boolean hasNext() { + checkClosed(); + saveState(); + while (!inputExhausted) { + if (hasTokenInBuffer()) { + revertState(); + return true; + } + readMore(); + } + boolean result = hasTokenInBuffer(); + revertState(); + return result; + } + + public String next() { + checkClosed(); + while (true) { + String token = getCompleteTokenInBuffer(); + if (token != null) { + skipped = false; + return token; + } + if (needInput) { + readMore(); + } else { + throwFor(); + } + } + } + + private void saveState() { + savedPosition = position; + } + + private void revertState() { + position = savedPosition; + savedPosition = -1; + skipped = false; + } + + private void readMore() { + if (buf.limit() == buf.capacity()) { + expandBuffer(); + } + int p = buf.position(); + buf.position(buf.limit()); + buf.limit(buf.capacity()); + int n; + try { + n = source.read(buf); + } catch (IOException ioe) { + lastIOException = ioe; + n = -1; + } + if (n == -1) { + inputExhausted = true; + needInput = false; + } else if (n > 0) { + needInput = false; + } + buf.limit(buf.position()); + buf.position(p); + } + + private void expandBuffer() { + int offset = savedPosition == -1 ? position : savedPosition; + buf.position(offset); + if (offset > 0) { + buf.compact(); + translateSavedIndexes(offset); + position -= offset; + buf.flip(); + } else { + int newSize = buf.capacity() * 2; + CharBuffer newBuf = CharBuffer.allocate(newSize); + newBuf.put(buf); + newBuf.flip(); + translateSavedIndexes(offset); + position -= offset; + buf = newBuf; + matcher.reset(buf); + } + } + + private void translateSavedIndexes(int offset) { + if (savedPosition != -1) { + savedPosition -= offset; + } + } + + private void throwFor() { + skipped = false; + if ((inputExhausted) && (position == buf.limit())) { + throw new NoSuchElementException(); + } else { + throw new InputMismatchException(); + } + } + + private boolean hasTokenInBuffer() { + matcher.usePattern(delimPattern); + matcher.region(position, buf.limit()); + if (matcher.lookingAt()) { + position = matcher.end(); + } + return position != buf.limit(); + } + + private String getCompleteTokenInBuffer() { + matcher.usePattern(delimPattern); + if (!skipped) { + matcher.region(position, buf.limit()); + if (matcher.lookingAt()) { + if (matcher.hitEnd() && !inputExhausted) { + needInput = true; + return null; + } + skipped = true; + position = matcher.end(); + } + } + if (position == buf.limit()) { + if (inputExhausted) { + return null; + } + needInput = true; + return null; + } + matcher.region(position, buf.limit()); + boolean foundNextDelim = matcher.find(); + if (foundNextDelim && (matcher.end() == position)) { + foundNextDelim = matcher.find(); + } + if (foundNextDelim) { + if (matcher.requireEnd() && !inputExhausted) { + needInput = true; + return null; + } + int tokenEnd = matcher.start(); + matcher.usePattern(cachePattern(FIND_ANY_PATTERN)); + matcher.region(position, tokenEnd); + if (matcher.matches()) { + String s = matcher.group(); + position = matcher.end(); + return s; + } else { + return null; + } + } + if (inputExhausted) { + matcher.usePattern(cachePattern(FIND_ANY_PATTERN)); + matcher.region(position, buf.limit()); + if (matcher.matches()) { + String s = matcher.group(); + position = matcher.end(); + return s; + } + return null; + } + needInput = true; + return null; + } + + private void checkClosed() { + if (closed) { + throw new IllegalStateException(); + } + } + + public void close() { + if (!closed) { + if (source instanceof Closeable) { + try { + ((Closeable) source).close(); + } catch (IOException e) { + lastIOException = e; + } + } + closed = true; + } + } + + public IOException ioException() { + return lastIOException; + } + + private static Pattern cachePattern(String pattern) { + if (pattern == null) { + return null; + } + return CACHE.computeIfAbsent(pattern, new Function<String, Pattern>() { + @Override + public Pattern apply(String s) { + return Pattern.compile(s); + } + }); + } + +} diff --git a/camel-core/src/main/java/org/apache/camel/util/SkipIterator.java b/camel-core/src/main/java/org/apache/camel/util/SkipIterator.java index ee43064..9c9b622 100644 --- a/camel-core/src/main/java/org/apache/camel/util/SkipIterator.java +++ b/camel-core/src/main/java/org/apache/camel/util/SkipIterator.java @@ -19,7 +19,7 @@ package org.apache.camel.util; import java.io.Closeable; import java.io.IOException; import java.util.Iterator; -import java.util.Scanner; +import org.apache.camel.util.Scanner; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.camel.CamelContext; @@ -58,17 +58,7 @@ public final class SkipIterator implements Iterator<Object>, Closeable { @Override public void close() throws IOException { try { - if (it instanceof Scanner) { - // special for Scanner which implement the Closeable since JDK7 - Scanner scanner = (Scanner) it; - scanner.close(); - IOException ioException = scanner.ioException(); - if (ioException != null) { - throw ioException; - } - } else if (it instanceof Closeable) { - IOHelper.closeWithException((Closeable) it); - } + IOHelper.closeIterator(it); } finally { // we are now closed closed = true; diff --git a/camel-core/src/test/java/org/apache/camel/util/GroupTokenIteratorTest.java b/camel-core/src/test/java/org/apache/camel/util/GroupTokenIteratorTest.java index 84eb610..dfff48c 100644 --- a/camel-core/src/test/java/org/apache/camel/util/GroupTokenIteratorTest.java +++ b/camel-core/src/test/java/org/apache/camel/util/GroupTokenIteratorTest.java @@ -19,7 +19,7 @@ package org.apache.camel.util; import java.io.ByteArrayInputStream; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.Scanner; +import org.apache.camel.util.Scanner; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; @@ -52,8 +52,7 @@ public class GroupTokenIteratorTest extends TestSupport { public void testGroupIterator() throws Exception { String s = "ABC\nDEF\nGHI\nJKL\nMNO\nPQR\nSTU\nVW"; - Scanner scanner = new Scanner(s); - scanner.useDelimiter("\n"); + Scanner scanner = new Scanner(s, "\n"); GroupTokenIterator gi = new GroupTokenIterator(exchange, scanner, "\n", 3, false); @@ -68,8 +67,7 @@ public class GroupTokenIteratorTest extends TestSupport { public void testGroupIteratorSkipFirst() throws Exception { String s = "##comment\nABC\nDEF\nGHI\nJKL\nMNO\nPQR\nSTU\nVW"; - Scanner scanner = new Scanner(s); - scanner.useDelimiter("\n"); + Scanner scanner = new Scanner(s, "\n"); GroupTokenIterator gi = new GroupTokenIterator(exchange, scanner, "\n", 3, true); @@ -92,8 +90,7 @@ public class GroupTokenIteratorTest extends TestSupport { ByteArrayInputStream in = new ByteArrayInputStream(buf); - Scanner scanner = new Scanner(in, StandardCharsets.UTF_8.displayName()); - scanner.useDelimiter("\n"); + Scanner scanner = new Scanner(in, StandardCharsets.UTF_8.displayName(), "\n"); exchange.setProperty(Exchange.CHARSET_NAME, StandardCharsets.UTF_8.displayName()); GroupTokenIterator gi = new GroupTokenIterator(exchange, scanner, "\n", 1, false);