smjn opened a new pull request, #17580:
URL: https://github.com/apache/kafka/pull/17580

   It is possible that due to a network partition or delays on the wire a read 
request with a higher `leaderEpoch` reaches the share coordinator before a 
write request with a lower one for the same share partition. In this case, we 
have chosen to honor the highest `leaderEpoch` request and reject any 
subsequent lower value requests irrespective of the RPC type. However, this 
means that we need to persist the `leaderEpoch` seen in the read requests.
   
   In the `ShareCoordinatorShard.readState` method we currently store the 
`leaderEpoch` present in the request directly in the `leaderEpochMap` timeline 
hashmap, if it is the highest `leaderEpoch` seen so far for a specific share 
partition. This, however, is incorrect: a direct update does not go through 
`replay`, so it is not covered by the runtime's consistency guarantee.
   
   The coordinator runtime guarantees the consistency of the timeline data 
structures when updated via the `replay` method. The `replay` method is called 
by the runtime whenever it persists some records into the topic it is managing. 
This method is implemented in `ShareCoordinatorShard.replay`.
   
   Therefore, to remedy the situation, this PR adds code to the 
`ShareCoordinatorService.readState` method to issue a 
`runtime.scheduleWriteOperation` call if the incoming read state request holds 
a valid `leaderEpoch` value (not -1). In the service we cannot ascertain whether 
the `leaderEpoch` in the read request is the highest so far for the given share 
partition: the timeline data structures are housed in `ShareCoordinatorShard`, 
so they cannot be looked up from the `ShareCoordinatorService` class, and a 
lookup needs an offset, which we won't have. Nor can we move the code that 
writes the record into the shard's read state method, because we need the 
runtime, which is present in `ShareCoordinatorService`, to execute that 
callback.
   
   Hence, the new code will issue a `scheduleWriteOperation` call if 
`leaderEpoch` is not -1. The supplied callback method, 
`ShareCoordinatorShard.writeLeaderEpoch` (also part of the PR), will generate a 
record if the `leaderEpoch` is the highest seen so far, no record if it is the 
same as the last seen value, or an error if the epoch is older. We need a 
separate method because we want to look only at `leaderEpoch` and ignore the 
other data fields. We also want to avoid generating records when the 
`leaderEpoch` is -1 or equal to the highest seen so far.
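   
   The three outcomes described above can be sketched as follows. This is an 
illustrative sketch only, not the code in the PR: the class name, the `Outcome` 
enum, the string key, and the plain `HashMap` standing in for the 
`leaderEpochMap` timeline hashmap are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the shard; not the real ShareCoordinatorShard.
class LeaderEpochSketch {
    static final int NO_EPOCH = -1;

    // Hypothetical result type: the real method deals in records and errors.
    enum Outcome { RECORD, NO_RECORD, FENCED }

    // Plain map used here in place of the timeline hashmap (assumption).
    private final Map<String, Integer> leaderEpochMap = new HashMap<>();

    // Decides what to do with the leaderEpoch carried by a read request.
    // It only inspects state; the map is updated in replay().
    Outcome writeLeaderEpoch(String sharePartitionKey, int leaderEpoch) {
        if (leaderEpoch == NO_EPOCH) {
            return Outcome.NO_RECORD;          // nothing to persist
        }
        Integer highest = leaderEpochMap.get(sharePartitionKey);
        if (highest != null && leaderEpoch < highest) {
            return Outcome.FENCED;             // older epoch: reject
        }
        if (highest != null && leaderEpoch == highest) {
            return Outcome.NO_RECORD;          // same as last seen: skip the write
        }
        return Outcome.RECORD;                 // new highest: persist it
    }

    // State changes happen only here, mirroring the role of replay().
    void replay(String sharePartitionKey, int leaderEpoch) {
        leaderEpochMap.merge(sharePartitionKey, leaderEpoch, Math::max);
    }
}
```

   Note that in this sketch the decision method never mutates the map; the 
update is applied only when the generated record is replayed.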
   
   Subsequently, a call to `runtime.scheduleReadOperation` is made in 
`ShareCoordinatorService.readState`, the same as before.
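   
   The write-then-read sequencing can be pictured roughly as below. This is a 
hedged sketch: `ReadStateFlowSketch` and the two `Supplier` parameters are 
hypothetical placeholders for the `scheduleWriteOperation` and 
`scheduleReadOperation` calls, whose real signatures are not reproduced here. 
It also assumes that a failed write short-circuits the read, which is one 
reading of "based on the response to the write".

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical illustration of the service-side flow; not the real service.
class ReadStateFlowSketch {
    static final int NO_EPOCH = -1;

    // scheduleWrite / scheduleRead are placeholders for the runtime calls.
    static <T> CompletableFuture<T> readState(
            int leaderEpoch,
            Supplier<CompletableFuture<Void>> scheduleWrite,
            Supplier<CompletableFuture<T>> scheduleRead) {
        if (leaderEpoch == NO_EPOCH) {
            // No valid epoch in the request: plain read, as before.
            return scheduleRead.get();
        }
        // Valid epoch: issue the write first, then the read.
        // If the write fails, the failure propagates and no read is scheduled.
        return scheduleWrite.get().thenCompose(ignored -> scheduleRead.get());
    }
}
```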
   
   **TLDR**: We will issue a phantom write call (not explicitly issued by a 
caller) in `ShareCoordinatorService.readState` if the read request contains a 
valid `leaderEpoch`. Based on the response to the write, a standard read call 
will be scheduled. This way we will be able to persist the `leaderEpoch` 
correctly in the topic and timeline data structures.

