[ 
https://issues.apache.org/jira/browse/KAFKA-9666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17053595#comment-17053595
 ] 

ASF GitHub Bot commented on KAFKA-9666:
---------------------------------------

bob-barrett commented on pull request #8239: KAFKA-9666: Rotate producer ID 
when fencing producer with exhausted e…
URL: https://github.com/apache/kafka/pull/8239
 
 
   …poch
   
   When fencing producers, we currently bump the epoch by 1 unconditionally and 
write an abort marker to the transaction log. If the log is unavailable (for 
example, because the number of in-sync replicas is less than 
min.insync.replicas), we roll back the attempted write of the abort marker but 
still increment the epoch in the transaction metadata cache. During periods of 
prolonged log unavailability, producer retries of InitProducerId calls can 
therefore increase the epoch to the point of exhaustion (Short.MaxValue), after 
which further InitProducerId calls fail because the producer can no longer be 
fenced. This patch changes the fencing behavior to rotate the producer ID if 
the epoch is exhausted, preventing this situation.
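
   To make the failure mode concrete, here is a minimal sketch in Python of the 
behavior described above. It is a simplified model, not the coordinator's Scala 
code: `TxnMetadata`, `fence_blindly`, `fence_with_rotation` and 
`retry_while_log_unavailable` are hypothetical names, the producer ID 
allocator is a plain counter, and every retry is assumed to hit an unavailable 
log. The starting values (producerId=0, producerEpoch=576) are taken from the 
broker log in the issue.

```python
from dataclasses import dataclass
from itertools import count

SHORT_MAX = 32767  # value of Short.MaxValue; producer epochs are 16-bit signed


@dataclass
class TxnMetadata:
    """Hypothetical stand-in for the cached transaction metadata."""
    producer_id: int
    producer_epoch: int


def fence_blindly(meta: TxnMetadata) -> None:
    """Behavior before the patch, as described: always bump the epoch by 1."""
    if meta.producer_epoch == SHORT_MAX:
        raise OverflowError("cannot fence producer: epoch exhausted")
    meta.producer_epoch += 1


def fence_with_rotation(meta: TxnMetadata, new_producer_id) -> None:
    """Behavior the patch describes: rotate the producer ID on exhaustion."""
    if meta.producer_epoch == SHORT_MAX:
        meta.producer_id = new_producer_id()  # assumed allocator callback
        meta.producer_epoch = 0
    else:
        meta.producer_epoch += 1


def retry_while_log_unavailable(meta: TxnMetadata, fence, retries: int) -> TxnMetadata:
    """Assumption: each retry's abort-marker write fails and is rolled back,
    but the epoch bump in the cached metadata is kept."""
    for _ in range(retries):
        fence(meta)
    return meta


ids = count(start=1)  # hypothetical producer ID allocator

old = TxnMetadata(producer_id=0, producer_epoch=576)
try:
    retry_while_log_unavailable(old, fence_blindly, SHORT_MAX)
    old_failed = False
except OverflowError:
    old_failed = True  # epoch exhausted, further fencing is impossible

new = TxnMetadata(producer_id=0, producer_epoch=576)
retry_while_log_unavailable(
    new, lambda m: fence_with_rotation(m, lambda: next(ids)), SHORT_MAX
)
```

   In this model the blind bump gets stuck at Short.MaxValue under the old 
producer ID, while the rotating variant moves to a fresh producer ID and 
restarts the epoch, so later InitProducerId calls can still fence.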
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Transactional producer Epoch could not be reset
> -----------------------------------------------
>
>                 Key: KAFKA-9666
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9666
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Boyang Chen
>            Assignee: Bob Barrett
>            Priority: Critical
>             Fix For: 2.5.0
>
>
> As of today, the producer epoch keeps increasing until it hits 
> Short.MaxValue. At that point the correct behavior is to make another call to 
> initialize a new PID; otherwise, retrying with Short.MaxValue throws a fatal 
> exception, which eventually kills the producer.
> Stream log:
> [2020-03-04T20:25:41-08:00] 
> (streams-soak-2-5-eos_soak_i-00d70680c58afa228_streamslog) [2020-03-05 
> 04:25:41,147] ERROR 
> [stream-soak-test-689bb912-b06c-4c42-88e5-d9578f7ebfdf-StreamThread-3] Thread 
>    StreamsThread threadId: 
> stream-soak-test-689bb912-b06c-4c42-88e5-d9578f7ebfdf-StreamThread-3
> [2020-03-04T20:25:41-08:00] 
> (streams-soak-2-5-eos_soak_i-00d70680c58afa228_streamslog) TaskManager
>         MetadataState:
>                 GlobalMetadata: []
>                 GlobalStores: []
>                 My HostInfo: HostInfo{host='unknown', port=-1}
>                 Cluster(id = null, nodes = [], partitions = [], controller = 
> null)
>         Active tasks:
>                 Running:
>                 Running Partitions:
>                 New:
>                 Restoring:
>                 Restoring Partitions:
>                 Restored Partitions:
>                 Suspended:
>         Standby tasks:
>                 Running:
>                 Running Partitions:
>                 New:
>  encountered an error processing soak test 
> (org.apache.kafka.streams.StreamsSoakTest)
> [2020-03-04T20:25:41-08:00] 
> (streams-soak-2-5-eos_soak_i-00d70680c58afa228_streamslog) 
> org.apache.kafka.streams.errors.StreamsException: stream-thread 
> [stream-soak-test-689bb912-b06c-4c42-88e5-d9578f7ebfdf-StreamThread-3] Failed 
> to rebalance.
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:862)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:749)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> [2020-03-04T20:25:41-08:00] 
> (streams-soak-2-5-eos_soak_i-00d70680c58afa228_streamslog) Caused by: 
> org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The server experienced an unexpected error when 
> processing the request.
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
>         at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>         at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:571)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:563)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>         at java.lang.Thread.run(Thread.java:748)
>  
>  
> Broker log:
> [2020-03-04T20:25:41-08:00] 
> (streams-soak-2-5-eos_broker_i-0d1ef6c9a1a1708f6_server-log) [2020-03-05 
> 04:25:40,885] INFO [Transaction State Manager 1001]: TransactionalId 
> stream-soak-test-1_0 append transaction log for 
> TxnTransitMetadata(producerId=0, producerEpoch=576, txnTimeoutMs=60000, 
> txnState=Ongoing, 
> topicPartitions=Set(stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000013-changelog-0,
>  stream-soak-test-logData10MinuteFinalCount-store-changelog-0, 
> stream-soak-test-logData10MinuteSuppressedCount-store-changelog-0, 
> stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog-0, 
> stream-soak-test-windowed-node-counts-STATE-STORE-0000000030-changelog-0, 
> stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000025-changelog-0, 
> stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000007-changelog-0, 
> windowed-node-counts-0), txnStartTimestamp=1583382340885, 
> txnLastUpdateTimestamp=1583382340885) transition failed due to 
> COORDINATOR_NOT_AVAILABLE, resetting pending state from Some(Ongoing), 
> aborting state transition and returning COORDINATOR_NOT_AVAILABLE in the 
> callback (kafka.coordinator.transaction.TransactionStateManager)
> [2020-03-04T20:25:41-08:00] 
> (streams-soak-2-5-eos_broker_i-0d1ef6c9a1a1708f6_stdout) 
> java.lang.IllegalStateException: Cannot fence producer with epoch equal to 
> Short.MaxValue since this would overflow
>         at 
> kafka.coordinator.transaction.TransactionMetadata.prepareFenceProducerEpoch(TransactionMetadata.scala:194)
>         at 
> kafka.coordinator.transaction.TransactionCoordinator.kafka$coordinator$transaction$TransactionCoordinator$$prepareInitProduceIdTransit(TransactionCoordinator.scala:216)
>         at 
> kafka.coordinator.transaction.TransactionCoordinator$$anonfun$2$$anonfun$apply$1.apply(TransactionCoordinator.scala:143)
>         at 
> kafka.coordinator.transaction.TransactionCoordinator$$anonfun$2$$anonfun$apply$1.apply(TransactionCoordinator.scala:143)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>         at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>         at 
> kafka.coordinator.transaction.TransactionCoordinator$$anonfun$2.apply(TransactionCoordinator.scala:142)
>         at 
> kafka.coordinator.transaction.TransactionCoordinator$$anonfun$2.apply(TransactionCoordinator.scala:138)
>         at scala.util.Either$RightProjection.flatMap(Either.scala:522)
>         at 
> kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:137)
>         at 
> kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:1638)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:135)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>         at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
