Hi,
几点需要考虑一下:
1. 上游(Source端), 你需要用Flink CDC的MySQL Connector;
2. 下游(Sink端),
看你用哪种数据库,如果没有合适的connector,也需要自定义SInk类,继承RichSinkFunction类,
重载open和invoke等几个方法。
Regards,
Leo
在 2023/8/7 17:04, 小昌同学 写道:
各位老师好
,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊;
以下是我的代码:
|
public class MysqlSource2 extends RichSourceFunction<ActionType> {
PreparedStatement ps;
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
String sql="select * from actiontype;";
ps = connection.prepareStatement(sql);
}
private static Connection getConnection(){
Connection con=null;
String driverClass= FlinkConfig.config.getProperty("driverClass");
String url=FlinkConfig.config.getProperty("jdbcUrl");
String user=FlinkConfig.config.getProperty("jdbcUser");
String passWord=FlinkConfig.config.getProperty("passWord");
try {
Class.forName(driverClass);
con= DriverManager.getConnection(url,user,passWord);
} catch (Exception e) {
throw new RuntimeException(e);
}
return con;
}
@Override
public void run(SourceContext<ActionType> ctx) throws Exception {
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()){
ActionType actionType = new ActionType(
resultSet.getString("action"),
resultSet.getString("action_name")
);
ctx.collect(actionType);
}
}
@Override
public void close() throws Exception {
super.close();
if (null!=connection){
connection.close();
}
if (null!=ps){
ps.close();
}
}
@Override
public void cancel() {
}
};
|
| |
小昌同学
|
|
ccc0606fight...@163.com
|