wcarlson5 commented on a change in pull request #10211:
URL: https://github.com/apache/kafka/pull/10211#discussion_r586728619



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -282,6 +284,21 @@ public boolean commitNeeded() {
         return Collections.unmodifiableMap(stateMgr.changelogOffsets());
     }
 
+    @Override
+    public Map<TopicPartition, Long> getCommittedOffsets() {
+        return new HashMap<>();
+    }
+
+    @Override
+    public Map<TopicPartition, Long> getHighWaterMark() {
+        return new HashMap<>();
+    }

Review comment:
       I changed it a bit so that we don't need to have it be modifiable

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -266,13 +272,13 @@ public void suspend() {
             case CREATED:
                 log.info("Suspended created");
                 transitionTo(State.SUSPENDED);
-
+                timeCurrentIdlingStarted = 
Optional.of(System.currentTimeMillis());

Review comment:
       sure

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1598,7 +1598,6 @@ private long countStreamThread(final 
Predicate<StreamThread> predicate) {
      * @return the set of {@link ThreadMetadata}.
      */
     public Set<ThreadMetadata> localThreadsMetadata() {
-        validateIsRunningOrRebalancing();

Review comment:
       They probably could as well. But In the KIP we only changed this one.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -237,4 +238,9 @@ default boolean commitRequested() {
      */
     Map<TopicPartition, Long> changelogOffsets();
 
+    Map<TopicPartition, Long> getCommittedOffsets();
+
+    Map<TopicPartition, Long> getHighWaterMark();
+
+    Optional<Long> getTimeCurrentIdlingStarted();

Review comment:
       good idea

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1113,6 +1116,16 @@ private void commitOffsetsOrTransaction(final Map<Task, 
Map<TopicPartition, Offs
         }
     }
 
+    private void updateTaskMetadata(final Map<TopicPartition, 
OffsetAndMetadata> allOffsets) {
+        for (final Task task: tasks.activeTasks()) {
+            for (final TopicPartition topicPartition: task.inputPartitions()) {
+                if (allOffsets.containsKey(topicPartition)) {
+                    task.getCommittedOffsets().put(topicPartition, 
allOffsets.get(topicPartition).offset());

Review comment:
       I think you are right. I will update

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
##########
@@ -32,10 +34,22 @@
 
     private final Set<TopicPartition> topicPartitions;
 
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final Map<TopicPartition, Long> endOffsets;
+
+    private Optional<Long> timeCurrentIdlingStarted;

Review comment:
       yes

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -237,4 +238,9 @@ default boolean commitRequested() {
      */
     Map<TopicPartition, Long> changelogOffsets();
 
+    Map<TopicPartition, Long> getCommittedOffsets();

Review comment:
       sure




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to