C0urante commented on code in PR #15154: URL: https://github.com/apache/kafka/pull/15154#discussion_r1450933080
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java: ########## @@ -392,90 +397,29 @@ public void testSetConfigs() { } @Test - public void testThreadSafety() throws Throwable { - long runtimeMs = 5_000; - int numThreads = 10; - // Check that multiple threads using RetryWithToleranceOperator concurrently - // can't corrupt the state of the ProcessingContext - AtomicReference<Throwable> failed = new AtomicReference<>(null); - RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0, - ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, errorHandlingMetrics, new ProcessingContext() { - private final AtomicInteger count = new AtomicInteger(); - private final AtomicInteger attempt = new AtomicInteger(); - - @Override - public void error(Throwable error) { - if (count.getAndIncrement() > 0) { - failed.compareAndSet(null, new AssertionError("Concurrent call to error()")); - } - super.error(error); - } - - @Override - public Future<Void> report() { - if (count.getAndSet(0) > 1) { - failed.compareAndSet(null, new AssertionError("Concurrent call to error() in report()")); - } - - return super.report(); - } - - @Override - public void currentContext(Stage stage, Class<?> klass) { - this.attempt.set(0); - super.currentContext(stage, klass); - } - - @Override - public void attempt(int attempt) { - if (!this.attempt.compareAndSet(attempt - 1, attempt)) { - failed.compareAndSet(null, new AssertionError( - "Concurrent call to attempt(): Attempts should increase monotonically " + - "within the scope of a given currentContext()")); - } - super.attempt(attempt); - } - }, new CountDownLatch(1)); - - ExecutorService pool = Executors.newFixedThreadPool(numThreads); - List<? extends Future<?>> futures = IntStream.range(0, numThreads).boxed() - .map(id -> - pool.submit(() -> { - long t0 = System.currentTimeMillis(); - long i = 0; - while (true) { - if (++i % 10000 == 0 && System.currentTimeMillis() > t0 + runtimeMs) { - break; - } - if (failed.get() != null) { - break; - } - try { - if (id < numThreads / 2) { - retryWithToleranceOperator.executeFailed(Stage.TASK_PUT, - SinkTask.class, consumerRecord, new Throwable()).get(); - } else { - retryWithToleranceOperator.execute(() -> null, Stage.TRANSFORMATION, - SinkTask.class); - } - } catch (Exception e) { - failed.compareAndSet(null, e); - } - } - })) - .collect(Collectors.toList()); - pool.shutdown(); - pool.awaitTermination((long) (1.5 * runtimeMs), TimeUnit.MILLISECONDS); - futures.forEach(future -> { - try { - future.get(); - } catch (Exception e) { - failed.compareAndSet(null, e); - } Review Comment: There's no unit testing we can add that would deterministically fail if the bug resurfaces? Given how notoriously flaky our integration tests are, I'm a little afraid to rely on something that only fails some of the time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org