This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 037307e (chores) camel-kafka: minor cleanups: 037307e is described below commit 037307ecaa997131a3bbec5c3ae9c00e60dfb7df Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Thu Sep 16 16:42:19 2021 +0200 (chores) camel-kafka: minor cleanups: - avoid recreating the result object everytime - move the poll log message out of the hot path loop - remove duplicated message - minor cleanups to avoid running expensive calls when in debug --- .../camel/component/kafka/KafkaFetchRecords.java | 44 +++++++++++----------- .../consumer/support/KafkaRecordProcessor.java | 14 ++++--- 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index 2acc771..6394770 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -19,6 +19,7 @@ package org.apache.camel.component.kafka; import java.time.Duration; import java.util.Arrays; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -169,14 +170,11 @@ class KafkaFetchRecords implements Runnable { long partitionLastOffset = -1; try { - while (isKafkaConsumerRunnable() && isRetrying() && !isReconnecting()) { - long pollTimeoutMs = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs(); + long pollTimeoutMs = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs(); + LOG.trace("Polling {} from topic: {} with timeout: {}", threadId, topicName, pollTimeoutMs); - LOG.trace("Polling {} from topic: {} with timeout: {}", threadId, topicName, pollTimeoutMs); + while (isKafkaConsumerRunnable() && isRetrying() && !isReconnecting()) { ConsumerRecords<Object, Object> allRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs)); - if (allRecords.isEmpty()) { - LOG.debug("No records received when polling ... (continuing)"); - } partitionLastOffset = processPolledRecords(allRecords); } @@ -310,34 +308,38 @@ class KafkaFetchRecords implements Runnable { private long processPolledRecords(ConsumerRecords<Object, Object> allRecords) { logRecords(allRecords); - Iterator<TopicPartition> partitionIterator = allRecords.partitions().iterator(); + Set<TopicPartition> partitions = allRecords.partitions(); + Iterator<TopicPartition> partitionIterator = partitions.iterator(); + KafkaRecordProcessor.ProcessResult lastResult = KafkaRecordProcessor.ProcessResult.newUnprocessed(); while (partitionIterator.hasNext() && !isStopping()) { lastResult = KafkaRecordProcessor.ProcessResult.newUnprocessed(); TopicPartition partition = partitionIterator.next(); - Iterator<ConsumerRecord<Object, Object>> recordIterator = allRecords.records(partition).iterator(); + List<ConsumerRecord<Object, Object>> partitionRecords = allRecords.records(partition); + Iterator<ConsumerRecord<Object, Object>> recordIterator = partitionRecords.iterator(); - logRecordsInPartition(allRecords, partition); + logRecordsInPartition(partitionRecords, partition); KafkaRecordProcessor kafkaRecordProcessor = buildKafkaRecordProcessor(); - while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) { - ConsumerRecord<Object, Object> record = recordIterator.next(); + try { + /* + * We lock the processing of the record to avoid raising a WakeUpException as a result to a call + * to stop() or shutdown(). + */ + lock.lock(); - try { - /* - * We lock the processing of the record to avoid raising a WakeUpException as a result to a call - * to stop() or shutdown(). - */ - lock.lock(); + while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) { + ConsumerRecord<Object, Object> record = recordIterator.next(); lastResult = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), lastResult, kafkaRecordProcessor, record); - } finally { - lock.unlock(); + } + } finally { + lock.unlock(); } } @@ -351,9 +353,9 @@ class KafkaFetchRecords implements Runnable { return lastResult.getPartitionLastOffset(); } - private void logRecordsInPartition(ConsumerRecords<Object, Object> allRecords, TopicPartition partition) { + private void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition) { if (LOG.isDebugEnabled()) { - LOG.debug("Records count {} received for partition {}", allRecords.records(partition).size(), + LOG.debug("Records count {} received for partition {}", partitionRecords.size(), partition); } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java index 0361426..e48f78c 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java @@ -53,6 +53,8 @@ public class KafkaRecordProcessor { private final String threadId; public static final class ProcessResult { + private static final ProcessResult UNPROCESSED_RESULT = new ProcessResult(false, START_OFFSET); + private boolean breakOnErrorHit; private long partitionLastOffset; @@ -70,7 +72,7 @@ public class KafkaRecordProcessor { } public static ProcessResult newUnprocessed() { - return new ProcessResult(false, START_OFFSET); + return UNPROCESSED_RESULT; } } @@ -121,13 +123,15 @@ public class KafkaRecordProcessor { boolean recordHasNext, ConsumerRecord<Object, Object> record, ProcessResult lastResult, ExceptionHandler exceptionHandler) { - setupExchangeMessage(exchange.getMessage(), record); + Message message = exchange.getMessage(); + + setupExchangeMessage(message, record); propagateHeaders(record, exchange); // if not auto commit then we have additional information on the exchange if (!autoCommitEnabled) { - exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, !recordHasNext); + message.setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, !recordHasNext); } if (configuration.isAllowManualCommit()) { @@ -136,11 +140,11 @@ public class KafkaRecordProcessor { // allow Camel users to access the Kafka consumer API to be able to do for example manual commits KafkaManualCommit manual = manualCommitFactory.newInstance(exchange, consumer, partition.topic(), threadId, offsetRepository, partition, record.offset(), configuration.getCommitTimeoutMs()); - exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual); + message.setHeader(KafkaConstants.MANUAL_COMMIT, manual); } // if commit management is on user side give additional info for the end of poll loop if (!autoCommitEnabled || configuration.isAllowManualCommit()) { - exchange.getIn().setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext && !partitionHasNext); + message.setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext && !partitionHasNext); } try {