This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/cassandra-in-jvm-dtest-api.git
The following commit(s) were added to refs/heads/master by this push: new 672af9b Add ability for jvm-dtest to grep instance logs 672af9b is described below commit 672af9b56f1729c6511a2279923eb435df4b7b9b Author: David Capwell <dcapw...@apache.org> AuthorDate: Fri Sep 18 14:53:07 2020 -0700 Add ability for jvm-dtest to grep instance logs Patch by David Capwell; reviewed by Alex Petrov, Yifan Cai for CASSANDRA-16120 --- pom.xml | 2 +- .../cassandra/distributed/api/IInstance.java | 19 + .../cassandra/distributed/api/LineIterator.java | 37 ++ .../cassandra/distributed/api/LogAction.java | 416 +++++++++++++++++++++ .../cassandra/distributed/api/LogResult.java | 25 ++ .../cassandra/distributed/api/LogActionTest.java | 242 ++++++++++++ 6 files changed, 740 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 49113d7..020b404 100644 --- a/pom.xml +++ b/pom.xml @@ -70,7 +70,7 @@ <scope>test</scope> </dependency> <dependency> - <groupId>org.mockito</groupId> + <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> <version>3.5.10</version> <scope>test</scope> diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java index 496d33d..4ffc36a 100644 --- a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java +++ b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java @@ -90,4 +90,23 @@ public interface IInstance extends IIsolatedExecutor void forceCompact(String keyspace, String table); + List<Throwable> getUncaughtExceptions(); + + default boolean getLogsEnabled() + { + try + { + logs(); + return true; + } + catch (UnsupportedOperationException e) + { + return false; + } + } + + default LogAction logs() + { + throw new UnsupportedOperationException(); + } } diff --git a/src/main/java/org/apache/cassandra/distributed/api/LineIterator.java b/src/main/java/org/apache/cassandra/distributed/api/LineIterator.java new file mode 100644 index 
0000000..8971232 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/LineIterator.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import java.util.Iterator; + +public interface LineIterator extends Iterator<String>, AutoCloseable +{ + /** + * @return current position of the iterator + * @see LogAction#grep(long, String) + * @see LogAction#watchFor(long, String) + */ + long mark(); + + @Override + default void close() + { + + } +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/LogAction.java b/src/main/java/org/apache/cassandra/distributed/api/LogAction.java new file mode 100644 index 0000000..8f9e693 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/LogAction.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public interface LogAction +{ + /** + * @return current position of the iterator + * @see LogAction#grep(long, String) + * @see LogAction#watchFor(long, String) + */ + long mark(); + + LineIterator match(long startPosition, Predicate<String> fn); + + default LineIterator match(Predicate<String> fn){ + return match(Internal.DEFAULT_START_POSITION, fn); + } + + default LogResult<List<String>> watchFor(long startPosition, Duration timeout, Predicate<String> fn) throws TimeoutException + { + long nowNanos = System.nanoTime(); + long deadlineNanos = nowNanos + timeout.toNanos(); + long previousPosition = startPosition; + List<String> matches = new ArrayList<>(); + while (System.nanoTime() <= deadlineNanos) + { + if (previousPosition == mark()) + { + // still matching... 
wait a bit + Internal.sleepUninterruptibly(1, TimeUnit.SECONDS); + continue; + } + // position not matching, try to read + try (LineIterator it = match(previousPosition, fn)) + { + while (it.hasNext()) + matches.add(it.next()); + if (!matches.isEmpty()) + return new BasicLogResult<>(it.mark(), matches); + previousPosition = it.mark(); + } + } + throw new TimeoutException(); + } + + default LogResult<List<String>> watchFor(Duration timeout, Predicate<String> fn) throws TimeoutException { + return watchFor(Internal.DEFAULT_START_POSITION, timeout, fn); + } + + default LogResult<List<String>> watchFor(Predicate<String> fn) throws TimeoutException { + return watchFor(Internal.DEFAULT_START_POSITION, Internal.DEFAULT_TIMEOUT, fn); + } + + default LogResult<List<String>> watchFor(long startPosition, Duration timeout, List<Pattern> patterns) throws TimeoutException + { + return watchFor(startPosition, timeout, Internal.regexPredicate(patterns)); + } + + default LogResult<List<String>> watchFor(Duration timeout, List<Pattern> patterns) throws TimeoutException + { + return watchFor(Internal.DEFAULT_START_POSITION, timeout, Internal.regexPredicate(patterns)); + } + + default LogResult<List<String>> watchFor(long startPosition, List<Pattern> patterns) throws TimeoutException + { + return watchFor(startPosition, Internal.DEFAULT_TIMEOUT, Internal.regexPredicate(patterns)); + } + + default LogResult<List<String>> watchFor(List<Pattern> patterns) throws TimeoutException + { + return watchFor(Internal.DEFAULT_START_POSITION, Internal.DEFAULT_TIMEOUT, Internal.regexPredicate(patterns)); + } + + default LogResult<List<String>> watchFor(long startPosition, Duration timeout, Pattern pattern) throws TimeoutException + { + return watchFor(startPosition, timeout, Internal.regexPredicate(pattern)); + } + + default LogResult<List<String>> watchFor(Duration timeout, Pattern pattern) throws TimeoutException + { + return watchFor(Internal.DEFAULT_START_POSITION, timeout, 
Internal.regexPredicate(pattern)); + } + + default LogResult<List<String>> watchFor(long startPosition, Pattern pattern) throws TimeoutException + { + return watchFor(startPosition, Internal.DEFAULT_TIMEOUT, Internal.regexPredicate(pattern)); + } + + default LogResult<List<String>> watchFor(Pattern pattern) throws TimeoutException + { + return watchFor(Internal.DEFAULT_START_POSITION, Internal.DEFAULT_TIMEOUT, Internal.regexPredicate(pattern)); + } + + default LogResult<List<String>> watchFor(long startPosition, Duration timeout, String pattern) throws TimeoutException + { + return watchFor(startPosition, timeout, Internal.regexPredicate(pattern)); + } + + default LogResult<List<String>> watchFor(Duration timeout, String pattern) throws TimeoutException + { + return watchFor(Internal.DEFAULT_START_POSITION, timeout, Internal.regexPredicate(pattern)); + } + + default LogResult<List<String>> watchFor(long startPosition, String pattern) throws TimeoutException + { + return watchFor(startPosition, Internal.DEFAULT_TIMEOUT, Internal.regexPredicate(pattern)); + } + + default LogResult<List<String>> watchFor(String pattern) throws TimeoutException + { + return watchFor(Internal.DEFAULT_START_POSITION, Internal.DEFAULT_TIMEOUT, Internal.regexPredicate(pattern)); + } + + default LogResult<List<String>> watchFor(long startPosition, Duration timeout, String pattern, String... others) throws TimeoutException + { + + return watchFor(startPosition, timeout, Internal.regexPredicate(pattern, others)); + } + + default LogResult<List<String>> watchFor(Duration timeout, String pattern, String... others) throws TimeoutException + { + + return watchFor(Internal.DEFAULT_START_POSITION, timeout, Internal.regexPredicate(pattern, others)); + } + + default LogResult<List<String>> watchFor(long startPosition, String pattern, String... 
others) throws TimeoutException + { + + return watchFor(startPosition, Internal.DEFAULT_TIMEOUT, Internal.regexPredicate(pattern, others)); + } + + default LogResult<List<String>> watchFor(String pattern, String... others) throws TimeoutException + { + return watchFor(Internal.DEFAULT_START_POSITION, Internal.DEFAULT_TIMEOUT, Internal.regexPredicate(pattern, others)); + } + + default LogResult<List<String>> grep(long startPosition, Predicate<String> fn) + { + try (LineIterator it = match(startPosition, fn)) + { + return new BasicLogResult<>(it.mark(), Internal.collect(it)); + } + } + + default LogResult<List<String>> grep(Predicate<String> fn) + { + return grep(Internal.DEFAULT_START_POSITION, fn); + } + + default LogResult<List<String>> grep(long startPosition, List<Pattern> patterns) + { + return grep(startPosition, Internal.regexPredicate(patterns)); + } + + default LogResult<List<String>> grep(List<Pattern> patterns) + { + return grep(Internal.DEFAULT_START_POSITION, Internal.regexPredicate(patterns)); + } + + default LogResult<List<String>> grep(long startPosition, Pattern pattern) + { + return grep(startPosition, Internal.regexPredicate(pattern)); + } + + default LogResult<List<String>> grep(Pattern pattern) + { + return grep(Internal.DEFAULT_START_POSITION, Internal.regexPredicate(pattern)); + } + + default LogResult<List<String>> grep(long startPosition, String pattern) + { + return grep(startPosition, Internal.regexPredicate(pattern)); + } + + default LogResult<List<String>> grep(String pattern) + { + return grep(Internal.DEFAULT_START_POSITION, Internal.regexPredicate(pattern)); + } + + default LogResult<List<String>> grep(long startPosition, String pattern, String... others) + { + + return grep(startPosition, Internal.regexPredicate(pattern, others)); + } + + default LogResult<List<String>> grep(String pattern, String... 
others) + { + return grep(Internal.DEFAULT_START_POSITION, Internal.regexPredicate(pattern, others)); + } + + /** + * Attempt to find all errors in the log. This method is different from {@code grep("^ERROR")} as it will also + * attempt to stitch the exception stack trace into a single line. + * + * This method is modeled after python dtest's grep_for_errors and matches the semantics there. + * + * @param startPosition to grep from + * @param exceptionPattern for WARN logs to check if they might have an exception + * @return result of all found exceptions, with stitched errors + */ + default LogResult<List<String>> grepForErrors(long startPosition, Pattern exceptionPattern) + { + Objects.requireNonNull(exceptionPattern, "exceptionPattern"); + + Pattern logLevelPattern = Internal.LOG_LEVEL_PATTERN; + Function<String, String> extractLogLevel = line -> { + Matcher matcher = logLevelPattern.matcher(line); + if (!matcher.find()) + return null; + return matcher.group(1); + }; + List<String> matches = new ArrayList<>(); + try (LineIterator it = match(startPosition, ignore -> true)) + { + StringBuilder lineBuffer = new StringBuilder(); + while (it.hasNext()) + { + String line = it.next(); + String logLevelOrNull = extractLogLevel.apply(line); + if (logLevelOrNull == null) + { + // found a log which isn't the start of a logger line; assume it's an exception + if (lineBuffer.length() == 0) + { + // no previous start of line found, so skip this + continue; + } + lineBuffer.append('\n').append(line); + continue; + } + // line is a start of a log, reset state + if (lineBuffer.length() != 0) + { + // buffer has content, add and move on + matches.add(lineBuffer.toString()); + lineBuffer.setLength(0); + } + switch (logLevelOrNull) + { + case "ERROR": + lineBuffer.append(line); + break; + case "WARN": + if (exceptionPattern.matcher(line).find()) + lineBuffer.append(line); + break; + default: + // ignore + } + } + if (lineBuffer.length() != 0) + { +
matches.add(lineBuffer.toString()); + } + return new BasicLogResult<>(it.mark(), matches); + } + } + + /** + * Attempt to find all errors in the log. This method is different from {@code grep("^ERROR")} as it will also + * attempt to stitch the exception stack trace into a single line. + * + * This method is modeled after python dtest's grep_for_errors and matches the semantics there. + * + * @param startPosition to grep from + * @return result of all found exceptions, with stitched errors + */ + default LogResult<List<String>> grepForErrors(long startPosition) + { + return grepForErrors(startPosition, Internal.LOG_EXCEPTION_PATTERN); + } + + /** + * Attempt to find all errors in the log. This method is different from {@code grep("^ERROR")} as it will also + * attempt to stitch the exception stack trace into a single line. + * + * This method is modeled after python dtest's grep_for_errors and matches the semantics there. + * + * @return result of all found exceptions, with stitched errors + */ + default LogResult<List<String>> grepForErrors() + { + return grepForErrors(Internal.DEFAULT_START_POSITION, Internal.LOG_EXCEPTION_PATTERN); + } + + class BasicLogResult<T> implements LogResult<T> + { + private final long mark; + private final T result; + + public BasicLogResult(long mark, T result) { + this.mark = mark; + this.result = Objects.requireNonNull(result); + } + + @Override + public long getMark() { + return mark; + } + + @Override + public T getResult() { + return result; + } + + @Override + public String toString() { + return "LogResult{" + + "mark=" + mark + + ", result=" + result + + '}'; + } + } + + class Internal + { + private static final int DEFAULT_START_POSITION = -1; + // why 10m? This is the default for python dtest...
+ private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(10); + private static final Pattern LOG_LEVEL_PATTERN = Pattern.compile("^(INFO|DEBUG|WARN|ERROR)"); + private static final Pattern LOG_EXCEPTION_PATTERN = Pattern.compile("[Ee]xception|AssertionError"); + + private static List<String> collect(LineIterator it) + { + List<String> matches = new ArrayList<>(); + while (it.hasNext()) + matches.add(it.next()); + return matches.isEmpty() ? Collections.emptyList() : matches; + } + + private static Predicate<String> regexPredicate(List<Pattern> patterns) + { + return line -> { + for (Pattern regex : patterns) + { + Matcher m = regex.matcher(line); + if (m.find()) + return true; + } + return false; + }; + } + + private static Predicate<String> regexPredicate(String pattern, String... others) + { + List<Pattern> patterns = new ArrayList<>(others.length + 1); + patterns.add(Pattern.compile(pattern)); + for (String s : others) + patterns.add(Pattern.compile(s)); + return regexPredicate(patterns); + } + + private static Predicate<String> regexPredicate(Pattern pattern) + { + return line -> pattern.matcher(line).find(); + } + + private static Predicate<String> regexPredicate(String pattern) + { + return regexPredicate(Pattern.compile(pattern)); + } + + private static void sleepUninterruptibly(long sleepFor, TimeUnit unit) { + // copied from guava since dtest can't depend on guava + boolean interrupted = false; + + try { + long remainingNanos = unit.toNanos(sleepFor); + long end = System.nanoTime() + remainingNanos; + + while(true) { + try { + TimeUnit.NANOSECONDS.sleep(remainingNanos); + return; + } catch (InterruptedException var12) { + interrupted = true; + remainingNanos = end - System.nanoTime(); + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + } +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/LogResult.java b/src/main/java/org/apache/cassandra/distributed/api/LogResult.java new file mode 
100644 index 0000000..a8cfcc4 --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/LogResult.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +public interface LogResult<T> { + long getMark(); + + T getResult(); +} diff --git a/src/test/java/org/apache/cassandra/distributed/api/LogActionTest.java b/src/test/java/org/apache/cassandra/distributed/api/LogActionTest.java new file mode 100644 index 0000000..bc2100e --- /dev/null +++ b/src/test/java/org/apache/cassandra/distributed/api/LogActionTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; + +class LogActionTest +{ + @Test + public void watchForTimeout() { + LogAction logs = mockLogAction(fn -> lineIterator(fn, "a", "b", "c", "d")); + + Duration duration = Duration.ofSeconds(1); + long startNanos = System.nanoTime(); + Assertions.assertThatThrownBy(() -> logs.watchFor(duration, "^ERROR")) + .isInstanceOf(TimeoutException.class); + Assertions.assertThat(System.nanoTime()) + .as("duration was smaller than expected timeout") + .isGreaterThanOrEqualTo(startNanos + duration.toNanos()); + } + + @Test + public void watchForAndFindFirstAttempt() throws TimeoutException { + LogAction logs = mockLogAction(fn -> lineIterator(fn, "a", "b", "ERROR match", "d")); + + List<String> matches = logs.watchFor("^ERROR").getResult(); + Assertions.assertThat(matches).isEqualTo(Arrays.asList("ERROR match")); + } + + @Test + public void watchForAndFindThirdAttempt() throws TimeoutException { + class Counter + { + int count; + } + Counter counter = new Counter(); + LogAction logs = mockLogAction(fn -> { + if (++counter.count == 3) { + return lineIterator(fn, "a", "b", "ERROR match", "d"); + } else { + return lineIterator(fn, "a", "b", "c", "d"); + } + }); + + List<String> 
matches = logs.watchFor("^ERROR").getResult(); + Assertions.assertThat(matches).isEqualTo(Arrays.asList("ERROR match")); + Assertions.assertThat(counter.count).isEqualTo(3); + } + + @Test + public void grepNoMatch() { + LogAction logs = mockLogAction(fn -> lineIterator(fn, "a", "b", "c", "d")); + + List<String> matches = logs.grep("^ERROR").getResult(); + Assertions.assertThat(matches).isEmpty(); + } + + @Test + public void grepMatch() { + LogAction logs = mockLogAction(fn -> lineIterator(fn, "a", "b", "ERROR match", "d")); + + List<String> matches = logs.grep("^ERROR").getResult(); + Assertions.assertThat(matches).isEqualTo(Arrays.asList("ERROR match")); + } + + @Test + public void grepForErrorsNoMatch() { + LogAction logs = mockLogAction(fn -> lineIterator(fn, "a", "b", "c", "d")); + + List<String> matches = logs.grepForErrors().getResult(); + Assertions.assertThat(matches).isEmpty(); + } + + @Test + public void grepForErrorsNoStacktrace() { + LogAction logs = mockLogAction(fn -> lineIterator(fn, "INFO a", "INFO b", "ERROR match", "INFO d")); + + List<String> matches = logs.grepForErrors().getResult(); + Assertions.assertThat(matches).isEqualTo(Arrays.asList("ERROR match")); + } + + @Test + public void grepForErrorsWithStacktrace() { + LogAction logs = mockLogAction(fn -> lineIterator(fn, + "INFO a", "INFO b", + "ERROR match", + "\t\tat class.method(42)", + "\t\tat class.method(42)")); + + List<String> matches = logs.grepForErrors().getResult(); + Assertions.assertThat(matches).isEqualTo(Arrays.asList("ERROR match\n" + + "\t\tat class.method(42)\n" + + "\t\tat class.method(42)")); + } + + @Test + public void grepForErrorsMultilineWarnNotException() { + LogAction logs = mockLogAction(fn -> lineIterator(fn, + "INFO a", "INFO b", + "WARN match", + "\t\tat class.method(42)", + "\t\tat class.method(42)")); + + List<String> matches = logs.grepForErrors().getResult(); + Assertions.assertThat(matches).isEmpty(); + } + + @Test + public void 
grepForErrorsWARNWithStacktrace() { + LogAction logs = mockLogAction(fn -> lineIterator(fn, + "INFO a", "INFO b", + "WARN match but exception in test", + "\t\tat class.method(42)", + "\t\tat class.method(42)")); + + List<String> matches = logs.grepForErrors().getResult(); + Assertions.assertThat(matches).isEqualTo(Arrays.asList("WARN match but exception in test\n" + + "\t\tat class.method(42)\n" + + "\t\tat class.method(42)")); + } + + @Test + public void grepForErrorsWARNWithStacktraceFromAssert() { + LogAction logs = mockLogAction(fn -> lineIterator(fn, + "INFO a", "INFO b", + "WARN match but AssertionError in test", + "\t\tat class.method(42)", + "\t\tat class.method(42)")); + + List<String> matches = logs.grepForErrors().getResult(); + Assertions.assertThat(matches).isEqualTo(Arrays.asList("WARN match but AssertionError in test\n" + + "\t\tat class.method(42)\n" + + "\t\tat class.method(42)")); + } + + private static LogAction mockLogActionAnswer(Answer<?> matchAnswer) { + LogAction logs = Mockito.mock(LogAction.class, Mockito.CALLS_REAL_METHODS); + // mark is only used by matching, which we also mock out, so it's ok to be a constant + Mockito.when(logs.mark()).thenReturn(0L); + Mockito.when(logs.match(Mockito.anyLong(), Mockito.any())).thenAnswer(matchAnswer); + return logs; + } + + private static LogAction mockLogAction(MockLogMatch match) { + return mockLogActionAnswer(invoke -> match.answer(invoke.getArgument(0), invoke.getArgument(1))); + } + + private static LogAction mockLogAction(MockLogMatchPredicate match) { + return mockLogAction((MockLogMatch) match); + } + + @FunctionalInterface + private interface MockLogMatch + { + LineIterator answer(long startPosition, Predicate<String> fn) throws Throwable; + } + + @FunctionalInterface + private interface MockLogMatchPredicate extends MockLogMatch + { + LineIterator answer(Predicate<String> fn) throws Throwable; + + @Override + default LineIterator answer(long startPosition, Predicate<String> fn) throws
Throwable + { + return answer(fn); + } + } + + private static LineIterator lineIterator(Predicate<String> fn, String... values) + { + return new LineIteratorImpl(Arrays.asList(values).iterator(), fn); + } + + private static final class LineIteratorImpl implements LineIterator + { + private final Iterator<String> it; + private final Predicate<String> fn; + private String next = null; + private long count = 0; + + private LineIteratorImpl(Iterator<String> it, Predicate<String> fn) { + this.it = it; + this.fn = fn; + } + + @Override + public long mark() { + return count; + } + + @Override + public boolean hasNext() { + if (next != null) // only move forward if consumed + return true; + while (it.hasNext()) + { + count++; + String next = it.next(); + if (fn.test(next)) + { + this.next = next; + return true; + } + } + return false; + } + + @Override + public String next() { + String ret = next; + next = null; + return ret; + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org