lucasbru commented on code in PR #14936:
URL: https://github.com/apache/kafka/pull/14936#discussion_r1417082199
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -79,6 +84,10 @@ private class StateUpdaterThread extends Thread {
private long totalCheckpointLatency = 0L;
+ private volatile long fetchDeadline = -1L;
Review Comment:
Same, rename please
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -231,6 +242,36 @@ private void restoreTasks(final long now) {
}
}
+ private void maybeGetClientInstanceIds() {
+ // we pass in a timeout of zero into each `clientInstanceId()` call
+ // to just trigger the "get instance id" background RPC;
+ // we don't want to block the state updater thread that can do
useful work in the meantime
+
+ if (fetchDeadline != -1) {
+ if (!restoreConsumerInstanceIdFuture.isDone()) {
+ if (fetchDeadline >= time.milliseconds()) {
+ try {
+ restoreConsumerClientInstanceId =
restoreConsumer.clientInstanceId(Duration.ZERO);
+
restoreConsumerInstanceIdFuture.complete(restoreConsumerClientInstanceId);
+ fetchDeadline = -1L;
+ } catch (final IllegalStateException disabledError) {
+ restoreConsumerInstanceIdFuture.complete(null);
+ fetchDeadline = -1L;
+ } catch (final TimeoutException swallow) {
+ // swallow
+ } catch (final Exception error) {
+
restoreConsumerInstanceIdFuture.completeExceptionally(error);
+ fetchDeadline = -1L;
+ }
+ } else {
+ restoreConsumerInstanceIdFuture.completeExceptionally(
+ new TimeoutException("Could not retrieve main
consumer client instance id.")
Review Comment:
that's the restore consumer client instance id
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -772,6 +817,26 @@ public Set<StandbyTask> getStandbyTasks() {
);
}
+ @Override
+ public KafkaFutureImpl<Uuid> restoreConsumerInstanceId(final Duration
timeout) {
+ if (stateUpdaterThread.restoreConsumerInstanceIdFuture != null) {
+ return stateUpdaterThread.restoreConsumerInstanceIdFuture;
Review Comment:
I still want to update the fetch deadline before returning, right? Otherwise
I'll timeout too early.
Also, what if the existing future is completed with a timeout exception
already?
How about this:
- Update the `fetchDeadline` first in this function (max of new and current
deadline)
- Do not complete the future with a timeout exception if the deadline
expires, instead just set the future to null, deadline to -1.
- Use bounded-time `get` on the future in the application thread.
- When the current future completed with an error, also set the future to
null.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]