```scala
val bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
val sourceTable = """CREATE TABLE my_kafak_source (
| `table` varchar,
| `database` varchar,
| `data` row < transaction_id varchar,
| user_id int,
| amount int,
| reference_id varchar,
| status int,
| transaction_type int,
| merchant_id int,
| update_time int,
| create_time int
| >,
| maxwell_ts bigint,
| ts_watermark as
TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
|) WITH (
| 'connector.type' = 'kafka',
| 'connector.version' = '0.11',
| 'connector.topic' = 'xx',
| 'connector.startup-mode' = 'latest-offset',
| 'update-mode' = 'append',
| 'format.type' = 'json',
| 'format.derive-schema' = 'true'
|)""".stripMargin
val dstTable = """CREATE TABLE my_kafak_dst (
| transaction_type int,
| transaction_id VARCHAR,
| reference_id VARCHAR,
| merchant_id int,
| status int,
| create_time int,
| maxwell_ts bigint,
| ts_watermark TIMESTAMP(3)
|) WITH (
| 'connector.type' = 'kafka',
| 'connector.version' = '0.11',
| 'connector.topic' = 'uu',
| 'update-mode' = 'append',
| 'format.type' = 'json',
| 'format.derive-schema' = 'true'
|)""".stripMargin
bsTableEnv.sqlUpdate(sourceTable)
bsTableEnv.sqlUpdate(dstTable)
val main_table = bsTableEnv.sqlQuery("SELECT transaction_type,
transaction_id, reference_id, merchant_id, status, create_time, maxwell_ts,
ts_watermark FROM my_kafak_source")
bsTableEnv.createTemporaryView("main_table", main_table)
bsTableEnv.sqlUpdate("INSERT INTO my_kafak_dst SELECT * FROM
main_table")
```
macia kk <[email protected]> 于2020年6月7日周日 下午3:41写道:
> 下边的代码里,没有用 `table` 字段,我现在只要把 table, database 这两行去掉,可以能跑,只要加上,就会挂
>
> Benchao Li <[email protected]> 于2020年6月7日周日 下午3:38写道:
>
>> Hi,
>> 看起来你的DDL应该没有太大问题,你可以把代码贴的再完整一点么,可能跟下面的代码有关系。
>>
>> macia kk <[email protected]> 于2020年6月7日周日 下午3:33写道:
>>
>> > 各位大佬,
>> >
>> > 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢
>> >
>> > val bsSettings =
>> >
>> >
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> > val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
>> > val sourceTable = """CREATE TABLE my_kafak_source (
>> > | `table` varchar,
>> > | `database` varchar,
>> > | `data` row < transaction_id varchar,
>> > | user_id int,
>> > | amount int,
>> > | >,
>> > | maxwell_ts bigint,
>> > | ts_watermark as
>> > TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
>> > |) WITH (
>> > |)""".stripMargin
>> >
>> > error
>> >
>> > The program finished with the following exception:
>> >
>> > org.apache.flink.client.program.ProgramInvocationException: The main
>> > method caused an error: SQL parse failed. Encountered "table" at line
>> > 1, column 8.
>> > Was expecting one of:
>> > "ABS" ...
>> > "ALL" ...
>> > "ARRAY" ...
>> > "AVG" ...
>> > "CARDINALITY" ...
>> > "CASE" ...
>> > "CAST" ...
>> > "CEIL" ...
>> >
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>