hi 可以考虑使用 temporal table join [1]

Best,
Godfrey

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#join-with-a-temporal-table

小屁孩 <[email protected]> 于2020年6月4日周四 下午5:51写道:

> 您的意思是open 全量预加载吗?我目前的逻辑是自己自定义的source 广播出去
> 这是我的source
>
>
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
>
>
> import java.sql.Connection;
> import java.sql.PreparedStatement;
> import java.sql.ResultSet;
> import java.util.HashMap;
> import java.util.Map;
>
>
> public class GetMysqlDvcId extends RichSourceFunction<Map<String,
> Integer&gt;&gt; {
>
>
> &nbsp; &nbsp; private Connection connection = null;
> &nbsp; &nbsp; private PreparedStatement ps = null;
> &nbsp; &nbsp; private volatile boolean isRunning = true;
>
>
>
>
> &nbsp; &nbsp; @Override
> &nbsp; &nbsp; public void open(Configuration parameters) throws Exception {
> &nbsp; &nbsp; &nbsp; &nbsp; super.open(parameters);
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String database="db_nssa";
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String host="212.21.12.12";
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String password="saa!";
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String port="3306";
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String username="root";
>
>
>
>
> &nbsp; &nbsp; &nbsp; &nbsp; String driver = "com.mysql.jdbc.Driver";
> &nbsp; &nbsp; &nbsp; &nbsp; String url = "jdbc:mysql://" + host + ":" +
> port + "/" + database + "?useUnicode=true&amp;characterEncoding=UTF-8";
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; connection =
> MySQLUtil.getConnection(driver, url, username, password);
>
>
>
>
> &nbsp; &nbsp; &nbsp; &nbsp; if (this.connection != null) {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String sql = "select
> ip,device_id from sys_device";
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ps
> =connection.prepareStatement(sql);
> &nbsp; &nbsp; &nbsp; &nbsp; }
> &nbsp; &nbsp; }
>
>
> &nbsp; &nbsp; @Override
> &nbsp; &nbsp; public void run(SourceContext<Map<String, Integer&gt;&gt;
> ctx) throws Exception {
> &nbsp; &nbsp; &nbsp; &nbsp; Map<String, Integer&gt; map = new
> HashMap<&gt;();
> &nbsp; &nbsp; &nbsp; &nbsp; while (isRunning) {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ResultSet resultSet =
> ps.executeQuery();
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (resultSet.next()) {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> map.put(resultSet.getString("ip"),resultSet.getInt("device_id"));
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> System.out.println("=======select alarm notify from mysql, size = {}, map =
> {}"+ map.size()+ map);
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ctx.collect(map);
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; map.clear();
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Thread.sleep(2000 * 60);
> &nbsp; &nbsp; &nbsp; &nbsp; }
>
>
> &nbsp; &nbsp; }
>
>
>
>
> &nbsp; &nbsp; @Override
> &nbsp; &nbsp; public void cancel() {
> &nbsp; &nbsp; &nbsp; &nbsp; try {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; super.close();
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (connection != null) {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; connection.close();
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (ps != null) {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ps.close();
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
> &nbsp; &nbsp; &nbsp; &nbsp; } catch (Exception e) {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> System.out.println("runException:{}"+e);
> &nbsp; &nbsp; &nbsp; &nbsp; }
> &nbsp; &nbsp; &nbsp; &nbsp; isRunning = false;
> &nbsp; &nbsp; }
> }
>
> ------------------ 原始邮件 ------------------
> 发件人: "Michael Ran"<[email protected]>;
> 发送时间: 2020年6月4日(星期四) 下午5:22
> 收件人: "user-zh"<[email protected]>;
>
> 主题: Re:关于flinksql 与维表mysql的关联问题
>
>
>
> 放到open 方法里面可以吗?
> 在 2020-06-04 14:15:05,"小屁孩" <[email protected]> 写道:
> >dear: 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会在启动的时候丢失一部分数据

回复