// Build a streaming TableEnvironment that uses the Blink planner.
// `env` is the stream execution environment created outside this snippet.
val bsSettings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)

// Source table: JSON records read from Kafka topic 'xx', starting at the latest offset.
// `table`, `database` and `data` are back-quoted because the thread reports them as
// reserved keywords. The business fields live inside the nested `data` ROW column.
// `ts_watermark` is a computed column derived from `maxwell_ts`
// (presumably epoch milliseconds, hence the division by 1000 -- TODO confirm).
// NOTE(review): the reported error `Encountered "table" at line 1, column 8` appears only
// when the reserved-word columns and the computed column are declared together. That
// matches a Flink 1.10.0 issue where computed-column expansion does not escape column
// names -- confirm against the Flink version in use and upgrade if so.
val sourceTable =
  """CREATE TABLE my_kafak_source (
    |    `table` varchar,
    |    `database` varchar,
    |    `data` row < transaction_id varchar,
    |               user_id int,
    |               amount int,
    |               reference_id varchar,
    |               status int,
    |               transaction_type int,
    |               merchant_id int,
    |               update_time int,
    |               create_time int
    |    >,
    |    maxwell_ts bigint,
    |    ts_watermark as TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
    |) WITH (
    |    'connector.type' = 'kafka',
    |    'connector.version' = '0.11',
    |    'connector.topic' = 'xx',
    |    'connector.startup-mode' = 'latest-offset',
    |    'update-mode' = 'append',
    |    'format.type' = 'json',
    |    'format.derive-schema' = 'true'
    |)""".stripMargin

// Sink table: flattened records written as JSON to Kafka topic 'uu' in append mode.
val dstTable =
  """CREATE TABLE my_kafak_dst (
    |     transaction_type int,
    |     transaction_id VARCHAR,
    |     reference_id VARCHAR,
    |     merchant_id int,
    |     status int,
    |     create_time int,
    |     maxwell_ts bigint,
    |     ts_watermark TIMESTAMP(3)
    |) WITH (
    |    'connector.type' = 'kafka',
    |    'connector.version' = '0.11',
    |    'connector.topic' = 'uu',
    |    'update-mode' = 'append',
    |    'format.type' = 'json',
    |    'format.derive-schema' = 'true'
    |)""".stripMargin

// Register both tables in the catalog.
bsTableEnv.sqlUpdate(sourceTable)
bsTableEnv.sqlUpdate(dstTable)

// Flatten the nested `data` row into the column order expected by my_kafak_dst.
// The transaction fields are not top-level columns of my_kafak_source, so they are
// read through `data`. A triple-quoted string is used because an ordinary "..."
// literal cannot span several lines in Scala.
val main_table = bsTableEnv.sqlQuery(
  """SELECT
    |  `data`.transaction_type,
    |  `data`.transaction_id,
    |  `data`.reference_id,
    |  `data`.merchant_id,
    |  `data`.status,
    |  `data`.create_time,
    |  maxwell_ts,
    |  ts_watermark
    |FROM my_kafak_source""".stripMargin)
bsTableEnv.createTemporaryView("main_table", main_table)

// Stream the flattened rows into the sink table.
bsTableEnv.sqlUpdate("INSERT INTO my_kafak_dst SELECT * FROM main_table")


macia kk <[email protected]> 于2020年6月7日周日 下午3:45写道:

> ```scala
>     val bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>     val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
>     val sourceTable = """CREATE TABLE my_kafak_source (
>                         |    `table` varchar,
>                         |    `database` varchar,
>                         |    `data` row < transaction_id varchar,
>                         |               user_id int,
>                         |               amount int,
>                         |               reference_id varchar,
>                         |               status int,
>                         |               transaction_type int,
>                         |               merchant_id int,
>                         |               update_time int,
>                         |               create_time int
>                         |    >,
>                         |    maxwell_ts bigint,
>                         |    ts_watermark as
> TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
>                         |) WITH (
>                         |    'connector.type' = 'kafka',
>                         |    'connector.version' = '0.11',
>                         |    'connector.topic' = 'xx',
>                         |    'connector.startup-mode' = 'latest-offset',
>                         |    'update-mode' = 'append',
>                         |    'format.type' = 'json',
>                         |    'format.derive-schema' = 'true'
>                         |)""".stripMargin
>
>     val dstTable = """CREATE TABLE my_kafak_dst (
>                      |     transaction_type int,
>                      |     transaction_id VARCHAR,
>                      |     reference_id VARCHAR,
>                      |     merchant_id int,
>                      |     status int,
>                      |     create_time int,
>                      |     maxwell_ts bigint,
>                      |     ts_watermark TIMESTAMP(3)
>                      |) WITH (
>                      |    'connector.type' = 'kafka',
>                      |    'connector.version' = '0.11',
>                      |    'connector.topic' = 'uu',
>                      |    'update-mode' = 'append',
>                      |    'format.type' = 'json',
>                      |    'format.derive-schema' = 'true'
>                      |)""".stripMargin
>
>
>     bsTableEnv.sqlUpdate(sourceTable)
>     bsTableEnv.sqlUpdate(dstTable)
>
>
>     val main_table = bsTableEnv.sqlQuery("SELECT transaction_type,
> transaction_id, reference_id, merchant_id, status, create_time, maxwell_ts,
> ts_watermark FROM my_kafak_source")
>     bsTableEnv.createTemporaryView("main_table", main_table)
>
>     bsTableEnv.sqlUpdate("INSERT INTO my_kafak_dst SELECT * FROM
> main_table")
> ```
>
> macia kk <[email protected]> 于2020年6月7日周日 下午3:41写道:
>
>> 下边的代码里并没有用到 `table` 字段。我现在只要把 table、database 这两行去掉,就可以跑;只要加上,就会挂。
>>
>> Benchao Li <[email protected]> 于2020年6月7日周日 下午3:38写道:
>>
>>> Hi,
>>> 看起来你的DDL应该没有太大问题,你可以把代码贴的再完整一点么,可能跟下面的代码有关系。
>>>
>>> macia kk <[email protected]> 于2020年6月7日周日 下午3:33写道:
>>>
>>> > 各位大佬,
>>> >
>>> > 我的数据源 json 里有 database、table 字段,想解析出来。它们是保留关键字,我加了反引号,但还是报错,这是为什么呢?
>>> >
>>> >     val bsSettings =
>>> >
>>> >
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>> >     val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
>>> >     val sourceTable = """CREATE TABLE my_kafak_source (
>>> >                         |    `table` varchar,
>>> >                         |    `database` varchar,
>>> >                         |    `data` row < transaction_id varchar,
>>> >                         |               user_id int,
>>> >                         |               amount int,
>>> >                         |    >,
>>> >                         |    maxwell_ts bigint,
>>> >                         |    ts_watermark as
>>> > TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
>>> >                         |) WITH (
>>> >                         |)""".stripMargin
>>> >
>>> > error
>>> >
>>> >  The program finished with the following exception:
>>> >
>>> > org.apache.flink.client.program.ProgramInvocationException: The main
>>> > method caused an error: SQL parse failed. Encountered "table" at line
>>> > 1, column 8.
>>> > Was expecting one of:
>>> >     "ABS" ...
>>> >     "ALL" ...
>>> >     "ARRAY" ...
>>> >     "AVG" ...
>>> >     "CARDINALITY" ...
>>> >     "CASE" ...
>>> >     "CAST" ...
>>> >     "CEIL" ...
>>> >
>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>>
>>

回复