Re: Flink: For terabytes of keyed state.

2020-05-05 Thread Gowri Sundaram
Hi Congxian,
Thank you so much for your response! We will go ahead and do a POC to test
out how Flink performs at scale.

Regards,
- Gowri

On Wed, May 6, 2020 at 8:34 AM Congxian Qiu  wrote:

> Hi
>
> From my experience, you should care about the state size of a single task
> (not the whole job's state size). The download speed for a single thread is
> almost 100 MB/s (this may vary across environments). I do not have much
> performance data for loading state into RocksDB (we use an internal KV store
> in my company), but from my experience loading state into RocksDB will not
> be slower than downloading it.
>
> Best,
> Congxian
>
>
> Gowri Sundaram  于2020年5月3日周日 下午11:25写道:
>
>> Hi Congxian,
>> Thank you so much for your response, that really helps!
>>
>> From your experience, how long does it take for Flink to redistribute
>> terabytes of state data on node addition / node failure.
>>
>> Thanks!
>>
>> On Sun, May 3, 2020 at 6:56 PM Congxian Qiu 
>> wrote:
>>
>>> Hi
>>>
>>> 1. From my experience, Flink can support such a big state; you can set an
>>> appropriate parallelism for the stateful operator. For RocksDB you may need
>>> to pay attention to disk performance.
>>> 2. Inside Flink, the state is partitioned by key-group, and each
>>> task/parallel instance contains multiple key-groups. Flink does not need to
>>> restart when you add a node to the cluster, but every time it restarts from
>>> a savepoint/checkpoint (or fails over), Flink needs to redistribute the
>>> checkpoint data. This can be skipped on failover if local recovery[1]
>>> is enabled.
>>> 3. For uploading/downloading state, you can refer to the multi-threaded
>>> upload/download[2][3] to speed them up.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/large_state_tuning.html#task-local-recovery
>>> [2] https://issues.apache.org/jira/browse/FLINK-10461
>>> [3] https://issues.apache.org/jira/browse/FLINK-11008
>>>
>>> Best,
>>> Congxian
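A minimal flink-conf.yaml sketch of the settings referenced in points 2 and 3
above (double-check the key names against your Flink version; the values are
examples only):

# keep and re-use state on local disk after a failover instead of re-downloading it
state.backend.local-recovery: true

# with RocksDB, upload only the changed files per checkpoint
state.backend.incremental: true

# FLINK-10461 / FLINK-11008: threads used to transfer RocksDB incremental
# checkpoint files
state.backend.rocksdb.checkpoint.transfer.thread.num: 4

Note that local recovery only helps on failover; restarting from a savepoint or
rescaling still downloads the state from the checkpoint storage.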
>>>
>>>
>>> Gowri Sundaram  于2020年5月1日周五 下午6:29写道:
>>>
 Hello all,
 We have read in multiple sources that Flink has been used for use cases
 with terabytes of application state.

 We are considering using Flink for a similar use case with* keyed
 state in the range of 20 to 30 TB*. We had a few questions regarding
 the same.


- *Is Flink a good option for this kind of scale of data* ? We are
considering using RocksDB as the state backend.
- *What happens when we want to add a node to the cluster *?
   - As per our understanding, if we have 10 nodes in our cluster,
   with 20TB of state, this means that adding a node would require the 
 entire
   20TB of data to be shipped again from the external checkpoint remote
   storage to the taskmanager nodes.
   - Assuming 1Gb/s network speed, and assuming all nodes can read
   their respective 2TB state in parallel, this would mean a *minimum
   downtime of half an hour*. And this is assuming the throughput
   of the remote storage does not become the bottleneck.
   - Is there any way to reduce this estimated downtime ?


 Thank you!

>>>


Autoscaling vs backpressure

2020-05-05 Thread Manish G
Hi,

As Flink doesn't provide out-of-the-box support for autoscaling, can
backpressure be considered an alternative to it?
Autoscaling allows us to add/remove nodes as the load goes up/down.
With backpressure, if the load goes up, the system signals upstream operators
to release data more slowly, so we don't need to add more hardware
horizontally.
Is this correct conceptually and practically?

Manish


Re: Re: FlinkSQL Retraction: question about how it works

2020-05-05 Thread Jingsong Li
Hi,

> the sink table has no primary key or unique key

In that case the more reasonable behavior would probably be to throw an
exception, although that may be somewhat tricky to implement.

> if a retraction makes the result become 0, a DELETE is executed, otherwise an update

Your understanding is completely correct.

Best
Jingsong Lee
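For completeness, a sketch of what this looks like on the MySQL/RDS side,
reusing the table and column names from the quoted query (the generated
statement shown in the comment is only approximate):

-- give the sink table a key on the grouping column
ALTER TABLE table_out ADD PRIMARY KEY (tms_company);

-- with that key in place, the upsert issued by Flink's MySQL JDBC dialect,
-- roughly of the form
--   INSERT INTO table_out (tms_company, order_cnt) VALUES (?, ?)
--   ON DUPLICATE KEY UPDATE order_cnt = VALUES(order_cnt);
-- overwrites the existing row for the same tms_company instead of appending a
-- new one, and a retraction that drops the count to 0 becomes a DELETE.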

On Wed, May 6, 2020 at 12:39 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> Thanks, Jingsong Lee.
>
> I am using MySQL, and the sink table has no primary key or unique key.
> If I add a primary key or unique key to the sink table, it does indeed keep
> only two rows.
>
> I experimented with the Flink sql-client after setting
> SET execution.result-mode=changelog; the left column marks which Kafka
> message caused each change:
>
> msg  +/-  tms_company  order_cnt
> 1    +    zhongtong    1
> 2    -    zhongtong    1
> 2    +    yuantong     1
> 3    -    yuantong     1
> 3    +    yuantong     2
> 4    -    yuantong     2
> 4    +    yuantong     1
> 4    +    zhongtong    1
>
> Message 1: executed one INSERT
> Message 2: executed one DELETE and one INSERT
> Message 3: executed one INSERT ... ON DUPLICATE KEY UPDATE
> Message 4: executed two INSERT ... ON DUPLICATE KEY UPDATE statements
>
> My conclusion is that the logic is: if a retraction makes the result become
> 0, a DELETE is executed; otherwise it is an INSERT ... ON DUPLICATE KEY
> UPDATE.
>
> I am not sure whether this understanding is correct.
>
> Thanks,
> Wang Lei
>
>
>
>
> wangl...@geekplus.com.cn
>
> Sender: Jingsong Li
> Send Time: 2020-05-06 11:35
> Receiver: user-zh
> Subject: Re: Re:FlinkSQL Retraction 问题原理咨询
> Hi,
>
> 问题一:删除数据可不单单只是retract stream的功能。upsert
> stream是当下游具有按key覆盖的功能时的特殊优化,除了按key覆盖外,它也需要在上游retract时删除数据,意思是upsert
> stream也有retract的input数据的。JDBC实现的是upsert stream的消费。
>
> 问题二:正确数据应该是:
> 1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1
> 2  {"order_id":1,"tms_company":"yuantong"}  数据库1条记录: yuantong 1  ( 删除
> zhongtong 1)
> 3  {"order_id":2,"tms_company":"yuantong"} 数据库1条记录: yuantong 2  (
> 删除yuantong 1)
> 4  {"order_id":2,"tms_company":"zhongtong"}   数据库2条记录: yuantong 1,
> zhongtong 1( 删除yuantong 2)
>
> 你用了什么dialect?是不是mysql?
> Flink JDBC的Mysql用了DUPLICATE KEY UPDATE的语法来更新数据。
> 看起来这个语法在RDS没有建主键或者唯一键时可能不会去覆盖老数据?尝试创建下主键或唯一建?
>
> Best,
> Jingsong Lee
>
> On Wed, May 6, 2020 at 10:36 AM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
> >
> > 更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的
> > tms_company 是有变化的。
> > 我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。
> >
> >
> >
> >
> > wangl...@geekplus.com.cn
> >
> > 发件人: Michael Ran
> > 发送时间: 2020-04-30 17:23
> > 收件人: user-zh
> > 主题: Re:FlinkSQL Retraction 问题原理咨询
> >
> >
> >
> > 指定的更新键是tms_company?
> >
> >
> > 结果是:
> > yuantong:2
> > zhongtong:2
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-04-30 17:08:22,"wangl...@geekplus.com.cn" <
> wangl...@geekplus.com.cn>
> > 写道:
> > >
> > >自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到
> > RDS, RDS 表没有主键,也没有唯一键。
> > >
> > >INSERT INTO table_out select  tms_company,  count(distinct order_id) as
> > order_cnt from
> > >(select order_id, LAST_VALUE(tms_company) AS tms_company from
> > dwd_table group by order_id)
> > > group by tms_company;
> > >
> > >
> > >总共发送了 4 条消息,顺序如下:
> > >
> > >1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1
> > >
> > >2  {"order_id":1,"tms_company":"yuantong"}  数据库1条记录: yuantong 1
> > (上一条记录被删除了)
> > >
> > >3  {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1,
> > yuantong 2  (增加了条记录,没有删除)
> > >
> > >4  {"order_id":2,"tms_company":"zhongtong"}   数据库4条记录: yuantong 1,
> > yuantong 2, yuantong 1, zhongtong 1(增加了两条记录,没有删除)
> > >
> > >
> > >问题一:
> > >第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码
> >
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> > 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?
> > >
> > >问题二:
> > >   第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
> > >   第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?
> > >
> > >谢谢,
> > >王磊
> > >
> > >
> > >
> > >wangl...@geekplus.com.cn
> >
>
>
> --
> Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: Re: FlinkSQL Retraction: question about how it works

2020-05-05 Thread wangl...@geekplus.com.cn

Thanks, Jingsong Lee.

I am using MySQL, and the sink table has no primary key or unique key.
If I add a primary key or unique key to the sink table, it does indeed keep
only two rows.

I experimented with the Flink sql-client after setting
SET execution.result-mode=changelog; the left column marks which Kafka message
caused each change:

msg  +/-  tms_company  order_cnt
1    +    zhongtong    1
2    -    zhongtong    1
2    +    yuantong     1
3    -    yuantong     1
3    +    yuantong     2
4    -    yuantong     2
4    +    yuantong     1
4    +    zhongtong    1

Message 1: executed one INSERT
Message 2: executed one DELETE and one INSERT
Message 3: executed one INSERT ... ON DUPLICATE KEY UPDATE
Message 4: executed two INSERT ... ON DUPLICATE KEY UPDATE statements

My conclusion is that the logic is: if a retraction makes the result become 0,
a DELETE is executed; otherwise it is an INSERT ... ON DUPLICATE KEY UPDATE.

I am not sure whether this understanding is correct.

Thanks,
Wang Lei




wangl...@geekplus.com.cn

Sender: Jingsong Li
Send Time: 2020-05-06 11:35
Receiver: user-zh
Subject: Re: Re: FlinkSQL Retraction: question about how it works
Hi,

Question 1: Deleting data is not only a capability of the retract stream. An
upsert stream is a special optimization for when the downstream system can
overwrite rows by key; besides overwriting by key, it also has to delete data
when the upstream retracts, i.e. an upsert stream also receives retraction
input. The JDBC sink consumes an upsert stream.

Question 2: The correct result should be:
1  {"order_id":1,"tms_company":"zhongtong"}   1 row in the database: zhongtong 1
2  {"order_id":1,"tms_company":"yuantong"}    1 row in the database: yuantong 1 (zhongtong 1 deleted)
3  {"order_id":2,"tms_company":"yuantong"}    1 row in the database: yuantong 2 (yuantong 1 deleted)
4  {"order_id":2,"tms_company":"zhongtong"}   2 rows in the database: yuantong 1, zhongtong 1 (yuantong 2 deleted)

Which dialect are you using? Is it MySQL?
Flink's MySQL JDBC dialect uses the DUPLICATE KEY UPDATE syntax to update data.
It looks like this syntax may not overwrite old data when the RDS table has no
primary key or unique key. Could you try creating a primary key or unique key?
 
Best,
Jingsong Lee
 
On Wed, May 6, 2020 at 10:36 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
 
>
> 更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的
> tms_company 是有变化的。
> 我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。
>
>
>
>
> wangl...@geekplus.com.cn
>
> 发件人: Michael Ran
> 发送时间: 2020-04-30 17:23
> 收件人: user-zh
> 主题: Re:FlinkSQL Retraction 问题原理咨询
>
>
>
> 指定的更新键是tms_company?
>
>
> 结果是:
> yuantong:2
> zhongtong:2
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-04-30 17:08:22,"wangl...@geekplus.com.cn" 
> 写道:
> >
> >自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到
> RDS, RDS 表没有主键,也没有唯一键。
> >
> >INSERT INTO table_out select  tms_company,  count(distinct order_id) as
> order_cnt from
> >(select order_id, LAST_VALUE(tms_company) AS tms_company from
> dwd_table group by order_id)
> > group by tms_company;
> >
> >
> >总共发送了 4 条消息,顺序如下:
> >
> >1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1
> >
> >2  {"order_id":1,"tms_company":"yuantong"}  数据库1条记录: yuantong 1
> (上一条记录被删除了)
> >
> >3  {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1,
> yuantong 2  (增加了条记录,没有删除)
> >
> >4  {"order_id":2,"tms_company":"zhongtong"}   数据库4条记录: yuantong 1,
> yuantong 2, yuantong 1, zhongtong 1(增加了两条记录,没有删除)
> >
> >
> >问题一:
> >第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?
> >
> >问题二:
> >   第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
> >   第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?
> >
> >谢谢,
> >王磊
> >
> >
> >
> >wangl...@geekplus.com.cn
>
 
 
-- 
Best, Jingsong Lee


Re: Flink SQL processing time: time zone issue

2020-05-05 Thread Jark Wu
Yes. This is the same problem as with CURRENT_TIMESTAMP, which has been asked
about before on the mailing list and in JIRA.
Changing the return type to WITH LOCAL TIME ZONE is not a small piece of work;
we would also have to make event time and watermarks support this type.
But I think this is high priority and should be fixed in the next release.

Best,
Jark
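For readers hitting this today, a small display-side illustration of the
8-hour shift using plain java.time (the Shanghai zone comes from the example
quoted below; this only re-interprets the printed value, it does not fix the
proctime type):

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class ProctimeShift {

    // Re-interpret a proctime value printed as UTC wall-clock time in the local zone.
    public static LocalDateTime utcToLocal(LocalDateTime utcWallClock) {
        return utcWallClock
                .atOffset(ZoneOffset.UTC)
                .atZoneSameInstant(ZoneId.of("Asia/Shanghai"))
                .toLocalDateTime();
    }

    public static void main(String[] args) {
        // prints 2020-05-01T17:11:05.633 for the value from the quoted example
        System.out.println(utcToLocal(LocalDateTime.parse("2020-05-01T09:11:05.633")));
    }
}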

On Wed, 6 May 2020 at 09:47, Jingsong Li  wrote:

> Hi,
>
> This may be a bug.
>
> Blink uses TIMESTAMP WITHOUT TIME ZONE by default, so it carries no time zone.
> proctime, however, currently produces the time with a time zone applied; my
> understanding is that it should probably produce a time-zone-free value.
>
> CC: @Jark Wu  @Zhenghua Gao 
>
> Best,
> Jingsong Lee
>
> On Tue, May 5, 2020 at 5:43 PM 祝尚 <17626017...@163.com> wrote:
>
>> Same question here, waiting for an answer from the experts.
>>
>> > On May 1, 2020, at 5:26 PM, hb <343122...@163.com> wrote:
>> >
>> >
>> >
>> > ``` code
>> >  val env = StreamExecutionEnvironment.getExecutionEnvironment
>> >  val settings: EnvironmentSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> >  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,
>> settings)
>> >
>> >
>> >  val t2 = env.fromElements("a", "b", "c")
>> >  t2.toTable(tEnv, 'f1, 'proc.proctime).toAppendStream[Row].print()
>> > ```
>> >
>> >
>> >  // Time zone Shanghai, current time 2020-05-01 15:00
>> >  // The output shows a,2020-05-01T09:11:05.633, i.e. the processing time is off by 8 hours. How can I make proctime display without the 8-hour difference?
>>
>>
>
> --
> Best, Jingsong Lee
>


Re: What is the RocksDB local directory in flink checkpointing?

2020-05-05 Thread fanrui
Hi LakeShen,


By default it uses YARN's tmp dir.


Best,
fanrui
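A minimal flink-conf.yaml sketch that pins the RocksDB working directory to
explicit local disks instead of the container temp dirs (the paths are
examples only; when the option is unset, Flink falls back to the TaskManager
temp directories, which under YARN are the container's local dirs rather than
a plain /tmp):

state.backend: rocksdb
state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb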


------------------ Original message ------------------
From: "LakeShen"

What is the RocksDB local directory in flink checkpointing?

2020-05-05 Thread LakeShen
Hi community,

Now I have a question about the Flink checkpoint local directory. Our Flink
version is 1.6 and the job mode is Flink on YARN, per-job. I looked at the
Flink source code and found that the checkpoint local directory is /tmp when
"state.backend.rocksdb.localdir" is not configured. But when I go into the
/tmp dir, I cannot find the local directory for the Flink checkpoint state.

What is the RocksDB local directory in Flink checkpointing? I am looking
forward to your reply.

Best,
LakeShen


Re: Reply: Re: FlinkSQL Retraction: question about how it works

2020-05-05 Thread Michael Ran



1. Flink keeps all the results in its state / in memory.

2. When a new record changes a GROUP BY count result (e.g. the count for a
   tms_company goes from 1 to 2), Flink emits two messages:

   (old result, false)   -- retraction of the previously emitted row
   (new result, true)    -- the new row

3. In memory, the old result is discarded and the new one is used for the
   subsequent computation.

4. If the result is written to external storage (MySQL and the like), the
   corresponding SQL statement is generated to update it.
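For illustration, a hedged sketch of how these retract messages surface in
code: converting the aggregation result to a retract stream yields (flag, row)
pairs, where flag = false retracts a previously emitted row and flag = true
adds the new one. The environment variable name is an assumption; the query is
the one from the quoted thread.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractDemo {

    public static void printChangelog(StreamTableEnvironment tEnv) {
        Table result = tEnv.sqlQuery(
            "SELECT tms_company, COUNT(DISTINCT order_id) AS order_cnt "
          + "FROM (SELECT order_id, LAST_VALUE(tms_company) AS tms_company "
          + "      FROM dwd_table GROUP BY order_id) "
          + "GROUP BY tms_company");

        // e.g. (true,zhongtong,1) then (false,zhongtong,1), (true,yuantong,1), ...
        DataStream<Tuple2<Boolean, Row>> changelog =
            tEnv.toRetractStream(result, Row.class);
        changelog.print();
    }
}

Running this against the four Kafka messages from the thread should show
exactly the add/retract pairs listed in the changelog table above.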







On 2020-05-06 10:36:35, "wangl...@geekplus.com.cn"  wrote:
>
>The update key is tms_company, but the retraction here comes from the
>two-level GROUP BY. There are only two order_ids in total, and the
>tms_company associated with an order_id changes over time.
>I don't really understand how this retraction works concretely: why are some
>rows deleted while others are not?
>
>
>
>
>wangl...@geekplus.com.cn
>
>发件人: Michael Ran
>发送时间: 2020-04-30 17:23
>收件人: user-zh
>主题: Re:FlinkSQL Retraction 问题原理咨询
> 
> 
> 
>指定的更新键是tms_company?
> 
> 
>结果是:
>yuantong:2
>zhongtong:2
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
>在 2020-04-30 17:08:22,"wangl...@geekplus.com.cn"  写道:
>>
>>自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 RDS, 
>>RDS 表没有主键,也没有唯一键。
>>
>>INSERT INTO table_out select  tms_company,  count(distinct order_id) as 
>>order_cnt from 
>>(select order_id, LAST_VALUE(tms_company) AS tms_company from dwd_table 
>> group by order_id) 
>> group by tms_company;
>>
>>
>>总共发送了 4 条消息,顺序如下:
>>
>>1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1
>>
>>2  {"order_id":1,"tms_company":"yuantong"}  数据库1条记录: yuantong 1  
>>(上一条记录被删除了)
>>
>>3  {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1, yuantong 
>>2  (增加了条记录,没有删除)
>>
>>4  {"order_id":2,"tms_company":"zhongtong"}   数据库4条记录: yuantong 1, yuantong 
>>2, yuantong 1, zhongtong 1(增加了两条记录,没有删除)
>>
>>
>>问题一:
>>第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 
>> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
>>  这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?
>>
>>问题二:
>>   第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
>>   第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?
>>
>>谢谢,
>>王磊
>>
>>
>>
>>wangl...@geekplus.com.cn 


Re: Re: FlinkSQL Retraction: question about how it works

2020-05-05 Thread Jingsong Li
Hi,

Question 1: Deleting data is not only a capability of the retract stream. An
upsert stream is a special optimization for when the downstream system can
overwrite rows by key; besides overwriting by key, it also has to delete data
when the upstream retracts, i.e. an upsert stream also receives retraction
input. The JDBC sink consumes an upsert stream.

Question 2: The correct result should be:
1  {"order_id":1,"tms_company":"zhongtong"}   1 row in the database: zhongtong 1
2  {"order_id":1,"tms_company":"yuantong"}    1 row in the database: yuantong 1 (zhongtong 1 deleted)
3  {"order_id":2,"tms_company":"yuantong"}    1 row in the database: yuantong 2 (yuantong 1 deleted)
4  {"order_id":2,"tms_company":"zhongtong"}   2 rows in the database: yuantong 1, zhongtong 1 (yuantong 2 deleted)

Which dialect are you using? Is it MySQL?
Flink's MySQL JDBC dialect uses the DUPLICATE KEY UPDATE syntax to update data.
It looks like this syntax may not overwrite old data when the RDS table has no
primary key or unique key. Could you try creating a primary key or unique key?

Best,
Jingsong Lee

On Wed, May 6, 2020 at 10:36 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> The update key is tms_company, but the retraction here comes from the
> two-level GROUP BY. There are only two order_ids in total, and the
> tms_company associated with an order_id changes over time.
> I don't really understand how this retraction works concretely: why are some
> rows deleted while others are not?
>
>
>
>
> wangl...@geekplus.com.cn
>
> From: Michael Ran
> Sent: 2020-04-30 17:23
> To: user-zh
> Subject: Re: FlinkSQL Retraction: question about how it works
>
>
>
> Is the specified update key tms_company?
>
>
> The result is:
> yuantong: 2
> zhongtong: 2
>
>
>
>
>
>
>
>
>
>
>
>
> On 2020-04-30 17:08:22, "wangl...@geekplus.com.cn" 
> wrote:
> >
> >I implemented the Cainiao logistics order-counting example from
> >https://yq.aliyun.com/articles/457392/ myself, reading from Kafka and
> >writing to RDS. The RDS table has no primary key and no unique key.
> >
> >INSERT INTO table_out select  tms_company,  count(distinct order_id) as
> >order_cnt from
> >(select order_id, LAST_VALUE(tms_company) AS tms_company from
> >dwd_table group by order_id)
> > group by tms_company;
> >
> >
> >I sent 4 messages in total, in the following order:
> >
> >1  {"order_id":1,"tms_company":"zhongtong"}   1 row in the database: zhongtong 1
> >
> >2  {"order_id":1,"tms_company":"yuantong"}    1 row in the database: yuantong 1
> >(the previous row was deleted)
> >
> >3  {"order_id":2,"tms_company":"yuantong"}    2 rows in the database: yuantong 1,
> >yuantong 2  (one row added, nothing deleted)
> >
> >4  {"order_id":2,"tms_company":"zhongtong"}   4 rows in the database: yuantong 1,
> >yuantong 2, yuantong 1, zhongtong 1  (two rows added, nothing deleted)
> >
> >
> >Question 1:
> >After the 2nd message was sent, the previous row in the database was
> >deleted. My understanding is that this should be the RetractStream
> >functionality. But when I look at the source code at
> >https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> >there is no RetractStream implementation there either. Which code deleted it?
> >
> >Question 2:
> >   After the 3rd record arrived, a yuantong 2 row was simply added to the
> >database. Why wasn't yuantong 1 deleted?
> >   After the 4th record arrived, two more rows were added. Why weren't the
> >earlier rows deleted either?
> >
> >Thanks,
> >Wang Lei
> >
> >
> >
> >wangl...@geekplus.com.cn
>


-- 
Best, Jingsong Lee


Re: Flink monitoring: some metrics scraped by Prometheus do not contain the Flink job_name or job_id

2020-05-05 Thread Yun Tang
Hi

Metrics like flink_jobmanager_Status_* are JobManager-level metrics, which are
conceptually different from job-level metrics. Flink supports running several
jobs in one JobManager, but there is only one JM JVM, so attaching the tag of a
subordinate job_id to JM metrics would not match their semantics. And if there
are several JMs on one host, Flink currently cannot distinguish them well; at
the moment only the TM-level tm_id can distinguish different TMs.
If you really need a job_name or job_id to identify these metrics, the only way
is to modify the reporter code as in the article you shared. That said, rather
than adding a job_name tag, I would recommend adding something like a
cluster_name tag.

Best,
Yun Tang


From: 俞剑波 
Sent: Thursday, April 30, 2020 22:00
To: user-zh@flink.apache.org 
Subject: Re: Flink monitoring: some metrics scraped by Prometheus do not contain the Flink job_name or job_id

How can I set this via Java code? I hope someone can show me how. Many thanks!!!

972684638  wrote on Thu, Apr 30, 2020 at 7:28 PM:

> The metrics.reporter.promgateway.jobName setting can be set from Java code;
> when the job starts, just set it to the job id.
> Also, remove the randomJobNameSuffix setting.
>
> Then you can aggregate by exported_job.
>
>
>
> --- Original message ---
> From: "俞剑波"  Sent: Thursday, April 30, 2020, 7:19 PM
> To: "user-zh"  Subject: Re: Flink monitoring: some metrics scraped by Prometheus do not contain the Flink job_name or job_id
>
>
> Let me describe my situation in more detail. The cluster runs in *per-job*
> mode and the metrics are all collected. Here is my configuration and what I
> am seeing:
> 1. flink-conf.yaml is configured like this:
> metrics.reporter.promgateway.class:
> org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
> metrics.reporter.promgateway.host: 10.20.0.200
> metrics.reporter.promgateway.port: 9091
> metrics.reporter.promgateway.jobName: FlinkMonitor
> metrics.reporter.promgateway.randomJobNameSuffix: true
> metrics.reporter.promgateway.deleteOnShutdown: false
>
> 2. On the Prometheus side all the metric values arrive, but for some of them
> the Flink job_name or job_id information is missing:
> Taking *flink_jobmanager_Status_JVM_Memory_Direct_MemoryUsed* as an example,
> the scraped value is:
>
> {exported_job="FlinkMonitor00c46b7355bca443825de2b01ce2d3a2",host="hadoop224",instance="
> 10.20.0.200:9091",job="push_metrics"}
> Taking another metric, *flink_jobmanager_job_numberOfCompletedCheckpoints*,
> as an example: it does have job_name and job_id:
>
> {exported_job="FlinkMonitor0ac46e79cf5d6fc2990db1574c2b0bf2",host="hadoop212",instance="
> 10.20.0.200:9091",job="push_metrics",*job_id*
> ="30f4075c798d6f3bd73c2bfe7bc54cf4",*job_name*="tj_history_log_itcourse"}
>
> My conclusion is that everything starting with *flink_jobmanager_Status_*
> arrives without job_name and job_id.
>
> 3. I saw a post about the same per-job setup, but with InfluxDB; they
> modified the source code:
> https://blog.csdn.net/u013516966/article/details/103171484
>
> 4. I hope someone who has run into and solved this problem can help and
> explain how to solve it.
>
> 俞剑波  wrote:
>  I added that setting and found that it only resolves the label conflict;
> there is still no job_name:
> 
> 
> Without the setting: flink_jobmanager_Status_JVM_Memory_Heap_Used{exported_job="myJobYJB4eea972f622437b738875b3e8e811a56",host="localhost",instance="pushgateway",job="pushgateway"}
>  With the setting:
> 
> flink_jobmanager_Status_JVM_Memory_Heap_Used{host="localhost",job="myJobYJB4eea972f622437b738875b3e8e811a56"}
> 
>  972684638  wrote:
>  Could it be that honor_labels: true is not configured?
> 
> 
> 
>  --- Original message ---
>  From: "俞剑波"  Sent: Thursday, April 30, 2020, 4:32 PM
>  To: "user-zh"  Subject: Re: Flink monitoring: some metrics scraped by
> Prometheus do not contain the Flink job_name or job_id
> 
> 
>  Yes, I am using it! Is there a solution? I have been stuck on this for days
> and would really appreciate any help. Many thanks.
> 
>  972684638  wrote:
>  > I'd like to know: are you using pushgateway?
>  >
>  >
>  > --- Original message ---
>  > From: "俞剑波"  Sent: Thursday, April 30, 2020, 4:01 PM
>  > To: "user-zh"  Subject: Flink monitoring: some metrics scraped by
> Prometheus do not contain the Flink job_name or job_id
>  >
>  >
>  > The Flink cluster runs in per-job mode, so with a higher parallelism a job
> has multiple TaskManagers on multiple machines. With the
> flink-metrics-prometheus approach I run into a problem: when collecting the
> TaskManagers' JVM information, the records returned by Prometheus do not
> contain the corresponding Flink job_name or job_id, so the data cannot be
> aggregated. Has anyone run into this problem? How did you solve it? Many
> thanks!
> 
> 


Re: Flink: For terabytes of keyed state.

2020-05-05 Thread Congxian Qiu
Hi

From my experience, you should care about the state size of a single task
(not the whole job's state size). The download speed for a single thread is
almost 100 MB/s (this may vary across environments). I do not have much
performance data for loading state into RocksDB (we use an internal KV store
in my company), but from my experience loading state into RocksDB will not be
slower than downloading it.

Best,
Congxian


Gowri Sundaram  于2020年5月3日周日 下午11:25写道:

> Hi Congxian,
> Thank you so much for your response, that really helps!
>
> From your experience, how long does it take for Flink to redistribute
> terabytes of state data on node addition / node failure.
>
> Thanks!
>
> On Sun, May 3, 2020 at 6:56 PM Congxian Qiu 
> wrote:
>
>> Hi
>>
>> 1. From my experience, Flink can support such a big state; you can set an
>> appropriate parallelism for the stateful operator. For RocksDB you may need
>> to pay attention to disk performance.
>> 2. Inside Flink, the state is partitioned by key-group, and each
>> task/parallel instance contains multiple key-groups. Flink does not need to
>> restart when you add a node to the cluster, but every time it restarts from
>> a savepoint/checkpoint (or fails over), Flink needs to redistribute the
>> checkpoint data. This can be skipped on failover if local recovery[1]
>> is enabled.
>> 3. For uploading/downloading state, you can refer to the multi-threaded
>> upload/download[2][3] to speed them up.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/large_state_tuning.html#task-local-recovery
>> [2] https://issues.apache.org/jira/browse/FLINK-10461
>> [3] https://issues.apache.org/jira/browse/FLINK-11008
>>
>> Best,
>> Congxian
>>
>>
>> Gowri Sundaram  于2020年5月1日周五 下午6:29写道:
>>
>>> Hello all,
>>> We have read in multiple sources that Flink has been used for use cases
>>> with terabytes of application state.
>>>
>>> We are considering using Flink for a similar use case with* keyed state
>>> in the range of 20 to 30 TB*. We had a few questions regarding the same.
>>>
>>>
>>>- *Is Flink a good option for this kind of scale of data* ? We are
>>>considering using RocksDB as the state backend.
>>>- *What happens when we want to add a node to the cluster *?
>>>   - As per our understanding, if we have 10 nodes in our cluster,
>>>   with 20TB of state, this means that adding a node would require the 
>>> entire
>>>   20TB of data to be shipped again from the external checkpoint remote
>>>   storage to the taskmanager nodes.
>>>   - Assuming 1Gb/s network speed, and assuming all nodes can read
>>>   their respective 2TB state in parallel, this would mean a *minimum
>>>   downtime of half an hour*. And this is assuming the throughput of
>>>   the remote storage does not become the bottleneck.
>>>   - Is there any way to reduce this estimated downtime ?
>>>
>>>
>>> Thank you!
>>>
>>


Re: multiple joins in one job

2020-05-05 Thread Benchao Li
Yes. The watermark will be propagated correctly, which is the min of two
inputs.
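For illustration, a hedged sketch (all table and column names are made up) of
two interval joins in one query that forwards a single time attribute, so the
result can still be used with event time downstream:

SELECT
  o.order_id,
  o.ts AS order_time,   -- the one event-time attribute kept through both joins
  p.amount,
  s.status
FROM orders o
JOIN payments p
  ON o.order_id = p.order_id
 AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR
JOIN shipments s
  ON o.order_id = s.order_id
 AND s.ts BETWEEN o.ts AND o.ts + INTERVAL '1' DAY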

lec ssmi  于2020年5月6日周三 上午9:46写道:

> Even if the time attribute field is retained, will the  related watermark
> be retained?
> If not, and there is no sql syntax to declare watermark again, it is
> equivalent to not being able to do multiple joins in one job.
>
> Benchao Li  于2020年5月5日周二 下午9:23写道:
>
>> You cannot select more than one time attribute, the planner will give you
>> an Exception if you did that.
>>
>>
>> lec ssmi  于2020年5月5日周二 下午8:34写道:
>>
>>> As  you said, if   I  select  all  the  time  attribute  fields   from
>>> both  ,  which  will be  the  final  one?
>>>
>>> Benchao Li  于 2020年5月5日周二 17:26写道:
>>>
 Hi lec,

 You don't need to specify time attribute again like `TUMBLE_ROWTIME`,
 you just select  the time attribute field
 from one of the input, then it will be time attribute automatically.

 lec ssmi  于2020年5月5日周二 下午4:42写道:

> But  I  have  not  found  there  is  any  syntax to  specify   time
>  attribute  field  and  watermark  again  with  pure  sql.
>
> Fabian Hueske  于 2020年5月5日周二 15:47写道:
>
>> Sure, you can write a SQL query with multiple interval joins that
>> preserve event-time attributes and watermarks.
>> There's no need to feed data back to Kafka just to inject it again to
>> assign new watermarks.
>>
>> Am Di., 5. Mai 2020 um 01:45 Uhr schrieb lec ssmi <
>> shicheng31...@gmail.com>:
>>
>>> I mean using pure sql statement to make it . Can it be possible?
>>>
>>> Fabian Hueske  于2020年5月4日周一 下午4:04写道:
>>>
 Hi,

 If the interval join emits the time attributes of both its inputs,
 you can use either of them as a time attribute in a following operator
 because the join ensures that the watermark will be aligned with both 
 of
 them.

 Best, Fabian

 Am Mo., 4. Mai 2020 um 00:48 Uhr schrieb lec ssmi <
 shicheng31...@gmail.com>:

> Thanks for your replay.
> But as I known, if   the time attribute  will be retained and  the
> time attribute field  of both streams is selected in the result after
> joining, who is the final time attribute variable?
>
> Benchao Li  于2020年4月30日周四 下午8:25写道:
>
>> Hi lec,
>>
>> AFAIK, time attribute will be preserved after time interval join.
>> Could you share your DDL and SQL queries with us?
>>
>> lec ssmi  于2020年4月30日周四 下午5:48写道:
>>
>>> Hi:
>>>I need to join multiple stream tables  using  time interval
>>> join.  The problem is that the time attribute will disappear  after 
>>> the jon
>>> , and  pure  sql cannot declare the time attribute field again . 
>>> So, to
>>> make is success,  I need to insert  the last result of join to 
>>> kafka ,and
>>> consume it and join it with another stream table  in another flink 
>>> job
>>> . This seems troublesome.
>>> Any good idea?
>>>
>>>
>>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking 
>> University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>

 --

 Benchao Li
 School of Electronics Engineering and Computer Science, Peking University
 Tel:+86-15650713730
 Email: libenc...@gmail.com; libenc...@pku.edu.cn


>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Flink SQL processing time: time zone issue

2020-05-05 Thread Jingsong Li
Hi,

This may be a bug.

Blink uses TIMESTAMP WITHOUT TIME ZONE by default, so it carries no time zone.
proctime, however, currently produces the time with a time zone applied; my
understanding is that it should probably produce a time-zone-free value.

CC: @Jark Wu  @Zhenghua Gao 

Best,
Jingsong Lee

On Tue, May 5, 2020 at 5:43 PM 祝尚 <17626017...@163.com> wrote:

> Same question here, waiting for an answer from the experts.
>
> > On May 1, 2020, at 5:26 PM, hb <343122...@163.com> wrote:
> >
> >
> >
> > ``` code
> >  val env = StreamExecutionEnvironment.getExecutionEnvironment
> >  val settings: EnvironmentSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,
> settings)
> >
> >
> >  val t2 = env.fromElements("a", "b", "c")
> >  t2.toTable(tEnv, 'f1, 'proc.proctime).toAppendStream[Row].print()
> > ```
> >
> >
> >  // Time zone Shanghai, current time 2020-05-01 15:00
> >  // The output shows a,2020-05-01T09:11:05.633, i.e. the processing time is off by 8 hours. How can I make proctime display without the 8-hour difference?
>
>

-- 
Best, Jingsong Lee


Re: multiple joins in one job

2020-05-05 Thread lec ssmi
Even if the time attribute field is retained, will the  related watermark
be retained?
If not, and there is no sql syntax to declare watermark again, it is
equivalent to not being able to do multiple joins in one job.

Benchao Li  于2020年5月5日周二 下午9:23写道:

> You cannot select more than one time attribute, the planner will give you
> an Exception if you did that.
>
>
> lec ssmi  于2020年5月5日周二 下午8:34写道:
>
>> As  you said, if   I  select  all  the  time  attribute  fields   from
>> both  ,  which  will be  the  final  one?
>>
>> Benchao Li  于 2020年5月5日周二 17:26写道:
>>
>>> Hi lec,
>>>
>>> You don't need to specify time attribute again like `TUMBLE_ROWTIME`,
>>> you just select  the time attribute field
>>> from one of the input, then it will be time attribute automatically.
>>>
>>> lec ssmi  于2020年5月5日周二 下午4:42写道:
>>>
 But  I  have  not  found  there  is  any  syntax to  specify   time
  attribute  field  and  watermark  again  with  pure  sql.

 Fabian Hueske  于 2020年5月5日周二 15:47写道:

> Sure, you can write a SQL query with multiple interval joins that
> preserve event-time attributes and watermarks.
> There's no need to feed data back to Kafka just to inject it again to
> assign new watermarks.
>
> Am Di., 5. Mai 2020 um 01:45 Uhr schrieb lec ssmi <
> shicheng31...@gmail.com>:
>
>> I mean using pure sql statement to make it . Can it be possible?
>>
>> Fabian Hueske  于2020年5月4日周一 下午4:04写道:
>>
>>> Hi,
>>>
>>> If the interval join emits the time attributes of both its inputs,
>>> you can use either of them as a time attribute in a following operator
>>> because the join ensures that the watermark will be aligned with both of
>>> them.
>>>
>>> Best, Fabian
>>>
>>> Am Mo., 4. Mai 2020 um 00:48 Uhr schrieb lec ssmi <
>>> shicheng31...@gmail.com>:
>>>
 Thanks for your replay.
 But as I known, if   the time attribute  will be retained and  the
 time attribute field  of both streams is selected in the result after
 joining, who is the final time attribute variable?

 Benchao Li  于2020年4月30日周四 下午8:25写道:

> Hi lec,
>
> AFAIK, time attribute will be preserved after time interval join.
> Could you share your DDL and SQL queries with us?
>
> lec ssmi  于2020年4月30日周四 下午5:48写道:
>
>> Hi:
>>I need to join multiple stream tables  using  time interval
>> join.  The problem is that the time attribute will disappear  after 
>> the jon
>> , and  pure  sql cannot declare the time attribute field again . So, 
>> to
>> make is success,  I need to insert  the last result of join to kafka 
>> ,and
>> consume it and join it with another stream table  in another flink 
>> job
>> . This seems troublesome.
>> Any good idea?
>>
>>
>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking 
> University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>
>>>
>>> --
>>>
>>> Benchao Li
>>> School of Electronics Engineering and Computer Science, Peking University
>>> Tel:+86-15650713730
>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>
>>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: Writing _SUCCESS Files (Streaming and Batch)

2020-05-05 Thread Jingsong Li
Hi Peter,

The troublesome part is knowing when a bucket has "ended" in a streaming job.
In 1.11, we are trying to implement a watermark-based bucket-ending
mechanism[1] in Table/SQL.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best,
Jingsong Lee
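Until such a mechanism lands, one rough workaround on the streaming side is a
keyed process function that fires a timer per bucket and drops an empty
_SUCCESS file. This is only a sketch: it assumes event time, hourly buckets in
the default "yyyy-MM-dd--HH" layout, and that a marker written when the
watermark passes the bucket end is acceptable even though it is not strictly
synchronized with the sink finalizing its part files.

import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class SuccessMarker<T> extends KeyedProcessFunction<Long, T, T> {

    private static final long HOUR = 3600_000L;
    private final String basePath;   // same base path as the StreamingFileSink

    public SuccessMarker(String basePath) {
        this.basePath = basePath;
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        out.collect(value);
        if (ctx.timestamp() != null) {
            long bucketEnd = (ctx.timestamp() / HOUR + 1) * HOUR;
            // duplicate registrations for the same (key, timestamp) are de-duplicated
            ctx.timerService().registerEventTimeTimer(bucketEnd);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
        String bucket = DateTimeFormatter.ofPattern("yyyy-MM-dd--HH")
                .withZone(ZoneOffset.UTC)
                .format(Instant.ofEpochMilli(timestamp - HOUR));
        Path marker = new Path(basePath + "/" + bucket + "/_SUCCESS");
        FileSystem fs = marker.getFileSystem();
        try (FSDataOutputStream ignored = fs.create(marker, FileSystem.WriteMode.OVERWRITE)) {
            // empty marker file
        }
    }
}

It would be keyed by the bucket, e.g.
stream.keyBy(e -> timestampOf(e) / 3600000L).process(new SuccessMarker<>(basePath)),
where timestampOf and basePath are whatever the job already uses for the sink.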

On Tue, May 5, 2020 at 7:40 AM Peter Groesbeck 
wrote:

> I am replacing an M/R job with a Streaming job using the StreamingFileSink
> and there is a requirement to generate an empty _SUCCESS file like the old
> Hadoop job. I have to implement a similar Batch job to read from backup
> files in case of outages or downtime.
>
> The Batch job question was answered here and appears to be still relevant
> although if someone could confirm for me that would be great.
> https://stackoverflow.com/a/39413810
>
> The question of the Streaming job came up back in 2018 here:
>
> http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3cff74eed5-602f-4eaa-9bc1-6cdf56611...@gmail.com%3E
>
> But the solution to use or extend the BucketingSink class seems out of
> date now that BucketingSink has been deprecated.
>
> Is there a way to implement a similar solution for StreamingFileSink?
>
> I'm currently on 1.8.1 although I hope to update to 1.10 in the near
> future.
>
> Thank you,
> Peter
>


-- 
Best, Jingsong Lee


Re: Cannot start native K8s

2020-05-05 Thread Yang Wang
Hi Dongwon Kim,

I think it is a known issue. The native Kubernetes integration does not work
with JDK 8u252 due to an okhttp issue[1]. For now, you can upgrade your JDK to
a newer version as a workaround.


[1]. https://issues.apache.org/jira/browse/FLINK-17416

Dongwon Kim  于2020年5月6日周三 上午7:15写道:

> Hi,
>
> I'm using Flink-1.10 and tested everything [1] successfully.
> While trying [2], I got the following message.
> Can anyone help please?
>
> [root@DAC-E04-W06 bin]# ./kubernetes-session.sh
>> 2020-05-06 08:10:49,411 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.rpc.address, DAC-E04-W06
>> 2020-05-06 08:10:49,412 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.rpc.port, 6123
>> 2020-05-06 08:10:49,412 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.heap.size, 1024m
>> 2020-05-06 08:10:49,412 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: taskmanager.memory.process.size, 24g
>> 2020-05-06 08:10:49,413 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: taskmanager.numberOfTaskSlots, 24
>> 2020-05-06 08:10:49,413 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: parallelism.default, 1
>> 2020-05-06 08:10:49,413 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: high-availability, zookeeper
>> 2020-05-06 08:10:49,413 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: high-availability.zookeeper.path.root, /flink
>> 2020-05-06 08:10:49,414 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: high-availability.storageDir, hdfs:///user/flink/ha/
>> 2020-05-06 08:10:49,414 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: high-availability.zookeeper.quorum, DAC-E04-W06:2181
>> 2020-05-06 08:10:49,414 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.execution.failover-strategy, region
>> 2020-05-06 08:10:49,415 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: rest.port, 8082
>> 2020-05-06 08:10:50,386 ERROR
>> org.apache.flink.kubernetes.cli.KubernetesSessionCli  - Error while
>> running the Flink session.
>> io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]
>>  for kind: [Service]  with name:
>> [flink-cluster-5c12bd50-a540-4614-96d0-549785a8bc62]  in namespace:
>> [default]  failed.
>> at
>> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
>> at
>> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
>> at
>> io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:231)
>> at
>> io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:164)
>> at
>> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getService(Fabric8FlinkKubeClient.java:334)
>> at
>> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getInternalService(Fabric8FlinkKubeClient.java:246)
>> at
>> org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:104)
>> at
>> org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:185)
>> at
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at
>> org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:185)
>> Caused by: java.net.SocketException: Broken pipe (Write failed)
>> at java.net.SocketOutputStream.socketWrite0(Native Method)
>> at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
>> at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
>> at sun.security.ssl.OutputRecord.writeBuffer(OutputRecord.java:431)
>> at sun.security.ssl.OutputRecord.write(OutputRecord.java:417)
>> at
>> sun.security.ssl.SSLSocketImpl.writeRecordInternal(SSLSocketImpl.java:894)
>> at sun.security.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:865)
>> at sun.security.ssl.AppOutputStream.write(AppOutputStream.java:123)
>> at org.apache.flink.kubernetes.shadded.okio.Okio$1.write(Okio.java:79)
>> at
>> org.apache.flink.kubernetes.shadded.okio.AsyncTimeout$1.write(AsyncTimeout.java:180)
>> at
>> org.apache.flink.kubernetes.shadded.okio.RealBufferedSink.flush(RealBufferedSink.java:224)
>> at
>> org.apache.flink.kubernetes.shadded.okhttp3.internal.http2.Http2Writer.settings(Http2Writer.java:203)
>> at
>> 

Export user metrics with Flink Prometheus endpoint

2020-05-05 Thread Eleanore Jin
Hi all,

I just wonder: is it possible to use the Flink metrics endpoint to let
Prometheus scrape user-defined metrics?

Context:
In addition to Flink metrics, we also collect some application-level metrics
using OpenCensus, and we run the OpenCensus agent as a sidecar in the
Kubernetes pod to collect them (the OpenCensus agent talks to the task manager
container via RPCs).

The issue with this approach is that the OpenCensus agent seems to keep stale
metrics, which makes the metric reporting inaccurate, and the project is no
longer actively maintained.

So I wonder if it is possible to use the Flink metrics endpoint for
user-defined metrics.

Thanks a lot!
Eleanore
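For reference, a hedged sketch of registering a user-defined metric through
Flink's metric system, so whichever reporter is configured (for example the
Prometheus reporter) exposes it next to the built-in metrics. The group and
metric names are assumptions:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter eventsProcessed;

    @Override
    public void open(Configuration parameters) {
        eventsProcessed = getRuntimeContext()
                .getMetricGroup()
                .addGroup("myApp")              // optional user scope
                .counter("eventsProcessed");
    }

    @Override
    public String map(String value) {
        eventsProcessed.inc();
        return value;
    }
}

With the Prometheus reporter this would typically show up under the operator
scope (a metric name ending in myApp_eventsProcessed), though the exact name
depends on the configured scope formats.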


Casting from org.apache.flink.api.java.tuple.Tuple2 to scala.Product; using java and scala in flink job

2020-05-05 Thread Nick Bendtner
Hi guys,
In our flink job we use java source for deserializing a message from kafka
using a kafka deserializer. Signature is as below.


public class CustomAvroDeserializationSchema implements
        KafkaDeserializationSchema<Tuple2<EventMetaData, GenericRecord>>

The other parts of the streaming job are in scala. When data has to be
serialized I get this exception




*java.lang.RuntimeException: org.apache.flink.api.java.tuple.Tuple2 cannot
be cast to scala.Product at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)*

Here is how I provide type info for serialization in the java
deserialization class:

@Override
public TypeInformation<Tuple2<EventMetaData, GenericRecord>> getProducedType() {
    return new TupleTypeInfo<>(
        TypeInformation.of(EventMetaData.class),
        new GenericRecordAvroTypeInfo(this.writer));
}

Here is how I add the kafka source in scala :

private[flink] def sourceType(
  deserialization: KafkaDeserializationSchema[(EventMetaData, GenericRecord)],
  properties: Properties): FlinkKafkaConsumer[(EventMetaData,
GenericRecord)] = {
  val consumer = new FlinkKafkaConsumer[(EventMetaData, GenericRecord)](
source.asJava,
deserialization,
properties)
  consumer
}

Any ideas or thoughts on how to interoperate between the Java Tuple2 and a
Scala case class? Also, we are using version 1.9.1 of flink-connector-kafka
while the rest of the cluster uses Flink 1.7.2.

Best,
Nick.


MongoDB as a Sink;

2020-05-05 Thread Aissa Elaffani
Hello Guys,
I am looking for some help concerning my Flink sink: I want the output to be
stored in a MongoDB database. As far as I know, there is no sink connector for
MongoDB, so I need to implement one myself, and I don't know how to do that.
Can you please help me with this?
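A common approach is a custom RichSinkFunction wrapping the MongoDB Java
driver. Below is a hedged sketch only, not an official connector: the driver
dependency (org.mongodb:mongodb-driver-sync), the database/collection names,
and the Map-based record type are all assumptions.

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.bson.Document;

import java.util.Map;

public class MongoSink extends RichSinkFunction<Map<String, Object>> {

    private final String uri;        // e.g. "mongodb://host:27017"
    private final String database;
    private final String collection;

    private transient MongoClient client;
    private transient MongoCollection<Document> coll;

    public MongoSink(String uri, String database, String collection) {
        this.uri = uri;
        this.database = database;
        this.collection = collection;
    }

    @Override
    public void open(Configuration parameters) {
        // one client per parallel sink instance
        client = MongoClients.create(uri);
        coll = client.getDatabase(database).getCollection(collection);
    }

    @Override
    public void invoke(Map<String, Object> value, Context context) {
        // convert the record into a BSON document and insert it
        coll.insertOne(new Document(value));
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
    }
}

It would be attached with something like
stream.addSink(new MongoSink("mongodb://localhost:27017", "mydb", "mycollection"));
note that this sketch writes one document per record and gives at-least-once
behavior at best; batching and exactly-once need extra work.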


Cannot start native K8s

2020-05-05 Thread Dongwon Kim
Hi,

I'm using Flink-1.10 and tested everything [1] successfully.
While trying [2], I got the following message.
Can anyone help please?

[root@DAC-E04-W06 bin]# ./kubernetes-session.sh
> 2020-05-06 08:10:49,411 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.rpc.address, DAC-E04-W06
> 2020-05-06 08:10:49,412 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.rpc.port, 6123
> 2020-05-06 08:10:49,412 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.heap.size, 1024m
> 2020-05-06 08:10:49,412 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.memory.process.size, 24g
> 2020-05-06 08:10:49,413 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.numberOfTaskSlots, 24
> 2020-05-06 08:10:49,413 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: parallelism.default, 1
> 2020-05-06 08:10:49,413 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: high-availability, zookeeper
> 2020-05-06 08:10:49,413 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: high-availability.zookeeper.path.root, /flink
> 2020-05-06 08:10:49,414 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: high-availability.storageDir, hdfs:///user/flink/ha/
> 2020-05-06 08:10:49,414 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: high-availability.zookeeper.quorum, DAC-E04-W06:2181
> 2020-05-06 08:10:49,414 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.execution.failover-strategy, region
> 2020-05-06 08:10:49,415 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: rest.port, 8082
> 2020-05-06 08:10:50,386 ERROR
> org.apache.flink.kubernetes.cli.KubernetesSessionCli  - Error while
> running the Flink session.
> io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]
>  for kind: [Service]  with name:
> [flink-cluster-5c12bd50-a540-4614-96d0-549785a8bc62]  in namespace:
> [default]  failed.
> at
> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
> at
> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
> at
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:231)
> at
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:164)
> at
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getService(Fabric8FlinkKubeClient.java:334)
> at
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getInternalService(Fabric8FlinkKubeClient.java:246)
> at
> org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:104)
> at
> org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:185)
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at
> org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:185)
> Caused by: java.net.SocketException: Broken pipe (Write failed)
> at java.net.SocketOutputStream.socketWrite0(Native Method)
> at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
> at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
> at sun.security.ssl.OutputRecord.writeBuffer(OutputRecord.java:431)
> at sun.security.ssl.OutputRecord.write(OutputRecord.java:417)
> at
> sun.security.ssl.SSLSocketImpl.writeRecordInternal(SSLSocketImpl.java:894)
> at sun.security.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:865)
> at sun.security.ssl.AppOutputStream.write(AppOutputStream.java:123)
> at org.apache.flink.kubernetes.shadded.okio.Okio$1.write(Okio.java:79)
> at
> org.apache.flink.kubernetes.shadded.okio.AsyncTimeout$1.write(AsyncTimeout.java:180)
> at
> org.apache.flink.kubernetes.shadded.okio.RealBufferedSink.flush(RealBufferedSink.java:224)
> at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http2.Http2Writer.settings(Http2Writer.java:203)
> at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http2.Http2Connection.start(Http2Connection.java:515)
> at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http2.Http2Connection.start(Http2Connection.java:505)
> at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.connection.RealConnection.startHttp2(RealConnection.java:298)
> at
> 

Flink pipeline;

2020-05-05 Thread Aissa Elaffani
Hello Guys,
I am new to the real-time streaming field, and I am trying to build a big data
architecture for processing real-time streams. I have some sensors that
generate data in JSON format; the data is sent to an Apache Kafka cluster, and
I then want to consume it with Apache Flink in order to do some aggregation.
The problem is that the data coming from Kafka only contains the sensor ID,
the ID of the equipment in which the sensor is installed, and the status of
the equipment. Each sensor is installed in a piece of equipment, the equipment
belongs to a workshop, and the workshop itself belongs to a factory. So I need
another data source for the workshops and factories, because I want to
aggregate per factory, while the data sent by the sensors only contains the
sensor ID and the equipment ID.
I am new to this field and I am stuck here. Can someone please help me achieve
my goal and explain how I can do that, how I can do this more complex
aggregation, and whether there is any optimization to apply?
Sorry for disturbing you!
AISSA
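One common way to model this kind of enrichment (a hedged sketch only; the
Tuple shapes below are assumptions) is to treat the equipment-to-factory
mapping as a second, slowly changing stream and broadcast it to the sensor
stream, so each reading can be tagged with its factory before aggregating:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class FactoryEnrichment {

    // sensors:            (equipmentId, value)
    // equipmentToFactory: (equipmentId, factory)
    public static DataStream<Tuple3<String, String, Double>> enrich(
            DataStream<Tuple2<String, Double>> sensors,
            DataStream<Tuple2<String, String>> equipmentToFactory) {

        MapStateDescriptor<String, String> dimDescriptor =
            new MapStateDescriptor<>("equipment-to-factory", Types.STRING, Types.STRING);

        BroadcastStream<Tuple2<String, String>> dims =
            equipmentToFactory.broadcast(dimDescriptor);

        return sensors
            .connect(dims)
            .process(new BroadcastProcessFunction<Tuple2<String, Double>,
                                                  Tuple2<String, String>,
                                                  Tuple3<String, String, Double>>() {

                @Override
                public void processElement(Tuple2<String, Double> reading,
                                           ReadOnlyContext ctx,
                                           Collector<Tuple3<String, String, Double>> out) throws Exception {
                    // look up the factory for this equipment id
                    String factory = ctx.getBroadcastState(dimDescriptor).get(reading.f0);
                    if (factory != null) {
                        out.collect(Tuple3.of(factory, reading.f0, reading.f1));
                    }
                }

                @Override
                public void processBroadcastElement(Tuple2<String, String> dim,
                                                    Context ctx,
                                                    Collector<Tuple3<String, String, Double>> out) throws Exception {
                    // update the equipment -> factory mapping on every parallel instance
                    ctx.getBroadcastState(dimDescriptor).put(dim.f0, dim.f1);
                }
            });
    }
}

Aggregating per factory is then a normal keyBy on the factory field plus a
window or aggregation. If the workshop/factory data lives in a database rather
than a stream, Async I/O or a lookup join would be the usual alternatives.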


Overriding hadoop core-site.xml keys using the flink-fs-hadoop-shaded assemblies

2020-05-05 Thread Jeff Henrikson
Has anyone had success overriding hadoop core-site.xml keys using the 
flink-fs-hadoop-shaded assemblies?  If so, what versions were known to work?


Using btrace, I am seeing a bug in the hadoop shaded dependencies 
distributed with 1.10.0.  Some (but not all) of the core-site.xml keys 
cannot be overridden.


Thanks,


Jeff Henrikson


Flink - Hadoop Connectivity - Unable to read file

2020-05-05 Thread Samik Mukherjee
Hi All,

I am trying to read a file from a locally installed HDFS, but I am not able
to. I tried both of the ways below, but every time the program ends with
"Process finished with exit code 239." Any help would be appreciated.

public class Processor {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);

        Job job = Job.getInstance();
        HadoopInputFormat<LongWritable, Text> hadoopInputFormat =
                new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, job);
        TextInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/newfolder/testdata1"));
        DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
        DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());

        words.print();
        env.execute("Processor");
    }

    public static final class Tokenizer extends
            RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
        @Override
        public void flatMap(Tuple2<LongWritable, Text> value,
                            org.apache.flink.util.Collector<Tuple2<String, Integer>> out) throws Exception {
            // normalize and split the line
            String line = value.f1.toString();
            String[] tokens = line.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}








    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);

        DataSet<Tuple2<LongWritable, Text>> input;

        input = env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
                LongWritable.class, Text.class, "hdfs://localhost:9000/newfolder/testdata1"));

        DataSet<String> stringInput = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
            @Override
            public String map(Tuple2<LongWritable, Text> longWritableTextTuple2) throws Exception {
                return longWritableTextTuple2.f1.toString();
            }
        });

        stringInput.print();

        env.execute("Processor");
    }


Re: table.show() in Flink

2020-05-05 Thread Fabian Hueske
There's also the Table API approach if you want to avoid typing a "full"
SQL query:

Table t = tEnv.from("myTable");

Cheers,
Fabian

Am Di., 5. Mai 2020 um 16:34 Uhr schrieb Őrhidi Mátyás <
matyas.orh...@gmail.com>:

> Thanks guys for the prompt answers!
>
> On Tue, May 5, 2020 at 2:49 PM Kurt Young  wrote:
>
>> A more straightforward way after FLIP-84 would be:
>> TableResult result = tEnv.executeSql("select xxx ...");
>> result.print();
>>
>> And if you are using 1.10 now, you can
>> use TableUtils#collectToList(table) to collect the
>> result to a list, and then print rows by yourself.
>>
>> Best,
>> Kurt
>>
>>
>> On Tue, May 5, 2020 at 8:44 PM Jark Wu  wrote:
>>
>>> Hi Matyas,
>>>
>>> AFAIK, currently, this is the recommended way to print result of table.
>>> In FLIP-84 [1] , which is targeted to 1.11, we will introduce some new
>>> APIs to do the fluent printing like this.
>>>
>>> Table table2 = tEnv.sqlQuery("select yy ...");
>>> TableResult result2 = table2.execute();
>>> result2.print();
>>>
>>> cc @Godfrey, please correct if I misunderstand the above API.
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
>>>
>>> On Tue, 5 May 2020 at 20:19, Őrhidi Mátyás 
>>> wrote:
>>>
 Dear Flink Community,

 I'm missing Spark's table.show() method in Flink. I'm using the
 following alternative at the moment:

 Table results = tableEnv.sqlQuery("SELECT * FROM my_table");
 tableEnv.toAppendStream(results, Row.class).print();

 Is it the recommended way to print the content of a table?


 Thanks,

 Matyas






Re: table.show() in Flink

2020-05-05 Thread Őrhidi Mátyás
Thanks guys for the prompt answers!

On Tue, May 5, 2020 at 2:49 PM Kurt Young  wrote:

> A more straightforward way after FLIP-84 would be:
> TableResult result = tEnv.executeSql("select xxx ...");
> result.print();
>
> And if you are using 1.10 now, you can use TableUtils#collectToList(table)
> to collect the
> result to a list, and then print rows by yourself.
>
> Best,
> Kurt
>
>
> On Tue, May 5, 2020 at 8:44 PM Jark Wu  wrote:
>
>> Hi Matyas,
>>
>> AFAIK, currently, this is the recommended way to print result of table.
>> In FLIP-84 [1] , which is targeted to 1.11, we will introduce some new
>> APIs to do the fluent printing like this.
>>
>> Table table2 = tEnv.sqlQuery("select yy ...");
>> TableResult result2 = table2.execute();
>> result2.print();
>>
>> cc @Godfrey, please correct if I misunderstand the above API.
>>
>> Best,
>> Jark
>>
>> [1]:
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
>>
>> On Tue, 5 May 2020 at 20:19, Őrhidi Mátyás 
>> wrote:
>>
>>> Dear Flink Community,
>>>
>>> I'm missing Spark's table.show() method in Flink. I'm using the
>>> following alternative at the moment:
>>>
>>> Table results = tableEnv.sqlQuery("SELECT * FROM my_table");
>>> tableEnv.toAppendStream(results, Row.class).print();
>>>
>>> Is it the recommended way to print the content of a table?
>>>
>>>
>>> Thanks,
>>>
>>> Matyas
>>>
>>>
>>>
>>>


Re: multiple joins in one job

2020-05-05 Thread Benchao Li
You cannot select more than one time attribute, the planner will give you
an Exception if you did that.


lec ssmi  于2020年5月5日周二 下午8:34写道:

> As  you said, if   I  select  all  the  time  attribute  fields   from
> both  ,  which  will be  the  final  one?
>
> Benchao Li  于 2020年5月5日周二 17:26写道:
>
>> Hi lec,
>>
>> You don't need to specify time attribute again like `TUMBLE_ROWTIME`, you
>> just select  the time attribute field
>> from one of the input, then it will be time attribute automatically.
>>
>> lec ssmi  于2020年5月5日周二 下午4:42写道:
>>
>>> But  I  have  not  found  there  is  any  syntax to  specify   time
>>>  attribute  field  and  watermark  again  with  pure  sql.
>>>
>>> Fabian Hueske  于 2020年5月5日周二 15:47写道:
>>>
 Sure, you can write a SQL query with multiple interval joins that
 preserve event-time attributes and watermarks.
 There's no need to feed data back to Kafka just to inject it again to
 assign new watermarks.

 Am Di., 5. Mai 2020 um 01:45 Uhr schrieb lec ssmi <
 shicheng31...@gmail.com>:

> I mean using pure sql statement to make it . Can it be possible?
>
> Fabian Hueske  于2020年5月4日周一 下午4:04写道:
>
>> Hi,
>>
>> If the interval join emits the time attributes of both its inputs,
>> you can use either of them as a time attribute in a following operator
>> because the join ensures that the watermark will be aligned with both of
>> them.
>>
>> Best, Fabian
>>
>> Am Mo., 4. Mai 2020 um 00:48 Uhr schrieb lec ssmi <
>> shicheng31...@gmail.com>:
>>
>>> Thanks for your replay.
>>> But as I known, if   the time attribute  will be retained and  the
>>> time attribute field  of both streams is selected in the result after
>>> joining, who is the final time attribute variable?
>>>
>>> Benchao Li  于2020年4月30日周四 下午8:25写道:
>>>
 Hi lec,

 AFAIK, time attribute will be preserved after time interval join.
 Could you share your DDL and SQL queries with us?

 lec ssmi  于2020年4月30日周四 下午5:48写道:

> Hi:
>I need to join multiple stream tables  using  time interval
> join.  The problem is that the time attribute will disappear  after 
> the jon
> , and  pure  sql cannot declare the time attribute field again . So, 
> to
> make is success,  I need to insert  the last result of join to kafka 
> ,and
> consume it and join it with another stream table  in another flink job
> . This seems troublesome.
> Any good idea?
>
>
>

 --

 Benchao Li
 School of Electronics Engineering and Computer Science, Peking 
 University
 Tel:+86-15650713730
 Email: libenc...@gmail.com; libenc...@pku.edu.cn


>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Restore from savepoint with Iterations

2020-05-05 Thread ashish pok
Let me see if I can do artificial throttle somewhere. Volume of data is really 
high and hence trying to avoid rounds in Kafka too. Looks like options are “not 
so elegant” until FLIP-15. Thanks for pointers again!!!
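For what it's worth, a hedged sketch of such an artificial throttle: a
pass-through map placed inside the iteration body that caps the per-subtask
rate. The rate and the placement are assumptions, and sleeping in map() also
delays checkpoint barriers on that channel:

import org.apache.flink.api.common.functions.MapFunction;

public class ThrottlingMapper<T> implements MapFunction<T, T> {

    private final long maxRecordsPerSecond;
    private long windowStart;
    private long countInWindow;

    public ThrottlingMapper(long maxRecordsPerSecond) {
        this.maxRecordsPerSecond = maxRecordsPerSecond;
    }

    @Override
    public T map(T value) throws Exception {
        long now = System.currentTimeMillis();
        if (now - windowStart >= 1000) {
            // start a new one-second window
            windowStart = now;
            countInWindow = 0;
        }
        if (++countInWindow > maxRecordsPerSecond) {
            // budget exhausted: sleep until the current window ends
            long remaining = 1000 - (now - windowStart);
            if (remaining > 0) {
                Thread.sleep(remaining);
            }
            windowStart = System.currentTimeMillis();
            countInWindow = 1;
        }
        return value;
    }
}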


On Monday, May 4, 2020, 11:06 PM, Ken Krugler  
wrote:

Hi Ashish,
The workaround we did was to throttle data flowing in the iteration (in code), 
though not sure if that’s possible for your situation.
You could remove the iteration by writing to a Kafka topic at the end of the 
part of your workflow that is currently an iteration, and then consuming from 
that same topic as your “iteration" source.
— Ken


On May 4, 2020, at 7:32 PM, Ashish Pokharel  wrote:

Hi Ken,
Thanks for the quick response!
I came across FLIP-15 on my next Google search after I sent the email :) It DEFINITELY 
looks that way. Watching the logs and the way the job gets stuck, it does look like a 
buffer is blocked. But FLIP-15 has not moved forward, though. So there are no 
workarounds at all at this point? Perhaps a technique to block the Kafka consumer for 
some time? Even that might get me going, but it looks like there is a chance of this 
happening during normal processing as well, as your use case demonstrates. I am using 
iteration with no timeouts for the prod job, and timeouts only in unit testing. The 
theory was that in prod the input stream would be indefinite, and sometimes a long lull 
with no events might happen during maintenance, backlogs, etc. I really would like to 
avoid bloating the DAG by repeating the same functions with filters and side outputs. 
Besides the obvious repetition, it would increase the size of the states by a factor. 
Even those slowly moving dimensions are not light (around half a billion every day) :) 

On May 4, 2020, at 10:13 PM, Ken Krugler  wrote:

Hi Ashish,
Wondering if you’re running into the gridlock problem I mention on slide #25 
here: 
https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-ken-krugler-building-a-scalable-focused-web-crawler-with-flink
If the iteration path has too much data in it, then the network buffer at the 
head of the iteration can fill up, and it never clears out because the operator 
consuming those buffers is blocked writing to the next operator in the 
iteration, and so on back to the head.
We ran into this when outlinks from web pages caused fan-out/amplification of 
the data being iterated, but maybe you hit it with restoring from state.
— Ken


On May 4, 2020, at 6:41 PM, Ashish Pokharel  wrote:
Hi all,

Hope everyone is doing well!

I am running into what seems like a deadlock (application stalled) situation 
with a Flink streaming job upon restore from savepoint. Job has a slowly moving 
stream (S1) that needs to be “stateful” and a continuous stream (S2) which is 
“joined” with slow moving stream (S1). Some level of loss/repetition is 
acceptable in continuous stream (S2) and hence can rely on something like Kafka 
consumer states upon restarts etc. Continuous stream (S2) however needs to be 
iterated through states from slowly moving streams (S1) a few times (mostly 2). 
States are fair sized (ends up being 15GB on HDFS). When job is restarted with 
no continuous data (S2) on topic job starts up, restores states and does it’s 
initial checkpoint within 3 minutes. However, when app is started from 
savepoint and continuous stream (S2) is actually present in Kafka it seems like 
the application comes to a halt. Looking at the progress of checkpoints, it seems like 
every attempt is stuck until some timeouts happen at around 10 mins. If the iteration 
on the stream is removed, the app can successfully start and checkpoint even 
when continuous stream (S2) is flowing in as well. Unfortunately we are working 
on a hosted environment for both data and platform, hence debugging with thread 
dumps etc will be challenging. 

I couldn’t find a known issue on this but was wondering if anyone has seen such 
behavior or know of any issues in past. It does look like checkpointing has to 
be set to forced to get an iterative job to checkpoint in the first place (an 
option that is marked deprecated already - working on 1.8.2 version as of now). 
I do understand challenges around consistent checkpointing of iterative stream. 
As I mentioned earlier, what I really want to maintain for the most part are 
states of slowly moving dimensions. Iterations does solve the problem at hand 
(multiple loops of logic) pretty gracefully but not being able to restore from 
savepoint will be a show stopper. 

Will appreciate any pointer / suggestions.

Thanks in advance, 

Ashish

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr





Re: table.show() in Flink

2020-05-05 Thread Kurt Young
A more straightforward way after FLIP-84 would be:
TableResult result = tEnv.executeSql("select xxx ...");
result.print();

And if you are using 1.10 now, you can use TableUtils#collectToList(table) to collect
the result into a list, and then print the rows yourself.
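For example, a rough sketch (the table name is a placeholder, and the exact package of
TableUtils depends on the planner module you use):

Table table = tEnv.sqlQuery("SELECT * FROM my_table");
List<Row> rows = TableUtils.collectToList(table);   // runs the job and collects the result
rows.forEach(System.out::println);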

Best,
Kurt


On Tue, May 5, 2020 at 8:44 PM Jark Wu  wrote:

> Hi Matyas,
>
> AFAIK, currently, this is the recommended way to print result of table.
> In FLIP-84 [1] , which is targeted to 1.11, we will introduce some new
> APIs to do the fluent printing like this.
>
> Table table2 = tEnv.sqlQuery("select yy ...");
> TableResult result2 = table2.execute();
> result2.print();
>
> cc @Godfrey, please correct if I misunderstand the above API.
>
> Best,
> Jark
>
> [1]:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
>
> On Tue, 5 May 2020 at 20:19, Őrhidi Mátyás 
> wrote:
>
>> Dear Flink Community,
>>
>> I'm missing Spark's table.show() method in Flink. I'm using the following
>> alternative at the moment:
>>
>> Table results = tableEnv.sqlQuery("SELECT * FROM my_table");
>> tableEnv.toAppendStream(results, Row.class).print();
>>
>> Is it the recommended way to print the content of a table?
>>
>>
>> Thanks,
>>
>> Matyas
>>
>>
>>
>>


Re: table.show() in Flink

2020-05-05 Thread Jark Wu
Hi Matyas,

AFAIK, currently, this is the recommended way to print result of table.
In FLIP-84 [1] , which is targeted to 1.11, we will introduce some new APIs
to do the fluent printing like this.

Table table2 = tEnv.sqlQuery("select yy ...");
TableResult result2 = table2.execute();
result2.print();

cc @Godfrey, please correct if I misunderstand the above API.

Best,
Jark

[1]:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878

On Tue, 5 May 2020 at 20:19, Őrhidi Mátyás  wrote:

> Dear Flink Community,
>
> I'm missing Spark's table.show() method in Flink. I'm using the following
> alternative at the moment:
>
> Table results = tableEnv.sqlQuery("SELECT * FROM my_table");
> tableEnv.toAppendStream(results, Row.class).print();
>
> Is it the recommended way to print the content of a table?
>
>
> Thanks,
>
> Matyas
>
>
>
>


Re: Reading from sockets using dataset api

2020-05-05 Thread Arvid Heise
Hi Kaan,

Explicitly mapping to physical nodes is currently not supported and would need some
workarounds. I have re-added the user mailing list (please always include it in your
responses); maybe someone there can help you with that.

Best,

Arvid

On Thu, Apr 30, 2020 at 10:12 AM Kaan Sancak  wrote:

> One quick question though: on each generator node I am opening 24 sockets
> (the number of cores that I have). Is there a way to guarantee that, while doing
> the map function, each of the slave nodes distributes these 24 socket ports
> among its task slots (each slave also has 24 slots)?
> Sorry, I have asked a lot of questions.
>
> Stay safe!
> Kaan
>
> On Thu, Apr 30, 2020 at 3:06 AM Kaan Sancak  wrote:
>
>> Hi Arvid,
>> As you said, I am only interested in batch processing right now. And it
>> seems to be working fine now.
>>
>> Thanks for your help!
>> Best
>> Kaan
>>
>> On Thu, Apr 30, 2020 at 2:31 AM Arvid Heise  wrote:
>>
>>> Hi Kaan,
>>>
>>> not entirely sure I understand your solution. I gathered that you create
>>> a dataset of TCP addresses and then use flatMap to fetch and output the
>>> data?
>>>
>>> If so, then I think it's a good solution for batch processing (DataSet).
>>> It doesn't work in DataStream because it doesn't play well with
>>> checkpointing, but you seem to be interested only in batch. It's also not
>>> the first time, I have seen this pattern being used in batch.
>>>
>>> In general, if it works and is fast enough, it's always a good solution
>>> ;). No need to make it more complicated if you can solve it with simpler
>>> means and you can maintain it more easily.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Wed, Apr 29, 2020 at 10:19 PM Kaan Sancak  wrote:
>>>
 Hi Arvid,

 I have implemented a zmq listener class without extending any class of
 Flink.
 The listener has a constructor with the port number.

 Then in the execution I have created a dataset of strings which has the
 port numbers.
 Then I used a flatMap function, which returned Tuple2. I
 opened the TCP sockets using localhost, so matching was quite easy.

 This seemed to work for me. What do you think about this
 implementation? Do you see any drawbacks?

 Best
 Kaan

 On Apr 29, 2020, at 7:40 AM, Arvid Heise  wrote:

 Hi Kaan,

 seems like ZMQ is using TCP and not HTTP. So I guess the easiest way
 would be to use a ZMQ Java binding to access it [1].

 But of course, it's much more complicated to write an iterator logic
 for that. Not sure how ZMQ signals the end of such a graph? Maybe it closes
 the socket and you can just read as much as possible.

 [1] https://zeromq.org/languages/java/

 On Tue, Apr 28, 2020 at 10:56 PM Kaan Sancak 
 wrote:

> Hi Arvid,
>
> I am sorry for the late response. I had some deadlines, but I am back
> to work now.
> I have been trying to implement what we talked about, but I am having
> problems with the implementation.
> I have been using ZMQ to open sockets, because that is inherently
> supported in my graph generator. But I couldn’t make the connection using
> input streams.
> Do you have any specific examples, where I can look and have a better
> idea on how to implement this?
>
> Best
> Kaan
>
> On Apr 24, 2020, at 4:37 AM, Arvid Heise  wrote:
>
> Hm, I mistakenly assumed sockets work the other way around (pulling like
> URLInputStream instead of listening). I'd go with providing the data on
> a port on each generator node, and then reading from that in multiple
> sources.
>
> I think the best solution is to implement a custom InputFormat and
> then use readInput. You could implement a subclass of
> GenericInputFormat. You might even use IteratorInputFormat like this:
>
> private static class URLInputIterator implements Iterator<Tuple2<Long, Long>>, Serializable {
>    private final URL url;
>    private Iterator<Tuple2<Long, Long>> inner;
>
>    private URLInputIterator(URL url) {
>       this.url = url;
>    }
>
>    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
>       InputStream inputStream = url.openStream();
>       inner = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
>          .lines()
>          .map(line -> {
>             String[] parts = line.split(";");
>             return new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
>          })
>          .iterator();
>    }
>
>    @Override
>    public boolean hasNext() {
>       return inner.hasNext();
>    }
>
>    @Override
>    public Tuple2<Long, Long> next() {
>       return inner.next();
>    }
> }
>
> env.fromCollection(new URLInputIterator(new URL("gen_node1", )), Types.TUPLE(Types.LONG, Types.LONG));
>
>

Re: multiple joins in one job

2020-05-05 Thread lec ssmi
As you said, if I select all the time attribute fields from both inputs, which one
will be the final time attribute?

Benchao Li  于 2020年5月5日周二 17:26写道:

> Hi lec,
>
> You don't need to specify time attribute again like `TUMBLE_ROWTIME`, you
> just select  the time attribute field
> from one of the input, then it will be time attribute automatically.
>
> lec ssmi  于2020年5月5日周二 下午4:42写道:
>
>> But  I  have  not  found  there  is  any  syntax to  specify   time
>>  attribute  field  and  watermark  again  with  pure  sql.
>>
>> Fabian Hueske  于 2020年5月5日周二 15:47写道:
>>
>>> Sure, you can write a SQL query with multiple interval joins that
>>> preserve event-time attributes and watermarks.
>>> There's no need to feed data back to Kafka just to inject it again to
>>> assign new watermarks.
>>>
>>> Am Di., 5. Mai 2020 um 01:45 Uhr schrieb lec ssmi <
>>> shicheng31...@gmail.com>:
>>>
 I mean using pure sql statement to make it . Can it be possible?

 Fabian Hueske  于2020年5月4日周一 下午4:04写道:

> Hi,
>
> If the interval join emits the time attributes of both its inputs, you
> can use either of them as a time attribute in a following operator because
> the join ensures that the watermark will be aligned with both of them.
>
> Best, Fabian
>
> Am Mo., 4. Mai 2020 um 00:48 Uhr schrieb lec ssmi <
> shicheng31...@gmail.com>:
>
>> Thanks for your replay.
>> But as I known, if   the time attribute  will be retained and  the
>> time attribute field  of both streams is selected in the result after
>> joining, who is the final time attribute variable?
>>
>> Benchao Li  于2020年4月30日周四 下午8:25写道:
>>
>>> Hi lec,
>>>
>>> AFAIK, time attribute will be preserved after time interval join.
>>> Could you share your DDL and SQL queries with us?
>>>
>>> lec ssmi  于2020年4月30日周四 下午5:48写道:
>>>
 Hi:
I need to join multiple stream tables  using  time interval
 join.  The problem is that the time attribute will disappear  after 
 the jon
 , and  pure  sql cannot declare the time attribute field again . So, to
 make is success,  I need to insert  the last result of join to kafka 
 ,and
 consume it and join it with another stream table  in another flink job
 . This seems troublesome.
 Any good idea?



>>>
>>> --
>>>
>>> Benchao Li
>>> School of Electronics Engineering and Computer Science, Peking 
>>> University
>>> Tel:+86-15650713730
>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>
>>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


table.show() in Flink

2020-05-05 Thread Őrhidi Mátyás
Dear Flink Community,

I'm missing Spark's table.show() method in Flink. I'm using the following
alternative at the moment:

Table results = tableEnv.sqlQuery("SELECT * FROM my_table");
tableEnv.toAppendStream(results, Row.class).print();

Is it the recommended way to print the content of a table?


Thanks,

Matyas


Re: Flink SQL processing-time timezone issue

2020-05-05 Thread 祝尚
Same question here, waiting for an expert to answer.

> 2020年5月1日 下午5:26,hb <343122...@163.com> 写道:
> 
> 
> 
> ``` code
>  val env = StreamExecutionEnvironment.getExecutionEnvironment
>  val settings: EnvironmentSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, 
> settings)
> 
> 
>  val t2 = env.fromElements("a", "b", "c")
>  t2.toTable(tEnv, 'f1, 'proc.proctime).toAppendStream[Row].print()
> ```
> 
> 
>  // Timezone is Shanghai, the current local time is 2020-05-01 15:00
>  // The result shows a,2020-05-01T09:11:05.633, i.e. the processing time is off by 8 hours. How can I make the proctime result display without the 8-hour offset?



Flink on Kubernetes unable to Recover from failure

2020-05-05 Thread Morgan Geldenhuys


Community,

I am currently doing some fault tolerance testing for Flink (1.10) 
running on Kubernetes (1.18) and am encountering an error where after a 
running job experiences a failure, the job fails completely.


A Flink session cluster has been created according to the documentation 
contained here: 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html. 
The job is then uploaded and deployed via the web interface and 
everything runs smoothly. The job has a parallelism of 24, with 3 worker 
nodes held in reserve as failovers. Each worker is assigned 1 task slot 
(27 in total).


The next step is to inject an error, for which I use the Pumba chaos 
testing tool (https://github.com/alexei-led/pumba) to pause a random 
worker process. The selection and pausing are done manually for the moment.


Looking at the error logs, Flink does detect the error after the timeout 
(The heartbeat timeout has been set to 20 seconds):


java.util.concurrent.TimeoutException: The heartbeat of TaskManager with 
id 768848f91ebdbccc8d518e910160414d  timed out.


After the failure has been detected, the system resets to the latest 
saved checkpoint and restarts. The system catches up nicely and resumes 
normal processing... however, after about 3 minutes, the following error 
occurs:


org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 
'/10.45.128.1:6121'. This might indicate that the remote task manager 
was lost.


The job fails, and is unable to restart because the number of task slots 
has been reduced to zero. Looking at the kubernetes cluster, all 
containers are running...


Has anyone else run into this error? What am I missing? The same thing 
happens when the containers are deleted.


Regards,
M.


Re: ML/DL via Flink

2020-05-05 Thread m@xi
Hello Becket,

I just watched your Flink Forward talk. Really interesting!

I leave the link here as it is related to the post: AI Flow (FF20).

Best,
Max





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: multiple joins in one job

2020-05-05 Thread Benchao Li
Hi lec,

You don't need to specify the time attribute again (e.g. via `TUMBLE_ROWTIME`); you
just select the time attribute field from one of the inputs, and it will be a time
attribute automatically.
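For example, a rough sketch with made-up tables s1 and s2 (both with a rowtime
attribute ts): the ts selected from s1 is still a time attribute after the interval
join, so the outer query can window on it directly.

Table result = tEnv.sqlQuery(
    "SELECT TUMBLE_START(t.ts, INTERVAL '1' MINUTE) AS w_start, COUNT(*) AS cnt " +
    "FROM ( " +
    "  SELECT s1.id, s1.ts " +
    "  FROM s1 JOIN s2 ON s1.id = s2.id " +
    "   AND s1.ts BETWEEN s2.ts - INTERVAL '5' MINUTE AND s2.ts + INTERVAL '5' MINUTE " +
    ") t " +
    "GROUP BY TUMBLE(t.ts, INTERVAL '1' MINUTE)");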

lec ssmi  于2020年5月5日周二 下午4:42写道:

> But  I  have  not  found  there  is  any  syntax to  specify   time
>  attribute  field  and  watermark  again  with  pure  sql.
>
> Fabian Hueske  于 2020年5月5日周二 15:47写道:
>
>> Sure, you can write a SQL query with multiple interval joins that
>> preserve event-time attributes and watermarks.
>> There's no need to feed data back to Kafka just to inject it again to
>> assign new watermarks.
>>
>> Am Di., 5. Mai 2020 um 01:45 Uhr schrieb lec ssmi <
>> shicheng31...@gmail.com>:
>>
>>> I mean using pure sql statement to make it . Can it be possible?
>>>
>>> Fabian Hueske  于2020年5月4日周一 下午4:04写道:
>>>
 Hi,

 If the interval join emits the time attributes of both its inputs, you
 can use either of them as a time attribute in a following operator because
 the join ensures that the watermark will be aligned with both of them.

 Best, Fabian

 Am Mo., 4. Mai 2020 um 00:48 Uhr schrieb lec ssmi <
 shicheng31...@gmail.com>:

> Thanks for your replay.
> But as I known, if   the time attribute  will be retained and  the
> time attribute field  of both streams is selected in the result after
> joining, who is the final time attribute variable?
>
> Benchao Li  于2020年4月30日周四 下午8:25写道:
>
>> Hi lec,
>>
>> AFAIK, time attribute will be preserved after time interval join.
>> Could you share your DDL and SQL queries with us?
>>
>> lec ssmi  于2020年4月30日周四 下午5:48写道:
>>
>>> Hi:
>>>I need to join multiple stream tables  using  time interval
>>> join.  The problem is that the time attribute will disappear  after the 
>>> jon
>>> , and  pure  sql cannot declare the time attribute field again . So, to
>>> make is success,  I need to insert  the last result of join to kafka 
>>> ,and
>>> consume it and join it with another stream table  in another flink job
>>> . This seems troublesome.
>>> Any good idea?
>>>
>>>
>>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Autoscaling flink application

2020-05-05 Thread Manish G
Thanks. It would help.

On Tue, May 5, 2020 at 2:12 PM David Anderson  wrote:

> There's no explicit, out-of-the-box support for autoscaling, but it can be
> done.
>
> Autoscaling came up a few times at the recent Virtual Flink Forward,
> including a talk on Autoscaling Flink at Netflix [1] by Timothy Farkas.
>
> [1] https://www.youtube.com/watch?v=NV0jvA5ZDNc
>
> Regards,
> David
>
>
>
> On Mon, May 4, 2020 at 1:14 PM Manish G 
> wrote:
>
>> Hi,
>>
>> I understand task parallelism in flink, but is it possible to configure
>> dynamic horizontal scaling also.
>>
>> Manish
>>
>


Re: Autoscaling flink application

2020-05-05 Thread David Anderson
There's no explicit, out-of-the-box support for autoscaling, but it can be
done.

Autoscaling came up a few times at the recent Virtual Flink Forward,
including a talk on Autoscaling Flink at Netflix [1] by Timothy Farkas.

[1] https://www.youtube.com/watch?v=NV0jvA5ZDNc

Regards,
David



On Mon, May 4, 2020 at 1:14 PM Manish G 
wrote:

> Hi,
>
> I understand task parallelism in flink, but is it possible to configure
> dynamic horizontal scaling also.
>
> Manish
>


Re: multiple joins in one job

2020-05-05 Thread lec ssmi
But I have not found any syntax to specify the time attribute field and watermark
again with pure SQL.

Fabian Hueske  于 2020年5月5日周二 15:47写道:

> Sure, you can write a SQL query with multiple interval joins that preserve
> event-time attributes and watermarks.
> There's no need to feed data back to Kafka just to inject it again to
> assign new watermarks.
>
> Am Di., 5. Mai 2020 um 01:45 Uhr schrieb lec ssmi  >:
>
>> I mean using pure sql statement to make it . Can it be possible?
>>
>> Fabian Hueske  于2020年5月4日周一 下午4:04写道:
>>
>>> Hi,
>>>
>>> If the interval join emits the time attributes of both its inputs, you
>>> can use either of them as a time attribute in a following operator because
>>> the join ensures that the watermark will be aligned with both of them.
>>>
>>> Best, Fabian
>>>
>>> Am Mo., 4. Mai 2020 um 00:48 Uhr schrieb lec ssmi <
>>> shicheng31...@gmail.com>:
>>>
 Thanks for your replay.
 But as I known, if   the time attribute  will be retained and  the time
 attribute field  of both streams is selected in the result after joining,
 who is the final time attribute variable?

 Benchao Li  于2020年4月30日周四 下午8:25写道:

> Hi lec,
>
> AFAIK, time attribute will be preserved after time interval join.
> Could you share your DDL and SQL queries with us?
>
> lec ssmi  于2020年4月30日周四 下午5:48写道:
>
>> Hi:
>>I need to join multiple stream tables  using  time interval join.
>> The problem is that the time attribute will disappear  after the jon , 
>> and
>> pure  sql cannot declare the time attribute field again . So, to make is
>> success,  I need to insert  the last result of join to kafka ,and consume
>> it and join it with another stream table  in another flink job . This 
>> seems
>> troublesome.
>> Any good idea?
>>
>>
>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: multiple joins in one job

2020-05-05 Thread Fabian Hueske
Sure, you can write a SQL query with multiple interval joins that preserve
event-time attributes and watermarks.
There's no need to feed data back to Kafka just to inject it again to
assign new watermarks.
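A rough sketch of what that can look like, with made-up tables s1, s2 and s3 that each
have a rowtime attribute ts; both interval joins live in one query and s1.ts stays an
event-time attribute for whatever follows:

Table joined = tEnv.sqlQuery(
    "SELECT s1.id, s1.ts, s2.v2, s3.v3 " +
    "FROM s1 " +
    "JOIN s2 ON s1.id = s2.id " +
    " AND s1.ts BETWEEN s2.ts - INTERVAL '10' MINUTE AND s2.ts + INTERVAL '10' MINUTE " +
    "JOIN s3 ON s1.id = s3.id " +
    " AND s1.ts BETWEEN s3.ts - INTERVAL '10' MINUTE AND s3.ts + INTERVAL '10' MINUTE");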

Am Di., 5. Mai 2020 um 01:45 Uhr schrieb lec ssmi :

> I mean using pure sql statement to make it . Can it be possible?
>
> Fabian Hueske  于2020年5月4日周一 下午4:04写道:
>
>> Hi,
>>
>> If the interval join emits the time attributes of both its inputs, you
>> can use either of them as a time attribute in a following operator because
>> the join ensures that the watermark will be aligned with both of them.
>>
>> Best, Fabian
>>
>> Am Mo., 4. Mai 2020 um 00:48 Uhr schrieb lec ssmi <
>> shicheng31...@gmail.com>:
>>
>>> Thanks for your replay.
>>> But as I known, if   the time attribute  will be retained and  the time
>>> attribute field  of both streams is selected in the result after joining,
>>> who is the final time attribute variable?
>>>
>>> Benchao Li  于2020年4月30日周四 下午8:25写道:
>>>
 Hi lec,

 AFAIK, time attribute will be preserved after time interval join.
 Could you share your DDL and SQL queries with us?

 lec ssmi  于2020年4月30日周四 下午5:48写道:

> Hi:
>I need to join multiple stream tables  using  time interval join.
> The problem is that the time attribute will disappear  after the jon , and
> pure  sql cannot declare the time attribute field again . So, to make is
> success,  I need to insert  the last result of join to kafka ,and consume
> it and join it with another stream table  in another flink job . This 
> seems
> troublesome.
> Any good idea?
>
>
>

 --

 Benchao Li
 School of Electronics Engineering and Computer Science, Peking University
 Tel:+86-15650713730
 Email: libenc...@gmail.com; libenc...@pku.edu.cn




Re: Broadcast stream causing GC overhead limit exceeded

2020-05-05 Thread Fabian Hueske
Hi Eleanore,

The "GC overhead limit exceeded" error shows that the JVM spends way too
much time garbage collecting and only recovers little memory with every run.
Since the program doesn't make any progress in such a situation, it is terminated with
the GC overhead error. This typically happens when lots of temporary objects are
created.
The root cause could be Flink, Beam, or your own code.
It's important to understand that this error is not directly related to a
shortage of memory (although more memory can help to mitigate the issue a
bit) but rather indicates an implementation issue.

Coming back to your question, Flink's Broadcast stream does *not* block or
collect events from the non-broadcasted side if the broadcast side doesn't
serve events.
However, the user-implemented operators (Beam or your code in this case)
often put non-broadcasted events into state to wait for input from the
other side.
Since the error is not about lack of memory, the buffering in Flink state
might not be the problem here.
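
To illustrate the buffering pattern (this is not Beam's implementation, just a minimal
Flink sketch; Event and FilterRule are made-up types and the main stream is assumed to
be keyed):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class FilterWithBroadcastRule
        extends KeyedBroadcastProcessFunction<String, Event, FilterRule, Event> {

    // Descriptor of the broadcast state that holds the current rule.
    static final MapStateDescriptor<String, FilterRule> RULE_DESC =
            new MapStateDescriptor<>("rule", String.class, FilterRule.class);

    // Keyed state that parks main-stream events until a rule arrives.
    private transient ListState<Event> buffered;

    @Override
    public void open(Configuration parameters) {
        buffered = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-events", Event.class));
    }

    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<Event> out) throws Exception {
        FilterRule rule = ctx.getBroadcastState(RULE_DESC).get("current");
        if (rule == null) {
            // No broadcast input yet: park the event; if the broadcast side never
            // delivers anything, this buffer keeps growing.
            buffered.add(event);
        } else if (rule.matches(event)) {
            out.collect(event);
        }
    }

    @Override
    public void processBroadcastElement(FilterRule rule, Context ctx, Collector<Event> out) throws Exception {
        ctx.getBroadcastState(RULE_DESC).put("current", rule);
        // Flushing the previously buffered events would happen here (per key, via applyToKeyedState).
    }
}

Whether such buffered records end up in managed state or as plain heap objects depends
on the implementation, which is why the heap dump is the right place to look.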

Best, Fabian





Am So., 3. Mai 2020 um 03:39 Uhr schrieb Eleanore Jin <
eleanore@gmail.com>:

> Hi All,
>
> I am using apache Beam with Flink (1.8.2). In my job, I am using Beam
> sideinput (which translates into Flink NonKeyedBroadcastStream) to do
> filter of the data from main stream.
>
> I have experienced OOM: GC overhead limit exceeded continuously.
>
> After did some experiments, I observed following behaviour:
> 1. run job without side input(broadcast stream): no OOM issue
> 2. run job with side input (kafka topic with 1 partition) with data
> available from this side input: no OOM issue
> 3. run job with side input (kafka topic with 1 partition) without any
> data from the side input: *OOM issue*
> 4. From the heap dump, the message (of type ObjectNode) cannot be GC'd
> looks like due to the references hold by Broadcast stream
> [image: image.png]
>
> My question is: what is the behaviour from Broadcast stream if there is no
> data available? Does it cache the data from main stream and wait until data
> becoming available from Broadcast stream to process?
>
> Thanks a lot!
> Eleanore
>