????????????????datastreaming
API??????debezium????????????????????????debezium??????
public static Properties debeziumProperties(){
Properties properties = new Properties();
properties.setProperty(??xxxx??,??xxxx");
return properties;
}
// Abbreviated example: builds a SourceFunction from the MySQLSource builder,
// supplying the Properties returned by debeziumProperties().
// NOTE(review): T is a placeholder for the record type produced by the source.
SourceFunction<T> sourceFunction = MySQLSource.<T>builder()
// NOTE(review): ". . ." appears to stand for the remaining builder calls, elided for brevity — not valid Java as written.
. . .
// Hand the custom Debezium properties to the builder.
.debeziumProperties(debeziumProperties())
.build();
| |
MOBIN
|
|
[email protected]
|
??????????????????????
??2021??04??27?? 14:46????????????<[email protected]> ??????
hi all
??????????flink cdc??????????????????streaming
mode??????????binlog??????????????????????????????????????????????????mysql??RELOAD????????????????sql????????cdc????????????
debezium.snapshot.locking.mode = none????????
????????streaming
mode??????????????????????????????????????????????????reload??????????????flink
cdc??????????????????????
// Debezium options for the DataStream API source.
// The "debezium." prefix belongs to the SQL/DDL connector options only; properties passed
// to the builder's debeziumProperties(...) are expected under their native Debezium names,
// so the key is written without the prefix. With the prefix the option is not recognised
// and the snapshot lock mode stays at its default.
Properties properties = new Properties();
properties.setProperty("snapshot.locking.mode", "none");

// Build the MySQL CDC source; each change event is deserialized to a String.
// NOTE(review): `port` must be an int defined by the surrounding code.
SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
        .hostname("localhost")
        .port(port)
        .databaseList("database")
        .tableList("database.test")
        .username("user")
        .password("password")
        .debeziumProperties(properties)
        .deserializer(new StringDebeziumDeserializationSchema())
        .build();
??.debeziumProperties(properties)????????????sql????????????????????????????????sourceFunction??????Streaming??????????????????????????????????????