Hi Becket,

I read the Kafka source code and found that the error is not propagated
to the client if the list of
topic-partitions is empty [1], because the broker binds the error to each
topic-partition. If this list is empty,
the error is never packaged into the response body. As a result, the client
never receives the error
that would prompt it to look up the new coordinator.
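
To illustrate what I mean, here is a minimal sketch of the pattern. It is not
Kafka's actual code: the class `PerPartitionErrorSketch` and the method
`buildErrorResponse` are hypothetical names, and partitions are plain strings.
It only shows that when an error is attached per topic-partition, an empty
partition list yields a response body with no error in it.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerPartitionErrorSketch {
    // Hypothetical helper: the error is carried only as a value attached
    // to each requested topic-partition, never as a top-level field.
    static Map<String, String> buildErrorResponse(List<String> partitions, String error) {
        Map<String, String> body = new LinkedHashMap<>();
        for (String topicPartition : partitions) {
            body.put(topicPartition, error);
        }
        return body;
    }

    public static void main(String[] args) {
        // Non-empty request: every partition carries the error.
        System.out.println(buildErrorResponse(
                Arrays.asList("my-topic-0", "my-topic-1"), "NOT_COORDINATOR"));
        // Empty request: the body is empty, so the error never reaches the client.
        System.out.println(buildErrorResponse(
                Collections.<String>emptyList(), "NOT_COORDINATOR"));
    }
}
```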

Back to this problem: I think the original design of the Kafka client does
not intend to execute
`enqueueNewPartitions` when no topic-partition has been added. It might be a
bug here, and we should
first check whether the `newPartitionsInTransaction` list is empty before
executing the `enqueueNewPartitions`
function. Am I right?
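
The check I have in mind could look roughly like the sketch below. This is
only an illustration under my own assumptions: `FakeTransactionManager` is a
hypothetical stand-in (not Kafka's `TransactionManager` and not the Flink
reflection code), and partitions and requests are plain strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class EmptyPartitionGuardSketch {
    // Hypothetical stand-in for the client's transaction state.
    static final class FakeTransactionManager {
        final Set<String> newPartitionsInTransaction = new LinkedHashSet<>();
        final List<String> enqueuedRequests = new ArrayList<>();

        // Records one request for all pending partitions, then clears them.
        void enqueueNewPartitions() {
            enqueuedRequests.add(
                    "AddPartitionsToTxnRequest(partitions=" + newPartitionsInTransaction + ")");
            newPartitionsInTransaction.clear();
        }
    }

    // Proposed guard: skip the request entirely when nothing was added.
    // Returns true only if a request was actually enqueued.
    static boolean flushNewPartitions(FakeTransactionManager tm) {
        if (tm.newPartitionsInTransaction.isEmpty()) {
            return false;
        }
        tm.enqueueNewPartitions();
        return true;
    }

    public static void main(String[] args) {
        FakeTransactionManager tm = new FakeTransactionManager();
        System.out.println(flushNewPartitions(tm)); // nothing pending, nothing sent
        tm.newPartitionsInTransaction.add("my-topic-0");
        System.out.println(flushNewPartitions(tm)); // one request enqueued
        System.out.println(tm.enqueuedRequests);
    }
}
```

With this guard, an empty transaction sends no AddPartitionsToTxn request at
all, so it cannot hit the case where the error is lost.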

If it can be confirmed as a bug, I would like to submit my patch to fix it.
Thanks for your help.

Best,
Tony Wei

[1]
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L2042

On Fri, Sep 20, 2019 at 2:57 PM, Tony Wei <tony19920...@gmail.com> wrote:

> Hi,
>
> I found that the Kafka source code [1] always checks whether
> `newPartitionsInTransaction`
> is empty before calling `enqueueRequest(addPartitionsToTransactionHandler())`,
> but that check is not
> applied in the Flink Kafka producer code [2].
>
> I wrote a simple producer with `flushNewPartitions` copied from the Flink
> Kafka producer and
> successfully reproduced this exception. Then I modified the logic in
> `enqueueNewPartitions` to check
> whether `newPartitionsInTransaction` contains anything before making this
> request. With that change it works well even
> if I restart the broker that owns this transaction's coordinator, since
> an empty transaction no longer
> sends any request to the server.
>
> Attached is my simple producer code. Please help verify whether my
> understanding is correct. Thanks.
>
> Best,
> Tony Wei
>
> [1]
> https://github.com/apache/kafka/blob/c0019e653891182d7a95464175c9b4ef63f8bae1/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L316
> [2]
> https://github.com/apache/flink/blob/09f96b339f4890d7a44ae92c915ea8c0f6f244cb/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java#L273
>
> On Fri, Sep 20, 2019 at 11:56 AM, Tony Wei <tony19920...@gmail.com> wrote:
>
>> Hi,
>>
>> To dig into why `Error.NOT_COORDINATOR` happened on the broker, I
>> set
>> Flink's log level to DEBUG for the producer, and I found some logs on the
>> Flink side
>> regarding this error. Some log snippets are below.
>>
>> It seems that the producer client didn't catch this error and retry to find
>> the new coordinator.
>> This left the transaction state inconsistent between the client side and
>> the server side.
>> Could the problem be caused
>> by FlinkKafkaInternalProducer using
>> Java reflection to send the `addPartitionsToTransactionHandler` request in
>> `FlinkKafkaInternalProducer#flushNewPartitions`? Could an expert who
>> is familiar
>> with both Kafka and Flink's Kafka connector help me solve this?
>> Thanks very much.
>>
>> The attachment is my code to reproduce this problem.
>> The cluster's versions are the same as I mentioned in my first email.
>>
>> Best,
>> Tony Wei
>>
>> *flink taskmanager:*
>>
>>> 2019-09-20 02:32:45,927 INFO
>>>  
>>> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer
>>>  - Flushing new partitions
>>> 2019-09-20 02:32:45,927 DEBUG
>>> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer
>>> clientId=producer-29, transactionalId=map -> Sink:
>>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Enqueuing transactional request
>>> (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink:
>>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1,
>>> partitions=[])
>>>
>>> 2019-09-20 02:32:45,931 DEBUG
>>> org.apache.kafka.clients.producer.internals.Sender            - [Producer
>>> clientId=producer-29, transactionalId=map -> Sink:
>>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Sending transactional request
>>> (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink:
>>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1,
>>> partitions=[]) to node *kafka-broker-1:9092* (id: 1 rack: null)
>>> 2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.NetworkClient
>>>                    - [Producer clientId=producer-29, transactionalId=map ->
>>> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Using older server API v0 to
>>> send ADD_PARTITIONS_TO_TXN {transactional_id=map -> Sink:
>>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3,producer_id=1008,producer_epoch=1,topics=[]}
>>> with correlation id 12 to node 1
>>> 2019-09-20 02:32:45,937 DEBUG
>>> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer
>>> clientId=producer-29, transactionalId=map -> Sink:
>>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Successfully added partitions []
>>> to transaction
>>
>>
>> *kafka-broker-1:*
>>
>>>  [2019-09-20 02:31:46,182] INFO [TransactionCoordinator id=1]
>>> Initialized transactionalId map -> Sink:
>>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3 with producerId 1008 and producer
>>> epoch 1 on partition __transaction_state-37
>>> (kafka.coordinator.transaction.TransactionCoordinator)
>>
>>> [2019-09-20 02:32:45,962] DEBUG [TransactionCoordinator id=1] Returning
>>> NOT_COORDINATOR error code to client for map -> Sink:
>>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3's AddPartitions request
>>> (kafka.coordinator.transaction.TransactionCoordinator)
>>> [2019-09-20 02:32:46,453] DEBUG [TransactionCoordinator id=1] Aborting
>>> append of COMMIT to transaction log with coordinator and returning
>>> NOT_COORDINATOR error to client for map -> Sink:
>>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3's EndTransaction request
>>> (kafka.coordinator.transaction.TransactionCoordinator)
>>
>>
>>
>>
>> On Thu, Sep 19, 2019 at 6:25 PM, Tony Wei <tony19920...@gmail.com> wrote:
>>
>>> Hi Becket,
>>>
>>> I found that transactions tend to fail
>>> with InvalidTxnStateException if
>>> they never sent any records but were committed after some brokers had been
>>> restarted.
>>>
>>> Because the failing state transition was always from EMPTY to COMMIT, I
>>> ran a
>>> job with a parallelism of one, both with and without output to Kafka. I
>>> restarted brokers
>>> to see what happened in these two situations and found that I couldn't
>>> make the job fail
>>> when it continuously emitted output to Kafka, but it could fail when it
>>> didn't send any
>>> output to Kafka.
>>>
>>> I'm not familiar with FlinkKafkaProducer's behavior. I tried to use
>>> the Kafka Java producer
>>> to reproduce the exception, but it worked well. Maybe my observation is
>>> not correct,
>>> but the experiment results point that way. Do you have any thoughts on
>>> this?
>>>
>>> Best,
>>> Tony Wei
>>>
>>> On Thu, Sep 19, 2019 at 11:08 AM, Tony Wei <tony19920...@gmail.com> wrote:
>>>
>>>> Hi Becket,
>>>>
>>>> One more thing: I have tried restarting brokers other than the active
>>>> controller, and
>>>> this exception can happen as well. So it should be independent of
>>>> the active
>>>> controller, as you said.
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>> On Wed, Sep 18, 2019 at 6:14 PM, Tony Wei <tony19920...@gmail.com> wrote:
>>>>
>>>>> Hi Becket,
>>>>>
>>>>> I have reproduced this problem in our development environment. Below
>>>>> are the log messages at debug level.
>>>>> It seems that the exception came from broker-3, and I also found another
>>>>> error code on broker-2 around that time.
>>>>>
>>>>> There are other INVALID_TXN_STATE errors for other transaction ids; I
>>>>> list just one of them. The log messages below only
>>>>> show messages containing the
>>>>> `kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's` substring before
>>>>> `2019-09-18 07:14`.
>>>>>
>>>>> I didn't see any other information explaining why the producer tried to
>>>>> move the transaction state from EMPTY to COMMIT, or what
>>>>> caused NOT_COORDINATOR. Do you have any thoughts about what's
>>>>> happening? Thanks.
>>>>>
>>>>> *Number of Kafka brokers: 3*
>>>>> *logging config for kafka:*
>>>>>
>>>>>>
>>>>>> log4j.appender.transactionAppender=org.apache.log4j.RollingFileAppender
>>>>>>
>>>>>> log4j.appender.transactionAppender.File=${kafka.logs.dir}/kafka-transaction.log
>>>>>>
>>>>>> log4j.appender.transactionAppender.layout=org.apache.log4j.PatternLayout
>>>>>> log4j.appender.transactionAppender.layout.ConversionPattern=[%d] %p
>>>>>> %m (%c)%n
>>>>>> log4j.appender.transactionAppender.MaxFileSize=10MB
>>>>>> log4j.appender.transactionAppender.MaxBackupIndex=10
>>>>>> log4j.logger.kafka.coordinator.transaction=DEBUG, transactionAppender
>>>>>> log4j.additivity.kafka.coordinator.transaction=true
>>>>>>
>>>>>
>>>>>
>>>>> *flink-ui*
>>>>>>
>>>>>> Timestamp: 2019-09-18, 07:13:43
>>>>>>
>>>>>
>>>>>
>>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>>>>>>     at
>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>     at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>     at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing
>>>>>> one of transactions failed, logging first encountered failure
>>>>>>     at
>>>>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>>>>>>     at
>>>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>>>     at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>>>>>>     ... 5 more
>>>>>> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException:
>>>>>> The producer attempted a transactional operation in an invalid state
>>>>>>
>>>>>
>>>>>
>>>>> *broker-3*
>>>>>>
>>>>>> [2019-09-18 07:13:43,768] DEBUG [TransactionCoordinator id=3]
>>>>>> TransactionalId: blacklist -> Sink:
>>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but
>>>>>> received transaction marker result to send: COMMIT
>>>>>> (kafka.coordinator.transaction.TransactionCoordinator)
>>>>>> [2019-09-18 07:13:43,769] DEBUG [TransactionCoordinator id=3]
>>>>>> Aborting append of COMMIT to transaction log with coordinator and 
>>>>>> returning
>>>>>> INVALID_TXN_STATE error to client for blacklist -> Sink:
>>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction 
>>>>>> request
>>>>>> (kafka.coordinator.transaction.TransactionCoordinator)
>>>>>> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3]
>>>>>> TransactionalId: blacklist -> Sink:
>>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but
>>>>>> received transaction marker result to send: COMMIT
>>>>>> (kafka.coordinator.transaction.TransactionCoordinator)
>>>>>> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3]
>>>>>> Aborting append of COMMIT to transaction log with coordinator and 
>>>>>> returning
>>>>>> INVALID_TXN_STATE error to client for blacklist -> Sink:
>>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction 
>>>>>> request
>>>>>> (kafka.coordinator.transaction.TransactionCoordinator)
>>>>>> [2019-09-18 07:13:46,840] DEBUG [Transaction State Manager 3]:
>>>>>> Updating blacklist -> Sink:
>>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to
>>>>>> TxnTransitMetadata(producerId=7019, producerEpoch=4, 
>>>>>> txnTimeoutMs=5400000,
>>>>>> txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1,
>>>>>> txnLastUpdateTimestamp=1568790826831) with coordinator epoch 4 for
>>>>>> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7
>>>>>> succeeded (kafka.coordinator.transaction.TransactionStateManager)
>>>>>>
>>>>>
>>>>> *broker-2*
>>>>>
>>>>>> [2019-09-18 06:45:26,324] DEBUG [Transaction State Manager 2]:
>>>>>> Updating blacklist -> Sink:
>>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to
>>>>>> TxnTransitMetadata(producerId=7019, producerEpoch=0,
>>>>>> txnTimeoutMs=5400000,
>>>>>> txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1,
>>>>>> txnLastUpdateTimestamp=1568789126318) with coordinator epoch 0 for
>>>>>> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7
>>>>>> succeeded (kafka.coordinator.transaction.TransactionStateManager)
>>>>>> [2019-09-18 06:54:27,981] DEBUG [Transaction State Manager 2]:
>>>>>> Updating blacklist -> Sink:
>>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to
>>>>>> TxnTransitMetadata(producerId=7019, producerEpoch=1, 
>>>>>> txnTimeoutMs=5400000,
>>>>>> txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1,
>>>>>> txnLastUpdateTimestamp=1568789667979) with coordinator epoch 0 for
>>>>>> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7
>>>>>> succeeded (kafka.coordinator.transaction.TransactionStateManager)
>>>>>> [2019-09-18 07:06:25,419] DEBUG [Transaction State Manager 2]:
>>>>>> Updating blacklist -> Sink:
>>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to
>>>>>> TxnTransitMetadata(producerId=7019, producerEpoch=2, 
>>>>>> txnTimeoutMs=5400000,
>>>>>> txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1,
>>>>>> txnLastUpdateTimestamp=1568790385417) with coordinator epoch 0 for
>>>>>> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7
>>>>>> succeeded (kafka.coordinator.transaction.TransactionStateManager)
>>>>>> [2019-09-18 07:11:42,981] DEBUG [Transaction State Manager 2]:
>>>>>> Updating blacklist -> Sink:
>>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to
>>>>>> TxnTransitMetadata(producerId=7019, producerEpoch=3, 
>>>>>> txnTimeoutMs=5400000,
>>>>>> txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1,
>>>>>> txnLastUpdateTimestamp=1568790702969) with coordinator epoch 0 for
>>>>>> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7
>>>>>> succeeded (kafka.coordinator.transaction.TransactionStateManager)
>>>>>> [2019-09-18 07:13:42,779] DEBUG [TransactionCoordinator id=2]
>>>>>> Returning NOT_COORDINATOR error code to client for blacklist -> Sink:
>>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's AddPartitions 
>>>>>> request
>>>>>> (kafka.coordinator.transaction.TransactionCoordinator)
>>>>>> [2019-09-18 07:13:43,633] DEBUG [TransactionCoordinator id=2]
>>>>>> Aborting append of COMMIT to transaction log with coordinator and 
>>>>>> returning
>>>>>> NOT_COORDINATOR error to client for blacklist -> Sink:
>>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction 
>>>>>> request
>>>>>> (kafka.coordinator.transaction.TransactionCoordinator)
>>>>>>
>>>>>
>>>>> Best,
>>>>> Tony Wei
>>>>>
>>>>>
>>>>> On Mon, Sep 2, 2019 at 10:03 PM, Becket Qin <becket....@gmail.com> wrote:
>>>>>
>>>>>> Hi Tony,
>>>>>>
>>>>>> From the symptom it is not quite clear to me what may cause this
>>>>>> issue. Supposedly the TransactionCoordinator is independent of the active
>>>>>> controller, so bouncing the active controller should not have special
>>>>>> impact on the transactions (at least not every time). If this is stably
>>>>>> reproducible, is it possible to turn on debug level logging
>>>>>> on kafka.coordinator.transaction.TransactionCoordinator to see what
>>>>>> the broker says?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jiangjie (Becket) Qin
>>>>>>
>>>>>> On Thu, Aug 29, 2019 at 3:55 PM Tony Wei <tony19920...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Has anyone run into the same problem? I have updated my producer
>>>>>>> transaction timeout to 1.5 hours,
>>>>>>> but the problem still happened when I restarted the broker with the
>>>>>>> active controller. So it might not be due to
>>>>>>> a long checkpoint duration causing a transaction
>>>>>>> timeout. I have no more clues for finding out
>>>>>>> what's wrong with my Kafka producer. Could someone help me please?
>>>>>>>
>>>>>>> Best,
>>>>>>> Tony Wei
>>>>>>>
>>>>>>> On Fri, Aug 16, 2019 at 4:10 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Tony,
>>>>>>>>
>>>>>>>> I'm sorry I cannot help you with this issue, but Becket (in CC)
>>>>>>>> might have an idea what went wrong here.
>>>>>>>>
>>>>>>>> Best, Fabian
>>>>>>>>
>>>>>>>> On Wed, Aug 14, 2019 at 07:00, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I was recently trying to update our Kafka cluster with a larger
>>>>>>>>> `transaction.max.timeout.ms`. The
>>>>>>>>> original setting was Kafka's default value (i.e. 15 minutes), and I
>>>>>>>>> tried to set it to 3 hours.
>>>>>>>>>
>>>>>>>>> While I was doing a rolling restart of my brokers, this exception
>>>>>>>>> occurred on the next checkpoint
>>>>>>>>> after I restarted the broker with the active controller.
>>>>>>>>>
>>>>>>>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>>>>>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing
>>>>>>>>>> one of transactions failed, logging first encountered failure
>>>>>>>>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>>>>>>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>>>>>>>>>>     ... 5 more
>>>>>>>>>> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException:
>>>>>>>>>> The producer attempted a transactional operation in an invalid state
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I have no idea why it happened, and I didn't find any error log
>>>>>>>>> from the brokers. Has anyone seen
>>>>>>>>> this exception before? How can I prevent this exception when
>>>>>>>>> I restart the Kafka cluster?
>>>>>>>>> Does this exception mean that I will lose data in some of these
>>>>>>>>> transactions?
>>>>>>>>>
>>>>>>>>> flink cluster version: 1.8.1
>>>>>>>>> kafka cluster version: 1.0.1
>>>>>>>>> flink kafka producer version: universal
>>>>>>>>> producer transaction timeout: 15 minutes
>>>>>>>>> checkpoint interval: 5 minutes
>>>>>>>>> number of concurrent checkpoint: 1
>>>>>>>>> max checkpoint duration before and after the exception occurred:
>>>>>>>>> < 2 minutes
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Tony Wei
>>>>>>>>>
>>>>>>>>
