gharris1727 commented on code in PR #15154: URL: https://github.com/apache/kafka/pull/15154#discussion_r1450941019
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java: ########## @@ -392,90 +397,29 @@ public void testSetConfigs() { } @Test - public void testThreadSafety() throws Throwable { - long runtimeMs = 5_000; - int numThreads = 10; - // Check that multiple threads using RetryWithToleranceOperator concurrently - // can't corrupt the state of the ProcessingContext - AtomicReference<Throwable> failed = new AtomicReference<>(null); - RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0, - ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, errorHandlingMetrics, new ProcessingContext() { - private final AtomicInteger count = new AtomicInteger(); - private final AtomicInteger attempt = new AtomicInteger(); - - @Override - public void error(Throwable error) { - if (count.getAndIncrement() > 0) { - failed.compareAndSet(null, new AssertionError("Concurrent call to error()")); - } - super.error(error); - } - - @Override - public Future<Void> report() { - if (count.getAndSet(0) > 1) { - failed.compareAndSet(null, new AssertionError("Concurrent call to error() in report()")); - } - - return super.report(); - } - - @Override - public void currentContext(Stage stage, Class<?> klass) { - this.attempt.set(0); - super.currentContext(stage, klass); - } - - @Override - public void attempt(int attempt) { - if (!this.attempt.compareAndSet(attempt - 1, attempt)) { - failed.compareAndSet(null, new AssertionError( - "Concurrent call to attempt(): Attempts should increase monotonically " + - "within the scope of a given currentContext()")); - } - super.attempt(attempt); - } - }, new CountDownLatch(1)); - - ExecutorService pool = Executors.newFixedThreadPool(numThreads); - List<? extends Future<?>> futures = IntStream.range(0, numThreads).boxed() - .map(id -> - pool.submit(() -> { - long t0 = System.currentTimeMillis(); - long i = 0; - while (true) { - if (++i % 10000 == 0 && System.currentTimeMillis() > t0 + runtimeMs) { - break; - } - if (failed.get() != null) { - break; - } - try { - if (id < numThreads / 2) { - retryWithToleranceOperator.executeFailed(Stage.TASK_PUT, - SinkTask.class, consumerRecord, new Throwable()).get(); - } else { - retryWithToleranceOperator.execute(() -> null, Stage.TRANSFORMATION, - SinkTask.class); - } - } catch (Exception e) { - failed.compareAndSet(null, e); - } - } - })) - .collect(Collectors.toList()); - pool.shutdown(); - pool.awaitTermination((long) (1.5 * runtimeMs), TimeUnit.MILLISECONDS); - futures.forEach(future -> { - try { - future.get(); - } catch (Exception e) { - failed.compareAndSet(null, e); - } Review Comment: I think getting a deterministic reproduction case would involve programmatic breakpoints like what I used in the SynchronizationTest, and would involve modifying the WorkerSourceTask to be able to intercept between the execute() and failed() calls. I think if I were to copy-paste the call-site from WorkerSinkTask/WorkerErrantRecordReporter into an explicit test for this, then it does nothing to prove that the bug is absent in the main code. I think that is the core flaw in the testThreadSafety test: the class is "thread safe" but the call-site of it wasn't. Also when i say "sometimes", i mean like 50%. I tweaked the constants in that test (1000 records, and a batch size of 5) to increase the number of opportunities for the bug to surface until it easy to find. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java: ########## @@ -392,90 +397,29 @@ public void testSetConfigs() { } @Test - public void testThreadSafety() throws Throwable { - long runtimeMs = 5_000; - int numThreads = 10; - // Check that multiple threads using RetryWithToleranceOperator concurrently - // can't corrupt the state of the ProcessingContext - AtomicReference<Throwable> failed = new AtomicReference<>(null); - RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0, - ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, errorHandlingMetrics, new ProcessingContext() { - private final AtomicInteger count = new AtomicInteger(); - private final AtomicInteger attempt = new AtomicInteger(); - - @Override - public void error(Throwable error) { - if (count.getAndIncrement() > 0) { - failed.compareAndSet(null, new AssertionError("Concurrent call to error()")); - } - super.error(error); - } - - @Override - public Future<Void> report() { - if (count.getAndSet(0) > 1) { - failed.compareAndSet(null, new AssertionError("Concurrent call to error() in report()")); - } - - return super.report(); - } - - @Override - public void currentContext(Stage stage, Class<?> klass) { - this.attempt.set(0); - super.currentContext(stage, klass); - } - - @Override - public void attempt(int attempt) { - if (!this.attempt.compareAndSet(attempt - 1, attempt)) { - failed.compareAndSet(null, new AssertionError( - "Concurrent call to attempt(): Attempts should increase monotonically " + - "within the scope of a given currentContext()")); - } - super.attempt(attempt); - } - }, new CountDownLatch(1)); - - ExecutorService pool = Executors.newFixedThreadPool(numThreads); - List<? extends Future<?>> futures = IntStream.range(0, numThreads).boxed() - .map(id -> - pool.submit(() -> { - long t0 = System.currentTimeMillis(); - long i = 0; - while (true) { - if (++i % 10000 == 0 && System.currentTimeMillis() > t0 + runtimeMs) { - break; - } - if (failed.get() != null) { - break; - } - try { - if (id < numThreads / 2) { - retryWithToleranceOperator.executeFailed(Stage.TASK_PUT, - SinkTask.class, consumerRecord, new Throwable()).get(); - } else { - retryWithToleranceOperator.execute(() -> null, Stage.TRANSFORMATION, - SinkTask.class); - } - } catch (Exception e) { - failed.compareAndSet(null, e); - } - } - })) - .collect(Collectors.toList()); - pool.shutdown(); - pool.awaitTermination((long) (1.5 * runtimeMs), TimeUnit.MILLISECONDS); - futures.forEach(future -> { - try { - future.get(); - } catch (Exception e) { - failed.compareAndSet(null, e); - } Review Comment: I think getting a deterministic reproduction case would involve programmatic breakpoints like what I used in the SynchronizationTest, and would involve modifying the WorkerSourceTask to be able to intercept between the execute() and failed() calls. I think if I were to copy-paste the call-site from WorkerSinkTask/WorkerErrantRecordReporter into an explicit test for this, then it does nothing to prove that the bug is absent in the main code. I think that is the core flaw in the testThreadSafety test: the class is "thread safe" but the call-site of it wasn't. Also when i say "sometimes", i mean like 50%. I tweaked the constants in that test (1000 records, and a batch size of 5) to increase the number of opportunities for the bug to surface until it was easy to find. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org