Hi all:

I'm using Flink 1.10.2. I wrote a job that runs fine locally, but when I submit it to Flink as a job it fails on startup with the exception below. What could be the cause?
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:432)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; class invalid for deserialization
    at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
    at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:790)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1782)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)

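The rough logic is as follows. I have 2 streams:
1. Several Kafka sources produce several streams, which are unioned; the unioned stream then passes through 2 operators and is finally sunk to Kafka.
2. A single Kafka source produces a stream, which passes through 1 operator and is finally sunk to Kafka.
The code is as follows: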
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        List<KafkaInfo> kafkaSourceConfiguration = this.kafkaConfiguration.getSource0();
        KafkaInfo kafkaSinkConfiguration = this.kafkaConfiguration.getSink();
        RecordTransformOperator transformOperator = new RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM_COMPUTE);
        RecordKeySelector keySelector = new RecordKeySelector();
        RecordComputeOperator computeOperator = new RecordComputeOperator();
        Properties sinkProperties = new Properties();
        sinkProperties.setProperty("bootstrap.servers", kafkaSinkConfiguration.getBootstrapServer());
        FlinkKafkaProducer011 flinkKafkaProducer
                = new FlinkKafkaProducer011(kafkaSinkConfiguration.getTopicName(), new KafkaSerializer(), sinkProperties);

        // Stream 1: create one Kafka source per configured entry and collect the resulting streams.
        List<SingleOutputStreamOperator<Tuple2<String, String>>> dataStreamList = new ArrayList<>();
        for (KafkaInfo kafkaInfo : kafkaSourceConfiguration) {
            Properties sourceProperties = new Properties();
            sourceProperties.setProperty("bootstrap.servers", kafkaInfo.getBootstrapServer());
            sourceProperties.setProperty("group.id", kafkaInfo.getGroupId());
            sourceProperties.setProperty("max.poll.records", kafkaInfo.getMaxPollRecord());
            sourceProperties.put("max.poll.interval.ms", kafkaInfo.getMaxPollIntervalMs());
            String topicName = kafkaInfo.getTopicName();
            FlinkKafkaConsumer011<Tuple2<String, String>> flinkKafkaConsumer
                    = new FlinkKafkaConsumer011(topicName,
                    new KafkaDeserializer(),
                    sourceProperties);
            SingleOutputStreamOperator<Tuple2<String, String>> singleOutputStreamOperator =
                    streamExecutionEnvironment.addSource(flinkKafkaConsumer);
            dataStreamList.add(singleOutputStreamOperator);
        }

        DataStream<Tuple2<String, String>> unionDataStream = dataStreamList.get(0);
        for (int i = 1; i < dataStreamList.size(); i++) {
            unionDataStream = unionDataStream.union(dataStreamList.get(i));
        }
        unionDataStream.flatMap(transformOperator)
                .keyBy(keySelector)
                .flatMap(computeOperator)
                .addSink(flinkKafkaProducer);

        // Stream 2: a single Kafka source, one operator, then a Kafka sink.
        RecordTransformOperator transformOperator1 = new RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM);
        Properties sinkProperties1 = new Properties();
        sinkProperties1.setProperty("bootstrap.servers", kafkaSinkConfiguration.getBootstrapServer());
        FlinkKafkaProducer011 flinkKafkaProducer1
                = new FlinkKafkaProducer011(kafkaSinkConfiguration.getTopicName(), new KafkaSerializer(), sinkProperties1);
        KafkaInfo kafkaInfo = this.kafkaConfiguration.getSource1().get(0);
        Properties sourceProperties = new Properties();
        sourceProperties.setProperty("bootstrap.servers", kafkaInfo.getBootstrapServer());
        sourceProperties.setProperty("group.id", kafkaInfo.getGroupId());
        sourceProperties.setProperty("max.poll.records", kafkaInfo.getMaxPollRecord());
        sourceProperties.put("max.poll.interval.ms", kafkaInfo.getMaxPollIntervalMs());
        String topicName = kafkaInfo.getTopicName();
        FlinkKafkaConsumer011<Tuple2<String, String>> flinkKafkaConsumer
                = new FlinkKafkaConsumer011(topicName,
                new KafkaDeserializer(),
                sourceProperties);
        streamExecutionEnvironment
                .addSource(flinkKafkaConsumer)
                .flatMap(transformOperator1)
                .addSink(flinkKafkaProducer1);
        streamExecutionEnvironment.execute();



