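Judging from the error, GenericRecord probably cannot be serialized here; for now, it seems you could use a custom data type to carry the data instead.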
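
To make the custom data type suggestion concrete, here is a minimal sketch. It assumes a hypothetical `User` class mirroring the three fields of the Avro schema in the quoted code; the class and field names are my own illustration, not taken from the original job. It follows the POJO shape Flink looks for (public class, public no-argument constructor, public fields), so it should not need to fall back to Kryo.

```java
import java.util.Objects;

/**
 * Hypothetical POJO standing in for the Avro "User" record.
 * Names are illustrative assumptions, not part of the original job.
 */
public class User {
    public String name;
    public Integer favoriteNumber; // nullable, like ["int","null"] in the schema
    public String favoriteColor;   // nullable, like ["string","null"] in the schema

    /** Public no-argument constructor, required for Flink's POJO handling. */
    public User() {}

    public User(String name, Integer favoriteNumber, String favoriteColor) {
        this.name = name;
        this.favoriteNumber = favoriteNumber;
        this.favoriteColor = favoriteColor;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof User)) return false;
        User other = (User) o;
        return Objects.equals(name, other.name)
                && Objects.equals(favoriteNumber, other.favoriteNumber)
                && Objects.equals(favoriteColor, other.favoriteColor);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, favoriteNumber, favoriteColor);
    }

    @Override
    public String toString() {
        return "User{name=" + name
                + ", favoriteNumber=" + favoriteNumber
                + ", favoriteColor=" + favoriteColor + "}";
    }

    public static void main(String[] args) {
        // Same values the quoted source function puts into its record.
        User user = new User("zhangsan", null, null);
        System.out.println(user);
    }
}
```

The source would then emit `User` instead of `GenericRecord`. On the sink side, `ParquetAvroWriters.forReflectRecord(User.class)` may be an option in place of `forGenericRecord(schema)`, though I have not tested it against this job, and the nullable fields may need Avro's `@Nullable` annotation.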


------------------------------------------------------------------
From:58683632 <58683...@qq.com>
Send Time:2020 Mar. 17 (Tue.) 13:33
To:user-zh <user-zh@flink.apache.org>
Subject:Question about using Streaming File Sink

I am using Streaming File Sink with the Parquet Avro format for bulk writes. The code is as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(60 * 1000, CheckpointingMode.AT_LEAST_ONCE);
env.setStateBackend(new FsStateBackend(new Path("file:///g:/checkpoint")));
Schema schema = new Schema.Parser().parse("{\"namespace\":\"example.avro\",\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
final DataStreamSource<GenericRecord> source = env.addSource(new RichSourceFunction<GenericRecord>() {
    Schema schema1;

    @Override
    public void open(Configuration parameters) throws Exception {
        schema1 = new Schema.Parser().parse("{\"namespace\":\"example.avro\",\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
    }

    @Override
    public void run(SourceContext<GenericRecord> ctx) throws Exception {
        while (true) {
            Thread.sleep(2000);
            GenericRecord record = new GenericData.Record(schema1);
            record.put("name", "zhangsan");
            ctx.collect(record);
        }
    }

    @Override
    public void cancel() {

    }
});
final StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("file:///g:/tmp/streamsink"), ParquetAvroWriters.forGenericRecord(schema))
        .build();
source.addSink(sink);
env.execute();

However, when the program runs it fails with the following error. What is the cause?

com.esotericsoftware.kryo.KryoException:
java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
  at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
  at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
  at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
  at myflink.connector.StreamFileSinkConnector$1.run(StreamFileSinkConnector.java:37)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Caused by: java.lang.UnsupportedOperationException
  at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
  ... 22 more
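
The "Caused by" frames in the trace show `add` being called on a `Collections$UnmodifiableCollection` while the record is being copied. The sketch below is only a simplified analogy of that failure mode in plain Java, not Kryo's actual code: the helper `tryFill` and the class name are hypothetical, and it just shows that filling a collection element by element works for a mutable target but fails for an unmodifiable one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class UnmodifiableCopyDemo {

    /**
     * Hypothetical helper: fills the target by calling add() for each element,
     * the way a generic element-by-element copy would.
     * Returns false if the target rejects add().
     */
    public static <T> boolean tryFill(Collection<T> target, Collection<T> elements) {
        try {
            for (T element : elements) {
                target.add(element);
            }
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> elements = Arrays.asList("a", "b");

        // A plain ArrayList accepts add(), so the copy succeeds.
        Collection<String> mutable = new ArrayList<>();
        System.out.println("mutable target filled: " + tryFill(mutable, elements));

        // An unmodifiable wrapper throws UnsupportedOperationException on add(),
        // which matches the exception type at the bottom of the trace.
        Collection<String> readOnly =
                Collections.unmodifiableCollection(new ArrayList<String>());
        System.out.println("unmodifiable target filled: " + tryFill(readOnly, elements));
    }
}
```

If this reading of the trace is right, the problem lies in how the record's schema is copied between chained operators rather than in the Parquet writer itself.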
