gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450941019


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##########
@@ -392,90 +397,29 @@ public void testSetConfigs() {
     }
 
     @Test
-    public void testThreadSafety() throws Throwable {
-        long runtimeMs = 5_000;
-        int numThreads = 10;
-        // Check that multiple threads using RetryWithToleranceOperator 
concurrently
-        // can't corrupt the state of the ProcessingContext
-        AtomicReference<Throwable> failed = new AtomicReference<>(null);
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(0,
-                ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, 
errorHandlingMetrics, new ProcessingContext() {
-                    private final AtomicInteger count = new AtomicInteger();
-                    private final AtomicInteger attempt = new AtomicInteger();
-
-                    @Override
-                    public void error(Throwable error) {
-                        if (count.getAndIncrement() > 0) {
-                            failed.compareAndSet(null, new 
AssertionError("Concurrent call to error()"));
-                        }
-                        super.error(error);
-                    }
-
-                    @Override
-                    public Future<Void> report() {
-                        if (count.getAndSet(0) > 1) {
-                            failed.compareAndSet(null, new 
AssertionError("Concurrent call to error() in report()"));
-                        }
-
-                        return super.report();
-                    }
-
-                    @Override
-                    public void currentContext(Stage stage, Class<?> klass) {
-                        this.attempt.set(0);
-                        super.currentContext(stage, klass);
-                    }
-
-                    @Override
-                    public void attempt(int attempt) {
-                        if (!this.attempt.compareAndSet(attempt - 1, attempt)) 
{
-                            failed.compareAndSet(null, new AssertionError(
-                                    "Concurrent call to attempt(): Attempts 
should increase monotonically " +
-                                            "within the scope of a given 
currentContext()"));
-                        }
-                        super.attempt(attempt);
-                    }
-                }, new CountDownLatch(1));
-
-        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
-        List<? extends Future<?>> futures = IntStream.range(0, 
numThreads).boxed()
-                .map(id ->
-                        pool.submit(() -> {
-                            long t0 = System.currentTimeMillis();
-                            long i = 0;
-                            while (true) {
-                                if (++i % 10000 == 0 && 
System.currentTimeMillis() > t0 + runtimeMs) {
-                                    break;
-                                }
-                                if (failed.get() != null) {
-                                    break;
-                                }
-                                try {
-                                    if (id < numThreads / 2) {
-                                        
retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
-                                                SinkTask.class, 
consumerRecord, new Throwable()).get();
-                                    } else {
-                                        retryWithToleranceOperator.execute(() 
-> null, Stage.TRANSFORMATION,
-                                                SinkTask.class);
-                                    }
-                                } catch (Exception e) {
-                                    failed.compareAndSet(null, e);
-                                }
-                            }
-                        }))
-                .collect(Collectors.toList());
-        pool.shutdown();
-        pool.awaitTermination((long) (1.5 * runtimeMs), TimeUnit.MILLISECONDS);
-        futures.forEach(future -> {
-            try {
-                future.get();
-            } catch (Exception e) {
-                failed.compareAndSet(null, e);
-            }

Review Comment:
   I think getting a deterministic reproduction case would involve programmatic 
breakpoints like what I used in the SynchronizationTest, and would involve 
modifying the WorkerSourceTask to be able to intercept between the execute() 
and failed() calls.
   
   I think if I were to copy-paste the call-site from 
WorkerSinkTask/WorkerErrantRecordReporter into an explicit test for this, then 
it does nothing to prove that the bug is absent in the main code. I think that 
is the core flaw in the testThreadSafety test: the class is "thread safe" but 
the call-site of it wasn't.
   
   Also when I say "sometimes", I mean like 50%. I tweaked the constants in 
that test (1000 records, and a batch size of 5) to increase the number of 
opportunities for the bug to surface until it was easy to find.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##########
@@ -392,90 +397,29 @@ public void testSetConfigs() {
     }
 
     @Test
-    public void testThreadSafety() throws Throwable {
-        long runtimeMs = 5_000;
-        int numThreads = 10;
-        // Check that multiple threads using RetryWithToleranceOperator 
concurrently
-        // can't corrupt the state of the ProcessingContext
-        AtomicReference<Throwable> failed = new AtomicReference<>(null);
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(0,
-                ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, 
errorHandlingMetrics, new ProcessingContext() {
-                    private final AtomicInteger count = new AtomicInteger();
-                    private final AtomicInteger attempt = new AtomicInteger();
-
-                    @Override
-                    public void error(Throwable error) {
-                        if (count.getAndIncrement() > 0) {
-                            failed.compareAndSet(null, new 
AssertionError("Concurrent call to error()"));
-                        }
-                        super.error(error);
-                    }
-
-                    @Override
-                    public Future<Void> report() {
-                        if (count.getAndSet(0) > 1) {
-                            failed.compareAndSet(null, new 
AssertionError("Concurrent call to error() in report()"));
-                        }
-
-                        return super.report();
-                    }
-
-                    @Override
-                    public void currentContext(Stage stage, Class<?> klass) {
-                        this.attempt.set(0);
-                        super.currentContext(stage, klass);
-                    }
-
-                    @Override
-                    public void attempt(int attempt) {
-                        if (!this.attempt.compareAndSet(attempt - 1, attempt)) 
{
-                            failed.compareAndSet(null, new AssertionError(
-                                    "Concurrent call to attempt(): Attempts 
should increase monotonically " +
-                                            "within the scope of a given 
currentContext()"));
-                        }
-                        super.attempt(attempt);
-                    }
-                }, new CountDownLatch(1));
-
-        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
-        List<? extends Future<?>> futures = IntStream.range(0, 
numThreads).boxed()
-                .map(id ->
-                        pool.submit(() -> {
-                            long t0 = System.currentTimeMillis();
-                            long i = 0;
-                            while (true) {
-                                if (++i % 10000 == 0 && 
System.currentTimeMillis() > t0 + runtimeMs) {
-                                    break;
-                                }
-                                if (failed.get() != null) {
-                                    break;
-                                }
-                                try {
-                                    if (id < numThreads / 2) {
-                                        
retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
-                                                SinkTask.class, 
consumerRecord, new Throwable()).get();
-                                    } else {
-                                        retryWithToleranceOperator.execute(() 
-> null, Stage.TRANSFORMATION,
-                                                SinkTask.class);
-                                    }
-                                } catch (Exception e) {
-                                    failed.compareAndSet(null, e);
-                                }
-                            }
-                        }))
-                .collect(Collectors.toList());
-        pool.shutdown();
-        pool.awaitTermination((long) (1.5 * runtimeMs), TimeUnit.MILLISECONDS);
-        futures.forEach(future -> {
-            try {
-                future.get();
-            } catch (Exception e) {
-                failed.compareAndSet(null, e);
-            }

Review Comment:
   I think getting a deterministic reproduction case would involve programmatic 
breakpoints like what I used in the SynchronizationTest, and would involve 
modifying the WorkerSourceTask to be able to intercept between the execute() 
and failed() calls.
   
   I think if I were to copy-paste the call-site from 
WorkerSinkTask/WorkerErrantRecordReporter into an explicit test for this, then 
it does nothing to prove that the bug is absent in the main code. I think that 
is the core flaw in the testThreadSafety test: the class is "thread safe" but 
the call-site of it wasn't.
   
   Also when I say "sometimes", I mean like 50%. I tweaked the constants in 
that test (1000 records, and a batch size of 5) to increase the number of 
opportunities for the bug to surface until it was easy to find.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to