Hi,
I use Flink 1.10.1 and want to read JSON messages with the Table API. A
message looks like this:

>     {
>        "type":"Update",
>        "location":{
>           "id":"123e4567-e89b-12d3-a456-426652340000",
>           "lastUpdateTime":1593866161436
>        }
>     }


I wrote the following program just to check whether the JSON messages are
parsed correctly by the Table API:

>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>     EnvironmentSettings envSettings =
>       EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
>     tEnv
>       .connect(
>         new Kafka()
>           .version("universal")
>           .topic(consumerTopic)
>           .startFromLatest()
>           .properties(consumerProperties)
>       )
>       .withFormat(new Json())
>       .withSchema(new Schema().schema(
>         TableSchema.builder()
>           .field("type", STRING())
>           .field("location",
>             ROW(
>               FIELD("id", STRING()),
>               // I tried each of the following three variants, one at a time:
>               // (1)
>               FIELD("lastUpdateTime", BIGINT())
>               // (2)
>               FIELD("lastUpdateTime", TIMESTAMP())
>               // (3)
>               FIELD("lastUpdateTime", TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
>             ))
>           .build()
>       ))
>       .createTemporaryTable("message");
>     tEnv.toAppendStream(tEnv.from("message"), Row.class)
>       .print();


Note that I tried BIGINT(), TIMESTAMP(), and
TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class) for the lastUpdateTime field.

(1) BIGINT() works fine, but then I can't use time-based operations such as
windowing later on.

(2) TIMESTAMP() causes the following exception:

> Exception in thread "main" org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource return type.
> at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
> at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
> at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
> at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
> at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
> at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
> at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
> at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
> at com.kakaomobility.mobdata.Finder.main(Finder.java:133)
> Caused by: org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource return type.
> at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
> at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:188)
> ... 38 more


(3) TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class) causes the following exception:

> Caused by: java.time.format.DateTimeParseException: Text '1593868714814' could not be parsed at index 0
> at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
> at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
> at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
> at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
> at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
> at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
> at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
> ... 7 more
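
The failure in (3) can be reproduced outside Flink. The sketch below is plain Java, not Flink code. It assumes the JSON format tries to parse the field as a date-time string (the convertToLocalDateTime frame in the trace suggests this), and it uses DateTimeFormatter.ISO_LOCAL_DATE_TIME only as a stand-in for whatever formatter Flink actually uses. The class EpochMillisDemo and its method names are made up for illustration.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

public class EpochMillisDemo {

    // Returns true if the text parses as a date-time string.
    // ISO_LOCAL_DATE_TIME is an assumption, standing in for Flink's formatter.
    public static boolean parsesAsDateTimeText(String text) {
        try {
            LocalDateTime.parse(text, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    // Interprets the number as milliseconds since the Unix epoch, in UTC.
    public static LocalDateTime fromEpochMillis(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        // The raw number is not a date-time string, so text parsing rejects it.
        System.out.println(parsesAsDateTimeText("1593868714814"));
        // A formatted date-time string is accepted.
        System.out.println(parsesAsDateTimeText("2020-07-04T13:18:34.814"));
        // Treated as epoch milliseconds, the same number converts without trouble.
        System.out.println(fromEpochMillis(1593868714814L));
    }
}
```

So the value itself is fine as epoch milliseconds. What seems to be missing is a way to tell the JSON format to read a number, rather than a formatted string, as a timestamp.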


Can I read JSON messages like this, with the time given as epoch milliseconds, in Flink 1.10.1?

Thanks

Dongwon
