像上面提到的,目前可能直接使用CDC是一个比较好的方案,自己读数据会有很多问题,比如update数据如何读取、如何读取增量数据、如何处理failover等,还是直接使用CDC最方便

Best,
Shammon FY

On Tue, Aug 8, 2023 at 11:30 AM Jiabao Sun <jiabao....@xtransfer.cn.invalid>
wrote:

> Hi,
>
> 可以尝试使用 flink-cdc-connectors 去实时关联。
> 使用 regular join 需要保留两张表完整的状态,表数据量较大建议使用 rocksdb backend。
> 被关联的表变化不大的话可以考虑 lookup join。
>
> Best,
> Jiabao
>
>
> > 2023年8月8日 上午11:10,小昌同学 <ccc0606fight...@163.com> 写道:
> >
> > 谢谢老师指导呀;
> >
> 我目前的需求是想把两张MySQL的表数据读取出来,然后进行实时关联,我现在能想到的就是要么使用cdc实时读取,要么就是写一个循环去读MySQL中的数据
> > 老师这一块有更好的建议嘛
> >
> >
> > | |
> > 小昌同学
> > |
> > |
> > ccc0606fight...@163.com
> > |
> > ---- 回复的原邮件 ----
> > | 发件人 | Shammon FY<zjur...@gmail.com> |
> > | 发送日期 | 2023年8月8日 10:37 |
> > | 收件人 | <user-zh@flink.apache.org> |
> > | 主题 | Re: Flink消费MySQL |
> > Hi,
> >
> > 你代码里的ResultSet读取完成后需要将resultSet关闭掉,避免资源泄漏
> >
> > 至于你提到的Mysql数据读完程序就结束具体是指哪块?mysql是bounded
> > source,数据消费完成并且整个作业计算完成后,就会结束,这是正常情况
> >
> > Best,
> > Shammon FY
> >
> > On Mon, Aug 7, 2023 at 5:04 PM 小昌同学 <ccc0606fight...@163.com> wrote:
> >
> > 各位老师好
> >
> ,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊;
> > 以下是我的代码:
> > |
> > public class MysqlSource2 extends RichSourceFunction<ActionType> {
> > PreparedStatement ps;
> > private Connection connection;
> >
> > @Override
> > public void open(Configuration parameters) throws Exception {
> > super.open(parameters);
> > connection = getConnection();
> > String sql="select * from actiontype;";
> > ps = connection.prepareStatement(sql);
> > }
> >
> > private static Connection getConnection(){
> > Connection con=null;
> > String driverClass= FlinkConfig.config.getProperty("driverClass");
> > String url=FlinkConfig.config.getProperty("jdbcUrl");
> > String user=FlinkConfig.config.getProperty("jdbcUser");
> > String passWord=FlinkConfig.config.getProperty("passWord");
> >
> > try {
> > Class.forName(driverClass);
> > con= DriverManager.getConnection(url,user,passWord);
> > } catch (Exception e) {
> > throw new RuntimeException(e);
> > }
> > return con;
> > }
> >
> > @Override
> > public void run(SourceContext<ActionType> ctx) throws Exception {
> > ResultSet resultSet = ps.executeQuery();
> > while (resultSet.next()){
> > ActionType actionType = new ActionType(
> > resultSet.getString("action"),
> > resultSet.getString("action_name")
> > );
> > ctx.collect(actionType);
> > }
> > }
> >
> > @Override
> > public void close() throws Exception {
> > super.close();
> > if (null!=connection){
> > connection.close();
> > }
> > if (null!=ps){
> > ps.close();
> > }
> > }
> >
> > @Override
> > public void cancel() {
> > }
> > };
> >
> >
> > |
> >
> >
> > | |
> > 小昌同学
> > |
> > |
> > ccc0606fight...@163.com
> > |
>
>

回复