hi all

??????????flink cdc??????????????????streaming 
mode??????????binlog??????????????????????????????????????????????????mysql??RELOAD????????????????sql????????cdc????????????
debezium.snap.shot.locking.mode = none????????


????????streaming 
mode??????????????????????????????????????????????????reload??????????????flink 
cdc??????????????????????
Properties properties = new Properties();
properties.setProperty("debezium.snapshot.locking.mode", "none");
SourceFunction<String&gt; sourceFunction = MySQLSource.<String&gt;builder()
&nbsp; &nbsp; &nbsp; &nbsp; .hostname("localhost")
&nbsp; &nbsp; &nbsp; &nbsp; .port(port)
&nbsp; &nbsp; &nbsp; &nbsp; .databaseList("database")&nbsp;
&nbsp; &nbsp; &nbsp; &nbsp; .tableList("database.test")
&nbsp; &nbsp; &nbsp; &nbsp; .username(??user)
&nbsp; &nbsp; &nbsp; &nbsp; .password("password")
&nbsp; &nbsp; &nbsp; &nbsp; .debeziumProperties(properties)
&nbsp; &nbsp; &nbsp; &nbsp; .deserializer(new 
StringDebeziumDeserializationSchema())
&nbsp; &nbsp; &nbsp; &nbsp; .build();

??.debeziumProperties(properties)????????????sql????????????????????????????????sourceFunction??????Streaming??????????????????????????????????????

回复