dmsolr commented on a change in pull request #5718:
URL: https://github.com/apache/skywalking/pull/5718#discussion_r510719752
##########
File path: docs/en/setup/backend/configuration-vocabulary.md
##########
@@ -198,8 +198,11 @@ core|default|role|Option values,
`Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | isSharding | it was true when OAP Server in cluster. |
SW_KAFKA_FETCHER_IS_SHARDING | false |
| - | - | createTopicIfNotExist | If true, create the Kafka topic when it does
not exist. | - | true |
| - | - | partitions | The number of partitions for the topic being created. |
SW_KAFKA_FETCHER_PARTITIONS | 3 |
-| - | - | enableMeterSystem | To enable to fetch and handle [Meter
System](backend-meter.md) data. | SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM | false
+| - | - | enableMeterSystem | To enable to fetch and handle [Meter
System](backend-meter.md) data. | SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM | false |
| - | - | replicationFactor | The replication factor for each partition in the
topic being created. | SW_KAFKA_FETCHER_PARTITIONS_FACTOR | 2 |
+| - | - | enableKafkaMessageAutoCommit | If true the consumer's offset will be
periodically committed in the background. | SW_ENABLE_KAFKA_MESSAGE_AUTO_COMMIT
| false |
Review comment:
I suggest that is directly using `enable.auto.commit`.
That means we don't need to introduce a new configuration.
##########
File path:
oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.java
##########
@@ -132,16 +157,21 @@ public void start() {
@Override
public void run() {
while (true) {
- ConsumerRecords<String, Bytes> consumerRecords =
consumer.poll(Duration.ofMillis(500L));
- if (!consumerRecords.isEmpty()) {
- Iterator<ConsumerRecord<String, Bytes>> iterator =
consumerRecords.iterator();
- while (iterator.hasNext()) {
- ConsumerRecord<String, Bytes> record = iterator.next();
- handlerMap.get(record.topic()).handle(record);
+ try {
+ ConsumerRecords<String, Bytes> consumerRecords =
consumer.poll(Duration.ofMillis(500L));
+ if (!consumerRecords.isEmpty()) {
+ Iterator<ConsumerRecord<String, Bytes>> iterator =
consumerRecords.iterator();
+ while (iterator.hasNext()) {
+ ConsumerRecord<String, Bytes> record = iterator.next();
+ executor.submit(() ->
handlerMap.get(record.topic()).handle(record));
+ }
+ if (!enableKafkaMessageAutoCommit) {
Review comment:
Change the condition to no `enable.auto.commit`. :)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]