[jira] [Commented] (FLINK-14302) FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS

2019-10-09 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16947388#comment-16947388
 ] 

Jiangjie Qin commented on FLINK-14302:
--

[~tonywei] Ah, you are right. I apparently misunderstood the behavior of Scala 
Either here. 

I just left some ideas on how to test this case in the PR.

> FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if 
> `newPartitionsInTransaction` is empty when enable EoS
> ---
>
> Key: FLINK-14302
> URL: https://issues.apache.org/jira/browse/FLINK-14302
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As found in the investigation in this mailing list thread [1], the Kafka 
> broker binds the error to each entry of the topic-partition list when it 
> handles `AddPartitionsToTxnRequest`. So when the request body contains no 
> topic-partition, the error is never sent back to the Kafka producer client. 
> Moreover, the producer's internal API always checks whether 
> `newPartitionsInTransaction` is empty before sending an 
> ADD_PARTITIONS_TO_TXN request to the Kafka cluster. We should apply the same 
> check wherever we call it explicitly in the first commit phase of the 
> two-phase commit sink.
> [1] 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-producer-failed-with-InvalidTxnStateException-when-performing-commit-transaction-td29384.html]
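
The guard described in the issue can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual patch: `EmptyPartitionGuard`, `recordWrite`, `flushNewPartitions`, and `sentRequestCount` are hypothetical names, and the request is simulated by appending to a list instead of calling any Kafka API. Only the field name `newPartitionsInTransaction` is taken from the issue text.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for a transactional producer wrapper (not a Flink or Kafka class). */
final class EmptyPartitionGuard {
    // Partitions written to since the last simulated ADD_PARTITIONS_TO_TXN request.
    private final Set<String> newPartitionsInTransaction = new HashSet<>();
    // Every simulated request is recorded here so the behaviour can be inspected.
    private final List<Set<String>> sentRequests = new ArrayList<>();

    void recordWrite(String topicPartition) {
        newPartitionsInTransaction.add(topicPartition);
    }

    /** Returns true if a request was "sent", false if it was skipped. */
    boolean flushNewPartitions() {
        if (newPartitionsInTransaction.isEmpty()) {
            // Nothing to register: skip the request so that no error can be
            // silently dropped in a response that has no partition entries.
            return false;
        }
        sentRequests.add(new HashSet<>(newPartitionsInTransaction));
        newPartitionsInTransaction.clear();
        return true;
    }

    int sentRequestCount() {
        return sentRequests.size();
    }
}
```

With this shape, a flush that follows no writes is a no-op, while a flush after at least one write sends exactly one request.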



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14302) FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS

2019-10-08 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16947309#comment-16947309
 ] 

Wei-Che Wei commented on FLINK-14302:
-

Hi [~becket_qin] 

I think the error came from `txnManager.getTransactionState(transactionalId)`.

You are right. Maybe we can report this issue to the Kafka community as well. 
For now, can we settle on the temporary solution of not committing empty 
transactions? I have already opened a pull request. Could you review it and 
give me some advice? I am having trouble figuring out how to add an 
integration test for it. Thanks.

[1] 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L239]



[jira] [Commented] (FLINK-14302) FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS

2019-10-08 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16947123#comment-16947123
 ] 

Jiangjie Qin commented on FLINK-14302:
--

[~tonywei] Thanks for the explanation. I am still a little confused. From the 
source code, it looks like the log I mentioned is printed at [1], but it also 
looks like this logging should never see the NOT_COORDINATOR error code.

In any case, I think it is a good idea to ensure on the Flink side that we are 
not committing empty transactions. However, technically speaking, the Kafka 
brokers should not assume the behavior of the clients, because there could be 
third-party Kafka client implementations. So I think there should be a fix on 
the Kafka side as well. But that might require a protocol change to add an 
error field to {{AddPartitionsToTxnResponse}}. We can leave it as is for now.

[1] 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L268]



[jira] [Commented] (FLINK-14302) FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS

2019-10-06 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16945515#comment-16945515
 ] 

Wei-Che Wei commented on FLINK-14302:
-

Hi [~becket_qin] 

The root cause of NOT_COORDINATOR is not the empty partition list.

It happened because the broker hosting the original transaction coordinator 
was restarted. The error should be propagated to the client so that it 
refetches the new coordinator, but the error is bound to each element of the 
partition list. Since this list was empty, the error was lost in the response 
message, and that caused the subsequent commit to fail. 

For more information, you can refer to my replies in the mailing list thread. Thanks. 
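
To illustrate how an error bound to each partition entry can vanish, here is a small simulation. It is only a sketch under stated assumptions: `PerPartitionErrorDemo`, `ErrorCode`, `buildResponse`, and `clientSeesError` are hypothetical names, and the response is modelled as a plain map rather than the real Kafka response class.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a response that reports errors only per partition. */
final class PerPartitionErrorDemo {
    enum ErrorCode { NONE, NOT_COORDINATOR }

    // The coordinator-level error is copied onto every requested partition.
    // There is no top-level error field in this model.
    static Map<String, ErrorCode> buildResponse(List<String> requestedPartitions, ErrorCode error) {
        Map<String, ErrorCode> errorsByPartition = new LinkedHashMap<>();
        for (String partition : requestedPartitions) {
            errorsByPartition.put(partition, error);
        }
        return errorsByPartition;
    }

    // The client can only learn about an error by inspecting partition entries.
    static boolean clientSeesError(Map<String, ErrorCode> response) {
        return response.values().stream().anyMatch(code -> code != ErrorCode.NONE);
    }
}
```

In this model, a request with at least one partition surfaces NOT_COORDINATOR to the client, whereas a request with an empty list produces an empty map, so the client has nothing to inspect and proceeds as if the call had succeeded.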



[jira] [Commented] (FLINK-14302) FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS

2019-10-06 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16945485#comment-16945485
 ] 

Jiangjie Qin commented on FLINK-14302:
--

[~tonywei] Really sorry for the late response. I was on a business trip and 
just saw this ping. Thanks for continuing to debug this. The current way Flink 
constructs the KafkaProducer on recovery is indeed a little flaky. I am not 
sure whether the NOT_COORDINATOR error code was caused by the empty partition 
list in AddPartitionsToTxnRequest.

More specifically, I am not sure why the following log was printed. 
 
*kafka-broker-1:*
{quote} [2019-09-20 02:31:46,182] INFO [TransactionCoordinator id=1] 
Initialized transactionalId map -> Sink: 
sink-2e588ce1c86a9d46e2e85186773ce4fd-3 with producerId 1008 and producer epoch 
1 on partition __transaction_state-37 
(kafka.coordinator.transaction.TransactionCoordinator){quote}
{quote}*[2019-09-20 02:32:45,962] DEBUG [TransactionCoordinator id=1] Returning 
NOT_COORDINATOR error code to client for map -> Sink: 
sink-2e588ce1c86a9d46e2e85186773ce4fd-3's AddPartitions request 
(kafka.coordinator.transaction.TransactionCoordinator)*
[2019-09-20 02:32:46,453] DEBUG [TransactionCoordinator id=1] Aborting append 
of COMMIT to transaction log with coordinator and returning NOT_COORDINATOR 
error to client for map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3's 
EndTransaction request 
(kafka.coordinator.transaction.TransactionCoordinator){quote}
Can you double check the version of the Kafka broker? I'd like to reproduce the 
problem if possible.

Thanks.



[jira] [Commented] (FLINK-14302) FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS

2019-10-01 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16941640#comment-16941640
 ] 

Wei-Che Wei commented on FLINK-14302:
-

Sorry, I didn't notice that the contribution flow had changed. I have already 
sent a PR for this issue, but I couldn't assign the issue to myself.

Hi [~jqin],
If you have some free time, please help me verify whether the issue in the 
mailing list thread I mentioned is a bug, and whether it should be fixed the 
way I suggested.
If it is a bug and the solution looks good to you, please assign this Jira 
issue to me. Thanks.



