// Flink Table API script (Blink planner, streaming mode): registers a Kafka/JSON source
// table and a Kafka/JSON sink table via DDL, then copies a projection of the source into
// the sink. `env` is defined outside this snippet.
val bsSettings =
  EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)

// Source DDL. `table`, `database` and `data` are backtick-escaped because they collide
// with SQL keywords.
//
// The timestamp derived from `maxwell_ts` is deliberately NOT declared here as a computed
// column. With `ts_watermark AS TO_TIMESTAMP(...)` in this DDL the job was reported to fail
// with: SQL parse failed. Encountered "table" at line 1, column 8 -- i.e. the escaped
// column names appear to be re-emitted without backticks while the computed column is
// expanded. NOTE(review): this looks like FLINK-16068 (Flink 1.10.0) -- confirm; on a
// release containing that fix the computed column can be moved back into the DDL.
val sourceTable =
  """CREATE TABLE my_kafak_source (
    |  `table` varchar,
    |  `database` varchar,
    |  `data` row <
    |    transaction_id varchar,
    |    user_id int,
    |    amount int,
    |    reference_id varchar,
    |    status int,
    |    transaction_type int,
    |    merchant_id int,
    |    update_time int,
    |    create_time int
    |  >,
    |  maxwell_ts bigint
    |) WITH (
    |  'connector.type' = 'kafka',
    |  'connector.version' = '0.11',
    |  'connector.topic' = 'xx',
    |  'connector.startup-mode' = 'latest-offset',
    |  'update-mode' = 'append',
    |  'format.type' = 'json',
    |  'format.derive-schema' = 'true'
    |)""".stripMargin

// Sink DDL: flat schema matching, column for column, the SELECT list of `main_table` below.
val dstTable =
  """CREATE TABLE my_kafak_dst (
    |  transaction_type int,
    |  transaction_id VARCHAR,
    |  reference_id VARCHAR,
    |  merchant_id int,
    |  status int,
    |  create_time int,
    |  maxwell_ts bigint,
    |  ts_watermark TIMESTAMP(3)
    |) WITH (
    |  'connector.type' = 'kafka',
    |  'connector.version' = '0.11',
    |  'connector.topic' = 'uu',
    |  'update-mode' = 'append',
    |  'format.type' = 'json',
    |  'format.derive-schema' = 'true'
    |)""".stripMargin

bsTableEnv.sqlUpdate(sourceTable)
bsTableEnv.sqlUpdate(dstTable)

// Triple-quoted literal: an ordinary "..." string cannot span several lines in Scala.
// `ts_watermark` is computed here (same expression the DDL used to carry) so the projected
// columns are unchanged: maxwell_ts is divided by 1000 before FROM_UNIXTIME, as before.
val main_table = bsTableEnv.sqlQuery(
  """SELECT transaction_type, transaction_id, reference_id, merchant_id, status, create_time,
    |  maxwell_ts, TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000)) AS ts_watermark
    |FROM my_kafak_source""".stripMargin)
bsTableEnv.createTemporaryView("main_table", main_table)

bsTableEnv.sqlUpdate("INSERT INTO my_kafak_dst SELECT * FROM main_table")
macia kk <[email protected]> 于2020年6月7日周日 下午3:45写道:
> ```scala
> val bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
> val sourceTable = """CREATE TABLE my_kafak_source (
> | `table` varchar,
> | `database` varchar,
> | `data` row < transaction_id varchar,
> | user_id int,
> | amount int,
> | reference_id varchar,
> | status int,
> | transaction_type int,
> | merchant_id int,
> | update_time int,
> | create_time int
> | >,
> | maxwell_ts bigint,
> | ts_watermark as
> TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
> |) WITH (
> | 'connector.type' = 'kafka',
> | 'connector.version' = '0.11',
> | 'connector.topic' = 'xx',
> | 'connector.startup-mode' = 'latest-offset',
> | 'update-mode' = 'append',
> | 'format.type' = 'json',
> | 'format.derive-schema' = 'true'
> |)""".stripMargin
>
> val dstTable = """CREATE TABLE my_kafak_dst (
> | transaction_type int,
> | transaction_id VARCHAR,
> | reference_id VARCHAR,
> | merchant_id int,
> | status int,
> | create_time int,
> | maxwell_ts bigint,
> | ts_watermark TIMESTAMP(3)
> |) WITH (
> | 'connector.type' = 'kafka',
> | 'connector.version' = '0.11',
> | 'connector.topic' = 'uu',
> | 'update-mode' = 'append',
> | 'format.type' = 'json',
> | 'format.derive-schema' = 'true'
> |)""".stripMargin
>
>
> bsTableEnv.sqlUpdate(sourceTable)
> bsTableEnv.sqlUpdate(dstTable)
>
>
> val main_table = bsTableEnv.sqlQuery("SELECT transaction_type,
> transaction_id, reference_id, merchant_id, status, create_time, maxwell_ts,
> ts_watermark FROM my_kafak_source")
> bsTableEnv.createTemporaryView("main_table", main_table)
>
> bsTableEnv.sqlUpdate("INSERT INTO my_kafak_dst SELECT * FROM
> main_table")
> ```
>
> macia kk <[email protected]> 于2020年6月7日周日 下午3:41写道:
>
>> 下边的代码里,没有用到 `table` 字段。我现在只要把 `table`、`database` 这两行去掉就能跑,只要加上就会挂。
>>
>> Benchao Li <[email protected]> 于2020年6月7日周日 下午3:38写道:
>>
>>> Hi,
>>> 看起来你的 DDL 应该没有太大问题,你可以把代码贴得再完整一点么?可能跟下面的代码有关系。
>>>
>>> macia kk <[email protected]> 于2020年6月7日周日 下午3:33写道:
>>>
>>> > 各位大佬,
>>> >
>>> > 我的数据源 json 里有 database、table 字段,想解析出来,但它们是保留关键字。我加了反引号,但还是报错,这是为什么呢?
>>> >
>>> > val bsSettings =
>>> >
>>> >
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>> > val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
>>> > val sourceTable = """CREATE TABLE my_kafak_source (
>>> > | `table` varchar,
>>> > | `database` varchar,
>>> > | `data` row < transaction_id varchar,
>>> > | user_id int,
>>> > | amount int,
>>> > | >,
>>> > | maxwell_ts bigint,
>>> > | ts_watermark as
>>> > TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
>>> > |) WITH (
>>> > |)""".stripMargin
>>> >
>>> > error
>>> >
>>> > The program finished with the following exception:
>>> >
>>> > org.apache.flink.client.program.ProgramInvocationException: The main
>>> > method caused an error: SQL parse failed. Encountered "table" at line
>>> > 1, column 8.
>>> > Was expecting one of:
>>> > "ABS" ...
>>> > "ALL" ...
>>> > "ARRAY" ...
>>> > "AVG" ...
>>> > "CARDINALITY" ...
>>> > "CASE" ...
>>> > "CAST" ...
>>> > "CEIL" ...
>>> >
>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>>
>>