nizhikov commented on code in PR #202:
URL: https://github.com/apache/ignite-extensions/pull/202#discussion_r1132074490
##########
modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java:
##########
@@ -88,12 +95,25 @@ public KafkaToIgniteMetadataUpdater(
/** Polls all available records from metadata topic and applies it to Ignite. */
public synchronized void updateMetadata() {
+ Map<TopicPartition, Long> endOffsets = cnsmr.endOffsets(cnsmr.assignment(), Duration.ofMillis(kafkaReqTimeout));
+
+ // If we have an information, that offsets in metadata topic has not changed, we can skip polling loop.
+ if (!F.isEmpty(endOffsets) && F.eqNotOrdered(offsets, endOffsets))
+ return;
+
+ offsets = new HashMap<>(endOffsets);
+
+ long pollTimeout = kafkaReqTimeout;
+
while (true) {
- ConsumerRecords<Void, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
+ ConsumerRecords<Void, byte[]> recs = cnsmr.poll(Duration.ofMillis(pollTimeout));
if (recs.count() == 0)
return;
+ // Next polls can be performed with a small timeout.
Review Comment:
Please revert the changes regarding the timeout.
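
For illustration only, here is a minimal sketch of the offset check on its own, without any timeout handling. It is not the code from the PR: `OffsetSkipSketch` and `pollLoops` are hypothetical names, partitions are plain integers instead of `TopicPartition`, the end offsets are passed in rather than fetched with `cnsmr.endOffsets(...)`, and `Map.equals` stands in for `F.eqNotOrdered`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the updater; it does not talk to Kafka. */
public class OffsetSkipSketch {
    /** End offsets seen on the previous call, keyed by partition number. */
    private Map<Integer, Long> offsets = new HashMap<>();

    /** Number of times the polling loop would have been entered. */
    private int pollLoops;

    /**
     * @param endOffsets Current end offsets (assumption: supplied by the caller).
     * @return {@code true} if the polling loop ran, {@code false} if it was skipped.
     */
    public synchronized boolean updateMetadata(Map<Integer, Long> endOffsets) {
        // Skip only when end offsets are known and unchanged since the previous call.
        if (!endOffsets.isEmpty() && endOffsets.equals(offsets))
            return false;

        // Remember a copy, so later changes to the caller's map do not affect the check.
        offsets = new HashMap<>(endOffsets);

        // The real code would run the polling loop here, using the same
        // kafkaReqTimeout for every poll.
        pollLoops++;

        return true;
    }

    /** @return How many times the polling loop was entered. */
    public int pollLoops() {
        return pollLoops;
    }
}
```

In this sketch an empty offsets map never triggers the skip, which mirrors the `!F.isEmpty(endOffsets)` guard in the diff.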