[ 
https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021982#comment-17021982
 ] 

Oleksii Boiko edited comment on KAFKA-8803 at 1/24/20 11:09 AM:
----------------------------------------------------------------

Hi all,

We are facing same issue. The frequency of issue reproducing was increased 
after migrating to Kafka-streams 2.3
 For our case increase was caused by changing commit behaviour in 
Kafka-streams. Previously(we used 2.0.1), "commit offsets" was executed only in 
case when new record was consumed on source node but in version 2.3 "commit 
offset" executes on each "punctuate" call even no changes were made. We have 
punctuator with 1s wall-clock scheduler. As the the result commit offsets 
operations count was grown. Additionally to this we had detected that error(on 
broker side) which causes transctional id stuck

 
{noformat}
java.lang.IllegalStateException: TransactionalId <id> failed transition to 
state TxnTransitMetadata(producerId=61432, producerEpoch=0, txnTimeoutMs=60000, 
txnState=Ongoing, topicPartitions=Set(__consumer_offsets-10), 
txnStartTimestamp=1577934186261, txnLastUpdateTimestamp=1577934186261) due to 
unexpected metadata{noformat}
appears almost every day  at the same time and txnStartTimestamp of previous 
transaction state is younger than txnStartTimestamp of transaction target state

If I'm correct that means that transaction can not be transferred to "Ongoing" 
state and as the result it never expires due to only "Ongoing" transactions can 
be expired
 kafka.coordinator.transaction.TransactionStateManager#timedOutTransactions
{noformat}
def timedOutTransactions(): Iterable[TransactionalIdAndProducerIdEpoch] = {
    val now = time.milliseconds()
    inReadLock(stateLock) {
      transactionMetadataCache.filter { case (txnPartitionId, _) =>
        !leavingPartitions.exists(_.txnPartitionId == txnPartitionId)
      }.flatMap { case (_, entry) =>
        entry.metadataPerTransactionalId.filter { case (_, txnMetadata) =>
          if (txnMetadata.pendingTransitionInProgress) {
            false
          } else {
            txnMetadata.state match {
              case Ongoing =>
                txnMetadata.txnStartTimestamp + txnMetadata.txnTimeoutMs < now
              case _ => false
            }
          }
        }.map { case (txnId, txnMetadata) =>
          TransactionalIdAndProducerIdEpoch(txnId, txnMetadata.producerId, 
txnMetadata.producerEpoch)
        }
      }
    }
  }{noformat}
Based on this inputs we had found that time of issue reproducing is the time of 
ntp synchronization. 
 Our broker timer goes froward in comparison with ntp and have up to +2 seconds 
per 24 hour and ntp sync rollbacks this delta.

After disabling ntp sync issue was gone
 We had found that similar issue was already fixed previously but it does not 
cover all the cases https://issues.apache.org/jira/browse/KAFKA-5415
 There is one more place where timestamp comparison exists

kafka.coordinator.transaction.TransactionMetadata#completeTransitionTo
{noformat}
case Ongoing => // from addPartitions if (!validProducerEpoch(transitMetadata) 
|| !topicPartitions.subsetOf(transitMetadata.topicPartitions) || txnTimeoutMs 
!= transitMetadata.txnTimeoutMs || txnStartTimestamp > 
transitMetadata.txnStartTimestamp) { 
throwStateTransitionFailure(transitMetadata) } else { txnStartTimestamp = 
transitMetadata.txnStartTimestamp 
addPartitions(transitMetadata.topicPartitions) }
{noformat}
 

 

 


was (Author: oleksii.boiko):
Hi all,

We are facing same issue. The frequency of issue reproducing was increased 
after migrating to Kafka-streams 2.3
 For our case increase was caused by changing commit behaviour in 
Kafka-streams. Previously(we used 2.0.1), "commit offsets" was executed only in 
case when new record was consumed on source node but in version 2.3 "commit 
offset" executes on each "punctuate" call even no changes were made. We have 
punctuator with 1s wall-clock scheduler. As the the result commit offsets 
operations count was grown. Additionally to this we had detected that error(on 
broker side) which causes transctional id stuck

 
{noformat}
java.lang.IllegalStateException: TransactionalId <id> failed transition to 
state TxnTransitMetadata(producerId=61432, producerEpoch=0, txnTimeoutMs=60000, 
txnState=Ongoing, topicPartitions=Set(__consumer_offsets-10), 
txnStartTimestamp=1577934186261, txnLastUpdateTimestamp=1577934186261) due to 
unexpected metadata{noformat}
appears almost every day  at the same time and txnStartTimestamp of previous 
transaction state is younger than txnStartTimestamp of transaction target state

If I'm correct that means that transaction can not be transferred to "Ongoing" 
state and as the result it never expires due to only "Ongoing" transactions can 
be expired
 kafka.coordinator.transaction.TransactionStateManager#timedOutTransactions
{noformat}
def timedOutTransactions(): Iterable[TransactionalIdAndProducerIdEpoch] = {
    val now = time.milliseconds()
    inReadLock(stateLock) {
      transactionMetadataCache.filter { case (txnPartitionId, _) =>
        !leavingPartitions.exists(_.txnPartitionId == txnPartitionId)
      }.flatMap { case (_, entry) =>
        entry.metadataPerTransactionalId.filter { case (_, txnMetadata) =>
          if (txnMetadata.pendingTransitionInProgress) {
            false
          } else {
            txnMetadata.state match {
              case Ongoing =>
                txnMetadata.txnStartTimestamp + txnMetadata.txnTimeoutMs < now
              case _ => false
            }
          }
        }.map { case (txnId, txnMetadata) =>
          TransactionalIdAndProducerIdEpoch(txnId, txnMetadata.producerId, 
txnMetadata.producerEpoch)
        }
      }
    }
  }{noformat}
Based on this inputs we had detected that time of issue reproducing is the time 
of ntp synchronization. 
 Our broker timer goes froward in comparison with ntp and have up to +2 seconds 
per 24 hour and ntp sync rollbacks this delta.

After disabling ntp sync issue was gone
 We had found that similar issue was already fixed previously but it does not 
cover all the cases https://issues.apache.org/jira/browse/KAFKA-5415
 There is one more place where timestamp comparison exists

kafka.coordinator.transaction.TransactionMetadata#completeTransitionTo
{noformat}
case Ongoing => // from addPartitions if (!validProducerEpoch(transitMetadata) 
|| !topicPartitions.subsetOf(transitMetadata.topicPartitions) || txnTimeoutMs 
!= transitMetadata.txnTimeoutMs || txnStartTimestamp > 
transitMetadata.txnStartTimestamp) { 
throwStateTransitionFailure(transitMetadata) } else { txnStartTimestamp = 
transitMetadata.txnStartTimestamp 
addPartitions(transitMetadata.topicPartitions) }
{noformat}
 

 

 

> Stream will not start due to TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8803
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8803
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Raman Gupta
>            Assignee: Boyang Chen
>            Priority: Major
>         Attachments: logs.txt.gz, screenshot-1.png
>
>
> One streams app is consistently failing at startup with the following 
> exception:
> {code}
> 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] 
> org.apa.kaf.str.pro.int.StreamTask                : task [0_36] Timeout 
> exception caught when initializing transactions for task 0_36. This might 
> happen if the broker is slow to respond, if the network connection to the 
> broker was interrupted, or if similar circumstances arise. You can increase 
> producer parameter `max.block.ms` to increase this timeout.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> {code}
> These same brokers are used by many other streams without any issue, 
> including some in the very same processes for the stream which consistently 
> throws this exception.
> *UPDATE 08/16:*
> The very first instance of this error is August 13th 2019, 17:03:36.754 and 
> it happened for 4 different streams. For 3 of these streams, the error only 
> happened once, and then the stream recovered. For the 4th stream, the error 
> has continued to happen, and continues to happen now.
> I looked up the broker logs for this time, and see that at August 13th 2019, 
> 16:47:43, two of four brokers started reporting messages like this, for 
> multiple partitions:
> [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, 
> fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader 
> reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
> The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, 
> here is a view of the count of these messages over time:
>  !screenshot-1.png! 
> However, as noted, the stream task timeout error continues to happen.
> I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 
> broker. The broker has a patch for KAFKA-8773.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to