lucasbru commented on code in PR #14922:
URL: https://github.com/apache/kafka/pull/14922#discussion_r1417036886
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1477,6 +1533,29 @@ public Object getStateLock() {
return stateLock;
}
+ public Map<String, KafkaFuture<Uuid>> consumerClientInstanceIds(final
Duration timeout) {
+ final Map<String, KafkaFuture<Uuid>> result = new HashMap<>();
+
+ synchronized (fetchDeadlines) {
+ boolean addDeadline = false;
Review Comment:
Why do we need this boolean?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -335,6 +342,11 @@ public boolean isStartingRunningOrPartitionAssigned() {
private final boolean stateUpdaterEnabled;
private final boolean processingThreadsEnabled;
+ private volatile Uuid mainConsumerClientInstanceId = null;
+
+ private final List<Long> fetchDeadlines = new LinkedList<>();
Review Comment:
maybe rename. fetch deadlines for what?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -713,6 +727,48 @@ boolean runLoop() {
return true;
}
+ // visible for testing
+ void maybeGetClientInstanceIds() {
+ synchronized (fetchDeadlines) {
+ // we pass in a timeout of zero into each `clientInstanceId()` call
+ // to just trigger the "get instance id" background RPC;
+ // we don't want to block the stream thread that can do useful
work in the meantime
+
+ if (!fetchDeadlines.isEmpty()) {
+ if (fetchDeadlines.get(0) >= time.milliseconds()) {
+ try {
+ mainConsumerClientInstanceId =
mainConsumer.clientInstanceId(Duration.ZERO);
+ mainConsumerInstanceIdFutures.forEach(f ->
f.complete(mainConsumerClientInstanceId));
Review Comment:
It seems I can remove all the fetch deadlines here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]