Hi,各位大佬,谁有空帮我看下这个问题
Source: Kafka
SinkL Kafka
主要逻辑是 *main_table* left join *merchatn_table* 以后,使用 FIRST_VALUE 函数取第一条
transaction_id,我这个模式应该是 append 模式,但是结果好像不是
Error
org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: AppendStreamTableSink requires that Table has
only insert changes.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
Code
val main_column = "`database`, `table`, `transaction_type`,
`transaction_id`, `reference_id`, `merchant_id`, `event_time`,
`status`"
val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM
Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ")
bsTableEnv.createTemporaryView("main_table", main_table)
val merchant_column = "transaction_sn, user_id"
val merchant_table = bsTableEnv.sqlQuery(s"SELECT $merchant_column
FROM Keystats_airpay_consumer WHERE `table` LIKE
'wallet_id_merchant_db%' ")
bsTableEnv.createTemporaryView("merchant_table", merchant_table)
bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer
| SELECT `database`, `table`, `transaction_type`,
| `merchant_id`, `event_time`, `status`,
| FIRST_VALUE(`transaction_id`) OVER
(PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED
PRECEDING)
| FROM (
| SELECT `database`, `table`,
`transaction_type`, `transaction_id`,
| `merchant_id`, `event_time`, `status`,
`reference_id`
| FROM main_table
| LEFT JOIN merchant_table
| ON main_table.reference_id =
merchant_table.transaction_sn
| )
|""".stripMargin)