没有 join,只是简单的 union:
DataStream binlogStream = env.addSource(new FlinkKafkaConsumer(…));
DataStream snapshotStream =
env.createInput(JDBCInputFormat.buildJDBCInputFormat()….);
// map() is to convert two streams into same type: (action, fields…),
where action is “insert”, “update”, “delete”. The action for “snapshotStream”
is always “insert”.
DataStream tableStream =
binlogStream.map(…).union(snapshotStream.map(…));
tableStream.print();
env.execute(“example”);
我希望下游只看到一个流,这个流里先出现 snapshotStream 的所有消息,等这个发完后,再从 binlog 里读取,但是上面的代码段里
我控制不了 snapshotStream和 binlogStream 谁先发完消息。
你说的消费完后再切换到 Kafka 具体怎么做? DataStream 「消费完」这个事件没有暴露 hook 出来,而且好像 DataStream 的 DAG
构造好后不能变了??
Snapshot + binlog 的幂等是可以保证的,binlog 的 insert/update/delete 总是覆盖到 snapshot 之上。
user@flink 邮件列表里有人提到 side input,跟我的需求很像,binlogStream 开一个 side input 读取完
snapshotStream 然后再发自己(binlogStream) 的消息,但是可惜这个功能还没做完。
谢谢!
On 2020/4/7, 2:16 PM, "Jark Wu" <[email protected]> wrote:
Hi,
你这里的合并是用join 来做么? 这样的话,会比较耗性能。
一种做法是先消费 jdbc table, 消费完后,再切换到 kafka 上。这种要求 binlog 是幂等操作的,因为会有多处理一部分的
binlog,没法做到 精确地切换到 kafka offset 上。
另外你也可以参考下 StreamSQL 的 bootstrap 的做法:
https://streamsql.io/blog/from-apache-kafka-to-apache-pulsar
Best,
Jark
On Sun, 5 Apr 2020 at 22:48, 刘宇宝 <[email protected]> wrote:
> 大家好,
>
> 我在用 Debezium 从数据库读取最新的 binlog 写入 Kafka,比如对于 mysql_server.test.tableA 有一个
> topic “mysql_server.test.tableA”,我需要在 Flink 里实现这样的逻辑:
>
>
> 1. 先连接上 Kafka 开始消费 topic “mysql_server.test.tableA”,确保连接成功,记为
> binlog-stream,但是要暂停消费 Kafka;
> 2. 用 JDBCInputFormat 读取 test.tableA 到一个 DataStream 里,记为 table-stream;
> 3. 合并两个 streams,消费完 table-stream 后再开始消费 binlog-stream,这样可以确保 binlog 是
> *后* 应用到某个快照表上。
>
> 问题是我怎么能暂停消费 binlog-stream 呢? 我目前想到的办法是用 flink state 做一个全局状态
> startBinlog,初始值为 false:
>
> binlog-stream -> waitOperator -> sinkOperator
> table-stream -> notifyOperator -> sinkOperator
>
> 两个流被合并输出到 sinkOperator,waitOperator() 会 while loop阻塞式的检查全局状态, 等
> table-stream 消费完(不知道怎么判断消费完了。。。), notifyOperator 修改全局状态,这样 binlog-stream
> 就能被继续消费了。
>
> 但由于 kafka consumer 如果长期阻塞不 ack 的话,kafka consumer 会被断开,所以这个做法应该是不行的。
>
> 请教怎么破?
>
> 谢谢!
>
>