Sergey Ivanov created KAFKA-16837:
-------------------------------------
Summary: Kafka Connect fails to update a connector when a previous
task config contains an incorrect Config Provider reference
Key: KAFKA-16837
URL: https://issues.apache.org/jira/browse/KAFKA-16837
Project: Kafka
Issue Type: Bug
Components: connect
Affects Versions: 3.6.1, 3.5.1, 3.8.0
Reporter: Sergey Ivanov
Hello,
We faced an issue where it is not possible to update a connector config if the
*previous* task config contains a ConfigProvider reference with an incorrect value
that leads to a ConfigException.
Below is a simple test case that reproduces it with FileConfigProvider, but it
applies to any ConfigProvider that raises an exception when something is wrong
with the config (e.g. the referenced resource doesn't exist), as the sketch
below illustrates.
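For illustration, a minimal standalone sketch (mine, not part of the reproduction steps) of how FileConfigProvider surfaces a missing file; the file path matches the steps below:
{code:java}
import java.util.Collections;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.provider.FileConfigProvider;

public class ProviderFailureSketch {
    public static void main(String[] args) {
        FileConfigProvider provider = new FileConfigProvider();
        provider.configure(Collections.emptyMap());
        try {
            // Resolves the "topics" key, just like ${file:...:topics} in a connector config
            ConfigData data = provider.get("/opt/kafka/provider.properties",
                    Collections.singleton("topics"));
            System.out.println(data.data()); // {topics=test} while the file exists
        } catch (ConfigException e) {
            // Thrown once the file is gone:
            // "Could not read properties from file /opt/kafka/provider.properties"
            System.err.println(e.getMessage());
        }
    }
}
{code}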
*Prerequisites:*
Kafka Connect instance with config providers:
{code:java}
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
1. Create the Kafka topic "test".
2. On the Kafka Connect instance, create the file "/opt/kafka/provider.properties"
with the content below (a scripted sketch of both steps follows the snippet):
{code:java}
topics=test
{code}
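If it helps, steps 1 and 2 can be scripted; a rough sketch using the Kafka AdminClient, assuming brokers on localhost:9092 and file access on the Connect host:
{code:java}
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class SetupSketch {
    public static void main(String[] args) throws Exception {
        // Step 1: create the "test" topic (1 partition, replication factor 1)
        try (Admin admin = Admin.create(Map.<String, Object>of(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            admin.createTopics(Collections.singleton(new NewTopic("test", 1, (short) 1)))
                 .all().get();
        }
        // Step 2: write the properties file that the ConfigProvider will read
        Files.writeString(Path.of("/opt/kafka/provider.properties"), "topics=test\n");
    }
}
{code}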
3. Create a simple FileStreamSink connector (a java.net.http sketch of this call
follows the snippet):
{code:java}
PUT /connectors/local-file-sink/config
{
  "connector.class": "FileStreamSink",
  "tasks.max": "1",
  "file": "/opt/kafka/test.sink.txt",
  "topics": "${file:/opt/kafka/provider.properties:topics}"
}
{code}
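Equivalently, a hedged sketch of issuing this request from Java with java.net.http (the worker address localhost:8083 is an assumption, adjust as needed):
{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PutConnectorSketch {
    public static void main(String[] args) throws Exception {
        String body = """
                {
                  "connector.class": "FileStreamSink",
                  "tasks.max": "1",
                  "file": "/opt/kafka/test.sink.txt",
                  "topics": "${file:/opt/kafka/provider.properties:topics}"
                }""";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/local-file-sink/config"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
{code}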
4. Check that everything works fine:
{code:java}
GET /connectors?expand=info&expand=status
...
    "status": {
      "name": "local-file-sink",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.10.10.10:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "10.10.10.10:8083"
        }
      ],
      "type": "sink"
    }
  }
}
{code}
Looks fine.
5. Rename the file to "/opt/kafka/provider2.properties".
6. Update the connector with the new, correct file name:
{code:java}
PUT /connectors/local-file-sink/config
{
  "connector.class": "FileStreamSink",
  "tasks.max": "1",
  "file": "/opt/kafka/test.sink.txt",
  "topics": "${file:/opt/kafka/provider2.properties:topics}"
}
{code}
The update {*}succeeds{*} and returns 200.
7. Check that everything works fine:
{code:java}
{
  "local-file-sink": {
    "info": {
      "name": "local-file-sink",
      "config": {
        "connector.class": "FileStreamSink",
        "file": "/opt/kafka/test.sink.txt",
        "tasks.max": "1",
        "topics": "${file:/opt/kafka/provider2.properties:topics}",
        "name": "local-file-sink"
      },
      "tasks": [
        {
          "connector": "local-file-sink",
          "task": 0
        }
      ],
      "type": "sink"
    },
    "status": {
      "name": "local-file-sink",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.10.10.10:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "FAILED",
          "worker_id": "10.10.10.10:8083",
          "trace": "org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
        }
      ],
      "type": "sink"
    }
  }
}
{code}
The config has been updated, but the new task has not been created. As a result,
the connector doesn't work.
It fails with:
{code:java}
[2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44] [Worker clientId=connect-1, groupId=streaming-service_streaming_service] Failed to reconfigure connector's tasks (local-file-sink), retrying after backoff.
org.apache.kafka.common.config.ConfigException: Could not read properties from file /opt/kafka/provider.properties
    at org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
    at org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
    at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
    at org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
    at org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840) {code}
As I understand it, this happens because on connector update AbstractHerder
tries to update the current tasks:
[https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L1051]
and before doing so, the herder {+}tries to compare the old task configs with the
new ones{+}. But it doesn't compare the original values: {+}it tries to get the
ConfigProvider-resolved value for the previous task{+}, and this fails because the
ConfigProvider can no longer read the file referenced by the previous task's config.
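To make the failing step concrete, a minimal standalone sketch (my own simplification) of the transformation that ClusterConfigState.taskConfig applies to the *old* task config via ConfigTransformer; with the file renamed, the provider's exception propagates before any comparison can happen:
{code:java}
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.provider.FileConfigProvider;

public class TransformSketch {
    public static void main(String[] args) {
        FileConfigProvider file = new FileConfigProvider();
        file.configure(Collections.emptyMap());
        Map<String, ConfigProvider> providers = Map.of("file", file);
        ConfigTransformer transformer = new ConfigTransformer(providers);
        // The OLD task config, still pointing at the renamed file:
        Map<String, String> oldTaskConfig = Map.of(
                "topics", "${file:/opt/kafka/provider.properties:topics}");
        // Throws ConfigException ("Could not read properties from file ..."),
        // which is exactly where the herder's comparison aborts.
        transformer.transform(oldTaskConfig);
    }
}
{code}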
The main question: *do we really need to compare the ConfigProvider-resolved*
values there, instead of comparing the original configs?
As it stands, this leads to issues, since many ConfigProviders raise an
exception when a resource is not found.
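For contrast, a toy sketch (names are mine, not Kafka's) of what comparing the original, unresolved configs would look like; no ConfigProvider is invoked, so a stale reference in the old config cannot fail the update:
{code:java}
import java.util.Map;

public class RawCompareSketch {
    public static void main(String[] args) {
        // Raw (unresolved) task configs, as stored in the config topic:
        Map<String, String> oldRaw = Map.of(
                "topics", "${file:/opt/kafka/provider.properties:topics}");
        Map<String, String> newRaw = Map.of(
                "topics", "${file:/opt/kafka/provider2.properties:topics}");
        // Comparing the placeholders directly never touches the provider,
        // so the renamed file is harmless at this point.
        boolean changed = !oldRaw.equals(newRaw);
        System.out.println("task configs changed: " + changed); // true
    }
}
{code}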
--
This message was sent by Atlassian Jira
(v8.20.10#820010)