Jose Armando Garcia Sancio created KAFKA-13148:
--------------------------------------------------
Summary: Kraft Controller doesn't handle scheduleAppend returning Long.MAX_VALUE
Key: KAFKA-13148
URL: https://issues.apache.org/jira/browse/KAFKA-13148
Project: Kafka
Issue Type: Bug
Components: controller, kraft
Reporter: Jose Armando Garcia Sancio
When the provided epoch does not match the current leader epoch, the RaftClient returns Long.MAX_VALUE:
{code:java}
/**
 * Append a list of records to the log. The write will be scheduled for some time
 * in the future. There is no guarantee that appended records will be written to
 * the log and eventually committed. However, it is guaranteed that if any of the
 * records become committed, then all of them will be.
 *
 * If the provided current leader epoch does not match the current epoch, which
 * is possible when the state machine has yet to observe the epoch change, then
 * this method will return {@link Long#MAX_VALUE} to indicate an offset which is
 * not possible to become committed. The state machine is expected to discard all
 * uncommitted entries after observing an epoch change.
 *
 * @param epoch the current leader epoch
 * @param records the list of records to append
 * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the records
 *         could not be committed; null if no memory could be allocated for the batch at
 *         this time
 * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of the
 *         records is greater than the maximum batch size; if this exception is thrown,
 *         none of the elements in records were committed
 */
Long scheduleAtomicAppend(int epoch, List<T> records);
{code}
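To make the three possible return values in the Javadoc above explicit, here is a minimal sketch of how a caller might classify them. AppendOutcome and its Kind enum are hypothetical names used only for illustration; they are not part of Kafka.

```java
/**
 * Hypothetical helper (not part of Kafka) that classifies the boxed Long
 * returned by scheduleAtomicAppend, following the Javadoc quoted above.
 */
final class AppendOutcome {
    enum Kind {
        SCHEDULED,   // a real offset: the batch was accepted for a future write
        STALE_EPOCH, // Long.MAX_VALUE: the epoch did not match, the batch cannot commit
        NO_MEMORY    // null: no memory could be allocated for the batch
    }

    private AppendOutcome() { }

    static Kind classify(Long returned) {
        if (returned == null) {
            return Kind.NO_MEMORY;
        }
        // Comparing Long with long unboxes the left side, so this is a numeric comparison.
        if (returned == Long.MAX_VALUE) {
            return Kind.STALE_EPOCH;
        }
        return Kind.SCHEDULED;
    }
}
```

Note that the null case has to be checked before the value is assigned to a primitive long, since unboxing null throws a NullPointerException.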
The controller doesn't handle this case:
{code:java}
// If the operation returned a batch of records, those records need to be
// written before we can return our result to the user. Here, we hand off
// the batch of records to the raft client. They will be written out
// asynchronously.
final long offset;
if (result.isAtomic()) {
    offset = raftClient.scheduleAtomicAppend(controllerEpoch, result.records());
} else {
    offset = raftClient.scheduleAppend(controllerEpoch, result.records());
}
op.processBatchEndOffset(offset);
writeOffset = offset;
resultAndOffset = ControllerResultAndOffset.of(offset, result);
for (ApiMessageAndVersion message : result.records()) {
    replay(message.message(), Optional.empty(), offset);
}
snapshotRegistry.getOrCreateSnapshot(offset);
log.debug("Read-write operation {} will be completed when the log " +
    "reaches offset {}.", this, resultAndOffset.offset());
{code}
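One possible shape for a fix, sketched under assumptions: check the returned offset before it is used for replay, snapshots, or the write offset. AppendClient, StaleEpochException, and GuardedAppend below are hypothetical stand-ins written for this example; the real controller would call RaftClient directly and may signal the failure in a different way.

```java
import java.util.List;

/** Hypothetical stand-in for the RaftClient append method quoted above. */
interface AppendClient<T> {
    Long scheduleAtomicAppend(int epoch, List<T> records);
}

/** Hypothetical exception type; an assumption, not an existing Kafka class. */
final class StaleEpochException extends RuntimeException {
    StaleEpochException(int epoch) {
        super("Append rejected: epoch " + epoch + " is not the current leader epoch");
    }
}

final class GuardedAppend {
    private GuardedAppend() { }

    /**
     * Schedules the append and returns the offset only if it can become committed.
     * The caller should replay records and create snapshots only after this returns.
     */
    static <T> long appendOrThrow(AppendClient<T> client, int epoch, List<T> records) {
        Long offset = client.scheduleAtomicAppend(epoch, records);
        if (offset == null) {
            // Keep the boxed type until here so that null does not cause an NPE on unboxing.
            throw new IllegalStateException("No memory could be allocated for the batch");
        }
        if (offset == Long.MAX_VALUE) {
            // The records can never be committed, so nothing should be replayed.
            throw new StaleEpochException(epoch);
        }
        return offset;
    }
}
```

With a guard like this in place, the in-memory state is not updated at offset Long.MAX_VALUE, and the operation can be failed instead of waiting on an offset that the log will never reach.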