????????????????datastreaming 
API??????debezium????????????????????????debezium??????
public static Properties debeziumProperties(){
    Properties properties = new Properties();
properties.setProperty(??xxxx??,??xxxx");
    return properties;
}


    // Build the MySQL CDC source, passing in the Properties returned by debeziumProperties().
    // The ". . ." line is an elision in this example; presumably it stands for the remaining
    // builder calls (connection settings, deserializer, etc.) — fill in as needed.
    SourceFunction<T> sourceFunction = MySQLSource.<T>builder()
            . . .
            .debeziumProperties(debeziumProperties())
            .build();


| |
MOBIN
|
|
[email protected]
|
??????????????????????


??2021??04??27?? 14:46????????????<[email protected]> ??????
hi all

??????????flink cdc??????????????????streaming 
mode??????????binlog??????????????????????????????????????????????????mysql??RELOAD????????????????sql????????cdc????????????
debezium.snapshot.locking.mode = none????????


????????streaming 
mode??????????????????????????????????????????????????reload??????????????flink 
cdc??????????????????????
// Debezium options passed to the CDC source: sets the snapshot locking mode to "none".
// NOTE(review): the "debezium." prefix is the form used for SQL connector options; the
// DataStream builder presumably expects the bare Debezium key ("snapshot.locking.mode").
// Confirm against the connector documentation.
Properties properties = new Properties();
properties.setProperty("debezium.snapshot.locking.mode", "none");
// Build the MySQL CDC source for table database.test, emitting change events as strings.
SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
        .hostname("localhost")
        .port(port)
        .databaseList("database")
        .tableList("database.test")
        .username("user")
        .password("password")
        .debeziumProperties(properties)
        .deserializer(new StringDebeziumDeserializationSchema())
        .build();

??.debeziumProperties(properties)????????????sql????????????????????????????????sourceFunction??????Streaming??????????????????????????????????????

回复