```scala
    val bsSettings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
    val sourceTable = """CREATE TABLE my_kafak_source (
                        |    `table` varchar,
                        |    `database` varchar,
                        |    `data` row < transaction_id varchar,
                        |               user_id int,
                        |               amount int,
                        |               reference_id varchar,
                        |               status int,
                        |               transaction_type int,
                        |               merchant_id int,
                        |               update_time int,
                        |               create_time int
                        |    >,
                        |    maxwell_ts bigint,
                        |    ts_watermark as TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
                        |) WITH (
                        |    'connector.type' = 'kafka',
                        |    'connector.version' = '0.11',
                        |    'connector.topic' = 'xx',
                        |    'connector.startup-mode' = 'latest-offset',
                        |    'update-mode' = 'append',
                        |    'format.type' = 'json',
                        |    'format.derive-schema' = 'true'
                        |)""".stripMargin

    val dstTable = """CREATE TABLE my_kafak_dst (
                     |     transaction_type int,
                     |     transaction_id VARCHAR,
                     |     reference_id VARCHAR,
                     |     merchant_id int,
                     |     status int,
                     |     create_time int,
                     |     maxwell_ts bigint,
                     |     ts_watermark TIMESTAMP(3)
                     |) WITH (
                     |    'connector.type' = 'kafka',
                     |    'connector.version' = '0.11',
                     |    'connector.topic' = 'uu',
                     |    'update-mode' = 'append',
                     |    'format.type' = 'json',
                     |    'format.derive-schema' = 'true'
                     |)""".stripMargin


    bsTableEnv.sqlUpdate(sourceTable)
    bsTableEnv.sqlUpdate(dstTable)


    val main_table = bsTableEnv.sqlQuery(
      """SELECT transaction_type, transaction_id, reference_id, merchant_id,
        |       status, create_time, maxwell_ts, ts_watermark
        |FROM my_kafak_source""".stripMargin)
    bsTableEnv.createTemporaryView("main_table", main_table)

    bsTableEnv.sqlUpdate("INSERT INTO my_kafak_dst SELECT * FROM main_table")
```
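One thing worth noting about the query near the end (a hedged observation, not a confirmed diagnosis of the error discussed in this thread): in the DDL, transaction_type, transaction_id, reference_id, merchant_id, status and create_time are declared inside the `data` ROW, so a projection over my_kafak_source would normally have to qualify them through that column, and reserved identifiers such as `table`, `database` and `data` need backticks in every statement that references them, not only in the CREATE TABLE. A minimal sketch of such a projection, under those assumptions:

```sql
-- Hypothetical projection over my_kafak_source: nested ROW fields are
-- accessed through the `data` column, and reserved words stay backquoted.
SELECT
    `table`,
    `database`,
    `data`.transaction_type,
    `data`.transaction_id,
    `data`.reference_id,
    `data`.merchant_id,
    `data`.status,
    `data`.create_time,
    maxwell_ts,
    ts_watermark
FROM my_kafak_source
```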

macia kk <[email protected]> wrote on Sun, Jun 7, 2020 at 3:41 PM:

> The code below does not use the `table` field at all. If I just remove the
> `table` and `database` lines, it runs; as soon as I add them back, the job fails.
>
> Benchao Li <[email protected]> wrote on Sun, Jun 7, 2020 at 3:38 PM:
>
>> Hi,
>> Your DDL looks mostly fine. Could you paste a more complete version of the
>> code? The problem may be related to the code that follows it.
>>
>> macia kk <[email protected]> wrote on Sun, Jun 7, 2020 at 3:33 PM:
>>
>> > Hi all,
>> >
>> > My source JSON contains `database` and `table` fields that I want to
>> > parse out. Since they are reserved keywords I wrapped them in backticks,
>> > but it still fails with a parse error. Why is that?
>> >
>> >     val bsSettings =
>> >       EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> >     val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
>> >     val sourceTable = """CREATE TABLE my_kafak_source (
>> >                         |    `table` varchar,
>> >                         |    `database` varchar,
>> >                         |    `data` row < transaction_id varchar,
>> >                         |               user_id int,
>> >                         |               amount int,
>> >                         |    >,
>> >                         |    maxwell_ts bigint,
>> >                         |    ts_watermark as TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
>> >                         |) WITH (
>> >                         |)""".stripMargin
>> >
>> > Error:
>> >
>> >  The program finished with the following exception:
>> >
>> > org.apache.flink.client.program.ProgramInvocationException: The main
>> > method caused an error: SQL parse failed. Encountered "table" at line
>> > 1, column 8.
>> > Was expecting one of:
>> >     "ABS" ...
>> >     "ALL" ...
>> >     "ARRAY" ...
>> >     "AVG" ...
>> >     "CARDINALITY" ...
>> >     "CASE" ...
>> >     "CAST" ...
>> >     "CEIL" ...
>> >
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>
