C0urante commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450933080


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##########
@@ -392,90 +397,29 @@ public void testSetConfigs() {
     }
 
     @Test
-    public void testThreadSafety() throws Throwable {
-        long runtimeMs = 5_000;
-        int numThreads = 10;
-        // Check that multiple threads using RetryWithToleranceOperator 
concurrently
-        // can't corrupt the state of the ProcessingContext
-        AtomicReference<Throwable> failed = new AtomicReference<>(null);
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(0,
-                ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, 
errorHandlingMetrics, new ProcessingContext() {
-                    private final AtomicInteger count = new AtomicInteger();
-                    private final AtomicInteger attempt = new AtomicInteger();
-
-                    @Override
-                    public void error(Throwable error) {
-                        if (count.getAndIncrement() > 0) {
-                            failed.compareAndSet(null, new 
AssertionError("Concurrent call to error()"));
-                        }
-                        super.error(error);
-                    }
-
-                    @Override
-                    public Future<Void> report() {
-                        if (count.getAndSet(0) > 1) {
-                            failed.compareAndSet(null, new 
AssertionError("Concurrent call to error() in report()"));
-                        }
-
-                        return super.report();
-                    }
-
-                    @Override
-                    public void currentContext(Stage stage, Class<?> klass) {
-                        this.attempt.set(0);
-                        super.currentContext(stage, klass);
-                    }
-
-                    @Override
-                    public void attempt(int attempt) {
-                        if (!this.attempt.compareAndSet(attempt - 1, attempt)) 
{
-                            failed.compareAndSet(null, new AssertionError(
-                                    "Concurrent call to attempt(): Attempts 
should increase monotonically " +
-                                            "within the scope of a given 
currentContext()"));
-                        }
-                        super.attempt(attempt);
-                    }
-                }, new CountDownLatch(1));
-
-        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
-        List<? extends Future<?>> futures = IntStream.range(0, 
numThreads).boxed()
-                .map(id ->
-                        pool.submit(() -> {
-                            long t0 = System.currentTimeMillis();
-                            long i = 0;
-                            while (true) {
-                                if (++i % 10000 == 0 && 
System.currentTimeMillis() > t0 + runtimeMs) {
-                                    break;
-                                }
-                                if (failed.get() != null) {
-                                    break;
-                                }
-                                try {
-                                    if (id < numThreads / 2) {
-                                        
retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
-                                                SinkTask.class, 
consumerRecord, new Throwable()).get();
-                                    } else {
-                                        retryWithToleranceOperator.execute(() 
-> null, Stage.TRANSFORMATION,
-                                                SinkTask.class);
-                                    }
-                                } catch (Exception e) {
-                                    failed.compareAndSet(null, e);
-                                }
-                            }
-                        }))
-                .collect(Collectors.toList());
-        pool.shutdown();
-        pool.awaitTermination((long) (1.5 * runtimeMs), TimeUnit.MILLISECONDS);
-        futures.forEach(future -> {
-            try {
-                future.get();
-            } catch (Exception e) {
-                failed.compareAndSet(null, e);
-            }

Review Comment:
   There's no unit testing we can add that would deterministically fail if the 
bug resurfaces? Given how notoriously flaky our integration tests are, I'm a 
little afraid to rely on something that only fails some of the time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to