hachikuji commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r440984705



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -675,36 +676,41 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
                     completedFetch.partition);
         } else {
             FetchPosition position = subscriptions.position(completedFetch.partition);
-            if (completedFetch.nextFetchOffset == position.offset) {
-                List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
-
-                log.trace("Returning {} fetched records at offset {} for assigned partition {}",
-                        partRecords.size(), position, completedFetch.partition);
-
-                if (completedFetch.nextFetchOffset > position.offset) {
-                    FetchPosition nextPosition = new FetchPosition(
-                            completedFetch.nextFetchOffset,
-                            completedFetch.lastEpoch,
-                            position.currentLeader);
-                    log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
-                    subscriptions.position(completedFetch.partition, nextPosition);
-                }
+            if (position != null) {
+                if (completedFetch.nextFetchOffset == position.offset) {
+                    List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
+
+                    log.trace("Returning {} fetched records at offset {} for assigned partition {}",
+                            partRecords.size(), position, completedFetch.partition);
+
+                    if (completedFetch.nextFetchOffset > position.offset) {
+                        FetchPosition nextPosition = new FetchPosition(
+                                completedFetch.nextFetchOffset,
+                                completedFetch.lastEpoch,
+                                position.currentLeader);
+                        log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
+                        subscriptions.position(completedFetch.partition, nextPosition);
+                    }
 
-                Long partitionLag = subscriptions.partitionLag(completedFetch.partition, isolationLevel);
-                if (partitionLag != null)
-                    this.sensors.recordPartitionLag(completedFetch.partition, partitionLag);
+                    Long partitionLag = subscriptions.partitionLag(completedFetch.partition, isolationLevel);
+                    if (partitionLag != null)
+                        this.sensors.recordPartitionLag(completedFetch.partition, partitionLag);
 
-                Long lead = subscriptions.partitionLead(completedFetch.partition);
-                if (lead != null) {
-                    this.sensors.recordPartitionLead(completedFetch.partition, lead);
-                }
+                    Long lead = subscriptions.partitionLead(completedFetch.partition);
+                    if (lead != null) {
+                        this.sensors.recordPartitionLead(completedFetch.partition, lead);
+                    }
 
-                return partRecords;
+                    return partRecords;
+                } else {
+                    // these records aren't next in line based on the last consumed position, ignore them
+                    // they must be from an obsolete request
+                    log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
+                            completedFetch.partition, completedFetch.nextFetchOffset, position);
+                }
             } else {
-                // these records aren't next in line based on the last consumed position, ignore them
-                // they must be from an obsolete request
-                log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
-                        completedFetch.partition, completedFetch.nextFetchOffset, position);
+                log.warn("Ignoring fetched records for {} at offset {} since the current position is undefined",

Review comment:
       This comment applies to a few of the added null checks where we have 
already validated that the partition is "fetchable." I am wondering if it would 
be more consistent to raise an exception.
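
   As a rough illustration of the fail-fast alternative being suggested (toy class and method names, not the actual `Fetcher` code), the null check could throw instead of logging and ignoring:
   ```java
   // Toy sketch of the suggested fail-fast behavior (hypothetical names):
   // once a partition has already been validated as fetchable, a null
   // position indicates an internal bug, so raise an exception rather
   // than logging a warning and silently dropping the records.
   public class PositionCheck {
       /** Returns the position, or throws if it is unexpectedly null. */
       static long requirePosition(Long position, String partition) {
           if (position == null) {
               throw new IllegalStateException(
                       "No current position for fetchable partition " + partition);
           }
           return position;
       }

       public static void main(String[] args) {
           System.out.println(requirePosition(42L, "topic-0"));
           try {
               requirePosition(null, "topic-1");
           } catch (IllegalStateException e) {
               System.out.println("caught: " + e.getMessage());
           }
       }
   }
   ```
   The trade-off is that an exception surfaces the inconsistency to the caller immediately, whereas a `log.warn` keeps the consumer running but can mask a state-machine bug.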

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -924,10 +949,19 @@ default FetchState transitionTo(FetchState newState) {
             }
         }
 
+        /**
+         * Return the valid states which this state can transition to
+         */
         Collection<FetchState> validTransitions();
 
+        /**
+         * Test if this state has a position

Review comment:
       Since the usage is a bit different, maybe we could change the name to 
`requiresPosition`. Then this check seems a little more intuitive:
   ```java
                   if (this.position == null && nextState.requiresPosition()) {
                       throw new IllegalStateException("Transitioned 
subscription state to " + nextState + ", but position is null");
                   }
   ```

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -745,6 +745,9 @@ private void transitionState(FetchState newState, Runnable runIfTransitioned) {
             if (nextState.equals(newState)) {
                 this.fetchState = nextState;
                 runIfTransitioned.run();
+                if (this.position == null && nextState.hasPosition()) {

Review comment:
       Would it make sense to set `position` explicitly to null if the `FetchState` does not expect to have it? For example, it seems that currently when we reset the offset, we leave `position` at whatever value it had previously: if we were initializing, it would be null; if we had an offset out of range, it would be non-null. It might be easier to reason about the logic if it is always null in the AWAIT_RESET state.
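
   A toy model of that suggestion (hypothetical names, not Kafka's actual `SubscriptionState`), clearing the stored position whenever the target state does not expect one so that, e.g., AWAIT_RESET always observes `position == null`:
   ```java
   // Toy model of the suggested invariant (hypothetical names): states
   // declare whether they require a position; transitioning to a state
   // that does not require one explicitly drops any stale position.
   public class FetchStateModel {
       enum FetchState {
           INITIALIZING(false), FETCHING(true), AWAIT_RESET(false);

           final boolean requiresPosition;

           FetchState(boolean requiresPosition) {
               this.requiresPosition = requiresPosition;
           }
       }

       FetchState fetchState = FetchState.FETCHING;
       Long position = 100L;

       void transitionState(FetchState newState) {
           this.fetchState = newState;
           if (!newState.requiresPosition) {
               // Drop the stale position so later reads can never observe
               // a leftover value from before the reset.
               this.position = null;
           } else if (this.position == null) {
               throw new IllegalStateException(
                       "Transitioned to " + newState + " but position is null");
           }
       }
   }
   ```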

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -647,7 +647,7 @@ public synchronized void resetMissingPositions() {
         assignment.stream().forEach(state -> {
             TopicPartition tp = state.topicPartition();
             TopicPartitionState partitionState = state.value();
-            if (!partitionState.hasPosition()) {
+            if (partitionState.fetchState.equals(FetchStates.INITIALIZING)) {

Review comment:
       Should we change the name of this method to something like 
`resetInitializingPositions`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

