Hi Leonard,

Can I have a YAML definition corresponding to the DDL you suggested?
I tried the following (Flink 1.11.0) but got an error:

> tables:
>   - name: test
>     type: source-table
>     update-mode: append
>     connector:
>       property-version: 1
>       type: kafka
>       version: universal
>       topic: ...
>       properties:
>         bootstrap.servers: ...
>         group.id: ...
>     format:
>       property-version: 1
>       type: json
>     schema:
>       - name: type
>         data-type: STRING
>       - name: location
>         data-type: >
>           ROW<
>             id STRING,
>             lastUpdateTime BIGINT
>           >
>       - name: timestampCol
>         data-type: TIMESTAMP(3)
>         rowtime:
>           timestamps:
>             type: from-field
>             from: TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss'))
>           watermarks:
>             type: periodic-bounded
>             delay: 5000

The SQL client doesn't complain about the file, but when I execute "SELECT timestampCol FROM test", the job fails with the following error message:

> Caused by: java.lang.NullPointerException
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
>     at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at SourceConversion$4.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
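[Editor's note: as Leonard points out below, computed columns are supported only in DDL, and the YAML environment file has no equivalent; the rowtime `from:` key appears to expect the name of an existing field rather than an SQL expression, which would explain the NullPointerException in the watermark assigner. One workaround, sketched here and untested, is to skip the YAML table definition and issue the DDL directly in the SQL CLI session; the block below restates Leonard's DDL with Flink 1.11-style connector options, where every '...' is a placeholder for your actual Kafka settings.]

> -- Sketch, untested: Leonard's DDL with assumed Flink 1.11 connector options.
> CREATE TABLE test (
>   `type` STRING,
>   `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
>   timestampCol AS TO_TIMESTAMP(
>     FROM_UNIXTIME(`location`.lastUpdateTime / 1000, 'yyyy-MM-dd HH:mm:ss')),  -- computed column
>   WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = '...',                           -- placeholder, as in the YAML above
>   'properties.bootstrap.servers' = '...',    -- placeholder
>   'properties.group.id' = '...',             -- placeholder
>   'format' = 'json'
> );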
On Mon, Jul 6, 2020 at 3:09 PM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Leonard,
>
> Wow, that's great! It works like a charm.
> I'd never considered this approach at all.
> Thanks a lot.
>
> Best,
> Dongwon
>
> On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu <xbjt...@gmail.com> wrote:
>
>> Hi, Kim
>>
>> The reason your attempts (2) and (3) failed is that the json format does
>> not support converting a BIGINT to TIMESTAMP. You can first define the
>> BIGINT field and then use a computed column to extract a TIMESTAMP field,
>> and you can also define the time attribute on that TIMESTAMP field so
>> that time-based operations work in Flink 1.10.1. Note that computed
>> columns are only supported in pure DDL; the Table API lacks this support,
>> which should be aligned in 1.12 as far as I know.
>> The DDL syntax is as follows:
>>
>> create table test (
>>   `type` STRING,
>>   `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
>>   timestampCol as TO_TIMESTAMP(
>>     FROM_UNIXTIME(`location`.lastUpdateTime / 1000, 'yyyy-MM-dd HH:mm:ss')),  -- computed column
>>   WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
>> ) with (
>>   'connector' = '...',
>>   'format' = 'json',
>>   ...
>> );
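>>
>> [Editor's note: a sketch for illustration, assuming the DDL above. Once
>> timestampCol is declared as the rowtime attribute, time-based operations
>> such as group windows should work over it; the one-minute window size
>> here is arbitrary.]
>>
>> SELECT
>>   TUMBLE_START(timestampCol, INTERVAL '1' MINUTE) AS windowStart,
>>   COUNT(*) AS updates
>> FROM test
>> GROUP BY TUMBLE(timestampCol, INTERVAL '1' MINUTE);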
>>
>> Best,
>> Leonard Xu
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
>>
>> On Jul 4, 2020, at 21:21, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>
>> Hi,
>> I use Flink 1.10.1 and I want to use the Table API to read JSON messages.
>> A message looks like the one below:
>>
>>> {
>>>   "type": "Update",
>>>   "location": {
>>>     "id": "123e4567-e89b-12d3-a456-426652340000",
>>>     "lastUpdateTime": 1593866161436
>>>   }
>>> }
>>
>> I wrote the following program just to see whether the JSON messages are
>> correctly parsed by the Table API:
>>
>>> // STRING(), ROW(), FIELD(), BIGINT(), TIMESTAMP() are static imports
>>> // from org.apache.flink.table.api.DataTypes
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> EnvironmentSettings envSettings =
>>>     EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
>>> tEnv
>>>     .connect(
>>>         new Kafka()
>>>             .version("universal")
>>>             .topic(consumerTopic)
>>>             .startFromLatest()
>>>             .properties(consumerProperties))
>>>     .withFormat(new Json())
>>>     .withSchema(new Schema().schema(
>>>         TableSchema.builder()
>>>             .field("type", STRING())
>>>             .field("location", ROW(
>>>                 FIELD("id", STRING()),
>>>                 // tried one of the following three at a time:
>>>                 // (1)
>>>                 FIELD("lastUpdateTime", BIGINT())
>>>                 // (2)
>>>                 // FIELD("lastUpdateTime", TIMESTAMP())
>>>                 // (3)
>>>                 // FIELD("lastUpdateTime", TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
>>>             ))
>>>             .build()))
>>>     .createTemporaryTable("message");
>>> tEnv.toAppendStream(tEnv.from("message"), Row.class).print();
>>
>> Note that I tried BIGINT(), TIMESTAMP(), and
>> TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class).
>>
>> (1) works fine, but later I can't use time-based operations like
>> windowing.
>>
>> (2) causes the following exception:
>>
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException:
>>> Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field
>>> 'location' does not match with the physical type ROW<`id` STRING,
>>> `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource
>>> return type.
>>>     at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>>>     at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
>>>     at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
>>>     at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>>>     at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>>>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>>>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>>     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>>     at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
>>>     at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
>>>     at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
>>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
>>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
>>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>     at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>     at com.kakaomobility.mobdata.Finder.main(Finder.java:133)
>>> Caused by: org.apache.flink.table.api.ValidationException: Type ROW<`id`
>>> STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not
>>> match with the physical type ROW<`id` STRING, `lastUpdateTime`
>>> TIMESTAMP(3)> of the 'location' field of the TableSource return type.
>>>     at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>>>     at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:188)
>>>     ... 38 more
>>
>> (3) causes the following exception:
>>
>>> Caused by: java.time.format.DateTimeParseException: Text '1593868714814'
>>> could not be parsed at index 0
>>>     at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>>>     at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>>>     ... 7 more
>>
>> Can I read such JSON messages with time information in Flink 1.10.1?
>>
>> Thanks
>>
>> Dongwon
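>>
>> [Editor's note: for reference, a sketch of how the sample epoch value
>> converts with the functions used in Leonard's DDL. FROM_UNIXTIME renders
>> the string in the session time zone, so the exact wall-clock value depends
>> on the zone; 1593866161436 ms should be 2020-07-04 12:36:01 in UTC.]
>>
>>> -- integer division truncates the millisecond part before conversion
>>> SELECT TO_TIMESTAMP(
>>>   FROM_UNIXTIME(1593866161436 / 1000, 'yyyy-MM-dd HH:mm:ss'));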