??????????open ????????????????????????????????????????source ????????
????????source


import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;


import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;


public class GetMysqlDvcId extends RichSourceFunction<Map<String, 
Integer&gt;&gt; {


&nbsp; &nbsp; private Connection connection = null;
&nbsp; &nbsp; private PreparedStatement ps = null;
&nbsp; &nbsp; private volatile boolean isRunning = true;




&nbsp; &nbsp; @Override
&nbsp; &nbsp; public void open(Configuration parameters) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; super.open(parameters);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String database="db_nssa";
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String host="212.21.12.12";
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String password="saa!";
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String port="3306";
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String username="root";




&nbsp; &nbsp; &nbsp; &nbsp; String driver = "com.mysql.jdbc.Driver";
&nbsp; &nbsp; &nbsp; &nbsp; String url = "jdbc:mysql://" + host + ":" + port + 
"/" + database + "?useUnicode=true&amp;characterEncoding=UTF-8";
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; connection = MySQLUtil.getConnection(driver, 
url, username, password);




&nbsp; &nbsp; &nbsp; &nbsp; if (this.connection != null) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String sql = "select ip,device_id 
from sys_device";
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ps =connection.prepareStatement(sql);
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; }


&nbsp; &nbsp; @Override
&nbsp; &nbsp; public void run(SourceContext<Map<String, Integer&gt;&gt; ctx) 
throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; Map<String, Integer&gt; map = new HashMap<&gt;();
&nbsp; &nbsp; &nbsp; &nbsp; while (isRunning) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ResultSet resultSet = 
ps.executeQuery();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (resultSet.next()) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
map.put(resultSet.getString("ip"),resultSet.getInt("device_id"));
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("=======select 
alarm notify from mysql, size = {}, map = {}"+ map.size()+ map);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ctx.collect(map);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; map.clear();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Thread.sleep(2000 * 60);
&nbsp; &nbsp; &nbsp; &nbsp; }


&nbsp; &nbsp; }




&nbsp; &nbsp; @Override
&nbsp; &nbsp; public void cancel() {
&nbsp; &nbsp; &nbsp; &nbsp; try {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; super.close();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (connection != null) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; connection.close();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (ps != null) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ps.close();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; } catch (Exception e) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
System.out.println("runException:{}"+e);
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; isRunning = false;
&nbsp; &nbsp; }
}

------------------ 原始邮件 ------------------
发件人: "Michael Ran"<[email protected]>;
发送时间: 2020年6月4日(星期四) 下午5:22
收件人: "user-zh"<[email protected]>;

主题: Re:????flinksql ??????mysql??????????



????open ????????????????
在 2020-06-04 14:15:05,"??????" <[email protected]> 写道:
&gt;dear??&amp;nbsp; &amp;nbsp; ??????????????????,????flinksql??mysql???????? 
????mysql?????????? 
????????????????????????????????mysql??????????????????mysql????????????????????????mysql????????????????????????????????????????????????????????

回复