??????????open ????????????????????????????????????????source ????????
????????source
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
public class GetMysqlDvcId extends RichSourceFunction<Map<String,
Integer>> {
private Connection connection = null;
private PreparedStatement ps = null;
private volatile boolean isRunning = true;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
String database="db_nssa";
String host="212.21.12.12";
String password="saa!";
String port="3306";
String username="root";
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://" + host + ":" + port +
"/" + database + "?useUnicode=true&characterEncoding=UTF-8";
connection = MySQLUtil.getConnection(driver,
url, username, password);
if (this.connection != null) {
String sql = "select ip,device_id
from sys_device";
ps =connection.prepareStatement(sql);
}
}
@Override
public void run(SourceContext<Map<String, Integer>> ctx)
throws Exception {
Map<String, Integer> map = new HashMap<>();
while (isRunning) {
ResultSet resultSet =
ps.executeQuery();
while (resultSet.next()) {
map.put(resultSet.getString("ip"),resultSet.getInt("device_id"));
}
System.out.println("=======select
alarm notify from mysql, size = {}, map = {}"+ map.size()+ map);
ctx.collect(map);
map.clear();
Thread.sleep(2000 * 60);
}
}
@Override
public void cancel() {
try {
super.close();
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
} catch (Exception e) {
System.out.println("runException:{}"+e);
}
isRunning = false;
}
}
------------------ ???????? ------------------
??????: "Michael Ran"<[email protected]>;
????????: 2020??6??4??(??????) ????5:22
??????: "user-zh"<[email protected]>;
????: Re:????flinksql ??????mysql??????????
????open ????????????????
?? 2020-06-04 14:15:05??"??????" <[email protected]> ??????
>dear??&nbsp; &nbsp; ??????????????????,????flinksql??mysql????????
????mysql??????????
????????????????????????????????mysql??????????????????mysql????????????????????????mysql????????????????????????????????????????????????????????