[https://issues.apache.org/jira/browse/KAFKA-19758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18028157#comment-18028157]
Mickael Maison commented on KAFKA-19758:
----------------------------------------
After some investigation, it looks like KIP-891 broke plugin isolation. For
example, if the same transformation exists under multiple directories, Connect
does not call its apply() method with the classloader of the correct
connector.
The issue seems to be in DelegatingClassLoader.findPluginLoader(), which
always loops through all directories to find the transformation (or predicate)
and keeps the last instance found. It then uses the classloader of that
instance, which may not be the same as the connector's classloader.
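A minimal model of the lookup described above may help. It is not Kafka's DelegatingClassLoader code: all names are hypothetical, and it assumes plugin locations are scanned in a fixed order, represented here by the iteration order of the map passed in (for example a LinkedHashMap from location to the class names it contains). `lastMatch` mirrors the reported keep-the-last-instance behavior. `preferConnector` is one possible fix, not the actual Kafka patch: it checks the connector's own location first.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical model of the lookup; this is not Kafka's DelegatingClassLoader code.
final class PluginLoaderLookup {
    private PluginLoaderLookup() {}

    // Mirrors the reported behavior: scan every location and keep the last match.
    static Optional<String> lastMatch(Map<String, Set<String>> classesByLocation, String className) {
        String found = null;
        for (Map.Entry<String, Set<String>> entry : classesByLocation.entrySet()) {
            if (entry.getValue().contains(className)) {
                found = entry.getKey(); // a later match overwrites an earlier one
            }
        }
        return Optional.ofNullable(found);
    }

    // One possible fix: prefer the location the connector itself was loaded from,
    // falling back to the scan only if the connector's location lacks the class.
    static Optional<String> preferConnector(Map<String, Set<String>> classesByLocation,
                                            String className, String connectorLocation) {
        Set<String> own = classesByLocation.get(connectorLocation);
        if (own != null && own.contains(className)) {
            return Optional.of(connectorLocation);
        }
        return lastMatch(classesByLocation, className);
    }
}
```

With the postgres directory scanned before the sqlserver one and OpenLineage present in both, `lastMatch` returns the sqlserver location, which matches the trace, while `preferConnector` with the postgres location returns the postgres copy.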
With some custom tracing added, I see:
{noformat}
ConnectorConfig.getTransformationOrPredicate
connector=io.debezium.connector.postgresql.PostgresConnector
connectorRange=null range=[3.4.0-SNAPSHOT,3.4.0-SNAPSHOT]
ConnectorConfig.getTransformationOrPredicate
classloader=PluginClassLoader{pluginLocation=file:/tmp/plugins/debezium-connector-postgres/}
OpenLineage instantiated
io.debezium.transforms.openlineage.OpenLineage@2675b74a from
/tmp/plugins/debezium-connector-sqlserver/debezium-core-3.4.0-SNAPSHOT.jar
classloader null
PluginClassLoader{pluginLocation=file:/tmp/plugins/debezium-connector-sqlserver/}
{noformat}
The connector is running from
PluginClassLoader{pluginLocation=file:/tmp/plugins/debezium-connector-postgres/}
and trying to run its transformation using
PluginClassLoader{pluginLocation=file:/tmp/plugins/debezium-connector-sqlserver/}.
The OpenLineage transformation exists in both directories but the
DelegatingClassLoader.findPluginLoader() fails to find the appropriate one.
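Tracing of this kind can be produced with plain JDK reflection. The following is a minimal sketch that assumes a hypothetical helper, `describeOrigin`, which is not part of Kafka or Debezium. It reports which class loader defined a class and where that class's bytes came from.

```java
import java.net.URL;
import java.security.CodeSource;

// Hypothetical diagnostic helper; uses only standard JDK reflection APIs.
final class ClassOrigin {
    private ClassOrigin() {}

    static String describeOrigin(Class<?> c) {
        // null means the class was defined by the bootstrap class loader
        ClassLoader loader = c.getClassLoader();
        // CodeSource is null for bootstrap classes and some dynamically defined classes
        CodeSource source = c.getProtectionDomain().getCodeSource();
        URL location = source == null ? null : source.getLocation();
        return "class=" + c.getName() + " classloader=" + loader + " location=" + location;
    }
}
```

Logging `describeOrigin(connector.getClass())` next to `describeOrigin(transformation.getClass())` shows whether both were defined by the same PluginClassLoader. Here, `connector` and `transformation` stand for whatever instances are in scope.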
We should clarify the expected behavior when a plugin exists under multiple
directories. Having the same connector with the same version multiple times
should probably be rejected: the connector configuration only contains the
name and version, so it can't be used to decide which copy to run. On the
other hand, the other plugins can exist in multiple copies as long as they are
in different directories, and thus isolated. Because they are always associated
with a connector, we can identify a preferred copy.
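The proposed rule for connectors could be checked at scan time roughly as follows. This is a hypothetical sketch, and `DuplicateConnectorCheck` and `Found` are illustrative names rather than Kafka classes. It flags any connector class and version found in more than one location, since `connector.class` plus `connector.plugin.version` cannot tell those copies apart. Transformations and predicates are deliberately not checked, because they can be resolved against the connector's own location.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical scan-time check; not Kafka code.
final class DuplicateConnectorCheck {
    // One connector class discovered while scanning plugin.path.
    static final class Found {
        final String location;
        final String className;
        final String version;

        Found(String location, String className, String version) {
            this.location = location;
            this.className = className;
            this.version = version;
        }
    }

    private DuplicateConnectorCheck() {}

    // Returns "className:version" keys that appear in more than one plugin location.
    static Set<String> ambiguous(List<Found> connectors) {
        Map<String, Set<String>> locationsByKey = new HashMap<>();
        for (Found f : connectors) {
            locationsByKey
                .computeIfAbsent(f.className + ":" + f.version, k -> new TreeSet<>())
                .add(f.location);
        }
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Set<String>> e : locationsByKey.entrySet()) {
            if (e.getValue().size() > 1) {
                result.add(e.getKey()); // same name and version, different locations
            }
        }
        return result;
    }
}
```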
Another inconsistency I found while looking at this: transformations and
predicates get default versions, but the other plugins default to null. For
example, the connector configuration submitted via the REST API does not
include any version, but the computed configuration injects 3.4.0-SNAPSHOT
for the transformation.
{noformat}
config.action.reload = restart
connector.class = io.debezium.connector.postgresql.PostgresConnector
connector.plugin.version = null
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
header.converter.plugin.version = null
key.converter = null
key.converter.plugin.version = null
name = inventory-connector-postgres
predicates = []
tasks.max = 1
tasks.max.enforce = true
transforms = [openlineage]
transforms.openlineage.negate = false
transforms.openlineage.plugin.version = 3.4.0-SNAPSHOT
transforms.openlineage.predicate = null
transforms.openlineage.type = class io.debezium.transforms.openlineage.OpenLineage
value.converter = null
value.converter.plugin.version = null
{noformat}
> Weird behavior on Kafka Connect 4.1 class loading
> -------------------------------------------------
>
> Key: KAFKA-19758
> URL: https://issues.apache.org/jira/browse/KAFKA-19758
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Affects Versions: 4.1.0
> Reporter: Mario Fiore Vitale
> Assignee: Mickael Maison
> Priority: Blocker
> Attachments: connect-service.log
>
>
> I have the
> [DebeziumOpenLineageEmitter|https://github.com/debezium/debezium/blob/main/debezium-openlineage/debezium-openlineage-api/src/main/java/io/debezium/openlineage/DebeziumOpenLineageEmitter.java]
> class in *debezium-openlineage-api*, which internally uses a static map to
> hold the registered emitters; the key of this map is
> "connectorLogicalName-taskId".
> Then there is the [OpenLineage SMT|https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/transforms/openlineage/OpenLineage.java],
> which is part of *debezium-core*. In this SMT, I simply pass the same
> context the connector uses, so that it retrieves the same emitter the
> connector instantiated.
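A minimal sketch of the static-registry pattern described here may help. It uses simplified, hypothetical names (`EmitterRegistry`, `init`, `getEmitter`), with a `String` standing in for the emitter, and it is not Debezium's actual implementation. The relevant Java rule is that a static field belongs to a class as defined by one particular class loader. The connector and the SMT therefore only share the map if both resolve the registry class through the same loader; a second loader defining the class again gets its own empty map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of a static emitter registry; not Debezium's code.
final class EmitterRegistry {
    // One map per defined copy of this class: each class loader that defines
    // EmitterRegistry gets its own, initially empty, map.
    private static final Map<String, String> EMITTERS = new ConcurrentHashMap<>();

    private EmitterRegistry() {}

    static String key(String connectorLogicalName, int taskId) {
        return connectorLogicalName + "-" + taskId;
    }

    // Called on the connector side when the task starts.
    static void init(String connectorLogicalName, int taskId, String emitter) {
        EMITTERS.put(key(connectorLogicalName, taskId), emitter);
    }

    // Called on the SMT side with the same context values.
    static String getEmitter(String connectorLogicalName, int taskId) {
        String emitter = EMITTERS.get(key(connectorLogicalName, taskId));
        if (emitter == null) {
            throw new IllegalStateException("Emitter not initialized for "
                + key(connectorLogicalName, taskId) + ". Call init() first.");
        }
        return emitter;
    }
}
```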
> I'm now running the following image:
> {code:java}
> FROM quay.io/debezium/connect:3.3.0.Final
> ENV MAVEN_REPO="https://repo1.maven.org/maven2"
> ENV GROUP_ID="io/debezium"
> ENV DEBEZIUM_VERSION="3.3.0.Final"
> ENV ARTIFACT_ID="debezium-openlineage-core"
> ENV CLASSIFIER="-libs"
> COPY log4j.properties /kafka/config/log4j.properties
> # Add OpenLineage
> RUN mkdir -p /tmp/openlineage-libs && \
>     curl "$MAVEN_REPO/$GROUP_ID/$ARTIFACT_ID/$DEBEZIUM_VERSION/$ARTIFACT_ID-${DEBEZIUM_VERSION}${CLASSIFIER}.tar.gz" -o /tmp/debezium-openlineage-core-libs.tar.gz && \
>     tar -xzvf /tmp/debezium-openlineage-core-libs.tar.gz -C /tmp/openlineage-libs --strip-components=1
> RUN cp -r /tmp/openlineage-libs/* /kafka/connect/debezium-connector-postgres/
> RUN cp -r /tmp/openlineage-libs/* /kafka/connect/debezium-connector-mongodb/
> ADD openlineage.yml /kafka/ {code}
> So it is practically the Debezium Connect image, with just the OpenLineage
> jars copied into the postgres and mongodb connector folders.
> When I register the PostgreSQL connector:
> {code:java}
> {
> "name": "inventory-connector-postgres",
> "config": {
> "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
> "tasks.max": "1",
> "database.hostname": "postgres",
> "database.port": "5432",
> "database.user": "postgres",
> "database.password": "postgres",
> "database.server.id": "184054",
> "database.dbname": "postgres",
> "topic.prefix": "inventory",
> "snapshot.mode": "initial",
> "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
> "schema.history.internal.kafka.topic": "schema-changes.inventory",
> "slot.name": "postgres",
> "openlineage.integration.enabled": "true",
> "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
> "openlineage.integration.job.description": "This connector does cdc for products",
> "openlineage.integration.tags": "env=prod,team=cdc",
> "openlineage.integration.owners": "Mario=maintainer,John Doe=Data scientist,IronMan=superero",
> "transforms": "openlineage",
> "transforms.openlineage.type":
> "io.debezium.transforms.openlineage.OpenLineage"
> }
> } {code}
>
> I get the following error
> {code:java}
> 2025-10-03T14:22:09,761 ERROR ||
> WorkerSourceTask{id=inventory-connector-postgres-0} Task threw an uncaught
> and unrecoverable exception. Task is being killed and will not recover until
> manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error
> handler
> at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:260)
> ~[connect-runtime-4.1.0.jar:?]
> at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:180)
> ~[connect-runtime-4.1.0.jar:?]
> at
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:58)
> ~[connect-runtime-4.1.0.jar:?]
> at
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:415)
> ~[connect-runtime-4.1.0.jar:?]
> at
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:376)
> ~[connect-runtime-4.1.0.jar:?]
> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:243)
> ~[connect-runtime-4.1.0.jar:?]
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:298)
> ~[connect-runtime-4.1.0.jar:?]
> at
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:83)
> ~[connect-runtime-4.1.0.jar:?]
> at
> org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:254)
> ~[connect-runtime-4.1.0.jar:?]
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
> ~[?:?]
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
> ~[?:?]
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
> ~[?:?]
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
> ~[?:?]
> at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
> Caused by: java.lang.IllegalStateException: DebeziumOpenLineageEmitter not
> initialized for connector ConnectorContext[connectorLogicalName=inventory,
> connectorName=postgresql, taskId=0, version=null, config=null]. Call init()
> first.
> at
> io.debezium.openlineage.DebeziumOpenLineageEmitter.getEmitter(DebeziumOpenLineageEmitter.java:176)
> ~[debezium-openlineage-api-3.3.0.Final.jar:3.3.0.Final]
> at
> io.debezium.openlineage.DebeziumOpenLineageEmitter.emit(DebeziumOpenLineageEmitter.java:153)
> ~[debezium-openlineage-api-3.3.0.Final.jar:3.3.0.Final]
> at
> io.debezium.transforms.openlineage.OpenLineage.apply(OpenLineage.java:74)
> ~[debezium-core-3.3.0.Final.jar:3.3.0.Final]
> at
> org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:95)
> ~[connect-runtime-4.1.0.jar:?]
> at
> org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:58)
> ~[connect-runtime-4.1.0.jar:?]
> at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:208)
> ~[connect-runtime-4.1.0.jar:?]
> at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:244)
> ~[connect-runtime-4.1.0.jar:?]
> ... 13 more {code}
> Full logs [^connect-service.log]
>
> This is evidence that the emitters map is not shared between the connector
> and the SMT.
> The situation becomes weirder if I remove all connectors from the image
> except PostgreSQL and MongoDB.
> In that case, the PostgreSQL connector works perfectly.
> The plugins are in */kafka/connect* (the only folder configured in
> {{plugin.path}}), each in a dedicated subfolder together with its
> dependencies.
> I then started adding more connectors, and it kept working until I added
> the SQL Server connector.
> To summarize, the problem arises when I add any one (or all) of [sqlserver,
> spanner, vitess].
>
> What these connectors seem to have in common is that they support
> multiple tasks; the others don't.
> Am I correct that Kafka Connect guarantees each connector is loaded, along
> with its dependencies, by an isolated class loader, so that the static
> emitters map should be shared between the connector and the SMT?
> Moreover, if I run the 3.2.0.Final image (i.e. Kafka 4.0.0) with all
> connectors, it works fine.
> I ran more tests, and things got weirder and weirder. All tests were done
> with *{{plugin.path=/kafka/connect}}* and *KC 4.1*.
> My original tests used this directory structure:
>
> {code:java}
> /kafka/connect
> |___ debezium-connector-postgres
> |___ debezium-connector-mongodb
> |___ debezium-connector-sqlserver{code}
>
> In this case, each connector should be isolated from the others (each has a
> dedicated class loader). In that case, sharing between the connector and the
> SMT does not work on KC 4.1.
> Then I tried with
>
> {code:java}
> /kafka/connect
> |___ debezium-connectors
> |___ debezium-connector-postgres
> |___ debezium-connector-mongodb
> |___ debezium-connector-sqlserver{code}
>
> Here, the connectors are not isolated from each other and share the same
> class loader. In this case there is no issue, which I'd say is expected.
> Then I tried with
>
> {code:java}
> /kafka/connect
> |___ debezium-connectors
> | |___ debezium-connector-postgres
> | |___ debezium-connector-mongodb
> |___ debezium-connector-sqlserver{code}
>
> where *{{postgres}}* and *{{mongodb}}* are not isolated (same classloader)
> and *{{sqlserver}}* is isolated (different classloader). In this case it
> still works, although I expected it to fail as in the first setup.
> The SMT is in the *debezium-core* jar, and each connector has its own copy,
> so each connector folder contains:
> {code:java}
> debezium-api-3.3.0.Final.jar
> debezium-common-3.3.0.Final.jar
> debezium-connector-[connectorName]-3.3.0.Final.jar
> debezium-core-3.3.0.Final.jar
> debezium-openlineage-api-3.3.0.Final.jar{code}
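The three directory layouts in the description can be checked against the plugin.path rule with a short sketch. This models only the grouping step. It assumes the rule described in the Kafka Connect documentation: each immediate child of a plugin.path entry (a directory or an uber-jar) is one plugin location with its own class loader, and jars nested deeper belong to that child's location. `PluginLocations.group` is a hypothetical helper, not a Kafka API, and it does not model which copy findPluginLoader() picks when a class exists in several locations.

```java
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model: groups jar paths by the plugin location (immediate child of
// the plugin.path entry) that would define them with a single class loader.
final class PluginLocations {
    private PluginLocations() {}

    static Map<String, Set<String>> group(String pluginPathEntry, List<String> jarPaths) {
        Path root = Path.of(pluginPathEntry);
        Map<String, Set<String>> byLocation = new TreeMap<>();
        for (String jar : jarPaths) {
            Path relative = root.relativize(Path.of(jar));
            String location = relative.getName(0).toString(); // first segment under plugin.path
            byLocation.computeIfAbsent(location, k -> new TreeSet<>()).add(relative.toString());
        }
        return byLocation;
    }
}
```

Under this rule, the first layout yields three locations, the second yields one (debezium-connectors), and the third yields two (debezium-connectors and debezium-connector-sqlserver).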