lbradstreet commented on a change in pull request #9147:
URL: https://github.com/apache/kafka/pull/9147#discussion_r467590975



##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
##########
@@ -345,39 +357,55 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                 if (records.count() > 0) {
                     try {
-                        producer.beginTransaction();
-
-                        for (ConsumerRecord<String, String> record : records) {
-                            producer.send(producerRecordFromConsumerRecord(outputTopic, record));
-                        }
-
-                        long messagesSentWithinCurrentTxn = records.count();
-
-                        if (useGroupMetadata) {
-                            producer.sendOffsetsToTransaction(consumerPositions(consumer), consumer.groupMetadata());
-                        } else {
-                            producer.sendOffsetsToTransaction(consumerPositions(consumer), consumerGroup);
-                        }
-
-                        if (enableRandomAborts && random.nextInt() % 3 == 0) {
-                            throw new KafkaException("Aborting transaction");
-                        } else {
-                            producer.commitTransaction();
-                            remainingMessages.getAndAdd(-messagesSentWithinCurrentTxn);
-                            numMessagesProcessedSinceLastRebalance.getAndAdd(messagesSentWithinCurrentTxn);
-                            totalMessageProcessed.getAndAdd(messagesSentWithinCurrentTxn);
+                        try {
+                            producer.get().beginTransaction();
+
+                            for (ConsumerRecord<String, String> record : records) {
+                                producer.get().send(producerRecordFromConsumerRecord(outputTopic, record));
+                            }
+
+                            long messagesSentWithinCurrentTxn = records.count();
+
+                            if (useGroupMetadata) {
+                                producer.get().sendOffsetsToTransaction(consumerPositions(consumer),
+                                        consumer.groupMetadata());
+                            } else {
+                                producer.get().sendOffsetsToTransaction(consumerPositions(consumer),
+                                        consumerGroup);
+                            }
+
+                            if (enableRandomAborts && random.nextInt() % 3 == 0) {
+                                throw new KafkaException("Aborting transaction");
+                            } else {
+                                producer.get().commitTransaction();
+                                remainingMessages.getAndAdd(-messagesSentWithinCurrentTxn);
+                                numMessagesProcessedSinceLastRebalance.getAndAdd(messagesSentWithinCurrentTxn);
+                                totalMessageProcessed.getAndAdd(messagesSentWithinCurrentTxn);
+                            }
+                        } catch (ProducerFencedException | OutOfOrderSequenceException e) {
+                            // handle these exceptions in the outer exception handling
+                            throw e;
+                        } catch (KafkaException e) {
+                            // this may throw a ProducerFencedException on recovery,
+                            // which will be handled in the outer catch if necessary
+                            System.out.println(handledExceptionJson(totalMessageProcessed.get(),
+                                    numMessagesProcessedSinceLastRebalance.get(), remainingMessages.get(), transactionalId, e));
+                            producer.get().abortTransaction();
+                            resetToLastCommittedPositions(consumer);
                         }
                     } catch (ProducerFencedException | OutOfOrderSequenceException e) {
-                        // We cannot recover from these errors, so just rethrow them and let the process fail
-                        throw e;
-                    } catch (KafkaException e) {
-                        producer.abortTransaction();
+                        // These failures are not recoverable with the same producer

Review comment:
       I think this is a reasonable way to deal with these issues, since the main thing we are trying to test here is that we produce an exact copy of the input topic on the output topic. However, I do have questions about which exceptions we should allow for. Originally my change allowed for supervision of ProducerFencedException(s) but not OutOfOrderSequenceException(s). If we do not expect the latter in any of the cases where we are messing with cluster behavior (broker bounces, network partitioning, etc.), then I can take the handling out for the OutOfOrderSequence case.
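
For context, the distinction being discussed can be sketched as a small classifier over Kafka's producer exception hierarchy: `ProducerFencedException` and `OutOfOrderSequenceException` invalidate the producer's epoch/sequence state and cannot be recovered with the same producer instance, while other `KafkaException`s can be handled by aborting and retrying the transaction. The exception classes below are local stand-ins (so the sketch runs without kafka-clients on the classpath); real code would import them from `org.apache.kafka.common` and `org.apache.kafka.common.errors`.

```java
// Local stand-ins for Kafka's exception hierarchy, so this sketch is
// self-contained. In real code these come from org.apache.kafka.common.*.
class KafkaException extends RuntimeException {
    KafkaException(String msg) { super(msg); }
}
class ProducerFencedException extends KafkaException {
    ProducerFencedException(String msg) { super(msg); }
}
class OutOfOrderSequenceException extends KafkaException {
    OutOfOrderSequenceException(String msg) { super(msg); }
}

public class TxnErrorClassifier {
    // Fatal: the producer's epoch or sequence state is no longer valid,
    // so the same producer instance cannot safely continue; the caller
    // must rethrow (or recreate the producer, as the PR's supervision does).
    static boolean isFatal(KafkaException e) {
        return e instanceof ProducerFencedException
                || e instanceof OutOfOrderSequenceException;
    }

    public static void main(String[] args) {
        System.out.println(isFatal(new ProducerFencedException("fenced")));   // true
        System.out.println(isFatal(new OutOfOrderSequenceException("oos"))); // true
        System.out.println(isFatal(new KafkaException("abortable")));        // false
    }
}
```

The two-level try/catch in the diff mirrors this split: the inner `catch (KafkaException e)` aborts and resets positions for recoverable errors, while the fatal pair is rethrown to the outer handler.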




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
