jolshan commented on code in PR #15913:
URL: https://github.com/apache/kafka/pull/15913#discussion_r1607347641


##########
examples/src/main/java/kafka/examples/TransactionalClientDemo.java:
##########
@@ -0,0 +1,153 @@
+package kafka.examples;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Collections.singleton;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.*;
+
+/**
+ * This class demonstrates a transactional Kafka client application that consumes messages from an input topic,
+ * processes them to generate word count statistics, and produces the results to an output topic.
+ * It utilizes Kafka's transactional capabilities to ensure exactly-once processing semantics.
+ *
+ * The application continuously polls for records from the input topic, processes them, and commits the offsets
+ * in a transactional manner. In case of exceptions or errors, it handles them appropriately, either aborting the
+ * transaction and resetting to the last committed positions, or restarting the application.
+ */
+public class TransactionalClientDemo {
+
+    private static final String CONSUMER_GROUP_ID = "my-group-id";
+    private static final String OUTPUT_TOPIC = "output";
+    private static final String INPUT_TOPIC = "input";
+    private static KafkaConsumer<String, String> consumer;
+    private static KafkaProducer<String, String> producer;
+
+    public static void main(String[] args) {
+        initializeApplication();
+
+        boolean isRunning = true;
+        // Continuously poll for records
+        while (isRunning) {
+            try {
+                try {
+                    // Poll records from Kafka with a timeout of 60 seconds
+                    ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));
+
+                    // Process records to generate word count map
+                    Map<String, Integer> wordCountMap = new HashMap<>();
+
+                    for (ConsumerRecord<String, String> record : records) {
+                        String[] words = record.value().split(" ");
+                        for (String word : words) {
+                            wordCountMap.merge(word, 1, Integer::sum);
+                        }
+                    }
+
+                    // Begin transaction
+                    producer.beginTransaction();
+
+                    // Produce word count results to output topic
+                    wordCountMap.forEach((key, value) ->
+                            producer.send(new ProducerRecord<>(OUTPUT_TOPIC, key, value.toString())));
+
+                    // Determine offsets to commit
+                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
+                    for (TopicPartition partition : records.partitions()) {
+                        List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
+                        long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
+                        offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
+                    }
+
+                    // Send offsets to transaction for atomic commit
+                    producer.sendOffsetsToTransaction(offsetsToCommit, CONSUMER_GROUP_ID);
+
+                    // Commit transaction
+                    producer.commitTransaction();
+                } catch (AbortableTransactionException e) {
+                    // Abortable exception: handle the Kafka exception by aborting the transaction. The abortTransaction() path itself never throws an abortable exception.

Review Comment:
   More of an implementation discussion, but are we saying that producer.abortTransaction() should never throw such an exception? Or that we don't ever try to catch such an exception from abortTransaction?
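
   To make the two readings concrete, here is a minimal sketch of the error-handling shape under discussion. It assumes the AbortableTransactionException type this PR proposes, and resetToLastCommittedPositions is a hypothetical helper that seeks the consumer back to its last committed offsets; this is illustrative, not the PR's actual implementation:

   ```java
   try {
       producer.beginTransaction();
       // ... produce records and send offsets to the transaction ...
       producer.commitTransaction();
   } catch (AbortableTransactionException e) {
       // Reading 1: abortTransaction() itself is guaranteed never to throw an
       // abortable exception, so calling it here cannot re-enter this path.
       // Reading 2: it might throw one, but the demo simply never catches it here.
       producer.abortTransaction();
       resetToLastCommittedPositions(consumer); // hypothetical helper
   } catch (KafkaException e) {
       // Fatal or unexpected errors: close both clients and restart the application.
       producer.close();
       consumer.close();
   }
   ```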


