本来还想尽最大可能的复用源码,看了下JdbcOutputFormat的源码实现,batch size是sql语句的个数据,kafka的batch size是字节数,两个协调不好,两个各sink各自的时间阈值也同步不了。
我准备按你的说的方式,用RichFlatMapFunction,里面实现实现一个buffer。 等buffer达阈值或定时时间条件满足时,一次性手动调用JdbcOutputFormat(可以设置更大的buffer值)的writeRecord和flush;不满足的时候,RichFlatMapFunction里不输出元素; 这样kafka的batch sinka节奏应该就不用管了,两者的batch条件相互独立。 我自己初步看了下,应该可以? 初学者,望大佬提点,还有其它的注意事项要注意不? -- Sent from: http://apache-flink.147419.n8.nabble.com/
