Re: Flink consuming MySQL
As mentioned above, using CDC directly is probably the better option for now. Reading the data yourself raises many problems, such as how to read updated rows, how to read incremental data, and how to handle failover. Using CDC directly is the most convenient approach.

Best,
Shammon FY
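The "how to read updated rows" problem mentioned in the reply can be illustrated with a small plain-Java sketch. It assumes a hand-written poller that remembers the highest primary key it has emitted; the class name IdWatermarkPoller and the in-memory TreeMap standing in for a MySQL table are hypothetical and are not part of Flink or of the code in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a hand-written incremental reader.
// The TreeMap plays the role of a MySQL table: primary key -> value.
final class IdWatermarkPoller {
    private final TreeMap<Integer, String> table;
    private int lastSeenId = 0; // highest primary key already emitted

    IdWatermarkPoller(TreeMap<Integer, String> table) {
        this.table = table;
    }

    // Emulates "SELECT ... WHERE id > lastSeenId ORDER BY id".
    List<String> poll() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, String> row : table.tailMap(lastSeenId, false).entrySet()) {
            out.add(row.getKey() + "=" + row.getValue());
            lastSeenId = row.getKey();
        }
        return out;
    }

    public static void main(String[] args) {
        TreeMap<Integer, String> table = new TreeMap<>();
        table.put(1, "a");
        table.put(2, "b");
        IdWatermarkPoller poller = new IdWatermarkPoller(table);
        System.out.println(poller.poll()); // prints [1=a, 2=b]

        table.put(1, "a-updated"); // UPDATE of an existing row
        table.remove(2);           // DELETE of an existing row
        table.put(3, "c");         // INSERT of a new row
        System.out.println(poller.poll()); // prints [3=c]
    }
}
```

Only the inserted row shows up in the second poll; the update and the delete are invisible to this kind of reader, which is one reason a change-log based reader such as CDC is suggested instead.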
Re: Flink consuming MySQL
Hi,

You can try using flink-cdc-connectors for the real-time join.
A regular join has to keep the complete state of both tables; if the tables are large, the RocksDB state backend is recommended.
If the joined table rarely changes, consider a lookup join.

Best,
Jiabao
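The difference between the two join styles mentioned above can be sketched in plain Java. This is a simplified model for insert-only rows, not Flink's implementation: the class names JoinSketch, RegularJoin and LookupJoin are hypothetical, rows are plain strings, and updates or retractions are ignored.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class JoinSketch {
    // Regular join model: every row from both inputs is kept as state,
    // so state grows with the size of both tables.
    static final class RegularJoin {
        private final Map<String, List<String>> leftState = new HashMap<>();
        private final Map<String, List<String>> rightState = new HashMap<>();

        List<String> onLeft(String key, String row) {
            leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
            List<String> out = new ArrayList<>();
            for (String right : rightState.getOrDefault(key, Collections.emptyList())) {
                out.add(row + "|" + right);
            }
            return out;
        }

        List<String> onRight(String key, String row) {
            rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
            List<String> out = new ArrayList<>();
            for (String left : leftState.getOrDefault(key, Collections.emptyList())) {
                out.add(left + "|" + row);
            }
            return out;
        }

        int storedRows() {
            int n = 0;
            for (List<String> rows : leftState.values()) n += rows.size();
            for (List<String> rows : rightState.values()) n += rows.size();
            return n;
        }
    }

    // Lookup join model: no join state; each incoming row queries the
    // dimension table as it is at that moment.
    static final class LookupJoin {
        private final Function<String, String> dimensionLookup;

        LookupJoin(Function<String, String> dimensionLookup) {
            this.dimensionLookup = dimensionLookup;
        }

        List<String> onRow(String key, String row) {
            String dim = dimensionLookup.apply(key);
            return dim == null
                    ? Collections.emptyList()
                    : Collections.singletonList(row + "|" + dim);
        }
    }
}
```

In this model the regular join can emit a result when either side arrives, at the cost of holding both inputs, while the lookup join only reacts to rows on the stream side and never revisits results when the dimension table changes later.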
Re: Flink consuming MySQL
Thanks for the guidance.
My requirement is to read two MySQL tables and join them in real time. The only options I can think of are reading them in real time with CDC, or writing a loop that keeps reading the data from MySQL.
Is there a better approach?

小昌同学
ccc0606fight...@163.com
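For reference, the "write a loop" option could look roughly like the plain-Java sketch below. It is not Flink code: PollingSourceSketch, its query supplier and the interval are hypothetical stand-ins. It only shows the control flow that keeps a source alive (a loop guarded by a volatile flag that a cancel method flips) and does nothing about updates, incremental reads or failover.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class PollingSourceSketch {
    private volatile boolean running = true;      // flipped by cancel()
    private final Supplier<List<String>> query;   // stand-in for a JDBC query
    private final long intervalMillis;

    PollingSourceSketch(Supplier<List<String>> query, long intervalMillis) {
        this.query = query;
        this.intervalMillis = intervalMillis;
    }

    // Keeps polling until cancel() is called, so the source never finishes on its own.
    void run(Consumer<String> out) {
        while (running) {
            for (String row : query.get()) {
                out.accept(row);
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void cancel() {
        running = false;
    }

    // Runs the loop on a worker thread, cancels after the first poll,
    // and returns how many rows were emitted in total.
    static int demo() {
        AtomicInteger emitted = new AtomicInteger();
        CountDownLatch firstPoll = new CountDownLatch(1);
        PollingSourceSketch source = new PollingSourceSketch(() -> {
            firstPoll.countDown();
            return Arrays.asList("row-1", "row-2");
        }, 10);
        Thread worker = new Thread(() -> source.run(row -> emitted.incrementAndGet()));
        worker.start();
        try {
            firstPoll.await();
            source.cancel();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return emitted.get();
    }
}
```

Each poll here re-reads the whole result, so every row is emitted again on every round; a real job would also need deduplication or an incremental condition.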
Re: Flink consuming MySQL
Hi,

After you finish reading the ResultSet in your code, you need to close it to avoid a resource leak.

As for the program ending once the MySQL data has been read: which part exactly do you mean? MySQL here is a bounded source, so the job ends after the data has been consumed and the whole job has finished computing. That is the expected behavior.

Best,
Shammon FY
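One way to make sure the ResultSet is always closed is Java's try-with-resources, which closes resources in the reverse order of their declaration. The sketch below uses a hypothetical Tracked class instead of real JDBC objects so that it runs without a database; with JDBC, the same try header would hold the Connection, PreparedStatement and ResultSet.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseOrderSketch {
    // Hypothetical stand-in for Connection / PreparedStatement / ResultSet:
    // it only records its name when closed.
    static final class Tracked implements AutoCloseable {
        private final String name;
        private final List<String> closeLog;

        Tracked(String name, List<String> closeLog) {
            this.name = name;
            this.closeLog = closeLog;
        }

        @Override
        public void close() {
            closeLog.add(name);
        }
    }

    // Opens three resources and returns the order in which they were closed.
    static List<String> readAll() {
        List<String> closeLog = new ArrayList<>();
        try (Tracked connection = new Tracked("connection", closeLog);
             Tracked statement = new Tracked("statement", closeLog);
             Tracked resultSet = new Tracked("resultSet", closeLog)) {
            // Rows would be read and emitted here.
        }
        return closeLog;
    }

    public static void main(String[] args) {
        System.out.println(readAll()); // prints [resultSet, statement, connection]
    }
}
```

The ResultSet is closed first and the connection last, even if the body throws.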
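Re:Re: How to smoothly migrate a Flink job from YARN to K8s?
What if we are not using the Flink Kubernetes Operator, or the HDFS and OSS systems cannot reach each other directly over the network?
Is there a way to read a checkpoint/savepoint from HDFS and then save it as a checkpoint/savepoint on OSS? Thanks!

At 2023-08-07 10:33:25, "Ruibin Xing" wrote:
>Hi,
>
>If you are also using the official Flink Kubernetes Operator, you can refer to our migration experience: when migrating, set the FlinkDeployment's initialSavepointPath to the address of the savepoint on HDFS, and configure the savepoint/checkpoint directories to point to OSS. Flink then restores from the state on HDFS at startup and writes new checkpoints to OSS.
>
>On Sun, Aug 6, 2023 at 10:03 PM casel.chen wrote:
>
>> The checkpoints/savepoints of our Flink on YARN jobs are stored on HDFS. We now want to migrate the jobs to run on K8s, which uses OSS object storage. How can we migrate the job state seamlessly? We are using Flink 1.15.2. Thanks!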
Re: flink1.14.5 sql-client cannot query hbase1.4.3 data
Hi,

This looks like a version conflict. Did you put HBase jars into the directory of your Flink session cluster? Please check whether their version matches the HBase version shaded into the Flink HBase connector.

Best,
Shammon FY

On Sat, Aug 5, 2023 at 9:33 PM 杨东树 wrote:

> Hi all,
> Queries against HBase from sql-client currently fail. Could you please advise? Thanks.
> Steps to reproduce:
> 1. HBase operations:
> hbase(main):005:0> create 'flink_to_hbase','cf1'
> 0 row(s) in 2.2900 seconds
> hbase(main):006:0> put 'flink_to_hbase', 'rk0001', 'cf1:username', 'zhangsan'
> 0 row(s) in 0.0510 seconds
>
> 2. Flink operations:
> ./start-cluster.sh
> ./sql-client.sh
> CREATE TABLE flink_to_hbase(
>   rowkey STRING,
>   cf1 ROW,
>   PRIMARY KEY (rowkey) NOT ENFORCED
> )WITH(
>   'connector'='hbase-1.4',
>   'table-name'='flink_to_hbase',
>   'zookeeper.quorum'='192.168.21.128:2181',
>   'zookeeper.znode.parent'='/hbase'
> );
>
> 3. Flink error log:
> 2023-08-05 21:00:35,081 INFO org.apache.flink.table.client.cli.CliClient [] - Command history file path: /root/.flink-sql-history
> 2023-08-05 21:00:52,011 INFO org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
> 2023-08-05 21:00:52,026 INFO org.apache.flink.client.program.rest.RestClusterClient [] - Submitting job 'collect' (0c147bc0da5a43a5a382f2ec20740b45).
> 2023-08-05 21:00:52,480 INFO org.apache.flink.client.program.rest.RestClusterClient [] - Successfully submitted job 'collect' (0c147bc0da5a43a5a382f2ec20740b45) to 'http://localhost:8081'.
> 2023-08-05 21:00:55,809 INFO org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
> 2023-08-05 21:00:55,830 INFO org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
> 2023-08-05 21:07:52,481 INFO org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
> 2023-08-05 21:07:52,484 INFO org.apache.flink.client.program.rest.RestClusterClient [] - Submitting job 'collect' (d29904103fa3c83e3089c09f093372c9).
> 2023-08-05 21:07:52,728 INFO org.apache.flink.client.program.rest.RestClusterClient [] - Successfully submitted job 'collect' (d29904103fa3c83e3089c09f093372c9) to 'http://localhost:8081'.
> 2023-08-05 21:07:55,972 WARN org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement.
> org.apache.flink.table.client.gateway.SqlExecutionException: Could not execute SQL statement.
>   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:211) ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
>   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:231) ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
>   at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:532) ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
>   at org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:423) ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
>   at org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$1(CliClient.java:332) [flink-sql-client_2.11-1.14.5.jar:1.14.5]
>   at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_231]
>   at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:325) [flink-sql-client_2.11-1.14.5.jar:1.14.5]
>   at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297) [flink-sql-client_2.11-1.14.5.jar:1.14.5]
>   at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221) [flink-sql-client_2.11-1.14.5.jar:1.14.5]
>   at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) [flink-sql-client_2.11-1.14.5.jar:1.14.5]
>   at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) [flink-sql-client_2.11-1.14.5.jar:1.14.5]
>   at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) [flink-sql-client_2.11-1.14.5.jar:1.14.5]
>   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828) ~[flink-table_2.11-1.14.5.jar:1.14.5]
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274) ~[flink-table_2.11-1.14.5.jar:1.14.5]
>   at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:209) ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
>   at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88) ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
>   at
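When checking for a jar version conflict like the one suspected above, it can help to see which jar a given class is actually loaded from. The sketch below uses only standard JDK calls; the class name JarLocator is hypothetical, and which class names to pass in (for example an HBase client class on the Flink classpath) is left to the reader.

```java
import java.security.CodeSource;

final class JarLocator {
    // Returns the jar or directory a class was loaded from, or a marker
    // string for classes without a code source (such as JDK core classes).
    static String whereIs(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "<bootstrap or unknown>";
        }
        return source.getLocation().toString();
    }

    // Usage: pass fully qualified class names as arguments.
    public static void main(String[] args) throws ClassNotFoundException {
        for (String name : args) {
            System.out.println(name + " -> " + whereIs(Class.forName(name)));
        }
    }
}
```

Running it with the same classpath as the Flink session cluster shows whether a class resolves to a jar copied into the cluster directory or to the connector's bundled jar.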
Flink consuming MySQL
Hello everyone,
I am running a source locally that extends RichSourceFunction to read data from MySQL. Why does the program stop automatically once it has fetched all the data from MySQL?
Here is my code:

public class MysqlSource2 extends RichSourceFunction {
    PreparedStatement ps;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "select * from actiontype;";
        ps = connection.prepareStatement(sql);
    }

    private static Connection getConnection() {
        Connection con = null;
        String driverClass = FlinkConfig.config.getProperty("driverClass");
        String url = FlinkConfig.config.getProperty("jdbcUrl");
        String user = FlinkConfig.config.getProperty("jdbcUser");
        String passWord = FlinkConfig.config.getProperty("passWord");

        try {
            Class.forName(driverClass);
            con = DriverManager.getConnection(url, user, passWord);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return con;
    }

    @Override
    public void run(SourceContext ctx) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            ActionType actionType = new ActionType(
                    resultSet.getString("action"),
                    resultSet.getString("action_name")
            );
            ctx.collect(actionType);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (null != connection) {
            connection.close();
        }
        if (null != ps) {
            ps.close();
        }
    }

    @Override
    public void cancel() {
    }
};

小昌同学
ccc0606fight...@163.com