[
https://issues.apache.org/jira/browse/KAFKA-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17800557#comment-17800557
]
Angelos Kaltsikis commented on KAFKA-16047:
-------------------------------------------
Yes sir. Will do so and ask for a review. Btw thanks for the extra context 👌🏽
> Source connector with EOS enabled have some InitProducerId requests timing
> out, effectively failing all the tasks & the whole connector
> ---------------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-16047
> URL: https://issues.apache.org/jira/browse/KAFKA-16047
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect, mirrormaker
> Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.4.1, 3.6.0, 3.5.1, 3.5.2,
> 3.6.1
> Reporter: Angelos Kaltsikis
> Priority: Major
>
> Source connectors with 'exactly.once.support = required' may have some of
> their tasks' InitProducerId requests, which are issued by the admin client,
> time out. In the case of MirrorSourceConnector, the source connector in which
> I found the bug, it effectively made all of the tasks become "FAILED". Once
> one of the tasks became FAILED due to 'COORDINATOR_NOT_AVAILABLE' errors
> (caused by the timeouts), no matter how many times I restarted the
> connector/tasks, I couldn't get the MirrorSourceConnector back into a healthy
> RUNNING state.
> Due to the very low transaction timeout (1ms) that is
> [hard-coded in the code|https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L87]
> for these requests (it shows up as `txnTimeoutMs=1` in the broker logs
> below), there is a chance that `InitProducerId` requests time out on
> "slower-than-expected" Kafka brokers that do not process and respond to the
> request within 1ms. (More information about the issue is in the "More
> context & logs" section below.)
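To check the hard-coded value in a local source checkout, the hypothetical helper below greps `FenceProducersHandler.java` (at the path taken from the link above) for the transaction-timeout setting. It is a sketch that assumes the value is set through a call whose name contains `TransactionTimeoutMs`; the function name `show_fence_txn_timeout` is made up for illustration.

```shell
#!/bin/sh
# Hypothetical helper: print the line(s) of FenceProducersHandler.java that
# mention TransactionTimeoutMs, given the root directory of a Kafka source checkout.
# Assumption: the hard-coded 1ms value is set via a call containing "TransactionTimeoutMs".
show_fence_txn_timeout() {
  local src_root="$1"
  local handler="$src_root/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java"
  if [ ! -f "$handler" ]; then
    echo "FenceProducersHandler.java not found under $src_root" >&2
    return 1
  fi
  # -n prefixes each match with its line number in the file
  grep -n 'TransactionTimeoutMs' "$handler"
}
```

For example, after `git clone --depth 1 --branch 3.6.1 https://github.com/apache/kafka.git kafka-3.6.1`, running `show_fence_txn_timeout kafka-3.6.1` prints the matching line(s) together with their line numbers.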
> [~ChrisEgerton] I would appreciate it if you could answer the following
> questions:
> - How and why was the magic number of 1ms chosen for the transaction
> timeout?
> - Is there a specific reason to expect that an `InitProducerId` request can
> always be processed within such a small time window?
> - I have tried the above on multiple Kafka clusters hosted on different
> underlying datacenter hosts, and I don't believe those brokers are unusually
> slow. If you think the brokers are slower than expected, I would appreciate
> any pointers on how to find the bottleneck.
> h3. Temporary Mitigation
> I have increased the timeout to 1000ms (an arbitrarily picked number; I just
> wanted to give the brokers enough time to always complete this type of
> request). The fix can be found in my fork:
> https://github.com/akaltsikis/kafka/commit/8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f
>
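The mitigation can be approximated in a local checkout with the hypothetical helper below. It is not the author's actual commit: it simply assumes the hard-coded value appears literally as `setTransactionTimeoutMs(1)` and swaps in another number (1000ms by default, matching the fork). The name `patch_fence_txn_timeout` is made up for illustration.

```shell
#!/bin/sh
# Hypothetical helper approximating the temporary mitigation (NOT the author's
# actual commit): in a local Kafka source checkout, replace the hard-coded 1ms
# transaction timeout in FenceProducersHandler.java with another value.
# Assumption: the value appears literally as "setTransactionTimeoutMs(1)".
patch_fence_txn_timeout() {
  local src_root="$1"
  local timeout_ms="${2:-1000}"   # default mirrors the 1000ms used in the fork
  local handler="$src_root/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java"
  # Only accept a plain decimal number, so it is safe to splice into the sed expression
  case "$timeout_ms" in
    ''|*[!0-9]*) echo "timeout must be a decimal number: $timeout_ms" >&2; return 1 ;;
  esac
  # Refuse to continue if the expected hard-coded call is not present
  if ! grep -q 'setTransactionTimeoutMs(1)' "$handler" 2>/dev/null; then
    echo "setTransactionTimeoutMs(1) not found in $handler" >&2
    return 1
  fi
  # Portable in-place edit (avoids GNU/BSD differences in sed -i)
  sed "s/setTransactionTimeoutMs(1)/setTransactionTimeoutMs($timeout_ms)/" "$handler" > "$handler.tmp" &&
    mv "$handler.tmp" "$handler"
}
```

For example, `patch_fence_txn_timeout kafka-3.6.1 1000` followed by `./gradlew jar` in that checkout rebuilds the jars with the patched value. A different constant still has the weakness described under "Final solution": it is arbitrary rather than derived from anything the client controls.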
> h3. Final solution
> The temporary mitigation is not ideal, as it still picks an arbitrary timeout
> for this operation, which may be high enough but is not guaranteed to always
> be high enough. Should we make this timeout configurable by the client?
> I was also wondering whether it makes sense to introduce tests that simulate
> brokers slower than the "blazing" fast mocked brokers used in the unit tests,
> so that overly low timeouts like this one, which can make some features
> unusable, get caught.
> h3. What is affected
> The above bug affects MirrorSourceConnector tasks running in a distributed
> Kafka Connect cluster, as well as MirrorMaker 2 jobs that run with
> distributed mode enabled (a prerequisite for exactly.once.support to work). I
> believe it also affects other source connectors, as the code path to blame is
> Connect-specific rather than MirrorMaker-specific.
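Because the failure only shows up with exactly-once source support enabled, here is a minimal sketch of turning it on for a distributed worker. It assumes the worker reads a Java-properties file (for example `connect-distributed.properties`) and uses the worker-level `exactly.once.source.support` property mentioned in the reproduction steps below. `enable_worker_eos` is a hypothetical helper name.

```shell
#!/bin/sh
# Hypothetical helper: enable exactly-once source support in a Kafka Connect
# distributed worker properties file (worker-level property from KIP-618).
# Any existing exactly.once.source.support line is removed before appending.
enable_worker_eos() {
  local worker_props="$1"
  if [ ! -f "$worker_props" ]; then
    echo "worker config not found: $worker_props" >&2
    return 1
  fi
  # Keep every line except an existing exactly.once.source.support setting;
  # grep -v exits 1 when it prints nothing, which is fine here
  grep -v '^[[:space:]]*exactly\.once\.source\.support[[:space:]]*[=:]' "$worker_props" > "$worker_props.tmp" || true
  echo 'exactly.once.source.support=enabled' >> "$worker_props.tmp"
  mv "$worker_props.tmp" "$worker_props"
}
```

On an already-running cluster, KIP-618 calls for rolling all workers through `preparing` before switching to `enabled`. This helper only performs that final step.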
> h3. More context & logs
> *Connector Logs*
> {code:java}
> Caused by: java.util.concurrent.CompletionException:
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node
> assignment. Call: fenceProducer(api=INIT_PRODUCER_ID)
> {code}
> *Broker Logs*
> {code:java}
> [2023-12-12 14:28:18,030] INFO [TransactionCoordinator id=<id>] Returning
> COORDINATOR_NOT_AVAILABLE error code to client for
> kafka-connect-uat-mm2-msc-20th-7's InitProducerId request
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2023-12-12 14:28:18,030] INFO [Transaction State Manager 1001]:
> TransactionalId kafka-connect-uat-mm2-msc-20th-7 append transaction log for
> TxnTransitMetadata(producerId=61137, lastProducerId=61137, producerEpoch=2,
> lastProducerEpoch=-1, txnTimeoutMs=1, txnState=Empty, topicPartitions=Set(),
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1702391298028) transition failed
> due to COORDINATOR_NOT_AVAILABLE, resetting pending state from Some(Empty),
> aborting state transition and returning COORDINATOR_NOT_AVAILABLE in the
> callback (kafka.coordinator.transaction.TransactionStateManager)
> {code}
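Both log excerpts can be searched for automatically. The sketch below assumes that each log entry sits on a single line (the email wraps them) and that the message texts match the excerpts above verbatim. The function names are hypothetical. A `txnTimeoutMs` value of `1` corresponds to the hard-coded timeout discussed earlier.

```shell
#!/bin/sh
# Hypothetical helpers for spotting this failure mode in Connect and broker logs.
# Assumption: each log entry is on a single line (the excerpts above are wrapped
# by the email) and the message texts match the excerpts verbatim.

# Count connector-log lines reporting the fenceProducer InitProducerId timeout.
count_fence_timeouts() {
  grep -c 'Call: fenceProducer(api=INIT_PRODUCER_ID)' "$1" || true
}

# List the transactional IDs that received COORDINATOR_NOT_AVAILABLE for InitProducerId.
coordinator_unavailable_ids() {
  sed -n "s/.*Returning COORDINATOR_NOT_AVAILABLE error code to client for \(.*\)'s InitProducerId request.*/\1/p" "$1" | sort -u
}

# List the distinct txnTimeoutMs values seen in transaction state transitions.
txn_timeouts() {
  sed -n 's/.*txnTimeoutMs=\([0-9]*\).*/\1/p' "$1" | sort -u
}
```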
> h3. How to reproduce it
> While the bug exists in both standalone MM2 deployments and Kafka Connect
> deployments, it's easier to reproduce by deploying the connector to a Kafka
> Connect cluster, since the Kafka Connect REST API makes it possible to update
> the config and to delete/restart/pause/stop/resume the connector.
> To reproduce it, deploy a MirrorSourceConnector on a Kafka Connect cluster
> (with `exactly.once.source.support = enabled`) and, after the initial start,
> update its configuration or restart the connector and its tasks.
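Here is a minimal sketch of the deployment step. It assumes a MirrorSourceConnector config using the `source.cluster.*`/`target.cluster.*` properties plus the connector-level `exactly.once.support = required` from the description, and it assumes the worker already runs with `exactly.once.source.support = enabled`. The helper name and all argument values are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: print a minimal MirrorSourceConnector config as JSON for
# PUT /connectors/<name>/config on the Kafka Connect REST API.
# Arguments: source alias, target alias, source bootstrap servers,
# target bootstrap servers, topics regex. Values are inserted verbatim, so they
# must not contain double quotes or backslashes.
mirror_source_config() {
  cat <<EOF
{
  "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
  "source.cluster.alias": "$1",
  "target.cluster.alias": "$2",
  "source.cluster.bootstrap.servers": "$3",
  "target.cluster.bootstrap.servers": "$4",
  "topics": "$5",
  "exactly.once.support": "required"
}
EOF
}

# Example usage (placeholders; PUT creates the connector or updates its config):
# curl -k -s -X PUT -H "Content-Type: application/json" \
#   -u "$KAFKA_CONNECT_BASIC_AUTH_USERNAME:$KAFKA_CONNECT_BASIC_AUTH_PASSWORD" \
#   --data "$(mirror_source_config src tgt src-broker:9092 tgt-broker:9092 '.*')" \
#   "$KAFKA_CONNECT_API/connectors/$CONNECTOR_NAME/config"
```

Re-running the same `PUT` with a changed value is one way to trigger the configuration update described in the reproduction steps.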
> To test whether my fork fixes the issue once and for all, I created the
> following script, which repeatedly restarts the connector and its tasks 90
> seconds after all of its tasks reach the RUNNING state. I have been running
> the script for a few hours and the MirrorSourceConnector never got into a
> non-recoverable state (as was happening with the upstream versions).
> {code:bash}
> #!/bin/bash
> # Source vars (expected to define KAFKA_CONNECT_URL and the basic-auth credentials)
> source /<path>/connect.sh
> # Kafka Connect API endpoint
> KAFKA_CONNECT_API=$KAFKA_CONNECT_URL
> # Kafka Connect connector name
> CONNECTOR_NAME="<connector_name>"
> AUTH="$KAFKA_CONNECT_BASIC_AUTH_USERNAME:$KAFKA_CONNECT_BASIC_AUTH_PASSWORD"
> while true; do
>   # Fetch the connector status
>   connector_status=$(curl -k -s -u "$AUTH" \
>     "$KAFKA_CONNECT_API/connectors/$CONNECTOR_NAME/status")
>   # Exit if the connector or any of its tasks is in the FAILED state
>   if echo "$connector_status" | grep -q '"state":"FAILED"'; then
>     echo "Connector has failed. Exiting."
>     exit 1
>   fi
>   # Fetch and check all task statuses (-r prints them without JSON quotes)
>   task_statuses=$(echo "$connector_status" | jq -r '.tasks[].state')
>   all_running=true
>   for status in $task_statuses; do
>     if [ "$status" != "RUNNING" ]; then
>       all_running=false
>       break
>     fi
>   done
>   # If all tasks are RUNNING, restart the connector and its tasks after 90 seconds
>   if $all_running; then
>     echo "All tasks are running. Restarting in 90 seconds."
>     sleep 90
>     date
>     curl -k -s -X POST -H "Content-Type: application/json" -u "$AUTH" \
>       "$KAFKA_CONNECT_API/connectors/$CONNECTOR_NAME/restart?includeTasks=true"
>   else
>     echo "Not all tasks are running. Checking again..."
>   fi
>   # Sleep for a while before checking again
>   sleep 10
> done
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)