Hi everyone, could someone who has time help me look at this problem?

Source: Kafka
Sink: Kafka

The main logic is *main_table* LEFT JOIN *merchant_table*, after which I use the FIRST_VALUE function to take the first
transaction_id. I expected this to run in append mode, but the result suggests it does not.

Error

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: AppendStreamTableSink requires that Table has only insert changes.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)

Code

    val main_column = "`database`, `table`, `transaction_type`, `transaction_id`, `reference_id`, `merchant_id`, `event_time`, `status`"
    val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ")
    bsTableEnv.createTemporaryView("main_table", main_table)

    val merchant_column = "transaction_sn, user_id"
    val merchant_table = bsTableEnv.sqlQuery(s"SELECT $merchant_column FROM Keystats_airpay_consumer WHERE `table` LIKE 'wallet_id_merchant_db%' ")
    bsTableEnv.createTemporaryView("merchant_table", merchant_table)

    bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer
                           | SELECT `database`, `table`, `transaction_type`,
                           |   `merchant_id`, `event_time`, `status`,
                           |    FIRST_VALUE(`transaction_id`) OVER (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED PRECEDING)
                           | FROM (
                           |    SELECT `database`, `table`, `transaction_type`, `transaction_id`,
                           |    `merchant_id`, `event_time`, `status`, `reference_id`
                           |    FROM main_table
                           |    LEFT JOIN merchant_table
                           |    ON main_table.reference_id = merchant_table.transaction_sn
                           | )
                           |""".stripMargin)
