[jira] [Commented] (FLINK-14302) FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS
[ https://issues.apache.org/jira/browse/FLINK-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16947388#comment-16947388 ]

Jiangjie Qin commented on FLINK-14302:
--------------------------------------

[~tonywei] Ah, you are right. I apparently misunderstood the behavior of Scala Either here. I just left some ideas on how to test this case in the PR.

> FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS
> ---
>
> Key: FLINK-14302
> URL: https://issues.apache.org/jira/browse/FLINK-14302
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Reporter: Wei-Che Wei
> Assignee: Wei-Che Wei
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> As found in the investigation in this mailing list thread [1], the Kafka server
> binds the error to each entry of the topic-partition list when it handles
> `AddPartitionsToTxnRequest`. So when the request body contains no topic-partition,
> the error is not sent back to the Kafka producer client. Moreover, the producer's
> internal API always checks whether `newPartitionsInTransaction` is empty before
> sending an ADD_PARTITIONS_TO_TXN request to the Kafka cluster. We should apply the
> same check when the request has to be sent explicitly in the first commit phase of
> the two-phase commit sink.
>
> [1] [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-producer-failed-with-InvalidTxnStateException-when-performing-commit-transaction-td29384.html]

--
This message was sent by Atlassian Jira (v8.3.4#803005)
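The guard proposed in the issue description might look roughly like the sketch below. It is a minimal, self-contained illustration and not the actual Flink patch: `TxnRequestSender`, `GuardedTxnFlusher`, `addPartition` and `flushNewPartitions` are hypothetical names chosen for this example, and partitions are modelled as plain strings. Only `newPartitionsInTransaction` and the ADD_PARTITIONS_TO_TXN request come from the issue text.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical stand-in for whatever actually sends ADD_PARTITIONS_TO_TXN. */
interface TxnRequestSender {
    void sendAddPartitionsToTxn(Set<String> partitions);
}

/** Sketch of a wrapper that skips the request when no new partitions are pending. */
final class GuardedTxnFlusher {
    // Named after the field mentioned in the issue; entries are "topic-partition" strings here.
    private final Set<String> newPartitionsInTransaction = new LinkedHashSet<>();
    private final TxnRequestSender sender;

    GuardedTxnFlusher(TxnRequestSender sender) {
        this.sender = sender;
    }

    /** Records a partition that was written to in the current transaction. */
    void addPartition(String topicPartition) {
        newPartitionsInTransaction.add(topicPartition);
    }

    /** Returns true if a request was sent, false if it was skipped because nothing was pending. */
    boolean flushNewPartitions() {
        if (newPartitionsInTransaction.isEmpty()) {
            // Nothing to register: sending an empty request is the case the issue wants to avoid.
            return false;
        }
        // Hand over a copy so clearing the pending set does not affect the sent request.
        sender.sendAddPartitionsToTxn(new LinkedHashSet<>(newPartitionsInTransaction));
        newPartitionsInTransaction.clear();
        return true;
    }
}
```

With this shape, a pre-commit step that calls `flushNewPartitions()` on a transaction that wrote no records sends nothing at all, so there is no empty request whose error could be silently dropped.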
[jira] [Commented] (FLINK-14302) FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS
[ https://issues.apache.org/jira/browse/FLINK-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16947309#comment-16947309 ]

Wei-Che Wei commented on FLINK-14302:
-------------------------------------

Hi [~becket_qin]

I think the error came from `txnManager.getTransactionState(transactionalId)`. You are right. Maybe we can report this issue back to the Kafka community as well. For now, can we settle on the temporary solution of not committing empty transactions? I have already opened a pull request. Could you help review it and give me some advice? I had trouble figuring out how to add an integration test for it. Thanks.

[1] [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L239]
[jira] [Commented] (FLINK-14302) FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS
[ https://issues.apache.org/jira/browse/FLINK-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16947123#comment-16947123 ]

Jiangjie Qin commented on FLINK-14302:
--------------------------------------

[~tonywei] Thanks for the explanation. I am still a little confused. From the source code, it looks like the log I mentioned should be printed at [1], but it also looks like that logging should never see the NOT_COORDINATOR error code.

In any case, I think it is a good idea to ensure that on the Flink side we do not commit empty transactions. However, technically speaking, the Kafka brokers should not assume the behavior of the clients, because there could be third-party Kafka client implementations. So I think there should be a fix on the Kafka side as well. But that might require a protocol change to add an error field to {{AddPartitionsToTxnResponse}}. We can leave it as is for now.

[1] [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L268]
[jira] [Commented] (FLINK-14302) FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS
[ https://issues.apache.org/jira/browse/FLINK-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16945515#comment-16945515 ]

Wei-Che Wei commented on FLINK-14302:
-------------------------------------

Hi [~becket_qin]

The root cause of NOT_COORDINATOR is not the empty partition list. It occurred because the broker that hosted the original transaction coordinator was restarted. The error should be propagated to the client so that it refetches the new coordinator, but the error is bound to each element of the partition list. Since this list was empty, the error got lost in the response message, and that led to the subsequent commit failing. For more information, you can refer to my replies in the mailing list thread. Thanks.
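The way the error gets lost can be illustrated with a simplified model. This is only a sketch of the mechanism described above, not Kafka's actual broker or client code: `PerPartitionErrorSketch`, `buildResponse` and `clientSeesError` are hypothetical names, and partitions and error codes are modelled as plain strings.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of a response that can carry errors only per partition. */
final class PerPartitionErrorSketch {

    /** Attaches the same error code to every partition named in the request. */
    static Map<String, String> buildResponse(List<String> requestedPartitions, String error) {
        Map<String, String> errorsByPartition = new LinkedHashMap<>();
        for (String partition : requestedPartitions) {
            errorsByPartition.put(partition, error);
        }
        // With no requested partitions the loop never runs, so the error has nowhere to go.
        return errorsByPartition;
    }

    /** A client that inspects only per-partition entries finds nothing in an empty map. */
    static boolean clientSeesError(Map<String, String> errorsByPartition) {
        for (String error : errorsByPartition.values()) {
            if (!"NONE".equals(error)) {
                return true;
            }
        }
        return false;
    }
}
```

In this model, `buildResponse` with two partitions and `NOT_COORDINATOR` yields a map the client can act on, while the same call with an empty list yields an empty map, which the client cannot distinguish from success.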
[jira] [Commented] (FLINK-14302) FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS
[ https://issues.apache.org/jira/browse/FLINK-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16945485#comment-16945485 ]

Jiangjie Qin commented on FLINK-14302:
--------------------------------------

[~tonywei] Really sorry for the late response. I was on a business trip and just saw this ping.

Thanks for continuing to debug this. The current way Flink constructs the KafkaProducer on recovery is indeed a little flaky. I am not sure whether the NOT_COORDINATOR error code was caused by an empty partition list in AddPartitionsToTxnRequest. More specifically, I am not sure why the following log was printed.

*kafka-broker-1:*
{quote}
[2019-09-20 02:31:46,182] INFO [TransactionCoordinator id=1] Initialized transactionalId map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3 with producerId 1008 and producer epoch 1 on partition __transaction_state-37 (kafka.coordinator.transaction.TransactionCoordinator)
{quote}
{quote}
*[2019-09-20 02:32:45,962] DEBUG [TransactionCoordinator id=1] Returning NOT_COORDINATOR error code to client for map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)*
[2019-09-20 02:32:46,453] DEBUG [TransactionCoordinator id=1] Aborting append of COMMIT to transaction log with coordinator and returning NOT_COORDINATOR error to client for map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
{quote}

Can you double check the version of the Kafka broker? I'd like to reproduce the problem if possible.

Thanks.
[jira] [Commented] (FLINK-14302) FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS
[ https://issues.apache.org/jira/browse/FLINK-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16941640#comment-16941640 ]

Wei-Che Wei commented on FLINK-14302:
-------------------------------------

Sorry, I didn't notice that the contribution flow has changed. I already sent a PR for this issue, but couldn't assign the issue to myself.

Hi [~jqin],

If you have some free time, please help me verify whether the issue in the mailing list thread I mentioned is a bug and whether it should be fixed as I suggested. If it is a bug and the solution looks good to you, please assign this Jira issue to me. Thanks.