Re: Flink消费MySQL

2023-08-07 文章 Shammon FY
像上面提到的,目前可能直接使用CDC是一个比较好的方案,自己读数据会有很多问题,比如update数据如何读取、如何读取增量数据、如何处理failover等,还是直接使用CDC最方便

Best,
Shammon FY

On Tue, Aug 8, 2023 at 11:30 AM Jiabao Sun 
wrote:

> Hi,
>
> 可以尝试使用 flink-cdc-connectors 去实时关联。
> 使用 regular join 需要保留两张表完整的状态,表数据量较大建议使用 rocksdb backend。
> 被关联的表变化不大的话可以考虑 lookup join。
>
> Best,
> Jiabao
>
>
> > 2023年8月8日 上午11:10,小昌同学  写道:
> >
> > 谢谢老师指导呀;
> >
> 我目前的需求是想把两张MySQL的表数据读取出来,然后进行实时关联,我现在能想到的就是要么使用cdc实时读取,要么就是写一个循环去读MySQL中的数据
> > 老师这一块有更好的建议嘛
> >
> >
> > | |
> > 小昌同学
> > |
> > |
> > ccc0606fight...@163.com
> > |
> >  回复的原邮件 
> > | 发件人 | Shammon FY |
> > | 发送日期 | 2023年8月8日 10:37 |
> > | 收件人 |  |
> > | 主题 | Re: Flink消费MySQL |
> > Hi,
> >
> > 你代码里的ResultSet读取完成后需要将resultSet关闭掉,避免资源泄漏
> >
> > 至于你提到的Mysql数据读完程序就结束具体是指哪块?mysql是bounded
> > source,数据消费完成并且整个作业计算完成后,就会结束,这是正常情况
> >
> > Best,
> > Shammon FY
> >
> > On Mon, Aug 7, 2023 at 5:04 PM 小昌同学  wrote:
> >
> > 各位老师好
> >
> ,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊;
> > 以下是我的代码:
> > |
> > public class MysqlSource2 extends RichSourceFunction {
> > PreparedStatement ps;
> > private Connection connection;
> >
> > @Override
> > public void open(Configuration parameters) throws Exception {
> > super.open(parameters);
> > connection = getConnection();
> > String sql="select * from actiontype;";
> > ps = connection.prepareStatement(sql);
> > }
> >
> > private static Connection getConnection(){
> > Connection con=null;
> > String driverClass= FlinkConfig.config.getProperty("driverClass");
> > String url=FlinkConfig.config.getProperty("jdbcUrl");
> > String user=FlinkConfig.config.getProperty("jdbcUser");
> > String passWord=FlinkConfig.config.getProperty("passWord");
> >
> > try {
> > Class.forName(driverClass);
> > con= DriverManager.getConnection(url,user,passWord);
> > } catch (Exception e) {
> > throw new RuntimeException(e);
> > }
> > return con;
> > }
> >
> > @Override
> > public void run(SourceContext ctx) throws Exception {
> > ResultSet resultSet = ps.executeQuery();
> > while (resultSet.next()){
> > ActionType actionType = new ActionType(
> > resultSet.getString("action"),
> > resultSet.getString("action_name")
> > );
> > ctx.collect(actionType);
> > }
> > }
> >
> > @Override
> > public void close() throws Exception {
> > super.close();
> > if (null!=connection){
> > connection.close();
> > }
> > if (null!=ps){
> > ps.close();
> > }
> > }
> >
> > @Override
> > public void cancel() {
> > }
> > };
> >
> >
> > |
> >
> >
> > | |
> > 小昌同学
> > |
> > |
> > ccc0606fight...@163.com
> > |
>
>


Re: Flink消费MySQL

2023-08-07 文章 Jiabao Sun
Hi,

可以尝试使用 flink-cdc-connectors 去实时关联。
使用 regular join 需要保留两张表完整的状态,表数据量较大建议使用 rocksdb backend。
被关联的表变化不大的话可以考虑 lookup join。

Best,
Jiabao


> 2023年8月8日 上午11:10,小昌同学  写道:
> 
> 谢谢老师指导呀;
> 我目前的需求是想把两张MySQL的表数据读取出来,然后进行实时关联,我现在能想到的就是要么使用cdc实时读取,要么就是写一个循环去读MySQL中的数据
> 老师这一块有更好的建议嘛
> 
> 
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
>  回复的原邮件 
> | 发件人 | Shammon FY |
> | 发送日期 | 2023年8月8日 10:37 |
> | 收件人 |  |
> | 主题 | Re: Flink消费MySQL |
> Hi,
> 
> 你代码里的ResultSet读取完成后需要将resultSet关闭掉,避免资源泄漏
> 
> 至于你提到的Mysql数据读完程序就结束具体是指哪块?mysql是bounded
> source,数据消费完成并且整个作业计算完成后,就会结束,这是正常情况
> 
> Best,
> Shammon FY
> 
> On Mon, Aug 7, 2023 at 5:04 PM 小昌同学  wrote:
> 
> 各位老师好
> ,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊;
> 以下是我的代码:
> |
> public class MysqlSource2 extends RichSourceFunction {
> PreparedStatement ps;
> private Connection connection;
> 
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> connection = getConnection();
> String sql="select * from actiontype;";
> ps = connection.prepareStatement(sql);
> }
> 
> private static Connection getConnection(){
> Connection con=null;
> String driverClass= FlinkConfig.config.getProperty("driverClass");
> String url=FlinkConfig.config.getProperty("jdbcUrl");
> String user=FlinkConfig.config.getProperty("jdbcUser");
> String passWord=FlinkConfig.config.getProperty("passWord");
> 
> try {
> Class.forName(driverClass);
> con= DriverManager.getConnection(url,user,passWord);
> } catch (Exception e) {
> throw new RuntimeException(e);
> }
> return con;
> }
> 
> @Override
> public void run(SourceContext ctx) throws Exception {
> ResultSet resultSet = ps.executeQuery();
> while (resultSet.next()){
> ActionType actionType = new ActionType(
> resultSet.getString("action"),
> resultSet.getString("action_name")
> );
> ctx.collect(actionType);
> }
> }
> 
> @Override
> public void close() throws Exception {
> super.close();
> if (null!=connection){
> connection.close();
> }
> if (null!=ps){
> ps.close();
> }
> }
> 
> @Override
> public void cancel() {
> }
> };
> 
> 
> |
> 
> 
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |



回复: Flink消费MySQL

2023-08-07 文章 小昌同学
谢谢老师指导呀;
我目前的需求是想把两张MySQL的表数据读取出来,然后进行实时关联,我现在能想到的就是要么使用cdc实时读取,要么就是写一个循环去读MySQL中的数据
老师这一块有更好的建议嘛


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Shammon FY |
| 发送日期 | 2023年8月8日 10:37 |
| 收件人 |  |
| 主题 | Re: Flink消费MySQL |
Hi,

你代码里的ResultSet读取完成后需要将resultSet关闭掉,避免资源泄漏

至于你提到的Mysql数据读完程序就结束具体是指哪块?mysql是bounded
source,数据消费完成并且整个作业计算完成后,就会结束,这是正常情况

Best,
Shammon FY

On Mon, Aug 7, 2023 at 5:04 PM 小昌同学  wrote:

各位老师好
,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊;
以下是我的代码:
|
public class MysqlSource2 extends RichSourceFunction {
PreparedStatement ps;
private Connection connection;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
String sql="select * from actiontype;";
ps = connection.prepareStatement(sql);
}

private static Connection getConnection(){
Connection con=null;
String driverClass= FlinkConfig.config.getProperty("driverClass");
String url=FlinkConfig.config.getProperty("jdbcUrl");
String user=FlinkConfig.config.getProperty("jdbcUser");
String passWord=FlinkConfig.config.getProperty("passWord");

try {
Class.forName(driverClass);
con= DriverManager.getConnection(url,user,passWord);
} catch (Exception e) {
throw new RuntimeException(e);
}
return con;
}

@Override
public void run(SourceContext ctx) throws Exception {
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()){
ActionType actionType = new ActionType(
resultSet.getString("action"),
resultSet.getString("action_name")
);
ctx.collect(actionType);
}
}

@Override
public void close() throws Exception {
super.close();
if (null!=connection){
connection.close();
}
if (null!=ps){
ps.close();
}
}

@Override
public void cancel() {
}
};


|


| |
小昌同学
|
|
ccc0606fight...@163.com
|


Re: Flink消费MySQL

2023-08-07 文章 Shammon FY
Hi,

你代码里的ResultSet读取完成后需要将resultSet关闭掉,避免资源泄漏

至于你提到的Mysql数据读完程序就结束具体是指哪块?mysql是bounded
source,数据消费完成并且整个作业计算完成后,就会结束,这是正常情况

Best,
Shammon FY

On Mon, Aug 7, 2023 at 5:04 PM 小昌同学  wrote:

> 各位老师好
> ,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊;
> 以下是我的代码:
> |
> public class MysqlSource2 extends RichSourceFunction {
> PreparedStatement ps;
> private Connection connection;
>
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> connection = getConnection();
> String sql="select * from actiontype;";
> ps = connection.prepareStatement(sql);
> }
>
> private static Connection getConnection(){
> Connection con=null;
> String driverClass= FlinkConfig.config.getProperty("driverClass");
> String url=FlinkConfig.config.getProperty("jdbcUrl");
> String user=FlinkConfig.config.getProperty("jdbcUser");
> String passWord=FlinkConfig.config.getProperty("passWord");
>
> try {
> Class.forName(driverClass);
> con= DriverManager.getConnection(url,user,passWord);
> } catch (Exception e) {
> throw new RuntimeException(e);
> }
> return con;
> }
>
> @Override
> public void run(SourceContext ctx) throws Exception {
> ResultSet resultSet = ps.executeQuery();
> while (resultSet.next()){
> ActionType actionType = new ActionType(
> resultSet.getString("action"),
> resultSet.getString("action_name")
> );
> ctx.collect(actionType);
> }
> }
>
> @Override
> public void close() throws Exception {
> super.close();
> if (null!=connection){
> connection.close();
> }
> if (null!=ps){
> ps.close();
> }
> }
>
> @Override
> public void cancel() {
> }
> };
>
>
> |
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


Re:Re: flink作业如何从yarn平滑迁移到k8s?

2023-08-07 文章 casel.chen






如果不是用的flink kubernetes operator或者hdfs和oss系统网络无法直接连通怎么办?
有没有办法读取hdfs的checkpoint/savepoint然后再另存为oss的checkpoint/savepoint呢?谢谢!











在 2023-08-07 10:33:25,"Ruibin Xing"  写道:
>你好,
>
>如果你们也使用的是官方的Flink Kubernetes
>Operator,可以参考我们迁移的经验:迁移的时候设置FlinkDeployment的initalSavepoint为HDFS上Savepoint的地址,同时配置savepoint/checkpoint目录为OSS。这样Flink启动的时候会从HDFS中的状态恢复,并将新的checkpoint保存在oss中。
>
>On Sun, Aug 6, 2023 at 10:03 PM casel.chen  wrote:
>
>> flink on yarn作业checkpoint/savepoint保存在hdfs上面,现在想将其迁移到on
>> k8s上运行,使用的是对象存储oss,请问如何无感地进行作业状态迁移呢?使用的flink版本是1.15.2,谢谢!


Re: flink1.14.5 sql-client无法查询hbase1.4.3数据

2023-08-07 文章 Shammon FY
Hi,

看着像是版本冲突了,你有在你的flink session集群目录里放hbase的包吗?可以检查一下跟flink hbase
shaded的hbase版本是否一致

Best,
Shammon FY

On Sat, Aug 5, 2023 at 9:33 PM 杨东树  wrote:

> 各位好,
>目前使用sql-client查询hbase数据时,无法查询成功,麻烦指导下,谢谢。
>复现方法:
> 1、hbase操作:
> hbase(main):005:0> create 'flink_to_hbase','cf1'
> 0 row(s) in 2.2900 seconds
> hbase(main):006:0> put 'flink_to_hbase', 'rk0001', 'cf1:username',
> 'zhangsan'
> 0 row(s) in 0.0510 seconds
>
>
> 2、flink操作:
> ./start-cluster.sh
> ./sql-client.sh
> CREATE TABLE flink_to_hbase(
> rowkey STRING,
> cf1 ROW,
> PRIMARY KEY (rowkey) NOT ENFORCED
> )WITH(
> 'connector'='hbase-1.4',
> 'table-name'='flink_to_hbase',
> 'zookeeper.quorum'='192.168.21.128:2181',
> 'zookeeper.znode.parent'='/hbase'
> );
>
>
> 3、flink 报错日志:
> 2023-08-05 21:00:35,081 INFO  org.apache.flink.table.client.cli.CliClient
> [] - Command history file path: /root/.flink-sql-history
> 2023-08-05 21:00:52,011 INFO
> org.apache.flink.configuration.Configuration [] - Config
> uses fallback configuration key 'jobmanager.rpc.address' instead of key
> 'rest.address'
> 2023-08-05 21:00:52,026 INFO
> org.apache.flink.client.program.rest.RestClusterClient   [] -
> Submitting job 'collect' (0c147bc0da5a43a5a382f2ec20740b45).
> 2023-08-05 21:00:52,480 INFO
> org.apache.flink.client.program.rest.RestClusterClient   [] -
> Successfully submitted job 'collect' (0c147bc0da5a43a5a382f2ec20740b45) to '
> http://localhost:8081'.
> 2023-08-05 21:00:55,809 INFO
> org.apache.flink.configuration.Configuration [] - Config
> uses fallback configuration key 'jobmanager.rpc.address' instead of key
> 'rest.address'
> 2023-08-05 21:00:55,830 INFO
> org.apache.flink.configuration.Configuration [] - Config
> uses fallback configuration key 'jobmanager.rpc.address' instead of key
> 'rest.address'
> 2023-08-05 21:07:52,481 INFO
> org.apache.flink.configuration.Configuration [] - Config
> uses fallback configuration key 'jobmanager.rpc.address' instead of key
> 'rest.address'
> 2023-08-05 21:07:52,484 INFO
> org.apache.flink.client.program.rest.RestClusterClient   [] -
> Submitting job 'collect' (d29904103fa3c83e3089c09f093372c9).
> 2023-08-05 21:07:52,728 INFO
> org.apache.flink.client.program.rest.RestClusterClient   [] -
> Successfully submitted job 'collect' (d29904103fa3c83e3089c09f093372c9) to '
> http://localhost:8081'.
> 2023-08-05 21:07:55,972 WARN  org.apache.flink.table.client.cli.CliClient
> [] - Could not execute SQL statement.
> org.apache.flink.table.client.gateway.SqlExecutionException: Could not
> execute SQL statement.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:211)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:231)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:532)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:423)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$1(CliClient.java:332)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_231]
> at
> org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:325)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828)
> ~[flink-table_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274)
> ~[flink-table_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:209)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> 

Flink消费MySQL

2023-08-07 文章 小昌同学
各位老师好 
,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊;
以下是我的代码:
|
public class MysqlSource2 extends RichSourceFunction {
PreparedStatement ps;
private Connection connection;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
String sql="select * from actiontype;";
ps = connection.prepareStatement(sql);
}

private static Connection getConnection(){
Connection con=null;
String driverClass= FlinkConfig.config.getProperty("driverClass");
String url=FlinkConfig.config.getProperty("jdbcUrl");
String user=FlinkConfig.config.getProperty("jdbcUser");
String passWord=FlinkConfig.config.getProperty("passWord");

try {
Class.forName(driverClass);
con= DriverManager.getConnection(url,user,passWord);
} catch (Exception e) {
throw new RuntimeException(e);
}
return con;
}

@Override
public void run(SourceContext ctx) throws Exception {
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()){
ActionType actionType = new ActionType(
resultSet.getString("action"),
resultSet.getString("action_name")
);
ctx.collect(actionType);
}
}

@Override
public void close() throws Exception {
super.close();
if (null!=connection){
connection.close();
}
if (null!=ps){
ps.close();
}
}

@Override
public void cancel() {
}
};


|


| |
小昌同学
|
|
ccc0606fight...@163.com
|