Hi Colar,

The Kafka client version that Flink uses is 2.4.1, but your cluster is running Kafka 1.1.1. It looks like the job is loading ByteArraySerializer from the cluster's Kafka jars at runtime rather than from the client bundled with `flink-connector-kafka`, so the class and the `Serializer` interface it must implement end up coming from two different classloaders. I'm not sure whether packaging everything into a single shaded jar will work.
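
If you do try shading, below is a minimal maven-shade-plugin sketch. It is only a sketch under the assumption that the fat jar bundles the Kafka 2.4.1 client; the `shaded.kafka` prefix is an illustrative name, not a required value, and I have not verified it against your cluster. Relocating `org.apache.kafka` means the job's Kafka classes can never be mixed up with the 1.1.1 jars on the cluster classpath:

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <relocations>
                            <!-- Illustrative relocation: move the bundled Kafka
                                 client into its own namespace so it cannot clash
                                 with the Kafka 1.1.1 jars on the YARN classpath. -->
                            <relocation>
                                <pattern>org.apache.kafka</pattern>
                                <shadedPattern>shaded.kafka.org.apache.kafka</shadedPattern>
                            </relocation>
                        </relocations>
                    </configuration>
                </execution>
            </executions>
        </plugin>

To confirm which jar the conflicting class is actually loaded from, you could also log
`ByteArraySerializer.class.getProtectionDomain().getCodeSource().getLocation()`
in the job before the producer is built.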

Best,
Shengkai

Colar <523774...@qq.com> wrote on Mon, Apr 26, 2021 at 6:05 PM:

> Consuming from Kafka with Flink 1.12.2 fails with the following error:
>
> 2021-04-26 17:39:39,802 WARN  org.apache.flink.runtime.taskmanager.Task [] -
> TriggerWindow(SlidingEventTimeWindows(30000, 5000),
> ReducingStateDescriptor{name=window-contents, defaultValue=null,
> serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@387fc2ca},
> EventTimeTrigger(), AllWindowedStream.reduce(AllWindowedStream.java:236)) ->
> Map -> Sink: Unnamed (1/1)#104 (9c14897c119bf3ba51b0a1b40ce427a7) switched
> from RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432) ~[fault_tolerance-1.0-SNAPSHOT.jar:?]
>         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[fault_tolerance-1.0-SNAPSHOT.jar:?]
>         at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78) ~[fault_tolerance-1.0-SNAPSHOT.jar:?]
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$3(FlinkKafkaProducer.java:1200) ~[fault_tolerance-1.0-SNAPSHOT.jar:?]
>         at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_271]
>         at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556) ~[?:1.8.0_271]
>         at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_271]
>         at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) ~[?:1.8.0_271]
>         at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) ~[?:1.8.0_271]
>         at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_271]
>         at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401) ~[?:1.8.0_271]
>         at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) ~[?:1.8.0_271]
>         at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) ~[?:1.8.0_271]
>         at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) ~[?:1.8.0_271]
>         at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:1.8.0_271]
>         at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_271]
>         at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) ~[?:1.8.0_271]
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1186) ~[fault_tolerance-1.0-SNAPSHOT.jar:?]
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1122) ~[fault_tolerance-1.0-SNAPSHOT.jar:?]
>         at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>         at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>         at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.11-1.12.2.jar:1.12.2]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.11-1.12.2.jar:1.12.2]
>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_271]
> Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
>         at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374) ~[fault_tolerance-1.0-SNAPSHOT.jar:?]
>         at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392) ~[fault_tolerance-1.0-SNAPSHOT.jar:?]
>         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359) ~[fault_tolerance-1.0-SNAPSHOT.jar:?]
>         ... 31 more
>
>
> The dependencies are declared as follows:
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-java</artifactId>
>             <scope>provided</scope>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>             <scope>provided</scope>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
>             <scope>provided</scope>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.hadoop</groupId>
>             <artifactId>hadoop-client</artifactId>
>             <scope>provided</scope>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
>             <scope>provided</scope>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>         </dependency>
>
> Running the job directly in IDEA works: it produces and consumes data successfully. But when it is deployed to the cluster and run with the following command, the task throws the exception above and fails:
> flink run-application -t yarn-application ./kafka-1.0-SNAPSHOT.jar
>
> How can I fix this? The cluster's Kafka version is 1.1.1 and the Flink version is 1.12.2.
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
