dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1407405303


##########
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##########
@@ -148,6 +156,16 @@ class CoordinatorLoaderImpl[T](
                 }
               }
 
+              val currentHighWatermark = log.highWatermark
+              if (currentHighWatermark > previousHighWatermark) {
+                onHighWatermarkUpdated.accept(currentHighWatermark)
+                previousHighWatermark = currentHighWatermark
+              }
+
+              if (currentOffset >= currentHighWatermark) {
+                onLoadedBatch.accept(currentOffset)
+              }

Review Comment:
   `onLoadedBatch` updates `lastWrittenOffset`, so shouldn't we call it after updating `currentOffset`?
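Here is a minimal Java sketch of the ordering suggested in this comment, using a simplified loop. `Batch`, `LoaderLoopSketch`, and the fixed `highWatermark` parameter are hypothetical names for illustration and do not come from the PR. The only point is that `currentOffset` is advanced before `onLoadedBatch` receives it.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a record batch read from the log: only the
// base offset and the number of records matter for this sketch.
final class Batch {
    final long baseOffset;
    final int recordCount;

    Batch(long baseOffset, int recordCount) {
        this.baseOffset = baseOffset;
        this.recordCount = recordCount;
    }

    // Offset of the first record after this batch.
    long nextOffset() {
        return baseOffset + recordCount;
    }
}

final class LoaderLoopSketch {
    // Replays the batches in order. currentOffset is advanced *before*
    // onLoadedBatch is invoked, so the callback sees the offset that follows
    // the batch just replayed instead of a stale value.
    // Assumption: highWatermark is fixed here for simplicity; the loader in
    // the diff reads log.highWatermark on every iteration.
    static long load(List<Batch> batches, long highWatermark, Consumer<Long> onLoadedBatch) {
        long currentOffset = 0L;
        for (Batch batch : batches) {
            // Replaying the batch's records would happen here.
            currentOffset = batch.nextOffset();
            if (currentOffset >= highWatermark) {
                onLoadedBatch.accept(currentOffset);
            }
        }
        return currentOffset;
    }
}
```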



##########
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##########
@@ -148,6 +156,16 @@ class CoordinatorLoaderImpl[T](
                 }
               }
 
+              val currentHighWatermark = log.highWatermark
+              if (currentHighWatermark > previousHighWatermark) {
+                onHighWatermarkUpdated.accept(currentHighWatermark)
+                previousHighWatermark = currentHighWatermark
+              }

Review Comment:
   I wonder whether we should do this after updating the last written offset instead. Conceptually, at least, that order makes more sense. What do you think?
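One way to see why the order matters, as a hedged sketch: assume the coordinator enforces `lastCommittedOffset <= lastWrittenOffset` (an assumption for this example, not something the diff confirms). `OffsetTracker` is a hypothetical class. Under that invariant, reporting the high watermark before recording the write it covers would be rejected.

```java
// Hypothetical tracker for the two offsets discussed in this PR.
// Assumption: lastCommittedOffset must never exceed lastWrittenOffset,
// which is why the written offset has to be updated first.
final class OffsetTracker {
    private long lastWrittenOffset = 0L;
    private long lastCommittedOffset = 0L;

    void updateLastWrittenOffset(long offset) {
        if (offset < lastWrittenOffset) {
            throw new IllegalStateException(
                "Last written offset cannot go backwards: " + offset + " < " + lastWrittenOffset);
        }
        lastWrittenOffset = offset;
    }

    void updateLastCommittedOffset(long offset) {
        if (offset < lastCommittedOffset) {
            throw new IllegalStateException(
                "Last committed offset cannot go backwards: " + offset + " < " + lastCommittedOffset);
        }
        // Committing past what has been written would break the invariant.
        if (offset > lastWrittenOffset) {
            throw new IllegalStateException(
                "Cannot commit offset " + offset + " beyond last written offset " + lastWrittenOffset);
        }
        lastCommittedOffset = offset;
    }

    long lastWrittenOffset() {
        return lastWrittenOffset;
    }

    long lastCommittedOffset() {
        return lastCommittedOffset;
    }
}
```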



##########
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##########
@@ -55,17 +56,21 @@ class CoordinatorLoaderImpl[T](
    * Loads the coordinator by reading all the records from the TopicPartition
    * and applying them to the Replayable object.
    *
-   * @param tp          The TopicPartition to read from.
-   * @param coordinator The object to apply records to.
+   * @param tp                      The TopicPartition to read from.
+   * @param coordinator             The object to apply records to.
+   * @param onLoadedBatch           Invoked when a batch was successfully loaded.
+   * @param onHighWatermarkUpdated  Invoked when the high watermark advanced.
    */
   override def load(
     tp: TopicPartition,
-    coordinator: CoordinatorPlayback[T]
+    coordinator: CoordinatorPlayback[T],
+    onLoadedBatch: Consumer[java.lang.Long],
+    onHighWatermarkUpdated: Consumer[java.lang.Long]

Review Comment:
   This does not look good. Is there a reason why you did not use a single interface that combines those three (the playback and the two callbacks)? In my opinion, it would be better to have an interface that defines:
   * `replay`
   * `updateLastWrittenOffset`
   * `updateLastCommittedOffset`
   
   We could repurpose `CoordinatorPlayback` for this and introduce a new interface for the loader.
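A rough Java sketch of the merged interface proposed here. `CoordinatorPlaybackSketch`, `CoordinatorLoaderSketch`, `RecordingPlayback`, and `InMemoryLoader` are hypothetical names. The real `CoordinatorPlayback.replay` signature may differ, and the topic partition is a plain `String` only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical merged interface: replaying records and advancing both offsets
// go through one object instead of a playback plus two Consumer<Long> callbacks.
interface CoordinatorPlaybackSketch<T> {
    void replay(T record);

    void updateLastWrittenOffset(long offset);

    void updateLastCommittedOffset(long offset);
}

// Hypothetical loader interface; tp is a String only to keep this self-contained.
interface CoordinatorLoaderSketch<T> {
    void load(String tp, CoordinatorPlaybackSketch<T> coordinator);
}

// Records every call so the order in which a loader drives the playback is visible.
final class RecordingPlayback<T> implements CoordinatorPlaybackSketch<T> {
    final List<String> events = new ArrayList<>();

    @Override
    public void replay(T record) {
        events.add("replay:" + record);
    }

    @Override
    public void updateLastWrittenOffset(long offset) {
        events.add("written:" + offset);
    }

    @Override
    public void updateLastCommittedOffset(long offset) {
        events.add("committed:" + offset);
    }
}

// Toy loader over an in-memory list: one record per offset, and a fixed
// high watermark that is capped at the last written offset.
final class InMemoryLoader<T> implements CoordinatorLoaderSketch<T> {
    private final List<T> records;
    private final long highWatermark;

    InMemoryLoader(List<T> records, long highWatermark) {
        this.records = records;
        this.highWatermark = highWatermark;
    }

    @Override
    public void load(String tp, CoordinatorPlaybackSketch<T> coordinator) {
        long offset = 0L;
        for (T record : records) {
            coordinator.replay(record);
            offset++;
        }
        // Written offset first, then the committed offset it bounds.
        coordinator.updateLastWrittenOffset(offset);
        coordinator.updateLastCommittedOffset(Math.min(highWatermark, offset));
    }
}
```

With this shape, `load` goes back to two parameters, and a caller can no longer swap the two `Consumer<Long>` arguments by mistake.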



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -543,8 +543,14 @@ private void transitionTo(
 
                 case ACTIVE:
                     state = CoordinatorState.ACTIVE;
-                    snapshotRegistry.getOrCreateSnapshot(0);
                     partitionWriter.registerListener(tp, highWatermarklistener);
+
+                    // If the partition did not contain any records, we would not have generated a snapshot
+                    // while loading.
+                    if (lastWrittenOffset == -1L) {
+                        updateLastWrittenOffset(0);
+                        updateLastCommittedOffset(0);
+                    }

Review Comment:
   I was wondering whether we could simply keep 0 as the default value for both offsets in the LOADING state and create the base snapshot there. Then we would not have to handle a special case here.
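A simplified sketch of that alternative, assuming snapshots can be modeled as a map keyed by offset (a stand-in for the snapshot registry in the diff). `PartitionContextSketch` and its methods are hypothetical. The point is that initializing both offsets to 0 and creating the base snapshot when entering LOADING leaves nothing to special-case when entering ACTIVE.

```java
import java.util.TreeMap;

// Hypothetical, heavily simplified stand-in for the runtime's per-partition context.
final class PartitionContextSketch {
    enum State { LOADING, ACTIVE }

    // Stand-in for the snapshot registry: one entry per snapshotted offset.
    final TreeMap<Long, String> snapshots = new TreeMap<>();
    State state;
    long lastWrittenOffset;
    long lastCommittedOffset;

    void transitionToLoading() {
        state = State.LOADING;
        // Start both offsets at 0 and create the base snapshot up front,
        // so an empty partition already has a snapshot at offset 0.
        lastWrittenOffset = 0L;
        lastCommittedOffset = 0L;
        snapshots.put(0L, "base");
    }

    void updateLastWrittenOffset(long offset) {
        lastWrittenOffset = offset;
        // Assumption for this sketch: each new written offset gets its own snapshot.
        snapshots.putIfAbsent(offset, "snapshot@" + offset);
    }

    void transitionToActive() {
        // No "lastWrittenOffset == -1L" branch is needed here anymore.
        state = State.ACTIVE;
    }
}
```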



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.