Hi,
   Judging from the stack trace, the failure is located here:

      at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
      at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
      at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
      at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)

InstantiationUtil fails while deserializing your user code, so this is a deserialization problem. The root cause line, `InvalidClassException: ...FlinkKafkaProducer011; class invalid for deserialization`, usually indicates that the FlinkKafkaProducer011 class loaded on the cluster is not the same version as the one the client serialized into the job graph — for example, a conflicting flink-connector-kafka-0.11 jar on the cluster classpath versus the one bundled in your fat jar. Your local test passes because a local run never serializes the operators across JVMs.
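A quick way to narrow down this kind of mismatch is to print which jar a class is actually loaded from, once on the client and once inside an operator on the cluster. A minimal sketch — the `WhichJar` and `locationOf` names are made up for illustration, and on the cluster you would pass `FlinkKafkaProducer011.class` instead of the placeholder classes:

```java
import java.security.CodeSource;

public class WhichJar {

    // Returns the jar or directory a class was loaded from, or a marker
    // for classes on the JVM bootstrap classpath (whose CodeSource is null).
    static String locationOf(Class<?> clazz) {
        CodeSource src = clazz.getProtectionDomain().getCodeSource();
        return src == null ? "bootstrap classpath" : src.getLocation().toString();
    }

    public static void main(String[] args) {
        // Run once on the submitting client and once on a TaskManager,
        // substituting the connector class; differing paths reveal the conflict.
        System.out.println(locationOf(WhichJar.class));
        System.out.println(locationOf(String.class)); // bootstrap classpath
    }
}
```

If the two environments report different jars (or the cluster picks up an older connector from its lib/ directory), aligning the connector version in your build with the cluster — or marking Flink's own dependencies as provided so only one copy exists — usually resolves it.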

Best,
Robin




[email protected] wrote
> Hi all:
> 
> I am using Flink 1.10.2 and wrote a job that runs fine locally, but when I deploy it to the Flink cluster it fails right at startup with the exception below. What could be the cause?
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:432)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.InvalidClassException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; class invalid for deserialization
>     at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
>     at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:790)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1782)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
> 
> The overall logic is as follows; there are two pipelines:
> 1. Several Kafka sources are unioned into a single stream, which passes through two operators and is sinked to Kafka.
> 2. A single Kafka source produces a stream that passes through one operator and is sinked to Kafka.
> The code:
>         StreamExecutionEnvironment streamExecutionEnvironment =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         List<KafkaInfo> kafkaSourceConfiguration = this.kafkaConfiguration.getSource0();
>         KafkaInfo kafkaSinkConfiguration = this.kafkaConfiguration.getSink();
>         RecordTransformOperator transformOperator =
>                 new RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM_COMPUTE);
>         RecordKeySelector keySelector = new RecordKeySelector();
>         RecordComputeOperator computeOperator = new RecordComputeOperator();
>         Properties sinkProperties = new Properties();
>         sinkProperties.setProperty("bootstrap.servers", kafkaSinkConfiguration.getBootstrapServer());
>         FlinkKafkaProducer011 flinkKafkaProducer =
>                 new FlinkKafkaProducer011(kafkaSinkConfiguration.getTopicName(),
>                         new KafkaSerializer(), sinkProperties);
> 
>         List<SingleOutputStreamOperator<Tuple2<String, String>>> dataStreamList = new ArrayList<>();
>         for (KafkaInfo kafkaInfo : kafkaSourceConfiguration) {
>             Properties sourceProperties = new Properties();
>             sourceProperties.setProperty("bootstrap.servers", kafkaInfo.getBootstrapServer());
>             sourceProperties.setProperty("group.id", kafkaInfo.getGroupId());
>             sourceProperties.setProperty("max.poll.records", kafkaInfo.getMaxPollRecord());
>             sourceProperties.put("max.poll.interval.ms", kafkaInfo.getMaxPollIntervalMs());
>             String topicName = kafkaInfo.getTopicName();
>             FlinkKafkaConsumer011<Tuple2<String, String>> flinkKafkaConsumer =
>                     new FlinkKafkaConsumer011(topicName,
>                             new KafkaDeserializer(),
>                             sourceProperties);
>             SingleOutputStreamOperator<Tuple2<String, String>> singleOutputStreamOperator =
>                     streamExecutionEnvironment.addSource(flinkKafkaConsumer);
>             dataStreamList.add(singleOutputStreamOperator);
>         }
> 
>         DataStream<Tuple2<String, String>> unionDataStream = dataStreamList.get(0);
>         for (int i = 1; i < dataStreamList.size(); i++) {
>             unionDataStream = unionDataStream.union(dataStreamList.get(i));
>         }
>         unionDataStream.flatMap(transformOperator)
>                 .keyBy(keySelector)
>                 .flatMap(computeOperator)
>                 .addSink(flinkKafkaProducer);
> 
>         RecordTransformOperator transformOperator1 =
>                 new RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM);
>         Properties sinkProperties1 = new Properties();
>         sinkProperties1.setProperty("bootstrap.servers", kafkaSinkConfiguration.getBootstrapServer());
>         FlinkKafkaProducer011 flinkKafkaProducer1 =
>                 new FlinkKafkaProducer011(kafkaSinkConfiguration.getTopicName(),
>                         new KafkaSerializer(), sinkProperties1);
>         KafkaInfo kafkaInfo = this.kafkaConfiguration.getSource1().get(0);
>         Properties sourceProperties = new Properties();
>         sourceProperties.setProperty("bootstrap.servers", kafkaInfo.getBootstrapServer());
>         sourceProperties.setProperty("group.id", kafkaInfo.getGroupId());
>         sourceProperties.setProperty("max.poll.records", kafkaInfo.getMaxPollRecord());
>         sourceProperties.put("max.poll.interval.ms", kafkaInfo.getMaxPollIntervalMs());
>         String topicName = kafkaInfo.getTopicName();
>         FlinkKafkaConsumer011<Tuple2<String, String>> flinkKafkaConsumer =
>                 new FlinkKafkaConsumer011(topicName,
>                         new KafkaDeserializer(),
>                         sourceProperties);
>         streamExecutionEnvironment
>                 .addSource(flinkKafkaConsumer)
>                 .flatMap(transformOperator1)
>                 .addSink(flinkKafkaProducer1);
>         streamExecutionEnvironment.execute();
> 
> 

> freeza1982@





--
Sent from: http://apache-flink.147419.n8.nabble.com/
