你好, 你说的这种控制写入的方式在同一个 Flink SQL job 里是无法实现的. 控制数据是否写入某个
Sink,可以看看是否从逻辑上能在 Sink 前加一个 Filter,从而达到过滤目的;如果 kafka sink 跟 MySQL
表是一种类似级联的关系, 可以考虑先写入 MySQL, 然后另起一个 Job 用 CDC 方式读 MySQL changelog 再写入
Kafka sink.

On 8/26/21, jie han <baibaiwuch...@gmail.com> wrote:
> HI:
> 可以尝试下使用flink cdc 的方式写入到第二个kafka里面呀
>
> 悟空 <wukon...@foxmail.com> 于2021年8月26日周四 下午1:54写道:
>
>> 我目前用的是flink-connector-kafka_2.11&nbsp;和&nbsp;flink-connector-jdbc_2.11,
>> 测试时,我把任务启动好之后,把mysql 中的目标表删除 或 删除必要字段,
>> 之后发送一条kafka数据,会报java.sql.BatchUpdateException 异常,然后重试3次。&nbsp;
>> 但是接着sink Kafka 是成功的,Kafka端 我开启了&nbsp;'sink.semantic' = 'exactly-once',
>> 同时下游consumer 使用&nbsp;--isolation-level read_committed
>> 读取,依旧能成功读取到数据,说明sink
>> db 失败,但是sink kafka成功,同时flink 本身任务不会挂掉。
>>
>>
>>
>>
>> ------------------&nbsp;原始邮件&nbsp;------------------
>> 发件人:
>>                                                   "user-zh"
>>                                                                     <
>> tsreape...@gmail.com&gt;;
>> 发送时间:&nbsp;2021年8月26日(星期四) 中午1:25
>> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>>
>> 主题:&nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>>
>>
>>
>> Hi!
>>
>> 如果 statement set 已经包含了多个 insert 语句,那么写入 kafka 表和写入 db 应该是在同一个作业里进行的。如果写入
>> db
>> 失败,那么产生的 exception 应该会让作业失败才对。这里 db 写入失败但 kafka 依旧写入是什么样的现象?
>>
>> 另外其实可以考虑分成两个作业,第一个作业将数据写入 db,第二个作业从 db 读出数据写入 kafka。关于捕获 db 数据的变化,可以看一下
>> Flink CDC connector[1]
>>
>> [1] https://github.com/ververica/flink-cdc-connectors
>>
>> 悟空 <wukon...@foxmail.com&gt; 于2021年8月26日周四 下午12:52写道:
>>
>> &gt; 能否详细说下呢,statement set[1] 是什么意思, 我是通过StatementSet.addInsertSql 将多个sql
>> &gt; 加入的,然后执行execute()方法
>> &gt;
>> &gt;
>> &gt;
>> &gt;
>> &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
>> &gt; 发件人:
>> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>> "user-zh"
>> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>> <
>> &gt; fskm...@gmail.com&amp;gt;;
>> &gt; 发送时间:&amp;nbsp;2021年8月26日(星期四) 中午12:36
>> &gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
>> &gt;
>> &gt; 主题:&amp;nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>> &gt;
>> &gt;
>> &gt;
>> &gt; 说的是 statement set [1] 吗 ?
>> &gt;
>> &gt; [1]
>> &gt;
>> &gt;
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements
>> &gt
>> <https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements&gt>
>> ;
>> &gt; 悟空 <wukon...@foxmail.com&amp;gt; 于2021年8月26日周四 上午11:33写道:
>> &gt;
>> &gt; &amp;gt; hi all:&amp;amp;nbsp;
>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; 我目前基于flink 1.12 sql 来开发功能,
>> 目前遇到一个问题, 我现在想实现
>> &gt; 在一个事务里 先将kafka
>> &gt; &amp;gt; 源的数据写入到一张msyql 表中, 写入成功后再写入一张kafka的表里。如果写入db失败,则不写入kafka 。
>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;语句类似这种:
>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;insert into
>> db_table_sink&amp;amp;nbsp;select *
>> &gt; from&amp;amp;nbsp;
>> &gt; &amp;gt; kafka_source_table;
>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;insert into kafka_table_sink
>> select * from
>> &gt; kafka_source_table;
>> &gt; &amp;gt;
>> &gt; &amp;gt;
>> &gt; &amp;gt; &amp;amp;nbsp; 请问flink SQL 有实现方式吗?
>> 目前经过测试,发现如果db写入失败,但是kafka依旧写入,同时flink
>> &gt; 程序没有挂掉。
>

回复