Greetings,
I'm running into an issue with 3.0.0-alpha that I can reproduce with the
example Kafka producer shipped in the Kylin distribution
(org.apache.kylin.source.kafka.util.KafkaSampleProducer).
When I send JSON messages that contain a nested object (or an array) to the
input topic, the streaming receiver fails to parse them. This happens even
if I remove the nested field from the data source table schema definition.
The exception I get (this is for the sample sales data from
org.apache.kylin.source.kafka.util.KafkaSampleProducer):
2019-05-01 02:48:40,299 ERROR [str_kylin_sales_topic_cube_channel] kafka.TimedJsonStreamParser:107 : error
com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.lang.String` out of START_OBJECT token
 at [Source: (String)"{"country":"CANADA","amount":13.47842235458867,"qty":5,"currency":"USD","order_time":1556678920296,"category":"Other","device":"Andriod","user":{"gender":"Female","id":"b2bcd974-387a-ef81-4950-5668810bfce3","first_name":"unknown","age":25}}"; line: 1, column: 145] (through reference chain: java.util.HashMap["user"])
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63)
    at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1342)
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1138)
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1092)
    at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:63)
    at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:10)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:527)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:364)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:29)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4001)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3030)
    at org.apache.kylin.stream.source.kafka.TimedJsonStreamParser.parse(TimedJsonStreamParser.java:79)
    at org.apache.kylin.stream.source.kafka.TimedJsonStreamParser.parse(TimedJsonStreamParser.java:54)
    at org.apache.kylin.stream.source.kafka.consumer.KafkaConnector.nextEvent(KafkaConnector.java:110)
    at org.apache.kylin.stream.core.consumer.StreamingConsumerChannel.run(StreamingConsumerChannel.java:93)
    at java.lang.Thread.run(Thread.java:745)
Does anyone have an idea what to do about this, or what I might be doing
wrong?
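One possible workaround, as an untested sketch only: flatten the nested
object on the producer side so that every top-level value in the message is a
scalar. The trace suggests the parser reads each message into a map with
String values, which would explain why the nested "user" object is rejected.
The class name FlattenRecord and the "_" key separator below are made up for
illustration. Whether Kylin then maps keys such as user_age to table columns
is an assumption I have not verified, and arrays are left as they are.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Kylin: flattens nested maps before the
// record is serialized to JSON and sent to the Kafka topic.
public class FlattenRecord {

    // Copies every entry of 'in' into 'out'; nested maps are expanded and
    // their keys are joined to the parent key with '_' (assumed separator).
    static void flattenInto(String prefix, Map<String, Object> in, Map<String, Object> out) {
        for (Map.Entry<String, Object> e : in.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "_" + e.getKey();
            Object value = e.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) value;
                flattenInto(key, nested, out);
            } else {
                // Scalars (and arrays/lists, which this sketch does not handle) are copied as-is.
                out.put(key, value);
            }
        }
    }

    static Map<String, Object> flatten(Map<String, Object> record) {
        Map<String, Object> out = new LinkedHashMap<>();
        flattenInto("", record, out);
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> user = new LinkedHashMap<>();
        user.put("gender", "Female");
        user.put("age", 25);

        Map<String, Object> record = new LinkedHashMap<>();
        record.put("country", "CANADA");
        record.put("qty", 5);
        record.put("user", user);

        // prints {country=CANADA, qty=5, user_gender=Female, user_age=25}
        System.out.println(flatten(record));
    }
}
```

This only sidesteps the problem on the producer side; it does not change how
the streaming receiver parses messages.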
Many thanks,
Andras