[ https://issues.apache.org/jira/browse/KAFKA-7298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16586140#comment-16586140 ]

ASF GitHub Bot commented on KAFKA-7298:
---------------------------------------

hachikuji closed pull request #5518: KAFKA-7298; Raise UnknownProducerIdException if next sequence number is unknown
URL: https://github.com/apache/kafka/pull/5518
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 49c887b771c..2f711234bdb 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -234,9 +234,13 @@ private[log] class ProducerAppendInfo(val producerId: Long,
         RecordBatch.NO_SEQUENCE
 
       if (currentLastSeq == RecordBatch.NO_SEQUENCE && appendFirstSeq != 0) {
-        // the epoch was bumped by a control record, so we expect the sequence number to be reset
-        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: found $appendFirstSeq " +
-          s"(incoming seq. number), but expected 0")
+        // We have a matching epoch, but we do not know the next sequence number. This case can happen if
+        // only a transaction marker is left in the log for this producer. We treat this as an unknown
+        // producer id error, so that the producer can check the log start offset for truncation and reset
+        // the sequence number. Note that this check follows the fencing check, so the marker still fences
+        // old producers even if it cannot determine our next expected sequence number.
+        throw new UnknownProducerIdException(s"Local producer state matches expected epoch $producerEpoch " +
+          s"for producerId=$producerId, but next expected sequence number is not known.")
       } else if (!inSequence(currentLastSeq, appendFirstSeq)) {
         throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $appendFirstSeq " +
           s"(incoming seq. number), $currentLastSeq (current end sequence number)")
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 053aed7c915..f9f4a239023 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -81,6 +81,35 @@ class ProducerStateManagerTest extends JUnitSuite {
     }
   }
 
+  @Test
+  def testAppendTxnMarkerWithNoProducerState(): Unit = {
+    val producerEpoch = 2.toShort
+    appendEndTxnMarker(stateManager, producerId, producerEpoch, ControlRecordType.COMMIT, offset = 27L)
+
+    val firstEntry = stateManager.lastEntry(producerId).getOrElse(fail("Expected last entry to be defined"))
+    assertEquals(producerEpoch, firstEntry.producerEpoch)
+    assertEquals(producerId, firstEntry.producerId)
+    assertEquals(RecordBatch.NO_SEQUENCE, firstEntry.lastSeq)
+
+    // Fencing should continue to work even if the marker is the only thing left
+    assertThrows[ProducerFencedException] {
+      append(stateManager, producerId, 0.toShort, 0, 0L, 4L)
+    }
+
+    // If the transaction marker is the only thing left in the log, then an attempt to write using a
+    // non-zero sequence number should cause an UnknownProducerId, so that the producer can reset its state
+    assertThrows[UnknownProducerIdException] {
+      append(stateManager, producerId, producerEpoch, 17, 0L, 4L)
+    }
+
+    // The broker should accept the request if the sequence number is reset to 0
+    append(stateManager, producerId, producerEpoch, 0, 39L, 4L)
+    val secondEntry = stateManager.lastEntry(producerId).getOrElse(fail("Expected last entry to be defined"))
+    assertEquals(producerEpoch, secondEntry.producerEpoch)
+    assertEquals(producerId, secondEntry.producerId)
+    assertEquals(0, secondEntry.lastSeq)
+  }
+
   @Test
   def testProducerSequenceWrapAround(): Unit = {
     val epoch = 15.toShort


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Concurrent DeleteRecords can lead to fatal OutOfSequence error in producer
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-7298
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7298
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>            Priority: Major
>
> We have logic in the producer to handle unknown producer errors. Basically 
> when the producer gets an unknown producer error, it checks whether the log 
> start offset is larger than the last acknowledged offset. If it is, then we 
> know the error is spurious and we reset the sequence number to 0, which the 
> broker will then accept.
> It can happen after a DeleteRecords call, however, that the only record 
> remaining in the log is a transaction marker, which does not have a sequence 
> number. The error we get in this case is OUT_OF_SEQUENCE rather than 
> UNKNOWN_PRODUCER, which is fatal.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to