apoorvmittal10 commented on code in PR #16274: URL: https://github.com/apache/kafka/pull/16274#discussion_r1636568658
########## core/src/main/java/kafka/server/SharePartition.java: ########## @@ -350,87 +752,73 @@ public String toString() { * fetched from the leader. The state of the record is used to determine if the record should * be re-deliver or if it can be acknowledged or archived. */ - private static class InFlightState { - /** - * The state of the fetch batch records. - */ + static final class InFlightState { + + // The state of the fetch batch records. private RecordState state; - /** - * The number of times the records has been delivered to the client. - */ + // The number of times the records has been delivered to the client. private int deliveryCount; - /** - * The member id of the client that is fetching/acknowledging the record. - */ + // The member id of the client that is fetching/acknowledging the record. private String memberId; + // The timer task for the acquisition lock timeout. + private AcquisitionLockTimerTask acquisitionLockTimeoutTask; + InFlightState(RecordState state, int deliveryCount, String memberId) { + this(state, deliveryCount, memberId, null); + } + + InFlightState(RecordState state, int deliveryCount, String memberId, AcquisitionLockTimerTask acquisitionLockTimeoutTask) { this.state = state; this.deliveryCount = deliveryCount; this.memberId = memberId; + this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask; } - @Override - public int hashCode() { - return Objects.hash(state, deliveryCount, memberId); + void updateAcquisitionLockTimeoutTask(AcquisitionLockTimerTask acquisitionLockTimeoutTask) { + this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask; } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; + void cancelAndClearAcquisitionLockTimeoutTask() { + acquisitionLockTimeoutTask.cancel(); + acquisitionLockTimeoutTask = null; + } + + /** + * Try to update the state of the records. The state of the records can only be updated if the + * new state is allowed to be transitioned from old state. The delivery count is not incremented + * if the state update is unsuccessful. + * + * @param newState The new state of the records. + * @param incrementDeliveryCount Whether to increment the delivery count. + * + * @return {@code InFlightState} if update succeeds, null otherwise. Returning state + * helps update chaining. + */ + private InFlightState tryUpdateState(RecordState newState, boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) { + try { + if (newState == RecordState.AVAILABLE && deliveryCount >= maxDeliveryCount) { + newState = RecordState.ARCHIVED; + } + state = state.validateTransition(newState); + if (incrementDeliveryCount && newState != RecordState.ARCHIVED) { + deliveryCount++; + } + memberId = newMemberId; + return this; + } catch (IllegalStateException e) { + log.info("Failed to update state of the records", e); Review Comment: I have moved it to error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org