Hi Leonard,

Can I have a YAML definition corresponding to the DDL you suggested?

I tried the definition below (Flink 1.11.0) but got an error:

> tables:
>   - name: test
>     type: source-table
>     update-mode: append
>     connector:
>       property-version: 1
>       type: kafka
>       version: universal
>       topic: ...
>       properties:
>         bootstrap.servers: ...
>         group.id: ...
>     format:
>       property-version: 1
>       type: json
>     schema:
>       - name: type
>         data-type: STRING
>       - name: location
>         data-type: >
>           ROW<
>             id STRING,
>             lastUpdateTime BIGINT
>           >
>       - name: timestampCol
>         data-type: TIMESTAMP(3)
>         rowtime:
>           timestamps:
>             type: from-field
>             from: TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss'))
>           watermarks:
>             type: periodic-bounded
>             delay: 5000
>

The SQL client doesn't complain about the file, but when I execute "SELECT
timestampCol FROM test", the job fails with the following error message:

> Caused by: java.lang.NullPointerException
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
> at
> org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at SourceConversion$4.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)


On Mon, Jul 6, 2020 at 3:09 PM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Leonard,
>
> Wow, that's great! It works like a charm.
> I've never considered this approach at all.
> Thanks a lot.
>
> Best,
> Dongwon
>
> On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu <xbjt...@gmail.com> wrote:
>
>> Hi, Kim
>>
>> Your attempts (2) and (3) failed because the json format does not support
>> converting a BIGINT to a TIMESTAMP. Instead, first declare the field as
>> BIGINT and then use a computed column to derive a TIMESTAMP column from it.
>> You can also define the time attribute on that TIMESTAMP field, which lets
>> you use time-based operations in Flink 1.10.1. Note that computed columns
>> are only supported in pure DDL; the Table API lacks this support, and as
>> far as I know it should be aligned in 1.12.
>> The DDL syntax is as follows [1]:
>>
>> create table test (
>>   `type` STRING,
>>   `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
>>    timestampCol as TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss')), -- computed column
>>    WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
>> )   with (
>>   'connector' = '...',
>>   'format' = 'json',
>>   ...
>> );
>>
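As a rough illustration of what the computed column in the DDL above evaluates to, here is a plain-Java sketch with no Flink dependency. It mimics the two steps (format epoch seconds with the pattern, then parse the string back into a timestamp) using `java.time`. The class and method names are hypothetical, the division is assumed to truncate to whole seconds, and UTC is passed explicitly as an assumption; the zone Flink actually applies depends on its configuration. Because the pattern has no fractional-second field, the resulting timestamp carries no milliseconds.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper class; not part of Flink.
final class ComputedColumnSketch {
    // Same pattern string as in the DDL.
    private static final DateTimeFormatter PATTERN =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Stand-in for FROM_UNIXTIME(epochMillis / 1000, 'yyyy-MM-dd HH:mm:ss').
    // Assumption: the division truncates to whole seconds.
    public static String fromUnixtime(long epochMillis, ZoneId zone) {
        long epochSeconds = Math.floorDiv(epochMillis, 1000L);
        return LocalDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds), zone)
            .format(PATTERN);
    }

    // Stand-in for TO_TIMESTAMP(text, 'yyyy-MM-dd HH:mm:ss').
    public static LocalDateTime toTimestamp(String text) {
        return LocalDateTime.parse(text, PATTERN);
    }

    public static void main(String[] args) {
        long lastUpdateTime = 1593866161436L; // value from the sample message
        String formatted = fromUnixtime(lastUpdateTime, ZoneOffset.UTC);
        LocalDateTime timestampCol = toTimestamp(formatted);
        System.out.println(formatted);              // 2020-07-04 12:36:01
        System.out.println(timestampCol.getNano()); // 0, the milliseconds are gone
    }
}
```

If millisecond precision matters for the watermark, this round trip through a seconds-based string is the place where it is lost.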
>>
>> Best,
>> Leonard Xu
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
>>
>>
>> On Jul 4, 2020, at 21:21, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>
>> Hi,
>> I use Flink 1.10.1 and I want to use the Table API to read JSON messages. A
>> message looks like this:
>>
>>>     {
>>>        "type":"Update",
>>>        "location":{
>>>           "id":"123e4567-e89b-12d3-a456-426652340000",
>>>           "lastUpdateTime":1593866161436
>>>        }
>>>     }
>>
>>
>> I wrote the following program just to see whether json messages are
>> correctly parsed by Table API:
>>
>>>     StreamExecutionEnvironment env =
>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>     EnvironmentSettings envSettings =
>>>         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>     StreamTableEnvironment tEnv =
>>>         StreamTableEnvironment.create(env, envSettings);
>>>     tEnv
>>>       .connect(
>>>         new Kafka()
>>>           .version("universal")
>>>           .topic(consumerTopic)
>>>           .startFromLatest()
>>>           .properties(consumerProperties)
>>>       )
>>>       .withFormat(new Json())
>>>       .withSchema(new Schema().schema(
>>>         TableSchema.builder()
>>>           .field("type", STRING())
>>>           .field("location",
>>>             ROW(
>>>               FIELD("id", STRING()),
>>>               // (1)
>>>               FIELD("lastUpdateTime", BIGINT())
>>>               // (2) tried in place of (1)
>>>               // FIELD("lastUpdateTime", TIMESTAMP())
>>>               // (3) tried in place of (1)
>>>               // FIELD("lastUpdateTime", TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
>>>             ))
>>>           .build()
>>>       ))
>>>       .createTemporaryTable("message");
>>>     tEnv.toAppendStream(tEnv.from("message"), Row.class)
>>>       .print();
>>
>>
>> Note that I tried BIGINT(), TIMESTAMP(), and
>> TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class).
>> (1) it works fine but later I can't use time-based operations like
>> windowing.
>>
>> (2) it causes the following exception
>>
>>> Exception in thread "main"
>>> org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING,
>>> `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match
>>> with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of
>>> the 'location' field of the TableSource return type.
>>> at
>>> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>>> at
>>> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
>>> at
>>> org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
>>> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>>> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>>> at
>>> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>>> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>> at
>>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>> at
>>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>> at
>>> org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
>>> at
>>> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
>>> at
>>> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>> at
>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>> at
>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>> at
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>> at
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>> at
>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>> at
>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>> at
>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>> at
>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>> at
>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>> at com.kakaomobility.mobdata.Finder.main(Finder.java:133)
>>> Caused by: org.apache.flink.table.api.ValidationException: Type ROW<`id`
>>> STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not
>>> match with the physical type ROW<`id` STRING, `lastUpdateTime`
>>> TIMESTAMP(3)> of the 'location' field of the TableSource return type.
>>> at
>>> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>>> at
>>> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:188)
>>> ... 38 more
>>
>>
>> (3) it causes the following exception
>>
>>> Caused by: java.time.format.DateTimeParseException: Text '1593868714814'
>>> could not be parsed at index 0
>>> at
>>> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>>> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>>> at
>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
>>> at
>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>> at
>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>>> at
>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>>> at
>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>> at
>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>>> at
>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>>> at
>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>> at
>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>>> ... 7 more
>>
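The exception in (3) can be reproduced outside Flink. The sketch below is a minimal stand-alone illustration, not the JSON format's actual code: it uses `DateTimeFormatter.ISO_LOCAL_DATE_TIME` as a stand-in for whatever textual timestamp formatter the format applies internally, and the class and method names are hypothetical. It shows that a string of epoch milliseconds is not a parseable date-time, while the same value interpreted as a `long` converts without trouble (UTC is assumed here).

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical helper class; not part of Flink.
final class EpochMillisParseSketch {
    // Returns true if the text parses as an ISO-8601 local date-time.
    public static boolean parsesAsTimestamp(String text) {
        try {
            LocalDateTime.parse(text, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    // Interprets the value as milliseconds since the epoch (UTC assumed).
    public static LocalDateTime fromEpochMillis(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        // The text from the exception message is a number, not a date-time string.
        System.out.println(parsesAsTimestamp("1593868714814"));           // false
        System.out.println(parsesAsTimestamp("2020-07-04T13:18:34.814")); // true
        System.out.println(fromEpochMillis(1593868714814L));              // 2020-07-04T13:18:34.814
    }
}
```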
>>
>> Can I read such json messages with time information in Flink 1.10.1?
>>
>> Thanks
>>
>> Dongwon
>>
>>
>>
