mjsax commented on a change in pull request #11336:
URL: https://github.com/apache/kafka/pull/11336#discussion_r717032981



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
##########
@@ -110,6 +112,11 @@ public long currentStreamTimeMs() {
         throw new UnsupportedOperationException("There is no concept of stream-time for a global processor.");
     }
 
+    @Override
+    public Map<TopicPartition, Long> currentPositions() {
+        throw new UnsupportedOperationException("currentPositions is not supported for global processors.");

Review comment:
       I agree that supporting this might not be required (or even useful). 
However, I think it differs from "stream-time" for two reasons: (1) 
stream-time depends on ordered processing across partitions of different 
topics, which does not apply to a global task that reads multiple/all 
partitions of a single topic, and (2) a global task is _decoupled_ from the 
other tasks with regard to "time synchronization" (time synchronization is 
important for joins!). Tracking the offset per partition, in contrast, needs 
no cross-task synchronization, so we can report the offset per partition 
easily. Thus, it is a well-defined operation for a global task too, while 
"stream-time" is undefined for a global task.
   
   > The code isn't already in place to track it (because we don't commit 
global tasks)
   
   I don't think it would be hard to add. It should even be simpler than for 
regular tasks, because we don't have a "record buffer" but only the current 
`ConsumerRecord` iterator: we can just look ahead by one record, or use 
`position()` if we are at the end of the iterator.
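
   To make the look-ahead idea concrete, here is a rough sketch, not the actual Kafka Streams code. `Rec` is a hypothetical stand-in for `ConsumerRecord`, partitions are plain `int`s instead of `TopicPartition`, and `consumerPosition` is a hypothetical callback standing in for the consumer's `position()`. It also assumes that records of the same partition are contiguous within a polled batch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.IntToLongFunction;

public class GlobalPositionSketch {

    /** Hypothetical stand-in for ConsumerRecord: only partition and offset matter here. */
    public static final class Rec {
        final int partition;
        final long offset;

        public Rec(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    /**
     * Walks one polled batch with a one-record look-ahead and tracks the
     * position per partition. consumerPosition is an assumed callback that
     * plays the role of the consumer's position() for a partition.
     */
    public static Map<Integer, Long> positionsAfterBatch(List<Rec> batch,
                                                         IntToLongFunction consumerPosition) {
        Map<Integer, Long> positions = new HashMap<>();
        Iterator<Rec> it = batch.iterator();
        Rec current = it.hasNext() ? it.next() : null;
        while (current != null) {
            // Look ahead by exactly one record.
            Rec lookAhead = it.hasNext() ? it.next() : null;

            // ... processing of `current` would happen here ...

            if (lookAhead != null && lookAhead.partition == current.partition) {
                // The next record of the same partition tells us the position,
                // even if there is an offset gap after `current`.
                positions.put(current.partition, lookAhead.offset);
            } else {
                // No further record for this partition in the batch:
                // fall back to the consumer's position.
                positions.put(current.partition,
                              consumerPosition.applyAsLong(current.partition));
            }
            current = lookAhead;
        }
        return positions;
    }

    public static void main(String[] args) {
        List<Rec> batch = Arrays.asList(new Rec(0, 5), new Rec(0, 7), new Rec(1, 2));
        // Pretend the consumer reports position 8 for partition 0 and 3 for partition 1.
        Map<Integer, Long> positions = positionsAfterBatch(batch, p -> p == 0 ? 8L : 3L);
        System.out.println(positions);
    }
}
```

   The map is updated after every record, so reading it mid-batch would give the positions as of the record just processed. Using the look-ahead offset rather than `offset + 1` is a design choice in this sketch to stay correct when offsets are not consecutive.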





