Robert,

> But if Kafka is really only available in the user jar, then this error
> still should not occur.

I think so too; it should not occur.
I scanned through all the jar files on the classpath using `jar tf`, but no
jar contains org.apache.kafka.common.serialization.Deserializer with a
different version.
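
For reference, here is roughly the check I ran (the jar locations are only
illustrative; adjust them to your setup):

# print every jar that bundles Kafka's serialization classes
for j in $FLINK_HOME/lib/*.jar myjar.jar; do
  if jar tf "$j" | grep -q 'org/apache/kafka/common/serialization/'; then
    echo "$j"
  fi
done

Only myjar.jar is printed, so the classes should all come from a single jar.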

> In your case it seems that the classes are loaded from different
> classloaders.

Hmm, then why did the same artifact work fine in per-job cluster mode?

p.s. Another user seems to be facing the same problem:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Duplication-error-on-Kafka-Connector-Libraries-td39805.html#a39812

Thanks,

Dongwon



On Wed, Dec 16, 2020 at 4:45 PM Robert Metzger <rmetz...@apache.org> wrote:

> Hey Dongwon,
>
> I don't think this is the intended behavior.
> I believe in application mode, we are adding the user jar into the system
> classloader as well. In your case it seems that the classes are loaded from
> different classloaders.
> But if Kafka is really only available in the user jar, then this error
> still should not occur.
>
>
> On Wed, Dec 16, 2020 at 8:22 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> I just added the following option to the script:
>>
>> -Dclassloader.parent-first-patterns.additional=org.apache.kafka.common.serialization
>>
>> Now it seems to work.
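>>
>> For completeness, the full command now looks like this (same paths as in
>> my first mail):
>>
>> $FLINK_HOME/bin/flink run-application -t yarn-application \
>>     -Dclassloader.parent-first-patterns.additional=org.apache.kafka.common.serialization \
>>     -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
>>     -Dyarn.ship-files=myconf.conf \
>>     hdfs:///jars/myjar.jar myconf.conf
>>
>> (The same pattern can also be set in conf/flink-conf.yaml via
>> classloader.parent-first-patterns.additional.)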
>>
>> Why do application mode and per-job cluster mode behave differently when
>> it comes to classloading?
>>
>> Is this a bug, or intended behavior?
>>
>> Best,
>>
>> Dongwon
>>
>> On Wed, Dec 16, 2020 at 3:59 PM Dongwon Kim <eastcirc...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have an artifact that works perfectly fine in per-job cluster mode with
>>> the following bash script:
>>>
>>> #!/bin/env bash
>>>
>>> export FLINK_CONF_DIR=./conf
>>> export HADOOP_CLASSPATH=`hadoop classpath`
>>>
>>> $FLINK_HOME/bin/flink run -t yarn-per-job myjar.jar myconf.conf
>>>
>>> I tried Application Mode [1] using the exact same artifact with the
>>> following script:
>>>
>>> #!/bin/env bash
>>>
>>> export FLINK_CONF_DIR=./conf
>>> export HADOOP_CLASSPATH=`hadoop classpath`
>>>
>>> $FLINK_HOME/bin/flink run-application -t yarn-application \
>>>     -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
>>>     -Dyarn.ship-files=myconf.conf \
>>>     hdfs:///jars/myjar.jar myconf.conf
>>>
>>> but the job fails with the following exception:
>>>
>>> 2020-12-16 15:52:25,364 WARN  org.apache.flink.runtime.taskmanager.Task
>>>                   [] - session-window -> (Sink: kafka-sink, Sink:
>>> session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4)
>>> switched from RUNNING to FAILED.
>>>
>>> org.apache.kafka.common.KafkaException: Failed to construct kafka
>>> producer
>>>
>>>         at
>>> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>>         at
>>> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>>         at
>>> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>>         at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1158)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>>         at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1259)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>>         at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1255)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>>         at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:950)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>>         at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:100)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>>         at
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398)
>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>
>>>         at
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389)
>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>
>>>         at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1128)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>>         at
>>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>
>>>         at
>>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>
>>>         at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>
>>>         at
>>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107)
>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>
>>>         at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264)
>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400)
>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>
>>>         at
>>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>> [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>> [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>
>>>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
>>>
>>> Caused by: org.apache.kafka.common.KafkaException: class
>>> org.apache.kafka.common.serialization.ByteArraySerializer is not an
>>> instance of org.apache.kafka.common.serialization.Serializer
>>>
>>>         at
>>> org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>>         at
>>> org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>>         at
>>> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359)
>>> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>>>
>>>         ... 23 more
>>>
>>> I have flink-connector-kafka_2.11 bundled in my artifact and it is not in
>>> the Flink lib directory at all.
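>>>
>>> For what it's worth, here is roughly how I verified that (paths as in the
>>> script above):
>>>
>>> # the connector classes are only inside the fat jar ...
>>> jar tf myjar.jar | grep 'org/apache/flink/streaming/connectors/kafka' | head
>>> # ... and the provided lib dir on HDFS has no kafka jar (no output)
>>> hadoop fs -ls hdfs:///flink-dists/flink-1.12.0/lib | grep -i kafka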
>>>
>>> Thanks in advance,
>>>
>>> p.s. attached is the detailed log from a TM
>>>
>>> Dongwon
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html#application-mode
>>>
>>>
>>
