[
https://issues.apache.org/jira/browse/KAFKA-20602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18084576#comment-18084576
]
Bill Bejeck commented on KAFKA-20602:
-------------------------------------
Hi [~pavelzeger], I've assigned this ticket to you and added the contributor
role to your profile so you can self-assign tickets in the future.
> DLQ topic creation has 3 issues: indefinite hang, wrong log level, interrupt
> not restored
> -----------------------------------------------------------------------------------------
>
> Key: KAFKA-20602
> URL: https://issues.apache.org/jira/browse/KAFKA-20602
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Reporter: Pavel Zeger
> Priority: Major
>
> `DeadLetterQueueReporter.createAndSetup` can hang indefinitely during a broker
> outage, logs the expected DLQ topic creation at ERROR, and does not restore the
> interrupt status.
> *Where*
> `connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java`,
> lines 77-98:
>
> {code:java}
> public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminProps,
>         ConnectorTaskId id,
>         SinkConnectorConfig sinkConfig,
>         Map<String, Object> producerProps,
>         ErrorHandlingMetrics errorHandlingMetrics) {
> {code}
> {*}Issue 1{*}: indefinite hang on broker outage
> Both `admin.listTopics().names().get()` and
> `admin.createTopics(...).all().get()` are unbounded `.get()` calls. If the
> broker is unreachable:
> - The `AdminClient` retries internally until its own `default.api.timeout.ms`
> (default **60 seconds**) expires, then `.get()` surfaces the failure as an
> `ExecutionException`.
> - But the `try-with-resources` block has already been entered by then, and
> `Admin.create()` itself blocks on the producer's bootstrap.
> So on cold-broker startup, **every connector that has DLQ enabled
> blocks for ~60s before its task starts**. For a worker with N such
> connectors, the delays serialize into an N × 60s startup delay.
> There's a documented Connect best practice of "set short
> admin.default.api.timeout.ms for fast startup," but the `adminProps` map here
> is constructed from sink config and may not
> include that override. Add an explicit `.get(timeout, unit)` to upper-bound
> the wait regardless:
> {code:java}
> admin.listTopics().names().get(30, TimeUnit.SECONDS);
> {code}
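To illustrate what the bounded wait buys, here is a minimal, self-contained sketch. It does not use the Kafka `Admin` client: a `CompletableFuture` that never completes stands in for an admin call against an unreachable broker. `BoundedGetSketch`, `completesWithin`, and the 100 ms timeout are hypothetical names and values picked for the example only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedGetSketch {

    /**
     * Waits for the future for at most timeoutMs milliseconds.
     * Returns true if it completed in time, false if the wait timed out.
     */
    public static boolean completesWithin(Future<?> future, long timeoutMs)
            throws InterruptedException, ExecutionException {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // The caller regains control here instead of waiting on the future's own schedule.
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for an admin call against an unreachable broker: never completes.
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        System.out.println(completesWithin(neverCompletes, 100));

        // Stand-in for an admin call that has already succeeded.
        System.out.println(completesWithin(CompletableFuture.completedFuture("ok"), 100));
    }
}
```

With a plain `.get()` the first call would never return in this sketch; with the bounded variant the caller decides how long it is willing to wait.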
> {*}Issue 2{*}: `log.error` for an expected condition
> {code:java}
> log.error("Topic {} doesn't exist. Will attempt to create topic.", topic);
> {code}
> This isn't an error. The next line creates the topic, and the log message even
> says "will attempt to create": the code is *expecting* the topic to not
> exist on first startup. It should be `log.info`. As it stands, this shows up in
> operator dashboards as "ERROR" alerts on every first start of a connector with
> DLQ enabled. Pure noise.
>
> {*}Issue 3{*}: `InterruptedException` thrown without restoring flag
> {code:java}
> } catch (InterruptedException e) {
>     throw new ConnectException("Could not initialize dead letter queue with topic=" + topic, e);
> }
> {code}
> The Java idiom: when you catch `InterruptedException` and don't re-throw it
> as-is, you must call `Thread.currentThread().interrupt()` to preserve the
> signal for upstream code. Currently the `ConnectException` propagates but the
> interrupt flag is cleared,
> so any subsequent blocking call on this thread doesn't know it was
> interrupted.
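The effect on the interrupt flag can be observed with a small standalone sketch. It assumes nothing about Connect: `InterruptRestoreSketch`, `getOrWrap`, and `flagSurvives` are hypothetical names, `IllegalStateException` stands in for `ConnectException`, and a `CompletableFuture` stands in for the admin future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class InterruptRestoreSketch {

    /** Waits on the future and wraps failures; optionally restores the interrupt flag first. */
    public static <T> T getOrWrap(Future<T> future, boolean restoreFlag) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            if (restoreFlag) {
                // Throwing InterruptedException cleared the flag; set it again for upstream code.
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Operation failed", e.getCause());
        }
    }

    /** Calls getOrWrap on an already-interrupted thread and reports whether the flag is still set. */
    public static boolean flagSurvives(boolean restoreFlag) {
        Thread.currentThread().interrupt(); // simulate a shutdown signal
        try {
            getOrWrap(new CompletableFuture<String>(), restoreFlag);
        } catch (IllegalStateException expected) {
            // The wrapped interrupt is what the caller sees.
        }
        // Thread.interrupted() reads and clears the flag, so the thread is left clean.
        return Thread.interrupted();
    }

    public static void main(String[] args) {
        System.out.println("without restore: " + flagSurvives(false));
        System.out.println("with restore:    " + flagSurvives(true));
    }
}
```

Without the restore, code further up the stack sees only the wrapping exception and has no way to tell from the thread state that an interrupt was requested.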
>
> *Proposed fix*
> Four changes:
> 1. Bounded `.get(timeoutMs, ...)` on both admin calls.
> 2. New explicit `catch (TimeoutException e)` for the timeout case.
> 3. `log.error` → `log.info`, and include the topic config in the
> message.
> 4. Restore the interrupt flag before re-throwing.
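One possible shape for changes 1, 2, and 4 combined is sketched below. This is not the actual patch: `DlqSetupSketch`, `await`, and `SetupException` (a stand-in for `ConnectException`) are hypothetical, the message wording is illustrative, and the real method would pass the futures returned by the admin calls rather than a `CompletableFuture`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class DlqSetupSketch {

    /** Hypothetical stand-in for Connect's ConnectException. */
    static final class SetupException extends RuntimeException {
        SetupException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Bounded wait that maps each failure mode to its own message. */
    public static <T> T await(Future<T> future, long timeoutMs, String topic) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Change 2: the timeout case gets its own catch arm and message.
            throw new SetupException("Timed out after " + timeoutMs
                    + " ms initializing dead letter queue with topic=" + topic, e);
        } catch (InterruptedException e) {
            // Change 4: restore the flag before wrapping the exception.
            Thread.currentThread().interrupt();
            throw new SetupException(
                    "Interrupted while initializing dead letter queue with topic=" + topic, e);
        } catch (ExecutionException e) {
            throw new SetupException(
                    "Could not initialize dead letter queue with topic=" + topic, e.getCause());
        }
    }

    public static void main(String[] args) {
        // Completed future: the value is returned unchanged.
        System.out.println(await(CompletableFuture.completedFuture("created"), 100, "my-dlq"));

        // Never-completing future: the bounded wait ends in a SetupException.
        try {
            await(new CompletableFuture<String>(), 100, "my-dlq");
        } catch (SetupException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Routing both admin calls through one helper like this would keep the timeout and the interrupt handling in a single place, though the real fix may well inline the catch arms instead.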
> *KIP needed?*
> No
--
This message was sent by Atlassian Jira
(v8.20.10#820010)