This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 037307e  (chores) camel-kafka: minor cleanups:
037307e is described below

commit 037307ecaa997131a3bbec5c3ae9c00e60dfb7df
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Thu Sep 16 16:42:19 2021 +0200

    (chores) camel-kafka: minor cleanups:
    
    - avoid recreating the result object every time
    - move the poll log message out of the hot path loop
    - remove duplicated message
    - minor cleanups to avoid running expensive calls when in debug
---
 .../camel/component/kafka/KafkaFetchRecords.java   | 44 +++++++++++-----------
 .../consumer/support/KafkaRecordProcessor.java     | 14 ++++---
 2 files changed, 32 insertions(+), 26 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 2acc771..6394770 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.kafka;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -169,14 +170,11 @@ class KafkaFetchRecords implements Runnable {
         long partitionLastOffset = -1;
 
         try {
-            while (isKafkaConsumerRunnable() && isRetrying() && 
!isReconnecting()) {
-                long pollTimeoutMs = 
kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
+            long pollTimeoutMs = 
kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
+            LOG.trace("Polling {} from topic: {} with timeout: {}", threadId, 
topicName, pollTimeoutMs);
 
-                LOG.trace("Polling {} from topic: {} with timeout: {}", 
threadId, topicName, pollTimeoutMs);
+            while (isKafkaConsumerRunnable() && isRetrying() && 
!isReconnecting()) {
                 ConsumerRecords<Object, Object> allRecords = 
consumer.poll(Duration.ofMillis(pollTimeoutMs));
-                if (allRecords.isEmpty()) {
-                    LOG.debug("No records received when polling ... 
(continuing)");
-                }
 
                 partitionLastOffset = processPolledRecords(allRecords);
             }
@@ -310,34 +308,38 @@ class KafkaFetchRecords implements Runnable {
     private long processPolledRecords(ConsumerRecords<Object, Object> 
allRecords) {
         logRecords(allRecords);
 
-        Iterator<TopicPartition> partitionIterator = 
allRecords.partitions().iterator();
+        Set<TopicPartition> partitions = allRecords.partitions();
+        Iterator<TopicPartition> partitionIterator = partitions.iterator();
+
         KafkaRecordProcessor.ProcessResult lastResult = 
KafkaRecordProcessor.ProcessResult.newUnprocessed();
 
         while (partitionIterator.hasNext() && !isStopping()) {
             lastResult = KafkaRecordProcessor.ProcessResult.newUnprocessed();
             TopicPartition partition = partitionIterator.next();
 
-            Iterator<ConsumerRecord<Object, Object>> recordIterator = 
allRecords.records(partition).iterator();
+            List<ConsumerRecord<Object, Object>> partitionRecords = 
allRecords.records(partition);
+            Iterator<ConsumerRecord<Object, Object>> recordIterator = 
partitionRecords.iterator();
 
-            logRecordsInPartition(allRecords, partition);
+            logRecordsInPartition(partitionRecords, partition);
 
             KafkaRecordProcessor kafkaRecordProcessor = 
buildKafkaRecordProcessor();
 
-            while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() 
&& !isStopping()) {
-                ConsumerRecord<Object, Object> record = recordIterator.next();
+            try {
+                /*
+                 * We lock the processing of the record to avoid raising a 
WakeUpException as a result to a call
+                 * to stop() or shutdown().
+                 */
+                lock.lock();
 
-                try {
-                    /* 
-                     * We lock the processing of the record to avoid raising a 
WakeUpException as a result to a call 
-                     * to stop() or shutdown().  
-                     */
-                    lock.lock();
+                while (!lastResult.isBreakOnErrorHit() && 
recordIterator.hasNext() && !isStopping()) {
+                    ConsumerRecord<Object, Object> record = 
recordIterator.next();
 
                     lastResult = processRecord(partition, 
partitionIterator.hasNext(), recordIterator.hasNext(), lastResult,
                             kafkaRecordProcessor, record);
-                } finally {
-                    lock.unlock();
+
                 }
+            } finally {
+                lock.unlock();
             }
         }
 
@@ -351,9 +353,9 @@ class KafkaFetchRecords implements Runnable {
         return lastResult.getPartitionLastOffset();
     }
 
-    private void logRecordsInPartition(ConsumerRecords<Object, Object> 
allRecords, TopicPartition partition) {
+    private void logRecordsInPartition(List<ConsumerRecord<Object, Object>> 
partitionRecords, TopicPartition partition) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Records count {} received for partition {}", 
allRecords.records(partition).size(),
+            LOG.debug("Records count {} received for partition {}", 
partitionRecords.size(),
                     partition);
         }
     }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index 0361426..e48f78c 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -53,6 +53,8 @@ public class KafkaRecordProcessor {
     private final String threadId;
 
     public static final class ProcessResult {
+        private static final ProcessResult UNPROCESSED_RESULT = new 
ProcessResult(false, START_OFFSET);
+
         private boolean breakOnErrorHit;
         private long partitionLastOffset;
 
@@ -70,7 +72,7 @@ public class KafkaRecordProcessor {
         }
 
         public static ProcessResult newUnprocessed() {
-            return new ProcessResult(false, START_OFFSET);
+            return UNPROCESSED_RESULT;
         }
     }
 
@@ -121,13 +123,15 @@ public class KafkaRecordProcessor {
             boolean recordHasNext, ConsumerRecord<Object, Object> record, 
ProcessResult lastResult,
             ExceptionHandler exceptionHandler) {
 
-        setupExchangeMessage(exchange.getMessage(), record);
+        Message message = exchange.getMessage();
+
+        setupExchangeMessage(message, record);
 
         propagateHeaders(record, exchange);
 
         // if not auto commit then we have additional information on the 
exchange
         if (!autoCommitEnabled) {
-            
exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, 
!recordHasNext);
+            message.setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, 
!recordHasNext);
         }
 
         if (configuration.isAllowManualCommit()) {
@@ -136,11 +140,11 @@ public class KafkaRecordProcessor {
             // allow Camel users to access the Kafka consumer API to be able 
to do for example manual commits
             KafkaManualCommit manual = 
manualCommitFactory.newInstance(exchange, consumer, partition.topic(), threadId,
                     offsetRepository, partition, record.offset(), 
configuration.getCommitTimeoutMs());
-            exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual);
+            message.setHeader(KafkaConstants.MANUAL_COMMIT, manual);
         }
         // if commit management is on user side give additional info for the 
end of poll loop
         if (!autoCommitEnabled || configuration.isAllowManualCommit()) {
-            exchange.getIn().setHeader(KafkaConstants.LAST_POLL_RECORD, 
!recordHasNext && !partitionHasNext);
+            message.setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext 
&& !partitionHasNext);
         }
 
         try {

Reply via email to