大家好,

我在用 Debezium 从数据库读取最新的 binlog 写入 Kafka,比如对于 mysql_server.test.tableA 有一个 topic 
“mysql_server.test.tableA”,我需要在 Flink 里实现这样的逻辑:


  1.  先连接上 Kafka 开始消费 topic “mysql_server.test.tableA”,确保连接成功,记为 
binlog-stream,但是要暂停消费 Kafka;
  2.  用 JDBCInputFormat 读取 test.tableA 到一个 DataStream 里,记为 table-stream;
  3.  合并两个 streams,消费完 table-stream 后再开始消费 binlog-stream,这样可以确保 binlog 是 *后*  
应用到某个快照表上。

问题是我怎么能暂停消费 binlog-stream 呢? 我目前想到的办法是用 flink state 做一个全局状态  startBinlog,初始值为 
false:

  binlog-stream -> waitOperator   ->   sinkOperator
  table-stream -> notifyOperator -> sinkOperator

两个流被合并输出到 sinkOperator,waitOperator() 会 while loop阻塞式的检查全局状态, 等 table-stream 
消费完(不知道怎么判断消费完了。。。),  notifyOperator 修改全局状态,这样 binlog-stream 就能被继续消费了。

但由于 kafka consumer 如果长期阻塞不 ack 的话,kafka consumer 会被断开,所以这个做法应该是不行的。

请教怎么破?

谢谢!

回复