dmsolr commented on a change in pull request #5718: URL: https://github.com/apache/skywalking/pull/5718#discussion_r510719752
##########
File path: docs/en/setup/backend/configuration-vocabulary.md
##########
@@ -198,8 +198,11 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
 | - | - | isSharding | it was true when OAP Server in cluster. | SW_KAFKA_FETCHER_IS_SHARDING | false |
 | - | - | createTopicIfNotExist | If true, create the Kafka topic when it does not exist. | - | true |
 | - | - | partitions | The number of partitions for the topic being created. | SW_KAFKA_FETCHER_PARTITIONS | 3 |
-| - | - | enableMeterSystem | To enable to fetch and handle [Meter System](backend-meter.md) data. | SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM | false
+| - | - | enableMeterSystem | To enable to fetch and handle [Meter System](backend-meter.md) data. | SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM | false |
 | - | - | replicationFactor | The replication factor for each partition in the topic being created. | SW_KAFKA_FETCHER_PARTITIONS_FACTOR | 2 |
+| - | - | enableKafkaMessageAutoCommit | If true the consumer's offset will be periodically committed in the background. | SW_ENABLE_KAFKA_MESSAGE_AUTO_COMMIT | false |

Review comment:
   I suggest using Kafka's own `enable.auto.commit` setting directly. That way we don't need to introduce a new configuration item.
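
   To illustrate the suggestion, here is a minimal sketch of deriving the flag from the standard Kafka consumer key instead of a separate option. It assumes the fetcher's consumer settings are available as a `java.util.Properties` object; the class `AutoCommitCheck` and its method names are hypothetical and not part of this PR.

```java
import java.util.Properties;

public class AutoCommitCheck {
    // Standard Kafka consumer key (same string as ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    // Returns true when auto commit is on. Kafka's consumer default is true,
    // so a missing key is treated as enabled.
    static boolean isAutoCommitEnabled(Properties consumerProps) {
        Object raw = consumerProps.get(ENABLE_AUTO_COMMIT);
        return raw == null || Boolean.parseBoolean(raw.toString());
    }

    // Hypothetical helper: the poll loop would commit offsets itself
    // only when auto commit is disabled.
    static boolean needsManualCommit(Properties consumerProps) {
        return !isAutoCommitEnabled(consumerProps);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ENABLE_AUTO_COMMIT, "false");
        System.out.println(needsManualCommit(props));
    }
}
```

   With something along these lines, the poll loop can branch on the value of `enable.auto.commit` itself, and users configure offset committing through the property they already know from Kafka.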
##########
File path: oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.java
##########
@@ -132,16 +157,21 @@ public void start() {
     @Override
     public void run() {
         while (true) {
-            ConsumerRecords<String, Bytes> consumerRecords = consumer.poll(Duration.ofMillis(500L));
-            if (!consumerRecords.isEmpty()) {
-                Iterator<ConsumerRecord<String, Bytes>> iterator = consumerRecords.iterator();
-                while (iterator.hasNext()) {
-                    ConsumerRecord<String, Bytes> record = iterator.next();
-                    handlerMap.get(record.topic()).handle(record);
+            try {
+                ConsumerRecords<String, Bytes> consumerRecords = consumer.poll(Duration.ofMillis(500L));
+                if (!consumerRecords.isEmpty()) {
+                    Iterator<ConsumerRecord<String, Bytes>> iterator = consumerRecords.iterator();
+                    while (iterator.hasNext()) {
+                        ConsumerRecord<String, Bytes> record = iterator.next();
+                        executor.submit(() -> handlerMap.get(record.topic()).handle(record));
+                    }
+                    if (!enableKafkaMessageAutoCommit) {

Review comment:
   Change the condition to check that `enable.auto.commit` is not enabled. :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org