hi 可以考虑使用 temporal table join [1] Best, Godfrey
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#join-with-a-temporal-table 小屁孩 <[email protected]> 于2020年6月4日周四 下午5:51写道: > 您的意思是open 全量预加载吗?我目前的逻辑是自己自定义的source 广播出去 > 这是我的source > > > import org.apache.flink.configuration.Configuration; > import org.apache.flink.streaming.api.functions.source.RichSourceFunction; > > > import java.sql.Connection; > import java.sql.PreparedStatement; > import java.sql.ResultSet; > import java.util.HashMap; > import java.util.Map; > > > public class GetMysqlDvcId extends RichSourceFunction<Map<String, > Integer>> { > > > private Connection connection = null; > private PreparedStatement ps = null; > private volatile boolean isRunning = true; > > > > > @Override > public void open(Configuration parameters) throws Exception { > super.open(parameters); > String database="db_nssa"; > String host="212.21.12.12"; > String password="saa!"; > String port="3306"; > String username="root"; > > > > > String driver = "com.mysql.jdbc.Driver"; > String url = "jdbc:mysql://" + host + ":" + > port + "/" + database + "?useUnicode=true&characterEncoding=UTF-8"; > connection = > MySQLUtil.getConnection(driver, url, username, password); > > > > > if (this.connection != null) { > String sql = "select > ip,device_id from sys_device"; > ps > =connection.prepareStatement(sql); > } > } > > > @Override > public void run(SourceContext<Map<String, Integer>> > ctx) throws Exception { > Map<String, Integer> map = new > HashMap<>(); > while (isRunning) { > ResultSet resultSet = > ps.executeQuery(); > while (resultSet.next()) { > > map.put(resultSet.getString("ip"),resultSet.getInt("device_id")); > } > > System.out.println("=======select alarm notify from mysql, size = {}, map = > {}"+ map.size()+ map); > ctx.collect(map); > map.clear(); > Thread.sleep(2000 * 60); > } > > > } > > > > > @Override > public void cancel() { > try { > super.close(); > if (connection != null) { > connection.close(); > } > if (ps != null) { > ps.close(); > } > } catch (Exception e) { > > System.out.println("runException:{}"+e); > } > isRunning = false; > } > } > > ------------------ 原始邮件 ------------------ > 发件人: "Michael Ran"<[email protected]>; > 发送时间: 2020年6月4日(星期四) 下午5:22 > 收件人: "user-zh"<[email protected]>; > > 主题: Re:关于flinksql 与维表mysql的关联问题 > > > > 放到open 方法里面可以吗? > 在 2020-06-04 14:15:05,"小屁孩" <[email protected]> 写道: > >dear:&nbsp; &nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 > 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
