Flink on K8s statebackend 配置

2020-09-29 文章 superainbower
Hi,all
   请教下,哪个朋友知道Flink on K8s上做 statebackend 配置,除了将下列配置写到flink-conf.yml里,还需要作哪些工作?
state.backend: rocksdb
state.checkpoints.dir: hdfs://master:8020/flink/checkpoints
state.savepoints.dir: hdfs://master:8020/flink/savepoints
state.backend.incremental: true


| |
superainbower
|
|
superainbo...@163.com
|
签名由网易邮箱大师定制



Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-29 文章 tison
Hi Yang,

你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么?

如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。

Best,
tison.


Yang Peng  于2020年9月30日周三 上午10:29写道:

> 感谢回复,我们看了consumer的lag很小
> 而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的
> 而且任务重启了没法jstack判断了
>
> hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道:
>
> >
> >
> >
> > 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗
> > 也可以 jstack 采下堆栈看下,GC等看下。
> > 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。
> > Best,
> > Hailong Wang
> > 在 2020-09-29 20:06:50,"Yang Peng"  写道:
> >
> >
> >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
> > >flinkkafkaconsumer消费的并行度也是90 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?
> > >
> > >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:
> > >
> > >>
> > >>
> > >>
> > >> Hi Yang Peng:
> > >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
> > >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
> > >> 2. Source 的序列化耗时严重,导致拉取变慢。
> > >> 可以尝试着扩kafka 分区,加大Source并发看下。
> > >> Best,
> > >> Hailong Wang
> > >>
> > >> 在 2020-09-29 19:44:44,"Yang Peng"  写道:
> > >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
> > >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
> > >> >kafka消费没有积压,也没有反压,
> > 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
> > >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
> > >>
> >
>


Re: group agg 开启了mini batch之后,state ttl不生效的问题

2020-09-29 文章 Tianwang Li
这种有窗口统计没有影响吧?


刘建刚  于2020年9月30日周三 下午2:25写道:

> 修复方案参考https://github.com/apache/flink/pull/11830
>
> kandy.wang  于2020年9月30日周三 下午2:19写道:
>
> > group agg 开启了mini batch之后,state ttl不生效的问题:
> >
> >
> > 现在我们发现好像 计算订单指标,写hbase,开启mini batch确实是需要的。这样可以大大降低sink
> > 算子的数据量,降低hbase的写入tps,减少hbase压力。不然每来一条数据就处理一次的话,单个任务 就可以把hbase 的tps 干到
> 十几万。
> >
> >
> > sql-client-defaults.yaml对应的参数应该是这2个吧:
> > # minimum idle state retention in ms
> > min-idle-state-retention: 0
> > # maximum idle state retention in ms
> > max-idle-state-retention: 0
> > 这个现在进展如何了,这个社区打算什么时候支持
> >
> >
> >
> >
>


-- 
**
 tivanli
**


Re: group agg 开启了mini batch之后,state ttl不生效的问题

2020-09-29 文章 刘建刚
修复方案参考https://github.com/apache/flink/pull/11830

kandy.wang  于2020年9月30日周三 下午2:19写道:

> group agg 开启了mini batch之后,state ttl不生效的问题:
>
>
> 现在我们发现好像 计算订单指标,写hbase,开启mini batch确实是需要的。这样可以大大降低sink
> 算子的数据量,降低hbase的写入tps,减少hbase压力。不然每来一条数据就处理一次的话,单个任务 就可以把hbase 的tps 干到 十几万。
>
>
> sql-client-defaults.yaml对应的参数应该是这2个吧:
> # minimum idle state retention in ms
> min-idle-state-retention: 0
> # maximum idle state retention in ms
> max-idle-state-retention: 0
> 这个现在进展如何了,这个社区打算什么时候支持
>
>
>
>


group agg 开启了mini batch之后,state ttl不生效的问题

2020-09-29 文章 kandy.wang
group agg 开启了mini batch之后,state ttl不生效的问题:


现在我们发现好像 计算订单指标,写hbase,开启mini batch确实是需要的。这样可以大大降低sink 
算子的数据量,降低hbase的写入tps,减少hbase压力。不然每来一条数据就处理一次的话,单个任务 就可以把hbase 的tps 干到 十几万。


sql-client-defaults.yaml对应的参数应该是这2个吧:
# minimum idle state retention in ms
min-idle-state-retention: 0
# maximum idle state retention in ms
max-idle-state-retention: 0
这个现在进展如何了,这个社区打算什么时候支持





退订

2020-09-29 文章 提运亨
退订

回复: Flink 1.11 table.executeInsert 程序退出

2020-09-29 文章 史 正超
这个是一个已知问题,可以看看这个jira: https://issues.apache.org/jira/browse/FLINK-18545
规避这个问题的话,可以不用执行 tableEnv.execute("jobname"); 直接用 executeSql 
就可以了,遇到INSERT语句就能生成job了。

发件人: HunterXHunter <1356469...@qq.com>
发送时间: 2020年9月30日 2:32
收件人: user-zh@flink.apache.org 
主题: Flink 1.11 table.executeInsert 程序退出

当我在使用 StreamTableEnvironment Api的时候;

 Table a = getStreamTable(getKafkaDataStream("test", "localhost:9092",
"latest"),"topic,offset,msg");
  tableEnv.createTemporaryView("test", a);

tableEnv.executeSql(DDLSourceSQLManager.createCustomPrintlnRetractSinkTbl("printlnSink_retract"));
  tableEnv.executeSql("insert into printlnSink_retract select
topic,msg,count(*) as ll from test group by topic,msg");

程序直接结束退出,但如果最后加Thread.sleep(1L) 就可以消费10s钟,如果加
tableEnv.execute("jobname");
报错:No operators defined in streaming topology



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复: 回复:关于flink sql cdc

2020-09-29 文章 史 正超
HI, Kyle Zhang, 我刚才重现了你的问题,虽然你的mysql 
binlog设置是ROW格式,但是不排除其它session更改了binlog_format格式。重现步骤:

  1.  登录mysql客户端(注意用cmd登录) 执行语句, SET SESSION binlog_format='MIXED'; SET SESSION 
tx_isolation='REPEATABLE-READ'; COMMIT;
  2.  随便update或者insert一条语句。

然后就得到了和你一样的错误:
2020-09-30 10:46:37.607 [debezium-engine] ERROR 
com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction  - Reporting error:
org.apache.kafka.connect.errors.ConnectException: Received DML 'update orders 
set product_id = 1122 where order_number = 10001' for processing, binlog 
probably contains events generated with statement or mixed based replication 
format
at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
at 
com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
at 
com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
at 
com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
at 
com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML 
'update orders set product_id = 1122 where order_number = 10001' for 
processing, binlog probably contains events generated with statement or mixed 
based replication format
at 
io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
... 5 common frames omitted

所以应该是其它session更忙了binlog_format格式,并且事务隔离级别为 REPEATABLE-READ
希望对你有帮助,
best,
shizhengchao

发件人: 谢治平 
发送时间: 2020年9月30日 1:25
收件人: user-zh 
抄送: user-zh 
主题: 回复:关于flink sql cdc

能不能退掉邮箱信息,退出




| |
谢治平
|
|
邮箱:xiezhiping...@163.com
|

签名由 网易邮箱大师 定制

在2020年09月30日 09:24,Kyle Zhang 写道:
show variables like '%binlog_format%'确实是ROW

On Tue, Sep 29, 2020 at 7:39 PM Kyle Zhang  wrote:

> Hi,all
>   今天在使用sql cdc中遇到以一个问题 ,版本1.11.2,idea中运行,我的ddl是
> CREATE TABLE mysql_binlog (
>  id INT NOT NULL,
>  emp_name STRING,
>  age INT
> ) WITH (
>  'connector' = 'mysql-cdc',
>  'hostname' = 'xxx',
>  'port' = '3306',
>  'username' = 'root',
>  'password' = 'root',
>  'database-name' = 'test',
>  'table-name' = 'empoylee1'
> );
> 结果直接用print table
> 运行一段时间后报错
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Error during binlog processing. Last offset stored = null,
> binlog reader near position = binlog.001254/132686776
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Failed due to error: Error processing binlog event
> org.apache.kafka.connect.errors.ConnectException: Received DML 'INSERT
> INTO execution_flows (project_id, flow_id, version, status, submit_time,
> submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
> Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML
> 'INSERT INTO execution_flows (project_id, flow_id, version, status,
> submit_time, submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> ... 5 more
>
> sql cdc还会解析我其他的表然后报错么?,有没有人遇到类似的问题
>
> Best,
> Kyle Zhang
>


Flink 1.11 table.executeInsert 程序退出

2020-09-29 文章 HunterXHunter
当我在使用 StreamTableEnvironment Api的时候;

 Table a = getStreamTable(getKafkaDataStream("test", "localhost:9092",
"latest"),"topic,offset,msg");
  tableEnv.createTemporaryView("test", a);
 
tableEnv.executeSql(DDLSourceSQLManager.createCustomPrintlnRetractSinkTbl("printlnSink_retract"));
  tableEnv.executeSql("insert into printlnSink_retract select
topic,msg,count(*) as ll from test group by topic,msg");

程序直接结束退出,但如果最后加Thread.sleep(1L) 就可以消费10s钟,如果加
tableEnv.execute("jobname");
报错:No operators defined in streaming topology



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-29 文章 Yang Peng
感谢回复,我们看了consumer的lag很小
而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的
而且任务重启了没法jstack判断了

hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道:

>
>
>
> 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗
> 也可以 jstack 采下堆栈看下,GC等看下。
> 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。
> Best,
> Hailong Wang
> 在 2020-09-29 20:06:50,"Yang Peng"  写道:
>
> >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
> >flinkkafkaconsumer消费的并行度也是90 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?
> >
> >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:
> >
> >>
> >>
> >>
> >> Hi Yang Peng:
> >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
> >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
> >> 2. Source 的序列化耗时严重,导致拉取变慢。
> >> 可以尝试着扩kafka 分区,加大Source并发看下。
> >> Best,
> >> Hailong Wang
> >>
> >> 在 2020-09-29 19:44:44,"Yang Peng"  写道:
> >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
> >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
> >> >kafka消费没有积压,也没有反压,
> 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
> >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
> >>
>


flink1.11.1 kafka eos checkpoints??????Timeout expired after 600000milliseconds

2020-09-29 文章 ????????
flink 
1.9kafkaSemantic??EXACTLY_ONCE1.11?? 
,checkpoints??InitProducerId??TransactionTimeout??MaxBlockMS
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
60milliseconds while awaiting InitProducerId
kafkakafka??offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

回复:关于flink sql cdc

2020-09-29 文章 谢治平
能不能退掉邮箱信息,退出




| |
谢治平
|
|
邮箱:xiezhiping...@163.com
|

签名由 网易邮箱大师 定制

在2020年09月30日 09:24,Kyle Zhang 写道:
show variables like '%binlog_format%'确实是ROW

On Tue, Sep 29, 2020 at 7:39 PM Kyle Zhang  wrote:

> Hi,all
>   今天在使用sql cdc中遇到以一个问题 ,版本1.11.2,idea中运行,我的ddl是
> CREATE TABLE mysql_binlog (
>  id INT NOT NULL,
>  emp_name STRING,
>  age INT
> ) WITH (
>  'connector' = 'mysql-cdc',
>  'hostname' = 'xxx',
>  'port' = '3306',
>  'username' = 'root',
>  'password' = 'root',
>  'database-name' = 'test',
>  'table-name' = 'empoylee1'
> );
> 结果直接用print table
> 运行一段时间后报错
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Error during binlog processing. Last offset stored = null,
> binlog reader near position = binlog.001254/132686776
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Failed due to error: Error processing binlog event
> org.apache.kafka.connect.errors.ConnectException: Received DML 'INSERT
> INTO execution_flows (project_id, flow_id, version, status, submit_time,
> submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
> Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML
> 'INSERT INTO execution_flows (project_id, flow_id, version, status,
> submit_time, submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> ... 5 more
>
> sql cdc还会解析我其他的表然后报错么?,有没有人遇到类似的问题
>
> Best,
> Kyle Zhang
>


Re: 关于flink sql cdc

2020-09-29 文章 Kyle Zhang
show variables like '%binlog_format%'确实是ROW

On Tue, Sep 29, 2020 at 7:39 PM Kyle Zhang  wrote:

> Hi,all
>   今天在使用sql cdc中遇到以一个问题 ,版本1.11.2,idea中运行,我的ddl是
> CREATE TABLE mysql_binlog (
>  id INT NOT NULL,
>  emp_name STRING,
>  age INT
> ) WITH (
>  'connector' = 'mysql-cdc',
>  'hostname' = 'xxx',
>  'port' = '3306',
>  'username' = 'root',
>  'password' = 'root',
>  'database-name' = 'test',
>  'table-name' = 'empoylee1'
> );
> 结果直接用print table
> 运行一段时间后报错
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Error during binlog processing. Last offset stored = null,
> binlog reader near position = binlog.001254/132686776
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Failed due to error: Error processing binlog event
> org.apache.kafka.connect.errors.ConnectException: Received DML 'INSERT
> INTO execution_flows (project_id, flow_id, version, status, submit_time,
> submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
> Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML
> 'INSERT INTO execution_flows (project_id, flow_id, version, status,
> submit_time, submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> ... 5 more
>
> sql cdc还会解析我其他的表然后报错么?,有没有人遇到类似的问题
>
> Best,
> Kyle Zhang
>


Re: 关于flink sql cdc

2020-09-29 文章 Kyle Zhang
代码部分基本没有什么东西

public class CDC {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

String ddl = "CREATE TABLE mysql_binlog (\n" +
" id INT NOT NULL,\n" +
" emp_name STRING,\n" +
" age INT\n" +
") WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = 'xxx',\n" +
" 'port' = '3306',\n" +
" 'username' = 'root',\n" +
" 'password' = 'root',\n" +
" 'database-name' = 'eric',\n" +
" 'table-name' = 'employee1'\n" +
")";
tEnv.executeSql(ddl);

tEnv.executeSql("CREATE TABLE print_table WITH ('connector' =
'print')\n" +
"LIKE mysql_binlog (EXCLUDING ALL)");

tEnv.executeSql("insert into print_table select *  from mysql_binlog");
}
}


08:30:19,945 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 0: disabling autocommit, enabling repeatable read
transactions, and setting lock wait timeout to 10
08:30:19,964 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 1: flush and obtain global read lock to prevent
writes to database
08:30:19,982 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 2: start transaction with consistent snapshot
08:30:19,985 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 3: read binlog position of MySQL master
08:30:19,989 INFO  io.debezium.connector.mysql.SnapshotReader
 [] -using binlog 'binlog.001254' at position '152522471'
and gtid ''
08:30:19,989 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 4: read list of available databases
08:30:19,995 INFO  io.debezium.connector.mysql.SnapshotReader
 [] -list of available databases is: [一堆database]
08:30:19,995 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 5: read list of available tables in each database

。。。一堆table

08:30:20,198 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 6: generating DROP and CREATE statements to reflect
current database schemas:
08:30:20,918 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 7: releasing global read lock to enable MySQL
writes
08:30:20,926 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 7: blocked writes to MySQL for a total of
00:00:00.948
08:30:20,927 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 8: scanning contents of 1 tables while still in
transaction
08:30:20,937 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 8: - scanning table 'eric.employee1' (1 of 1
tables)
08:30:20,937 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - For table 'eric.employee1' using select statement:
'SELECT * FROM `eric`.`employee1`'
08:30:20,949 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 8: - Completed scanning a total of 5 rows from
table 'eric.employee1' after 00:00:00.012
08:30:20,950 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 8: scanned 5 rows in 1 tables in 00:00:00.022
08:30:20,950 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 9: committing transaction
08:30:20,954 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Completed snapshot in 00:00:01.03
08:30:21,391 INFO
com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer []
- Database snapshot phase can't perform checkpoint, acquired
Checkpoint lock.
+I(1,Eric,23)
+I(2,Jack,22)
+I(3,Amy,33)
+I(4,Dell,12)
08:30:21,392 INFO
com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer []
- Received record from streaming binlog phase, released checkpoint
lock.
+I(5,Hello,44)


之后就会报其他表update或者insert的错

09:18:54,326 ERROR io.debezium.connector.mysql.BinlogReader
 [] - Failed due to error: Error processing binlog event
org.apache.kafka.connect.errors.ConnectException: Received DML 'UPDATE
triggers SET trigger_source='SimpleTimeTrigger',
modify_time=1601428734343, enc_type=2,
data=x'1F8B0800E553C98E1A3110FD179F120925CD9209C38D214D42C4C088E5308A46C8B80B7070DB1DDB4D60D0FC7BAA7A61114CA228C7D0175CAFB657AF6ACF5C3A8FA59F3AB0ACC5F8F39ACFB96615E63CF7A943D3286C7F7A448305077EA8C36D222DA0DDDB14C82F0B9FC8986CD50F41F33668DE3403FC2128F5C2B0D6FE05DD4C6A05B98C659C2820FF8995CB2556AD30C872B685974663C96F4F15CC9E811DA3234966CC426ED884CB5EEC8E3B29284D6705620D76567D071BAEDEBCC57C223751AA7DF9F8EAF22432BA1A8C511EDFCF4653936D27F9FBF18AEBE58A4BE2E14620526BA55E9E305FCBE4813B9FE57047FB42DAC2F8CA54346CCFF19BA0DAA8078D124FC04A436DD68398FAD

Re: Re: sql-cli执行sql报错

2020-09-29 文章 hl9...@126.com
没有修改kafka,就用官方的jar。后来我用1.11.2版本重新尝试了下,成功了,没有任何错误。
这个问题就不纠结了



hl9...@126.com
 
发件人: Benchao Li
发送时间: 2020-09-29 18:17
收件人: user-zh
主题: Re: Re: sql-cli执行sql报错
这个错误看起来比较奇怪。正常来讲flink-sql-connector-kafka_2.11-1.10.2.jar里面应该都是shaded之后的class了,
但是却报了一个非shaded的ByteArrayDeserializer。
我感觉这个应该是你自己添加了一下比较特殊的逻辑导致的。可以介绍下你对kafka connector做了哪些改造么?
 
hl9...@126.com  于2020年9月28日周一 下午6:06写道:
 
> 按照您的方法重试了下,又报了另一个错误:
> Flink SQL> CREATE TABLE tx (
> > account_id  BIGINT,
> > amount  BIGINT,
> > transaction_time TIMESTAMP(3),
> > WATERMARK FOR transaction_time AS transaction_time -
> INTERVAL '5' SECOND
> > ) WITH (
> > 'connector.type' = 'kafka',
> > 'connector.version' = 'universal',
> > 'connector.topic' = 'heli01',
> > 'connector.properties.group.id' = 'heli-test',
> > 'connector.properties.bootstrap.servers' = '
> 10.100.51.56:9092',
> > 'connector.startup-mode' = 'earliest-offset',
> > 'format.type'= 'csv'
> > );
> [INFO] Table has been created.
>
> Flink SQL> show tables ;
> tx
>
> Flink SQL> select * from tx ;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.serialization.ByteArrayDeserializer is not an
> instance of
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer
>
> 附:lib包清单
> [test@rcx51101 lib]$ pwd
> /opt/flink-1.10.2/lib
>
> flink-csv-1.10.2.jar
> flink-dist_2.12-1.10.2.jar
> flink-jdbc_2.12-1.10.2.jar
> flink-json-1.10.2.jar
> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar
> flink-sql-connector-kafka_2.11-1.10.2.jar
> flink-table_2.12-1.10.2.jar
> flink-table-blink_2.12-1.10.2.jar
> log4j-1.2.17.jar
> mysql-connector-java-5.1.48.jar
> slf4j-log4j12-1.7.15.jar
>
>
>
>
> hl9...@126.com
>
> 发件人: Leonard Xu
> 发送时间: 2020-09-28 16:36
> 收件人: user-zh
> 主题: Re: sql-cli执行sql报错
> Hi
> benchao的回复是的对的,
> 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包
> flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。
>
>
> > 相关lib包:
> > flink-connector-kafka_2.12-1.10.2.jar
> > kafka-clients-0.11.0.3.jar
>
> 祝好
> Leonard
>
 
 
-- 
 
Best,
Benchao Li


flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

2020-09-29 文章 王刚
这个问题我们之前使用sql窗口的时候也遇到过,当时是在1.7版本的tablesource后面加了个rebanlance算子让数据少的kafka分区的subtask
 watermark均衡下

发送自autohome

发件人: Benchao Li mailto:libenc...@apache.org>>
发送时间: 2020-09-29 18:10:42
收件人: user-zh mailto:user-zh@flink.apache.org>>
主题: Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

这个问题的原因应该是你的kafka partition数量应该是大于1的,并且不是所有partition都有数据导致的。
你可以检查下你的kafka topic。
目前来讲,只要你的每个kafka 的partition都有数据,那么watermark应该是可以正常产生的。跟并行度无关。

Asahi Lee <978466...@qq.com> 于2020年9月27日周日 下午6:05写道:

> 你好!
>      我使用flink
> sql,从kafka中读取数据,然后进行sql聚合操作,然后再输出到kafka中;当我设置并行度为1时,程序执行正常;当我设置并行度为2,甚至更大时;程序可以执行,但是我的kafka中没有看到有数据输出?请问是什么原因呢?
>      使用stream api时,我们可以给每个算子设置并行度,那sql api我们是否可以给每条sql设置并行度?



--

Best,
Benchao Li


Re: Re: 回复: BLinkPlanner sql join状态清理

2020-09-29 文章 刘大龙
Hi, MiniBatch Agg目前没有实现State 
TTl,我提了个PR修复这个问题,参考https://github.com/apache/flink/pull/11830
@Jark,辛苦有空时帮忙reveiw一下代码,这个问题越来越多用户用户遇到了。


> -原始邮件-
> 发件人: "刘建刚" 
> 发送时间: 2020-09-29 18:27:47 (星期二)
> 收件人: user-zh 
> 抄送: 
> 主题: Re: 回复: BLinkPlanner sql join状态清理
> 
> miniBatch下是无法ttl的,这个是修复方案:https://github.com/apache/flink/pull/11830
> 
> Benchao Li  于2020年9月29日周二 下午5:18写道:
> 
> > Hi Ericliuk,
> >
> > 这应该是实现的bug,你可以去社区建一个issue描述下这个问题。
> > 有时间的话也可以帮忙修复一下,没有时间社区也会有其他小伙伴帮忙来修复的~
> >
> > Ericliuk  于2020年9月29日周二 下午4:59写道:
> >
> > > 我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。
> > > <
> > >
> > http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png
> > >
> > >
> > >
> > > 不太清楚为什么用了mini batch就没读取这个配置。
> > > 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么?
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >


--

Best


Re:Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-29 文章 hailongwang



不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗
也可以 jstack 采下堆栈看下,GC等看下。
至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。
Best,
Hailong Wang
在 2020-09-29 20:06:50,"Yang Peng"  写道:
>感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
>flinkkafkaconsumer消费的并行度也是90 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?
>
>hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:
>
>>
>>
>>
>> Hi Yang Peng:
>> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
>> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
>> 2. Source 的序列化耗时严重,导致拉取变慢。
>> 可以尝试着扩kafka 分区,加大Source并发看下。
>> Best,
>> Hailong Wang
>>
>> 在 2020-09-29 19:44:44,"Yang Peng"  写道:
>> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
>> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
>> >kafka消费没有积压,也没有反压, 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
>> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
>>


Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-29 文章 Yang Peng
感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
flinkkafkaconsumer消费的并行度也是90 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?

hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:

>
>
>
> Hi Yang Peng:
> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
> 2. Source 的序列化耗时严重,导致拉取变慢。
> 可以尝试着扩kafka 分区,加大Source并发看下。
> Best,
> Hailong Wang
>
> 在 2020-09-29 19:44:44,"Yang Peng"  写道:
> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
> >kafka消费没有积压,也没有反压, 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
>


Re:Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-29 文章 hailongwang



Hi Yang Peng:
根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
2. Source 的序列化耗时严重,导致拉取变慢。
可以尝试着扩kafka 分区,加大Source并发看下。
Best,
Hailong Wang

在 2020-09-29 19:44:44,"Yang Peng"  写道:
>请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
>kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
>kafka消费没有积压,也没有反压, 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
>tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、


Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-29 文章 Yang Peng
请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
kafka消费没有积压,也没有反压, 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、


Re: Re: flink使用在docker环境中部署出现的两个问题

2020-09-29 文章 cxydeve...@163.com
你好,我这边看到您在另一个问题[1]中有做了相关的回答,
我在k8s上部署是遇到相同的问题,相同的错误,您这边是否有空帮忙试试看是不是flink-docker的bug, 还是我的什么配置错了
我也发出了一个新的问题[2]



[1]:http://apache-flink.147419.n8.nabble.com/flink-1-11-on-kubernetes-td4586.html#a4692
[2]:http://apache-flink.147419.n8.nabble.com/flink1-11-2-k8s-volume-Read-only-file-system-td7555.html




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink Batch 模式下,The rpc invocation size 113602196 exceeds the maximum akka framesize

2020-09-29 文章 zheng faaron
Hi,

可以检查一下这个参数是否设置正确,也可以在jobmanager页面上看下是否有这个参数。我之前遇到过类似问题,设置这个参数可以解决问题。

Best,
Faaron Zheng


From: jy l 
Sent: Monday, September 28, 2020 4:57:46 PM
To: user-zh@flink.apache.org 
Subject: Re: Flink Batch 模式下,The rpc invocation size 113602196 exceeds the 
maximum akka framesize

如果使用了print()等算子,会将上一个task的结果一次全部pull过来,pull时数据超过了akka framesize大小导致。

李加燕  于2020年9月28日周一 下午3:07写道:

> Flink batch 模式消费hdfs上的文件,并做了一个word count
> 操作,但是task一直运行,查看taskmanager的log,发现如下异常:
> java.lang.reflect.UndeclaredThrowableException: null
> at com.sun.proxy.$Proxy35.updateTaskExecutionState(UnknownSource)
> ~[?:?]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1558)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1588)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2400(TaskExecutor.java:173)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1921)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> Causedby: java.io.IOException: The rpc invocation size 113602196 exceeds
> the maximum akka framesize.
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:79)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> ... 28 more
> 我有尝试过在flink-conf.yaml配置akka framesize大小为30M,但是还是不能解决上述问题。
> 请求帮助。


Re: flink1.11.2基于官网在k8s上部署是正常的,但是加了volume配置之后报错Read-only file system

2020-09-29 文章 cxydeve...@163.com
官网例子[1]没有改动之前,是可以正常启动的
原来的配置文件内容如下
...
"volumes": [
  {
"name": "flink-config-volume",
"configMap": {
  "name": "flink-config",
  "items": [
{
  "key": "flink-conf.yaml",
  "path": "flink-conf.yaml"
},
{
  "key": "log4j-console.properties",
  "path": "log4j-console.properties"
}
  ],
  "defaultMode": 420
}
  }
],
...
"volumeMounts": [
  {
"name": "flink-config-volume",
"mountPath": "/opt/flink/conf"
  }
],
...

后面在k8s上修改了yaml文件,就是增加了其他volumes
然后增加uploadjar, completed-jobs和lib的挂载,如下
...
"volumes": [
  {
"name": "flink-config-volume",
"configMap": {
  "name": "flink-config",
  "items": [
{
  "key": "flink-conf.yaml",
  "path": "flink-conf.yaml"
},
{
  "key": "log4j-console.properties",
  "path": "log4j-console.properties"
}
  ],
  "defaultMode": 420
}
  },
  {
"name": "flink-uploadjar-volume",
"hostPath": {
  "path": "/data/volumes/flink/jobmanager/uploadjar",
  "type": ""
}
  },
  {
"name": "flink-completejobs-volume",
"hostPath": {
  "path": "/data/volumes/flink/jobmanager/completed-jobs/",
  "type": ""
}
  },
  {
"name": "libs-volume",
"hostPath": {
  "path": "/data/volumes/flink/jobmanager/lib",
  "type": ""
}
  }
],
...
"volumeMounts": [
  {
"name": "flink-config-volume",
"mountPath": "/opt/flink/conf"
  },
  {
"name": "flink-uploadjar-volume",
"mountPath": "/opt/flink/flink-uploadjar"
  },
  {
"name": "flink-completejobs-volume",
"mountPath": "/opt/flink/completed-jobs/"
  },
  {
"name": "libs-volume",
"mountPath": "/opt/flink/lib"
  }
],
...
增加了volume之后就报了错误
2020-09-29T12:09:33.055804861Z Starting Job Manager
2020-09-29T12:09:33.061359237Z sed: couldn't open temporary file
/opt/flink/conf/sedVef7YR: Read-only file system
2020-09-29T12:09:33.06561576Z sed: couldn't open temporary file
/opt/flink/conf/sed4a7zGR: Read-only file system
2020-09-29T12:09:33.068683501Z /docker-entrypoint.sh: 72:
/docker-entrypoint.sh: cannot create /opt/flink/conf/flink-conf.yaml:
Permission denied
2020-09-29T12:09:33.068700999Z /docker-entrypoint.sh: 91:
/docker-entrypoint.sh: cannot create /opt/flink/conf/flink-conf.yaml.tmp:
Read-only file system
2020-09-29T12:09:33.919147511Z Starting standalonesession as a console
application on host flink-jobmanager-6d5bc45dbb-jjs4f.
2020-09-29T12:09:34.154740531Z log4j:WARN No appenders could be found for
logger (org.apache.flink.runtime.entrypoint.ClusterEntrypoint).
2020-09-29T12:09:34.154769537Z log4j:WARN Please initialize the log4j system
properly.
2020-09-29T12:09:34.154776198Z log4j:WARN See
http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

经过排查,发现是在挂载了/opt/flink/lib之后就会出错,挂载其他目录是可以正常运行的/opt/flink/flink-uploadjar,/opt/flink/completed-jobs/

就是增加了下面这个配置出现的错误
"volumeMounts": [
  ...
  {
"name": "libs-volume",
"mountPath": "/opt/flink/lib"
  }
  ...
],



[1]:https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions




--
Sent from: http://apache-flink.147419.n8.nabble.com/


关于flink sql cdc

2020-09-29 文章 Kyle Zhang
Hi,all
  今天在使用sql cdc中遇到以一个问题 ,版本1.11.2,idea中运行,我的ddl是
CREATE TABLE mysql_binlog (
 id INT NOT NULL,
 emp_name STRING,
 age INT
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'xxx',
 'port' = '3306',
 'username' = 'root',
 'password' = 'root',
 'database-name' = 'test',
 'table-name' = 'empoylee1'
);
结果直接用print table
运行一段时间后报错
19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
[] - Error during binlog processing. Last offset stored = null, binlog
reader near position = binlog.001254/132686776
19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
[] - Failed due to error: Error processing binlog event
org.apache.kafka.connect.errors.ConnectException: Received DML 'INSERT INTO
execution_flows (project_id, flow_id, version, status, submit_time,
submit_user, update_time) values
(47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
processing, binlog probably contains events generated with statement or
mixed based replication format
at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML
'INSERT INTO execution_flows (project_id, flow_id, version, status,
submit_time, submit_user, update_time) values
(47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
processing, binlog probably contains events generated with statement or
mixed based replication format
at
io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
... 5 more

sql cdc还会解析我其他的表然后报错么?,有没有人遇到类似的问题

Best,
Kyle Zhang


Re: 回复: flink sql count问题

2020-09-29 文章 Robin Zhang
Hi lemon,
不是很理解你的疑问是什么,flink是事件驱动的,所以,来一条数据,就会被处理,走你的逻辑,就会产生一个结果,如果是第一次出现的key,只有一条数据,如果是状态中已经存在的key,会在控制台输出两条数据,一条true的是最终sink的结果。所以,每次输出一条结果有什么问题吗?


Best,
Robin



lemon wrote
> 感谢各位的回答,各位的方法我都试了一下,但是都会在下游输出一条结果,一条不符合条件的语句count会在下游输出0
> 我理解是flink中的count对于每一条数据都会输出一条结果,所以只能把if中的判断条件再放到最后的where中进行过滤
> 类似于 select count(if(name like '南京%',1 , null)) where name
> like '南京%'  or name like '杭州%'  group by ** 这样
> 
> 
> -- 原始邮件 --
> 发件人:  
>  
> "user-zh" 
>   
> <

> vincent2015qdlg@

> >;
> 发送时间: 2020年9月29日(星期二) 下午5:32
> 收件人: "user-zh"<

> user-zh@.apache

> >;
> 
> 主题: Re: flink sql count问题
> 
> 
> 
> Hi lemon,
>     内部判断if函数可以替换为case when
> 
> Best,
> Robin
> 
> 
> lemon wrote
> > 请教各位:
> > 我有一个sql任务需要进行count,在count中有一个表达式,只想count符合条件的记录,
> > 之前在hive中是这么写的:count(if(name like '南京%',1 , null)),但是flink
> > sql中count不能为null,有什么别的方法能实现该功能吗?
> > 使用的是flink1.10.1 blink
> >  
> 
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复:flink1.11实时写入hive,写入速度很慢,checkpoint为60秒,并行度为1

2020-09-29 文章 Jun Zhang
你的kafka的分区数是多少,把flink的并行度加大到kafka的分区数。



Best  Jun


-- 原始邮件 --
发件人: me 

flink1.11实时写入hive,写入速度很慢,checkpoint为60秒,并行度为1

2020-09-29 文章 me
flink1.11实时写入hive,写入速度很慢,checkpoint为60秒,并行度为1
tableEnv.executeSql("insert into dwd_security_log select * from " + table)
实际写入hive之后,查看hdfs上写入的文件为19M,这是60秒内写入hive的,flink流式写入hive通过checkpotin来把数据刷入hive中。


请问大家只有有什么提升写入速度的参数或者方式吗?

Re: pyflink1.11 window groupby出错

2020-09-29 文章 Xingbo Huang
Hello,

现在的descriptor的方式存在很多bug,社区已经在进行重构了。当前你可以使用DDL[1]的方式来解决问题。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
Best,
Xingbo

刘乘九  于2020年9月29日周二 下午5:46写道:

> 各位大佬,我想尝试下pyflink 进行时间窗口下的指标统计,写了一个demo发现table APi 的group
> 方法报错,网上搜索了一下相关内容也没有解决问题, 想请各位大佬帮帮忙看一下是哪里写错了?
>
> 错误信息:
> py4j.protocol.Py4JJavaError: An error occurred while calling o95.select.
> : org.apache.flink.table.api.ValidationException: A group window expects a
> time attribute for grouping in a stream environment.
> at
> org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
> at
> org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
> at
> org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
> at
> org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
> at
> org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)
> at
> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)
> at
> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:781)
>
>
>
>
> demo程序:
> from pyflink.datastream import *
> from pyflink.table import *
> from pyflink.table.descriptors import *
> from pyflink.table.descriptors import Json
> from pyflink.table.window import *
>
> test_out_put_data_path = r'D:\test_doc\test_result_data.csv'
>
> s_nev = StreamExecutionEnvironment.get_execution_environment()
> s_nev.set_parallelism(3)
> st_nev = StreamTableEnvironment.create(s_nev,
> environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
>
> st_nev.connect(Kafka().version('0.11').topic('gyhWebLog').start_from_earliest().property("zookeeper.connect","cdh3:2181,
> cdh4:2181, cdh5:2181").property("bootstrap.servers", "cdh3:9092, cdh4:9092,
> cdh5:9092")) \
> .with_format(Json()
>  .fail_on_missing_field(False)
>  .schema(DataTypes.ROW([DataTypes.FIELD('time',
> DataTypes.TIMESTAMP(3)),
>
> DataTypes.FIELD('prev_page',DataTypes.STRING()),
> DataTypes.FIELD('page',
> DataTypes.STRING()),
> DataTypes.FIELD("app",
> DataTypes.STRING()),
>
> DataTypes.FIELD("nextApp",DataTypes.STRING()),
>
> DataTypes.FIELD("service",DataTypes.STRING()),
>
> DataTypes.FIELD("userId",DataTypes.BIGINT())])))\
> .with_schema(Schema().
> field('prev_page', DataTypes.STRING())
>  .field('page', DataTypes.STRING())
>  .field('app', DataTypes.STRING())
>  .field('nextApp', DataTypes.STRING())
>  .field('service', DataTypes.STRING())
>  .field('userId', DataTypes.BIGINT())
>  .field('time', DataTypes.TIMESTAMP(3))
>  .rowtime(Rowtime()
>   .timestamps_from_field('time')
>   .watermarks_periodic_bounded(6)))\
> .in_append_mode()\
> .create_temporary_table('raw_web_log_data')
>
>
> st_nev.connect(FileSystem().path(test_out_put_data_path))\
> .with_format(OldCsv()
>  .field_delimiter(',')
>  .field("userId", DataTypes.BIGINT())
>  .field('dataCount', DataTypes.BIGINT())
>  .field('count_time', DataTypes.TIMESTAMP(3))
>  )\
> .with_schema(Schema()
>  .field('userId', DataTypes.BIGINT())
>  .field('dataCount', DataTypes.BIGINT())
>  .field('count_time', DataTypes.TIMESTAMP(3))
>  )\
> .create_temporary_table('test_out_put')
>
>
> if __name__ == '__main__':
> st_nev.from_path('raw_web_log_data').window(Tumble.over('1.hours').on('time').alias('w')).group_by('userId,
> w').select('userId, page.count as d, w.end').execute_insert('test_out_put')
>


Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-29 文章 todd
是不是和你上层依赖的jar冲突了?exclude剔除不了冲突jar吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复: BLinkPlanner sql join状态清理

2020-09-29 文章 刘建刚
miniBatch下是无法ttl的,这个是修复方案:https://github.com/apache/flink/pull/11830

Benchao Li  于2020年9月29日周二 下午5:18写道:

> Hi Ericliuk,
>
> 这应该是实现的bug,你可以去社区建一个issue描述下这个问题。
> 有时间的话也可以帮忙修复一下,没有时间社区也会有其他小伙伴帮忙来修复的~
>
> Ericliuk  于2020年9月29日周二 下午4:59写道:
>
> > 我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。
> > <
> >
> http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png
> >
> >
> >
> > 不太清楚为什么用了mini batch就没读取这个配置。
> > 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么?
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
> --
>
> Best,
> Benchao Li
>


Re: Re: sql-cli执行sql报错

2020-09-29 文章 Benchao Li
这个错误看起来比较奇怪。正常来讲flink-sql-connector-kafka_2.11-1.10.2.jar里面应该都是shaded之后的class了,
但是却报了一个非shaded的ByteArrayDeserializer。
我感觉这个应该是你自己添加了一下比较特殊的逻辑导致的。可以介绍下你对kafka connector做了哪些改造么?

hl9...@126.com  于2020年9月28日周一 下午6:06写道:

> 按照您的方法重试了下,又报了另一个错误:
> Flink SQL> CREATE TABLE tx (
> > account_id  BIGINT,
> > amount  BIGINT,
> > transaction_time TIMESTAMP(3),
> > WATERMARK FOR transaction_time AS transaction_time -
> INTERVAL '5' SECOND
> > ) WITH (
> > 'connector.type' = 'kafka',
> > 'connector.version' = 'universal',
> > 'connector.topic' = 'heli01',
> > 'connector.properties.group.id' = 'heli-test',
> > 'connector.properties.bootstrap.servers' = '
> 10.100.51.56:9092',
> > 'connector.startup-mode' = 'earliest-offset',
> > 'format.type'= 'csv'
> > );
> [INFO] Table has been created.
>
> Flink SQL> show tables ;
> tx
>
> Flink SQL> select * from tx ;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.serialization.ByteArrayDeserializer is not an
> instance of
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer
>
> 附:lib包清单
> [test@rcx51101 lib]$ pwd
> /opt/flink-1.10.2/lib
>
> flink-csv-1.10.2.jar
> flink-dist_2.12-1.10.2.jar
> flink-jdbc_2.12-1.10.2.jar
> flink-json-1.10.2.jar
> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar
> flink-sql-connector-kafka_2.11-1.10.2.jar
> flink-table_2.12-1.10.2.jar
> flink-table-blink_2.12-1.10.2.jar
> log4j-1.2.17.jar
> mysql-connector-java-5.1.48.jar
> slf4j-log4j12-1.7.15.jar
>
>
>
>
> hl9...@126.com
>
> 发件人: Leonard Xu
> 发送时间: 2020-09-28 16:36
> 收件人: user-zh
> 主题: Re: sql-cli执行sql报错
> Hi
> benchao的回复是的对的,
> 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包
> flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。
>
>
> > 相关lib包:
> > flink-connector-kafka_2.12-1.10.2.jar
> > kafka-clients-0.11.0.3.jar
>
> 祝好
> Leonard
>


-- 

Best,
Benchao Li


Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

2020-09-29 文章 Benchao Li
这个问题的原因应该是你的kafka partition数量应该是大于1的,并且不是所有partition都有数据导致的。
你可以检查下你的kafka topic。
目前来讲,只要你的每个kafka 的partition都有数据,那么watermark应该是可以正常产生的。跟并行度无关。

Asahi Lee <978466...@qq.com> 于2020年9月27日周日 下午6:05写道:

> 你好!
>      我使用flink
> sql,从kafka中读取数据,然后进行sql聚合操作,然后再输出到kafka中;当我设置并行度为1时,程序执行正常;当我设置并行度为2,甚至更大时;程序可以执行,但是我的kafka中没有看到有数据输出?请问是什么原因呢?
>      使用stream api时,我们可以给每个算子设置并行度,那sql api我们是否可以给每条sql设置并行度?



-- 

Best,
Benchao Li


Re: 使用异步IO时,数据写入到capacity数后,卡住不再消费source端数据了。

2020-09-29 文章 王敏超
嗯嗯,是的。安装大佬的方法,的确成功了。再次感谢大佬



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-29 文章 Dream-底限
可以直接用yarnclient直接提交,flinkonyarn也是yarnclient提交的吧,不过感觉自己实现一遍挺麻烦的,我们最后也选的是process的方式

xiao cai  于2020年9月29日周二 下午5:54写道:

> 这个我们有尝试,遇到了classpath的问题,导致包冲突,无法启动进程,你们有遇到过相关的情况吗?
>
>
>  原始邮件
> 发件人: todd
> 收件人: user-zh
> 发送时间: 2020年9月29日(周二) 17:36
> 主题: Re: 怎么样在Flink中使用java代码提交job到yarn
>
>
> https://github.com/todd5167/flink-spark-submiter
> 可以参考这个案例,用ClusterCLient提交。 -- Sent from:
> http://apache-flink.147419.n8.nabble.com/


Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-29 文章 xiao cai
这个我们有尝试,遇到了classpath的问题,导致包冲突,无法启动进程,你们有遇到过相关的情况吗?


 原始邮件 
发件人: todd
收件人: user-zh
发送时间: 2020年9月29日(周二) 17:36
主题: Re: 怎么样在Flink中使用java代码提交job到yarn


https://github.com/todd5167/flink-spark-submiter 可以参考这个案例,用ClusterCLient提交。 -- 
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink配置hdfs状态后端遇到的问题

2020-09-29 文章 jester_jim
Hi Robin Zhang,
其实在只要不是在根目录下创建文件夹,只要在我指定的目录下创建即可,我其实是有权限的,Hadoop管理员给我分配了一个目录,我想把目录设置到分配的目录,但是一直报这个错,想问一下除了创建job信息,Flink还有什么机制会去hdfs上创建文件或文件夹的?
祝好!


在 2020年9月29日 17:15,Robin Zhang 写道:


Hi jester_jim, 
配置文件中指定的checkpoints(以后简称ckp)的目录只是一个父目录,flink在首次触发每个job的ckp时会在这个父目录下新建多级文件夹,命名为指定的job名字/job
 id.所以,并不是新建父目录就可以,依然会存在权限问题 。 祝好,Robin Zhang Flink中文社区的各位大佬你们好: 
本人是小白,由于对Flink不太了解,想学,然后搭了个Flink standalone(1.11.2 
jdk1.8)集群,集群本身运行没有什么问题,作业运行也没什么问题。但是最近有用到状态后端,在配置hdfs的时候遇到了一个无法理解的问题,我在issues也没找到解决方法。问题大概是,我在Flink配置文件配置了HDFS状态后端,但是呢Hadoop(CDH2.5.0)是生产系统的集群,有kerberos认证,目录权限不全开放。Flink的配置信息如下:
 state.backend: filesystem # Directory for checkpoints filesystem, when using 
any of the default bundled # state backends. # state.checkpoints.dir: 
hdfs://namenode58:9000/NS3/user/jester/flink/flink-checkpoints # Default target 
directory for savepoints, optional. # state.savepoints.dir: 
hdfs://namenode58:9000/NS3/user/jester/flink/flink-checkpoints 
除此之外还有kerberos认证配置,但是都没有什么问题,如下所示,当注释掉上面三行配置过后,作业正常运行: 2020-09-29 11:27:20,430 
INFO org.apache.hadoop.security.UserGroupInformation [] - Login successful for 
user jester/principle using keytab file 
/kafka/flink/flink-1.11.2/conf/jester.keytab Job has been submitted with JobID 
41c01241338f1c7112d48f277701d9c3 但是如果不注释,当我提交作业就会抛出: (本部分放在异常信息前面: 
异常里面说Flink要去/目录创建文件但是没有write权限,但是这个权限肯定是不会给的。但是我明明在Flink配置中我已经指定了目录,我不能理解为什么还会在/下创建什么?除此之外,Hadoop的配置信息我是直接把Hadoop集群的目录拷贝过来了,在启动脚本中指定了HADDOP_CONF_DIR,(本人觉得其实这样好像是不太妥当的,因为Hadoop的hdfs-site.xml和core-site.xml里面有很多只能在Hadoop集群的机器上使用的配置,不知道是不是应该删除无用的配置,还有增加必要的配置,希望能有参考文档)还有Hadoop的jar包我直接`$HADDOP_HOME/bin/hadoop
 classpath` 另外还有个问题,就是Hadoop配置了HA,我怎么给checkpoint配置多个namenode? ) 2020-09-29 
11:21:20,446 INFO org.apache.hadoop.security.UserGroupInformation [] - Login 
successful for user jester/principle using keytab file 
/kafka/flink/flink-1.11.2/conf/unicom_jiangt37.keytab -- Sent from: 
http://apache-flink.147419.n8.nabble.com/

Re: 使用异步IO时,数据写入到capacity数后,卡住不再消费source端数据了。

2020-09-29 文章 Benchao Li
你的timeout方法应该要正确的处理ResultFuture,
比如ResultFuture.complete或者completeExceptionally,如果你什么都没做,那么这个异步请求就还没有真的结束。

王敏超  于2020年9月29日周二 下午5:43写道:

>  AsyncDataStream
>   //顺序异步IO
>   .orderedWait(input, new AsyncDatabaseRequest(), 5000,
> TimeUnit.MILLISECONDS, 1000)
>
>   当我没重写timeout方法的时候,会执行这个报错信息
> resultFuture.completeExceptionally(new TimeoutException("Async function
> call
> has timed out."))
>
>
>   当我重写了timeout方法,如下,程序就卡住了,求大佬解答。
>   override def timeout(input: String, resultFuture: ResultFuture[Int]):
> Unit
> = {
> println("time out ... ")
>   }
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


?????? flink sql count????

2020-09-29 文章 lemon
count0
flinkcount??ifwhere??
?? select count(if(name like '%',1 , null)) where name 
like '%'  or name like '%'  group by ** 


--  --
??: 
   "user-zh"

http://apache-flink.147419.n8.nabble.com/

pyflink1.11 window groupby出错

2020-09-29 文章 刘乘九
各位大佬,我想尝试下pyflink 进行时间窗口下的指标统计,写了一个demo发现table APi 的group 
方法报错,网上搜索了一下相关内容也没有解决问题, 想请各位大佬帮帮忙看一下是哪里写错了?

错误信息:
py4j.protocol.Py4JJavaError: An error occurred while calling o95.select.
: org.apache.flink.table.api.ValidationException: A group window expects a time 
attribute for grouping in a stream environment.
at 
org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
at 
org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
at 
org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
at 
org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
at 
org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)
at 
org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)
at 
org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:781)




demo程序:
from pyflink.datastream import *
from pyflink.table import *
from pyflink.table.descriptors import *
from pyflink.table.descriptors import Json
from pyflink.table.window import *

test_out_put_data_path = r'D:\test_doc\test_result_data.csv'

s_nev = StreamExecutionEnvironment.get_execution_environment()
s_nev.set_parallelism(3)
st_nev = StreamTableEnvironment.create(s_nev, 
environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())

st_nev.connect(Kafka().version('0.11').topic('gyhWebLog').start_from_earliest().property("zookeeper.connect","cdh3:2181,
 cdh4:2181, cdh5:2181").property("bootstrap.servers", "cdh3:9092, cdh4:9092, 
cdh5:9092")) \
.with_format(Json()
 .fail_on_missing_field(False)
 .schema(DataTypes.ROW([DataTypes.FIELD('time', 
DataTypes.TIMESTAMP(3)),

DataTypes.FIELD('prev_page',DataTypes.STRING()),
DataTypes.FIELD('page', 
DataTypes.STRING()),
DataTypes.FIELD("app", 
DataTypes.STRING()),

DataTypes.FIELD("nextApp",DataTypes.STRING()),

DataTypes.FIELD("service",DataTypes.STRING()),

DataTypes.FIELD("userId",DataTypes.BIGINT())])))\
.with_schema(Schema().
field('prev_page', DataTypes.STRING())
 .field('page', DataTypes.STRING())
 .field('app', DataTypes.STRING())
 .field('nextApp', DataTypes.STRING())
 .field('service', DataTypes.STRING())
 .field('userId', DataTypes.BIGINT())
 .field('time', DataTypes.TIMESTAMP(3))
 .rowtime(Rowtime()
  .timestamps_from_field('time')
  .watermarks_periodic_bounded(6)))\
.in_append_mode()\
.create_temporary_table('raw_web_log_data')


st_nev.connect(FileSystem().path(test_out_put_data_path))\
.with_format(OldCsv()
 .field_delimiter(',')
 .field("userId", DataTypes.BIGINT())
 .field('dataCount', DataTypes.BIGINT())
 .field('count_time', DataTypes.TIMESTAMP(3))
 )\
.with_schema(Schema()
 .field('userId', DataTypes.BIGINT())
 .field('dataCount', DataTypes.BIGINT())
 .field('count_time', DataTypes.TIMESTAMP(3))
 )\
.create_temporary_table('test_out_put')


if __name__ == '__main__':
st_nev.from_path('raw_web_log_data').window(Tumble.over('1.hours').on('time').alias('w')).group_by('userId,
 w').select('userId, page.count as d, w.end').execute_insert('test_out_put')


使用异步IO时,数据写入到capacity数后,卡住不再消费source端数据了。

2020-09-29 文章 王敏超
 AsyncDataStream
  //顺序异步IO
  .orderedWait(input, new AsyncDatabaseRequest(), 5000,
TimeUnit.MILLISECONDS, 1000)

  当我没重写timeout方法的时候,会执行这个报错信息 
resultFuture.completeExceptionally(new TimeoutException("Async function call
has timed out."))


  当我重写了timeout方法,如下,程序就卡住了,求大佬解答。
  override def timeout(input: String, resultFuture: ResultFuture[Int]): Unit
= {
println("time out ... ")
  }




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: Re: Re: 了解下大家生产中都用什么任务调度系统呢

2020-09-29 文章 Michael Ran
~.~ 不是有几百个star 嘛。海豚 这个到apache 社区会强大些
在 2020-09-29 16:45:30,"赵一旦"  写道:
>看了下,hera算了,虽然文档看起来还行,但是5个star,不敢用。
>海豚这个看起来还不错,可以试试看。
>
>Michael Ran  于2020年9月29日周二 上午10:43写道:
>
>> ~。~ hera、海豚都行
>> 在 2020-09-29 09:58:45,"chengyanan1...@foxmail.com" <
>> chengyanan1...@foxmail.com> 写道:
>> >
>> >Apache DolphinScheduler 你值得拥有
>> >
>> >https://dolphinscheduler.apache.org/zh-cn/
>> >
>> >
>> >
>> >发件人: 赵一旦
>> >发送时间: 2020-09-28 20:47
>> >收件人: user-zh
>> >主题: Re: 了解下大家生产中都用什么任务调度系统呢
>> >感觉ooize成熟但不想用,xml写起来难受。
>> >azkaban也需要单独上传。
>> >
>> >我现在用的公司的其实挺好,就是界面操作有点小bug,以及经常挂。就是全流程通过界面操作,创建任务,然后任务和任务之间拖一拖就可以创建依赖。
>> >
>> >孟小鹏 <602012...@qq.com> 于2020年9月28日周一 下午8:22写道:
>> >
>> >> Airflow & oozie
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 发自我的iPhone
>> >>
>> >>
>> >> -- 原始邮件 --
>> >> 发件人: 赵一旦 > >> 发送时间: 2020年9月28日 19:41
>> >> 收件人: user-zh > >> 主题: 回复:了解下大家生产中都用什么任务调度系统呢
>> >>
>> >>
>> >>
>> >> 主要是指开源的调度系统。
>> >>
>> >> 公司有个系统感觉经常挂,想换个开源的自己搭建。
>> >> (1)最好是支持WEB UI配置任务流的,不想搞个任务还需要单独写xml等文件然后打包上传那种。
>> >> (2)在生产中长期应用,稳定,能满足大多数需求的。
>> >>
>> >> 希望大家推荐下。
>>


Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-29 文章 todd
https://github.com/todd5167/flink-spark-submiter   可以参考这个案例,用ClusterCLient提交。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复: Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-29 文章 todd
https://github.com/todd5167/flink-spark-submiter   可以参考下



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql count问题

2020-09-29 文章 Robin Zhang
Hi lemon,
内部判断if函数可以替换为case when

Best,
Robin


lemon wrote
> 请教各位:
> 我有一个sql任务需要进行count,在count中有一个表达式,只想count符合条件的记录,
> 之前在hive中是这么写的:count(if(name like '南京%',1 , null)),但是flink
> sql中count不能为null,有什么别的方法能实现该功能吗?
> 使用的是flink1.10.1 blink
>  





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 如何在流式数据源上使用分析函数LAG和EAD函数

2020-09-29 文章 Robin Zhang
Hi Benchao,
 
 感谢回复,解决了我最近的疑惑。

Best,
Robin


Benchao Li-2 wrote
> Hi Robin,
> 
> 目前LAG/LEAD函数在流式场景下的实现的确是有bug的,那个实现只能在批式场景下work,
> 是线上其实没有考虑流式的场景。所以你看到的结果应该是它只能返回当前数据。
> 这个问题我也是最近才发现的,刚刚建了一个issue[1] 来跟踪这个问题。
> 当前如果你想实现类似功能,可以先自己写一个udaf来做。
> 
> [1] https://issues.apache.org/jira/browse/FLINK-19449
> 
> Robin Zhang <

> vincent2015qdlg@

> > 于2020年9月29日周二 下午2:04写道:
> 
>> 环境: flink 1.10,使用flinkSQL
>>
>> kafka输入数据如:
>> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0}
>> {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0}
>> {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0}
>> {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0}
>> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0}
>> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0}
>>
>> sql如下:
>>
>> INSERT INTO topic_sink
>> SELECT
>>   t,
>>   id,
>>   speed,
>>   LAG(speed, 1) OVER w AS speed_1,
>>   LAG(speed, 2) OVER w AS speed_2
>> FROM topic_source
>> WINDOW w AS (
>>   PARTITION BY id
>>   ORDER BY t
>> )
>> 我期望得到的结果数据是
>> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":null,
>> "speed_2":null}
>> {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":1.0,
>> "speed_2":null}
>> {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":2.0,
>> "speed_2":1.0}
>> {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":3.0,
>> "speed_2":2.0}
>> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":4.0,
>> "speed_2":3.0}
>> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":5.0,
>> "speed_2":4.0}
>>
>> 实际得到的结果数据是:
>> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":1.0,
>> "speed_2":1.0}
>> {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":2.0,
>> "speed_2":2.0}
>> {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":3.0,
>> "speed_2":3.0}
>> {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":4.0,
>> "speed_2":4.0}
>> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":5.0,
>> "speed_2":5.0}
>> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":6.0,
>> "speed_2":6.0}
>>
>> 想问一下flink sql里的LAG函数能完成我期望的计算吗?如果可以sql该如何写?
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
> 
> 
> -- 
> 
> Best,
> Benchao Li





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复: BLinkPlanner sql join状态清理

2020-09-29 文章 Benchao Li
Hi Ericliuk,

这应该是实现的bug,你可以去社区建一个issue描述下这个问题。
有时间的话也可以帮忙修复一下,没有时间社区也会有其他小伙伴帮忙来修复的~

Ericliuk  于2020年9月29日周二 下午4:59写道:

> 我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。
> <
> http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png>
>
>
> 不太清楚为什么用了mini batch就没读取这个配置。
> 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 

Best,
Benchao Li


Re: Flink配置hdfs状态后端遇到的问题

2020-09-29 文章 Robin Zhang
Hi jester_jim,

配置文件中指定的checkpoints(以后简称ckp)的目录只是一个父目录,flink在首次触发每个job的ckp时会在这个父目录下新建多级文件夹,命名为指定的job名字/job
id.所以,并不是新建父目录就可以,依然会存在权限问题
。

  祝好,Robin Zhang




Flink中文社区的各位大佬你们好:
本人是小白,由于对Flink不太了解,想学,然后搭了个Flink standalone(1.11.2
jdk1.8)集群,集群本身运行没有什么问题,作业运行也没什么问题。但是最近有用到状态后端,在配置hdfs的时候遇到了一个无法理解的问题,我在issues也没找到解决方法。问题大概是,我在Flink配置文件配置了HDFS状态后端,但是呢Hadoop(CDH2.5.0)是生产系统的集群,有kerberos认证,目录权限不全开放。Flink的配置信息如下:
state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default
bundled
# state backends.
#
state.checkpoints.dir:
hdfs://namenode58:9000/NS3/user/jester/flink/flink-checkpoints
# Default target directory for savepoints, optional.
#
state.savepoints.dir:
hdfs://namenode58:9000/NS3/user/jester/flink/flink-checkpoints
除此之外还有kerberos认证配置,但是都没有什么问题,如下所示,当注释掉上面三行配置过后,作业正常运行:
2020-09-29 11:27:20,430 INFO 
org.apache.hadoop.security.UserGroupInformation  [] - Login
successful for user jester/principle using keytab file
/kafka/flink/flink-1.11.2/conf/jester.keytab
Job has been submitted with JobID 41c01241338f1c7112d48f277701d9c3


但是如果不注释,当我提交作业就会抛出:
(本部分放在异常信息前面:
异常里面说Flink要去/目录创建文件但是没有write权限,但是这个权限肯定是不会给的。但是我明明在Flink配置中我已经指定了目录,我不能理解为什么还会在/下创建什么?除此之外,Hadoop的配置信息我是直接把Hadoop集群的目录拷贝过来了,在启动脚本中指定了HADDOP_CONF_DIR,(本人觉得其实这样好像是不太妥当的,因为Hadoop的hdfs-site.xml和core-site.xml里面有很多只能在Hadoop集群的机器上使用的配置,不知道是不是应该删除无用的配置,还有增加必要的配置,希望能有参考文档)还有Hadoop的jar包我直接`$HADDOP_HOME/bin/hadoop
classpath`
另外还有个问题,就是Hadoop配置了HA,我怎么给checkpoint配置多个namenode?
)
2020-09-29 11:21:20,446 INFO 
org.apache.hadoop.security.UserGroupInformation  [] - Login
successful for user  jester/principle  using keytab file
/kafka/flink/flink-1.11.2/conf/unicom_jiangt37.keytab




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 如何在流式数据源上使用分析函数LAG和EAD函数

2020-09-29 文章 Benchao Li
Hi Robin,

目前LAG/LEAD函数在流式场景下的实现的确是有bug的,那个实现只能在批式场景下work,
是线上其实没有考虑流式的场景。所以你看到的结果应该是它只能返回当前数据。
这个问题我也是最近才发现的,刚刚建了一个issue[1] 来跟踪这个问题。
当前如果你想实现类似功能,可以先自己写一个udaf来做。

[1] https://issues.apache.org/jira/browse/FLINK-19449

Robin Zhang  于2020年9月29日周二 下午2:04写道:

> 环境: flink 1.10,使用flinkSQL
>
> kafka输入数据如:
> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0}
> {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0}
> {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0}
> {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0}
> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0}
> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0}
>
> sql如下:
>
> INSERT INTO topic_sink
> SELECT
>   t,
>   id,
>   speed,
>   LAG(speed, 1) OVER w AS speed_1,
>   LAG(speed, 2) OVER w AS speed_2
> FROM topic_source
> WINDOW w AS (
>   PARTITION BY id
>   ORDER BY t
> )
> 我期望得到的结果数据是
> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":null,
> "speed_2":null}
> {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":1.0,
> "speed_2":null}
> {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":2.0,
> "speed_2":1.0}
> {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":3.0,
> "speed_2":2.0}
> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":4.0,
> "speed_2":3.0}
> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":5.0,
> "speed_2":4.0}
>
> 实际得到的结果数据是:
> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":1.0,
> "speed_2":1.0}
> {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":2.0,
> "speed_2":2.0}
> {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":3.0,
> "speed_2":3.0}
> {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":4.0,
> "speed_2":4.0}
> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":5.0,
> "speed_2":5.0}
> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":6.0,
> "speed_2":6.0}
>
> 想问一下flink sql里的LAG函数能完成我期望的计算吗?如果可以sql该如何写?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: 回复: BLinkPlanner sql join状态清理

2020-09-29 文章 Ericliuk
我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。

 

不太清楚为什么用了mini batch就没读取这个配置。
一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: Re: 了解下大家生产中都用什么任务调度系统呢

2020-09-29 文章 赵一旦
看了下,hera算了,虽然文档看起来还行,但是5个star,不敢用。
海豚这个看起来还不错,可以试试看。

Michael Ran  于2020年9月29日周二 上午10:43写道:

> ~。~ hera、海豚都行
> 在 2020-09-29 09:58:45,"chengyanan1...@foxmail.com" <
> chengyanan1...@foxmail.com> 写道:
> >
> >Apache DolphinScheduler 你值得拥有
> >
> >https://dolphinscheduler.apache.org/zh-cn/
> >
> >
> >
> >发件人: 赵一旦
> >发送时间: 2020-09-28 20:47
> >收件人: user-zh
> >主题: Re: 了解下大家生产中都用什么任务调度系统呢
> >感觉ooize成熟但不想用,xml写起来难受。
> >azkaban也需要单独上传。
> >
> >我现在用的公司的其实挺好,就是界面操作有点小bug,以及经常挂。就是全流程通过界面操作,创建任务,然后任务和任务之间拖一拖就可以创建依赖。
> >
> >孟小鹏 <602012...@qq.com> 于2020年9月28日周一 下午8:22写道:
> >
> >> Airflow & oozie
> >>
> >>
> >>
> >>
> >>
> >> 发自我的iPhone
> >>
> >>
> >> -- 原始邮件 --
> >> 发件人: 赵一旦  >> 发送时间: 2020年9月28日 19:41
> >> 收件人: user-zh  >> 主题: 回复:了解下大家生产中都用什么任务调度系统呢
> >>
> >>
> >>
> >> 主要是指开源的调度系统。
> >>
> >> 公司有个系统感觉经常挂,想换个开源的自己搭建。
> >> (1)最好是支持WEB UI配置任务流的,不想搞个任务还需要单独写xml等文件然后打包上传那种。
> >> (2)在生产中长期应用,稳定,能满足大多数需求的。
> >>
> >> 希望大家推荐下。
>


创建BatchTableEnvironment报错

2020-09-29 文章 hl9...@126.com
flink 1.11.2版本,我写了个WordCountTable例子,代码如下:
public class WordCountTable {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
DataSet input = env.fromElements(new WC("Hello", 
1L),
运行报错:
Exception in thread "main" org.apache.flink.table.api.TableException: Create 
BatchTableEnvironment failed.
at 
org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:517)
at 
org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:471)
at com.toonyoo.table.WordCountTable.main(WordCountTable.java:12)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

pox table相关依赖:


org.apache.flink
flink-table-api-java-bridge_2.11
${flink.version}
provided



hl9...@126.com