lbradstreet commented on a change in pull request #9147:
URL: https://github.com/apache/kafka/pull/9147#discussion_r467590975
##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
##########
@@ -345,39 +357,55 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
             if (records.count() > 0) {
                 try {
-                    producer.beginTransaction();
-
-                    for (ConsumerRecord<String, String> record : records) {
-                        producer.send(producerRecordFromConsumerRecord(outputTopic, record));
-                    }
-
-                    long messagesSentWithinCurrentTxn = records.count();
-
-                    if (useGroupMetadata) {
-                        producer.sendOffsetsToTransaction(consumerPositions(consumer), consumer.groupMetadata());
-                    } else {
-                        producer.sendOffsetsToTransaction(consumerPositions(consumer), consumerGroup);
-                    }
-
-                    if (enableRandomAborts && random.nextInt() % 3 == 0) {
-                        throw new KafkaException("Aborting transaction");
-                    } else {
-                        producer.commitTransaction();
-                        remainingMessages.getAndAdd(-messagesSentWithinCurrentTxn);
-                        numMessagesProcessedSinceLastRebalance.getAndAdd(messagesSentWithinCurrentTxn);
-                        totalMessageProcessed.getAndAdd(messagesSentWithinCurrentTxn);
+                    try {
+                        producer.get().beginTransaction();
+
+                        for (ConsumerRecord<String, String> record : records) {
+                            producer.get().send(producerRecordFromConsumerRecord(outputTopic, record));
+                        }
+
+                        long messagesSentWithinCurrentTxn = records.count();
+
+                        if (useGroupMetadata) {
+                            producer.get().sendOffsetsToTransaction(consumerPositions(consumer),
+                                consumer.groupMetadata());
+                        } else {
+                            producer.get().sendOffsetsToTransaction(consumerPositions(consumer),
+                                consumerGroup);
+                        }
+
+                        if (enableRandomAborts && random.nextInt() % 3 == 0) {
+                            throw new KafkaException("Aborting transaction");
+                        } else {
+                            producer.get().commitTransaction();
+                            remainingMessages.getAndAdd(-messagesSentWithinCurrentTxn);
+                            numMessagesProcessedSinceLastRebalance.getAndAdd(messagesSentWithinCurrentTxn);
+                            totalMessageProcessed.getAndAdd(messagesSentWithinCurrentTxn);
+                        }
+                    } catch (ProducerFencedException | OutOfOrderSequenceException e) {
+                        // handle these exceptions in the outer exception handling
+                        throw e;
+                    } catch (KafkaException e) {
+                        // this may throw a ProducerFencedException on recovery;
+                        // that will be handled in the outer catch if necessary
+                        System.out.println(handledExceptionJson(totalMessageProcessed.get(),
+                            numMessagesProcessedSinceLastRebalance.get(), remainingMessages.get(), transactionalId, e));
+                        producer.get().abortTransaction();
+                        resetToLastCommittedPositions(consumer);
                     }
                 } catch (ProducerFencedException | OutOfOrderSequenceException e) {
-                    // We cannot recover from these errors, so just rethrow them and let the process fail
-                    throw e;
-                } catch (KafkaException e) {
-                    producer.abortTransaction();
+                    // These failures are not recoverable with the same producer
##########
Review comment:
   I think this is a reasonable way to deal with these issues, since the main thing we are trying to test here is that we produce an exact copy from the input topic to the output topic. However, I do have questions about which kinds of exceptions we should allow for. Originally my change allowed for supervision of ProducerFencedException but not OutOfOrderSequenceException. If we do not expect OutOfOrderSequenceException in any of the cases where we are messing with cluster behavior (broker bounces, network partitioning, etc.), then I can take the handling out for that case.
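   To make the two branches of that question concrete, the pattern under discussion can be sketched roughly as below. This is a minimal, hypothetical illustration, not the PR's code: the class name, copyOnce(), createProducer(), the helper methods, and the placeholder config values are all stand-ins for the real plumbing in TransactionalMessageCopier.

// Hypothetical sketch of the recreate-on-fatal-error pattern; all names and
// config values below are illustrative, not the tool's actual identifiers.
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalCopySketch {
    // An AtomicReference lets the fatal-error branch swap in a fresh producer,
    // which is why the diff above calls producer.get() everywhere.
    private final AtomicReference<KafkaProducer<String, String>> producer =
        new AtomicReference<>(createProducer());

    void copyOnce(KafkaConsumer<String, String> consumer, String outputTopic) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
        if (records.isEmpty())
            return;
        try {
            producer.get().beginTransaction();
            for (ConsumerRecord<String, String> record : records)
                producer.get().send(new ProducerRecord<>(outputTopic, record.key(), record.value()));
            producer.get().sendOffsetsToTransaction(consumerPositions(consumer), consumer.groupMetadata());
            producer.get().commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException e) {
            // Fatal for this producer instance: do not call abortTransaction()
            // (it would throw again on a fenced producer); close it and create
            // a replacement. initTransactions() on the new instance aborts any
            // transaction left open by the old one.
            producer.get().close(Duration.ZERO);
            producer.set(createProducer());
            resetToLastCommittedPositions(consumer);
        } catch (KafkaException e) {
            // Recoverable with the same producer: abort, rewind the consumer,
            // and reprocess the batch in a new transaction.
            producer.get().abortTransaction();
            resetToLastCommittedPositions(consumer);
        }
    }

    private static KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "copier-0");        // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> p = new KafkaProducer<>(props);
        p.initTransactions(); // required before the first beginTransaction()
        return p;
    }

    private static Map<TopicPartition, OffsetAndMetadata> consumerPositions(KafkaConsumer<String, String> consumer) {
        Map<TopicPartition, OffsetAndMetadata> positions = new HashMap<>();
        for (TopicPartition tp : consumer.assignment())
            positions.put(tp, new OffsetAndMetadata(consumer.position(tp)));
        return positions;
    }

    private static void resetToLastCommittedPositions(KafkaConsumer<String, String> consumer) {
        Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
        for (TopicPartition tp : consumer.assignment()) {
            OffsetAndMetadata offset = committed.get(tp);
            if (offset != null)
                consumer.seek(tp, offset.offset());
            else
                consumer.seekToBeginning(Collections.singleton(tp));
        }
    }
}

   Whether OutOfOrderSequenceException belongs in the fatal branch alongside ProducerFencedException, or in the abort-and-retry branch, is exactly the open question raised in the comment above.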