mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r443067211



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -267,7 +283,17 @@ public void close() {
 
     private void checkForException() {
         if (sendException != null) {
-            throw sendException;
+            if (sendException.getCause() instanceof KafkaException
+                && sendException.getCause().getMessage().equals("Failing batch 
since transaction was aborted")) {

Review comment:
       In the current code, we might still need to close tasks, right? If a TX 
is aborted, we need to "reset" all active tasks accordingly and this would 
imply, closing and reviving them? And while closing we would call 
`checkForException` and crash without this guard?
   
   What makes we wonder, if we should actually `checkForException` in 
`closeDirty()` above? If a TX is aborted, and we closeDirty and don't call 
`checkForException` it seems we don't need this guard? (In general, this guard 
seems a little bit hacky and it would be great if we could avoid it IMHO.)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -69,7 +68,6 @@
     private final ChangelogReader changelogReader;
     private final UUID processId;
     private final String logPrefix;
-    private final StreamsMetricsImpl streamsMetrics;

Review comment:
       I looked into the code, and the answer is "no".  It's called within 
`StreamTask` / `StandbyTask` methods, `closeClean`, `closeDirty`, and 
`closeAndRecycleState`. So It seems fine?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -203,6 +208,34 @@ public void 
shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws
         runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, 
MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
+    // This is technically a purely producer-client test, but since we're 
relying on the specific error message being
+    // thrown we should make sure it can't change without us noticing. Once 
KAFKA-10186 is resolved we should fix this
+    @Test
+    public void testExceptionForPendingUnflushedDataWhenTransactionIsAborted() 
 {

Review comment:
       Good thinking!
   
   We should mention `RecordCollectorImpl` as it's the class the relies on the 
error message.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -203,6 +208,34 @@ public void 
shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws
         runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, 
MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
+    // This is technically a purely producer-client test, but since we're 
relying on the specific error message being
+    // thrown we should make sure it can't change without us noticing. Once 
KAFKA-10186 is resolved we should fix this
+    @Test
+    public void testExceptionForPendingUnflushedDataWhenTransactionIsAborted() 
 {
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        configs.put("client.id", "client-1");

Review comment:
       Why do we need to set clientId?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,58 +675,75 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
+        if (clean) {
+            for (final Task task : tasks.values()) {
                 try {
                     task.suspend();
                     if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
                         final Map<TopicPartition, OffsetAndMetadata> 
committableOffsets = task.prepareCommit();
+                        tasksToCommit.add(task);
                         if (task.isActive()) {
                             consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
                         }
                     }
-                    tasksToClose.add(task);
+                    tasksToCloseClean.add(task);
                 } catch (final TaskMigratedException e) {
                     // just ignore the exception as it doesn't matter during 
shutdown
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 } catch (final RuntimeException e) {
                     firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 }
-            } else {
-                closeTaskDirty(task);
             }
-        }
 
-        try {
-            if (clean) {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-                for (final Task task : tasksToCommit) {
-                    try {
-                        task.postCommit();
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task 
" + task.id(), e);
-                        firstException.compareAndSet(null, e);
-                    }
-                }
+            // If any active tasks have to be clsoed dirty and can't be 
committed, none of them can be
+            if (!filterActive(tasksToCloseDirty).isEmpty()) {
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.removeAll(filterActive(tasksToCommit));
+                tasksToCloseDirty.addAll(activeTaskIterable());

Review comment:
       Standbys done affect the TX-producer and thus they can be closed dirty 
without side effect.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,58 +675,75 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
+        if (clean) {
+            for (final Task task : tasks.values()) {
                 try {
                     task.suspend();
                     if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
                         final Map<TopicPartition, OffsetAndMetadata> 
committableOffsets = task.prepareCommit();
+                        tasksToCommit.add(task);
                         if (task.isActive()) {
                             consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
                         }
                     }
-                    tasksToClose.add(task);
+                    tasksToCloseClean.add(task);
                 } catch (final TaskMigratedException e) {
                     // just ignore the exception as it doesn't matter during 
shutdown
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 } catch (final RuntimeException e) {
                     firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 }
-            } else {
-                closeTaskDirty(task);
             }
-        }
 
-        try {
-            if (clean) {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-                for (final Task task : tasksToCommit) {
-                    try {
-                        task.postCommit();
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task 
" + task.id(), e);
-                        firstException.compareAndSet(null, e);
-                    }
-                }
+            // If any active tasks have to be clsoed dirty and can't be 
committed, none of them can be

Review comment:
       typo: `closed`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -542,7 +542,12 @@ private void close(final boolean clean) {
                     "state manager close",
                     log);
 
-                executeAndMaybeSwallow(clean, recordCollector::close, "record 
collector close", log);
+                executeAndMaybeSwallow(
+                    clean,
+                    clean ? recordCollector::closeClean : 
recordCollector::closeDirty,

Review comment:
       It seems this is the only place we call `closeDirty`, thus, I am 
wondering if it might be better to use a boolean flag ie, 
`RecordCollector#close(boolean)` and just call `() -> recordCollector(clean)` 
here?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -203,6 +208,34 @@ public void 
shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws
         runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, 
MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
+    // This is technically a purely producer-client test, but since we're 
relying on the specific error message being
+    // thrown we should make sure it can't change without us noticing. Once 
KAFKA-10186 is resolved we should fix this
+    @Test
+    public void testExceptionForPendingUnflushedDataWhenTransactionIsAborted() 
 {
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        configs.put("client.id", "client-1");
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txnId");
+
+        final KafkaProducer<String, String> producer =
+            new KafkaProducer<>(configs, new StringSerializer(), new 
StringSerializer());
+
+        final ProducerRecord<String, String> record = new 
ProducerRecord<>(SINGLE_PARTITION_INPUT_TOPIC, "value");
+
+        producer.initTransactions();
+        producer.beginTransaction();
+
+        producer.send(record);

Review comment:
       Do we need this?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,58 +675,75 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
+        if (clean) {
+            for (final Task task : tasks.values()) {
                 try {
                     task.suspend();
                     if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
                         final Map<TopicPartition, OffsetAndMetadata> 
committableOffsets = task.prepareCommit();
+                        tasksToCommit.add(task);
                         if (task.isActive()) {
                             consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
                         }
                     }
-                    tasksToClose.add(task);
+                    tasksToCloseClean.add(task);
                 } catch (final TaskMigratedException e) {
                     // just ignore the exception as it doesn't matter during 
shutdown
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 } catch (final RuntimeException e) {
                     firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 }
-            } else {
-                closeTaskDirty(task);
             }
-        }
 
-        try {
-            if (clean) {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-                for (final Task task : tasksToCommit) {
-                    try {
-                        task.postCommit();
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task 
" + task.id(), e);
-                        firstException.compareAndSet(null, e);
-                    }
-                }
+            // If any active tasks have to be clsoed dirty and can't be 
committed, none of them can be
+            if (!filterActive(tasksToCloseDirty).isEmpty()) {
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.removeAll(filterActive(tasksToCommit));
+                tasksToCloseDirty.addAll(activeTaskIterable());
+                consumedOffsetsAndMetadataPerTask.clear();
             }
-        } catch (final RuntimeException e) {
-            log.error("Exception caught while committing tasks during 
shutdown", e);
-            firstException.compareAndSet(null, e);
-        }
 
-        for (final Task task : tasksToClose) {
             try {
-                completeTaskCloseClean(task);
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
             } catch (final RuntimeException e) {
+                log.error("Exception caught while committing tasks during 
shutdown", e);
                 firstException.compareAndSet(null, e);
-                closeTaskDirty(task);
+
+                // If the commit fails, everyone who participated in it must 
be closed dirty
+                tasksToCloseDirty.addAll(filterActive(tasksToCommit));
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.clear();
+            }
+
+            for (final Task task : tasksToCommit) {

Review comment:
       Similar question as above: should we move this loop into the same 
try-catch as `commitOffsetsOrTransaction` ?This would make it clear that 
`postCommit()` should only be executed if committing was successful (even if 
the current code is correct as `taskToCommit` would empty if an error occurred.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -203,6 +208,34 @@ public void 
shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws
         runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, 
MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
+    // This is technically a purely producer-client test, but since we're 
relying on the specific error message being
+    // thrown we should make sure it can't change without us noticing. Once 
KAFKA-10186 is resolved we should fix this
+    @Test
+    public void testExceptionForPendingUnflushedDataWhenTransactionIsAborted() 
 {
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        configs.put("client.id", "client-1");
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txnId");
+
+        final KafkaProducer<String, String> producer =
+            new KafkaProducer<>(configs, new StringSerializer(), new 
StringSerializer());

Review comment:
       Should we use try-with-resource? Or at least try-final and call 
`producer.close` in the finally block?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -203,6 +208,34 @@ public void 
shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws
         runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, 
MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, eosConfig);
     }
 
+    // This is technically a purely producer-client test, but since we're 
relying on the specific error message being
+    // thrown we should make sure it can't change without us noticing. Once 
KAFKA-10186 is resolved we should fix this
+    @Test
+    public void testExceptionForPendingUnflushedDataWhenTransactionIsAborted() 
 {
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        configs.put("client.id", "client-1");
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txnId");
+
+        final KafkaProducer<String, String> producer =
+            new KafkaProducer<>(configs, new StringSerializer(), new 
StringSerializer());
+
+        final ProducerRecord<String, String> record = new 
ProducerRecord<>(SINGLE_PARTITION_INPUT_TOPIC, "value");
+
+        producer.initTransactions();
+        producer.beginTransaction();
+
+        producer.send(record);
+
+        final AtomicReference<Exception> receivedException = new 
AtomicReference<>(null);
+        producer.send(record, (recordMetadata, exception) -> 
receivedException.compareAndSet(null, exception));

Review comment:
       Do we need to make them producer config changes to ensure that the 
producer does not flush the record by chance? (eg, increase `linger.ms` to 
`MAX_VALUE` or similar?)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -679,58 +675,75 @@ private void cleanupTask(final Task task) {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
+        if (clean) {
+            for (final Task task : tasks.values()) {
                 try {
                     task.suspend();
                     if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
                         final Map<TopicPartition, OffsetAndMetadata> 
committableOffsets = task.prepareCommit();
+                        tasksToCommit.add(task);
                         if (task.isActive()) {
                             consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
                         }
                     }
-                    tasksToClose.add(task);
+                    tasksToCloseClean.add(task);
                 } catch (final TaskMigratedException e) {
                     // just ignore the exception as it doesn't matter during 
shutdown
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 } catch (final RuntimeException e) {
                     firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+                    tasksToCloseDirty.add(task);
                 }
-            } else {
-                closeTaskDirty(task);
             }
-        }
 
-        try {
-            if (clean) {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-                for (final Task task : tasksToCommit) {
-                    try {
-                        task.postCommit();
-                    } catch (final RuntimeException e) {
-                        log.error("Exception caught while post-committing task 
" + task.id(), e);
-                        firstException.compareAndSet(null, e);
-                    }
-                }
+            // If any active tasks have to be clsoed dirty and can't be 
committed, none of them can be
+            if (!filterActive(tasksToCloseDirty).isEmpty()) {
+                tasksToCloseClean.removeAll(filterActive(tasksToCommit));
+                tasksToCommit.removeAll(filterActive(tasksToCommit));
+                tasksToCloseDirty.addAll(activeTaskIterable());
+                consumedOffsetsAndMetadataPerTask.clear();
             }

Review comment:
       Should we do an `else` here and only call `commitOffsetsOrTransaction` 
(and do the `postCommit` loop) in the `else` branch (I understand that the 
current code is correct, as `commitOffsetOrTx()` would be a no-op for this case 
-- just wondering, if the code would be easier to read?)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to