Hi,
Judging from the error message, your code is failing at
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
i.e. in the InstantiationUtil class, so this looks like a deserialization problem. It works in your local test because local execution does not go through this serialization step.
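One way to surface this kind of error before submitting the job is to round-trip the user function through plain Java serialization, which is essentially what InstantiationUtil does when the task deserializes the operator on the cluster. A minimal sketch (the `SerializationCheck` class and its helper methods are my own illustration, not part of your code — you would pass your operator or producer instance instead of the `String` used here):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Hypothetical pre-flight check: serialize and deserialize an object the same
// way Flink ships user functions to TaskManagers. An InvalidClassException or
// NotSerializableException here reproduces the cluster-side failure locally.
public class SerializationCheck {

    // Serialize an object to a byte array, as Flink does for user functions.
    public static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    // Deserialize the byte array back; this is where "class invalid for
    // deserialization" would be thrown for an incompatible class.
    public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Replace "ok" with e.g. your FlinkKafkaProducer011 instance.
        Object roundTripped = deserialize(serialize("ok"));
        System.out.println(roundTripped); // prints "ok"
    }
}
```

Note that this only checks the classpath of the JVM running the check; a version mismatch between the connector jar bundled in your job jar and the one on the cluster can still fail only at deploy time.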
Best,
Robin
[email protected] wrote
> Hi all:
>
> I'm using Flink 1.10.2 and wrote a job that runs fine locally, but when I deploy it to a Flink cluster as a job, it fails on startup with the exception below. What could be the cause?
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:432)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.InvalidClassException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; class invalid for deserialization
>     at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
>     at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:790)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1782)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>
> The rough logic is as follows; I have two streams:
> 1. Multiple Kafka sources are unioned into a single stream, which passes through two operators and is finally sunk to Kafka.
> 2. A single Kafka source produces a stream, which passes through one operator and is finally sunk to Kafka.
> The code:
> StreamExecutionEnvironment streamExecutionEnvironment =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> List<KafkaInfo> kafkaSourceConfiguration = this.kafkaConfiguration.getSource0();
> KafkaInfo kafkaSinkConfiguration = this.kafkaConfiguration.getSink();
> RecordTransformOperator transformOperator =
>         new RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM_COMPUTE);
> RecordKeySelector keySelector = new RecordKeySelector();
> RecordComputeOperator computeOperator = new RecordComputeOperator();
> Properties sinkProperties = new Properties();
> sinkProperties.setProperty("bootstrap.servers", kafkaSinkConfiguration.getBootstrapServer());
> FlinkKafkaProducer011 flinkKafkaProducer =
>         new FlinkKafkaProducer011(kafkaSinkConfiguration.getTopicName(),
>                 new KafkaSerializer(), sinkProperties);
>
> List<SingleOutputStreamOperator<Tuple2<String, String>>> dataStreamList = new ArrayList<>();
> for (KafkaInfo kafkaInfo : kafkaSourceConfiguration) {
>     Properties sourceProperties = new Properties();
>     sourceProperties.setProperty("bootstrap.servers", kafkaInfo.getBootstrapServer());
>     sourceProperties.setProperty("group.id", kafkaInfo.getGroupId());
>     sourceProperties.setProperty("max.poll.records", kafkaInfo.getMaxPollRecord());
>     sourceProperties.put("max.poll.interval.ms", kafkaInfo.getMaxPollIntervalMs());
>     String topicName = kafkaInfo.getTopicName();
>     FlinkKafkaConsumer011<Tuple2<String, String>> flinkKafkaConsumer =
>             new FlinkKafkaConsumer011(topicName, new KafkaDeserializer(), sourceProperties);
>     SingleOutputStreamOperator<Tuple2<String, String>> singleOutputStreamOperator =
>             streamExecutionEnvironment.addSource(flinkKafkaConsumer);
>     dataStreamList.add(singleOutputStreamOperator);
> }
>
> DataStream<Tuple2<String, String>> unionDataStream = dataStreamList.get(0);
> for (int i = 1; i < dataStreamList.size(); i++) {
>     unionDataStream = unionDataStream.union(dataStreamList.get(i));
> }
> unionDataStream.flatMap(transformOperator)
>         .keyBy(keySelector)
>         .flatMap(computeOperator)
>         .addSink(flinkKafkaProducer);
>
> RecordTransformOperator transformOperator1 =
>         new RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM);
> Properties sinkProperties1 = new Properties();
> sinkProperties1.setProperty("bootstrap.servers", kafkaSinkConfiguration.getBootstrapServer());
> FlinkKafkaProducer011 flinkKafkaProducer1 =
>         new FlinkKafkaProducer011(kafkaSinkConfiguration.getTopicName(),
>                 new KafkaSerializer(), sinkProperties1);
> KafkaInfo kafkaInfo = this.kafkaConfiguration.getSource1().get(0);
> Properties sourceProperties = new Properties();
> sourceProperties.setProperty("bootstrap.servers", kafkaInfo.getBootstrapServer());
> sourceProperties.setProperty("group.id", kafkaInfo.getGroupId());
> sourceProperties.setProperty("max.poll.records", kafkaInfo.getMaxPollRecord());
> sourceProperties.put("max.poll.interval.ms", kafkaInfo.getMaxPollIntervalMs());
> String topicName = kafkaInfo.getTopicName();
> FlinkKafkaConsumer011<Tuple2<String, String>> flinkKafkaConsumer =
>         new FlinkKafkaConsumer011(topicName, new KafkaDeserializer(), sourceProperties);
> streamExecutionEnvironment
>         .addSource(flinkKafkaConsumer)
>         .flatMap(transformOperator1)
>         .addSink(flinkKafkaProducer1);
> streamExecutionEnvironment.execute();
>
>
> freeza1982@
--
Sent from: http://apache-flink.147419.n8.nabble.com/