Hi all:
I'm using Flink 1.10.2 and wrote a job that runs fine locally, but when I submit it to the Flink cluster it fails on startup with the exception below. What could be the cause?
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:432)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; class invalid for deserialization
    at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
    at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:790)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1782)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
The rough logic is as follows. There are two pipelines:
1. Several Kafka sources are unioned into a single stream, which then passes through two operators and is finally sinked to Kafka.
2. A single Kafka source produces a stream that passes through one operator and is finally sinked to Kafka.
The code is as follows:
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

// Pipeline 1: union of several Kafka sources -> transform -> keyBy -> compute -> Kafka sink
List<KafkaInfo> kafkaSourceConfiguration = this.kafkaConfiguration.getSource0();
KafkaInfo kafkaSinkConfiguration = this.kafkaConfiguration.getSink();
RecordTransformOperator transformOperator = new RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM_COMPUTE);
RecordKeySelector keySelector = new RecordKeySelector();
RecordComputeOperator computeOperator = new RecordComputeOperator();

Properties sinkProperties = new Properties();
sinkProperties.setProperty("bootstrap.servers", kafkaSinkConfiguration.getBootstrapServer());
FlinkKafkaProducer011<Tuple2<String, String>> flinkKafkaProducer =
        new FlinkKafkaProducer011<>(kafkaSinkConfiguration.getTopicName(), new KafkaSerializer(), sinkProperties);

List<SingleOutputStreamOperator<Tuple2<String, String>>> dataStreamList = new ArrayList<>();
for (KafkaInfo kafkaInfo : kafkaSourceConfiguration) {
    Properties sourceProperties = new Properties();
    sourceProperties.setProperty("bootstrap.servers", kafkaInfo.getBootstrapServer());
    sourceProperties.setProperty("group.id", kafkaInfo.getGroupId());
    sourceProperties.setProperty("max.poll.records", kafkaInfo.getMaxPollRecord());
    sourceProperties.put("max.poll.interval.ms", kafkaInfo.getMaxPollIntervalMs());
    String topicName = kafkaInfo.getTopicName();
    FlinkKafkaConsumer011<Tuple2<String, String>> flinkKafkaConsumer =
            new FlinkKafkaConsumer011<>(topicName, new KafkaDeserializer(), sourceProperties);
    dataStreamList.add(streamExecutionEnvironment.addSource(flinkKafkaConsumer));
}

// Union all source streams into one, then apply the two operators and the sink.
DataStream<Tuple2<String, String>> unionDataStream = dataStreamList.get(0);
for (int i = 1; i < dataStreamList.size(); i++) {
    unionDataStream = unionDataStream.union(dataStreamList.get(i));
}
unionDataStream.flatMap(transformOperator)
        .keyBy(keySelector)
        .flatMap(computeOperator)
        .addSink(flinkKafkaProducer);

// Pipeline 2: single Kafka source -> transform -> Kafka sink
RecordTransformOperator transformOperator1 = new RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM);
Properties sinkProperties1 = new Properties();
sinkProperties1.setProperty("bootstrap.servers", kafkaSinkConfiguration.getBootstrapServer());
FlinkKafkaProducer011<Tuple2<String, String>> flinkKafkaProducer1 =
        new FlinkKafkaProducer011<>(kafkaSinkConfiguration.getTopicName(), new KafkaSerializer(), sinkProperties1);

KafkaInfo kafkaInfo = this.kafkaConfiguration.getSource1().get(0);
Properties sourceProperties = new Properties();
sourceProperties.setProperty("bootstrap.servers", kafkaInfo.getBootstrapServer());
sourceProperties.setProperty("group.id", kafkaInfo.getGroupId());
sourceProperties.setProperty("max.poll.records", kafkaInfo.getMaxPollRecord());
sourceProperties.put("max.poll.interval.ms", kafkaInfo.getMaxPollIntervalMs());
String topicName = kafkaInfo.getTopicName();
FlinkKafkaConsumer011<Tuple2<String, String>> flinkKafkaConsumer =
        new FlinkKafkaConsumer011<>(topicName, new KafkaDeserializer(), sourceProperties);
streamExecutionEnvironment
        .addSource(flinkKafkaConsumer)
        .flatMap(transformOperator1)
        .addSink(flinkKafkaProducer1);

streamExecutionEnvironment.execute();
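
In case it helps rule out a classpath mismatch, here is a small debug snippet I could run (hypothetical, not part of the job above; the class name is the one from the stack trace). It prints which jar FlinkKafkaProducer011 is actually loaded from, so the local run can be compared against the cluster:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class WhereIsTheProducer {
    public static void main(String[] args) {
        // Prints the URL of the jar that provides FlinkKafkaProducer011.
        // A different location or connector version on the cluster than
        // locally would be consistent with the InvalidClassException above.
        System.out.println(FlinkKafkaProducer011.class
                .getProtectionDomain()
                .getCodeSource()
                .getLocation());
    }
}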