Flink vs Kafka streams

2019-11-07 Thread Navneeth Krishnan
Hello All,

I have a streaming job running in production that processes over 2
billion events per day and does some heavy processing on each event. We
have been facing some challenges managing Flink in production, such as
scaling in and out and restarting the job with a savepoint. Flink provides a
lot of features, which made it seem like an obvious choice at the time, but
with all the operational overhead we are now wondering whether we should keep
using Flink for our stream processing requirements or switch to Kafka Streams.

We currently deploy Flink on ECR. Bringing up a new cluster for every
additional streaming job is too expensive, but on the flip side running
everything on the same cluster is difficult, since there is no way to say
that one job must run on a dedicated server while another can run on a shared
instance. Also, taking a savepoint, cancelling, and submitting a new job
results in some downtime. The most critical issue is that there is no state
shared among all tasks, i.e. a global state. We sort of achieve this today
using an external Redis cache, but that incurs cost as well.

Moving to Kafka Streams would make deployment much easier: each new
streaming job would be a microservice that can scale independently, and with
its global state it is much easier to share state without an external cache.
The disadvantage is that we would have to rely on topic partitions for
parallelism. Although this might initially sound easier, it will become a
bottleneck when we need to scale much higher.

Do you have any suggestions on this? We need to decide which way to move
forward, and any input would be of great help.

Thanks


Re: Monitor rocksDB memory usage

2019-11-07 Thread Yun Tang
Hi Lu

I think RocksDB native metrics [1] could help.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#rocksdb-native-metrics
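As an illustration, a hedged sketch of flink-conf.yaml entries that enable a
few of the memory-related native metrics (option names as listed on the page
above for Flink 1.9; please verify the exact keys there, and note that the
metrics are reported per column family and add some overhead):

    # flink-conf.yaml
    state.backend.rocksdb.metrics.size-all-mem-tables: true        # memory held by memtables
    state.backend.rocksdb.metrics.estimate-table-readers-mem: true # memory used by SST file readers
    state.backend.rocksdb.metrics.estimate-live-data-size: true    # estimated size of live data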

Best
Yun Tang

From: Lu Niu 
Date: Friday, November 8, 2019 at 8:18 AM
To: user 
Subject: Monitor rocksDB memory usage

Hi,

I read that RocksDB memory is managed off-heap. Is there a way to monitor the
memory usage there?

Best
Lu


Re: Can a TTL expiration time be set on broadcast state?

2019-11-07 Thread Yang Peng
I see. My current approach is to register a Timer in the processElement method
and then manually delete the expired entries from state in the onTimer method.
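For illustration, a hedged sketch of that timer-based cleanup pattern (my own
example, not code from this thread; it assumes a keyed stream, since timers and
per-key state are only available there):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class ManualTtlFunction extends KeyedProcessFunction<String, String, String> {

        private transient ValueState<String> lastValue;

        @Override
        public void open(Configuration parameters) {
            lastValue = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("last-value", String.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            lastValue.update(value);
            // Expire the entry one second after it was written.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 1000L);
            out.collect(value);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
            // Manually clear the expired state when the timer fires.
            lastValue.clear();
        }
    }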

Dian Fu wrote on Fri, Nov 8, 2019 at 10:42 AM:

> Yes. But even for keyed state, if you are not using the TTL state feature, it
> is still possible for expired state values to be returned.
>
> > On Nov 8, 2019, at 10:24 AM, Yang Peng wrote:
> >
> > OK, thanks Dian Fu. My understanding is that even though it does not delete
> > the data, it still should not return expired state values. It is probably as
> > Yun Tang said: only keyed state supports state TTL.
> >
> > Dian Fu wrote on Thu, Nov 7, 2019 at 8:08 PM:
> >
> >>
> >> Before 1.8.0, TTL only guaranteed that data would not be cleared before the
> >> TTL was reached; it did not guarantee that data would definitely be cleared
> >> after the TTL was reached. 1.8.0 and later provide more complete
> >> functionality; see this article:
> >> https://flink.apache.org/2019/05/19/state-ttl.html
> >>> On Nov 7, 2019, at 3:06 PM, Yang Peng wrote:
> >>>
> >>>
> >>>
> >>> -- Forwarded message -
> >>> From: yangpengklf007 <yangpengklf...@gmail.com>
> >>> Date: Thu, Nov 7, 2019, 3:00 PM
> >>> Subject: Can a TTL expiration time be set on broadcast state?
> >>> To: user-zh@flink.apache.org
> >>>
> >>>
> >>>
> >>> The image below shows my test code: I define a MapStateDescriptor in the
> >>> class, then in the processBroadcastElement method I put the broadcast data
> >>> into the state. The TTL is set to 1s, and then the program sleeps for 10s.
> >>> When I read the state again, the data is still there. Is it that broadcast
> >>> state does not support TTL, or is my setup wrong? Has anyone used this?
> >>> Any advice would be appreciated. Thanks.
> >>>
> >>>
> >>>
> >>> yangpengklf007
> >>> yangpengklf...@gmail.com
> >>
> >>
>
>


Re: Retrieving Flink job ID/YARN Id programmatically

2019-11-07 Thread vino yang
Hi Lei Nie,

You can use
`StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID` to get the
job id.
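For illustration, a minimal sketch of that call chain (my own example; whether
the ID of the client-side JobGraph matches the ID assigned at submission can
depend on the deployment mode, so please verify for your setup):

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // ... build the pipeline ...

    // JobID baked into the generated JobGraph.
    JobID jobId = env.getStreamGraph().getJobGraph().getJobID();
    System.out.println("Job ID: " + jobId);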

Best,
Vino

Lei Nie wrote on Fri, Nov 8, 2019 at 8:38 AM:

> Hello,
> I am currently executing streaming jobs via StreamExecutionEnvironment. Is
> it possible to retrieve the Flink job ID/YARN ID within the context of a
> job? I'd like to be able to automatically register the job such that
> monitoring jobs can run (REST api requires for example job id).
>
> Thanks
>


Re: Can a TTL expiration time be set on broadcast state?

2019-11-07 Thread Yang Peng
OK, thanks Yun Tang, I am verifying it now.

Yun Tang wrote on Thu, Nov 7, 2019 at 10:11 PM:

> Hi
>
> Broadcast state can be regarded as a kind of operator state and can only be
> created in the DefaultOperatorStateBackend [1]. TTL state currently applies
> only to keyed state [2].
>
> [1]
> https://github.com/apache/flink/blob/809533e5b5c686e2d21b64361d22178ccb92ec26/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java#L149
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>
>
>
> On 11/7/19, 8:08 PM, "Dian Fu"  wrote:
>
>
> Before 1.8.0, TTL only guaranteed that data would not be cleared before the
> TTL was reached; it did not guarantee that data would definitely be cleared
> after the TTL was reached. 1.8.0 and later provide more complete
> functionality; see this article:
> https://flink.apache.org/2019/05/19/state-ttl.html
> > On Nov 7, 2019, at 3:06 PM, Yang Peng wrote:
> >
> >
> >
> > -- Forwarded message -
> > From: yangpengklf007 <yangpengklf...@gmail.com>
> > Date: Thu, Nov 7, 2019, 3:00 PM
> > Subject: Can a TTL expiration time be set on broadcast state?
> > To: user-zh@flink.apache.org
> >
> >
> >
> > The image below shows my test code: I define a MapStateDescriptor in the
> > class, then in the processBroadcastElement method I put the broadcast data
> > into the state. The TTL is set to 1s, and then the program sleeps for 10s.
> > When I read the state again, the data is still there. Is it that broadcast
> > state does not support TTL, or is my setup wrong? Has anyone used this?
> > Any advice would be appreciated. Thanks.
> >
> >
> >
> > yangpengklf007
> > yangpengklf...@gmail.com
>
>
>
>


Re: Can a TTL expiration time be set on broadcast state?

2019-11-07 Thread Yang Peng
OK, thanks Dian Fu. My understanding is that even though it does not delete the
data, it still should not return expired state values. It is probably as Yun
Tang said: only keyed state supports state TTL.

Dian Fu wrote on Thu, Nov 7, 2019 at 8:08 PM:

>
> Before 1.8.0, TTL only guaranteed that data would not be cleared before the
> TTL was reached; it did not guarantee that data would definitely be cleared
> after the TTL was reached. 1.8.0 and later provide more complete
> functionality; see this article:
> https://flink.apache.org/2019/05/19/state-ttl.html
> > On Nov 7, 2019, at 3:06 PM, Yang Peng wrote:
> >
> >
> >
> > -- Forwarded message -
> > From: yangpengklf007 <yangpengklf...@gmail.com>
> > Date: Thu, Nov 7, 2019, 3:00 PM
> > Subject: Can a TTL expiration time be set on broadcast state?
> > To: user-zh@flink.apache.org
> >
> >
> >
> > The image below shows my test code: I define a MapStateDescriptor in the
> > class, then in the processBroadcastElement method I put the broadcast data
> > into the state. The TTL is set to 1s, and then the program sleeps for 10s.
> > When I read the state again, the data is still there. Is it that broadcast
> > state does not support TTL, or is my setup wrong? Has anyone used this?
> > Any advice would be appreciated. Thanks.
> >
> >
> >
> > yangpengklf007
> > yangpengklf...@gmail.com
>
>


Re: flink's hard dependency on zookeeper for HA

2019-11-07 Thread vino yang
Hi Vishwas,

In the standalone cluster HA mode, Flink heavily depends on ZooKeeper. Not
only for leader election, but also for:


   - Checkpoint metadata info;
   - JobGraph store.

So you should make sure your ZooKeeper cluster works normally. For more
details, please see [1][2].

Best,
Vino

[1]:
https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability
[2]:
https://ci.apache.org/projects/flink/flink-docs-master/ops/jobmanager_high_availability.html
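For reference, a hedged sketch of the flink-conf.yaml entries typically used
for ZooKeeper-based HA (addresses and paths are placeholders; see [2] for the
authoritative list of options):

    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
    # JobGraphs and checkpoint metadata are stored here; ZooKeeper only keeps
    # pointers to them.
    high-availability.storageDir: hdfs:///flink/ha/
    high-availability.cluster-id: /my-standalone-cluster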

Vishwas Siravara wrote on Thu, Nov 7, 2019 at 12:07 AM:

> Hi all,
> I am using Flink 1.7.2 as a standalone cluster in high availability mode
> with ZooKeeper. I have noticed that all Flink processes go down once
> ZooKeeper goes down. Is this expected behavior, given that the leader election
> has already happened and the job has been running for several hours?
>
>
> Best,
> Vishwas
>


Retrieving Flink job ID/YARN Id programmatically

2019-11-07 Thread Lei Nie
Hello,
I am currently executing streaming jobs via StreamExecutionEnvironment. Is
it possible to retrieve the Flink job ID/YARN ID within the context of a
job? I'd like to be able to automatically register the job such that
monitoring jobs can run (REST api requires for example job id).

Thanks


Monitor rocksDB memory usage

2019-11-07 Thread Lu Niu
Hi,

I read that RocksDB memory is managed off-heap. Is there a way to monitor
the memory usage there?

Best
Lu


Unable to retrieve Kafka consumer group offsets

2019-11-07 Thread Harrison Xu
I am using Flink 1.9.0 and KafkaConsumer010 (Kafka 0.10.1.1). Attempting to
retrieve the offset lag of the Flink Kafka consumers results in the error below.
I saw a separate thread about this on the mailing list in 2017 - is this not
fixed? Are there workarounds?


> $ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh
> --bootstrap-server localhost:9092 --group test --describe
> Error while executing consumer group command Group test with protocol type
> '' is not a valid consumer group


Re: How can I get the backpressure signals inside my function or operator?

2019-11-07 Thread Yuval Itzchakov
Hi,

We've been dealing with a similar problem of downstream consumers causing
backpressure. One idea that a colleague of mine suggested is measuring the
time it takes to call Collector[T].out. Since this method is used to push
the records downstream, it will also actively block in case the buffer is
full and there are no more floating buffers to allocate, hence causing the
backpressure.

Thus, if you know the average time it takes this function to be invoked
when there's no backpressure, you can make an educated guess on the time it
takes when there is pressure (you'll need to measure these times in your
source/operator), and actively slow down the number of records being pushed
downstream.
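As a hedged sketch of that idea (my own illustration, not code from this
thread; the class name TimingCollector is made up): wrap the downstream
Collector and time each emit. Since collect() blocks when no output buffers
are available, a rising average emit latency is a rough backpressure proxy.

    import org.apache.flink.util.Collector;

    /** Wraps a Collector and tracks a running average of the time spent emitting. */
    public class TimingCollector<T> implements Collector<T> {

        private final Collector<T> delegate;
        private long emits;
        private double avgNanos;

        public TimingCollector(Collector<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void collect(T record) {
            long start = System.nanoTime();
            delegate.collect(record);  // blocks while the output buffers are exhausted
            long elapsed = System.nanoTime() - start;
            emits++;
            avgNanos += (elapsed - avgNanos) / emits;
        }

        /** Compare against a baseline measured when there is no backpressure. */
        public double averageEmitNanos() {
            return avgNanos;
        }

        @Override
        public void close() {
            delegate.close();
        }
    }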

Yuval.

On Thu, 7 Nov 2019, 9:17 Felipe Gutierrez, 
wrote:

> cool! I got to use it.
> Now I have to get the jobID and vertice ID inside the operator.
>
> I forgot to mention. I am using Flink 1.9.1
>
> Thanks!
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>
>
> On Thu, Nov 7, 2019 at 4:59 AM Zhijiang 
> wrote:
>
>> You can refer to this document [1] for the rest API details.
>> Actually the backpreesure uri refers to "
>> /jobs/:jobid/vertices/:vertexid/backpressure". But I am not sure whether
>> it is easy to get the jobid and vertexid.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html
>>
>> Best,
>> Zhijiang
>>
>> --
>> From:Felipe Gutierrez 
>> Send Time:2019 Nov. 7 (Thu.) 00:06
>> To:Chesnay Schepler 
>> Cc:Zhijiang ; user 
>> Subject:Re: How can I get the backpressure signals inside my function or
>> operator?
>>
>> If I can trigger the sample via the REST API, that is good enough for a POC.
>> Then I can read from any in-memory storage using a separate thread within the
>> operator. But what is the REST API that gives me the backpressure ratio value?
>>
>> Thanks
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> *
>>
>>
>> On Wed, Nov 6, 2019 at 4:55 PM Chesnay Schepler 
>> wrote:
>>
>> I don't think there is a truly sane way to do this.
>>
>> I could envision a separate application triggering samples via the REST
>> API, writing the results into kafka which your operator can read. This is
>> probably the most reasonable solution I can come up with.
>>
>> Any attempt at accessing the TaskExecutor or metrics from within the
>> operator are inadvisable; you'd be encroaching into truly hacky territory.
>>
>> You could also do your own backpressure sampling within your operator
>> (separate thread within the operator executing the same sampling logic),
>> but I don't know how easy it would be to re-use Flink code.
>>
>> On 06/11/2019 13:40, Felipe Gutierrez wrote:
>> Does anyone know which metric I can rely on to know whether a given
>> operator is causing backpressure?
>> Or how can I call the same Java object that the Flink UI calls to give me
>> the backpressure ratio?
>>
>> Thanks,
>> Felipe
>>
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez *
>> *--* *https://felipeogutierrez.blogspot.com
>> *
>>
>>
>> On Tue, Nov 5, 2019 at 4:15 PM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>> Hi Zhijiang,
>>
>> thanks for your reply. Yes, you understood correctly.
>> The fact that I cannot get "Shuffle.Netty.Input.Buffers.inputQueueLength"
>> on the operator might be because of the way Flink runtime architecture was
>> designed. But I was wondering what kind of signal I can get. I guess some
>> backpressure message I could get because backpressure works to slow down
>> the upstream operators.
>>
>> For example, I can see the ratio per sub-task on the web interface [1].
>> It means the physical operators. Is there any message flowing backward that
>> I can get? Is there anything that makes me able to not rely on some
>> external storage?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html#sampling-threads
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez *
>> *--* *https://felipeogutierrez.blogspot.com
>> *
>>
>>
>> On Tue, Nov 5, 2019 at 12:23 PM Zhijiang 
>> wrote:
>> Hi Felipe,
>>
>> That is an interesting idea to control the upstream's output based on
>> downstream's input.
>>
>> If I understood correctly, the preAggregate operator would trigger flush
>> output while the reduce operator is idle/hungry. In contrast, the 
>> preAggregate
>> would continue aggregating data in the case of back pressure.
>>
>> I think this requirement is valid, but unfortunately I guess you cannot
>> get the backpressure signal at the operator level. AFAIK only the upper
>> task level can get the input/output state to decide whether 

Re: Can a TTL expiration time be set on broadcast state?

2019-11-07 Thread Yun Tang
Hi

Broadcast state can be regarded as a kind of operator state and can only be
created in the DefaultOperatorStateBackend [1]. TTL state currently applies
only to keyed state [2].

[1] 
https://github.com/apache/flink/blob/809533e5b5c686e2d21b64361d22178ccb92ec26/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java#L149
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
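For comparison, a hedged sketch (my own example, not from this thread) of how
TTL is configured for keyed state as described in [2]; broadcast/operator state
has no equivalent, which is why a manual cleanup is needed there:

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.seconds(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();

    ValueStateDescriptor<String> descriptor =
            new ValueStateDescriptor<>("my-keyed-state", String.class);
    descriptor.enableTimeToLive(ttlConfig);  // only effective for keyed state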
 



On 11/7/19, 8:08 PM, "Dian Fu"  wrote:


Before 1.8.0, TTL only guaranteed that data would not be cleared before the TTL
was reached; it did not guarantee that data would definitely be cleared after
the TTL was reached. 1.8.0 and later provide more complete functionality; see
this article: https://flink.apache.org/2019/05/19/state-ttl.html
> On Nov 7, 2019, at 3:06 PM, Yang Peng wrote:
> 
> 
> 
> -- Forwarded message -
> From: yangpengklf007 <yangpengklf...@gmail.com>
> Date: Thu, Nov 7, 2019, 3:00 PM
> Subject: Can a TTL expiration time be set on broadcast state?
> To: user-zh@flink.apache.org
> 
> 
> 
> The image below shows my test code: I define a MapStateDescriptor in the
> class, then in the processBroadcastElement method I put the broadcast data
> into the state. The TTL is set to 1s, and then the program sleeps for 10s.
> When I read the state again, the data is still there. Is it that broadcast
> state does not support TTL, or is my setup wrong? Has anyone used this?
> Any advice would be appreciated. Thanks.
> 
> 
>   
> yangpengklf007
> yangpengklf...@gmail.com
>  






Re: Till Rohrmann - Can you please share your code for FF - SF - Flink as a lib

2019-11-07 Thread Till Rohrmann
Quick update, the link should be working now.

Cheers,
Till

On Thu, Nov 7, 2019 at 1:30 PM Till Rohrmann  wrote:

> Hi,
>
> the code I've based my presentation on can be found here [1]. Be aware,
> though, that for demo purposes I used a patched Flink version which scaled
> down the running job. This is currently not yet properly supported by Flink.
>
> [1] https://github.com/GJL/ffber2018-flink-as-a-library
>
> Cheers,
> Till
>
> On Thu, Nov 7, 2019 at 6:50 AM arpit8622  wrote:
>
>> https://www.youtube.com/watch?v=WeHuTRwicSw
>> 
>>
>> Basically I wanted to check the job/task manager k8s YAML you used for the
>> above demo.
>>
>> It will be very helpful for the community.
>>
>> If it's already committed somewhere, could you please direct me to the link?
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Flink Filters have state?

2019-11-07 Thread Timothy Victor
I have a FilterFunction implementation that accepts an argument in its
constructor and stores it as an instance member. For example:

class ThresholdFilter<T> implements FilterFunction<T> {

  private final MyThreshold threshold;

  private int numElementsSeen;

  public ThresholdFilter(MyThreshold threshold) {
    this.threshold = threshold;
  }

  // ... filter(T value) uses the threshold to decide whether to keep the element ...

}

The filter uses the threshold in deciding whether or not to filter the
incoming element.

All this works but I have some gaps in my understanding.

1. How is this filter stored and recovered in the case of a failure? Is it
just serialized as a POJO and stored in the configured state backend?

2. When recovered, will it maintain the state of all members? (Note that I
have a numElementsSeen member in the filter which keeps incrementing for each
element received.)

3. Is this sort of thing even advisable for a filter? I'm guessing filters are
meant to be reusable across operator instances, in which case the state could
be wrong after recovery?

Thanks in advance

Tim
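For context, a hedged sketch (my own illustration, not from this thread) of how
a counter like numElementsSeen could be placed under Flink's checkpointing via
the CheckpointedFunction interface, rather than kept as a plain instance field:

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

    // Hypothetical example: the counter is snapshotted as operator state and restored on recovery.
    public class CountingFilter<T> implements FilterFunction<T>, CheckpointedFunction {

        private transient ListState<Long> checkpointedCount;
        private long numElementsSeen;

        @Override
        public boolean filter(T value) {
            numElementsSeen++;
            return true;  // placeholder predicate; a real filter would apply its threshold here
        }

        @Override
        public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
            checkpointedCount.clear();
            checkpointedCount.add(numElementsSeen);
        }

        @Override
        public void initializeState(FunctionInitializationContext ctx) throws Exception {
            checkpointedCount = ctx.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("num-elements-seen", Long.class));
            for (Long c : checkpointedCount.get()) {
                numElementsSeen += c;  // sum partial counts after restore / rescaling
            }
        }
    }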


Re: Till Rohrmann - Can you please share your code for FF - SF - Flink as a lib

2019-11-07 Thread Till Rohrmann
Hi,

the code I've based my presentation on can be found here [1]. Be aware,
though, that for demo purposes I used a patched Flink version which scaled
down the running job. This is currently not yet properly supported by Flink.

[1] https://github.com/GJL/ffber2018-flink-as-a-library

Cheers,
Till

On Thu, Nov 7, 2019 at 6:50 AM arpit8622  wrote:

> https://www.youtube.com/watch?v=WeHuTRwicSw
> 
>
> Basically I wanted to check the job/task manager k8s YAML you used for the
> above demo.
>
> It will be very helpful for the community.
>
> If it's already committed somewhere, could you please direct me to the link?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Problem reading a Kerberos-secured HBase from Flink

2019-11-07 Thread venn
Hi all,

When reading a Kerberos-secured HBase from Flink on YARN (it does not work in
standalone mode either), the authentication reports success, but queries still
fail and keep throwing "Caused by: GSSException: No valid credentials provided
(Mechanism level: Failed to find any Kerberos tgt)".

I found that after authentication, UserGroupInformation.getLoginUser and
UserGroupInformation.getCurrentUser return different users.

Before authentication, both the current user and the login user are
admin (auth:SIMPLE). (admin is the OS user I log in with.)

After authentication, the current user is still admin (auth:SIMPLE), while the
login user has become the authenticated user xxx (auth:KERBEROS).

The program runs fine in the IDE, where the current user and the login user
are the same user, and I have confirmed the keytab file is fine.

The configuration should be fine, since authentication actually succeeds, but
it looks like the user executing the program is not the same as the
authenticated user. Does anyone know what might be going on?

 

Many thanks.

 

The error is as follows:

FATAL org.apache.hadoop.ipc.RpcClient  - SASL authentication failed. The
most likely cause is missing or invalid credentials. Consider 'kinit'.

 

javax.security.sasl.SaslException: GSS initiate failed [Caused by
GSSException: No valid credentials provided (Mechanism level: Failed to find
any Kerberos tgt)]

 

Caused by: GSSException: No valid credentials provided (Mechanism level:
Failed to find any Kerberos tgt)
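For what it's worth, a hedged sketch (my own illustration, not from this
thread) of the getLoginUser/getCurrentUser distinction: code that does not run
inside a doAs block keeps the original user as its current user, so the HBase
access may need to be wrapped explicitly. The principal, keytab path, and the
HBase call are placeholders.

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    public class HBaseKerberosSketch {
        public static void main(String[] args) throws Exception {
            // Log in from the keytab; this sets the login user but not
            // necessarily the user that subsequent code executes as.
            UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                    "xxx@EXAMPLE.COM", "/path/to/xxx.keytab");

            System.out.println("login user:   " + UserGroupInformation.getLoginUser());
            System.out.println("current user: " + UserGroupInformation.getCurrentUser());

            // Run the HBase access as the Kerberos-authenticated user.
            ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
                // ... create the HBase connection and run the query here ...
                return null;
            });
        }
    }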

 



Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

2019-11-07 Thread Dian Fu
You can refer to an existing example in the Flink code base:
https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java

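For a rough idea of the DataView/MapView mechanism asked about below, here is a
hedged sketch (my own example, assuming Flink 1.9's flink-table-common API) of
an accumulator that uses a MapView so the planner can back it with state in
streaming mode:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.api.dataview.MapView;
    import org.apache.flink.table.functions.AggregateFunction;

    // Hypothetical count-distinct aggregate; the MapView field of the accumulator
    // is detected by the planner and mapped onto the state backend.
    public class CountDistinct extends AggregateFunction<Long, CountDistinct.Acc> {

        public static class Acc {
            public MapView<String, Integer> seen = new MapView<>(Types.STRING, Types.INT);
            public long count = 0L;
        }

        @Override
        public Acc createAccumulator() {
            return new Acc();
        }

        public void accumulate(Acc acc, String value) throws Exception {
            if (value != null && !acc.seen.contains(value)) {
                acc.seen.put(value, 1);
                acc.count++;
            }
        }

        @Override
        public Long getValue(Acc acc) {
            return acc.count;
        }
    }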
> On Nov 7, 2019, at 7:06 PM, Chennet Steven wrote:
> 
> In flink-table-common of Flink 1.9 I found the DataView interface and its
> subclasses ListView and MapView, but I could not figure out how to use them in
> a user-defined function. Could you give an example or a link to test code?
> 
> From stevenchen
> webchat 38798579
> 
> 
> From: wenlong.lwl 
> Sent: Thursday, November 7, 2019 2:13:43 PM
> To: user-zh@flink.apache.org 
> Subject: Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function
> 
> You can try 1.9; it introduced the DataView mechanism, which allows using
> state in the accumulator.
> 
> On Thu, 7 Nov 2019 at 09:22, Chennet Steven  wrote:
> 
>> I am trying to use state in a Flink user-defined aggregate function, but in
>> the open method the FunctionContext does not give access to the
>> RuntimeContext. How can state be used in an aggregate function?
>> 
>> 
>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
>> TypeInformation}
>> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
>> import org.apache.flink.api.java.typeutils.TupleTypeInfo
>> import org.apache.flink.table.functions.{AggregateFunction,
>> FunctionContext}
>> import java.lang.{Iterable => JIterable}
>> 
>> 
>> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>> 
>> class IntDiffSumFunction extends AggregateFunction[Int,
>> IntDiffSumAccumulator] {
>> 
>>  override def open(context: FunctionContext): Unit = {
>>// In Flink 1.7.2 the RuntimeContext cannot be obtained here, so the state cannot be initialized
>>//getRuntimeContext.getState(desc)
>>val a = this.hashCode()
>>print(s"hashCode:$a")
>>super.open(context)
>>  }
>> 
>>  override def createAccumulator(): IntDiffSumAccumulator = {
>>val acc = new IntDiffSumAccumulator()
>>acc.f0 = 0
>>acc.f1 = false
>>acc
>>  }
>> 
>>  def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
>>accumulator.f0 += value
>>accumulator.f1 = true
>>  }
>> 
>>  override def getValue(accumulator: IntDiffSumAccumulator): Int = {
>>if (accumulator.f1) {
>> 
>>  accumulator.f0
>>} else {
>>  Int.MinValue
>>}
>>  }
>> 
>>  def merge(acc: IntDiffSumAccumulator, its:
>> JIterable[IntDiffSumAccumulator]) = {
>>val iter = its.iterator()
>>while (iter.hasNext) {
>>  val a = iter.next()
>>  if (a.f1) {
>>acc.f0 += a.f0
>>acc.f1 = true
>>  }
>>}
>>  }
>> 
>>  def resetAccumulator(acc: IntDiffSumAccumulator) = {
>>acc.f0 = 0
>>acc.f1 = false
>>  }
>> 
>>  override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
>>new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO,
>> BasicTypeInfo.BOOLEAN_TYPE_INFO)
>> }
>> 
>> 
>> From stevenchen
>> webchat 38798579
>> 
>> 
>> 



Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

2019-11-07 Thread Chennet Steven
In flink-table-common of Flink 1.9 I found the DataView interface and its
subclasses ListView and MapView, but I could not figure out how to use them in
a user-defined function. Could you give an example or a link to test code?

From stevenchen
 webchat 38798579


From: wenlong.lwl 
Sent: Thursday, November 7, 2019 2:13:43 PM
To: user-zh@flink.apache.org 
Subject: Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

You can try 1.9; it introduced the DataView mechanism, which allows using state
in the accumulator.

On Thu, 7 Nov 2019 at 09:22, Chennet Steven  wrote:

> I am trying to use state in a Flink user-defined aggregate function, but in
> the open method the FunctionContext does not give access to the
> RuntimeContext. How can state be used in an aggregate function?
>
>
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
> TypeInformation}
> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
> import org.apache.flink.api.java.typeutils.TupleTypeInfo
> import org.apache.flink.table.functions.{AggregateFunction,
> FunctionContext}
> import java.lang.{Iterable => JIterable}
>
>
> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>
> class IntDiffSumFunction extends AggregateFunction[Int,
> IntDiffSumAccumulator] {
>
>   override def open(context: FunctionContext): Unit = {
> // In Flink 1.7.2 the RuntimeContext cannot be obtained here, so the state cannot be initialized
> //getRuntimeContext.getState(desc)
> val a = this.hashCode()
> print(s"hashCode:$a")
> super.open(context)
>   }
>
>   override def createAccumulator(): IntDiffSumAccumulator = {
> val acc = new IntDiffSumAccumulator()
> acc.f0 = 0
> acc.f1 = false
> acc
>   }
>
>   def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
> accumulator.f0 += value
> accumulator.f1 = true
>   }
>
>   override def getValue(accumulator: IntDiffSumAccumulator): Int = {
> if (accumulator.f1) {
>
>   accumulator.f0
> } else {
>   Int.MinValue
> }
>   }
>
>   def merge(acc: IntDiffSumAccumulator, its:
> JIterable[IntDiffSumAccumulator]) = {
> val iter = its.iterator()
> while (iter.hasNext) {
>   val a = iter.next()
>   if (a.f1) {
> acc.f0 += a.f0
> acc.f1 = true
>   }
> }
>   }
>
>   def resetAccumulator(acc: IntDiffSumAccumulator) = {
> acc.f0 = 0
> acc.f1 = false
>   }
>
>   override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
> new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO,
> BasicTypeInfo.BOOLEAN_TYPE_INFO)
> }
>
>
> From stevenchen
>  webchat 38798579
>
>
>


Re: RocksDB and local file system

2019-11-07 Thread Congxian Qiu
Hi

The path will store the checkpoints, and Flink will fetch the checkpoint
files to restore the state if any failure occurs. If you specify the local
file system, then when restoring from a checkpoint (possibly on a different
machine) Flink may not be able to find the checkpoint files and cannot
restore from the last checkpoint.
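For a single-node test setup, a hedged sketch of what that looks like (the
path is a placeholder; fine for local testing only, as discussed above):

    # flink-conf.yaml
    state.backend: rocksdb
    state.checkpoints.dir: file:///data/flink/checkpoints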

Best,
Congxian


vino yang wrote on Thu, Nov 7, 2019 at 10:57 AM:

> Hi Jaqie,
>
> For testing, you can use the local file system pattern (e.g. "file:///").
> Technically speaking, it's OK to specify the string path provided by you.
>
> However, in the production environment, we do not recommend using the
> local file system, because it does not provide high availability.
>
> Best,
> Vino
>
Jaqie Chan wrote on Wed, Nov 6, 2019 at 11:29 PM:
>
>> Hello
>>
>> I am using the Flink RocksDB state backend. The documentation seems to imply
>> I can use a regular file system such as file:///data/flink/checkpoints,
>> but the code javadoc only mentions HDFS or S3 options here.
>>
>> I am wondering if it's possible to use local file system with flink
>> rocksdb backend.
>>
>>
>> Thanks
>>
>> 嘉琪
>>
>


Re: RocksDB state on HDFS seems not being cleaned up

2019-11-07 Thread Yun Tang
Yes, just sum all the file sizes referenced in the checkpoint meta to get the full
checkpoint size (this omits some byte stream state handles, but it is nearly accurate).
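For a quick on-disk view (my own sketch, complementary to summing the files
referenced in the _metadata as described above), the Hadoop FileSystem API can
total everything under the checkpoint directory; the path below is taken from
the thread and should be treated as a placeholder:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CheckpointDirSize {
        public static void main(String[] args) throws Exception {
            // The job's checkpoint directory on HDFS (chk-* plus shared/ plus taskowned/).
            Path dir = new Path("hdfs://hadoop/flink/c344b61c456af743e4568a70b626837b");
            FileSystem fs = FileSystem.get(URI.create(dir.toString()), new Configuration());
            long bytes = fs.getContentSummary(dir).getLength();
            System.out.println("Total bytes on HDFS: " + bytes);
        }
    }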

BTW, I think the user mailing list is the better place for this email thread; I have
already sent this mail to the user mailing list.

Best
Yun Tang

From: shuwen zhou 
Date: Thursday, November 7, 2019 at 12:02 PM
To: Yun Tang 
Cc: dev , Till Rohrmann 
Subject: Re: RocksDB state on HDFS seems not being cleaned up

Hi Yun,
Thank you for your detailed explanation; it gives me a lot to research. I think:
1. I should try reducing "state.checkpoints.num-retained", maybe to 3, which
could decrease the size of the shared folder.
2. Does Flink 1.9.0 have the possibility of orphan files? It seems the answer is
maybe yes. I could use the state processor API you mentioned to figure it out
and get back to you.
3. I had a look at the file
/flink/c344b61c456af743e4568a70b626837b/chk-172/_metadata; there are a lot of
file names like
hdfs://hadoop/flink/c344b61c456af743e4568a70b626837b/shared/e9e10c8a-6d73-48e4-9e17-45838d276b03.
Summing those files' sizes gives the total size of each checkpoint, am I correct?
4. My checkpoint interval is 16 minutes.







On Wed, 6 Nov 2019 at 15:57, Yun Tang <myas...@live.com> wrote:
Hi Shuwen

Since you have just 10 "chk-" folders as expected, and when subsuming
checkpoints the "chk-" folder is removed only after the shared state has been
successfully removed [1], I think you might not have too many orphan state
files left. To make sure, you could use the state processor API [2] to load
your checkpoints and compare them against all the files under the "shared"
folder to see whether there are too many orphan files. If there are, we might
think of the custom compaction filter feature of FRocksDB.

Secondly, your estimate of "20GB each checkpoint" might not be accurate when
RocksDB incremental checkpointing is enabled; the UI shows only the
incremental size [3]. I suggest you sum the sizes of the files referenced in your
checkpoint meta to know the accurate size of each checkpoint.

Last but not least, RocksDB's compaction filter feature deletes expired data
only during compaction [4], so I'm afraid you might need to look at RocksDB's
LOG file to see how frequently compactions happen on the task managers. I also
think the increasing size might be related to the interval of your checkpoints;
what is the interval at which you execute your checkpoints?


[1] 
https://github.com/apache/flink/blob/2ea14169a1997434d45d6f1da6dfe9acd6bd8da3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java#L264
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
[3] 
https://issues.apache.org/jira/browse/FLINK-13390
[4] 
https://github.com/facebook/rocksdb/blob/834feaff05a4bf7ae49c736305d5eb180aed4011/include/rocksdb/compaction_filter.h#L61

Best
Yun Tang

From: shuwen zhou <jaco...@gmail.com>
Date: Wednesday, November 6, 2019 at 12:02 PM
To: dev <d...@flink.apache.org>, Yun Tang <myas...@live.com>, Till Rohrmann
<trohrm...@apache.org>
Subject: Re: RocksDB state on HDFS seems not being cleaned up

Hi Yun and Till,
Thank you for your response.
For @Yun
1. No, I just renamed the checkpoint directory since the directory name
contains company data. Sorry for the confusion.
2. Yes, I set

state.checkpoints.num-retained: 10
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED

In flink.conf

I was expecting that the shared folder would no longer contain outdated state, since my
TTL is set to