hachikuji commented on a change in pull request #8841: URL: https://github.com/apache/kafka/pull/8841#discussion_r440984705
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ########## @@ -675,36 +676,41 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp completedFetch.partition); } else { FetchPosition position = subscriptions.position(completedFetch.partition); - if (completedFetch.nextFetchOffset == position.offset) { - List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords); - - log.trace("Returning {} fetched records at offset {} for assigned partition {}", - partRecords.size(), position, completedFetch.partition); - - if (completedFetch.nextFetchOffset > position.offset) { - FetchPosition nextPosition = new FetchPosition( - completedFetch.nextFetchOffset, - completedFetch.lastEpoch, - position.currentLeader); - log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition); - subscriptions.position(completedFetch.partition, nextPosition); - } + if (position != null) { + if (completedFetch.nextFetchOffset == position.offset) { + List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords); + + log.trace("Returning {} fetched records at offset {} for assigned partition {}", + partRecords.size(), position, completedFetch.partition); + + if (completedFetch.nextFetchOffset > position.offset) { + FetchPosition nextPosition = new FetchPosition( + completedFetch.nextFetchOffset, + completedFetch.lastEpoch, + position.currentLeader); + log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition); + subscriptions.position(completedFetch.partition, nextPosition); + } - Long partitionLag = subscriptions.partitionLag(completedFetch.partition, isolationLevel); - if (partitionLag != null) - this.sensors.recordPartitionLag(completedFetch.partition, partitionLag); + Long partitionLag = subscriptions.partitionLag(completedFetch.partition, isolationLevel); + if (partitionLag != null) + 
this.sensors.recordPartitionLag(completedFetch.partition, partitionLag); - Long lead = subscriptions.partitionLead(completedFetch.partition); - if (lead != null) { - this.sensors.recordPartitionLead(completedFetch.partition, lead); - } + Long lead = subscriptions.partitionLead(completedFetch.partition); + if (lead != null) { + this.sensors.recordPartitionLead(completedFetch.partition, lead); + } - return partRecords; + return partRecords; + } else { + // these records aren't next in line based on the last consumed position, ignore them + // they must be from an obsolete request + log.debug("Ignoring fetched records for {} at offset {} since the current position is {}", + completedFetch.partition, completedFetch.nextFetchOffset, position); + } } else { - // these records aren't next in line based on the last consumed position, ignore them - // they must be from an obsolete request - log.debug("Ignoring fetched records for {} at offset {} since the current position is {}", - completedFetch.partition, completedFetch.nextFetchOffset, position); + log.warn("Ignoring fetched records for {} at offset {} since the current position is undefined", Review comment: This comment applies to a few of the added null checks where we have already validated that the partition is "fetchable." I am wondering if it would be more consistent to raise an exception. ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ########## @@ -924,10 +949,19 @@ default FetchState transitionTo(FetchState newState) { } } + /** + * Return the valid states which this state can transition to + */ Collection<FetchState> validTransitions(); + /** + * Test if this state has a position Review comment: Since the usage is a bit different, maybe we could change the name to `requiresPosition`. 
Then this check seems a little more intuitive: ```java if (this.position == null && nextState.requiresPosition()) { throw new IllegalStateException("Transitioned subscription state to " + nextState + ", but position is null"); } ``` ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ########## @@ -745,6 +745,9 @@ private void transitionState(FetchState newState, Runnable runIfTransitioned) { if (nextState.equals(newState)) { this.fetchState = nextState; runIfTransitioned.run(); + if (this.position == null && nextState.hasPosition()) { Review comment: Would it make sense to set `position` explicitly to null if the `FetchState` does not expect to have one? For example, it seems that currently, when we reset the offset, we leave `position` at whatever value it had previously. If we were initializing, then it would be null. If we had an offset out of range, it would be non-null. It might be easier to reason about the logic if it is always null in the AWAIT_RESET state. ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ########## @@ -647,7 +647,7 @@ public synchronized void resetMissingPositions() { assignment.stream().forEach(state -> { TopicPartition tp = state.topicPartition(); TopicPartitionState partitionState = state.value(); - if (!partitionState.hasPosition()) { + if (partitionState.fetchState.equals(FetchStates.INITIALIZING)) { Review comment: Should we change the name of this method to something like `resetInitializingPositions`? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org