dmsolr commented on a change in pull request #5718:
URL: https://github.com/apache/skywalking/pull/5718#discussion_r510719752



##########
File path: docs/en/setup/backend/configuration-vocabulary.md
##########
@@ -198,8 +198,11 @@ core|default|role|Option values, 
`Mixed/Receiver/Aggregator`. **Receiver** mode
 | - | - | isSharding | it was true when OAP Server in cluster. | 
SW_KAFKA_FETCHER_IS_SHARDING | false |
 | - | - | createTopicIfNotExist | If true, create the Kafka topic when it does 
not exist. | - | true |
 | - | - | partitions | The number of partitions for the topic being created. | 
SW_KAFKA_FETCHER_PARTITIONS | 3 |
-| - | - | enableMeterSystem | To enable to fetch and handle [Meter 
System](backend-meter.md) data. | SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM | false
+| - | - | enableMeterSystem | To enable to fetch and handle [Meter 
System](backend-meter.md) data. | SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM | false |
 | - | - | replicationFactor | The replication factor for each partition in the 
topic being created. | SW_KAFKA_FETCHER_PARTITIONS_FACTOR | 2 |
+| - | - | enableKafkaMessageAutoCommit | If true the consumer's offset will be 
periodically committed in the background. | SW_ENABLE_KAFKA_MESSAGE_AUTO_COMMIT 
| false |

Review comment:
       I suggest that is directly using `enable.auto.commit`.
   That means we don't need to introduce a new configuration.

##########
File path: 
oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.java
##########
@@ -132,16 +157,21 @@ public void start() {
     @Override
     public void run() {
         while (true) {
-            ConsumerRecords<String, Bytes> consumerRecords = 
consumer.poll(Duration.ofMillis(500L));
-            if (!consumerRecords.isEmpty()) {
-                Iterator<ConsumerRecord<String, Bytes>> iterator = 
consumerRecords.iterator();
-                while (iterator.hasNext()) {
-                    ConsumerRecord<String, Bytes> record = iterator.next();
-                    handlerMap.get(record.topic()).handle(record);
+            try {
+                ConsumerRecords<String, Bytes> consumerRecords = 
consumer.poll(Duration.ofMillis(500L));
+                if (!consumerRecords.isEmpty()) {
+                    Iterator<ConsumerRecord<String, Bytes>> iterator = 
consumerRecords.iterator();
+                    while (iterator.hasNext()) {
+                        ConsumerRecord<String, Bytes> record = iterator.next();
+                        executor.submit(() -> 
handlerMap.get(record.topic()).handle(record));
+                    }
+                    if (!enableKafkaMessageAutoCommit) {

Review comment:
       Change the condition to no `enable.auto.commit`. :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to