backpressure and memory

2020-03-22 Thread seeksst
Hi, everyone:


I'm a Flink SQL user, on version 1.8.2.
Recently I have been confused about memory and backpressure. I have two jobs on
YARN that are frequently killed by YARN for exceeding their memory.
The first job has 3 taskmanagers and a parallelism of 6, with 8 GB of memory each.
It reads from Kafka and computes pv and uv in one-minute tumbling windows. There
are many aggregation dimensions, so to avoid data skew it groups by
deviceId, TUMBLE(event_time, INTERVAL '1' MINUTE). My question is: the checkpoint
is only 60 MB and I give the job 24 GB of memory, so why is it killed by YARN? I
use RocksDB as the state backend, and the data volume is large, but I would expect
this to trigger backpressure rather than an OOM, yet backpressure never triggers;
inPoolUsage is around 0.45 normally.
The second job looks different. It has 2 taskmanagers and a parallelism of 4, with
20 GB of memory each. I defined an aggregate function to compute complex data,
grouped by date, hour, deviceId. It behaves like the first job: OOM and no
backpressure. But the strange part is that when I read one day of data, only one
taskmanager is killed by YARN, which puzzles me. According to the dashboard there
is no data skew, so why only one taskmanager?
Maybe these are the same question, maybe not, but I want to understand more about
how memory is used in Flink, whether backpressure can stop the source and how it
is triggered, and how RocksDB affects Flink.
Thanks for reading. Any suggestions would be appreciated. Thank you.
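For reference, the first job's aggregation has roughly the following shape. This is a minimal sketch with assumed table and column names (src, user_id), not the poster's actual job; registering the Kafka-backed table is omitted:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class PvUvWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // "src" stands in for the Kafka-backed table and must be registered
        // beforehand; deviceId, user_id and event_time (a rowtime attribute)
        // are assumed column names, not the poster's actual schema.
        Table pvUv = tEnv.sqlQuery(
            "SELECT deviceId, " +
            "       TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS w_start, " +
            "       COUNT(*) AS pv, " +
            "       COUNT(DISTINCT user_id) AS uv " +
            "FROM src " +
            "GROUP BY deviceId, TUMBLE(event_time, INTERVAL '1' MINUTE)");

        // Each per-window aggregate is held in (RocksDB) state until the window
        // fires, which is where the memory pressure described above comes from.
        tEnv.toAppendStream(pvUv, Row.class).print();
        env.execute("pv-uv-window-sketch");
    }
}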

Re: How to optimize a large table join in Flink SQL 1.10?

2020-03-22 Thread Jingsong Li
Only the source (including operators chained to the source) has its parallelism inferred; all nodes after the subsequent shuffle depend on this parameter.

Best,
Jingsong Lee

On Mon, Mar 23, 2020 at 11:01 AM 111  wrote:

> Hi jingsong,
> Thanks a lot. I assumed the parallelism here was inferred automatically and had missed this parameter. I will give it a try.
>
> xinghalo <xingh...@163.com>
>
> On 2020-03-23 09:59, Jingsong Li wrote:
> Set table.exec.resource.default-parallelism in the "configuration:" section of [1].
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html#environment-files
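If the job builds its own TableEnvironment rather than going through the SQL client environment file, the same knob can be set programmatically. A minimal sketch, assuming the Flink 1.10 Blink planner:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ParallelismConfigSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Same option as the "configuration:" block of the environment file:
        // every node after the source shuffle runs with this parallelism.
        tEnv.getConfig().getConfiguration()
                .setInteger("table.exec.resource.default-parallelism", 50);
    }
}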
>
> On Mon, Mar 23, 2020 at 9:48 AM 111  wrote:
>
> Hi jingsong:
> The parallelism here is generated automatically by the system. Both tables were created through the sql-gateway in one session, so by this point the parallelism is 1 everywhere...
>
>
> xinghalo <xingh...@163.com>
>
>
> On 2020-03-23 09:33, Jingsong Li wrote:
> Hi,
>
> It looks like your join SQL has an equality condition on the key, so it can run as a distributed join.
> But your parallelism is 1. Distributed computation is normally never run at parallelism 1, otherwise it is effectively single-machine execution.
>
> As Kurt said, change your parallelism:
> table.exec.resource.default-parallelism, e.g. try 50 or 100.
>
> Best,
> Jingsong Lee
>
> On Sun, Mar 22, 2020 at 10:08 AM Kurt Young  wrote:
>
> In your plan everything except the source runs at parallelism 1, which is not enough for joining two tables of 10+ million rows each. Try increasing the parallelism.
>
> Best,
> Kurt
>
>
> On Sat, Mar 21, 2020 at 1:30 PM 111  wrote:
>
> Hi:
> I read the source code and now roughly understand hybrid hash join, and where the bottleneck is:
> a hybrid hash join hashes the build side (my right table) into a map and partitions it
> by some rule, keeping part of it in memory and spilling the overflow to disk.
> The on-disk part of the join currently looks like the bottleneck of the whole job.
> I am still exploring how to tune this... perhaps some configuration controls how much of the build table stays in memory. (A toy illustration of the idea follows below.)
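To make the mechanism above concrete, here is a toy illustration of the hybrid hash join idea in plain Java (not Flink's actual implementation): partition the build side by hash, pretend only some partitions fit in memory, then probe partition by partition.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HybridHashJoinSketch {
    public static void main(String[] args) {
        int partitions = 4;
        int inMemoryPartitions = 2; // pretend only half of the partitions fit in memory

        // Build side (the "right table"): rows of {key, payload}.
        List<String[]> build = Arrays.asList(
                new String[]{"u1", "m1"}, new String[]{"u2", "m2"}, new String[]{"u3", "m3"});
        // Probe side rows: {key, payload}.
        List<String[]> probe = Arrays.asList(
                new String[]{"u1", "e1"}, new String[]{"u3", "e3"}, new String[]{"u4", "e4"});

        // Build phase: hash-partition the build side. In a real hybrid hash join,
        // partitions >= inMemoryPartitions would be spilled to disk here.
        List<Map<String, String>> buildTables = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            buildTables.add(new HashMap<>());
        }
        for (String[] row : build) {
            int p = Math.floorMod(row[0].hashCode(), partitions);
            buildTables.get(p).put(row[0], row[1]);
        }

        // Probe phase: rows that hash into an in-memory partition join immediately;
        // the rest would be written out and joined in a second, disk-based pass,
        // which is the part the poster identifies as the bottleneck.
        for (String[] row : probe) {
            int p = Math.floorMod(row[0].hashCode(), partitions);
            String matched = buildTables.get(p).get(row[0]);
            String location = p < inMemoryPartitions ? "in-memory" : "spilled";
            System.out.println(row[0] + " -> " + matched + " (" + location + " partition " + p + ")");
        }
    }
}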
> On 2020-03-21 11:01, 111 wrote:
> Hi, wu:
> OK, I will take a look at the GC behavior on my side.
> Also, my SQL does have a join condition; it is just that the first table has 14+ million rows and the second 10+ million.
> select
>   wte.external_user_id,
>   wte.union_id,
>   mr.fk_member_id as member_id
> from a wte
> left join b mr
>   on wte.union_id = mr.relation_code
> where wte.ods_date = '${today}'
> limit 10;
> In the UI the job is running normally, but input is only about 700 rows/s and output about 1700 rows/s, which is very slow relative to the total volume.
>
>
> I am not yet sure where the bottleneck is or what to optimize:
> 1. Is the network transfer too slow for the two tables to be joined in time? I don't know how to check this: the netty-related metrics tell me nothing, and apart from the hash join in/out counters changing slowly, no other metric moves.
> 2. Is the parallelism too low, so a single slot has to join two tables with tens of millions of rows? Can the probe table's parallelism be changed dynamically or configured?
> 3. A JVM memory problem? See the attachment; memory still looks plentiful, but GC seems a bit frequent. Should the JVM settings be changed?
> 4. I don't quite understand the taskmanager log... it stops at the build phase. Is the logging stuck, or is the build-side network transfer happening at that point?
> 2020-03-21 09:23:14,732 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 4 ms for 32768 segments
> 2020-03-21 09:23:14,738 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 4 ms for 32768 segments
> 2020-03-21 09:23:14,744 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 4 ms for 32768 segments
> 2020-03-21 09:23:14,750 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 4 ms for 32768 segments
> 2020-03-21 09:23:14,756 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 4 ms for 32768 segments
> 2020-03-21 09:23:14,762 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 4 ms for 32768 segments
> 2020-03-21 09:23:14,772 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 4 ms for 32768 segments
> 2020-03-21 09:23:14,779 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 4 ms for 32768 segments
> 2020-03-21 09:23:16,357 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 14 ms for 65536 segments
> 2020-03-21 09:23:16,453 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 10 ms for 65536 segments
> 2020-03-21 09:23:16,478 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 9 ms for 65536 segments
> 2020-03-21 09:23:16,494 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 9 ms for 65536 segments
> 2020-03-21 09:23:16,509 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 10 ms for 65536 segments
> 2020-03-21 09:23:16,522 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 9 ms for 65536 segments
> 2020-03-21 09:23:16,539 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 9 ms for 65536 segments
> 2020-03-21 09:23:16,554 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 10 ms for 65536 segments
> 2020-03-21 09:23:16,574 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 9 ms for 65536 segments
> 2020-03-21 09:23:16,598 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 9 ms for 65536 segments
> 2020-03-21 09:23:16,611 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 10 ms for 65536 segments
> 2020-03-21 09:23:20,157 INFO  org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash take 213 ms for 131072 segments
> 2020-03-21 09:23:21,579 INFO  org.apache.flink.table.runtime.operators.join.HashJoinOperator  - Finish build phase.
>
>
>
>
> On 2020-03-21 10:31, Jark Wu wrote:

Flink 1.10: retries in JDBCUpsertOutputFormat.flush() do not take effect after the connection is closed

2020-03-22 Thread shangwen
We are using the JDBC connector with PostgreSQL. After simulating a connection failure with tcpkill, the following exception is logged:


2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC executeBatch error, retry times = 1
org.postgresql.util.PSQLException: This connection has been closed.
at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)



The corresponding Flink code:

// JDBCUpsertOutputFormat.java
public synchronized void flush() throws Exception {
   checkFlushException();

   for (int i = 1; i <= maxRetryTimes; i++) {
      try {
         jdbcWriter.executeBatch();
         batchCount = 0;
         break;
      } catch (SQLException e) {
         LOG.error("JDBC executeBatch error, retry times = {}", i, e);
         if (i >= maxRetryTimes) {
            throw e;
         }
         Thread.sleep(1000 * i);
      }
   }
}


Debugging shows the call path:

JDBCUpsertOutputFormat.flush
 -> AppendOnlyWriter.executeBatch
    ...
    -> PgConnection.getAutoCommit
       throws PSQLException: This connection has been closed

and by the time the retry runs, batchStatements has already been cleared:

// PgStatement.java
private BatchResultHandler internalExecuteBatch() throws SQLException {
   // Construct query/parameter arrays.
   transformQueriesAndParameters();
   // Empty arrays should be passed to toArray
   // see http://shipilev.net/blog/2016/arrays-wisdom-ancients/
   Query[] queries = batchStatements.toArray(new Query[0]);
   ParameterList[] parameterLists = batchParameters.toArray(new ParameterList[0]);
   batchStatements.clear();   // the batch is cleared here, before execution succeeds
   batchParameters.clear();
   ...
   if (connection.getAutoCommit()) {   // throws once the connection is closed
      flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
   }
   ...
}


So when Flink retries jdbcWriter.executeBatch, batchStatements is already empty. executeBatch then returns success immediately, Flink believes the flush succeeded, and the batch is silently lost:

// PgStatement.java
public int[] executeBatch() throws SQLException {
   checkClosed();
   closeForNextExecution();
   if (batchStatements == null || batchStatements.isEmpty()) {   // retries take this branch
      return new int[0];
   }
   return internalExecuteBatch().getUpdateCount();
}


I have opened an issue for this:
https://issues.apache.org/jira/browse/FLINK-16708
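For reference, a minimal sketch of one possible mitigation (not the actual FLINK-16708 fix): validate the connection before each retry and re-establish it if it has died, so the retried executeBatch() runs against a live connection. The fields are those of the flush() snippet above; reopenConnection() and CONNECTION_CHECK_TIMEOUT_SECONDS are hypothetical names standing in for the connector's connection setup code, and reopenConnection() is assumed to re-add any buffered rows to the new statement so the retry actually resends them.

public synchronized void flush() throws Exception {
    checkFlushException();

    for (int i = 1; i <= maxRetryTimes; i++) {
        try {
            jdbcWriter.executeBatch();
            batchCount = 0;
            break;
        } catch (SQLException e) {
            LOG.error("JDBC executeBatch error, retry times = {}", i, e);
            if (i >= maxRetryTimes) {
                throw e;
            }
            try {
                // java.sql.Connection#isValid pings the server with a timeout.
                if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
                    // Hypothetical helper: close and recreate the connection and
                    // writer, re-adding the buffered rows to the new statement.
                    reopenConnection();
                }
            } catch (Exception reconnectError) {
                LOG.warn("Re-establishing the JDBC connection failed", reconnectError);
            }
            Thread.sleep(1000 * i);
        }
    }
}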

Re: How to optimize a large table join in Flink SQL 1.10?

2020-03-22 Thread 111
Hi jingsong,
Thanks a lot. I assumed the parallelism here was inferred automatically and had missed this parameter. I will give it a try.


xinghalo <xingh...@163.com>



Re: How to optimize a large table join in Flink SQL 1.10?

2020-03-22 Thread Jingsong Li
Set table.exec.resource.default-parallelism in the "configuration:" section of [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html#environment-files


Re: How to optimize a large table join in Flink SQL 1.10?

2020-03-22 Thread 111
Hi jingsong:
The parallelism here is generated automatically by the system. Both tables were created through the sql-gateway in one session, so by this point the parallelism is 1 everywhere...


xinghalo <xingh...@163.com>




On 2020-03-21 10:31, Jark Wu wrote:
Hi,

It looks like your join has no equality condition, so it can only run at parallelism 1. You can check the GC of this join node and see whether full GC is what makes it slow.
For batch join tuning, Jingsong knows the tricks better than I do; maybe he can offer some ideas. cc @Jingsong Li


Best,
Jark

On Fri, 20 Mar 2020 at 17:56, 111  wrote:



The image seems to be broken:





https://picabstract-preview-ftn.weiyun.com/ftn_pic_abs_v3/93a8ac1299f8edd31aa93d69bd591dcc5b768e2c6f2d7a32ff3ac244040b1cac3e8afffd0daf92c4703c276fa1202361?pictype=scale=30113=3.3.3.3=23603357=F74D73D5-810B-4AE7-888C-E65BF787E490.png=750


On 2020-03-20 17:52, 111 wrote:
Hello:
I have two tables, each with 10+ million rows, and I need to join them.
After submitting the job, the join is extremely slow. What are some tuning ideas?
Do I need to adjust managed memory?

Currently each TaskManager requests 2 GB of memory in total and has 4 slots. The taskmanager metrics are as follows:
{
  "id": "container_e40_1555496777286_675191_01_000107",
  "path": "akka.tcp://flink@hnode9:33156/user/taskmanager_0",
  "dataPort": 39423,
  "timeSinceLastHeartbeat": 1584697728127,

Re: How to optimize a large table join in Flink SQL 1.10?

2020-03-22 Thread Jingsong Li
Hi,

It looks like your join SQL has an equality condition on the key, so it can run as a distributed join.
But your parallelism is 1. Distributed computation is normally never run at parallelism 1, otherwise it is effectively single-machine execution.

As Kurt said, change your parallelism:
table.exec.resource.default-parallelism, e.g. try 50 or 100.

Best,
Jingsong Lee


flink influxdb: some measurements do not contain the jobname

2020-03-22 Thread Peihui He
hello,
I am using InfluxDB as the metrics reporter for Flink 1.9.2, but some measurements are missing the jobname information. Because of that, the Grafana dashboards have to be updated after every restart, which is a pain.
Is there a good way to solve this?


[ANNOUNCE] Weekly Community Update 2020/12

2020-03-22 Thread Konstantin Knauf
Dear community,

happy to share this week's community digest featuring "Flink Forward
Virtual Conference 2020", a small update on Flink 1.10.1, a better
Filesystem connector for the Table API & SQL, new source/sink interfaces
for the Table API and a bit more.

Flink Development
==

* [releases] For an update on the outstanding tickets
("Blocker"/"Critical") planned for Apache *Flink 1.10.1* please see the
overview posted by Yu Li in this release discussion thread [1].

* [sql] Timo has shared a proposal (FLIP-95) for *new TableSource and
TableSink interfaces*. It is based on discussions with Jark, Dawid,
Aljoscha, Kurt, Jingsong and many more. Its goals are to simplify the
current interface architecture, to support changelog sources (FLIP-105) and
to remove dependencies on the DataStream API as well as the planner
components. [2]

* [hadoop] Following up on a discussion [3] with Stephan and Till,
Sivaprasanna has shared an overview of Hadoop related utility components to
kick off a discussion on moving these into a separate module
"flink-hadoop-utils". [4]

* [sql] Jingsong Li has started a discussion on introducing a table source
that in essence generates a random stream of data of a given schema to
facilitate development and testing in Flink SQL [5].

* [sql] Jingsong Li has started a discussion on improving the filesystem
connector for the Table API. The current filesystem connector only supports
CSV format and can only be considered experimental for streaming use cases.
There seems to be a consensus to build on top of the existing
StreamingFileSink (DataStream API) and to focus on ORC, Parquet and better
Hive interoperability. [6]

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-10-1-tp38689.html
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
[3]
https://lists.apache.org/thread.html/r198f09496ba46885adbcc41fe778a7a34ad1cd685eeae8beb71e6fbb%40%3Cdev.flink.apache.org%3E
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Introduce-a-new-module-flink-hadoop-utils-tp39107.html
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Introduce-TableFactory-for-StatefulSequenceSource-tp39116.html
[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-115-Filesystem-connector-in-Table-tp38870.html


Notable Bugs
==

* [FLINK-16684] [1.10.0] [1.9.2] The builder of the StreamingFileSink does
not work in Scala. This is one of the blockers to drop support for the
BucketingSink (covered in last week's update). Resolved in Flink 1.10.1. [7]

[7] https://issues.apache.org/jira/browse/FLINK-16684

Events, Blog Posts, Misc
===

* Unfortunately, we had to cancel Flink Forward SF due to the spread of
SARS-CoV-2 two weeks ago. But instead we will have a three day virtual
Flink Forward conference April 22 - 24. You can register for free under [8]

* Stefan Hausmann has published a blog post on how Apache Flink can be used
for streaming ETL on AWS (Kinesis, Kafka, ElasticSearch and S3
(StreamingFileSink)). [9]

* On the Ververica blog Nico Kruber presents a small benchmark comparing
the overhead of SSL encryption in Flink depending on the SSL provider (JDK
vs OpenSSL). The difference seems to be quite significant. [10]

* Upcoming Meetups: None.

[8] https://www.flink-forward.org/sf-2020
[9]
https://aws.amazon.com/blogs/big-data/streaming-etl-with-apache-flink-and-amazon-kinesis-data-analytics
[10]
https://www.ververica.com/blog/how-openssl-in-ververica-platform-improves-your-flink-job-performance

Cheers,

Konstantin (@snntrable)

-- 

Konstantin Knauf | Head of Product

+49 160 91394525


Follow us @VervericaData


--

Join Flink Forward - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Tony) Cheng


Re: Windows on SinkFunctions

2020-03-22 Thread tison
Hi Sidney,

For this case, you can write exactly:

stream.
  ...
  .window()
  .apply()
  .addSink()

Operator chaining will chain these operators into one, so you don't have
to worry about the efficiency.
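To make that concrete, here is a minimal runnable sketch of the suggested pipeline shape, assuming the Flink 1.10 DataStream API; the 5-minute interval mirrors the question, and the count-based (500 events) trigger is left out for brevity:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowedSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded demo source; a real job would read from an unbounded source,
        // otherwise the job may finish before a processing-time window fires.
        DataStream<String> events = env.fromElements("a", "b", "c");

        events
            .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5)))
            // Collect the window contents into a single batch, so the sink is
            // invoked once per window firing with the whole batch.
            .apply(new AllWindowFunction<String, List<String>, TimeWindow>() {
                @Override
                public void apply(TimeWindow window, Iterable<String> values,
                                  Collector<List<String>> out) {
                    List<String> batch = new ArrayList<>();
                    values.forEach(batch::add);
                    out.collect(batch);
                }
            })
            .print(); // stand-in for .addSink(yourBatchSink)

        env.execute("windowed-sink-sketch");
    }
}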

Best,
tison.


Sidney Feiner  于2020年3月22日周日 下午10:03写道:

> Hey,
> I wanted to know if it's possible to define a SinkFunction as a
> WindowFunction as well.
> For example, I would like the sink to be invoked every 5 minutes or once
> 500 events have reached the sink.
> Is there a way to do this inside the sink implementation? Or do I have to
> create the windows earlier in the pipeline?
> Because if I have multiple sinks and only one of them needs a window, the
> second solution might be problematic.
>
> Thanks :)
>
> Sidney Feiner / Data Platform Developer
> M: +972.528197720 / Skype: sidney.feiner.startapp
>
>


Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-22 Thread Andrey Zagrebin
Thanks for summarising the discussion points, Till.

# Configuration

## Env variables
Agree, this looks like an independent effort.

## dynamic program arguments
Indeed, jobmanager.sh needs a small extension. It can be addressed
independently, but I think it has a chance to be addressed in this release
cycle.
taskmanager.sh/flink-console.sh/flink-daemon.sh look already fit for this
from what I see.
On the other hand, we already have FLINK_PROPERTIES and indeed we can keep
it until we have env vars.

## FLINK_PROPERTIES
I am not really saying to remove this right now. Indeed, it has been
already exposed and should stay for the backwards-compatibility at the
moment.
I was just wondering whether we could maintain/advertise fewer approaches in
future but which are good enough for users.
The benefit of env vars / dynamic args approaches is that they are supposed
to be already supported by Flink scripts out of the box
and would have almost no maintenance for us on the docker side.
On the other hand, I can also see that setting FLINK_PROPERTIES may be
easier in certain cases compared to generating env vars / dynamic args.

@Thomas Weise 
The current duplication may be fixed by setting the hardcoded ports
after FLINK_PROPERTIES if they are not set.
We can look at it during implementation in detail.

## flink_docker_utils configure "option.name" “value”
The previously discussed options are mostly for running the official image.
This scripted action is mostly for custom Dockerfiles or custom entry point
scripts extending the official image
where somebody wants to ship a preconfigured custom image.
This action would already deduplicate a lot of code for the ports which are
set to hardcoded values in the entry point script of the official
Dockerfile:

if grep -E "^blob\.server\.port:.*" "${CONF_FILE}"; then
    sed -i -e "s/blob.server.port:*/blob.server.port: 6124/g" "${CONF_FILE}"
else
    echo "blob.server.port: 6124" >> "${CONF_FILE}"
fi

If we are in doubt to document this for the users and expose as API, we do
not have to do it and expose later if needed.
I am ok to remove 'flink_docker_utils set_web_ui_port 8081' from FLIP.

# Logging

## Logging
The idea was to fix the existing console logging properties to log also
into files to fix Web UI [1].
Then we can just use Flink scripts with 'start-foreground' argument to log
into the stdout and files.

## Stdout/err

### Console
I think the stdout/stderr of the Flink process might end up in the
container stdout/stderr if we run it in 'start-foreground' mode [2] (needs
checking).

### Local files
The proposal with the *tee* command looks promising. I would prefer to
write stdout/err into separate files and preserve them as stdout/err for
container logs.
This needs more experiments but may be possible with the *tee* command. I
suggest to check the details in PRs.

# Java/Python/Dev versions
Shipping official images with various versions can be addressed
independently.

Best,
Andrey

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-111%3A+Docker+image+unification#FLIP-111:Dockerimageunification-Logging
[2] https://docs.docker.com/config/containers/logging

On Wed, Mar 18, 2020 at 2:14 PM Till Rohrmann  wrote:

> Thanks for creating this FLIP Andrey. I like the general idea pretty much.
>
> I tried to group some of the above-mentioned points in order to give my 50
> cent.
>
> # Configuration
>
> How to configure the Flink process seems to be the biggest question. Due
> to historical reasons we have a plethora of different ways on how to
> configure the Flink Docker image.
>
> I think the most general approach would be the configuration via
> environment variables where one can specify env variables with the form
> FLINK_<CONFIG_KEY>=<VALUE>, which will overwrite <CONFIG_KEY> with <VALUE>
> in the effective configuration. However, this is something which deserves a
> separate effort and is out of scope for this proposal.
>
> The next best thing for configuring Flink (not the Flink process) would be
> dynamic program arguments. For this to work, we would need to extend the
> jobmanager.sh and taskmanager.sh scripts. I think this is strictly speaking
> also an orthogonal task and could happen as a follow up/independently.
>
> This leaves us with the envsubst and FLINK_PROPERTIES approaches. Even
> though I'm not a huge fan of these approaches, I think we should still
> support them for backwards compatibility reasons. Once we support
> configuration via env variables we should deprecate these methods and
> remove them in a subsequent release.
>
> Given this, I am bit unsure about introducing yet another way
> via flink_docker_utils configure "option.name" “value”. I think we should
> only offer this option if we are sure that we want to keep it in the future
> and that it won't be superceded by the env variables approach. Otherwise it
> will only add more maintenance burden.
>
> Long story short, with the existing configuration options (envsubts,
> FLINK_PROPERTIES) we can already configure the Flink 

Windows on SinkFunctions

2020-03-22 Thread Sidney Feiner
Hey,
I wanted to know if it's possible to define a SinkFunction as a WindowFunction
as well.
For example, I would like the sink to be invoked every 5 minutes or once 500
events have reached the sink.
Is there a way to do this inside the sink implementation? Or do I have to
create the windows earlier in the pipeline?
Because if I have multiple sinks and only one of them needs a window, the
second solution might be problematic.

Thanks :)


Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp




Re: Issue with single job yarn flink cluster HA

2020-03-22 Thread Dinesh J
Attaching the job manager log for reference.

2020-03-22 11:39:02,693 WARN  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever  - Error while retrieving the leader gateway. Retrying to connect to akka.tcp://flink@host1:28681/user/dispatcher.
2020-03-22 11:39:02,724 WARN  akka.remote.transport.netty.NettyTransport  - Remote connection to [null] failed with java.net.ConnectException: Connection refused: host1/ipaddress1:28681
2020-03-22 11:39:02,724 WARN  akka.remote.ReliableDeliverySupervisor  - Association with remote system [akka.tcp://flink@host1:28681] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused: host1/ipaddress1:28681]
2020-03-22 11:39:02,791 WARN  akka.remote.transport.netty.NettyTransport  - Remote connection to [null] failed with java.net.ConnectException: Connection refused: host1/ipaddress1:28681
2020-03-22 11:39:02,792 WARN  akka.remote.ReliableDeliverySupervisor  - Association with remote system [akka.tcp://flink@host1:28681] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused: host1/ipaddress1:28681]
2020-03-22 11:39:02,861 WARN  akka.remote.transport.netty.NettyTransport  - Remote connection to [null] failed with java.net.ConnectException: Connection refused: host1/ipaddress1:28681
2020-03-22 11:39:02,861 WARN  akka.remote.ReliableDeliverySupervisor  - Association with remote system [akka.tcp://flink@host1:28681] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused: host1/ipaddress1:28681]
2020-03-22 11:39:02,931 WARN  akka.remote.transport.netty.NettyTransport  - Remote connection to [null] failed with java.net.ConnectException: Connection refused: host1/ipaddress1:28681
2020-03-22 11:39:02,931 WARN  akka.remote.ReliableDeliverySupervisor  - Association with remote system [akka.tcp://flink@host1:28681] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused: host1/ipaddress1:28681]
2020-03-22 11:39:03,001 WARN  akka.remote.transport.netty.NettyTransport  - Remote connection to [null] failed with java.net.ConnectException: Connection refused: host1/ipaddress1:28681
2020-03-22 11:39:03,002 WARN  akka.remote.ReliableDeliverySupervisor  - Association with remote system [akka.tcp://flink@host1:28681] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused: host1/ipaddress1:28681]
2020-03-22 11:39:03,071 WARN  akka.remote.transport.netty.NettyTransport  - Remote connection to [null] failed with java.net.ConnectException: Connection refused: host1/ipaddress1:28681
2020-03-22 11:39:03,071 WARN  akka.remote.ReliableDeliverySupervisor  - Association with remote system [akka.tcp://flink@host1:28681] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused: host1/ipaddress1:28681]
2020-03-22 11:39:03,141 WARN  akka.remote.transport.netty.NettyTransport  - Remote connection to [null] failed with java.net.ConnectException: Connection refused: host1/ipaddress1:28681
2020-03-22 11:39:03,141 WARN  akka.remote.ReliableDeliverySupervisor  - Association with remote system [akka.tcp://flink@host1:28681] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused: host1/ipaddress1:28681]
2020-03-22 11:39:03,211 WARN  akka.remote.transport.netty.NettyTransport  - Remote connection to [null] failed with java.net.ConnectException: Connection refused: host1/ipaddress1:28681
2020-03-22 11:39:03,211 WARN  akka.remote.ReliableDeliverySupervisor  - Association with remote system [akka.tcp://flink@host1:28681] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused: host1/ipaddress1:28681]
2020-03-22 11:39:03,281 WARN  akka.remote.transport.netty.NettyTransport  - Remote connection to [null] failed with java.net.ConnectException: Connection refused: host1/ipaddress1:28681
2020-03-22 11:39:03,282 WARN  akka.remote.ReliableDeliverySupervisor  - Association with remote system [akka.tcp://flink@host1:28681] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused: host1/ipaddress1:28681]
2020-03-22 11:39:03,351 WARN  

Issue with single job yarn flink cluster HA

2020-03-22 Thread Dinesh J
Hi all,
We have a single-job YARN Flink cluster set up with high availability.
Sometimes, after a job manager failure, the next attempt successfully restarts
from the current checkpoint.
But occasionally we get the error below.

{"errors":["Service temporarily unavailable due to an ongoing leader
election. Please refresh."]}

Hadoop version using : Hadoop 2.7.1.2.4.0.0-169

Flink version: flink-1.7.2

Zookeeper version: 3.4.6-169--1


*Below is the flink configuration*

high-availability: zookeeper

high-availability.zookeeper.quorum: host1:2181,host2:2181,host3:2181

high-availability.storageDir: hdfs:///flink/ha

high-availability.zookeeper.path.root: /flink

yarn.application-attempts: 10

state.backend: rocksdb

state.checkpoints.dir: hdfs:///flink/checkpoint

state.savepoints.dir: hdfs:///flink/savepoint

jobmanager.execution.failover-strategy: region

restart-strategy: failure-rate

restart-strategy.failure-rate.max-failures-per-interval: 3

restart-strategy.failure-rate.failure-rate-interval: 5 min

restart-strategy.failure-rate.delay: 10 s



Can someone let me know if I am missing something, or is this a known issue?

Is it related to a hostname/IP mapping issue, or to the ZooKeeper version?

Thanks,

Dinesh


Re: How can Flink SQL trigger, every 5 minutes, an aggregation from midnight to the current 5-minute mark?

2020-03-22 Thread Jark Wu
If the update frequency is very high, you can enable minibatch [1][2] to reduce the amount of output.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html
[2]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
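A minimal sketch of what enabling minibatch looks like programmatically, assuming a Blink planner TableEnvironment on Flink 1.10; the values are illustrative:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MiniBatchConfigSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Buffer incoming records and emit aggregate updates at most once per
        // allow-latency interval (or per `size` buffered records), which shrinks
        // the stream of updates written to Elasticsearch.
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.allow-latency", "5 s");
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.size", "5000");
    }
}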

On Sun, 22 Mar 2020 at 13:15, Tianwang Li  wrote:

> Thanks,
>
> Regarding the storage point, it is as you said.
>
> I also looked at another aspect: the update frequency of the storage.
> I verified it myself:
> for the GROUP BY aggregation on top of the view uv_per_10min, since it computes UV, no update is emitted to ES while the max value stays unchanged.
> If this were PV, the update frequency to ES would still be very high.
>
> INSERT INTO cumulative_uv
> SELECT time_str, MAX(uv)
> FROM uv_per_10min
> GROUP BY time_str;
>
>
>
>
>
On Sun, Mar 22, 2020 at 10:52 AM, Jark Wu wrote:
>
> > Hi,
> >
> > time_str has already been processed upstream into 10-minute points (10:00, 10:10, 10:20, ...), so grouping by time_str yields only 24*6 points per day.
> > In Flink SQL, you do not need GROUP BY TUMBLE to get window-like aggregation; a plain GROUP BY on an hour/minute/timestamp column achieves a similar effect.
> > The difference is that the former does not emit updates and cleans up its state automatically, while the latter emits updates and does not clean up state automatically.
> >
> > Best,
> > Jark
> >
> > On Sat, 21 Mar 2020 at 11:24, Tianwang Li  wrote:
> >
> > > Hi Jark, I read your article, but one thing is not clear to me.
> > >
> > > "Aggregate once more over uv_per_10min by the minute value, so that every 10 minutes only one point is stored in Elasticsearch, which greatly reduces the pressure on Elasticsearch and on Kibana's rendering."
> > >
> > > INSERT INTO cumulative_uv
> > > SELECT time_str, MAX(uv)
> > > FROM uv_per_10min
> > > GROUP BY time_str;
> > >
> > >
> > > How is this per-minute aggregation implemented? There is no explicit window definition. What internal mechanism makes it work?
> > >
> > >
On Fri, Mar 20, 2020 at 12:25 AM, Jark Wu wrote:
> > >
> > > > Hi, take a look at whether this article covers your need:
> > > >
> > > > http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql
> > > >
> > > > (the section on counting the cumulative number of unique users per 10 minutes over a day)
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > >
> > > > On Thu, 19 Mar 2020 at 23:30, hiliuxg <736742...@qq.com> wrote:
> > > >
> > > > > hi all: I have a scenario where, every 5 minutes, I want to compute the pv and uv from midnight of the current day up to the current 5-minute mark. In batch processing it would look roughly like this:
> > > > >
> > > > > select
> > > > >   '2020-03-19' as dt,
> > > > >   '2020-03-19 12:05:00' as etltime,
> > > > >   count(1) as pv,
> > > > >   count(distinct userid) as uv
> > > > > from t_user_log
> > > > > where logintime >= '2020-03-19 00:00:00' and logintime < '2020-03-19 12:05:00'
> > > > >
> > > > > This cannot be expressed directly in Flink SQL, and we have a lot of scenarios like this. Does anyone have a good way to handle it?
> > > >
> > >
> > >
> > > --
> > >  tivanli
> > >
> >
>
>
> --
>  tivanli
>