再次感谢,成功了,这个问题困扰我两周了。还有个问题,能麻烦帮我看下嘛
如上,这里定义的 ts_watermark, 想用他们做 watermakr,但是他的返回结果是 null, 我的 maxwell_ts 是 millseconds,我看了函数的使用方法,没想到哪里有问题 val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(env, bsSettings) val sourceTable = """CREATE TABLE my_kafak_source ( | `table` varchar, | `database` varchar, | `data` row < transaction_id varchar, | user_id int, | amount int, | >, | maxwell_ts bigint, | ts_watermark as TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000)) |) WITH ( |)""".stripMargin Leonard Xu <[email protected]> 于2020年6月7日周日 下午5:51写道: > Hi, > 1.10确实有这个bug, > 这个问题在1.10.1中已经修复了,你可以使用升级到1.10.1,Benchao之前修了一个计算列相关的,我刚刚看不使用计算列也会有这问题,应该是在[1]中 > jark wu 修复的。 > > Best, > Leonard Xu > [1] https://issues.apache.org/jira/browse/FLINK-16526 < > https://issues.apache.org/jira/browse/FLINK-16526> > > > 在 2020年6月7日,15:32,macia kk <[email protected]> 写道: > > > > 各位大佬, > > > > 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢 > > > > val bsSettings = > > > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > > val bsTableEnv = StreamTableEnvironment.create(env, bsSettings) > > val sourceTable = """CREATE TABLE my_kafak_source ( > > | `table` varchar, > > | `database` varchar, > > | `data` row < transaction_id varchar, > > | user_id int, > > | amount int, > > | >, > > | maxwell_ts bigint, > > | ts_watermark as > > TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000)) > > |) WITH ( > > |)""".stripMargin > > > > error > > > > The program finished with the following exception: > > > > org.apache.flink.client.program.ProgramInvocationException: The main > > method caused an error: SQL parse failed. Encountered "table" at line > > 1, column 8. > > Was expecting one of: > > "ABS" ... > > "ALL" ... > > "ARRAY" ... > > "AVG" ... > > "CARDINALITY" ... > > "CASE" ... > > "CAST" ... > > "CEIL" ... > >
