Hi
Kafka 的 sink 是 appendStreamSink,所以query只能是append的, 因为Kafka 的 sink无法处理retract消息。
你SQL中的 left join 是个双流join,会产生retract消息,编译时发现query和sink不匹配会报你贴的错误,

通常看到的业务场景是写入kafka的数据是不用去更新的,kafka主要用作一个消息队列,如果是需要更新
结果的数据,一般是存到db里,jdbc,hbase, es 这几个 connector 的sink都是upsertSink。


祝好,
Leonard Xu



> 在 2020年5月27日,10:23,Benchao Li <[email protected]> 写道:
> 
> 而且你的SQL里面有一部分是会产生retract的:
> 这里用的是regular left join,这种join类型是会产生retract结果的。
> 
>                           | FROM (
>                           |    SELECT `database`, `table`,
> `transaction_type`, `transaction_id`,
>                           |    `merchant_id`, `event_time`, `status`,
> `reference_id`
>                           |    FROM main_table
>                           |    LEFT JOIN merchant_table
>                           |    ON main_table.reference_id =
> merchant_table.transaction_sn
>                           | )
> 
> 
> macia kk <[email protected]> 于2020年5月27日周三 上午1:20写道:
> 
>> Hi,各位大佬,谁有空帮我看下这个问题
>> 
>> Source: Kafka
>> SinkL Kafka
>> 
>> 主要逻辑是 *main_table*  left join *merchatn_table* 以后,使用 FIRST_VALUE 函数取第一条
>> transaction_id,我这个模式应该是 append 模式,但是结果好像不是
>> 
>> Error
>> 
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: AppendStreamTableSink requires that Table has
>> only insert changes.
>>    at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>    at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>    at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> Code
>> 
>>   val main_column = "`database`, `table`, `transaction_type`,
>> `transaction_id`, `reference_id`, `merchant_id`, `event_time`,
>> `status`"
>>    val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM
>> Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ")
>>    bsTableEnv.createTemporaryView("main_table", main_table)
>> 
>>    val merchant_column = "transaction_sn, user_id"
>>    val merchant_table = bsTableEnv.sqlQuery(s"SELECT $merchant_column
>> FROM Keystats_airpay_consumer WHERE `table` LIKE
>> 'wallet_id_merchant_db%' ")
>>    bsTableEnv.createTemporaryView("merchant_table", merchant_table)
>> 
>>    bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer
>>                           | SELECT `database`, `table`,
>> `transaction_type`,
>>                           |   `merchant_id`, `event_time`, `status`,
>>                           |    FIRST_VALUE(`transaction_id`) OVER
>> (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED
>> PRECEDING)
>>                           | FROM (
>>                           |    SELECT `database`, `table`,
>> `transaction_type`, `transaction_id`,
>>                           |    `merchant_id`, `event_time`, `status`,
>> `reference_id`
>>                           |    FROM main_table
>>                           |    LEFT JOIN merchant_table
>>                           |    ON main_table.reference_id =
>> merchant_table.transaction_sn
>>                           | )
>>                           |""".stripMargin)
>> 
> 
> 
> -- 
> 
> Best,
> Benchao Li

回复