退订

2020-09-29 Thread 提运亨
退订

回复: Flink 1.11 table.executeInsert 程序退出

2020-09-29 Thread 史 正超
这个是一个已知问题,可以看看这个jira: https://issues.apache.org/jira/browse/FLINK-18545
规避这个问题的话,可以不用执行 tableEnv.execute("jobname"); 直接用 executeSql 
就可以了,遇到INSERT语句就能生成job了。

发件人: HunterXHunter <1356469...@qq.com>
发送时间: 2020年9月30日 2:32
收件人: user-zh@flink.apache.org 
主题: Flink 1.11 table.executeInsert 程序退出

当我在使用 StreamTableEnvironment Api的时候;

 Table a = getStreamTable(getKafkaDataStream("test", "localhost:9092",
"latest"),"topic,offset,msg");
  tableEnv.createTemporaryView("test", a);

tableEnv.executeSql(DDLSourceSQLManager.createCustomPrintlnRetractSinkTbl("printlnSink_retract"));
  tableEnv.executeSql("insert into printlnSink_retract select
topic,msg,count(*) as ll from test group by topic,msg");

程序直接结束退出,但如果最后加Thread.sleep(1L) 就可以消费10s钟,如果加
tableEnv.execute("jobname");
报错:No operators defined in streaming topology



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复: 回复:关于flink sql cdc

2020-09-29 Thread 史 正超
HI, Kyle Zhang, 我刚才重现了你的问题,虽然你的mysql 
binlog设置是ROW格式,但是不排除其它session更改了binlog_format格式。重现步骤:

  1.  登录mysql客户端(注意用cmd登录) 执行语句, SET SESSION binlog_format='MIXED'; SET SESSION 
tx_isolation='REPEATABLE-READ'; COMMIT;
  2.  随便update或者insert一条语句。

然后就得到了和你一样的错误:
2020-09-30 10:46:37.607 [debezium-engine] ERROR 
com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction  - Reporting error:
org.apache.kafka.connect.errors.ConnectException: Received DML 'update orders 
set product_id = 1122 where order_number = 10001' for processing, binlog 
probably contains events generated with statement or mixed based replication 
format
at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
at 
com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
at 
com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
at 
com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
at 
com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML 
'update orders set product_id = 1122 where order_number = 10001' for 
processing, binlog probably contains events generated with statement or mixed 
based replication format
at 
io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
... 5 common frames omitted

所以应该是其它session更忙了binlog_format格式,并且事务隔离级别为 REPEATABLE-READ
希望对你有帮助,
best,
shizhengchao

发件人: 谢治平 
发送时间: 2020年9月30日 1:25
收件人: user-zh 
抄送: user-zh 
主题: 回复:关于flink sql cdc

能不能退掉邮箱信息,退出




| |
谢治平
|
|
邮箱:xiezhiping...@163.com
|

签名由 网易邮箱大师 定制

在2020年09月30日 09:24,Kyle Zhang 写道:
show variables like '%binlog_format%'确实是ROW

On Tue, Sep 29, 2020 at 7:39 PM Kyle Zhang  wrote:

> Hi,all
>   今天在使用sql cdc中遇到以一个问题 ,版本1.11.2,idea中运行,我的ddl是
> CREATE TABLE mysql_binlog (
>  id INT NOT NULL,
>  emp_name STRING,
>  age INT
> ) WITH (
>  'connector' = 'mysql-cdc',
>  'hostname' = 'xxx',
>  'port' = '3306',
>  'username' = 'root',
>  'password' = 'root',
>  'database-name' = 'test',
>  'table-name' = 'empoylee1'
> );
> 结果直接用print table
> 运行一段时间后报错
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Error during binlog processing. Last offset stored = null,
> binlog reader near position = binlog.001254/132686776
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Failed due to error: Error processing binlog event
> org.apache.kafka.connect.errors.ConnectException: Received DML 'INSERT
> INTO execution_flows (project_id, flow_id, version, status, submit_time,
> submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
> Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML
> 'INSERT INTO execution_flows (project_id, flow_id, version, status,
> submit_time, submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> ... 5 more
>
> sql cdc还会解析我其他的表然后报错么?,有没有人遇到类似的问题
>
> Best,
> Kyle Zhang
>


Flink 1.11 table.executeInsert 程序退出

2020-09-29 Thread HunterXHunter
当我在使用 StreamTableEnvironment Api的时候;

 Table a = getStreamTable(getKafkaDataStream("test", "localhost:9092",
"latest"),"topic,offset,msg");
  tableEnv.createTemporaryView("test", a);
 
tableEnv.executeSql(DDLSourceSQLManager.createCustomPrintlnRetractSinkTbl("printlnSink_retract"));
  tableEnv.executeSql("insert into printlnSink_retract select
topic,msg,count(*) as ll from test group by topic,msg");

程序直接结束退出,但如果最后加Thread.sleep(1L) 就可以消费10s钟,如果加
tableEnv.execute("jobname");
报错:No operators defined in streaming topology



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-29 Thread Yang Peng
感谢回复,我们看了consumer的lag很小
而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的
而且任务重启了没法jstack判断了

hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道:

>
>
>
> 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗
> 也可以 jstack 采下堆栈看下,GC等看下。
> 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。
> Best,
> Hailong Wang
> 在 2020-09-29 20:06:50,"Yang Peng"  写道:
>
> >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
> >flinkkafkaconsumer消费的并行度也是90 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?
> >
> >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:
> >
> >>
> >>
> >>
> >> Hi Yang Peng:
> >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
> >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
> >> 2. Source 的序列化耗时严重,导致拉取变慢。
> >> 可以尝试着扩kafka 分区,加大Source并发看下。
> >> Best,
> >> Hailong Wang
> >>
> >> 在 2020-09-29 19:44:44,"Yang Peng"  写道:
> >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
> >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
> >> >kafka消费没有积压,也没有反压,
> 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
> >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
> >>
>


flink1.11.1 kafka eos checkpoints??????Timeout expired after 600000milliseconds

2020-09-29 Thread ????????
flink 
1.9kafkaSemantic??EXACTLY_ONCE1.11?? 
,checkpoints??InitProducerId??TransactionTimeout??MaxBlockMS
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
60milliseconds while awaiting InitProducerId
kafkakafka??offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

回复:关于flink sql cdc

2020-09-29 Thread 谢治平
能不能退掉邮箱信息,退出




| |
谢治平
|
|
邮箱:xiezhiping...@163.com
|

签名由 网易邮箱大师 定制

在2020年09月30日 09:24,Kyle Zhang 写道:
show variables like '%binlog_format%'确实是ROW

On Tue, Sep 29, 2020 at 7:39 PM Kyle Zhang  wrote:

> Hi,all
>   今天在使用sql cdc中遇到以一个问题 ,版本1.11.2,idea中运行,我的ddl是
> CREATE TABLE mysql_binlog (
>  id INT NOT NULL,
>  emp_name STRING,
>  age INT
> ) WITH (
>  'connector' = 'mysql-cdc',
>  'hostname' = 'xxx',
>  'port' = '3306',
>  'username' = 'root',
>  'password' = 'root',
>  'database-name' = 'test',
>  'table-name' = 'empoylee1'
> );
> 结果直接用print table
> 运行一段时间后报错
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Error during binlog processing. Last offset stored = null,
> binlog reader near position = binlog.001254/132686776
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Failed due to error: Error processing binlog event
> org.apache.kafka.connect.errors.ConnectException: Received DML 'INSERT
> INTO execution_flows (project_id, flow_id, version, status, submit_time,
> submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
> Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML
> 'INSERT INTO execution_flows (project_id, flow_id, version, status,
> submit_time, submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> ... 5 more
>
> sql cdc还会解析我其他的表然后报错么?,有没有人遇到类似的问题
>
> Best,
> Kyle Zhang
>


Re: 关于flink sql cdc

2020-09-29 Thread Kyle Zhang
show variables like '%binlog_format%'确实是ROW

On Tue, Sep 29, 2020 at 7:39 PM Kyle Zhang  wrote:

> Hi,all
>   今天在使用sql cdc中遇到以一个问题 ,版本1.11.2,idea中运行,我的ddl是
> CREATE TABLE mysql_binlog (
>  id INT NOT NULL,
>  emp_name STRING,
>  age INT
> ) WITH (
>  'connector' = 'mysql-cdc',
>  'hostname' = 'xxx',
>  'port' = '3306',
>  'username' = 'root',
>  'password' = 'root',
>  'database-name' = 'test',
>  'table-name' = 'empoylee1'
> );
> 结果直接用print table
> 运行一段时间后报错
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Error during binlog processing. Last offset stored = null,
> binlog reader near position = binlog.001254/132686776
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Failed due to error: Error processing binlog event
> org.apache.kafka.connect.errors.ConnectException: Received DML 'INSERT
> INTO execution_flows (project_id, flow_id, version, status, submit_time,
> submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
> Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML
> 'INSERT INTO execution_flows (project_id, flow_id, version, status,
> submit_time, submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> ... 5 more
>
> sql cdc还会解析我其他的表然后报错么?,有没有人遇到类似的问题
>
> Best,
> Kyle Zhang
>


Re: 关于flink sql cdc

2020-09-29 Thread Kyle Zhang
代码部分基本没有什么东西

public class CDC {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

String ddl = "CREATE TABLE mysql_binlog (\n" +
" id INT NOT NULL,\n" +
" emp_name STRING,\n" +
" age INT\n" +
") WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = 'xxx',\n" +
" 'port' = '3306',\n" +
" 'username' = 'root',\n" +
" 'password' = 'root',\n" +
" 'database-name' = 'eric',\n" +
" 'table-name' = 'employee1'\n" +
")";
tEnv.executeSql(ddl);

tEnv.executeSql("CREATE TABLE print_table WITH ('connector' =
'print')\n" +
"LIKE mysql_binlog (EXCLUDING ALL)");

tEnv.executeSql("insert into print_table select *  from mysql_binlog");
}
}


08:30:19,945 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 0: disabling autocommit, enabling repeatable read
transactions, and setting lock wait timeout to 10
08:30:19,964 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 1: flush and obtain global read lock to prevent
writes to database
08:30:19,982 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 2: start transaction with consistent snapshot
08:30:19,985 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 3: read binlog position of MySQL master
08:30:19,989 INFO  io.debezium.connector.mysql.SnapshotReader
 [] -using binlog 'binlog.001254' at position '152522471'
and gtid ''
08:30:19,989 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 4: read list of available databases
08:30:19,995 INFO  io.debezium.connector.mysql.SnapshotReader
 [] -list of available databases is: [一堆database]
08:30:19,995 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 5: read list of available tables in each database

。。。一堆table

08:30:20,198 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 6: generating DROP and CREATE statements to reflect
current database schemas:
08:30:20,918 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 7: releasing global read lock to enable MySQL
writes
08:30:20,926 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 7: blocked writes to MySQL for a total of
00:00:00.948
08:30:20,927 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 8: scanning contents of 1 tables while still in
transaction
08:30:20,937 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 8: - scanning table 'eric.employee1' (1 of 1
tables)
08:30:20,937 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - For table 'eric.employee1' using select statement:
'SELECT * FROM `eric`.`employee1`'
08:30:20,949 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 8: - Completed scanning a total of 5 rows from
table 'eric.employee1' after 00:00:00.012
08:30:20,950 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 8: scanned 5 rows in 1 tables in 00:00:00.022
08:30:20,950 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Step 9: committing transaction
08:30:20,954 INFO  io.debezium.connector.mysql.SnapshotReader
 [] - Completed snapshot in 00:00:01.03
08:30:21,391 INFO
com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer []
- Database snapshot phase can't perform checkpoint, acquired
Checkpoint lock.
+I(1,Eric,23)
+I(2,Jack,22)
+I(3,Amy,33)
+I(4,Dell,12)
08:30:21,392 INFO
com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer []
- Received record from streaming binlog phase, released checkpoint
lock.
+I(5,Hello,44)


之后就会报其他表update或者insert的错

09:18:54,326 ERROR io.debezium.connector.mysql.BinlogReader
 [] - Failed due to error: Error processing binlog event
org.apache.kafka.connect.errors.ConnectException: Received DML 'UPDATE
triggers SET trigger_source='SimpleTimeTrigger',
modify_time=1601428734343, enc_type=2,

Re: Re: sql-cli执行sql报错

2020-09-29 Thread hl9...@126.com
没有修改kafka,就用官方的jar。后来我用1.11.2版本重新尝试了下,成功了,没有任何错误。
这个问题就不纠结了



hl9...@126.com
 
发件人: Benchao Li
发送时间: 2020-09-29 18:17
收件人: user-zh
主题: Re: Re: sql-cli执行sql报错
这个错误看起来比较奇怪。正常来讲flink-sql-connector-kafka_2.11-1.10.2.jar里面应该都是shaded之后的class了,
但是却报了一个非shaded的ByteArrayDeserializer。
我感觉这个应该是你自己添加了一下比较特殊的逻辑导致的。可以介绍下你对kafka connector做了哪些改造么?
 
hl9...@126.com  于2020年9月28日周一 下午6:06写道:
 
> 按照您的方法重试了下,又报了另一个错误:
> Flink SQL> CREATE TABLE tx (
> > account_id  BIGINT,
> > amount  BIGINT,
> > transaction_time TIMESTAMP(3),
> > WATERMARK FOR transaction_time AS transaction_time -
> INTERVAL '5' SECOND
> > ) WITH (
> > 'connector.type' = 'kafka',
> > 'connector.version' = 'universal',
> > 'connector.topic' = 'heli01',
> > 'connector.properties.group.id' = 'heli-test',
> > 'connector.properties.bootstrap.servers' = '
> 10.100.51.56:9092',
> > 'connector.startup-mode' = 'earliest-offset',
> > 'format.type'= 'csv'
> > );
> [INFO] Table has been created.
>
> Flink SQL> show tables ;
> tx
>
> Flink SQL> select * from tx ;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.serialization.ByteArrayDeserializer is not an
> instance of
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer
>
> 附:lib包清单
> [test@rcx51101 lib]$ pwd
> /opt/flink-1.10.2/lib
>
> flink-csv-1.10.2.jar
> flink-dist_2.12-1.10.2.jar
> flink-jdbc_2.12-1.10.2.jar
> flink-json-1.10.2.jar
> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar
> flink-sql-connector-kafka_2.11-1.10.2.jar
> flink-table_2.12-1.10.2.jar
> flink-table-blink_2.12-1.10.2.jar
> log4j-1.2.17.jar
> mysql-connector-java-5.1.48.jar
> slf4j-log4j12-1.7.15.jar
>
>
>
>
> hl9...@126.com
>
> 发件人: Leonard Xu
> 发送时间: 2020-09-28 16:36
> 收件人: user-zh
> 主题: Re: sql-cli执行sql报错
> Hi
> benchao的回复是的对的,
> 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包
> flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。
>
>
> > 相关lib包:
> > flink-connector-kafka_2.12-1.10.2.jar
> > kafka-clients-0.11.0.3.jar
>
> 祝好
> Leonard
>
 
 
-- 
 
Best,
Benchao Li


Flink 1.12 snapshot throws ClassNotFoundException

2020-09-29 Thread Lian Jiang
Hi,

I use Flink source master to build a snapshot and use the jars in my
project. The goal is to avoid hacky deserialization code caused by avro 1.8
in old Flink versions since Flink 1.12 uses avro 1.10. Unfortunately, the
code throws below ClassNotFoundException. I have verified that the
akka-actor jar 2.5.12 is available and specified in -classpath. I can even
create an object using akka/serialization/NullSerializer class in my
application, indicating there is no problem for this app to use any class
under namespace akka/serialization.

Caused by: java.lang.NoClassDefFoundError:
akka/serialization/BaseSerializer$class
at
akka.remote.serialization.MiscMessageSerializer.(MiscMessageSerializer.scala:25)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
akka.actor.ReflectiveDynamicAccess.$anonfun$createInstanceFor$1(ReflectiveDynamicAccess.scala:33)
at scala.util.Try$.apply(Try.scala:213)
at
akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:28)
at
akka.actor.ReflectiveDynamicAccess.$anonfun$createInstanceFor$4(ReflectiveDynamicAccess.scala:39)
at scala.util.Success.flatMap(Try.scala:251)
at
akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:39)
at akka.serialization.Serialization.serializerOf(Serialization.scala:320)
at
akka.serialization.Serialization.$anonfun$serializers$2(Serialization.scala:346)
at
scala.collection.TraversableLike$WithFilter.$anonfun$map$2(TraversableLike.scala:874)
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:394)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:721)
at
scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:873)
at akka.serialization.Serialization.(Serialization.scala:346)
at
akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:16)
at
akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:13)
at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:913)
at
akka.actor.ActorSystemImpl.$anonfun$loadExtensions$1(ActorSystem.scala:946)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at akka.actor.ActorSystemImpl.loadExtensions$1(ActorSystem.scala:944)
at akka.actor.ActorSystemImpl.loadExtensions(ActorSystem.scala:961)
at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:833)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:823)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:823)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:842)
at akka.actor.RobustActorSystem$.internalApply(RobustActorSystem.scala:96)
at akka.actor.RobustActorSystem$.apply(RobustActorSystem.scala:70)
at akka.actor.RobustActorSystem$.create(RobustActorSystem.scala:55)
at
org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:125)
at
org.apache.flink.runtime.akka.AkkaUtils.createActorSystem(AkkaUtils.scala)
at
org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:276)
at
org.apache.flink.runtime.clusterframework.BootstrapTools.startLocalActorSystem(BootstrapTools.java:260)
... 11 more


This is my gradle:

implementation files('lib/flink-avro-confluent-registry-1.12-SNAPSHOT.jar')
implementation files('lib/flink-clients_2.11-1.12-SNAPSHOT.jar')
implementation files('lib/flink-connector-kafka_2.11-1.12-SNAPSHOT.jar')
implementation files('lib/flink-connector-wikiedits_2.11-1.12-SNAPSHOT.jar')
implementation files('lib/flink-core-1.12-SNAPSHOT.jar')
implementation files('lib/flink-java-1.12-SNAPSHOT.jar')
implementation files('lib/flink-metrics-dropwizard-1.12-SNAPSHOT.jar')
implementation files('lib/flink-streaming-java_2.11-1.12-SNAPSHOT.jar')
implementation files('lib/flink-connector-kafka-base_2.11-1.12-SNAPSHOT.jar')
implementation files('lib/flink-avro-1.12-SNAPSHOT.jar')
implementation files('lib/flink-annotations-1.12-SNAPSHOT.jar')
implementation files('lib/flink-runtime_2.11-1.12-SNAPSHOT.jar')
implementation files('lib/flink-shaded-asm-7-7.1-11.0.jar')
implementation files('lib/flink-metrics-core-1.12-SNAPSHOT.jar')
implementation files('lib/flink-optimizer_2.11-1.12-SNAPSHOT.jar')
implementation files('lib/flink-shaded-guava-18.0-11.0.jar')

implementation group: 'org.scala-lang', name: 'scala-library',
version: '2.12.12'
implementation group: 'org.apache.commons', name: 'commons-lang3',

Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-09-29 Thread Austin Cawley-Edwards
Hey Till,

Thanks for the reply -- I'll try to see if I can reproduce this in a small
repo and share it with you.

Best,
Austin

On Tue, Sep 29, 2020 at 3:58 AM Till Rohrmann  wrote:

> Hi Austin,
>
> could you share with us the exact job you are running (including the
> custom window trigger)? This would help us to better understand your
> problem.
>
> I am also pulling in Klou and Timo who might help with the windowing logic
> and the Table to DataStream conversion.
>
> Cheers,
> Till
>
> On Mon, Sep 28, 2020 at 11:49 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey all,
>>
>> I'm not sure if I've missed something in the docs, but I'm having a bit
>> of trouble with a streaming SQL job that starts w/ raw SQL queries and then
>> transitions to a more traditional streaming job. I'm on Flink 1.10 using
>> the Blink planner, running locally with no checkpointing.
>>
>> The job looks roughly like:
>>
>> CSV 1 -->
>> CSV 2 -->  SQL Query to Join --> toRetractStream --> keyed time window w/
>> process func & custom trigger --> some other ops
>> CSV 3 -->
>>
>>
>> When I remove the windowing directly after the `toRetractStream`, the
>> records make it to the "some other ops" stage, but with the windowing,
>> those operations are sometimes not sent any data. I can also get data sent
>> to the downstream operators by putting in a no-op map before the window and
>> placing some breakpoints in there to manually slow down processing.
>>
>>
>> The logs don't seem to indicate anything went wrong and generally look
>> like:
>>
>> 4819 [Source: Custom File source (1/1)] INFO
>>  org.apache.flink.runtime.taskmanager.Task  - Source: Custom File source
>> (1/1) (3578629787c777320d9ab030c004abd4) switched from RUNNING to
>> FINISHED.\4819 [Source: Custom File source (1/1)] INFO
>>  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
>> Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4).
>> 4819 [Source: Custom File source (1/1)] INFO
>>  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
>> streams are closed for task Source: Custom File source (1/1)
>> (3578629787c777320d9ab030c004abd4) [FINISHED]
>> 4820 [flink-akka.actor.default-dispatcher-5] INFO
>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering task
>> and sending final execution state FINISHED to JobManager for task Source:
>> Custom File source (1/1) 3578629787c777320d9ab030c004abd4.
>> ...
>> 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>> ProcessWindowFunction$1) (1/1)] INFO
>>  org.apache.flink.runtime.taskmanager.Task  -
>> Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>> ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f) switched
>> from RUNNING to FINISHED.
>> 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>> ProcessWindowFunction$1) (1/1)] INFO
>>  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
>> Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>> ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f).
>> 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>> ProcessWindowFunction$1) (1/1)] INFO
>>  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
>> streams are closed for task Window(TumblingProcessingTimeWindows(6),
>> TimedCountTrigger, ProcessWindowFunction$1) (1/1)
>> (907acf9bfa2f4a9bbd13997b8b34d91f) [FINISHED]
>> ...
>> rest of the shutdown
>> ...
>> Program execution finished
>> Job with JobID 889b161e432c0e69a8d760bbed205d5d has finished.
>> Job Runtime: 783 ms
>>
>>
>> Is there something I'm missing in my setup? Could it be my custom window
>> trigger? Bug? I'm stumped.
>>
>>
>> Thanks,
>> Austin
>>
>>
>>


Re: Efficiently processing sparse events in a time windows

2020-09-29 Thread Steven Murdoch
Thanks David, this is very helpful. I'm glad that it's not just that I had 
missed something obvious from the (generally very clear) documentation. I found 
various features that felt almost right (e.g. the priority queue behind Timers) 
but nothing that did the job. The temporal state idea does sound a very handy 
feature to have.

On Thu, 24 Sep 2020, at 08:50, David Anderson wrote:
> Steven,
> 
> I'm pretty sure this is a scenario that doesn't have an obvious good 
> solution. As you have discovered, the window API isn't much help; using a 
> process function does make sense. The challenge is finding a data structure 
> to use in keyed state that can be efficiently accessed and updated.
> 
> One option would be to use MapState, where the keys are timestamps (longs) 
> and the values are lists of the events with the given timestamps (or just a 
> count of those events, if that's sufficient). If you then use the RocksDB 
> state backend, you can leverage an implementation detail of that state 
> backend, which is that you can iterate over the entries in order, sorted by 
> the key (the serialized, binary key), which in the case of keys that are 
> longs, will do the right thing. Also, with the RocksDB state backend, you 
> only have to do ser/de to access and update individual entries -- and not the 
> entire map.
> 
> It's not exactly pretty to rely on this, and some of us have been giving some 
> thought to adding a temporal state type to Flink that would make these 
> scenarios feasible to implement efficiently on all of the state backends, but 
> for now, this may be the best solution.
> 
> Regards,
> David
> 
> On Wed, Sep 23, 2020 at 12:42 PM Steven Murdoch  
> wrote:
>> Hello,
>> 
>> I am trying to do something that seems like it should be quite simple but I 
>> haven’t found an efficient way to do this with Flink and I expect I’m 
>> missing something obvious here. 
>> 
>> The task is that I would like to process a sequence of events when a certain 
>> number appear within a keyed event-time window. There will be many keys but 
>> events within each keyed window will normally be quite sparse. 
>> 
>> My first guess was to use Flink’s sliding windowing functionality. However 
>> my concern is that events are duplicated for each window. I would like to be 
>> precise about timing so every event would trigger hundreds of copies of an 
>> event in hundreds of windows, most which are then discarded because there 
>> are insufficient events. 
>> 
>> My next guess was to use a process function, and maintain a queue of events 
>> as the state. When an event occurred I would add it to the queue and then 
>> remove any events which fell off the end of my window. I thought ListState 
>> would help here, but that appears to not allow items to be removed.
>> 
>> I then thought about using a ValueState with some queue data structure. 
>> However my understanding is that changes to a ValueState result in the 
>> entire object being copied and so would be quite inefficient and best 
>> avoided. 
>> 
>> Finally I thought about trying to just maintain a series of timers – 
>> incrementing on an event and decrementing on its expiry. However I then hit 
>> the problem of timer coalescing. If an event occurs at the same time as its 
>> predecessor, the timer will not get set so the counter will get incremented 
>> but never decremented. 
>> 
>> What I’m doing seems like it would be a common task but none of the options 
>> look good, so I feel I’m missing something. Could anyone offer some advice 
>> on how to handle this case?
>> 
>> Thanks in advance. 
>> 
>> Best wishes,
>> Steven


Re: Savepoint incomplete when job was killed after a cancel timeout

2020-09-29 Thread Paul Lam
Hi Till,

Thanks a lot for the pointer! I tried to restore the job using the
savepoint in a dry run, and it worked!

Guess I've misunderstood the configuration option, and confused by the
non-existent paths that the metadata contains.

Best,
Paul Lam

Till Rohrmann  于2020年9月29日周二 下午10:30写道:

> Thanks for sharing the logs with me. It looks as if the total size of the
> savepoint is 335kb for a job with a parallelism of 60 and a total of 120
> tasks. Hence, the average size of a state per task is between 2.5kb - 5kb.
> I think that the state size threshold refers to the size of the per task
> state. Hence, I believe that the _metadata file should contain all of your
> state. Have you tried restoring from this savepoint?
>
> Cheers,
> Till
>
> On Tue, Sep 29, 2020 at 3:47 PM Paul Lam  wrote:
>
>> Hi Till,
>>
>> Thanks for your quick reply.
>>
>> The checkpoint/savepoint size would be around 2MB, which is larger than
>> `state.backend.fs.memory-threshold`.
>>
>> The jobmanager logs are attached, which looks normal to me.
>>
>> Thanks again!
>>
>> Best,
>> Paul Lam
>>
>> Till Rohrmann  于2020年9月29日周二 下午8:32写道:
>>
>>> Hi Paul,
>>>
>>> could you share with us the logs of the JobManager? They might help to
>>> better understand in which order each operation occurred.
>>>
>>> How big are you expecting the size of the state to be? If it is smaller
>>> than state.backend.fs.memory-threshold, then the state data will be stored
>>> in the _metadata file.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Sep 29, 2020 at 1:52 PM Paul Lam  wrote:
>>>
 Hi,

 We have a Flink job that was stopped erroneously with no available
 checkpoint/savepoint to restore,
 and are looking for some help to narrow down the problem.

 How we ran into this problem:

 We stopped the job using cancel with savepoint command (for
 compatibility issue), but the command
 timed out after 1 min because there was some backpressure. So we force
 kill the job by yarn kill command.
 Usually, this would not cause troubles because we can still use the
 last checkpoint to restore the job.

 But at this time, the last checkpoint dir was cleaned up and empty (the
 retained checkpoint number was 1).
 According to zookeeper and the logs, the savepoint finished (job master
 logged “Savepoint stored in …”)
 right after the cancel timeout. However, the savepoint directory
 contains only _metadata file, and other
 state files referred by metadata are absent.

 Environment & Config:
 - Flink 1.11.0
 - YARN job cluster
 - HA via zookeeper
 - FsStateBackend
 - Aligned non-incremental checkpoint

 Any comments and suggestions are appreciated! Thanks!

 Best,
 Paul Lam




flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

2020-09-29 Thread 王刚
这个问题我们之前使用sql窗口的时候也遇到过,当时是在1.7版本的tablesource后面加了个rebanlance算子让数据少的kafka分区的subtask
 watermark均衡下

发送自autohome

发件人: Benchao Li mailto:libenc...@apache.org>>
发送时间: 2020-09-29 18:10:42
收件人: user-zh mailto:user-zh@flink.apache.org>>
主题: Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

这个问题的原因应该是你的kafka partition数量应该是大于1的,并且不是所有partition都有数据导致的。
你可以检查下你的kafka topic。
目前来讲,只要你的每个kafka 的partition都有数据,那么watermark应该是可以正常产生的。跟并行度无关。

Asahi Lee <978466...@qq.com> 于2020年9月27日周日 下午6:05写道:

> 你好!
>   我使用flink
> sql,从kafka中读取数据,然后进行sql聚合操作,然后再输出到kafka中;当我设置并行度为1时,程序执行正常;当我设置并行度为2,甚至更大时;程序可以执行,但是我的kafka中没有看到有数据输出?请问是什么原因呢?
>   使用stream api时,我们可以给每个算子设置并行度,那sql api我们是否可以给每条sql设置并行度?



--

Best,
Benchao Li


Re: Re: 回复: BLinkPlanner sql join状态清理

2020-09-29 Thread 刘大龙
Hi, MiniBatch Agg目前没有实现State 
TTl,我提了个PR修复这个问题,参考https://github.com/apache/flink/pull/11830
@Jark,辛苦有空时帮忙reveiw一下代码,这个问题越来越多用户用户遇到了。


> -原始邮件-
> 发件人: "刘建刚" 
> 发送时间: 2020-09-29 18:27:47 (星期二)
> 收件人: user-zh 
> 抄送: 
> 主题: Re: 回复: BLinkPlanner sql join状态清理
> 
> miniBatch下是无法ttl的,这个是修复方案:https://github.com/apache/flink/pull/11830
> 
> Benchao Li  于2020年9月29日周二 下午5:18写道:
> 
> > Hi Ericliuk,
> >
> > 这应该是实现的bug,你可以去社区建一个issue描述下这个问题。
> > 有时间的话也可以帮忙修复一下,没有时间社区也会有其他小伙伴帮忙来修复的~
> >
> > Ericliuk  于2020年9月29日周二 下午4:59写道:
> >
> > > 我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。
> > > <
> > >
> > http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png
> > >
> > >
> > >
> > > 不太清楚为什么用了mini batch就没读取这个配置。
> > > 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么?
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >


--

Best


Re:Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-29 Thread hailongwang



不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗
也可以 jstack 采下堆栈看下,GC等看下。
至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。
Best,
Hailong Wang
在 2020-09-29 20:06:50,"Yang Peng"  写道:
>感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
>flinkkafkaconsumer消费的并行度也是90 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?
>
>hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:
>
>>
>>
>>
>> Hi Yang Peng:
>> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
>> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
>> 2. Source 的序列化耗时严重,导致拉取变慢。
>> 可以尝试着扩kafka 分区,加大Source并发看下。
>> Best,
>> Hailong Wang
>>
>> 在 2020-09-29 19:44:44,"Yang Peng"  写道:
>> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
>> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
>> >kafka消费没有积压,也没有反压, 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
>> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
>>


Re: Savepoint incomplete when job was killed after a cancel timeout

2020-09-29 Thread Till Rohrmann
Thanks for sharing the logs with me. It looks as if the total size of the
savepoint is 335kb for a job with a parallelism of 60 and a total of 120
tasks. Hence, the average size of a state per task is between 2.5kb - 5kb.
I think that the state size threshold refers to the size of the per task
state. Hence, I believe that the _metadata file should contain all of your
state. Have you tried restoring from this savepoint?

Cheers,
Till

On Tue, Sep 29, 2020 at 3:47 PM Paul Lam  wrote:

> Hi Till,
>
> Thanks for your quick reply.
>
> The checkpoint/savepoint size would be around 2MB, which is larger than
> `state.backend.fs.memory-threshold`.
>
> The jobmanager logs are attached, which looks normal to me.
>
> Thanks again!
>
> Best,
> Paul Lam
>
> Till Rohrmann  于2020年9月29日周二 下午8:32写道:
>
>> Hi Paul,
>>
>> could you share with us the logs of the JobManager? They might help to
>> better understand in which order each operation occurred.
>>
>> How big are you expecting the size of the state to be? If it is smaller
>> than state.backend.fs.memory-threshold, then the state data will be stored
>> in the _metadata file.
>>
>> Cheers,
>> Till
>>
>> On Tue, Sep 29, 2020 at 1:52 PM Paul Lam  wrote:
>>
>>> Hi,
>>>
>>> We have a Flink job that was stopped erroneously with no available
>>> checkpoint/savepoint to restore,
>>> and are looking for some help to narrow down the problem.
>>>
>>> How we ran into this problem:
>>>
>>> We stopped the job using cancel with savepoint command (for
>>> compatibility issue), but the command
>>> timed out after 1 min because there was some backpressure. So we force
>>> kill the job by yarn kill command.
>>> Usually, this would not cause troubles because we can still use the last
>>> checkpoint to restore the job.
>>>
>>> But at this time, the last checkpoint dir was cleaned up and empty (the
>>> retained checkpoint number was 1).
>>> According to zookeeper and the logs, the savepoint finished (job master
>>> logged “Savepoint stored in …”)
>>> right after the cancel timeout. However, the savepoint directory
>>> contains only _metadata file, and other
>>> state files referred by metadata are absent.
>>>
>>> Environment & Config:
>>> - Flink 1.11.0
>>> - YARN job cluster
>>> - HA via zookeeper
>>> - FsStateBackend
>>> - Aligned non-incremental checkpoint
>>>
>>> Any comments and suggestions are appreciated! Thanks!
>>>
>>> Best,
>>> Paul Lam
>>>
>>>


Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

2020-09-29 Thread Till Rohrmann
For 1. I was wondering whether we can't write the leader connection
information directly when trying to obtain the leadership (trying to update
the leader key with one's own value)? This might be a little detail, though.

2. Alright, so we are having a similar mechanism as we have in ZooKeeper
with the ephemeral lock nodes. I guess that this complicates the
implementation a bit, unfortunately.

3. Wouldn't the StatefulSet solution also work without a PV? One could
configure a different persistent storage like HDFS or S3 for storing the
checkpoints and job blobs like in the ZooKeeper case. The current benefit I
see is that we avoid having to implement this multi locking mechanism in
the ConfigMaps using the annotations because we can be sure that there is
only a single leader at a time if I understood the guarantees of K8s
correctly.

Cheers,
Till

On Tue, Sep 29, 2020 at 8:10 AM Yang Wang  wrote:

> Hi Till, thanks for your valuable feedback.
>
> 1. Yes, leader election and storing leader information will use a same
> ConfigMap. When a contender successfully performs a versioned annotation
> update operation to the ConfigMap, it means that it has been elected as the
> leader. And it will write the leader information in the callback of leader
> elector[1]. The Kubernetes resource version will help us to avoid the
> leader ConfigMap is wrongly updated.
>
> 2. The lock and release is really a valid concern. Actually in current
> design, we could not guarantee that the node who tries to write his
> ownership is the real leader. Who writes later, who is the owner. To
> address this issue, we need to store all the owners of the key. Only when
> the owner is empty, the specific key(means a checkpoint or job graph) could
> be deleted. However, we may have a residual checkpoint or job graph when
> the old JobManager crashed exceptionally and do not release the lock. To
> solve this problem completely, we need a timestamp renew mechanism
> for CompletedCheckpointStore and JobGraphStore, which could help us to the
> check the JobManager timeout and then clean up the residual keys.
>
> 3. Frankly speaking, I am not against with this solution. However, in my
> opinion, it is more like a temporary proposal. We could use StatefulSet to
> avoid leader election and leader retrieval. But I am not sure whether
> TaskManager could properly handle the situation that same hostname with
> different IPs, because the JobManager failed and relaunched. Also we may
> still have two JobManagers running in some corner cases(e.g. kubelet is
> down but the pod is running). Another concern is we have a strong
> dependency on the PersistentVolume(aka PV) in FileSystemHAService. But it
> is not always true especially in self-build Kubernetes cluster. Moreover,
> PV provider should guarantee that each PV could only be mounted once. Since
> the native HA proposal could cover all the functionality of StatefulSet
> proposal, that's why I prefer the former.
>
>
> [1].
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>
> Best,
> Yang
>
> Till Rohrmann  于2020年9月28日周一 下午9:29写道:
>
>> Thanks for creating this FLIP Yang Wang. I believe that many of our users
>> will like a ZooKeeper-less HA setup.
>>
>> +1 for not separating the leader information and the leader election if
>> possible. Maybe it is even possible that the contender writes his leader
>> information directly when trying to obtain the leadership by performing a
>> versioned write operation.
>>
>> Concerning the lock and release operation I have a question: Can there be
>> multiple owners for a given key-value pair in a ConfigMap? If not, how can
>> we ensure that the node which writes his ownership is actually the leader
>> w/o transactional support from K8s? In ZooKeeper we had the same problem
>> (we should probably change it at some point to simply use a
>> transaction which checks whether the writer is still the leader) and
>> therefore introduced the ephemeral lock nodes. What they allow is that
>> there can be multiple owners of a given ZNode at a time. The last owner
>> will then be responsible for the cleanup of the node.
>>
>> I see the benefit of your proposal over the stateful set proposal because
>> it can support multiple standby JMs. Given the problem of locking key-value
>> pairs it might be simpler to start with this approach where we only have
>> single JM. This might already add a lot of benefits for our users. Was
>> there a specific reason why you discarded this proposal (other than
>> generality)?
>>
>> @Uce it would be great to hear your feedback on the proposal since you
>> already implemented a K8s based HA service.
>>
>> Cheers,
>> Till
>>
>> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang  wrote:
>>
>>> Hi Xintong and Stephan,
>>>
>>> Thanks a lot for your attention on this FLIP. I will address the
>>> comments 

Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-29 Thread Yang Peng
感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
flinkkafkaconsumer消费的并行度也是90 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?

hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:

>
>
>
> Hi Yang Peng:
> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
> 2. Source 的序列化耗时严重,导致拉取变慢。
> 可以尝试着扩kafka 分区,加大Source并发看下。
> Best,
> Hailong Wang
>
> 在 2020-09-29 19:44:44,"Yang Peng"  写道:
> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
> >kafka消费没有积压,也没有反压, 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
>


Re:Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-29 Thread hailongwang



Hi Yang Peng:
根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
2. Source 的序列化耗时严重,导致拉取变慢。
可以尝试着扩kafka 分区,加大Source并发看下。
Best,
Hailong Wang

在 2020-09-29 19:44:44,"Yang Peng"  写道:
>请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
>kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
>kafka消费没有积压,也没有反压, 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
>tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、


Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-29 Thread Yang Peng
请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
kafka消费没有积压,也没有反压, 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、


Re: Re: flink使用在docker环境中部署出现的两个问题

2020-09-29 Thread cxydeve...@163.com
你好,我这边看到您在另一个问题[1]中有做了相关的回答,
我在k8s上部署是遇到相同的问题,相同的错误,您这边是否有空帮忙试试看是不是flink-docker的bug, 还是我的什么配置错了
我也发出了一个新的问题[2]



[1]:http://apache-flink.147419.n8.nabble.com/flink-1-11-on-kubernetes-td4586.html#a4692
[2]:http://apache-flink.147419.n8.nabble.com/flink1-11-2-k8s-volume-Read-only-file-system-td7555.html




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Savepoint incomplete when job was killed after a cancel timeout

2020-09-29 Thread Till Rohrmann
Hi Paul,

could you share with us the logs of the JobManager? They might help to
better understand in which order each operation occurred.

How big are you expecting the size of the state to be? If it is smaller
than state.backend.fs.memory-threshold, then the state data will be stored
in the _metadata file.

Cheers,
Till

On Tue, Sep 29, 2020 at 1:52 PM Paul Lam  wrote:

> Hi,
>
> We have a Flink job that was stopped erroneously with no available
> checkpoint/savepoint to restore,
> and are looking for some help to narrow down the problem.
>
> How we ran into this problem:
>
> We stopped the job using cancel with savepoint command (for compatibility
> issue), but the command
> timed out after 1 min because there was some backpressure. So we force
> kill the job by yarn kill command.
> Usually, this would not cause troubles because we can still use the last
> checkpoint to restore the job.
>
> But at this time, the last checkpoint dir was cleaned up and empty (the
> retained checkpoint number was 1).
> According to zookeeper and the logs, the savepoint finished (job master
> logged “Savepoint stored in …”)
> right after the cancel timeout. However, the savepoint directory contains
> only _metadata file, and other
> state files referred by metadata are absent.
>
> Environment & Config:
> - Flink 1.11.0
> - YARN job cluster
> - HA via zookeeper
> - FsStateBackend
> - Aligned non-incremental checkpoint
>
> Any comments and suggestions are appreciated! Thanks!
>
> Best,
> Paul Lam
>
>


Re: Flink Batch 模式下,The rpc invocation size 113602196 exceeds the maximum akka framesize

2020-09-29 Thread zheng faaron
Hi,

可以检查一下这个参数是否设置正确,也可以在jobmanager页面上看下是否有这个参数。我之前遇到过类似问题,设置这个参数可以解决问题。

Best,
Faaron Zheng


From: jy l 
Sent: Monday, September 28, 2020 4:57:46 PM
To: user-zh@flink.apache.org 
Subject: Re: Flink Batch 模式下,The rpc invocation size 113602196 exceeds the 
maximum akka framesize

如果使用了print()等算子,会将上一个task的结果一次全部pull过来,pull时数据超过了akka framesize大小导致。

李加燕  于2020年9月28日周一 下午3:07写道:

> Flink batch 模式消费hdfs上的文件,并做了一个word count
> 操作,但是task一直运行,查看taskmanager的log,发现如下异常:
> java.lang.reflect.UndeclaredThrowableException: null
> at com.sun.proxy.$Proxy35.updateTaskExecutionState(UnknownSource)
> ~[?:?]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1558)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1588)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2400(TaskExecutor.java:173)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1921)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> Causedby: java.io.IOException: The rpc invocation size 113602196 exceeds
> the maximum akka framesize.
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:79)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> ... 28 more
> 我有尝试过在flink-conf.yaml配置akka framesize大小为30M,但是还是不能解决上述问题。
> 请求帮助。


Re: flink1.11.2基于官网在k8s上部署是正常的,但是加了volume配置之后报错Read-only file system

2020-09-29 Thread cxydeve...@163.com
官网例子[1]没有改动之前,是可以正常启动的
原来的配置文件内容如下
...
"volumes": [
  {
"name": "flink-config-volume",
"configMap": {
  "name": "flink-config",
  "items": [
{
  "key": "flink-conf.yaml",
  "path": "flink-conf.yaml"
},
{
  "key": "log4j-console.properties",
  "path": "log4j-console.properties"
}
  ],
  "defaultMode": 420
}
  }
],
...
"volumeMounts": [
  {
"name": "flink-config-volume",
"mountPath": "/opt/flink/conf"
  }
],
...

后面在k8s上修改了yaml文件,就是增加了其他volumes
然后增加uploadjar, completed-jobs和lib的挂载,如下
...
"volumes": [
  {
"name": "flink-config-volume",
"configMap": {
  "name": "flink-config",
  "items": [
{
  "key": "flink-conf.yaml",
  "path": "flink-conf.yaml"
},
{
  "key": "log4j-console.properties",
  "path": "log4j-console.properties"
}
  ],
  "defaultMode": 420
}
  },
  {
"name": "flink-uploadjar-volume",
"hostPath": {
  "path": "/data/volumes/flink/jobmanager/uploadjar",
  "type": ""
}
  },
  {
"name": "flink-completejobs-volume",
"hostPath": {
  "path": "/data/volumes/flink/jobmanager/completed-jobs/",
  "type": ""
}
  },
  {
"name": "libs-volume",
"hostPath": {
  "path": "/data/volumes/flink/jobmanager/lib",
  "type": ""
}
  }
],
...
"volumeMounts": [
  {
"name": "flink-config-volume",
"mountPath": "/opt/flink/conf"
  },
  {
"name": "flink-uploadjar-volume",
"mountPath": "/opt/flink/flink-uploadjar"
  },
  {
"name": "flink-completejobs-volume",
"mountPath": "/opt/flink/completed-jobs/"
  },
  {
"name": "libs-volume",
"mountPath": "/opt/flink/lib"
  }
],
...
增加了volume之后就报了错误
2020-09-29T12:09:33.055804861Z Starting Job Manager
2020-09-29T12:09:33.061359237Z sed: couldn't open temporary file
/opt/flink/conf/sedVef7YR: Read-only file system
2020-09-29T12:09:33.06561576Z sed: couldn't open temporary file
/opt/flink/conf/sed4a7zGR: Read-only file system
2020-09-29T12:09:33.068683501Z /docker-entrypoint.sh: 72:
/docker-entrypoint.sh: cannot create /opt/flink/conf/flink-conf.yaml:
Permission denied
2020-09-29T12:09:33.068700999Z /docker-entrypoint.sh: 91:
/docker-entrypoint.sh: cannot create /opt/flink/conf/flink-conf.yaml.tmp:
Read-only file system
2020-09-29T12:09:33.919147511Z Starting standalonesession as a console
application on host flink-jobmanager-6d5bc45dbb-jjs4f.
2020-09-29T12:09:34.154740531Z log4j:WARN No appenders could be found for
logger (org.apache.flink.runtime.entrypoint.ClusterEntrypoint).
2020-09-29T12:09:34.154769537Z log4j:WARN Please initialize the log4j system
properly.
2020-09-29T12:09:34.154776198Z log4j:WARN See
http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

经过排查,发现是在挂载了/opt/flink/lib之后就会出错,挂载其他目录是可以正常运行的/opt/flink/flink-uploadjar,/opt/flink/completed-jobs/

就是增加了下面这个配置出现的错误
"volumeMounts": [
  ...
  {
"name": "libs-volume",
"mountPath": "/opt/flink/lib"
  }
  ...
],



[1]:https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Savepoint incomplete when job was killed after a cancel timeout

2020-09-29 Thread Paul Lam
Hi,

We have a Flink job that was stopped erroneously with no available 
checkpoint/savepoint to restore, 
and are looking for some help to narrow down the problem.

How we ran into this problem:

We stopped the job using cancel with savepoint command (for compatibility 
issue), but the command
timed out after 1 min because there was some backpressure. So we force kill the 
job by yarn kill command.
Usually, this would not cause troubles because we can still use the last 
checkpoint to restore the job.

But at this time, the last checkpoint dir was cleaned up and empty (the 
retained checkpoint number was 1).
According to zookeeper and the logs, the savepoint finished (job master logged 
“Savepoint stored in …”) 
right after the cancel timeout. However, the savepoint directory contains only 
_metadata file, and other 
state files referred by metadata are absent. 

Environment & Config:
- Flink 1.11.0
- YARN job cluster
- HA via zookeeper
- FsStateBackend
- Aligned non-incremental checkpoint

Any comments and suggestions are appreciated! Thanks!

Best,
Paul Lam



关于flink sql cdc

2020-09-29 Thread Kyle Zhang
Hi,all
  今天在使用sql cdc中遇到以一个问题 ,版本1.11.2,idea中运行,我的ddl是
CREATE TABLE mysql_binlog (
 id INT NOT NULL,
 emp_name STRING,
 age INT
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'xxx',
 'port' = '3306',
 'username' = 'root',
 'password' = 'root',
 'database-name' = 'test',
 'table-name' = 'empoylee1'
);
结果直接用print table
运行一段时间后报错
19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
[] - Error during binlog processing. Last offset stored = null, binlog
reader near position = binlog.001254/132686776
19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
[] - Failed due to error: Error processing binlog event
org.apache.kafka.connect.errors.ConnectException: Received DML 'INSERT INTO
execution_flows (project_id, flow_id, version, status, submit_time,
submit_user, update_time) values
(47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
processing, binlog probably contains events generated with statement or
mixed based replication format
at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML
'INSERT INTO execution_flows (project_id, flow_id, version, status,
submit_time, submit_user, update_time) values
(47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
processing, binlog probably contains events generated with statement or
mixed based replication format
at
io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
... 5 more

sql cdc还会解析我其他的表然后报错么?,有没有人遇到类似的问题

Best,
Kyle Zhang


Re: 回复: flink sql count问题

2020-09-29 Thread Robin Zhang
Hi lemon,
不是很理解你的疑问是什么,flink是事件驱动的,所以,来一条数据,就会被处理,走你的逻辑,就会产生一个结果,如果是第一次出现的key,只有一条数据,如果是状态中已经存在的key,会在控制台输出两条数据,一条true的是最终sink的结果。所以,每次输出一条结果有什么问题吗?


Best,
Robin



lemon wrote
> 感谢各位的回答,各位的方法我都试了一下,但是都会在下游输出一条结果,一条不符合条件的语句count会在下游输出0
> 我理解是flink中的count对于每一条数据都会输出一条结果,所以只能把if中的判断条件再放到最后的where中进行过滤
> 类似于 selectcount(if(name like '南京%',1 , null)) where name
> like'南京%' or name like'杭州%' group by ** 这样
> 
> 
> --原始邮件--
> 发件人:  
>  
> "user-zh" 
>   
> <

> vincent2015qdlg@

> ;
> 发送时间:2020年9月29日(星期二) 下午5:32
> 收件人:"user-zh"<

> user-zh@.apache

> ;
> 
> 主题:Re: flink sql count问题
> 
> 
> 
> Hi lemon,
>  内部判断if函数可以替换为case when
> 
> Best,
> Robin
> 
> 
> lemon wrote
>  请教各位:
>  我有一个sql任务需要进行count,在count中有一个表达式,只想count符合条件的记录,
>  之前在hive中是这么写的:count(if(name like '南京%',1 , null)),但是flink
>  sql中count不能为null,有什么别的方法能实现该功能吗?
>  使用的是flink1.10.1 blink
>  nbsp;
> 
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复:flink1.11实时写入hive,写入速度很慢,checkpoint为60秒,并行度为1

2020-09-29 Thread Jun Zhang
你的kafka的分区数是多少,把flink的并行度加大到kafka的分区数。



BestJun


-- 原始邮件 --
发件人: me 

flink1.11实时写入hive,写入速度很慢,checkpoint为60秒,并行度为1

2020-09-29 Thread me
flink1.11实时写入hive,写入速度很慢,checkpoint为60秒,并行度为1
tableEnv.executeSql("insert into dwd_security_log select * from " + table)
实际写入hive之后,查看hdfs上写入的文件为19M,这是60秒内写入hive的,flink流式写入hive通过checkpotin来把数据刷入hive中。


请问大家只有有什么提升写入速度的参数或者方式吗?

Re: pyflink1.11 window groupby出错

2020-09-29 Thread Xingbo Huang
Hello,

现在的descriptor的方式存在很多bug,社区已经在进行重构了。当前你可以使用DDL[1]的方式来解决问题。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
Best,
Xingbo

刘乘九  于2020年9月29日周二 下午5:46写道:

> 各位大佬,我想尝试下pyflink 进行时间窗口下的指标统计,写了一个demo发现table APi 的group
> 方法报错,网上搜索了一下相关内容也没有解决问题, 想请各位大佬帮帮忙看一下是哪里写错了?
>
> 错误信息:
> py4j.protocol.Py4JJavaError: An error occurred while calling o95.select.
> : org.apache.flink.table.api.ValidationException: A group window expects a
> time attribute for grouping in a stream environment.
> at
> org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
> at
> org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
> at
> org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
> at
> org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
> at
> org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)
> at
> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)
> at
> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:781)
>
>
>
>
> demo程序:
> from pyflink.datastream import *
> from pyflink.table import *
> from pyflink.table.descriptors import *
> from pyflink.table.descriptors import Json
> from pyflink.table.window import *
>
> test_out_put_data_path = r'D:\test_doc\test_result_data.csv'
>
> s_nev = StreamExecutionEnvironment.get_execution_environment()
> s_nev.set_parallelism(3)
> st_nev = StreamTableEnvironment.create(s_nev,
> environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
>
> st_nev.connect(Kafka().version('0.11').topic('gyhWebLog').start_from_earliest().property("zookeeper.connect","cdh3:2181,
> cdh4:2181, cdh5:2181").property("bootstrap.servers", "cdh3:9092, cdh4:9092,
> cdh5:9092")) \
> .with_format(Json()
>  .fail_on_missing_field(False)
>  .schema(DataTypes.ROW([DataTypes.FIELD('time',
> DataTypes.TIMESTAMP(3)),
>
> DataTypes.FIELD('prev_page',DataTypes.STRING()),
> DataTypes.FIELD('page',
> DataTypes.STRING()),
> DataTypes.FIELD("app",
> DataTypes.STRING()),
>
> DataTypes.FIELD("nextApp",DataTypes.STRING()),
>
> DataTypes.FIELD("service",DataTypes.STRING()),
>
> DataTypes.FIELD("userId",DataTypes.BIGINT())])))\
> .with_schema(Schema().
> field('prev_page', DataTypes.STRING())
>  .field('page', DataTypes.STRING())
>  .field('app', DataTypes.STRING())
>  .field('nextApp', DataTypes.STRING())
>  .field('service', DataTypes.STRING())
>  .field('userId', DataTypes.BIGINT())
>  .field('time', DataTypes.TIMESTAMP(3))
>  .rowtime(Rowtime()
>   .timestamps_from_field('time')
>   .watermarks_periodic_bounded(6)))\
> .in_append_mode()\
> .create_temporary_table('raw_web_log_data')
>
>
> st_nev.connect(FileSystem().path(test_out_put_data_path))\
> .with_format(OldCsv()
>  .field_delimiter(',')
>  .field("userId", DataTypes.BIGINT())
>  .field('dataCount', DataTypes.BIGINT())
>  .field('count_time', DataTypes.TIMESTAMP(3))
>  )\
> .with_schema(Schema()
>  .field('userId', DataTypes.BIGINT())
>  .field('dataCount', DataTypes.BIGINT())
>  .field('count_time', DataTypes.TIMESTAMP(3))
>  )\
> .create_temporary_table('test_out_put')
>
>
> if __name__ == '__main__':
> st_nev.from_path('raw_web_log_data').window(Tumble.over('1.hours').on('time').alias('w')).group_by('userId,
> w').select('userId, page.count as d, w.end').execute_insert('test_out_put')
>


Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-29 Thread todd
是不是和你上层依赖的jar冲突了?exclude剔除不了冲突jar吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复: BLinkPlanner sql join状态清理

2020-09-29 Thread 刘建刚
miniBatch下是无法ttl的,这个是修复方案:https://github.com/apache/flink/pull/11830

Benchao Li  于2020年9月29日周二 下午5:18写道:

> Hi Ericliuk,
>
> 这应该是实现的bug,你可以去社区建一个issue描述下这个问题。
> 有时间的话也可以帮忙修复一下,没有时间社区也会有其他小伙伴帮忙来修复的~
>
> Ericliuk  于2020年9月29日周二 下午4:59写道:
>
> > 我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。
> > <
> >
> http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png
> >
> >
> >
> > 不太清楚为什么用了mini batch就没读取这个配置。
> > 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么?
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
> --
>
> Best,
> Benchao Li
>


Re: Apache Qpid connector.

2020-09-29 Thread Master Yoda
Hi Austin, thanks for the response. Yes, the protocol is AMQP. I will try
out the RabbitMQ connector with Qpid

thanks,
Parag


On Sat, Sep 26, 2020 at 12:22 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey (Master) Parag,
>
> I don't know anything about Apache Qpid, but from the homepage[1], it
> looks like the protocol is just AMQP? Are there more specifics than that?
> If it is just AMQP would the RabbitMQ connector[2] work for you?
>
> Best,
> Austin
>
> [1]: https://qpid.apache.org/
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/rabbitmq.html
>
> On Fri, Sep 25, 2020 at 11:26 AM Master Yoda 
> wrote:
>
>>  Hello,   Is there a flink source and sink
>> from/to Apache Qpid. ? I searched around a bit but could not find one.
>> Would I need to write one if there isn't one already ?
>>
>> thanks,
>> Parag
>>
>


Re: Re: sql-cli执行sql报错

2020-09-29 Thread Benchao Li
这个错误看起来比较奇怪。正常来讲flink-sql-connector-kafka_2.11-1.10.2.jar里面应该都是shaded之后的class了,
但是却报了一个非shaded的ByteArrayDeserializer。
我感觉这个应该是你自己添加了一下比较特殊的逻辑导致的。可以介绍下你对kafka connector做了哪些改造么?

hl9...@126.com  于2020年9月28日周一 下午6:06写道:

> 按照您的方法重试了下,又报了另一个错误:
> Flink SQL> CREATE TABLE tx (
> > account_id  BIGINT,
> > amount  BIGINT,
> > transaction_time TIMESTAMP(3),
> > WATERMARK FOR transaction_time AS transaction_time -
> INTERVAL '5' SECOND
> > ) WITH (
> > 'connector.type' = 'kafka',
> > 'connector.version' = 'universal',
> > 'connector.topic' = 'heli01',
> > 'connector.properties.group.id' = 'heli-test',
> > 'connector.properties.bootstrap.servers' = '
> 10.100.51.56:9092',
> > 'connector.startup-mode' = 'earliest-offset',
> > 'format.type'= 'csv'
> > );
> [INFO] Table has been created.
>
> Flink SQL> show tables ;
> tx
>
> Flink SQL> select * from tx ;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.serialization.ByteArrayDeserializer is not an
> instance of
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer
>
> 附:lib包清单
> [test@rcx51101 lib]$ pwd
> /opt/flink-1.10.2/lib
>
> flink-csv-1.10.2.jar
> flink-dist_2.12-1.10.2.jar
> flink-jdbc_2.12-1.10.2.jar
> flink-json-1.10.2.jar
> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar
> flink-sql-connector-kafka_2.11-1.10.2.jar
> flink-table_2.12-1.10.2.jar
> flink-table-blink_2.12-1.10.2.jar
> log4j-1.2.17.jar
> mysql-connector-java-5.1.48.jar
> slf4j-log4j12-1.7.15.jar
>
>
>
>
> hl9...@126.com
>
> 发件人: Leonard Xu
> 发送时间: 2020-09-28 16:36
> 收件人: user-zh
> 主题: Re: sql-cli执行sql报错
> Hi
> benchao的回复是的对的,
> 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包
> flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。
>
>
> > 相关lib包:
> > flink-connector-kafka_2.12-1.10.2.jar
> > kafka-clients-0.11.0.3.jar
>
> 祝好
> Leonard
>


-- 

Best,
Benchao Li


Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

2020-09-29 Thread Benchao Li
这个问题的原因应该是你的kafka partition数量应该是大于1的,并且不是所有partition都有数据导致的。
你可以检查下你的kafka topic。
目前来讲,只要你的每个kafka 的partition都有数据,那么watermark应该是可以正常产生的。跟并行度无关。

Asahi Lee <978466...@qq.com> 于2020年9月27日周日 下午6:05写道:

> 你好!
>   我使用flink
> sql,从kafka中读取数据,然后进行sql聚合操作,然后再输出到kafka中;当我设置并行度为1时,程序执行正常;当我设置并行度为2,甚至更大时;程序可以执行,但是我的kafka中没有看到有数据输出?请问是什么原因呢?
>   使用stream api时,我们可以给每个算子设置并行度,那sql api我们是否可以给每条sql设置并行度?



-- 

Best,
Benchao Li


Re: 使用异步IO时,数据写入到capacity数后,卡住不再消费source端数据了。

2020-09-29 Thread 王敏超
嗯嗯,是的。安装大佬的方法,的确成功了。再次感谢大佬



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-29 Thread Dream-底限
可以直接用yarnclient直接提交,flinkonyarn也是yarnclient提交的吧,不过感觉自己实现一遍挺麻烦的,我们最后也选的是process的方式

xiao cai  于2020年9月29日周二 下午5:54写道:

> 这个我们有尝试,遇到了classpath的问题,导致包冲突,无法启动进程,你们有遇到过相关的情况吗?
>
>
>  原始邮件
> 发件人: todd
> 收件人: user-zh
> 发送时间: 2020年9月29日(周二) 17:36
> 主题: Re: 怎么样在Flink中使用java代码提交job到yarn
>
>
> https://github.com/todd5167/flink-spark-submiter
> 可以参考这个案例,用ClusterCLient提交。 -- Sent from:
> http://apache-flink.147419.n8.nabble.com/


Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-29 Thread xiao cai
这个我们有尝试,遇到了classpath的问题,导致包冲突,无法启动进程,你们有遇到过相关的情况吗?


 原始邮件 
发件人: todd
收件人: user-zh
发送时间: 2020年9月29日(周二) 17:36
主题: Re: 怎么样在Flink中使用java代码提交job到yarn


https://github.com/todd5167/flink-spark-submiter 可以参考这个案例,用ClusterCLient提交。 -- 
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink配置hdfs状态后端遇到的问题

2020-09-29 Thread jester_jim
Hi Robin Zhang,
其实在只要不是在根目录下创建文件夹,只要在我指定的目录下创建即可,我其实是有权限的,Hadoop管理员给我分配了一个目录,我想把目录设置到分配的目录,但是一直报这个错,想问一下除了创建job信息,Flink还有什么机制会去hdfs上创建文件或文件夹的?
祝好!


在 2020年9月29日 17:15,Robin Zhang 写道:


Hi jester_jim, 
配置文件中指定的checkpoints(以后简称ckp)的目录只是一个父目录,flink在首次触发每个job的ckp时会在这个父目录下新建多级文件夹,命名为指定的job名字/job
 id.所以,并不是新建父目录就可以,依然会存在权限问题 。 祝好,Robin Zhang Flink中文社区的各位大佬你们好: 
本人是小白,由于对Flink不太了解,想学,然后搭了个Flink standalone(1.11.2 
jdk1.8)集群,集群本身运行没有什么问题,作业运行也没什么问题。但是最近有用到状态后端,在配置hdfs的时候遇到了一个无法理解的问题,我在issues也没找到解决方法。问题大概是,我在Flink配置文件配置了HDFS状态后端,但是呢Hadoop(CDH2.5.0)是生产系统的集群,有kerberos认证,目录权限不全开放。Flink的配置信息如下:
 state.backend: filesystem # Directory for checkpoints filesystem, when using 
any of the default bundled # state backends. # state.checkpoints.dir: 
hdfs://namenode58:9000/NS3/user/jester/flink/flink-checkpoints # Default target 
directory for savepoints, optional. # state.savepoints.dir: 
hdfs://namenode58:9000/NS3/user/jester/flink/flink-checkpoints 
除此之外还有kerberos认证配置,但是都没有什么问题,如下所示,当注释掉上面三行配置过后,作业正常运行: 2020-09-29 11:27:20,430 
INFO org.apache.hadoop.security.UserGroupInformation [] - Login successful for 
user jester/principle using keytab file 
/kafka/flink/flink-1.11.2/conf/jester.keytab Job has been submitted with JobID 
41c01241338f1c7112d48f277701d9c3 但是如果不注释,当我提交作业就会抛出: (本部分放在异常信息前面: 
异常里面说Flink要去/目录创建文件但是没有write权限,但是这个权限肯定是不会给的。但是我明明在Flink配置中我已经指定了目录,我不能理解为什么还会在/下创建什么?除此之外,Hadoop的配置信息我是直接把Hadoop集群的目录拷贝过来了,在启动脚本中指定了HADDOP_CONF_DIR,(本人觉得其实这样好像是不太妥当的,因为Hadoop的hdfs-site.xml和core-site.xml里面有很多只能在Hadoop集群的机器上使用的配置,不知道是不是应该删除无用的配置,还有增加必要的配置,希望能有参考文档)还有Hadoop的jar包我直接`$HADDOP_HOME/bin/hadoop
 classpath` 另外还有个问题,就是Hadoop配置了HA,我怎么给checkpoint配置多个namenode? ) 2020-09-29 
11:21:20,446 INFO org.apache.hadoop.security.UserGroupInformation [] - Login 
successful for user jester/principle using keytab file 
/kafka/flink/flink-1.11.2/conf/unicom_jiangt37.keytab -- Sent from: 
http://apache-flink.147419.n8.nabble.com/

Re: 使用异步IO时,数据写入到capacity数后,卡住不再消费source端数据了。

2020-09-29 Thread Benchao Li
你的timeout方法应该要正确的处理ResultFuture,
比如ResultFuture.complete或者completeExceptionally,如果你什么都没做,那么这个异步请求就还没有真的结束。

王敏超  于2020年9月29日周二 下午5:43写道:

>  AsyncDataStream
>   //顺序异步IO
>   .orderedWait(input, new AsyncDatabaseRequest(), 5000,
> TimeUnit.MILLISECONDS, 1000)
>
>   当我没重写timeout方法的时候,会执行这个报错信息
> resultFuture.completeExceptionally(new TimeoutException("Async function
> call
> has timed out."))
>
>
>   当我重写了timeout方法,如下,程序就卡住了,求大佬解答。
>   override def timeout(input: String, resultFuture: ResultFuture[Int]):
> Unit
> = {
> println("time out ... ")
>   }
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


?????? flink sql count????

2020-09-29 Thread lemon
count0
flinkcount??ifwhere??
?? selectcount(if(name like '%',1 , null)) where name 
like'%' or name like'%' group by ** 


----
??: 
   "user-zh"

http://apache-flink.147419.n8.nabble.com/

Re: Poor performance with large keys using RocksDB and MapState

2020-09-29 Thread ירון שני
Thanks Yun!,
I used this option, and it greatly helped

2:44 

val be = new RocksDBStateBackend("file:///tmp")class MyConfig extends
DefaultConfigurableOptionsFactory {  override def
createColumnOptions(currentOptions: ColumnFamilyOptions,
handlesToClose: util.Collection[AutoCloseable]): ColumnFamilyOptions =
{
super.createColumnOptions(currentOptions,
handlesToClose).optimizeForPointLookup(2000)
  }
}
be.setRocksDBOptions(new MyConfig)
be.getMemoryConfiguration.setUseManagedMemory(false)


But now I cant use the RocksDBSharedResources because of
setCacheIndexAndFilterBlocks seems to make the hash index not work properly
and the performance is bad again.
Only when using  be.getMemoryConfiguration.setUseManagedMemory(false) and
skipping setCacheIndexAndFilterBlocks , only then its working :(





On Fri, Sep 25, 2020 at 9:56 AM Yun Tang  wrote:

> Hi
>
> If you want to improve the performance of point lookup, you could try to
> use additional hash index. This feature needs to pass a prefix extractor,
> however, original interface is not exposed out directly in java API.
>
> You could try to call
> columnFamilyOptions.optimizeForPointLookup(blockCacheSizeMb) and it would
> use NoopTransform prefix extractor by default[1].
> Please also consider to use this feature after Flink-1.10.2 due to RocksDB
> internal bug [2].
>
> [1]
> https://github.com/dataArtisans/frocksdb/blob/c724d41fab7f9f09f9676dfccc6d210a191da4d6/options/options.cc#L477
> [2] https://issues.apache.org/jira/browse/FLINK-17800
>
> Best
> Yun Tang
>
>
> --
> *From:* ירון שני 
> *Sent:* Wednesday, September 23, 2020 23:56
> *To:* user@flink.apache.org 
> *Subject:* Poor performance with large keys using RocksDB and MapState
>
> Hello,
> I have a poor throughput issue, and I think I managed to reproduce it
> using the following code:
>
> val conf = new Configuration()
> conf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(6 * 
> 1000))
> conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(8 * 
> 1000))
> conf.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.ofMebiBytes(256))
> conf.set(RocksDBConfigurableOptions.BLOCK_SIZE, new MemorySize(8 * 1000))
>
> val be = new RocksDBStateBackend("file:///tmp")
> val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
>   .setStateBackend(be)
>
> env.setParallelism(3)
> env.getConfig.enableObjectReuse()
>
> val r = new scala.util.Random(31)
> val randStr = r.nextString(4992)
> val s = env.fromElements(1).process((value: Int, ctx: 
> _root_.org.apache.flink.streaming.api.functions.ProcessFunction[Int, 
> _root_.scala.Predef.String]#Context, out: 
> _root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]) => {
>   for (a <- 1 to 1000 * 1000 * 10) {
> out.collect( randStr + r.nextString(8) )
>
>   }
> }).keyBy(a=>a).process(new ProcessFunction[String, String] {
>   private var someState: MapState[String, String] = _
>
>   override def open(parameters: Configuration): Unit = {
> someState = getRuntimeContext.getMapState(
>   new MapStateDescriptor[String, String]("someState", 
> createTypeInformation[String], createTypeInformation[String])
> )
>   }
>
>   override def processElement(value: _root_.scala.Predef.String, ctx: 
> _root_.org.apache.flink.streaming.api.functions.ProcessFunction[_root_.scala.Predef.String,
>  _root_.scala.Predef.String]#Context, out: 
> _root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]): Unit = {
> if(!someState.contains(value)) {
>   someState.put(value, value)
> }
>   }
> })
>
> env.execute()
>
> This has really poor throughput.
> Now changing
> out.collect( randStr + r.nextString(8) )
>
> to
> out.collect( r.nextString(8) + randStr)
> Solves the issue.
> Is there any way easy to fix this?
> I tried to use hash index, but it required rocks db option called "prefix
> extractor" which I don't know how to fill yet, and no idea if it will fix
> it.
> If anyone encountered that before, I would really use some advice/help.
> Thanks!
>
>
>
>
>
>
>
>


pyflink1.11 window groupby出错

2020-09-29 Thread 刘乘九
各位大佬,我想尝试下pyflink 进行时间窗口下的指标统计,写了一个demo发现table APi 的group 
方法报错,网上搜索了一下相关内容也没有解决问题, 想请各位大佬帮帮忙看一下是哪里写错了?

错误信息:
py4j.protocol.Py4JJavaError: An error occurred while calling o95.select.
: org.apache.flink.table.api.ValidationException: A group window expects a time 
attribute for grouping in a stream environment.
at 
org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
at 
org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
at 
org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
at 
org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
at 
org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)
at 
org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)
at 
org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:781)




demo程序:
from pyflink.datastream import *
from pyflink.table import *
from pyflink.table.descriptors import *
from pyflink.table.descriptors import Json
from pyflink.table.window import *

test_out_put_data_path = r'D:\test_doc\test_result_data.csv'

s_nev = StreamExecutionEnvironment.get_execution_environment()
s_nev.set_parallelism(3)
st_nev = StreamTableEnvironment.create(s_nev, 
environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())

st_nev.connect(Kafka().version('0.11').topic('gyhWebLog').start_from_earliest().property("zookeeper.connect","cdh3:2181,
 cdh4:2181, cdh5:2181").property("bootstrap.servers", "cdh3:9092, cdh4:9092, 
cdh5:9092")) \
.with_format(Json()
 .fail_on_missing_field(False)
 .schema(DataTypes.ROW([DataTypes.FIELD('time', 
DataTypes.TIMESTAMP(3)),

DataTypes.FIELD('prev_page',DataTypes.STRING()),
DataTypes.FIELD('page', 
DataTypes.STRING()),
DataTypes.FIELD("app", 
DataTypes.STRING()),

DataTypes.FIELD("nextApp",DataTypes.STRING()),

DataTypes.FIELD("service",DataTypes.STRING()),

DataTypes.FIELD("userId",DataTypes.BIGINT())])))\
.with_schema(Schema().
field('prev_page', DataTypes.STRING())
 .field('page', DataTypes.STRING())
 .field('app', DataTypes.STRING())
 .field('nextApp', DataTypes.STRING())
 .field('service', DataTypes.STRING())
 .field('userId', DataTypes.BIGINT())
 .field('time', DataTypes.TIMESTAMP(3))
 .rowtime(Rowtime()
  .timestamps_from_field('time')
  .watermarks_periodic_bounded(6)))\
.in_append_mode()\
.create_temporary_table('raw_web_log_data')


st_nev.connect(FileSystem().path(test_out_put_data_path))\
.with_format(OldCsv()
 .field_delimiter(',')
 .field("userId", DataTypes.BIGINT())
 .field('dataCount', DataTypes.BIGINT())
 .field('count_time', DataTypes.TIMESTAMP(3))
 )\
.with_schema(Schema()
 .field('userId', DataTypes.BIGINT())
 .field('dataCount', DataTypes.BIGINT())
 .field('count_time', DataTypes.TIMESTAMP(3))
 )\
.create_temporary_table('test_out_put')


if __name__ == '__main__':
st_nev.from_path('raw_web_log_data').window(Tumble.over('1.hours').on('time').alias('w')).group_by('userId,
 w').select('userId, page.count as d, w.end').execute_insert('test_out_put')


使用异步IO时,数据写入到capacity数后,卡住不再消费source端数据了。

2020-09-29 Thread 王敏超
 AsyncDataStream
  //顺序异步IO
  .orderedWait(input, new AsyncDatabaseRequest(), 5000,
TimeUnit.MILLISECONDS, 1000)

  当我没重写timeout方法的时候,会执行这个报错信息 
resultFuture.completeExceptionally(new TimeoutException("Async function call
has timed out."))


  当我重写了timeout方法,如下,程序就卡住了,求大佬解答。
  override def timeout(input: String, resultFuture: ResultFuture[Int]): Unit
= {
println("time out ... ")
  }




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: Re: Re: 了解下大家生产中都用什么任务调度系统呢

2020-09-29 Thread Michael Ran
~.~ 不是有几百个star 嘛。海豚 这个到apache 社区会强大些
在 2020-09-29 16:45:30,"赵一旦"  写道:
>看了下,hera算了,虽然文档看起来还行,但是5个star,不敢用。
>海豚这个看起来还不错,可以试试看。
>
>Michael Ran  于2020年9月29日周二 上午10:43写道:
>
>> ~。~ hera、海豚都行
>> 在 2020-09-29 09:58:45,"chengyanan1...@foxmail.com" <
>> chengyanan1...@foxmail.com> 写道:
>> >
>> >Apache DolphinScheduler 你值得拥有
>> >
>> >https://dolphinscheduler.apache.org/zh-cn/
>> >
>> >
>> >
>> >发件人: 赵一旦
>> >发送时间: 2020-09-28 20:47
>> >收件人: user-zh
>> >主题: Re: 了解下大家生产中都用什么任务调度系统呢
>> >感觉ooize成熟但不想用,xml写起来难受。
>> >azkaban也需要单独上传。
>> >
>> >我现在用的公司的其实挺好,就是界面操作有点小bug,以及经常挂。就是全流程通过界面操作,创建任务,然后任务和任务之间拖一拖就可以创建依赖。
>> >
>> >孟小鹏 <602012...@qq.com> 于2020年9月28日周一 下午8:22写道:
>> >
>> >> Airflow  oozie
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 发自我的iPhone
>> >>
>> >>
>> >> -- 原始邮件 --
>> >> 发件人: 赵一旦 > >> 发送时间: 2020年9月28日 19:41
>> >> 收件人: user-zh > >> 主题: 回复:了解下大家生产中都用什么任务调度系统呢
>> >>
>> >>
>> >>
>> >> 主要是指开源的调度系统。
>> >>
>> >> 公司有个系统感觉经常挂,想换个开源的自己搭建。
>> >> (1)最好是支持WEB UI配置任务流的,不想搞个任务还需要单独写xml等文件然后打包上传那种。
>> >> (2)在生产中长期应用,稳定,能满足大多数需求的。
>> >>
>> >> 希望大家推荐下。
>>


Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-29 Thread todd
https://github.com/todd5167/flink-spark-submiter   可以参考这个案例,用ClusterCLient提交。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复: Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-29 Thread todd
https://github.com/todd5167/flink-spark-submiter   可以参考下



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql count问题

2020-09-29 Thread Robin Zhang
Hi lemon,
内部判断if函数可以替换为case when

Best,
Robin


lemon wrote
> 请教各位:
> 我有一个sql任务需要进行count,在count中有一个表达式,只想count符合条件的记录,
> 之前在hive中是这么写的:count(if(name like '南京%',1 , null)),但是flink
> sql中count不能为null,有什么别的方法能实现该功能吗?
> 使用的是flink1.10.1 blink
> 





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 如何在流式数据源上使用分析函数LAG和EAD函数

2020-09-29 Thread Robin Zhang
Hi Benchao,
 
 感谢回复,解决了我最近的疑惑。

Best,
Robin


Benchao Li-2 wrote
> Hi Robin,
> 
> 目前LAG/LEAD函数在流式场景下的实现的确是有bug的,那个实现只能在批式场景下work,
> 是线上其实没有考虑流式的场景。所以你看到的结果应该是它只能返回当前数据。
> 这个问题我也是最近才发现的,刚刚建了一个issue[1] 来跟踪这个问题。
> 当前如果你想实现类似功能,可以先自己写一个udaf来做。
> 
> [1] https://issues.apache.org/jira/browse/FLINK-19449
> 
> Robin Zhang 

> vincent2015qdlg@

>  于2020年9月29日周二 下午2:04写道:
> 
>> 环境: flink 1.10,使用flinkSQL
>>
>> kafka输入数据如:
>> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0}
>> {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0}
>> {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0}
>> {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0}
>> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0}
>> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0}
>>
>> sql如下:
>>
>> INSERT INTO topic_sink
>> SELECT
>>   t,
>>   id,
>>   speed,
>>   LAG(speed, 1) OVER w AS speed_1,
>>   LAG(speed, 2) OVER w AS speed_2
>> FROM topic_source
>> WINDOW w AS (
>>   PARTITION BY id
>>   ORDER BY t
>> )
>> 我期望得到的结果数据是
>> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":null,
>> "speed_2":null}
>> {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":1.0,
>> "speed_2":null}
>> {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":2.0,
>> "speed_2":1.0}
>> {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":3.0,
>> "speed_2":2.0}
>> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":4.0,
>> "speed_2":3.0}
>> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":5.0,
>> "speed_2":4.0}
>>
>> 实际得到的结果数据是:
>> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":1.0,
>> "speed_2":1.0}
>> {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":2.0,
>> "speed_2":2.0}
>> {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":3.0,
>> "speed_2":3.0}
>> {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":4.0,
>> "speed_2":4.0}
>> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":5.0,
>> "speed_2":5.0}
>> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":6.0,
>> "speed_2":6.0}
>>
>> 想问一下flink sql里的LAG函数能完成我期望的计算吗?如果可以sql该如何写?
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
> 
> 
> -- 
> 
> Best,
> Benchao Li





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复: BLinkPlanner sql join状态清理

2020-09-29 Thread Benchao Li
Hi Ericliuk,

这应该是实现的bug,你可以去社区建一个issue描述下这个问题。
有时间的话也可以帮忙修复一下,没有时间社区也会有其他小伙伴帮忙来修复的~

Ericliuk  于2020年9月29日周二 下午4:59写道:

> 我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。
> <
> http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png>
>
>
> 不太清楚为什么用了mini batch就没读取这个配置。
> 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 

Best,
Benchao Li


Re: Flink配置hdfs状态后端遇到的问题

2020-09-29 Thread Robin Zhang
Hi jester_jim,

配置文件中指定的checkpoints(以后简称ckp)的目录只是一个父目录,flink在首次触发每个job的ckp时会在这个父目录下新建多级文件夹,命名为指定的job名字/job
id.所以,并不是新建父目录就可以,依然会存在权限问题
。

  祝好,Robin Zhang




Flink中文社区的各位大佬你们好:
本人是小白,由于对Flink不太了解,想学,然后搭了个Flink standalone(1.11.2
jdk1.8)集群,集群本身运行没有什么问题,作业运行也没什么问题。但是最近有用到状态后端,在配置hdfs的时候遇到了一个无法理解的问题,我在issues也没找到解决方法。问题大概是,我在Flink配置文件配置了HDFS状态后端,但是呢Hadoop(CDH2.5.0)是生产系统的集群,有kerberos认证,目录权限不全开放。Flink的配置信息如下:
state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default
bundled
# state backends.
#
state.checkpoints.dir:
hdfs://namenode58:9000/NS3/user/jester/flink/flink-checkpoints
# Default target directory for savepoints, optional.
#
state.savepoints.dir:
hdfs://namenode58:9000/NS3/user/jester/flink/flink-checkpoints
除此之外还有kerberos认证配置,但是都没有什么问题,如下所示,当注释掉上面三行配置过后,作业正常运行:
2020-09-29 11:27:20,430 INFO 
org.apache.hadoop.security.UserGroupInformation  [] - Login
successful for user jester/principle using keytab file
/kafka/flink/flink-1.11.2/conf/jester.keytab
Job has been submitted with JobID 41c01241338f1c7112d48f277701d9c3


但是如果不注释,当我提交作业就会抛出:
(本部分放在异常信息前面:
异常里面说Flink要去/目录创建文件但是没有write权限,但是这个权限肯定是不会给的。但是我明明在Flink配置中我已经指定了目录,我不能理解为什么还会在/下创建什么?除此之外,Hadoop的配置信息我是直接把Hadoop集群的目录拷贝过来了,在启动脚本中指定了HADDOP_CONF_DIR,(本人觉得其实这样好像是不太妥当的,因为Hadoop的hdfs-site.xml和core-site.xml里面有很多只能在Hadoop集群的机器上使用的配置,不知道是不是应该删除无用的配置,还有增加必要的配置,希望能有参考文档)还有Hadoop的jar包我直接`$HADDOP_HOME/bin/hadoop
classpath`
另外还有个问题,就是Hadoop配置了HA,我怎么给checkpoint配置多个namenode?
)
2020-09-29 11:21:20,446 INFO 
org.apache.hadoop.security.UserGroupInformation  [] - Login
successful for user  jester/principle  using keytab file
/kafka/flink/flink-1.11.2/conf/unicom_jiangt37.keytab




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 如何在流式数据源上使用分析函数LAG和EAD函数

2020-09-29 Thread Benchao Li
Hi Robin,

目前LAG/LEAD函数在流式场景下的实现的确是有bug的,那个实现只能在批式场景下work,
是线上其实没有考虑流式的场景。所以你看到的结果应该是它只能返回当前数据。
这个问题我也是最近才发现的,刚刚建了一个issue[1] 来跟踪这个问题。
当前如果你想实现类似功能,可以先自己写一个udaf来做。

[1] https://issues.apache.org/jira/browse/FLINK-19449

Robin Zhang  于2020年9月29日周二 下午2:04写道:

> 环境: flink 1.10,使用flinkSQL
>
> kafka输入数据如:
> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0}
> {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0}
> {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0}
> {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0}
> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0}
> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0}
>
> sql如下:
>
> INSERT INTO topic_sink
> SELECT
>   t,
>   id,
>   speed,
>   LAG(speed, 1) OVER w AS speed_1,
>   LAG(speed, 2) OVER w AS speed_2
> FROM topic_source
> WINDOW w AS (
>   PARTITION BY id
>   ORDER BY t
> )
> 我期望得到的结果数据是
> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":null,
> "speed_2":null}
> {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":1.0,
> "speed_2":null}
> {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":2.0,
> "speed_2":1.0}
> {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":3.0,
> "speed_2":2.0}
> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":4.0,
> "speed_2":3.0}
> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":5.0,
> "speed_2":4.0}
>
> 实际得到的结果数据是:
> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":1.0,
> "speed_2":1.0}
> {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":2.0,
> "speed_2":2.0}
> {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":3.0,
> "speed_2":3.0}
> {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":4.0,
> "speed_2":4.0}
> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":5.0,
> "speed_2":5.0}
> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":6.0,
> "speed_2":6.0}
>
> 想问一下flink sql里的LAG函数能完成我期望的计算吗?如果可以sql该如何写?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: 回复: BLinkPlanner sql join状态清理

2020-09-29 Thread Ericliuk
我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。

 

不太清楚为什么用了mini batch就没读取这个配置。
一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: Re: 了解下大家生产中都用什么任务调度系统呢

2020-09-29 Thread 赵一旦
看了下,hera算了,虽然文档看起来还行,但是5个star,不敢用。
海豚这个看起来还不错,可以试试看。

Michael Ran  于2020年9月29日周二 上午10:43写道:

> ~。~ hera、海豚都行
> 在 2020-09-29 09:58:45,"chengyanan1...@foxmail.com" <
> chengyanan1...@foxmail.com> 写道:
> >
> >Apache DolphinScheduler 你值得拥有
> >
> >https://dolphinscheduler.apache.org/zh-cn/
> >
> >
> >
> >发件人: 赵一旦
> >发送时间: 2020-09-28 20:47
> >收件人: user-zh
> >主题: Re: 了解下大家生产中都用什么任务调度系统呢
> >感觉ooize成熟但不想用,xml写起来难受。
> >azkaban也需要单独上传。
> >
> >我现在用的公司的其实挺好,就是界面操作有点小bug,以及经常挂。就是全流程通过界面操作,创建任务,然后任务和任务之间拖一拖就可以创建依赖。
> >
> >孟小鹏 <602012...@qq.com> 于2020年9月28日周一 下午8:22写道:
> >
> >> Airflow  oozie
> >>
> >>
> >>
> >>
> >>
> >> 发自我的iPhone
> >>
> >>
> >> -- 原始邮件 --
> >> 发件人: 赵一旦  >> 发送时间: 2020年9月28日 19:41
> >> 收件人: user-zh  >> 主题: 回复:了解下大家生产中都用什么任务调度系统呢
> >>
> >>
> >>
> >> 主要是指开源的调度系统。
> >>
> >> 公司有个系统感觉经常挂,想换个开源的自己搭建。
> >> (1)最好是支持WEB UI配置任务流的,不想搞个任务还需要单独写xml等文件然后打包上传那种。
> >> (2)在生产中长期应用,稳定,能满足大多数需求的。
> >>
> >> 希望大家推荐下。
>


创建BatchTableEnvironment报错

2020-09-29 Thread hl9...@126.com
flink 1.11.2版本,我写了个WordCountTable例子,代码如下:
public class WordCountTable {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
DataSet input = env.fromElements(new WC("Hello", 
1L),
运行报错:
Exception in thread "main" org.apache.flink.table.api.TableException: Create 
BatchTableEnvironment failed.
at 
org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:517)
at 
org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:471)
at com.toonyoo.table.WordCountTable.main(WordCountTable.java:12)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

pox table相关依赖:


org.apache.flink
flink-table-api-java-bridge_2.11
${flink.version}
provided



hl9...@126.com


Re: Flink Batch Processing

2020-09-29 Thread Timo Walther

Hi Sunitha,

currently, not every connector can be mixed with every API. I agree that 
it is confusing from time to time. The HBase connector is an 
InputFormat. DataSet, DataStream and Table API can work with 
InputFormats. The current Hbase input format might work best with Table 
API. If you like to use CEP API, you can use Table API 
(StreamTableEnvironment) to read from Hbase and call `toAppendStream` 
directly afterwards to further process in DataStream API. This works 
also for bounded streams thus you can do "batch" processing.


Regards,
Timo


On 29.09.20 09:56, Till Rohrmann wrote:

Hi Sunitha,

here is some documentation about how to use the Hbase sink with Flink 
[1, 2].


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hbase.html
[2] 
https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-hbase-connector.html


Cheers,
Till

On Tue, Sep 29, 2020 at 9:16 AM s_penakalap...@yahoo.com 
 > wrote:


Hi Piotrek,

Thank you for the reply.

Flink changes are good, However Flink is changing so much that we
are unable to get any good implementation examples either on Flink
documents or any other website.

Using HBaseInputFormat I was able to read the data as a DataSet<>,
now I see that DataSet would be deprecated.

In recent release Flink 1.11.1 I see Blink planner, but I was not
able to get one example on how to connect to HBase and read data. Is
there any link I can refer to see some implementation of reading
from HBase as bounded data using Blink Planner/DataStream API.

Regards,
Sunitha.



On Monday, September 28, 2020, 07:12:19 PM GMT+5:30, Piotr Nowojski
mailto:pnowoj...@apache.org>> wrote:


Hi Sunitha,

First and foremost, the DataSet API will be deprecated soon [1] so I
would suggest trying to migrate to the DataStream API. When using
the DataStream API it doesn't mean that you can not work with
bounded inputs - you can. Flink SQL (Blink planner) is in fact using
DataStream API to execute both streaming and batch queries. Maybe
this path would be easier?

And about answering your question using the DataSet API - sorry, I
don't know it :( I will try to ping someone who could help here.

Piotrek

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741

pon., 28 wrz 2020 o 15:14 s_penakalap...@yahoo.com
 mailto:s_penakalap...@yahoo.com>> napisał(a):

Hi All,

Need your help in Flink Batch processing: scenario described below:

we have multiple vehicles, we get data from each vehicle at a
very high speed, 1 record per minute.
thresholds can be set by the owner for each vehicle.

Say: we have 3 vehicles, threshold is set for 2 vehicles.
Vehicle 1, threshold 20 hours, allowedPetrolConsumption=15
vehicle 2, threshold 35 hours, allowedPetrolConsumption=28
vehicle 3  no threshold set by owner.

All the vehicle data is stored in HBase tables. We have a
scheduled Batch Job every day at 12 pm to check the status of
vehicle movement and Petrol consumption against threshold and
raise an alert (vehicle1 did not move for past 20 hours, vehicle
2 consumed more petrol. )

Since it is a Batch Job, I loaded all threshold data in one
DataSet and HBase Data in another Dataset using HbaseInputFormat.

What I am failing to figure out is:
1> vehicle 1 is having threshold of 20 hours where as vehicle 2
has threshold of 35 hours, I need to fetch data from Hbase for
different scenario. Is there any better approach to get all data
using one Hbase connection.
2> how to apply alert on Dataset.  CEP pattern/ Match_recognize
is allowed only on DataStream. Please help me with a simple
example. (alert can be raised if count is zero or like petrol
consumption is too high)


I could not get any example for Dataset on google where an alert
is raised. Kindly guide me if there is any better approach

Regards,
Sunitha.





Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-09-29 Thread Till Rohrmann
Hi Austin,

could you share with us the exact job you are running (including the custom
window trigger)? This would help us to better understand your problem.

I am also pulling in Klou and Timo who might help with the windowing logic
and the Table to DataStream conversion.

Cheers,
Till

On Mon, Sep 28, 2020 at 11:49 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey all,
>
> I'm not sure if I've missed something in the docs, but I'm having a bit of
> trouble with a streaming SQL job that starts w/ raw SQL queries and then
> transitions to a more traditional streaming job. I'm on Flink 1.10 using
> the Blink planner, running locally with no checkpointing.
>
> The job looks roughly like:
>
> CSV 1 -->
> CSV 2 -->  SQL Query to Join --> toRetractStream --> keyed time window w/
> process func & custom trigger --> some other ops
> CSV 3 -->
>
>
> When I remove the windowing directly after the `toRetractStream`, the
> records make it to the "some other ops" stage, but with the windowing,
> those operations are sometimes not sent any data. I can also get data sent
> to the downstream operators by putting in a no-op map before the window and
> placing some breakpoints in there to manually slow down processing.
>
>
> The logs don't seem to indicate anything went wrong and generally look
> like:
>
> 4819 [Source: Custom File source (1/1)] INFO
>  org.apache.flink.runtime.taskmanager.Task  - Source: Custom File source
> (1/1) (3578629787c777320d9ab030c004abd4) switched from RUNNING to
> FINISHED.\4819 [Source: Custom File source (1/1)] INFO
>  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
> Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4).
> 4819 [Source: Custom File source (1/1)] INFO
>  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
> streams are closed for task Source: Custom File source (1/1)
> (3578629787c777320d9ab030c004abd4) [FINISHED]
> 4820 [flink-akka.actor.default-dispatcher-5] INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering task
> and sending final execution state FINISHED to JobManager for task Source:
> Custom File source (1/1) 3578629787c777320d9ab030c004abd4.
> ...
> 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
> ProcessWindowFunction$1) (1/1)] INFO
>  org.apache.flink.runtime.taskmanager.Task  -
> Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
> ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f) switched
> from RUNNING to FINISHED.
> 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
> ProcessWindowFunction$1) (1/1)] INFO
>  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
> Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
> ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f).
> 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
> ProcessWindowFunction$1) (1/1)] INFO
>  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
> streams are closed for task Window(TumblingProcessingTimeWindows(6),
> TimedCountTrigger, ProcessWindowFunction$1) (1/1)
> (907acf9bfa2f4a9bbd13997b8b34d91f) [FINISHED]
> ...
> rest of the shutdown
> ...
> Program execution finished
> Job with JobID 889b161e432c0e69a8d760bbed205d5d has finished.
> Job Runtime: 783 ms
>
>
> Is there something I'm missing in my setup? Could it be my custom window
> trigger? Bug? I'm stumped.
>
>
> Thanks,
> Austin
>
>
>


Re: Flink Batch Processing

2020-09-29 Thread Till Rohrmann
Hi Sunitha,

here is some documentation about how to use the Hbase sink with Flink [1,
2].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hbase.html
[2]
https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-hbase-connector.html

Cheers,
Till

On Tue, Sep 29, 2020 at 9:16 AM s_penakalap...@yahoo.com <
s_penakalap...@yahoo.com> wrote:

> Hi Piotrek,
>
> Thank you for the reply.
>
> Flink changes are good, However Flink is changing so much that we are
> unable to get any good implementation examples either on Flink documents or
> any other website.
>
> Using HBaseInputFormat I was able to read the data as a DataSet<>, now I
> see that DataSet would be deprecated.
>
> In recent release Flink 1.11.1 I see Blink planner, but I was not able to
> get one example on how to connect to HBase and read data. Is there any link
> I can refer to see some implementation of reading from HBase as bounded
> data using Blink Planner/DataStream API.
>
> Regards,
> Sunitha.
>
>
>
> On Monday, September 28, 2020, 07:12:19 PM GMT+5:30, Piotr Nowojski <
> pnowoj...@apache.org> wrote:
>
>
> Hi Sunitha,
>
> First and foremost, the DataSet API will be deprecated soon [1] so I would
> suggest trying to migrate to the DataStream API. When using the DataStream
> API it doesn't mean that you can not work with bounded inputs - you can.
> Flink SQL (Blink planner) is in fact using DataStream API to execute both
> streaming and batch queries. Maybe this path would be easier?
>
> And about answering your question using the DataSet API - sorry, I don't
> know it :( I will try to ping someone who could help here.
>
> Piotrek
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
>
> pon., 28 wrz 2020 o 15:14 s_penakalap...@yahoo.com <
> s_penakalap...@yahoo.com> napisał(a):
>
> Hi All,
>
> Need your help in Flink Batch processing: scenario described below:
>
> we have multiple vehicles, we get data from each vehicle at a very high
> speed, 1 record per minute.
> thresholds can be set by the owner for each vehicle.
>
> Say: we have 3 vehicles, threshold is set for 2 vehicles.
> Vehicle 1, threshold 20 hours, allowedPetrolConsumption=15
> vehicle 2, threshold 35 hours, allowedPetrolConsumption=28
> vehicle 3  no threshold set by owner.
>
> All the vehicle data is stored in HBase tables. We have a scheduled Batch
> Job every day at 12 pm to check the status of vehicle movement and Petrol
> consumption against threshold and raise an alert (vehicle1 did not move for
> past 20 hours, vehicle 2 consumed more petrol. )
>
> Since it is a Batch Job, I loaded all threshold data in one DataSet and
> HBase Data in another Dataset using HbaseInputFormat.
>
> What I am failing to figure out is:
> 1> vehicle 1 is having threshold of 20 hours where as vehicle 2 has
> threshold of 35 hours, I need to fetch data from Hbase for different
> scenario. Is there any better approach to get all data using one Hbase
> connection.
> 2> how to apply alert on Dataset.  CEP pattern/ Match_recognize is allowed
> only on DataStream. Please help me with a simple example. (alert can be
> raised if count is zero or like petrol consumption is too high)
>
>
> I could not get any example for Dataset on google where an alert is
> raised. Kindly guide me if there is any better approach
>
> Regards,
> Sunitha.
>
>


Re: Reading from HDFS and publishing to Kafka

2020-09-29 Thread Aljoscha Krettek

Hi,

I actually have no experience running a Flink job on K8s against a 
kerberized HDFS so please take what I'll say with a grain of salt.


The only thing you should need to do is to configure the path of your 
keytab and possibly some other Kerberos settings. For that check out [1] 
and [2].


I think in general this looks like the right approach and Romans 
comments are correct as well.


Aljoscha

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-kerberos.html#yarnmesos-mode
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#auth-with-external-systems


On 27.09.20 21:54, Khachatryan Roman wrote:

Hi,

1. Yes, StreamingExecutionEnvironment.readFile can be used for files on HDFS
2. I think this is a valid concern. Besides that, there are plans to
deprecate DataSet API [1]
4. Yes, the approach looks good

I'm pulling in Aljoscha for your 3rd question (and probably some
clarifications on others).

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741

Regards,
Roman


On Fri, Sep 25, 2020 at 12:50 PM Damien Hawes 
wrote:


Hi folks,

I've got the following use case, where I need to read data from HDFS and
publish the data to Kafka, such that it can be reprocessed by another job.

I've searched the web and read the docs. This has turned up no and
concrete examples or information of how this is achieved, or even if it's
possible at all.

Further context:

1. Flink will be deployed to Kubernetes.
2. Kerberos is active on Hadoop.
3. The data is stored on HDFS as Avro.
4. I cannot install Flink on our Hadoop environment.
5. No stateful computations will be performed.

I've noticed that the flink-avro package provides a class called
AvroInputFormat, with a nullable path field, and I think this is my goto.

Apologies for the poor formatting ahead, but the code I have in mind looks
something like this:



StreamingExecutionEnvironment env = ...;
AvroInputFormat inf = new AvroInputFormat(null, Source.class);
DataStreamSource stream = env.readFile(inf, "hdfs://path/to/data");
// rest, + publishing to Kafka using the FlinkKafkaProducer



My major questions and concerns are:

1. Is it possible to use read from HDFS using the
StreamingExecutionEnvironment object? I'm planning on using the Data Stream
API because of point (2) below.
2. Because Flink will be deployed on Kubernetes, I have a major concern
that if I were to use the Data Set API, once Flink completes and exits, the
pods will restart, causing unnecessary duplication of data. Is the pod
restart a valid concern?
3. Is there anything special I need to be worried about regarding Kerberos
in this instance? The key tab will be materialised on the pods upon start
up.
4. Is this even a valid approach? The dataset I need to read and replay is
small (12 TB).

Any help, even in part will be appreciated.

Kind regards,

Damien










Re: Flink Batch Processing

2020-09-29 Thread s_penakalap...@yahoo.com
 Hi Piotrek,
Thank you for the reply.
Flink changes are good, However Flink is changing so much that we are unable to 
get any good implementation examples either on Flink documents or any other 
website.
Using HBaseInputFormat I was able to read the data as a DataSet<>, now I see 
that DataSet would be deprecated.
In recent release Flink 1.11.1 I see Blink planner, but I was not able to get 
one example on how to connect to HBase and read data. Is there any link I can 
refer to see some implementation of reading from HBase as bounded data using 
Blink Planner/DataStream API.
Regards,Sunitha.


On Monday, September 28, 2020, 07:12:19 PM GMT+5:30, Piotr Nowojski 
 wrote:  
 
 Hi Sunitha,
First and foremost, the DataSet API will be deprecated soon [1] so I would 
suggest trying to migrate to the DataStream API. When using the DataStream API 
it doesn't mean that you can not work with bounded inputs - you can. Flink SQL 
(Blink planner) is in fact using DataStream API to execute both streaming and 
batch queries. Maybe this path would be easier?
And about answering your question using the DataSet API - sorry, I don't know 
it :( I will try to ping someone who could help here.
Piotrek
[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
pon., 28 wrz 2020 o 15:14 s_penakalap...@yahoo.com  
napisał(a):

Hi All,
Need your help in Flink Batch processing: scenario described below:
we have multiple vehicles, we get data from each vehicle at a very high speed, 
1 record per minute.thresholds can be set by the owner for each vehicle. 
Say: we have 3 vehicles, threshold is set for 2 vehicles. Vehicle 1, threshold 
20 hours, allowedPetrolConsumption=15vehicle 2, threshold 35 hours, 
allowedPetrolConsumption=28vehicle 3  no threshold set by owner.
All the vehicle data is stored in HBase tables. We have a scheduled Batch Job 
every day at 12 pm to check the status of vehicle movement and Petrol 
consumption against threshold and raise an alert (vehicle1 did not move for 
past 20 hours, vehicle 2 consumed more petrol. )
Since it is a Batch Job, I loaded all threshold data in one DataSet and HBase 
Data in another Dataset using HbaseInputFormat.
What I am failing to figure out is:1> vehicle 1 is having threshold of 20 hours 
where as vehicle 2 has threshold of 35 hours, I need to fetch data from Hbase 
for different scenario. Is there any better approach to get all data using one 
Hbase connection.2> how to apply alert on Dataset.  CEP pattern/ 
Match_recognize is allowed only on DataStream. Please help me with a simple 
example. (alert can be raised if count is zero or like petrol consumption is 
too high)

I could not get any example for Dataset on google where an alert is raised. 
Kindly guide me if there is any better approach
Regards,Sunitha.
  

Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-29 Thread Till Rohrmann
Great, thanks Klou!

Cheers,
Till

On Mon, Sep 28, 2020 at 5:07 PM Kostas Kloudas  wrote:

> Hi all,
>
> I will have a look.
>
> Kostas
>
> On Mon, Sep 28, 2020 at 3:56 PM Till Rohrmann 
> wrote:
> >
> > Hi Cristian,
> >
> > thanks for reporting this issue. It looks indeed like a very critical
> problem.
> >
> > The problem seems to be that the ApplicationDispatcherBootstrap class
> produces an exception (that the request job can no longer be found because
> of a lost ZooKeeper connection) which will be interpreted as a job failure.
> Due to this interpretation, the cluster will be shut down with a terminal
> state of FAILED which will cause the HA data to be cleaned up. The exact
> problem occurs in the JobStatusPollingUtils.getJobResult which is called by
> ApplicationDispatcherBootstrap.getJobResult().
> >
> > I think there are two problems here: First of all not every exception
> bubbling up in the future returned by
> ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync() indicates a
> job failure. Some of them can also indicate a framework failure which
> should not lead to the clean up of HA data. The other problem is that the
> polling logic cannot properly handle a temporary connection loss to
> ZooKeeper which is a normal situation.
> >
> > I am pulling in Aljoscha and Klou who worked on this feature and might
> be able to propose a solution for these problems. I've also updated the
> JIRA issue FLINK-19154.
> >
> > Cheers,
> > Till
> >
> > On Wed, Sep 9, 2020 at 9:00 AM Yang Wang  wrote:
> >>
> >> > The job sub directory will be cleaned up when the job
> finished/canceled/failed.
> >> Since we could submit multiple jobs into a Flink session, what i mean
> is when a job
> >> reached to the terminal state, the sub node(e.g.
> /flink/application_/running_job_registry/4d255397c7aeb5327adb567238c983c1)
> >> on the Zookeeper will be cleaned up. But the root
> directory(/flink/application_/) still exists.
> >>
> >>
> >> For your current case, it is a different case(perjob cluster). I think
> we need to figure out why the only
> >> running job reached the terminal state. For example, the restart
> attempts are exhausted. And you
> >> could find the following logs in your JobManager log.
> >>
> >> "org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy"
> >>
> >>
> >> Best,
> >> Yang
> >>
> >>
> >>
> >>
> >> Cristian  于2020年9月9日周三 上午11:26写道:
> >>>
> >>> > The job sub directory will be cleaned up when the job
> finished/canceled/failed.
> >>>
> >>> What does this mean?
> >>>
> >>> Also, to clarify: I'm a very sloppy developer. My jobs crash ALL the
> time... and yet, the jobs would ALWAYS resume from the last checkpoint.
> >>>
> >>> The only cases where I expect Flink to clean up the checkpoint data
> from ZK is when I explicitly stop or cancel the job (in those cases the job
> manager takes a savepoint before cleaning up zk and finishing the cluster).
> >>>
> >>> Which is not the case here. Flink was on autopilot here and decided to
> wipe my poor, good checkpoint metadata as the logs show.
> >>>
> >>> On Tue, Sep 8, 2020, at 7:59 PM, Yang Wang wrote:
> >>>
> >>> AFAIK, the HA data, including Zookeeper meta data and real data on
> DFS, will only be cleaned up
> >>> when the Flink cluster reached terminated state.
> >>>
> >>> So if you are using a session cluster, the root cluster node on Zk
> will be cleaned up after you manually
> >>> stop the session cluster. The job sub directory will be cleaned up
> when the job finished/canceled/failed.
> >>>
> >>> If you are using a job/application cluster, once the only running job
> finished/failed, all the HA data will
> >>> be cleaned up. I think you need to check the job restart strategy you
> have set. For example, the following
> >>> configuration will make the Flink cluster terminated after 10 attempts.
> >>>
> >>> restart-strategy: fixed-delay
> >>> restart-strategy.fixed-delay.attempts: 10
> >>>
> >>>
> >>> Best,
> >>> Yang
> >>>
> >>> Cristian  于2020年9月9日周三 上午12:28写道:
> >>>
> >>>
> >>> I'm using the standalone script to start the cluster.
> >>>
> >>> As far as I can tell, it's not easy to reproduce. We found that
> zookeeper lost a node around the time this happened, but all of our other
> 75 Flink jobs which use the same setup, version and zookeeper, didn't have
> any issues. They didn't even restart.
> >>>
> >>> So unfortunately I don't know how to reproduce this. All I know is I
> can't sleep. I have nightmares were my precious state is deleted. I wake up
> crying and quickly start manually savepointing all jobs just in case,
> because I feel the day of reckon is near. Flinkpocalypse!
> >>>
> >>> On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
> >>>
> >>> Thanks a lot for reporting this problem here Cristian!
> >>>
> >>> I am not super familiar with the involved components, but the behavior
> you are describing doesn't sound right to me.
> >>> Which entrypoint are you using? This is 

flinksql多sink案例

2020-09-29 Thread todd
最近在看flinksql优化部分的代码,Flink在对多sink情况下进行公共子图的优化。所以想问下finksql执行多sink的案例,

在org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder#buildRelNodeBlockPlan中convertedRelNodes.size才能大于1.






--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink消费kafka,事务关闭问题

2020-09-29 Thread 宁吉浩
hi,all
最近在使用 flink 读写kafka,频繁输出procuder close 日志
但是flink checkpoint观察一段时间没有失败,数据也写入到kafka了,观察kafka server.log 也没发现报错
目前写入kafka的数据是否有丢失,暂时还没校验,有人可以给我解释下下列的日志是什么原因吗?

频繁输出下列日志 :
2020-09-29 14:13:28,916 INFO org.apache.kafka.common.utils.AppInfoParser - 
Kafka version: 2.2.0
2020-09-29 14:13:28,916 INFO org.apache.kafka.common.utils.AppInfoParser - 
Kafka commitId: 05fcfde8f69b0349
2020-09-29 14:13:28,917 INFO 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Starting 
FlinkKafkaInternalProducer (1/1) to produce into default topic 
Dim_OLMS_MediaResources_collect
2020-09-29 14:13:28,918 INFO 
org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
clientId=producer-32, transactionalId=Source: 
readKafka_DWD2Dim_collect_DWD2Dim_collect -> 
flatMap_DWD2Dim_collect_DWD2Dim_collect -> Sink: 
DataStreamSink_DWD2Dim_collect_DWD2Dim_collect-1cf52c94a0b506fe08b6a6a2a44aa4ec-1]
 ProducerId set to -1 with epoch -1
2020-09-29 14:13:29,020 INFO org.apache.kafka.clients.Metadata - Cluster ID: 
kcv8nwDySzie-OBCzqs15w
2020-09-29 14:13:29,127 INFO 
org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
clientId=producer-32, transactionalId=Source: 
readKafka_DWD2Dim_collect_DWD2Dim_collect -> 
flatMap_DWD2Dim_collect_DWD2Dim_collect -> Sink: 
DataStreamSink_DWD2Dim_collect_DWD2Dim_collect-1cf52c94a0b506fe08b6a6a2a44aa4ec-1]
 ProducerId set to 2297 with epoch 2
2020-09-29 14:13:29,227 INFO 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - 
FlinkKafkaProducer 0/1 - checkpoint 6 complete, committing transaction 
TransactionHolder{handle=KafkaTransactionState [transactionalId=Source: 
readKafka_DWD2Dim_collect_DWD2Dim_collect -> 
flatMap_DWD2Dim_collect_DWD2Dim_collect -> Sink: 
DataStreamSink_DWD2Dim_collect_DWD2Dim_collect-1cf52c94a0b506fe08b6a6a2a44aa4ec-2,
 producerId=1321, epoch=2], transactionStartTime=1601359949123} from checkpoint 
6
2020-09-29 14:13:29,231 INFO org.apache.kafka.clients.producer.KafkaProducer - 
[Producer clientId=producer-31, transactionalId=Source: 
readKafka_DWD2Dim_collect_DWD2Dim_collect -> 
flatMap_DWD2Dim_collect_DWD2Dim_collect -> Sink: 
DataStreamSink_DWD2Dim_collect_DWD2Dim_collect-1cf52c94a0b506fe08b6a6a2a44aa4ec-2]
 Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2020-09-29 14:14:28,914 INFO 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer 
- Flushing new partitions



Re: checkpoint rocksdb hdfs 如何协调,保证数据不丢失

2020-09-29 Thread 王冶
Hi~ 按你的问题顺序回答如下:
 1. Flink中的RocksDB是支持保存到hdfs的,且支持的非常好,将rocksdb的存储路径设置为hdfs路径即可。
 2.
in-flight的数据是保存在本地磁盘的,仅当checkpoint的时候,才会将本地的状态拷贝到hdfs。而且checkpoint本身不会因为远程拷贝影响计算速度。
 3.
多久备份一次,取决于你配置的checkpoint的间隔。每次checkpoint都会备份&远程拷贝。但请注意,默认配置下checkpoint会在作业停止后删除,这时候你需要手动触发savepoint,你当然也可以在作业运行过程中出发保存savepoint,savepoint的好处是不会随作业停止而删除,且可以让新作业基于savepoint启动,从而实现exactly-once或at-least的语义。
 4. Flink提供多种状态后端,需要根据你的实际场景选择。但对于大状态和高可用场景,推荐rocksdb。具体的推荐还是多读下文档。

文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/state_backends.html#rocksdb-state-backend-details
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/large_state_tuning.html

祝好,
By Soda


On Tue, 29 Sep 2020 at 11:06, Michael Ran  wrote:

> dear all :
> 我们checkpoint 信息默认保存在rocksdb上,但是rocksdb
> 是一个单机系统,性能OK,要做到不丢失还是要备份到hdfs分布式文件系统上。
>
>
>问题是:
>1. 如果仅保存在rocksdb,那么单机物理损坏,数据是会丢失的。
>2. 如果仅保存hdfs,那么性能可能跟不上
>3.如果先保存到rocksdb,再定时备份到hdfs,那么是多久备份一次?中间出现物理损坏,还是会出现一端时间的丢失。
>4. 这块的详细设计,和具体流程、场景有合适的文档推荐吗?怎么再性能和数据完整性上做考虑的


Re:Re: flink基于源码如何编译打包生成flink-table-blink.jar

2020-09-29 Thread XiaChang
Hi Xingbo Huang
我试一下,非常感谢








在 2020-09-29 13:53:02,"Xingbo Huang"  写道:
>Hi XiaChang
>
>你可以在flink-table目录下执行打包命令。然后flink-table-uber-blink的target目录下生成的flink-table-uber-blink_2.11-1.12-SNAPSHOT.jar这个包就是你要的flink-table-blink_2.11-1.12-SNAPSHOT.jar
>
>Best,
>Xingbo
>
>zilong xiao  于2020年9月29日周二 上午11:56写道:
>
>> Hi XiaChang
>>
>>  
>> 你可以对整个Flink工程打包,然后在flink-dist/target/flink-${version}-bin/flink-${version}/lib中找到,希望对你有帮助~
>>
>> 祝好
>>
>> XiaChang <13628620...@163.com> 于2020年9月29日周二 上午10:46写道:
>>
>> > 基于flink源码 如何编译打包生成flink-table-blink.jar
>> >
>> > 源码中,flink-table是多模块的,正常打包(mvn clean install -DskipTests
>> > -Dfast)生成的是每个模块单独的jar,而不是flink-table-blink.jar
>> >
>> > 请问如何打包才能生成flink-table-blink.jar
>> >
>> >
>> >
>> >
>> >
>> >
>>


Flink配置hdfs状态后端遇到的问题

2020-09-29 Thread jester_jim
Flink中文社区的各位大佬你们好:
本人是小白,由于对Flink不太了解,想学,然后搭了个Flink standalone(1.11.2 
jdk1.8)集群,集群本身运行没有什么问题,作业运行也没什么问题。但是最近有用到状态后端,在配置hdfs的时候遇到了一个无法理解的问题,我在issues也没找到解决方法。问题大概是,我在Flink配置文件配置了HDFS状态后端,但是呢Hadoop(CDH2.5.0)是生产系统的集群,有kerberos认证,目录权限不全开放。Flink的配置信息如下:
state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
state.checkpoints.dir: 
hdfs://namenode58:9000/NS3/user/jester/flink/flink-checkpoints
# Default target directory for savepoints, optional.
#
state.savepoints.dir: 
hdfs://namenode58:9000/NS3/user/jester/flink/flink-checkpoints
除此之外还有kerberos认证配置,但是都没有什么问题,如下所示,当注释掉上面三行配置过后,作业正常运行:
2020-09-29 11:27:20,430 INFO  org.apache.hadoop.security.UserGroupInformation   
   [] - Login successful for user jester/principle using keytab file 
/kafka/flink/flink-1.11.2/conf/jester.keytab
Job has been submitted with JobID 41c01241338f1c7112d48f277701d9c3


但是如果不注释,当我提交作业就会抛出:
(本部分放在异常信息前面:
异常里面说Flink要去/目录创建文件但是没有write权限,但是这个权限肯定是不会给的。但是我明明在Flink配置中我已经指定了目录,我不能理解为什么还会在/下创建什么?除此之外,Hadoop的配置信息我是直接把Hadoop集群的目录拷贝过来了,在启动脚本中指定了HADDOP_CONF_DIR,(本人觉得其实这样好像是不太妥当的,因为Hadoop的hdfs-site.xml和core-site.xml里面有很多只能在Hadoop集群的机器上使用的配置,不知道是不是应该删除无用的配置,还有增加必要的配置,希望能有参考文档)还有Hadoop的jar包我直接`$HADDOP_HOME/bin/hadoop
 classpath`
另外还有个问题,就是Hadoop配置了HA,我怎么给checkpoint配置多个namenode?
)
2020-09-29 11:21:20,446 INFO  org.apache.hadoop.security.UserGroupInformation   
   [] - Login successful for user  jester/principle  using keytab file 
/kafka/flink/flink-1.11.2/conf/unicom_jiangt37.keytab



 The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Failed to execute job 'OracleSource'.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 
'OracleSource'.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1821)
at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)
at OracleSource.OracleSourceMain$.main(OracleSourceMain.scala:52)
at OracleSource.OracleSourceMain.main(OracleSourceMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:366)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:292)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at 

Re: 回复: 怎么样在Flink中使用java代码提交job到yarn

2020-09-29 Thread chengyanan1...@foxmail.com
zeeplin的相关文档可以参考:

https://www.yuque.com/jeffzhangjianfeng/gldg8w

这里写的很详细,收藏依赖慢慢看



 
发件人: xiao cai
发送时间: 2020-09-29 13:13
收件人: user-zh
主题: 回复: Re: 怎么样在Flink中使用java代码提交job到yarn
非常感谢建议,有zeeplin api的相关文档吗
 
 
 
 
原始邮件 
发件人: chengyanan1...@foxmail.com
收件人: user-zh
发送时间: 2020年9月29日(周二) 09:54
主题: 回复: Re: 怎么样在Flink中使用java代码提交job到yarn
 
 
我们项目中也是用到了这个,我也是暂时采用的捕获日志来解析得到yarn application id 和 flink job id的 
后期重点研究一下zeeplin,或许可以修改一下源码来镶嵌到我们自己的系统中或者直接调用zeeplin的api 发件人: xushanshan 发送时间: 
2020-09-25 16:42 收件人: user-zh 主题: Re: 怎么样在Flink中使用java代码提交job到yarn 
可以捕获控制台打印出来的日志,flink相关日志的格式很固定,字符串截取就能获得 yarn application id 和 flink job id > 在 
2020年9月25日,下午4:23,xiao cai  写道: > > Hi all: > 大家好,我目前遇到一个flink 
任务提交方面的困扰: > 想要在自己的项目中(springboot)提交flink 
job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包
 > 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。 > 非常感谢 > > > 
best, > xiao


Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

2020-09-29 Thread Yang Wang
Hi Till, thanks for your valuable feedback.

1. Yes, leader election and storing leader information will use a same
ConfigMap. When a contender successfully performs a versioned annotation
update operation to the ConfigMap, it means that it has been elected as the
leader. And it will write the leader information in the callback of leader
elector[1]. The Kubernetes resource version will help us to avoid the
leader ConfigMap is wrongly updated.

2. The lock and release is really a valid concern. Actually in current
design, we could not guarantee that the node who tries to write his
ownership is the real leader. Who writes later, who is the owner. To
address this issue, we need to store all the owners of the key. Only when
the owner is empty, the specific key(means a checkpoint or job graph) could
be deleted. However, we may have a residual checkpoint or job graph when
the old JobManager crashed exceptionally and do not release the lock. To
solve this problem completely, we need a timestamp renew mechanism
for CompletedCheckpointStore and JobGraphStore, which could help us to the
check the JobManager timeout and then clean up the residual keys.

3. Frankly speaking, I am not against with this solution. However, in my
opinion, it is more like a temporary proposal. We could use StatefulSet to
avoid leader election and leader retrieval. But I am not sure whether
TaskManager could properly handle the situation that same hostname with
different IPs, because the JobManager failed and relaunched. Also we may
still have two JobManagers running in some corner cases(e.g. kubelet is
down but the pod is running). Another concern is we have a strong
dependency on the PersistentVolume(aka PV) in FileSystemHAService. But it
is not always true especially in self-build Kubernetes cluster. Moreover,
PV provider should guarantee that each PV could only be mounted once. Since
the native HA proposal could cover all the functionality of StatefulSet
proposal, that's why I prefer the former.


[1].
https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70

Best,
Yang

Till Rohrmann  于2020年9月28日周一 下午9:29写道:

> Thanks for creating this FLIP Yang Wang. I believe that many of our users
> will like a ZooKeeper-less HA setup.
>
> +1 for not separating the leader information and the leader election if
> possible. Maybe it is even possible that the contender writes his leader
> information directly when trying to obtain the leadership by performing a
> versioned write operation.
>
> Concerning the lock and release operation I have a question: Can there be
> multiple owners for a given key-value pair in a ConfigMap? If not, how can
> we ensure that the node which writes his ownership is actually the leader
> w/o transactional support from K8s? In ZooKeeper we had the same problem
> (we should probably change it at some point to simply use a
> transaction which checks whether the writer is still the leader) and
> therefore introduced the ephemeral lock nodes. What they allow is that
> there can be multiple owners of a given ZNode at a time. The last owner
> will then be responsible for the cleanup of the node.
>
> I see the benefit of your proposal over the stateful set proposal because
> it can support multiple standby JMs. Given the problem of locking key-value
> pairs it might be simpler to start with this approach where we only have
> single JM. This might already add a lot of benefits for our users. Was
> there a specific reason why you discarded this proposal (other than
> generality)?
>
> @Uce it would be great to hear your feedback on the proposal since you
> already implemented a K8s based HA service.
>
> Cheers,
> Till
>
> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang  wrote:
>
>> Hi Xintong and Stephan,
>>
>> Thanks a lot for your attention on this FLIP. I will address the comments
>> inline.
>>
>> # Architecture -> One or two ConfigMaps
>>
>> Both of you are right. One ConfigMap will make the design and
>> implementation easier. Actually, in my POC codes,
>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for rest
>> server component) for the leader election
>> and storage. Once a JobManager win the election, it will update the
>> ConfigMap with leader address and periodically
>> renew the lock annotation to keep as the active leader. I will update the
>> FLIP document, including the architecture diagram,
>> to avoid the misunderstanding.
>>
>>
>> # HA storage > Lock and release
>>
>> This is a valid concern. Since for Zookeeper ephemeral nodes, it will be
>> deleted by the ZK server automatically when
>> the client is timeout. It could happen in a bad network environment or
>> the ZK client crashed exceptionally. For Kubernetes,
>> we need to implement a similar mechanism. First, when we want to lock a
>> specific key in ConfigMap, we will put the owner identify,
>> lease 

如何在流式数据源上使用分析函数LAG和EAD函数

2020-09-29 Thread Robin Zhang
环境: flink 1.10,使用flinkSQL

kafka输入数据如:
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0}

sql如下:

INSERT INTO topic_sink
SELECT
  t,
  id,
  speed,
  LAG(speed, 1) OVER w AS speed_1,
  LAG(speed, 2) OVER w AS speed_2
FROM topic_source
WINDOW w AS (
  PARTITION BY id
  ORDER BY t
)
我期望得到的结果数据是
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":null,
"speed_2":null}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":1.0,
"speed_2":null}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":2.0,
"speed_2":1.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":3.0,
"speed_2":2.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":4.0,
"speed_2":3.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":5.0,
"speed_2":4.0}

实际得到的结果数据是:
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":1.0,
"speed_2":1.0}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":2.0,
"speed_2":2.0}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":3.0,
"speed_2":3.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":4.0,
"speed_2":4.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":5.0,
"speed_2":5.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":6.0,
"speed_2":6.0}

想问一下flink sql里的LAG函数能完成我期望的计算吗?如果可以sql该如何写?



--
Sent from: http://apache-flink.147419.n8.nabble.com/