Hi,
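I'd like to ask whether Flink SQL can read Kafka's built-in record timestamp when a Kafka source is created through DDL. We have a scenario where we want to use the timestamp that Kafka attaches to each record, because the message payload itself may not contain any time attribute.
If this is supported, could you share the exact syntax? I tried the SQL below and it fails with an error: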


CREATE TABLE user_behavior (
  test_time TIMESTAMP(3),
  user_id STRING,
  item_id STRING,
  category_id STRING,
  behavior STRING,
  ts STRING,
  proctime AS PROCTIME() -- computed column that yields a processing-time attribute
) WITH (
  'connector.type' = 'kafka', -- use the Kafka connector
  'connector.version' = '0.10', -- Kafka version; 'universal' supports 0.11 and later
  'connector.topic' = 'test', -- Kafka topic
  'connector.startup-mode' = 'latest-offset', -- start reading from the latest offset
  --'connector.properties.group.id' = 'mytest',
  'connector.properties.zookeeper.connect' = '168.61.113.170:2181', -- ZooKeeper address
  'connector.properties.bootstrap.servers' = '168.61.113.170:9092', -- Kafka broker address
  'format.type' = 'json', -- source format is JSON
  'schema.0.rowtime.timestamps.type' = 'from-source',
  'schema.0.rowtime.watermarks.type' = 'periodic-ascending',
  'schema.0.rowtime.watermarks.delay' = '5000'
)




The exception is:


 java.lang.UnsupportedOperationException: empty.max
 at scala.collection.TraversableOnce.max(TraversableOnce.scala:228)
 at scala.collection.TraversableOnce.max$(TraversableOnce.scala:226)
 at scala.collection.mutable.ArrayOps$ofInt.max(ArrayOps.scala:242)
 at org.apache.flink.table.planner.sources.TableSourceUtil$.createSchemaRelNode(TableSourceUtil.scala:310)
 at org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:297)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.$anonfun$translateToPlanInternal$1(StreamExecTableSourceScan.scala:130)
 at scala.Option.map(Option.scala:146)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:125)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
 at org.apache.flink.table.planner.plan.no
