lucasbru commented on code in PR #14951:
URL: https://github.com/apache/kafka/pull/14951#discussion_r1418947853


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -716,6 +734,126 @@ boolean runLoop() {
         return true;
     }
 
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime
+
+        if (fetchDeadlineClientInstanceId != -1) {
+            if (processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA)) {
+                final Map<TaskId, Task> activeTasks = taskManager.activeTaskMap();
+
+                // setup task futures if necessary
+                if (!producerInstanceIdFuture.isDone()) {
+                    if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
+                        // only active tasks have a producer (standby tasks don't)
+                        // producers are set up during task creation and thus all active tasks have a valid producer
+                        for (final Map.Entry<TaskId, Task> task : activeTasks.entrySet()) {
+                            final TaskId taskId = task.getKey();
+                            KafkaFutureImpl<Uuid> future = taskProducersInstanceIdsFuture.get(taskId);
+                            if (future == null || future.isCompletedExceptionally()) {
+                                future = new KafkaFutureImpl<>();
+                                ((StreamTask) task.getValue()).producerInstanceId = future;
+                                taskProducersInstanceIdsFuture.put(taskId, future);
+                            }
+                        }
+                        if (stateUpdaterEnabled) {

Review Comment:
   Dropping some ideas on how to implement this with the state updater:
   
    - Add a `clientInstanceIds` method to the `Task` interface, and also implement it in `ReadOnlyTask`.
    - Make it a NOOP for standby tasks and if EOS_ALPHA is not enabled.
    - Since the producer is thread-safe, you should then be able to call that function whether the task is owned by the state updater or the stream thread.
    - Use `tasks.activeTasks` or something similar to get a collection of _all_ tasks, including state updater tasks.
    - Then set up the futures the same way as it's done here.
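
A rough sketch of the idea above, not the actual Kafka code. Everything in it is a simplified, hypothetical stand-in: the `Task`, `StreamTask`, `StandbyTask` and `ReadOnlyTask` types are reduced to the bare minimum, `CompletableFuture<UUID>` stands in for `KafkaFutureImpl<Uuid>`, a `String` stands in for `TaskId`, and `collectProducerInstanceIds` is an invented helper name.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

public class ClientInstanceIdSketch {

    // Hypothetical, simplified stand-in for the real Task interface.
    interface Task {
        String id();
        boolean isActive();
        // Default is a NOOP, which covers standby tasks.
        default void clientInstanceIds(final CompletableFuture<UUID> future) { }
    }

    static final class StandbyTask implements Task {
        private final String id;
        StandbyTask(final String id) { this.id = id; }
        @Override public String id() { return id; }
        @Override public boolean isActive() { return false; }
    }

    static final class StreamTask implements Task {
        private final String id;
        // Stand-in for the id that the task's producer would report.
        private final UUID producerInstanceId;
        StreamTask(final String id, final UUID producerInstanceId) {
            this.id = id;
            this.producerInstanceId = producerInstanceId;
        }
        @Override public String id() { return id; }
        @Override public boolean isActive() { return true; }
        @Override public void clientInstanceIds(final CompletableFuture<UUID> future) {
            // Assumption: the real call would go to the thread-safe producer.
            future.complete(producerInstanceId);
        }
    }

    // Wrapper for tasks owned by the state updater; it only delegates.
    static final class ReadOnlyTask implements Task {
        private final Task delegate;
        ReadOnlyTask(final Task delegate) { this.delegate = delegate; }
        @Override public String id() { return delegate.id(); }
        @Override public boolean isActive() { return delegate.isActive(); }
        @Override public void clientInstanceIds(final CompletableFuture<UUID> future) {
            delegate.clientInstanceIds(future);
        }
    }

    // Iterates over all tasks, no matter who owns them, and sets up one future per active task.
    static Map<String, CompletableFuture<UUID>> collectProducerInstanceIds(
            final Collection<Task> allTasks, final boolean eosAlphaEnabled) {
        final Map<String, CompletableFuture<UUID>> futures = new HashMap<>();
        if (!eosAlphaEnabled) {
            return futures; // nothing to do without per-task producers
        }
        for (final Task task : allTasks) {
            if (!task.isActive()) {
                continue; // standby tasks have no producer
            }
            final CompletableFuture<UUID> future = new CompletableFuture<>();
            task.clientInstanceIds(future);
            futures.put(task.id(), future);
        }
        return futures;
    }

    public static void main(final String[] args) {
        final Collection<Task> allTasks = Arrays.asList(
            new StreamTask("0_0", UUID.randomUUID()),
            new ReadOnlyTask(new StreamTask("0_1", UUID.randomUUID())),
            new StandbyTask("0_2"));
        System.out.println(collectProducerInstanceIds(allTasks, true).keySet());
    }
}
```

The point of the default NOOP is that the caller no longer needs the `(StreamTask)` cast or a special case for tasks that are currently owned by the state updater.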



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

