Hi,
I'm trying to convert a stream of JSON string to a stream of Avro
GenericRecords, and write this to parquet files, but I get the exception.
This exception came at the line /out.collect(genericRecord)/. If there's no
sink then there's no error.
/KryoException: java.lang.UnsupportedOperationException/
My code is as following:
/ val parquetSink: StreamingFileSink[GenericRecord] =
StreamingFileSink
.forBulkFormat(new Path(path),
ParquetAvroWriters.forGenericRecord(new
Schema.Parser().parse(schemaString)))
.build()
val parquetStream = inputStream.process(new ProcessFunction[String,
GenericRecord] {
@transient
private var schema: Schema = _
@transient
private var reader: GenericDatumReader[GenericRecord] = _
override def processElement(value: String,
ctx: ProcessFunction[String,
GenericRecord]#Context,
out: Collector[GenericRecord]): Unit
= {
if (reader == null) {
schema = new Schema.Parser().parse(schemaString)
reader = new GenericDatumReader[GenericRecord](schema)
}
try {
val genericRecord = reader.read(null,
DecoderFactory.get.jsonDecoder(schema, value))
out.collect(genericRecord)
} catch {
case e: Throwable =>
LOG.warn(s"Error decoding JSON string: $e\nRaw
string: `${value.value}`")
throw e
}
}
})
parquetStream.addSink(parquetSink)
/
The schema is a simple one with all fields are string.
I tried with both Flink 1.10.0 and 1.11.0, and currently stuck at this.
Could you please help?
Thanks and regards,
Averell
============
/com.esotericsoftware.kryo.KryoException:
java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at
mypackage.ParquetFromJson$$anon$1.processElement(ParquetFromJson.scala:53)
at
mypackage.ParquetFromJson$$anon$1.processElement(ParquetFromJson.scala:44)
at
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: null
at
java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 33 common frames omitted/
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/