Hi,

几点需要考虑一下:

1. 上游(Source端), 你需要用Flink CDC的MySQL Connector;

2. 下游(Sink端), 看你用哪种数据库,如果没有合适的connector,也需要自定义SInk类,继承RichSinkFunction类,

重载open和invoke等几个方法。


Regards,

Leo


在 2023/8/7 17:04, 小昌同学 写道:
各位老师好 
,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊;
以下是我的代码:
|
public class MysqlSource2 extends RichSourceFunction<ActionType> {
     PreparedStatement ps;
private Connection connection;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
         String sql="select * from actiontype;";
ps = connection.prepareStatement(sql);
     }

private static Connection getConnection(){
         Connection con=null;
         String driverClass= FlinkConfig.config.getProperty("driverClass");
         String url=FlinkConfig.config.getProperty("jdbcUrl");
         String user=FlinkConfig.config.getProperty("jdbcUser");
         String passWord=FlinkConfig.config.getProperty("passWord");

try {
             Class.forName(driverClass);
             con= DriverManager.getConnection(url,user,passWord);
         } catch (Exception e) {
throw new RuntimeException(e);
         }
return con;
     }

@Override
public void run(SourceContext<ActionType> ctx) throws Exception {
         ResultSet resultSet = ps.executeQuery();
while (resultSet.next()){
     ActionType actionType = new ActionType(
             resultSet.getString("action"),
             resultSet.getString("action_name")
     );
     ctx.collect(actionType);
}
     }

@Override
public void close() throws Exception {
super.close();
if (null!=connection){
connection.close();
         }
if (null!=ps){
ps.close();
         }
     }

@Override
public void cancel() {
     }
};


|


| |
小昌同学
|
|
ccc0606fight...@163.com
|

回复