gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450908695


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##########
@@ -392,90 +397,29 @@ public void testSetConfigs() {
     }
 
     @Test
-    public void testThreadSafety() throws Throwable {
-        long runtimeMs = 5_000;
-        int numThreads = 10;
-        // Check that multiple threads using RetryWithToleranceOperator 
concurrently
-        // can't corrupt the state of the ProcessingContext
-        AtomicReference<Throwable> failed = new AtomicReference<>(null);
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(0,
-                ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, 
errorHandlingMetrics, new ProcessingContext() {
-                    private final AtomicInteger count = new AtomicInteger();
-                    private final AtomicInteger attempt = new AtomicInteger();
-
-                    @Override
-                    public void error(Throwable error) {
-                        if (count.getAndIncrement() > 0) {
-                            failed.compareAndSet(null, new 
AssertionError("Concurrent call to error()"));
-                        }
-                        super.error(error);
-                    }
-
-                    @Override
-                    public Future<Void> report() {
-                        if (count.getAndSet(0) > 1) {
-                            failed.compareAndSet(null, new 
AssertionError("Concurrent call to error() in report()"));
-                        }
-
-                        return super.report();
-                    }
-
-                    @Override
-                    public void currentContext(Stage stage, Class<?> klass) {
-                        this.attempt.set(0);
-                        super.currentContext(stage, klass);
-                    }
-
-                    @Override
-                    public void attempt(int attempt) {
-                        if (!this.attempt.compareAndSet(attempt - 1, attempt)) 
{
-                            failed.compareAndSet(null, new AssertionError(
-                                    "Concurrent call to attempt(): Attempts 
should increase monotonically " +
-                                            "within the scope of a given 
currentContext()"));
-                        }
-                        super.attempt(attempt);
-                    }
-                }, new CountDownLatch(1));
-
-        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
-        List<? extends Future<?>> futures = IntStream.range(0, 
numThreads).boxed()
-                .map(id ->
-                        pool.submit(() -> {
-                            long t0 = System.currentTimeMillis();
-                            long i = 0;
-                            while (true) {
-                                if (++i % 10000 == 0 && 
System.currentTimeMillis() > t0 + runtimeMs) {
-                                    break;
-                                }
-                                if (failed.get() != null) {
-                                    break;
-                                }
-                                try {
-                                    if (id < numThreads / 2) {
-                                        
retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
-                                                SinkTask.class, 
consumerRecord, new Throwable()).get();
-                                    } else {
-                                        retryWithToleranceOperator.execute(() 
-> null, Stage.TRANSFORMATION,
-                                                SinkTask.class);
-                                    }
-                                } catch (Exception e) {
-                                    failed.compareAndSet(null, e);
-                                }
-                            }
-                        }))
-                .collect(Collectors.toList());
-        pool.shutdown();
-        pool.awaitTermination((long) (1.5 * runtimeMs), TimeUnit.MILLISECONDS);
-        futures.forEach(future -> {
-            try {
-                future.get();
-            } catch (Exception e) {
-                failed.compareAndSet(null, e);
-            }

Review Comment:
   The replacement is the augmented ErrorHandlingTest#testErrantRecordReporter. 
That test (sometimes) fails when the race condition is present, and passes if 
the race condition is avoided.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to