Jose Armando Garcia Sancio created KAFKA-13148:
--------------------------------------------------

             Summary: Kraft Controller doesn't handle scheduleAppend returning Long.MAX_VALUE
                 Key: KAFKA-13148
                 URL: https://issues.apache.org/jira/browse/KAFKA-13148
             Project: Kafka
          Issue Type: Bug
          Components: controller, kraft
            Reporter: Jose Armando Garcia Sancio


When the supplied epoch does not match the current leader epoch, the RaftClient returns Long.MAX_VALUE instead of a real offset:
{code:java}
      /**
       * Append a list of records to the log. The write will be scheduled for some time
       * in the future. There is no guarantee that appended records will be written to
       * the log and eventually committed. However, it is guaranteed that if any of the
       * records become committed, then all of them will be.
       *
       * If the provided current leader epoch does not match the current epoch, which
       * is possible when the state machine has yet to observe the epoch change, then
       * this method will return {@link Long#MAX_VALUE} to indicate an offset which is
       * not possible to become committed. The state machine is expected to discard all
       * uncommitted entries after observing an epoch change.
       *
       * @param epoch the current leader epoch
       * @param records the list of records to append
       * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the records could
       *         not be committed; null if no memory could be allocated for the batch at this time
       * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of the records is greater than the maximum
       *         batch size; if this exception is thrown none of the elements in records were
       *         committed
       */
      Long scheduleAtomicAppend(int epoch, List<T> records);
 {code}
The controller doesn't handle this case. It uses the returned value directly as the batch end offset:
{code:java}
                  // If the operation returned a batch of records, those records need to be
                  // written before we can return our result to the user.  Here, we hand off
                  // the batch of records to the raft client.  They will be written out
                  // asynchronously.
                  final long offset;
                  if (result.isAtomic()) {
                      offset = raftClient.scheduleAtomicAppend(controllerEpoch, result.records());
                  } else {
                      offset = raftClient.scheduleAppend(controllerEpoch, result.records());
                  }
                  op.processBatchEndOffset(offset);
                  writeOffset = offset;
                  resultAndOffset = ControllerResultAndOffset.of(offset, result);
                  for (ApiMessageAndVersion message : result.records()) {
                      replay(message.message(), Optional.empty(), offset);
                  }
                  snapshotRegistry.getOrCreateSnapshot(offset);
                  log.debug("Read-write operation {} will be completed when the log " +
                      "reaches offset {}.", this, resultAndOffset.offset());
 {code}
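One possible way to guard against the sentinel values is sketched below. This is only an illustration, not the fix adopted by the project: the class AppendOffsetCheck, the method requireSchedulable, and the exception StaleEpochException are hypothetical names that do not exist in Kafka. The sketch also checks for null, because assigning a null Long to the primitive `final long offset` in the controller code above would fail with a NullPointerException during unboxing.

```java
/**
 * Hypothetical helper (not part of Kafka) that validates the value returned by
 * scheduleAppend/scheduleAtomicAppend before the controller treats it as an offset.
 */
final class AppendOffsetCheck {

    /** Hypothetical exception type signalling that the append used a stale epoch. */
    static final class StaleEpochException extends RuntimeException {
        StaleEpochException(String message) {
            super(message);
        }
    }

    private AppendOffsetCheck() {
    }

    /**
     * Returns the offset as a primitive if it is a real, schedulable offset.
     *
     * @param offset the boxed value returned by the raft client
     * @param epoch  the epoch that was passed to the raft client
     * @throws IllegalStateException if offset is null (no memory for the batch)
     * @throws StaleEpochException   if offset is Long.MAX_VALUE (epoch mismatch)
     */
    static long requireSchedulable(Long offset, int epoch) {
        if (offset == null) {
            // Documented as "no memory could be allocated for the batch at this time".
            throw new IllegalStateException(
                "No memory available to buffer the batch for epoch " + epoch);
        }
        if (offset == Long.MAX_VALUE) {
            // Documented as an offset that can never become committed.
            throw new StaleEpochException(
                "Append rejected because epoch " + epoch + " is not the current leader epoch");
        }
        return offset;
    }
}
```

Under these assumptions, the controller would wrap the raft client call, for example `offset = AppendOffsetCheck.requireSchedulable(raftClient.scheduleAppend(controllerEpoch, result.records()), controllerEpoch);`, and fail the operation instead of replaying records and creating a snapshot at Long.MAX_VALUE.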
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)