Jun Qin created FLINK-16419:
-------------------------------

             Summary: Avoid recommitting transactions that are known to be 
successfully committed to Kafka upon recovery
                 Key: FLINK-16419
                 URL: https://issues.apache.org/jira/browse/FLINK-16419
             Project: Flink
          Issue Type: Improvement
            Reporter: Jun Qin


When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer 
tries to recommit all pre-committed transactions that are in the snapshot, 
even if those transactions were already committed successfully (i.e., the call 
to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}} 
returned OK). This may cause recovery to fail when restoring from a very old 
snapshot, because the transactional IDs in that snapshot have expired and 
been removed from Kafka. Consider, for example, the following scenario (a 
simplified sketch of this recovery path follows the error log below):
 # Start a Flink job with a FlinkKafkaProducer sink in exactly-once mode
 # Suspend the Flink job with a savepoint A
 # Wait longer than {{transactional.id.expiration.ms}} + 
{{transaction.remove.expired.transaction.cleanup.interval.ms}}
 # Recover the job from savepoint A.
 # The recovery fails with the following error:

{noformat}
2020-02-26 14:33:25,817 INFO  org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer  - Attempting to resume transaction Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 120
2020-02-26 14:33:25,914 INFO  org.apache.kafka.clients.Metadata                            - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
2020-02-26 14:33:26,017 INFO  org.apache.kafka.clients.producer.KafkaProducer              - [Producer clientId=producer-1, transactionalId=Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2020-02-26 14:33:26,019 INFO  org.apache.flink.runtime.taskmanager.Task                    - Source: Custom Source -> Sink: Unnamed (1/1) (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
        at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
        at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
        at java.lang.Thread.run(Thread.java:748)
{noformat}
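
To make the failure mode concrete, here is a minimal, self-contained sketch of 
what the recovery path effectively does today. {{TxnState}}, 
{{ResumableProducer}} and {{ProducerFactory}} are hypothetical stand-ins for 
the per-transaction snapshot state, {{FlinkKafkaInternalProducer}} and its 
construction; they are not Flink's actual types or signatures.

{code:java}
import java.util.List;

public class RecommitOnRecoverySketch {

    /** Hypothetical snapshot entry: one pre-committed Kafka transaction. */
    record TxnState(String transactionalId, long producerId, short epoch) {}

    /** Hypothetical producer wrapper that can resume a pre-committed transaction. */
    interface ResumableProducer extends AutoCloseable {
        void resumeTransaction(long producerId, short epoch);

        void commitTransaction(); // fails if the transactional ID has expired

        @Override
        void close();
    }

    /** Hypothetical factory creating a producer for a given transactional ID. */
    interface ProducerFactory {
        ResumableProducer create(String transactionalId);
    }

    /**
     * On restore, every pre-committed transaction in the snapshot is
     * recommitted, whether or not notifyCheckpointComplete() already
     * committed it successfully before the job went down.
     */
    static void recoverAndCommitAll(List<TxnState> snapshot, ProducerFactory factory) {
        for (TxnState txn : snapshot) {
            try (ResumableProducer producer = factory.create(txn.transactionalId())) {
                producer.resumeTransaction(txn.producerId(), txn.epoch());
                // Once the snapshot is older than transactional.id.expiration.ms
                // (+ the cleanup interval), the broker has dropped the
                // transactionalId -> producerId mapping, and this commit fails
                // with the EndTxnResponse error shown in the log above.
                producer.commitTransaction();
            }
        }
    }
}
{code}

Recommitting is harmless for transactions whose first commit was lost (that is 
the point of two-phase commit), but for transactions that were already 
committed it only works as long as Kafka still remembers the transactional ID.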
After discussing with [~becket_qin], [~pnowojski] and [~aljoscha], a possible 
approach is to let the JobManager, after it has successfully notified all 
operators of the completion of a snapshot (via {{notifyCheckpointComplete}}), 
record that success, e.g., by writing the successful transactional IDs 
somewhere into the snapshot. Those transactions then need not be recommitted 
upon recovery (see the sketch below).
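
A minimal sketch of that idea, reusing the hypothetical types from the sketch 
above; the new ingredient is the set of transactional IDs whose 
{{commitTransaction()}} call was already acknowledged, assumed here to have 
been recorded in the snapshot once {{notifyCheckpointComplete}} succeeded:

{code:java}
import java.util.List;
import java.util.Set;

public class SkipKnownCommittedSketch {

    /**
     * Recovery that only recommits transactions not known to be committed.
     * committedTransactionalIds is the hypothetical extra piece of snapshot
     * state proposed above.
     */
    static void recoverAndCommitPendingOnly(
            List<RecommitOnRecoverySketch.TxnState> snapshot,
            Set<String> committedTransactionalIds,
            RecommitOnRecoverySketch.ProducerFactory factory) {
        for (RecommitOnRecoverySketch.TxnState txn : snapshot) {
            // Skip transactions whose commit already returned OK; recommitting
            // exactly these is what fails once the transactional ID expires.
            if (committedTransactionalIds.contains(txn.transactionalId())) {
                continue;
            }
            try (RecommitOnRecoverySketch.ResumableProducer producer =
                    factory.create(txn.transactionalId())) {
                producer.resumeTransaction(txn.producerId(), txn.epoch());
                producer.commitTransaction();
            }
        }
    }
}
{code}

With this, a snapshot older than the transactional ID expiration can still be 
restored, because expired-but-already-committed transactions are never 
touched; only transactions whose commit was never confirmed are recommitted.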



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
