[ 
https://issues.apache.org/jira/browse/KAFKA-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17800554#comment-17800554
 ] 

Greg Harris commented on KAFKA-16047:
-------------------------------------

I found this code-review comment thread about this timeout: 
[https://github.com/apache/kafka/pull/2849#discussion_r111528120] There was 
originally a comment explicitly pointing out that the transaction timeout was 
used, but without any substantive reasoning why. The same concern about the 
excessive waiting on the broker side was raised, but as the default was 60s, no 
action appears to have been taken.

As this code is so old (2017/0.11) and this problem only affects extremely low 
transaction timeouts, I think we should opt for the client-side fix to increase 
the transaction timeout to the request timeout, rather than hardcoding 1ms.
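
For illustration, the client-side option could be sketched roughly as follows. 
This is a minimal, self-contained sketch and not the actual Kafka code: the 
class `FenceTimeoutSketch` and the method `resolveTxnTimeoutMs` are 
hypothetical names, and it assumes the admin client falls back to its request 
timeout whenever the caller does not supply a timeout for the fencing call.

```java
public final class FenceTimeoutSketch {

    /**
     * Hypothetical helper (not part of Kafka): picks the transaction timeout
     * to send with an InitProducerId request used for fencing.
     *
     * @param callerTimeoutMs  timeout supplied by the caller, or null if unset
     * @param requestTimeoutMs the admin client's request timeout
     */
    public static int resolveTxnTimeoutMs(Integer callerTimeoutMs, int requestTimeoutMs) {
        // Prefer an explicit caller-supplied timeout, otherwise fall back to
        // the request timeout instead of a hardcoded 1ms.
        int resolved = (callerTimeoutMs != null) ? callerTimeoutMs : requestTimeoutMs;
        if (resolved <= 0) {
            throw new IllegalArgumentException("Timeout must be positive, got " + resolved);
        }
        return resolved;
    }

    public static void main(String[] args) {
        // 30_000 is only an example value for the request timeout.
        System.out.println(resolveTxnTimeoutMs(null, 30_000));  // prints 30000
        System.out.println(resolveTxnTimeoutMs(5_000, 30_000)); // prints 5000
    }
}
```

With this shape, the broker would never be handed a timeout shorter than the 
one the client itself is prepared to wait for, unless the caller asks for one.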

[~akaltsikis] Are you interested in assigning this to yourself and opening a PR 
to fix it? You can tag me as a reviewer.

> Source connector with EOS enabled have some InitProducerId requests timing 
> out, effectively failing all the tasks & the whole connector
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16047
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16047
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect, mirrormaker
>    Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.4.1, 3.6.0, 3.5.1, 3.5.2, 
> 3.6.1
>            Reporter: Angelos Kaltsikis
>            Priority: Major
>
> Source connectors with 'exactly.once.support = required' may have some of 
> their tasks fail because the InitProducerId requests issued by the admin 
> client time out. In the case of MirrorSourceConnector, the source connector 
> in which I found the bug, this effectively caused all of the tasks to become 
> "FAILED". Once one of the tasks became FAILED due to the 
> 'COORDINATOR_NOT_AVAILABLE' errors (caused by the timeouts), no matter how 
> many times I restarted the connector/tasks, I couldn't get the 
> MirrorSourceConnector back into a healthy RUNNING state.
> Due to the low timeout (1ms) that has been [hard-coded in the 
> code|https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L87], 
> there is a chance that the `InitProducerId` requests time out when the Kafka 
> brokers are "slower than expected" (that is, they do not process and respond 
> to the request within 1ms). Feel free to read more about the issue in the 
> "More context & logs" section below.
> [~ChrisEgerton] I would appreciate it if you could respond to the following 
> questions:
> - How and why was the 1ms magic number chosen for the transaction timeout?
> - Is there any specific reason to expect that the `InitProducerId` request 
> can always be processed in such a small time window? 
> - I have tried the above on multiple Kafka clusters hosted on different 
> underlying datacenter hosts, and I don't believe that those brokers are 
> "slow" for some reason. If you feel that the brokers are slower than 
> expected, I would appreciate any pointers on how I could find the 
> bottleneck.
> h3. Temporary Mitigation
> I have increased the timeout to 1000ms (an arbitrarily picked number; I just 
> wanted to give the brokers enough time to always complete this type of 
> request). The fix can be found in my fork: 
> https://github.com/akaltsikis/kafka/commit/8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f
>  
> h3. Final solution
> The temporary mitigation is not ideal, as it still arbitrarily picks a 
> timeout for the operation, which may be high enough in practice but is not 
> guaranteed to always be high enough. Shall we introduce something 
> client-configurable?
> At the same time, I was wondering whether it makes sense to introduce some 
> tests that simulate brokers slower than the "blazing fast" mocked brokers 
> used in the unit tests, so as to catch this type of low timeout that can 
> make some software features unusable.
> h3. What is affected
> The above bug exists in MirrorSourceConnector tasks running in a distributed 
> Kafka Connect cluster or in MirrorMaker 2 jobs that run with distributed mode 
> enabled (a prerequisite for exactly.once.support to work). I believe this 
> should be true for other source connectors as well (as the code path to 
> blame is Connect-specific and not MirrorMaker-specific).
> h3. More context & logs
> *Connector Logs*
> {code:java}
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
> assignment. Call: fenceProducer(api=INIT_PRODUCER_ID)
> {code}
> *Broker Logs*
> {code:java}
> [2023-12-12 14:28:18,030] INFO [TransactionCoordinator id=<id>] Returning 
> COORDINATOR_NOT_AVAILABLE error code to client for 
> kafka-connect-uat-mm2-msc-20th-7's InitProducerId request 
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2023-12-12 14:28:18,030] INFO [Transaction State Manager 1001]: 
> TransactionalId kafka-connect-uat-mm2-msc-20th-7 append transaction log for 
> TxnTransitMetadata(producerId=61137, lastProducerId=61137, producerEpoch=2, 
> lastProducerEpoch=-1, txnTimeoutMs=1, txnState=Empty, topicPartitions=Set(), 
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1702391298028) transition failed 
> due to COORDINATOR_NOT_AVAILABLE, resetting pending state from Some(Empty), 
> aborting state transition and returning COORDINATOR_NOT_AVAILABLE in the 
> callback (kafka.coordinator.transaction.TransactionStateManager)
> {code}
> h3. How to reproduce it
> While the bug exists in the standalone MM2 deployment as well, it's easier to 
> reproduce by deploying the connector to a Kafka Connect cluster (as it is 
> possible to update the config/delete/restart/pause/stop/resume via the Kafka 
> Connect REST API).
> Thus, deploy a MirrorSourceConnector on a Kafka Connect cluster (with 
> `exactly.once.source.support = enabled`) and, after the initial start, update 
> its configuration or restart the connector & tasks. 
> To test whether my fork has fixed the issue for good, I created the 
> following script, which repeatedly restarts the connector (90 seconds after 
> all of its tasks reach the RUNNING state). I have been running the script for 
> a few hours and the MirrorSourceConnector never got into a non-recoverable 
> state (as was happening on the upstream versions).
> {code:bash}
> #!/bin/bash
> # Source vars
> source /<path>/connect.sh
> # Kafka Connect API endpoint
> KAFKA_CONNECT_API=$KAFKA_CONNECT_URL
> # Kafka Connect connector name
> CONNECTOR_NAME="<connector_name>"
> while true; do
>     # Fetch the connector status
>     connector_status=$(curl -k -s \
>         -u "$KAFKA_CONNECT_BASIC_AUTH_USERNAME:$KAFKA_CONNECT_BASIC_AUTH_PASSWORD" \
>         "$KAFKA_CONNECT_API/connectors/$CONNECTOR_NAME/status")
>     # Check if connector is in FAILED state
>     if echo "$connector_status" | grep -q '"state":"FAILED"'; then
>         echo "Connector has failed. Exiting."
>         exit 1
>     fi
>     # Fetch and check all task statuses
>     task_statuses=$(echo "$connector_status" | jq '.tasks[].state')
>     all_running=true
>     for status in $task_statuses; do
>         if [ "$status" != '"RUNNING"' ]; then
>             all_running=false
>             break
>         fi
>     done
>     # If all tasks and the connector are RUNNING, restart them after 90 seconds
>     if $all_running; then
>         echo "All tasks are running. Restarting in 90 seconds."
>         sleep 90
>         date
>         curl -k -X POST -H "Content-Type: application/json" \
>             -u "$KAFKA_CONNECT_BASIC_AUTH_USERNAME:$KAFKA_CONNECT_BASIC_AUTH_PASSWORD" \
>             "$KAFKA_CONNECT_API/connectors/$CONNECTOR_NAME/restart?includeTasks=true"
>     else
>         echo "Not all tasks are running. Checking again..."
>     fi
>     # Sleep for a while before checking again
>     sleep 10
> done
> {code}


