Re: RocksDB CPU resource usage

2021-06-17 Thread Padarn Wilson
Thanks both for the suggestions, all good ideas. I will try some of the
profiling suggestions and report back.

On Thu, Jun 17, 2021 at 4:13 PM Yun Tang  wrote:

> Hi Padarn,
>
> In my experience, de-/serialization alone might not account for 3x CPU usage, and
> background compaction could also increase the CPU usage. You could use
> async-profiler [1] to figure out what is really consuming your CPU, as it
> can also capture native RocksDB thread stacks.
>
>
> [1] https://github.com/jvm-profiling-tools/async-profiler
>
> Best
> Yun Tang
>
> --
> *From:* Robert Metzger 
> *Sent:* Thursday, June 17, 2021 14:11
> *To:* Padarn Wilson 
> *Cc:* JING ZHANG ; user 
> *Subject:* Re: RocksDB CPU resource usage
>
> If you are able to execute your job locally as well (with enough data),
> you can also run it with a profiler and see the CPU cycles spent on
> serialization (you can also use RocksDB locally)
>
> On Wed, Jun 16, 2021 at 3:51 PM Padarn Wilson  wrote:
>
> Thanks Robert. I think it would be easy enough to test this hypothesis by
> making the same comparison with some simpler state inside the aggregation
> window.
>
> On Wed, 16 Jun 2021, 7:58 pm Robert Metzger,  wrote:
>
> Depending on the datatypes you are using, seeing 3x more CPU usage seems
> realistic.
> Serialization can be quite expensive. See also:
> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
> Maybe it makes sense to optimize there a bit.
>
> On Tue, Jun 15, 2021 at 5:23 PM JING ZHANG  wrote:
>
> Hi Padarn,
> After switching the state backend from filesystem to rocksdb, all reads/writes
> from/to the backend have to go through de-/serialization to retrieve/store the
> state objects, and this may cause more CPU cost.
> But I'm not sure it is the main reason for the 3x CPU cost in your job.
> To find out the reason, we need more profiling of the CPU cost, such as Flame
> Graphs. BTW, starting with Flink 1.13, Flame Graphs are natively supported
> in Flink[1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
>
> Best,
> JING ZHANG
>
> Padarn Wilson  于2021年6月15日周二 下午5:05写道:
>
> Hi all,
>
> We have a job on which we just enabled RocksDB (instead of the filesystem
> backend), and we see that the CPU usage is almost 3x greater (we had to
> increase the number of taskmanagers 3x to get it to run).
>
> I don't really understand this. Is there something we can look at to
> understand why CPU use is so high? Our state mostly consists of aggregation
> windows.
>
> Cheers,
> Padarn
>
>
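A minimal, self-contained sketch of the two checks suggested above: enabling Flink's native flame graphs and ruling out a silent Kryo fallback in serialization. Assumptions: Flink 1.13+ and the Java DataStream API; the pipeline itself is a placeholder, not the job from this thread.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ProfilingSetupSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Enable the built-in flame graphs mentioned above (available from Flink 1.13).
        conf.setBoolean("rest.flamegraph.enabled", true);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // Fail fast if any type silently falls back to Kryo, which is one of the
        // serialization costs the linked tuning article warns about.
        env.getConfig().disableGenericTypes();
        env.fromElements(1, 2, 3).print(); // placeholder pipeline
        env.execute("profiling-setup-sketch");
    }
}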


Re: Re: Re: Upgrade job topology in checkpoint

2021-06-17 Thread Padarn Wilson
Thanks Yun,

Agreed, it seemed unlikely to be state, I just wanted to confirm that this
was unexpected before ruling it out.

Thanks,
Padarn

On Thu, Jun 17, 2021 at 10:45 AM Yun Gao  wrote:

> Hi Padarn,
>
> From the current description it seems to me that the issue is not related to
> the state? I think we should first check whether the operator logic is right and
> whether the upstream tasks have indeed emitted records to the new sink.
>
> Best,
> Yun
>
> --Original Mail ------
> *Sender:*Padarn Wilson 
> *Send Date:*Wed Jun 16 12:27:43 2021
> *Recipients:*Yun Gao , user 
> *Subject:*Re: Re: Upgrade job topology in checkpoint
>
>> We added a new sink to the job graph and redeployed - but the new sink
>> did not receive any records, as though it were not connected to the graph
>> (possibly it was a code bug, but I was trying to understand whether this makes
>> sense given the implementation).
>>
>> re-including mailing list, excluded by accident
>>
>> Padarn
>>
>> On Wed, Jun 16, 2021 at 10:59 AM Yun Gao  wrote:
>>
>>> Hi Padarn,
>>>
>>> Sorry, I might not have fully understood what "the new topology was ignored" means.
>>> Do you mean the topology is not the same as expected?
>>>
>>> Best,
>>> Yun
>>>
>>>
>>> --Original Mail --
>>> *Sender:*Padarn Wilson 
>>> *Send Date:*Tue Jun 15 21:45:17 2021
>>> *Recipients:*Yun Gao 
>>> *Subject:*Re: Upgrade job topology in checkpoint
>>>
>>>> Thanks Yun,
>>>>
>>>> Yes, we do indeed retain checkpoints, but we were unable to restore with a
>>>> new topology from them for some reason. It seemed like the new topology was
>>>> totally ignored, which was surprising to me.
>>>>
>>>> Padarn
>>>>
>>>> On Tue, Jun 15, 2021 at 7:35 PM Yun Gao  wrote:
>>>>
>>>>> Hi Padarn,
>>>>>
>>>>> By default, checkpoints are disposed of when the job finishes or fails;
>>>>> they are retained only when explicitly requested [1].
>>>>>
>>>>> From the implementation perspective I think users should be able to
>>>>> change the topology when restoring from an externalized checkpoint,
>>>>> but Flink does not guarantee this functionality.
>>>>>
>>>>> Best,
>>>>> Yun
>>>>>
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#retained-checkpoints
>>>>>
>>>>> --Original Mail --
>>>>> *Sender:*Padarn Wilson 
>>>>> *Send Date:*Sat Jun 12 12:19:56 2021
>>>>> *Recipients:*user 
>>>>> *Subject:*Upgrade job topology in checkpoint
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm looking for some clarity about changing job topology as described
>>>>>> here:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/upgrading/#application-topology
>>>>>>
>>>>>> My question is simple: Does this only apply to savepoints? Or can it
>>>>>> also work for checkpoints? (also why if not)
>>>>>>
>>>>>> Cheers,
>>>>>> Padarn
>>>>>>
>>>>>
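A minimal sketch of what the upgrading docs linked in this thread recommend when the topology changes: give every operator and sink an explicit uid() so state is matched by a stable identifier rather than an auto-generated one, and a newly added sink is unambiguously new. The pipeline and uid names here are hypothetical, not the actual job discussed.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c")
                .map(String::toUpperCase)
                .uid("uppercase-map")          // stable id for this operator's state
                .name("uppercase-map")
                .print()
                .uid("new-debug-sink")         // the sink added in a later deployment
                .name("new-debug-sink");
        env.execute("uid-sketch");
    }
}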


Re: RocksDB CPU resource usage

2021-06-16 Thread Padarn Wilson
Thanks Robert. I think it would be easy enough to test this hypothesis by
making the same comparison with some simpler state inside the aggregation
window.

On Wed, 16 Jun 2021, 7:58 pm Robert Metzger,  wrote:

> Depending on the datatypes you are using, seeing 3x more CPU usage seems
> realistic.
> Serialization can be quite expensive. See also:
> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
> Maybe it makes sense to optimize there a bit.
>
> On Tue, Jun 15, 2021 at 5:23 PM JING ZHANG  wrote:
>
>> Hi Padarn,
>> After switching the state backend from filesystem to rocksdb, all reads/writes
>> from/to the backend have to go through de-/serialization to retrieve/store the
>> state objects, and this may cause more CPU cost.
>> But I'm not sure it is the main reason for the 3x CPU cost in your job.
>> To find out the reason, we need more profiling of the CPU cost, such as Flame
>> Graphs. BTW, starting with Flink 1.13, Flame Graphs are natively supported
>> in Flink[1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
>>
>> Best,
>> JING ZHANG
>>
>> Padarn Wilson  于2021年6月15日周二 下午5:05写道:
>>
>>> Hi all,
>>>
>>> We have a job on which we just enabled RocksDB (instead of the filesystem
>>> backend), and we see that the CPU usage is almost 3x greater (we had to
>>> increase the number of taskmanagers 3x to get it to run).
>>>
>>> I don't really understand this. Is there something we can look at to
>>> understand why CPU use is so high? Our state mostly consists of aggregation
>>> windows.
>>>
>>> Cheers,
>>> Padarn
>>>
>>


Re: Re: Upgrade job topology in checkpoint

2021-06-15 Thread Padarn Wilson
We added a new sink to the job graph and redeployed - but the new sink did
not receive any records, as though it were not connected to the graph
(possibly it was a code bug, but I was trying to understand whether this makes
sense given the implementation).

re-including mailing list, excluded by accident

Padarn

On Wed, Jun 16, 2021 at 10:59 AM Yun Gao  wrote:

> Hi Padarn,
>
> Sorry, I might not have fully understood what "the new topology was ignored" means.
> Do you mean the topology is not the same as expected?
>
> Best,
> Yun
>
>
> --Original Mail ------
> *Sender:*Padarn Wilson 
> *Send Date:*Tue Jun 15 21:45:17 2021
> *Recipients:*Yun Gao 
> *Subject:*Re: Upgrade job topology in checkpoint
>
>> Thanks Yun,
>>
>> Yes, we do indeed retain checkpoints, but we were unable to restore with a
>> new topology from them for some reason. It seemed like the new topology was
>> totally ignored, which was surprising to me.
>>
>> Padarn
>>
>> On Tue, Jun 15, 2021 at 7:35 PM Yun Gao  wrote:
>>
>>> Hi Padarn,
>>>
>>> By default, checkpoints are disposed of when the job finishes or fails;
>>> they are retained only when explicitly requested [1].
>>>
>>> From the implementation perspective I think users should be able to
>>> change the topology when restoring from an externalized checkpoint,
>>> but Flink does not guarantee this functionality.
>>>
>>> Best,
>>> Yun
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#retained-checkpoints
>>>
>>> --Original Mail --
>>> *Sender:*Padarn Wilson 
>>> *Send Date:*Sat Jun 12 12:19:56 2021
>>> *Recipients:*user 
>>> *Subject:*Upgrade job topology in checkpoint
>>>
>>>> Hi all,
>>>>
>>>> I'm looking for some clarity about changing job topology as described
>>>> here:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/upgrading/#application-topology
>>>>
>>>> My question is simple: Does this only apply to savepoints? Or can it
>>>> also work for checkpoints? (also why if not)
>>>>
>>>> Cheers,
>>>> Padarn
>>>>
>>>
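For reference, a minimal sketch of the "retained only when explicitly required" configuration Yun points to in [1] (Flink Java DataStream API; the 60-second interval and cleanup mode are just example choices): with externalized retention enabled, the last checkpoint survives cancellation and can be used to restore a redeployed, possibly modified job, with the caveat from this thread that topology changes are not guaranteed to be compatible.

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint every 60 seconds
        // Keep checkpoints around when the job is cancelled, so a later deployment
        // can restore from them.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.fromElements(1, 2, 3).print(); // placeholder pipeline
        env.execute("retained-checkpoint-sketch");
    }
}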


Checkpoint loading failure

2021-06-15 Thread Padarn Wilson
Hi all,

We have a job with medium-sized state (around 4 GB), and after adding a
new part of the job graph (which should not impact the job too much) we
found that every single checkpoint restore fails with the following error:

Caused by: java.io.IOException: s3a://: Stream is closed!
> at
> org.apache.hadoop.fs.s3a.S3AInputStream.checkNotClosed(S3AInputStream.java:472)
> at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:347)
> at java.io.FilterInputStream.read(FilterInputStream.java:83)
> at
> org.apache.flink.fs.s3hadoop.common.HadoopDataInputStream.read(HadoopDataInputStream.java:86)
> at
> org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:50)
> at
> org.apache.flink.runtime.util.ForwardingInputStream.read(ForwardingInputStream.java:42)
> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
> at org.apache.flink.types.StringValue.readString(StringValue.java:781)
> at
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
> at
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:126)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
> at
> org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:161)
> at
> org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:43)
> at
> org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:289)
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:323)
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:285)
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:172)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:112)
> ... 17 more


I don't really have any clues about what is causing this. You'll notice we are
using the Hadoop S3 filesystem; switching to Presto is a bit tricky for
us because some of the bucket permissions would need to change.

Does anyone have tips on debugging (or solving) this?


RocksDB CPU resource usage

2021-06-15 Thread Padarn Wilson
Hi all,

We have a job on which we just enabled RocksDB (instead of the filesystem
backend), and we see that the CPU usage is almost 3x greater (we had to
increase the number of taskmanagers 3x to get it to run).

I don't really understand this. Is there something we can look at to
understand why CPU use is so high? Our state mostly consists of aggregation
windows.

Cheers,
Padarn


Upgrade job topology in checkpoint

2021-06-11 Thread Padarn Wilson
Hi all,

I'm looking for some clarity about changing job topology as described here:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/upgrading/#application-topology

My question is simple: Does this only apply to savepoints? Or can it also
work for checkpoints? (And if not, why?)

Cheers,
Padarn


Re: Setting max parallelism via properties

2021-02-28 Thread Padarn Wilson
Thanks a lot Kezhu, this fits the bill perfectly.

Thanks,
Padarn



On Sun, Feb 28, 2021 at 7:00 PM Kezhu Wang  wrote:

> Hi Padarn,
>
> There is a configuration option “pipeline.max-parallelism”.
>
> It is not a cluster-wide configuration but a client/job/pipeline-side
> configuration, which means you need to bring this configuration
> from the flink-conf file to the pipeline-generation stage.
>
>
> If I understand correctly, `flink-on-k8s-operator` uses `flink run` (I
> found this in `flinkcluster_submit_job_script.go`) to submit the job to the
> cluster. This command already covers that bridging work, so I think it should
> just work in your case.
>
>
> pipeline-max-parallelism:
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#pipeline-max-parallelism
>
>
> Best,
> Kezhu Wang
>
> On February 28, 2021 at 16:45:03, Padarn Wilson (pad...@gmail.com) wrote:
>
> Hi all,
>
> Sorry for the basic question, but is it possible to set max
> parallelism using the flink conf file, rather than explicitly in code:
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#setting-the-maximum-parallelism
>
> Need this for a PR I am working on for the flink operator:
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/pull/425
>
>
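For reference, a minimal sketch of the programmatic counterpart of `pipeline.max-parallelism` (the value 128 is just an example): setting the key in flink-conf.yaml and submitting with `flink run` has the same effect as calling setMaxParallelism in the job itself.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Same effect as `pipeline.max-parallelism: 128` in flink-conf.yaml when the
        // job is submitted via `flink run`; it bounds the number of key groups and
        // therefore the highest parallelism the job can ever be rescaled to.
        env.setMaxParallelism(128);
        env.fromElements(1, 2, 3).print(); // placeholder pipeline
        env.execute("max-parallelism-sketch");
    }
}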


Setting max parallelism via properties

2021-02-28 Thread Padarn Wilson
Hi all,

Sorry for the basic question, but is it possible to set max
parallelism using the flink conf file, rather than explicitly in code:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#setting-the-maximum-parallelism

Need this for a PR I am working on for the flink operator:
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/pull/425


Re: Additional options to S3 Filesystem: Interest?

2020-10-13 Thread Padarn Wilson
Great. Thanks.

On Tue, Oct 13, 2020 at 4:29 PM Arvid Heise  wrote:

> Hi Padarn,
>
> I assigned the ticket to you, so you can start working on it. Here are
> some contribution guidelines [1] in case it's your first contribution.
>
> Basically, you will need to open a PR which contains the ticket and
> component. So the prefix should be "[FLINK-19589][s3]" (also for your
> commits).
>
> Feel free to reach out to me if you have any questions about the process.
> All discussions about the feature should be on the ticket, so everyone can
> see it.
>
> [1] https://flink.apache.org/contributing/contribute-code.html
>
> On Tue, Oct 13, 2020 at 3:37 AM Padarn Wilson  wrote:
>
>> Thanks for the feedback. I've created a JIRA here
>> https://issues.apache.org/jira/browse/FLINK-19589.
>>
>> @Dan: This would indeed make it easier to set a lifetime property on
>> objects created by Flink, but if you want to apply it to all your
>> objects in a given bucket you can set bucket-wide policies instead. The
>> reason I want this is that we have a shared bucket and wish to tag
>> different objects based on which pipeline is producing them.
>>
>> On Tue, Oct 13, 2020 at 4:13 AM Dan Diephouse  wrote:
>>
>>> We use the StreamingFileSink. An option to expire files after some time
>>> period would certainly be welcome. (I could probably figure out a way to do
>>> this from the S3 admin UI too though)
>>>
>>> On Sat, Oct 10, 2020 at 10:45 PM Padarn Wilson  wrote:
>>>
>>>> Hi Flink Users,
>>>>
>>>> We need to expose some additional options for the s3 hadoop filesystem:
>>>> specifically, we want to set object tagging and lifecycle. This would be a
>>>> fairly easy change, and we initially thought to create a new Filesystem with
>>>> very minor changes to allow this.
>>>>
>>>> But then I wondered, would others use this? Is it something that is
>>>> worth raising as a Flink issue and then contributing back upstream?
>>>>
>>>> Any others who would like to be able to set object tags for the s3
>>>> filesystem?
>>>>
>>>> Cheers,
>>>> Padarn
>>>>
>>>
>>>
>>> --
>>> Dan Diephouse
>>> @dandiep
>>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: Additional options to S3 Filesystem: Interest?

2020-10-12 Thread Padarn Wilson
Thanks for the feedback. I've created a JIRA here
https://issues.apache.org/jira/browse/FLINK-19589.

@Dan: This would indeed make it easier to set a lifetime property on
objects created by Flink, but if you want to apply it to all your
objects in a given bucket you can set bucket-wide policies instead. The
reason I want this is that we have a shared bucket and wish to tag
different objects based on which pipeline is producing them.

On Tue, Oct 13, 2020 at 4:13 AM Dan Diephouse  wrote:

> We use the StreamingFileSink. An option to expire files after some time
> period would certainly be welcome. (I could probably figure out a way to do
> this from the S3 admin UI too though)
>
> On Sat, Oct 10, 2020 at 10:45 PM Padarn Wilson  wrote:
>
>> Hi Flink Users,
>>
>> We need to expose some additional options for the s3 hadoop filesystem:
>> specifically, we want to set object tagging and lifecycle. This would be a
>> fairly easy change, and we initially thought to create a new Filesystem with
>> very minor changes to allow this.
>>
>> But then I wondered, would others use this? Is it something that is
>> worth raising as a Flink issue and then contributing back upstream?
>>
>> Any others who would like to be able to set object tags for the s3
>> filesystem?
>>
>> Cheers,
>> Padarn
>>
>
>
> --
> Dan Diephouse
> @dandiep
>


Additional options to S3 Filesystem: Interest?

2020-10-10 Thread Padarn Wilson
Hi Flink Users,

We need to expose some additional options for the s3 hadoop filesystem:
specifically, we want to set object tagging and lifecycle. This would be a
fairly easy change, and we initially thought to create a new Filesystem with
very minor changes to allow this.

But then I wondered, would others use this? Is it something that is
worth raising as a Flink issue and then contributing back upstream?

Any others who would like to be able to set object tags for the s3
filesystem?

Cheers,
Padarn


Re: [External] Measuring Kafka consumer lag

2020-06-15 Thread Padarn Wilson
Thanks Robert.

Yes, we monitor many of the Flink internal metrics, which is why I was
surprised that we were unable to notice the warning signs before our
consumers notified us.

It would be nice to measure the topic offset vs the consumer group offset of
the Flink consumer.

On Tue, Jun 16, 2020 at 1:57 AM Robert Metzger  wrote:

> Hi Padarn,
> I usually recommend the approach you described: accessing/monitoring the
> lag via Flink's metrics system. Sometimes it also makes sense to consider
> application level metrics.
> I checked Youtube for past Flink Forward talks, but I couldn't find a
> video. I'm sure there were users talking about best practices for
> monitoring Flink in the past ...
>
> Best,
> Robert
>
> On Sun, Jun 14, 2020 at 5:47 AM Padarn Wilson 
> wrote:
>
>> Hi all,
>>
>> I'm looking for some advice on how other people measure consumer lag for
>> Kafka consumers. Recently we had an application that looked like it was
>> performing identically to before, but all of a sudden the throughput of the
>> job decreased dramatically. However, this was not visible in our Flink
>> metrics, only in the lag between current time and watermark time that our
>> consumers were measuring.
>>
>> How do people approach measuring this?
>>
>> Thanks,
>> Padarn
>>
>>
>>
>
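Following up on the application-level metrics Robert mentions, here is a rough sketch of an event-time lag gauge (the event type and field name are hypothetical): it exposes processing time minus event time as a Flink metric, so a throughput drop like the one described becomes visible alongside the built-in connector metrics.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

class KafkaEvent {
    // Hypothetical event carrying an event-time timestamp in milliseconds.
    public long timestampMs;
}

public class EventTimeLagGauge extends RichMapFunction<KafkaEvent, KafkaEvent> {

    private transient volatile long lastLagMs;

    @Override
    public void open(Configuration parameters) {
        // Exposed through the configured metric reporters and the REST API.
        getRuntimeContext().getMetricGroup().gauge("eventTimeLagMs", (Gauge<Long>) () -> lastLagMs);
    }

    @Override
    public KafkaEvent map(KafkaEvent event) {
        lastLagMs = System.currentTimeMillis() - event.timestampMs;
        return event;
    }
}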




[External] Measuring Kafka consumer lag

2020-06-13 Thread Padarn Wilson
Hi all,

I'm looking for some advice on how other people measure consumer lag for
Kafka consumers. Recently we had an application that looked like it was
performing identically to before, but all of a sudden the throughput of the
job decreased dramatically. However, this was not visible in our Flink
metrics, only in the lag between current time and watermark time that our
consumers were measuring.

How do people approach measuring this?

Thanks,
Padarn




Re: Implicit Flink Context Documentation

2020-03-16 Thread Padarn Wilson
Thanks for the clarification. I'll dig in then!

On Mon, 16 Mar 2020, 3:47 pm Piotr Nowojski,  wrote:

> Hi,
>
> We are not maintaining internal docs. We have design docs for newly
> proposed features (previously informal design docs published on the dev mailing
> list and recently as FLIP documents [1]), but keyed state is such an old
> concept, dating back so far, that I’m pretty sure it pre-dates any of that.
> So you would have to dig through the code if you want to understand it.
>
> Piotrek
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>
> On 13 Mar 2020, at 16:14, Padarn Wilson  wrote:
>
> Thanks Piotr,
>
> Conceptually I understand (and use) keyed state quite a lot, but the
> implementation details are what I was looking for.
>
> It looks like
> `org.apache.flink.streaming.api.operators.AbstractStreamOperator#setKeyContextElement1`
> is what I'm looking for though. It would be cool if there were some
> internals design doc, however? It is quite hard to dig through the code, as there
> is a lot tied to how the execution of the job actually happens.
>
> Padarn
>
> On Fri, Mar 13, 2020 at 9:43 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> Please take a look for example here:
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#keyed-state
>> And the example in particular
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state
>>
>> The part about "there is a specific key implicitly in context” might be
>> referring to the fact that, for every instance of `CountWindowAverage` that
>> will be running in the cluster, the user doesn’t have to set the key context
>> explicitly. Flink will set the key context automatically for the
>> `ValueState> sum;` before any invocation of
>> `CountWindowAverage#flatMap` method.
>>
>> In other words, one parallel instance of `CountWindowAverage` function,
>> for two consecutive invocations of `CountWindowAverage#flatMap` can be
>> referring to different underlying value of `CountWindowAverage#sum` field.
>> For details you could take a look at
>> `org.apache.flink.streaming.api.operators.AbstractStreamOperator#setKeyContextElement1`
>> method and how it’s being used/implemented.
>>
>> I hope that helps.
>>
>> Piotrek
>>
>> On 13 Mar 2020, at 08:20, Padarn Wilson  wrote:
>>
>> Hi Users,
>>
>> I am trying to understand the details of how some aspects of Flink work.
>>
>> While understanding `keyed state` I kept coming up against a claim that 
>> `there
>> is a specific key implicitly in context` I would like to understand how
>> this works, which I'm guessing means understanding the details of the
>> runtime context: Is there any documentation or FLIP someone can recommend
>> on this?
>>
>>
>>
>
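A small, self-contained sketch of the implicit key context discussed here, modelled loosely on the docs' CountWindowAverage example (the class and state names are hypothetical): the single ValueState field below transparently resolves to a different stored value per key, because Flink sets the key context (via setKeyContextElement1) before every flatMap invocation.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountPerKey extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> in, Collector<Tuple2<String, Long>> out) throws Exception {
        // count.value() returns the value for the key of the *current* record;
        // the key never appears in this code - it is implicit in the runtime context.
        Long current = count.value();
        long updated = (current == null ? 0L : current) + 1L;
        count.update(updated);
        out.collect(Tuple2.of(in.f0, updated));
    }
}

It would be used on a keyed stream, e.g. `stream.keyBy(t -> t.f0).flatMap(new CountPerKey())`.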


Re: Implicit Flink Context Documentation

2020-03-13 Thread Padarn Wilson
Thanks Piotr,

Conceptually I understand (and use) keyed state quite a lot, but the
implementation details are what I was looking for.

It looks like
`org.apache.flink.streaming.api.operators.AbstractStreamOperator#setKeyContextElement1`
is what I'm looking for though. It would be cool if there were some
internals design doc, however? It is quite hard to dig through the code, as there
is a lot tied to how the execution of the job actually happens.

Padarn

On Fri, Mar 13, 2020 at 9:43 PM Piotr Nowojski  wrote:

> Hi,
>
> Please take a look for example here:
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#keyed-state
> And the example in particular
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state
>
> The part about "there is a specific key implicitly in context” might be
> referring to the fact that, for every instance of `CountWindowAverage` that
> will be running in the cluster, the user doesn’t have to set the key context
> explicitly. Flink will set the key context automatically for the
> `ValueState> sum;` before any invocation of
> `CountWindowAverage#flatMap` method.
>
> In other words, one parallel instance of `CountWindowAverage` function,
> for two consecutive invocations of `CountWindowAverage#flatMap` can be
> referring to different underlying value of `CountWindowAverage#sum` field.
> For details you could take a look at
> `org.apache.flink.streaming.api.operators.AbstractStreamOperator#setKeyContextElement1`
> method and how it’s being used/implemented.
>
> I hope that helps.
>
> Piotrek
>
> On 13 Mar 2020, at 08:20, Padarn Wilson  wrote:
>
> Hi Users,
>
> I am trying to understand the details of how some aspects of Flink work.
>
> While understanding `keyed state` I kept coming up against a claim that `there
> is a specific key implicitly in context` I would like to understand how
> this works, which I'm guessing means understanding the details of the
> runtime context: Is there any documentation or FLIP someone can recommend
> on this?
>
>
>


Implicit Flink Context Documentation

2020-03-13 Thread Padarn Wilson
Hi Users,

I am trying to understand the details of how some aspects of Flink work.

While understanding `keyed state` I kept coming up against a claim that `there
is a specific key implicitly in context`. I would like to understand how
this works, which I'm guessing means understanding the details of the
runtime context. Is there any documentation or a FLIP someone can recommend
on this?


Re: End of Window Marker

2019-09-02 Thread Padarn Wilson
Hi Fabian,

> but each partition may only be written by a single task

Sorry, I think I misunderstand something here then: if I have a topic with
one partition, but multiple sink tasks (or parallelism > 1), this means
the data must all be shuffled to the single task writing that partition?

Padarn

On Mon, Sep 2, 2019 at 9:52 PM Fabian Hueske  wrote:

> Hi Padarn,
>
> Regarding your throughput concerns: A sink task may write to multiple
> partitions, but each partition may only be written by a single task.
>
> @Eduardo: Thanks for sharing your approach! Not sure if I understood it
> correctly, but I think that the approach does not guarantee that all
> results of a window are emitted before the end-of-window marker is written.
> Since the sink operator and the single-task-operator are separate
> operators, the output records might get stuck (or be buffered) in one of
> the sink tasks, and the single task would still emit an end-of-window marker
> record because it doesn't know about the sink tasks.
>
> Best,
> Fabian
>
> Am Do., 29. Aug. 2019 um 18:42 Uhr schrieb Eduardo Winpenny Tejedor <
> eduardo.winpe...@gmail.com>:
>
>> Hi,
>>
>> I'll chip in with an approach I'm trying at the moment that seems to
>> work, and I say seems because I'm only running this on a personal project.
>>
>> Personally, I don't have anything against end-of-message markers per
>> partition; Padarn, you seem not to prefer this option as it overloads the
>> meaning of the output payload. My approach is equally valid when producing
>> watermarks/end-of-message markers on a side output though.
>>
>> The main problem of both approaches is knowing when the window has
>> finished across all partitions without having to wait for the start of the
>> next window.
>>
>> I've taken the approach of sending all output messages of the window to
>> 1. the sink but also 2. a single task operator. The single task operator
>> registers an event time based timer at the time of the end of the window.
>> You have the confidence of the task's timer triggering only once at the
>> right time because all the post-window watermarks go through to the same
>> task. At that point I make the task send an end-of-message marker to every
>> partition. I don't need to send the count because Kafka messages are
>> ordered. AND IF you prefer to not overload the semantic of your original
>> Kafka topic you can post the message to a separate location of your choice.
>>
>> While this does mean that the end-of-window marker message only gets sent
>> through once the window has finished across all substreams (as opposed to
>> per stream), it does mean you don't need to wait for the next window to
>> start AND the watermark gap between substreams should never grow that much
>> anyway.
>>
>> This approach should be particularly useful when the number of partitions
>> or keying mechanism is different between the input and output topics.
>>
>> Hopefully that doesn't sound like a terrible idea.
>>
>> eduardo
>>
>>
>>
>>
>> On Wed, 28 Aug 2019, 02:54 Padarn Wilson,  wrote:
>>
>>> Hi again Fabian,
>>>
>>> Thanks for pointing this out to me. In my case there is no need for
>>> keyed writing - but I do wonder if having each kafka task write only to a
>>> single partition would significantly affect performance.
>>>
>>> Actually now that I think about it, the approach to just wait for the
>>> first records of the next window is also subject to the problem you mention
>>> above: a producer lagging behind the rest could end up with a partition
>>> containing elements out of ‘window order’.
>>>
>>> I was also thinking this problem is very similar to that of checkpoint
>>> barriers. I intended to dig into the details of the exactly once Kafka sink
>>> for some inspiration.
>>>
>>> Padarn
>>>
>>> On Tue, 27 Aug 2019 at 11:01 PM, Fabian Hueske 
>>> wrote:
>>>
>>>> Hi Padarn,
>>>>
>>>> Yes, this is quite tricky.
>>>> The "problem" with watermarks is that you need to consider how you
>>>> write to Kafka.
>>>> If your Kafka sink writes to a keyed Kafka stream (each Kafka partition
>>>> is written by multiple producers), you need to broadcast the watermarks to
>>>> each partition, i.e., each partition would receive watermarks from each
>>>> parallel sink task. So in order to reason about the current watermark of a
>>>> partition, you need to observe them and take the minimum WM across all
>>>> current sink task WMs.
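A rough, self-contained sketch of the single-task end-of-window-marker idea discussed in this thread. All names, the result type, and the choice of a constant key to funnel everything through one task are illustrative assumptions, not the posters' actual code.

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

class WindowResult {
    // Hypothetical window result; windowEnd is the end timestamp of the window it came from.
    public long windowEnd;
    public String payload;
}

public class EndOfWindowMarker extends KeyedProcessFunction<Byte, WindowResult, WindowResult> {

    // Side output carrying "the window ending at <timestamp> is complete" markers.
    public static final OutputTag<Long> MARKERS = new OutputTag<Long>("end-of-window") {};

    @Override
    public void processElement(WindowResult value, Context ctx, Collector<WindowResult> out) throws Exception {
        // Registering the same timestamp repeatedly is de-duplicated, so this is
        // effectively one timer per window end.
        ctx.timerService().registerEventTimeTimer(value.windowEnd);
        out.collect(value); // window results pass through unchanged to the sink
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<WindowResult> out) throws Exception {
        // The watermark has passed this window end across all upstream tasks.
        ctx.output(MARKERS, timestamp);
    }
}

Usage sketch: key by a constant so a single task sees all window results, then publish the side output, e.g.
`windowResults.keyBy(r -> (byte) 0).process(new EndOfWindowMarker()).getSideOutput(EndOfWindowMarker.MARKERS).print();`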

Re: End of Window Marker

2019-08-27 Thread Padarn Wilson
Hi again Fabian,

Thanks for pointing this out to me. In my case there is no need for keyed
writing - but I do wonder if having each Kafka task write only to a single
partition would significantly affect performance.

Actually, now that I think about it, the approach of just waiting for the first
records of the next window is also subject to the problem you mention
above: a producer lagging behind the rest could end up with a partition
containing elements out of ‘window order’.

I was also thinking this problem is very similar to that of checkpoint
barriers. I intend to dig into the details of the exactly-once Kafka sink
for some inspiration.

Padarn

On Tue, 27 Aug 2019 at 11:01 PM, Fabian Hueske  wrote:

> Hi Padarn,
>
> Yes, this is quite tricky.
> The "problem" with watermarks is that you need to consider how you write
> to Kafka.
> If your Kafka sink writes to a keyed Kafka stream (each Kafka partition is
> written by multiple producers), you need to broadcast the watermarks to
> each partition, i.e., each partition would receive watermarks from each
> parallel sink task. So in order to reason about the current watermark of a
> partition, you need to observe them and take the minimum WM across all
> current sink task WMs.
> Things become much easier, if each partition is only written by a single
> task but this also means that data is not key-partitioned in Kafka.
> In that case, the sink task only needs to write a WM message to each of
> its assigned partitions.
>
> Hope this helps,
> Fabian
>
>
> Am Sa., 17. Aug. 2019 um 05:48 Uhr schrieb Padarn Wilson  >:
>
>> Hi Fabian, thanks for your input
>>
>> Exactly. Actually my first instinct was to see if it was possible to
>> publish the watermarks somehow - my initial idea was to insert regular
>> watermark messages into each partition of the stream, but exposing this
>> seemed quite troublesome.
>>
>> > In that case, you could have a ProcessFunction that is chained before
>> the sink and which counts the window results per time slice and emits the
>> result when the watermark passes to a side output.
>> All side output messages are collected by a single task and can be
>> published to a Kafka topic or even be made available via Queryable State.
>>
>> I understand the idea here (and exactly once semantics are probably fine
>> for my use case), but counting events seems a bit fragile. I'm not totally
>> confident the consumer can guarantee it won't read duplicates (it's a Golang
>> Kafka library that seems to have some quirks).
>>
>> I think ideally each partition of the kafka topic would have some regular
>> information about watermarks. Perhaps the kafka producer can be modified to
>> support this.
>>
>> Padarn
>>
>> On Fri, Aug 16, 2019 at 3:50 PM Fabian Hueske  wrote:
>>
>>> Hi Padarn,
>>>
>>> What you describe is essentially publishing Flink's watermarks to an
>>> outside system.
>>> Flink processes time windows, by waiting for a watermark that's past the
>>> window end time. When it receives such a WM it processes and emits all
>>> ended windows and forwards the watermark.
>>> When a sink received a WM for say 12:45:15, you know that all window
>>> results with until 12:45:00 have been emitted.
>>> Hence, the watermark tells you about the completeness of data.
>>>
>>> However, using this information is not so easy, mostly because of the
>>> failure semantics.
>>> Things become much easier if you produce to Kafka with exactly-once
>>> semantics.
>>>
>>> In that case, you could have a ProcessFunction that is chained before
>>> the sink and which counts the window results per time slice and emits the
>>> result when the watermark passes to a side output.
>>> All side output messages are collected by a single task and can be
>>> published to a Kafka topic or even be made available via Queryable State.
>>>
>>> For at-least once output, it's much harder because you'll have
>>> duplicates in the output after a job recovered.
>>>
>>> Best, Fabian
>>>
>>> I think you have two options to let the consuming app know about the
>>> progress.
>>> You can either
>>>
>>> The ProcessFunction could count per window end timestamp how many
>>> records passed and forward that information via a side output.
>>> You could then
>>>
>>>
>>> Essentially, you'd like to publish Flink's watermarks to an outside
>>> system (possibly via Kafka).
>>>
>>>
>>> Am Mo., 12. Aug. 2019 um 14:33

Re: End of Window Marker

2019-08-16 Thread Padarn Wilson
Hi Fabian, thanks for your input

Exactly. Actually my first instinct was to see if it was possible to
publish the watermarks somehow - my initial idea was to insert regular
watermark messages into each partition of the stream, but exposing this
seemed quite troublesome.

> In that case, you could have a ProcessFunction that is chained before the
sink and which counts the window results per time slice and emits the
result when the watermark passes to a side output.
All side output messages are collected by a single task and can be
published to a Kafka topic or even be made available via Queryable State.

I understand the idea here (and exactly once semantics are probably fine
for my use case), but counting events seems a bit fragile. I'm not totally
confident the consumer can guarantee it won't read duplicates (it's a Golang
Kafka library that seems to have some quirks).

I think ideally each partition of the Kafka topic would have some regular
information about watermarks. Perhaps the Kafka producer can be modified to
support this.

Padarn

On Fri, Aug 16, 2019 at 3:50 PM Fabian Hueske  wrote:

> Hi Padarn,
>
> What you describe is essentially publishing Flink's watermarks to an
> outside system.
> Flink processes time windows, by waiting for a watermark that's past the
> window end time. When it receives such a WM it processes and emits all
> ended windows and forwards the watermark.
> When a sink received a WM for say 12:45:15, you know that all window
> results with until 12:45:00 have been emitted.
> Hence, the watermark tells you about the completeness of data.
>
> However, using this information is not so easy, mostly because of the
> failure semantics.
> Things become much easier if you produce to Kafka with exactly-once
> semantics.
>
> In that case, you could have a ProcessFunction that is chained before the
> sink and which counts the window results per time slice and emits the
> result when the watermark passes to a side output.
> All side output messages are collected by a single task and can be
> published to a Kafka topic or even be made available via Queryable State.
>
> For at-least once output, it's much harder because you'll have duplicates
> in the output after a job recovered.
>
> Best, Fabian
>
> I think you have two options to let the consuming app know about the
> progress.
> You can either
>
> The ProcessFunction could count per window end timestamp how many records
> passed and forward that information via a side output.
> You could then
>
>
> Essentially, you'd like to publish Flink's watermarks to an outside system
> (possibly via Kafka).
>
>
> Am Mo., 12. Aug. 2019 um 14:33 Uhr schrieb Padarn Wilson  >:
>
>> Hello Users,
>>
>> I have a question that is perhaps not best solved within Flink: It has to
>> do with notifying a downstream application that a Flink window has
>> completed.
>>
>> The (simplified) scenario is this:
>> - We have a Flink job that consumes from Kafka, does some preprocessing,
>> and then has a sliding window of 10 minutes and slide time of 1 minute.
>> - The number of keys in each slide is not fixed
>> - The output of the window is then output to Kafka, which is read by a
>> downstream application.
>>
>> What I want to achieve is that the downstream application can somehow
>> know when it has read all of the data for a single window, without waiting
>> for the next window to arrive.
>>
>> Some options I've considered:
>> - Producing a second window over the window results that counts the
>> output size, which can then be used by the downstream application to see
>> when it has received the same number: This seems fragile, as it
>> relies on there being no loss or duplication of data. It's also an extra
>> window and Kafka stream which is a tad messy.
>> - Somehow adding an 'end of window' element to each partition of the
>> Kafka topic which can be read by the consumer: This seems a bit messy
>> because it mixes different types of events into the same Kafka stream, and
>> there is no really simple way to do this in Flink
>> - Package the whole window output into a single message and make this the
>> unit of transaction: This is possible, but the message would be quite large
>> then (at least 10s of mb), as the volume of this stream is quite large.
>> - Assume that if the consumer has no elements to read, or if the next
>> window has started to be read, then it has read the whole window: This
>> seems reasonable, and if it wasn't for the fact that my consumer on the
>> application end was a bit inflexible right now, it is probably the solution
>> I would use.
>>
>> Any further/better ideas?
>>
>> Thanks
>> Padarn
>>
>
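A rough sketch of the counting variant Fabian describes above (the result type and names are hypothetical): a function keyed by window-end timestamp counts the results passing to the sink and, once the watermark passes that timestamp, emits (windowEnd, count) to a side output that can be published for the consumer to check completeness against.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

class Result {
    // Hypothetical window result; windowEnd is used as the key upstream.
    public long windowEnd;
    public String payload;
}

public class WindowResultCounter extends KeyedProcessFunction<Long, Result, Result> {

    // (windowEnd, number of results emitted for that window)
    public static final OutputTag<Tuple2<Long, Long>> COUNTS =
            new OutputTag<Tuple2<Long, Long>>("window-counts") {};

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(Result value, Context ctx, Collector<Result> out) throws Exception {
        Long c = count.value();
        count.update(c == null ? 1L : c + 1L);
        // The key is the window-end timestamp, so this fires once the watermark passes it.
        ctx.timerService().registerEventTimeTimer(value.windowEnd);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
        ctx.output(COUNTS, Tuple2.of(timestamp, count.value()));
        count.clear();
    }
}

Usage sketch: `windowResults.keyBy(r -> r.windowEnd).process(new WindowResultCounter())`, then route the COUNTS side output to a parallelism-1 sink that publishes it.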


Re: [External] Flink 1.7.1 on EMR metrics

2019-06-02 Thread Padarn Wilson
Doh - stupid mistake. Thanks for pointing that out.

On Sun, Jun 2, 2019 at 12:52 PM Bowen Li  wrote:

> To answer your question on your debugging code, your reporter has a bug:
>
> log.info("STATSD SENDING: ", name, value);
>
> should be ->
>
> log.info("STATSD SENDING: {} {}", name, value);
>
>
> -
>
>
>
>
> On Sat, Jun 1, 2019 at 7:30 PM Padarn Wilson  wrote:
>
>> Thanks both: Using the inbuilt Slf4j reporter is a great idea, I
>> will do this.
>>
>> @Peter.Groesbeck - appreciate the config. This looks very similar to
>> what I had, but if it is working for you perhaps there is something else
>> missing from our EMR setup. Will go back and double check the connectivity
>> from all the instances.
>>
>>
>>
>> On Thu, May 30, 2019 at 9:42 PM Peter Groesbeck <
>> peter.groesb...@gmail.com> wrote:
>>
>>> Hi Padarn for what it's worth I am using DataDog metrics on EMR with
>>> Flink 1.7.1 and this here my flink-conf configuration:
>>>
>>> - Classification: flink-conf
>>>   ConfigurationProperties:
>>> metrics.reporter.dghttp.class: 
>>> org.apache.flink.metrics.datadog.DatadogHttpReporter
>>> metrics.reporter.dghttp.apikey: 
>>> metrics.reporter.dghttp.tags: 
>>> 'service:myservice,env:prod,region:us-east-1'
>>> metrics.scope.jm: 'jobmanager'
>>> metrics.scope.jm.job: 'jobmanager'
>>> metrics.scope.operator: 'taskmanager..'
>>> metrics.scope.task: 'taskmanager..'
>>> metrics.scope.tm: 'taskmanager'
>>> metrics.scope.tm.job: 'taskmanager'
>>>   Configurations: []
>>>
>>>
>>> On Thu, May 30, 2019 at 6:46 AM Yun Tang  wrote:
>>>
>>>> Hi Padarn
>>>>
>>>> If you want to verify why no metrics sending out, how about using the
>>>> built-in Slf4j reporter [1] which would record metrics in logs.
>>>> If you could view the metrics after enabled slf4j-reporter, you could
>>>> then compare the configurations.
>>>>
>>>> Best
>>>> Yun Tang
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter
>>>>
>>>> --
>>>> *From:* Padarn Wilson 
>>>> *Sent:* Thursday, May 30, 2019 18:20
>>>> *To:* user
>>>> *Subject:* [External] Flink 1.7.1 on EMR metrics
>>>>
>>>> Hello all,
>>>>
>>>> I am trying to run Flink 1.7.1 on EMR and having some trouble with
>>>> metric reporting.
>>>>
>>>> I was using the DataDogHttpReporter, but have also tried the
>>>> StatsDReporter, but with both was seeing no metrics being collected.
>>>>
>>>> To debug this I implemented my own reporter (based on StatsDReporter)
>>>> and logged the name of the metric being sent:
>>>>
>>>> private void send(final String name, final String value) {
>>>>log.info("STATSD SENDING: ", name, value);
>>>>try {
>>>>   String formatted = String.format("%s:%s|g", name, value);
>>>>   byte[] data = formatted.getBytes(StandardCharsets.UTF_8);
>>>>   socket.send(new DatagramPacket(data, data.length, this.address));
>>>>}
>>>>catch (IOException e) {
>>>>   LOG.error("unable to send packet to statsd at '{}:{}'", 
>>>> address.getHostName(), address.getPort());
>>>>}
>>>> }
>>>>
>>>>
>>>> This code is certainly reached, because in my log I see a lot of this:
>>>>
>>>> 2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter   
>>>>  - STATSD SENDING:
>>>> 2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter   
>>>>  - STATSD SENDING:
>>>> 2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter   
>>>>  - STATSD SENDING:
>>>> 2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter   
>>>>  - STATSD SENDING:
>>>> 2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter   
>>>>  - STATSD SENDING:
>>>> 2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter   
>>>>

Re: [External] Flink 1.7.1 on EMR metrics

2019-06-01 Thread Padarn Wilson
Thanks both: Using the inbuilt Slf4j reporter is a great idea, I will
do this.

@Peter.Groesbeck - appreciate the config. This looks very similar to what
I had, but if it is working for you perhaps there is something else missing
from our EMR setup. Will go back and double check the connectivity from all
the instances.



On Thu, May 30, 2019 at 9:42 PM Peter Groesbeck 
wrote:

> Hi Padarn for what it's worth I am using DataDog metrics on EMR with Flink
> 1.7.1 and this here my flink-conf configuration:
>
> - Classification: flink-conf
>   ConfigurationProperties:
> metrics.reporter.dghttp.class: 
> org.apache.flink.metrics.datadog.DatadogHttpReporter
> metrics.reporter.dghttp.apikey: 
> metrics.reporter.dghttp.tags: 
> 'service:myservice,env:prod,region:us-east-1'
> metrics.scope.jm: 'jobmanager'
> metrics.scope.jm.job: 'jobmanager'
> metrics.scope.operator: 'taskmanager..'
> metrics.scope.task: 'taskmanager..'
> metrics.scope.tm: 'taskmanager'
> metrics.scope.tm.job: 'taskmanager'
>   Configurations: []
>
>
> On Thu, May 30, 2019 at 6:46 AM Yun Tang  wrote:
>
>> Hi Padarn
>>
>> If you want to verify why no metrics sending out, how about using the
>> built-in Slf4j reporter [1] which would record metrics in logs.
>> If you could view the metrics after enabled slf4j-reporter, you could
>> then compare the configurations.
>>
>> Best
>> Yun Tang
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter
>>
>> --
>> *From:* Padarn Wilson 
>> *Sent:* Thursday, May 30, 2019 18:20
>> *To:* user
>> *Subject:* [External] Flink 1.7.1 on EMR metrics
>>
>> Hello all,
>>
>> I am trying to run Flink 1.7.1 on EMR and having some trouble with metric
>> reporting.
>>
>> I was using the DataDogHttpReporter, but have also tried the
>> StatsDReporter, but with both was seeing no metrics being collected.
>>
>> To debug this I implemented my own reporter (based on StatsDReporter) and
>> logged the name of the metric being sent:
>>
>> private void send(final String name, final String value) {
>>log.info("STATSD SENDING: ", name, value);
>>try {
>>   String formatted = String.format("%s:%s|g", name, value);
>>   byte[] data = formatted.getBytes(StandardCharsets.UTF_8);
>>   socket.send(new DatagramPacket(data, data.length, this.address));
>>}
>>catch (IOException e) {
>>   LOG.error("unable to send packet to statsd at '{}:{}'", 
>> address.getHostName(), address.getPort());
>>}
>> }
>>
>>
>> This code is certainly reached, because in my log I see a lot of this:
>>
>> 2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter 
>>- STATSD SENDING:
>> 2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter 
>>- STATSD SENDING:
>> 2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter 
>>- STATSD SENDING:
>> 2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter 
>>- STATSD SENDING:
>> 2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter 
>>- STATSD SENDING:
>> 2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter 
>>- STATSD SENDING:
>> 2019-05-30 10:18:40,352 INFO  com.grab.statsd.StatsDReporter 
>>- STATSD SENDING:
>>
>> As you can see, the name and value for the metric being reported is empty.
>>
>>
>> And the logs show everything initialized fine with no error:
>>
>> 2019-05-30 10:18:30,342 INFO  
>> org.apache.flink.runtime.metrics.MetricRegistryImpl   - Configuring 
>> stsd with {port=8125, host=127.0.0.1, class=com.grab.statsd.StatsDReporter}.
>> 2019-05-30 10:18:30,344 INFO  com.grab.statsd.StatsDReporter 
>>- Configured StatsDReporter with {host:127.0.0.1, port:8125}
>> 2019-05-30 10:18:30,344 INFO  
>> org.apache.flink.runtime.metrics.MetricRegistryImpl   - Periodically 
>> reporting metrics in intervals of 10 SECONDS for reporter stsd of type 
>> com.grab.statsd.StatsDReporter.
>>
>>
>>
>>
>> Has anyone else  tried to work with Flink and metrics on EMR 1.7.1 (latest 
>> version on EMR). If so, any pointers as to what could be set up incorrectly?
>>
>>
>

[External] Flink 1.7.1 on EMR metrics

2019-05-30 Thread Padarn Wilson
Hello all,

I am trying to run Flink 1.7.1 on EMR and am having some trouble with metric
reporting.

I was using the DataDogHttpReporter, and have also tried the
StatsDReporter, but with both I was seeing no metrics being collected.

To debug this I implemented my own reporter (based on StatsDReporter) and
logged the name of the metric being sent:

private void send(final String name, final String value) {
   log.info("STATSD SENDING: ", name, value);
   try {
  String formatted = String.format("%s:%s|g", name, value);
  byte[] data = formatted.getBytes(StandardCharsets.UTF_8);
  socket.send(new DatagramPacket(data, data.length, this.address));
   }
   catch (IOException e) {
  LOG.error("unable to send packet to statsd at '{}:{}'",
address.getHostName(), address.getPort());
   }
}


This code is certainly reached, because in my log I see a lot of this:

2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter
 - STATSD SENDING:
2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter
 - STATSD SENDING:
2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter
 - STATSD SENDING:
2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter
 - STATSD SENDING:
2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter
 - STATSD SENDING:
2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter
 - STATSD SENDING:
2019-05-30 10:18:40,352 INFO  com.grab.statsd.StatsDReporter
 - STATSD SENDING:

As you can see, the name and value for the metric being reported is empty.


And the logs show everything initialized fine with no error:

2019-05-30 10:18:30,342 INFO
org.apache.flink.runtime.metrics.MetricRegistryImpl   -
Configuring stsd with {port=8125, host=127.0.0.1,
class=com.grab.statsd.StatsDReporter}.
2019-05-30 10:18:30,344 INFO  com.grab.statsd.StatsDReporter
 - Configured StatsDReporter with {host:127.0.0.1,
port:8125}
2019-05-30 10:18:30,344 INFO
org.apache.flink.runtime.metrics.MetricRegistryImpl   -
Periodically reporting metrics in intervals of 10 SECONDS for reporter
stsd of type com.grab.statsd.StatsDReporter.




Has anyone else tried to work with Flink and metrics on EMR 1.7.1
(the latest version on EMR)? If so, any pointers as to what could be set
up incorrectly?




Re: Flink metrics missing from UI 1.7.2

2019-03-23 Thread Padarn Wilson
Aha! This is almost certainly it. I remembered thinking something like this
might be a problem. I'll need to change the deployment a bit to add this
(not straightforward to edit the YAML in my case), but thanks!

On Sun, Mar 24, 2019 at 10:01 AM dawid <
apache-flink-user-mailing-list-arch...@davidhaglund.se> wrote:

> Padarn Wilson-2 wrote
> > I am running Fink 1.7.2 on Kubernetes in a setup with task manager and
> job
> > manager separate.
> >
> > I'm having trouble seeing the metrics from my Flink job in the UI
> > dashboard. Actually I'm using the Datadog reporter to expose most of my
> > metrics, but latency tracking does not seem to be exported.
> >
> > Is there anything extra that needs to be enabled to make sure metrics are
> > exported and viewable to the Flink UI?
>
> With Flink 1.7 on Kubernetes you need to make sure the task managers are
> registering to the job manager with their IP addresses and not the
> hostnames, see the taskmanager-deployment.yaml manifest in [1], with the
> K8S_POD_IP environment variable and setting
> -Dtaskmanager.host=$(K8S_POD_IP).
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/kubernetes.html#appendix
>
> /David
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink metrics missing from UI 1.7.2

2019-03-23 Thread Padarn Wilson
Thanks David. I cannot see the metrics there, so let me play around a bit
more and make sure they are enabled correctly.

On Sat, Mar 23, 2019 at 9:19 PM David Anderson  wrote:

> > I have done this (actually I do it in my flink-conf.yaml), but I am not
> seeing any metrics at all in the Flink UI,
> > let alone the latency tracking. The latency tracking itself does not
> seem to be exported to datadog (should it be?)
>
> The latency metrics are job metrics, and are not shown in the Flink UI.
> They are available via the REST API, and I believe they should also be
> exported to datadog. You will find them at
>
> http://localhost:8081/jobs//metrics
>
> with IDs like
>
>
> latency.source_id.bc764cd8ddf7a0cff126f51c16239658.operator_id.ea632d67b7d595e5b851708ae9ad79d6.operator_subtask_index.0.latency_p90
>
> On Sat, Mar 23, 2019 at 1:53 PM Padarn Wilson  wrote:
>
>> Thanks David.
>>
>> I have done this (actually I do it in my flink-conf.yaml), but I am not
>> seeing any metrics at all in the Flink UI, let alone the latency tracking.
>> The latency tracking itself does not seem to be exported to datadog (should
>> it be?)
>>
>>
>>
>> On Sat, Mar 23, 2019 at 8:43 PM David Anderson 
>> wrote:
>>
>>> Because latency tracking is expensive, it is turned off by default. You
>>> turn it on by setting the interval; that looks something like this:
>>>
>>> env.getConfig().setLatencyTrackingInterval(1000);
>>>
>>> The full set of configuration options is described in the docs:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#metrics
>>>
>>> David Anderson | Training Coordinator
>>> Follow us @VervericaData
>>>
>>> --
>>> Join Flink Forward - The Apache Flink Conference
>>> Stream Processing | Event Driven | Real Time
>>>
>>>
>>> On Sat, Mar 23, 2019 at 1:03 PM Padarn Wilson  wrote:
>>> >
>>> > Hi User,
>>> >
>>> > I am running Flink 1.7.2 on Kubernetes in a setup with task manager and
>>> job manager separate.
>>> >
>>> > I'm having trouble seeing the metrics from my Flink job in the UI
>>> dashboard. Actually I'm using the Datadog reporter to expose most of my
>>> metrics, but latency tracking does not seem to be exported.
>>> >
>>> > Is there anything extra that needs to be enabled to make sure metrics
>>> are exported and viewable to the Flink UI?
>>> >
>>> > Thanks
>>>
>>>>


Flink metrics missing from UI 1.7.2

2019-03-23 Thread Padarn Wilson
Hi User,

I am running Flink 1.7.2 on Kubernetes in a setup with task manager and job
manager separate.

I'm having trouble seeing the metrics from my Flink job in the UI
dashboard. Actually I'm using the Datadog reporter to expose most of my
metrics, but latency tracking does not seem to be exported.

Is there anything extra that needs to be enabled to make sure metrics are
exported and viewable to the Flink UI?

Thanks


Re: Setting source vs sink vs window parallelism with data increase

2019-03-23 Thread Padarn Wilson
Well.. it turned out I was registering millions of timers by accident,
which was why garbage collection was blowing up. Oops. Thanks for your help
again.
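
(For anyone hitting the same thing: the general pattern to avoid this is to
coalesce timers - round each element's timestamp to a boundary and register a
timer for that boundary only - rather than registering one timer per element.
A rough Scala sketch of that pattern, not my actual code:)

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

class CoalescedTimerFunction extends ProcessFunction[(String, Long), String] {
  override def processElement(value: (String, Long),
                              ctx: ProcessFunction[(String, Long), String]#Context,
                              out: Collector[String]): Unit = {
    // Round the element timestamp up to the next minute and register a timer
    // for that boundary only; registering the same timestamp twice for a key
    // is a no-op, so each key holds at most one timer per minute rather than
    // one timer per element.
    val coalescedTime = (value._2 / 60000 + 1) * 60000
    ctx.timerService().registerEventTimeTimer(coalescedTime)
  }
}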

On Wed, Mar 6, 2019 at 9:44 PM Padarn Wilson  wrote:

> Thanks a lot for your suggestion. I’ll dig into it and update for the
> mailing list if I find anything useful.
>
> Padarn
>
> On Wed, 6 Mar 2019 at 6:03 PM, Piotr Nowojski  wrote:
>
>> Re-adding user mailing list.
>>
>>
>> Hi,
>>
>> If it is a GC issue, only GC logs or some JVM memory profilers (like
>> Oracle’s Mission Control) can lead you to the solution. Once you confirm
>> that it’s a GC issue, there are numerous resources online how to analyse
>> the cause of the problem. For that, it is difficult to use CPU
>> profiling/Flink Metrics, since GC issues caused by one thread, can cause
>> performance bottlenecks in other unrelated places.
>>
>> If that’s not a GC issue, you can use Flink metrics (like number of
>> buffered input/output data) to find Task that’s causing a bottleneck. Then
>> you can use CPU profiler to analyse why is that happening.
>>
>> Piotrek
>>
>> On 6 Mar 2019, at 02:52, Padarn Wilson  wrote:
>>
>> Hi Piotr,
>>
>> Thanks for your feedback. Makes sense about the checkpoint barriers -
>> this definitely could be the cause of a problem.
>>
>> I would advice profiling your job to find out what’s going on.
>>
>>
>> Agreed. Outside of inspecting the Flink metrics, do you have suggestions
>> for tools with which to do this?
>>
>> The main thing I'm trying to pin down is:
>> 1) Is it the downstream processing from the expansion of records that
>> causes a problem, or
>> 2) Is it the shuffle of all the records after the expansion which is
>> taking a long time - if so, is there anything I can do to mitigate this
>> other than trying to ensure less shuffle?
>>
>> Thanks,
>> Padarn
>>
>>
>> On Tue, Mar 5, 2019 at 7:08 PM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> Do you mind elaborating on this? What technology would you propose as an
>>> alternative, and why would this increase checkpointing time?
>>>
>>>
>>> The problem is that when Flink starts checkpoint and inject checkpoint
>>> barriers, those checkpoint barriers travel through the Job Graph. The
>>> quicker they can do that the better. How fast that happens depends on the
>>> amount of buffered data before checkpoint barriers (currently all of such
>>> records must be processed before checkpoint barrier is passed down stream).
>>> The more buffered records and the more time it takes to process those
>>> records, the longer the checkpoint take time. Obviously if one stage in the
>>> job is multiplying the amount of records, it can in a way multiply the
>>> amount of “buffered work” that needs to be processed before checkpoint
>>> barriers pass through.
>>>
>>> However it might not be the case for you. To analyse what’s going on you
>>> would need to look at various Flink metrics, like checkpoint times, back
>>> pressured tasks, state of the output/input buffers of the tasks, etc.
>>> However #2, those are secondary issues. First of all you should try to pin
>>> point the cause of long GC pauses. If it comes from your code, you should
>>> fix this first. If that either isn’t the issue or doesn’t solve it,
>>> generally speaking I would advice profiling your job to find out what’s
>>> going on.
>>>
>>> Piotrek
>>>
>>> On 5 Mar 2019, at 02:00, Padarn Wilson  wrote:
>>>
>>> Hi Piotr,
>>>
>>> Thanks for your response. I am using Flink 1.7.1 (intend to upgrade to
>>> 1.7.2 shortly - there is some S3 fix I'd like to take advantage of).
>>>
>>> Generally speaking Flink might not be the best if you have records fan out,
>>>> this may significantly increase checkpointing time.
>>>
>>>
>>> Do you mind elaborating on this? What technology would you propose as an
>>> alternative, and why would this increase checkpointing time?
>>>
>>> However you might want to first identify what’s causing long GC times.
>>>>
>>>
>>> My current plan is to try and enable GC logs and see if I can get
>>> something meaningful from them.
>>>
>>> Thanks a lot,
>>> Padarn
>>>
>>>
>>> On Mon, Mar 4, 2019 at 7:16 PM Piotr Nowojski 
>>> wrote:
>>>
>>>> Hi,
>>>>

Re: Setting source vs sink vs window parallelism with data increase

2019-03-06 Thread Padarn Wilson
Thanks a lot for your suggestion. I’ll dig into it and update for the
mailing list if I find anything useful.

Padarn

On Wed, 6 Mar 2019 at 6:03 PM, Piotr Nowojski  wrote:

> Re-adding user mailing list.
>
>
> Hi,
>
> If it is a GC issue, only GC logs or some JVM memory profilers (like
> Oracle’s Mission Control) can lead you to the solution. Once you confirm
> that it’s a GC issue, there are numerous resources online how to analyse
> the cause of the problem. For that, it is difficult to use CPU
> profiling/Flink Metrics, since GC issues caused by one thread, can cause
> performance bottlenecks in other unrelated places.
>
> If that’s not a GC issue, you can use Flink metrics (like number of
> buffered input/output data) to find Task that’s causing a bottleneck. Then
> you can use CPU profiler to analyse why is that happening.
>
> Piotrek
>
> On 6 Mar 2019, at 02:52, Padarn Wilson  wrote:
>
> Hi Piotr,
>
> Thanks for your feedback. Makes sense about the checkpoint barriers - this
> definitely could be the cause of a problem.
>
> I would advice profiling your job to find out what’s going on.
>
>
> Agreed. Outside of inspecting the Flink metrics, do you have suggestions
> for tools with which to do this?
>
> The main thing I'm trying to pin down is:
> 1) Is it the downstream processing from the expansion of records that
> causes a problem, or
> 2) Is it the shuffle of all the records after the expansion which is
> taking a long time - if so, is there anything I can do to mitigate this
> other than trying to ensure less shuffle?
>
> Thanks,
> Padarn
>
>
> On Tue, Mar 5, 2019 at 7:08 PM Piotr Nowojski  wrote:
>
>> Hi,
>>
>> Do you mind elaborating on this? What technology would you propose as an
>> alternative, and why would this increase checkpointing time?
>>
>>
>> The problem is that when Flink starts checkpoint and inject checkpoint
>> barriers, those checkpoint barriers travel through the Job Graph. The
>> quicker they can do that the better. How fast that happens depends on the
>> amount of buffered data before checkpoint barriers (currently all of such
>> records must be processed before checkpoint barrier is passed down stream).
>> The more buffered records and the more time it takes to process those
>> records, the longer the checkpoint take time. Obviously if one stage in the
>> job is multiplying the amount of records, it can in a way multiply the
>> amount of “buffered work” that needs to be processed before checkpoint
>> barriers pass through.
>>
>> However it might not be the case for you. To analyse what’s going on you
>> would need to look at various Flink metrics, like checkpoint times, back
>> pressured tasks, state of the output/input buffers of the tasks, etc.
>> However #2, those are secondary issues. First of all you should try to pin
>> point the cause of long GC pauses. If it comes from your code, you should
>> fix this first. If that either isn’t the issue or doesn’t solve it,
>> generally speaking I would advice profiling your job to find out what’s
>> going on.
>>
>> Piotrek
>>
>> On 5 Mar 2019, at 02:00, Padarn Wilson  wrote:
>>
>> Hi Piotr,
>>
>> Thanks for your response. I am using Flink 1.7.1 (intend to upgrade to
>> 1.7.2 shortly - there is some S3 fix I'd like to take advantage of).
>>
>> Generally speaking Flink might not be the best if you have records fan out,
>>> this may significantly increase checkpointing time.
>>
>>
>> Do you mind elaborating on this? What technology would you propose as an
>> alternative, and why would this increase checkpointing time?
>>
>> However you might want to first identify what’s causing long GC times.
>>>
>>
>> My current plan is to try and enable GC logs and see if I can get
>> something meaningful from them.
>>
>> Thanks a lot,
>> Padarn
>>
>>
>> On Mon, Mar 4, 2019 at 7:16 PM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> What Flink version are you using?
>>>
>>> Generally speaking Flink might not be the best if you have records fan out,
>>> this may significantly increase checkpointing time.
>>>
>>> However you might want to first identify what’s causing long GC times.
>>> If there are long GC pause, this should be the first thing to fix.
>>>
>>> Piotrek
>>>
>>> On 2 Mar 2019, at 08:19, Padarn Wilson  wrote:
>>>
>>> Hi all again - following up on this I think I've identified my problem
>>> as being something else, but would appreciate if a

Re: Setting source vs sink vs window parallelism with data increase

2019-03-01 Thread Padarn Wilson
Hi all again - following up on this I think I've identified my problem as
being something else, but would appreciate if anyone can offer advice.

After running my stream from sometime, I see that my garbage collector for
old generation starts to take a very long time:
[image: Screen Shot 2019-03-02 at 3.01.57 PM.png]
here the *purple line* is young-generation time, which is ever increasing
but grows slowly, while the *blue line* is old-generation time.
This in itself is not a problem, but as soon as the next checkpoint is
triggered after this happens you see the following:
[image: Screen Shot 2019-03-02 at 3.02.48 PM.png]
It looks like the checkpoint time hits a cap, but this is only because the
checkpoints start to time out and fail (these are the alignment times per
operator).

I do notice that my state is growing quite large over time, but I don't
have a good understanding of what would cause this to happen with the JVM
old generation metric, which appears to be the leading metric before a
problem is noticed. Other metrics such as network buffers also show that at
the checkpoint time things start to go haywire and the situation never
recovers.

Thanks

On Thu, Feb 28, 2019 at 5:50 PM Padarn Wilson  wrote:

> Hi all,
>
> I'm trying to process many records, and I have an expensive operation I'm
> trying to optimize. Simplified it is something like:
>
> Data: (key1, count, time)
>
> Source -> Map(x -> (x, newKeyList(x.key1)))
> -> FlatMap(x -> x._2.map(y => (y, x._1.count, x._1.time)))
> -> KeyBy(_.key1).TumblingWindow().apply(...)
> -> Sink
>
> In the Map -> FlatMap, what is happening is that each key is mapping to a
> set of keys, and then this is set as the new key. This effectively increases
> the size of the stream by 16x.
>
> What I am trying to figure out is how to set the parallelism of my
> operators. I see in some comments that people suggest your source, sink and
> aggregation should have different parallelism, but I'm not clear on exactly
> why, or what this means for CPU utilization.
> (see for example
> https://stackoverflow.com/questions/48317438/unable-to-achieve-high-cpu-utilization-with-flink-and-gelly
> )
>
> Also, it isn't clear to me what the best way is to handle this increase in
> data within the stream itself.
>
> Thanks
>


Re: Flink Standalone cluster - production settings

2019-02-28 Thread Padarn Wilson
Are you able to give some detail on the cases in which you might be better
off setting higher (or lower) parallelism for an operator?

On Thu, Feb 21, 2019 at 9:54 PM Hung  wrote:

> / Each job has 3 asynch operators
> with Executors with thread counts of 20,20,100/
>
> Flink handles parallelisms for you. If you want a higher parallelism of a
> operator, you can call setParallelism()
> for example,
>
> flatMap(new Mapper1()).setParallelism(20)
> flatMap(new Mapper2()).setParallelism(20)
> flatMap(new Mapper3()).setParallelism(100)
>
> You can check the official document here
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/parallel.html#setting-the-parallelism
>
> /Currently we are using parallelism = 1/
> I guess you set the job level parallelism
>
> I would suggest you replace Executors with the use of Flink parallelisms.
> It
> would be more efficient so
> you don't create the other thread pool although you already have one that
> flink provides you(I maybe not right describing this concept)
>
> Cheers,
>
> Sendoh
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Setting source vs sink vs window parallelism with data increase

2019-02-28 Thread Padarn Wilson
Hi all,

I'm trying to process many records, and I have an expensive operation I'm
trying to optimize. Simplified it is something like:

Data: (key1, count, time)

Source -> Map(x -> (x, newKeyList(x.key1)))
-> FlatMap(x -> x._2.map(y => (y, x._1.count, x._1.time)))
-> KeyBy(_.key1).TumblingWindow().apply(...)
-> Sink

In the Map -> FlatMap, what is happening is that each key is mapping to a
set of keys, and then this is set as the new key. This effectively increases
the size of the stream by 16x.
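
For concreteness, here is a rough Scala sketch of the pipeline shape, with
the kind of per-operator setParallelism calls I am unsure how to choose (the
record type, the key expansion and the parallelism values are simplified
placeholders, not the real job):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class Record(key1: String, count: Long, time: Long)

// Placeholder expansion: each key maps to 16 derived keys.
def newKeyList(key: String): Seq[String] = (0 until 16).map(i => s"$key-$i")

def buildPipeline(source: DataStream[Record]): Unit = {
  val expanded = source
    .flatMap(r => newKeyList(r.key1).map(k => Record(k, r.count, r.time)))
    .setParallelism(4) // the expansion itself is cheap

  val aggregated = expanded
    .keyBy(_.key1)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .reduce((a, b) => a.copy(count = a.count + b.count))
    .setParallelism(16) // the heavy aggregation after the 16x fan-out

  aggregated.print().setParallelism(1) // stand-in for the real sink
}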

What I am trying to figure out is how to set the parallelism of my
operators. I see in some comments that people suggest your source, sink and
aggregation should have different parallelism, but I'm not clear on exactly
why, or what this means for CPU utilization.
(see for example
https://stackoverflow.com/questions/48317438/unable-to-achieve-high-cpu-utilization-with-flink-and-gelly
)

Also, it isn't clear to me what the best way is to handle this increase in
data within the stream itself.

Thanks


Re: Collapsing watermarks after keyby

2019-02-28 Thread Padarn Wilson
I created a small test to see if I could replicate this... but I couldn't
:-) Below is my code that provides a counter example. It is not very clean,
but perhaps it is useful for someone else in the future:


class SessionWindowTest extends FunSuite with Matchers {

  test("Should advance watermark correctly") {

    val startTime = 0L

    val elements1 = List[Tester](
      Tester("id1:a", "id2:a", startTime),
      Tester("id1:b", "id2:a", startTime + 1),
      Tester("id1:b", "id2:a", startTime + 100),
      Tester("id1:a", "id2:a", startTime + 1)
    )

    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamEnv.getConfig.disableSysoutLogging()

    class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[Tester] {

      override def extractTimestamp(element: Tester, previousElementTimestamp: Long): Long = {
        element.time
      }

      override def checkAndGetNextWatermark(lastElement: Tester, extractedTimestamp: Long): Watermark = {
        new Watermark(extractedTimestamp)
      }
    }

    val stream = streamEnv.addSource(new SourceFunction[Tester]() {
      def run(ctx: SourceFunction.SourceContext[Tester]) {
        elements1.foreach {
          ctx.collect
        }
      }
      override def cancel(): Unit = {}
    }).assignTimestampsAndWatermarks(new PunctuatedAssigner)

    val sessionsStream = stream
      .keyBy(_.id1)
      .window(EventTimeSessionWindows.withGap(Time.milliseconds(2)))
      .apply(
        (key: String, windowInfo, iter: Iterable[Tester], collector: Collector[Tester]) => {
          val elements = iter.toList
          println("Session window. Elements:", elements)
          println(windowInfo)
          collector.collect(elements.reverse.head)
        })

    val countStream = sessionsStream
      .keyBy(_.id2)
      .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
      .apply(
        (key: String, windowInfo, iter: Iterable[Tester], collector: Collector[Tester]) => {
          val elements = iter.toList
          println("Tumbling window. Elements:", elements, windowInfo.getStart, windowInfo.getEnd)
          collector.collect(elements.reverse.head)
        })

    sessionsStream.print()
    countStream.print()

    streamEnv.execute()
  }
}


On Tue, Feb 26, 2019 at 10:49 PM Padarn Wilson  wrote:

> Okay. I think I still must misunderstand something here. I will work on
> building a unit test around this, hopefully this clears up my confusion.
>
> Thank you,
> Padarn
>
> On Tue, Feb 26, 2019 at 10:28 PM Till Rohrmann 
> wrote:
>
>> Operator's with multiple inputs emit the minimum of the input's
>> watermarks downstream. In case of a keyBy this means that the watermark is
>> sent to all downstream consumers.
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 26, 2019 at 1:47 PM Padarn Wilson  wrote:
>>
>>> Just to add: by printing intermediate results I see that I definitely
>>> have more than five minutes of data, and by windowing without the session
>>> windows I see that event time watermarks do seem to be generated as
>>> expected.
>>>
>>> Thanks for your help and time.
>>>
>>> Padarn
>>>
>>> On Tue, 26 Feb 2019 at 8:43 PM, Padarn Wilson  wrote:
>>>
>>>> Hi Till,
>>>>
>>>> I will work on an example, but I’m a little confused by how keyBy and
>>>> watermarks work in this case. This documentation says (
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams
>>>> ):
>>>>
>>>>
>>>> Some operators consume multiple input streams; a union, for example, or
>>>> operators following a *keyBy(…)*or *partition(…)* function. Such an
>>>> operator’s current event time is the minimum of its input streams’ event
>>>> times. As its input streams update their event times, so does the operator.
>>>>
>>>>
>>>> This implies to me that the keyBy splits the watermark?
>>>>
>>>> On Tue, 26 Feb 2019 at 6:40 PM, Till Rohrmann 
>>>> wrote:
>>>>
>>>>> Hi Padarn,
>>>>>
>>>>> Flink does not generate watermarks per keys. Atm watermarks are always
>>>>> global. Therefore, I would suspect that it is rather a problem with
>>>>> generating watermarks at all. Could it be that your input data does not
>>>>> span a period longer than 5 minutes and also does not terminate? A

Re: Collapsing watermarks after keyby

2019-02-26 Thread Padarn Wilson
Okay. I think I still must misunderstand something here. I will work on
building a unit test around this, hopefully this clears up my confusion.

Thank you,
Padarn

On Tue, Feb 26, 2019 at 10:28 PM Till Rohrmann  wrote:

> Operator's with multiple inputs emit the minimum of the input's watermarks
> downstream. In case of a keyBy this means that the watermark is sent to all
> downstream consumers.
>
> Cheers,
> Till
>
> On Tue, Feb 26, 2019 at 1:47 PM Padarn Wilson  wrote:
>
>> Just to add: by printing intermediate results I see that I definitely
>> have more than five minutes of data, and by windowing without the session
>> windows I see that event time watermarks do seem to be generated as
>> expected.
>>
>> Thanks for your help and time.
>>
>> Padarn
>>
>> On Tue, 26 Feb 2019 at 8:43 PM, Padarn Wilson  wrote:
>>
>>> Hi Till,
>>>
>>> I will work on an example, but I’m a little confused by how keyBy and
>>> watermarks work in this case. This documentation says (
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams
>>> ):
>>>
>>>
>>> Some operators consume multiple input streams; a union, for example, or
>>> operators following a *keyBy(…)*or *partition(…)* function. Such an
>>> operator’s current event time is the minimum of its input streams’ event
>>> times. As its input streams update their event times, so does the operator.
>>>
>>>
>>> This implies to me that the keyBy splits the watermark?
>>>
>>> On Tue, 26 Feb 2019 at 6:40 PM, Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Padarn,
>>>>
>>>> Flink does not generate watermarks per keys. Atm watermarks are always
>>>> global. Therefore, I would suspect that it is rather a problem with
>>>> generating watermarks at all. Could it be that your input data does not
>>>> span a period longer than 5 minutes and also does not terminate? Another
>>>> problem could be the CountTrigger which should not react to the window's
>>>> end time. The method onEventTime simply returns TriggerResult.CONTINUE and
>>>> I think this will cause the window to not fire. Maybe a working example
>>>> program with example input could be helpful for further debugging.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Feb 26, 2019 at 2:53 AM Padarn Wilson  wrote:
>>>>
>>>>> Hi Flink Mailing List,
>>>>>
>>>>> Long story short - I want to somehow collapse watermarks at an
>>>>> operator across keys, so that keys with dragging watermarks do not drag
>>>>> behind. Details below:
>>>>>
>>>>> ---
>>>>>
>>>>> I have an application in which I want to perform the follow sequence
>>>>> of steps: Assume my data is made up of data that has: (time, user,
>>>>> location, action)
>>>>>
>>>>> -> Read source
>>>>> -> KeyBy (UserId, Location)
>>>>> -> EventTimeSessionWindow (5 min gap) - results in (User Location
>>>>> Session)
>>>>> -> TriggerOnFirst event
>>>>> -> KeyBy (Location)
>>>>> -> SlidingEventTimeWindow(5min length, 5 second gap)
>>>>> -> Count
>>>>>
>>>>> The end intention is to count the number of unique users in a given
>>>>> location - the EventTimeSessionWindow is used to make sure users are only
>>>>> counted once.
>>>>>
>>>>> So I created a custom Trigger, which is the same as CountTrigger, but
>>>>> has the following `TriggerResult" funtion:
>>>>>
>>>>> @Override
>>>>> public TriggerResult onElement(Object element, long timestamp, W window, 
>>>>> TriggerContext ctx) throws Exception {
>>>>>   ReducingState count = ctx.getPartitionedState(stateDesc);
>>>>>   count.add(1L);
>>>>>   if (count.get() == maxCount) {
>>>>> return TriggerResult.FIRE_AND_PURGE;
>>>>>   } else if (count.get() > maxCount) {
>>>>> return TriggerResult.PURGE;
>>>>>   }
>>>>>   return TriggerResult.CONTINUE;
>>>>>
>>>>> }
>>>>>
>>>>> But my final SlidingEventTimeWindow does not fire properly. This is
>>>>> because (I assume) there are some users with sessions windows that are not
>>>>> closed, and so the watermark for those keys is running behind and so the
>>>>> SlidingEventTimeWindow watermark is held back too.
>>>>>
>>>>> What I feel like I want to achieve is essentially setting the
>>>>> watermark of the SlidingEventTimeWindow operator to be the maximum (with
>>>>> lateness) of the input keys, rather than the minimum, but I cannot tell if
>>>>> this is possible, and if not, what another approach could be.
>>>>>
>>>>> Thanks,
>>>>> Padarn
>>>>>
>>>>


Re: Collapsing watermarks after keyby

2019-02-26 Thread Padarn Wilson
Just to add: by printing intermediate results I see that I definitely have
more than five minutes of data, and by windowing without the session
windows I see that event time watermarks do seem to be generated as
expected.

Thanks for your help and time.

Padarn

On Tue, 26 Feb 2019 at 8:43 PM, Padarn Wilson  wrote:

> Hi Till,
>
> I will work on an example, but I’m a little confused by how keyBy and
> watermarks work in this case. This documentation says (
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams
> ):
>
>
> Some operators consume multiple input streams; a union, for example, or
> operators following a *keyBy(…)*or *partition(…)* function. Such an
> operator’s current event time is the minimum of its input streams’ event
> times. As its input streams update their event times, so does the operator.
>
>
> This implies to me that the keyBy splits the watermark?
>
> On Tue, 26 Feb 2019 at 6:40 PM, Till Rohrmann 
> wrote:
>
>> Hi Padarn,
>>
>> Flink does not generate watermarks per keys. Atm watermarks are always
>> global. Therefore, I would suspect that it is rather a problem with
>> generating watermarks at all. Could it be that your input data does not
>> span a period longer than 5 minutes and also does not terminate? Another
>> problem could be the CountTrigger which should not react to the window's
>> end time. The method onEventTime simply returns TriggerResult.CONTINUE and
>> I think this will cause the window to not fire. Maybe a working example
>> program with example input could be helpful for further debugging.
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 26, 2019 at 2:53 AM Padarn Wilson  wrote:
>>
>>> Hi Flink Mailing List,
>>>
>>> Long story short - I want to somehow collapse watermarks at an operator
>>> across keys, so that keys with dragging watermarks do not drag behind.
>>> Details below:
>>>
>>> ---
>>>
>>> I have an application in which I want to perform the follow sequence of
>>> steps: Assume my data is made up of data that has: (time, user, location,
>>> action)
>>>
>>> -> Read source
>>> -> KeyBy (UserId, Location)
>>> -> EventTimeSessionWindow (5 min gap) - results in (User Location
>>> Session)
>>> -> TriggerOnFirst event
>>> -> KeyBy (Location)
>>> -> SlidingEventTimeWindow(5min length, 5 second gap)
>>> -> Count
>>>
>>> The end intention is to count the number of unique users in a given
>>> location - the EventTimeSessionWindow is used to make sure users are only
>>> counted once.
>>>
>>> So I created a custom Trigger, which is the same as CountTrigger, but
>>> has the following `TriggerResult" funtion:
>>>
>>> @Override
>>> public TriggerResult onElement(Object element, long timestamp, W window, 
>>> TriggerContext ctx) throws Exception {
>>>   ReducingState count = ctx.getPartitionedState(stateDesc);
>>>   count.add(1L);
>>>   if (count.get() == maxCount) {
>>> return TriggerResult.FIRE_AND_PURGE;
>>>   } else if (count.get() > maxCount) {
>>> return TriggerResult.PURGE;
>>>   }
>>>   return TriggerResult.CONTINUE;
>>>
>>> }
>>>
>>> But my final SlidingEventTimeWindow does not fire properly. This is
>>> because (I assume) there are some users with sessions windows that are not
>>> closed, and so the watermark for those keys is running behind and so the
>>> SlidingEventTimeWindow watermark is held back too.
>>>
>>> What I feel like I want to achieve is essentially setting the watermark
>>> of the SlidingEventTimeWindow operator to be the maximum (with lateness) of
>>> the input keys, rather than the minimum, but I cannot tell if this is
>>> possible, and if not, what another approach could be.
>>>
>>> Thanks,
>>> Padarn
>>>
>>


Collapsing watermarks after keyby

2019-02-25 Thread Padarn Wilson
Hi Flink Mailing List,

Long story short - I want to somehow collapse watermarks at an operator
across keys, so that keys with dragging watermarks do not drag behind.
Details below:

---

I have an application in which I want to perform the follow sequence of
steps: Assume my data is made up of data that has: (time, user, location,
action)

-> Read source
-> KeyBy (UserId, Location)
-> EventTimeSessionWindow (5 min gap) - results in (User Location Session)
-> TriggerOnFirst event
-> KeyBy (Location)
-> SlidingEventTimeWindow(5min length, 5 second gap)
-> Count

The end intention is to count the number of unique users in a given
location - the EventTimeSessionWindow is used to make sure users are only
counted once.
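
As a rough Scala sketch of that topology (the event type is simplified, and
the commented-out trigger line stands in for the custom trigger shown below -
this is a placeholder, not working code from my job):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, SlidingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class Event(time: Long, user: String, location: String, action: String)

def uniqueUsersPerLocation(events: DataStream[Event]): DataStream[(String, Long)] = {
  // Stage 1: one session per (user, location); with the custom trigger the
  // session emits a single record on its first element.
  val firstEventPerSession = events
    .keyBy(e => (e.user, e.location))
    .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
    // .trigger(new FireOnFirstElementTrigger) // the custom trigger shown below
    .reduce((a, b) => a)

  // Stage 2: count users per location in sliding windows.
  firstEventPerSession
    .keyBy(_.location)
    .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.seconds(5)))
    .apply((location: String, window: TimeWindow, users: Iterable[Event], out: Collector[(String, Long)]) =>
      out.collect((location, users.map(_.user).toSet.size.toLong)))
}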

So I created a custom Trigger, which is the same as CountTrigger, but has
the following `TriggerResult` function:

@Override
public TriggerResult onElement(Object element, long timestamp, W window,
                               TriggerContext ctx) throws Exception {
  ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
  count.add(1L);
  if (count.get() == maxCount) {
    return TriggerResult.FIRE_AND_PURGE;
  } else if (count.get() > maxCount) {
    return TriggerResult.PURGE;
  }
  return TriggerResult.CONTINUE;
}

But my final SlidingEventTimeWindow does not fire properly. This is because
(I assume) there are some users with sessions windows that are not closed,
and so the watermark for those keys is running behind and so the
SlidingEventTimeWindow watermark is held back too.

What I feel like I want to achieve is essentially setting the watermark of
the SlidingEventTimeWindow operator to be the maximum (with lateness) of
the input keys, rather than the minimum, but I cannot tell if this is
possible, and if not, what another approach could be.

Thanks,
Padarn


Re: StreamingFileSink causing AmazonS3Exception

2019-02-21 Thread Padarn Wilson
Thanks Kostas!

On Mon, Feb 18, 2019 at 5:10 PM Kostas Kloudas 
wrote:

> Hi Padarn,
>
> This is the jira issue:  https://issues.apache.org/jira/browse/FLINK-11187
> and the fix, as you can see, was first included in version 1.7.2.
>
> Cheers,
> Kostas
>
> On Mon, Feb 18, 2019 at 3:49 AM Padarn Wilson 
> wrote:
>
>> Hi Addison, Kostas, Steffan,
>>
>> I am also encountering this exact issue. I cannot find a JIRA ticket on
>> this, is there some planned work on implementing a fix?
>>
>> @Addison - Did you manage to find a fix that you could apply without
>> modifying the Flink codebase? If possible it would be better not patch the
>> code base and compile a custom image.
>>
>> Thanks,
>> Padarn
>>
>> On Tue, Dec 18, 2018 at 5:37 AM Addison Higham 
>> wrote:
>>
>>> Oh this is timely!
>>>
>>> I hope I can save you some pain Kostas! (cc-ing to flink dev to get
>>> feedback there for what I believe to be a confirmed bug)
>>>
>>>
>>> I was just about to open up a flink issue for this after digging
>>> (really) deep and figuring out the issue over the weekend.
>>>
>>> The problem arises due to how flink hands input streams to the
>>> S3AccessHelper. If you turn on debug logs for s3, you will eventually see
>>> this stack trace:
>>>
>>> 2018-12-17 05:55:46,546 DEBUG
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  -
>>> FYI: failed to reset content inputstream before throwing up
>>> java.io.IOException: Resetting to invalid mark
>>>   at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a

Re: StreamingFileSink causing AmazonS3Exception

2019-02-17 Thread Padarn Wilson
Hi Addison, Kostas, Steffan,

I am also encountering this exact issue. I cannot find a JIRA ticket on
this, is there some planned work on implementing a fix?

@Addison - Did you manage to find a fix that you could apply without
modifying the Flink codebase? If possible it would be better not patch the
code base and compile a custom image.

Thanks,
Padarn

On Tue, Dec 18, 2018 at 5:37 AM Addison Higham  wrote:

> Oh this is timely!
>
> I hope I can save you some pain Kostas! (cc-ing to flink dev to get
> feedback there for what I believe to be a confirmed bug)
>
>
> I was just about to open up a flink issue for this after digging (really)
> deep and figuring out the issue over the weekend.
>
> The problem arises due to how flink hands input streams to the
> S3AccessHelper. If you turn on debug logs for s3, you will eventually see
> this stack trace:
>
> 2018-12-17 05:55:46,546 DEBUG
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  -
> FYI: failed to reset content inputstream before throwing up
> java.io.IOException: Resetting to invalid mark
>   at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
>   at
> org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
>   at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
>   at
> org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>
> From this, you can see that for (some reason) AWS fails to write a
> multi-part chunk and then tries to reset the input stream in order to retry
> but fails (because the InputStream 

Re: Clarification around versioning and flink's state serialization

2018-12-21 Thread Padarn Wilson
Thanks for the clarification, that is clear now.

Look forward to seeing your slides, safe travels.

On Sat, Dec 22, 2018 at 8:25 AM Tzu-Li (Gordon) Tai 
wrote:

> 1. Correct. Under the hood, evolvability of schema relies on the type's
> serializer implementation to support it.
> In Flink 1.7, this had been done only for Avro's Flink built-in serializer
> (i.e. the AvroSerializer class) for now, so you don't need to provide a
> custom serializer for this.
> For any other types, that would be required for now; again, how to
> implement a custom serializer that works for schema evolution is covered in
> the documents.
>
> 2. Yes, disabling generic types will let the job fail if any data type is
> determined to be serialized by Kryo, let it be for on-wire data
> transmission or for state serialization.
>
> I'm currently still traveling because of the recent Flink Forward event;
> will send you a copy of the latest slides I presented about the topic once
> I get back.
>
> Cheers,
> Gordon
>
> On Fri, Dec 21, 2018, 10:42 PM Padarn Wilson  wrote:
>
>> Yes that helps a lot!
>>
>> Just to clarify:
>> - If using Avro types in 1.7, no explicit declaration of serializers
>> needs to be done to have state evolution. But all other evolvable types
>> (e.g Protobuf) still need to be registered and evolved manually?
>> - If specifying `disableGenericTypes` on my execution context, anything
>> that falls back to Kryo will cause an error.
>>
>> Would love to see more updated slides if you don't mind.
>>
>> Thanks for taking the time,
>> Padarn
>>
>>
>> On Fri, Dec 21, 2018 at 10:04 PM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> For the documents I would recommend reading through:
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/schema_evolution.html
>>>
>>>
>>> On Fri, Dec 21, 2018, 9:55 PM Tzu-Li (Gordon) Tai >> wrote:
>>>
>>>> Hi,
>>>>
>>>> Yes, if Flink does not recognize your registered state type, it will by
>>>> default use Kryo for the serialization.
>>>> And generally speaking, Kryo does not have good support for evolvable
>>>> schemas compared to other serialization frameworks such as Avro or 
>>>> Protobuf.
>>>>
>>>> The reason why Flink defaults to Kryo for unrecognizable types has some
>>>> historical reasons due to the original use of Flink's type serialization
>>>> stack being used on the batch side, but IMO the short answer is that it
>>>> would make sense to have a different default serializer (perhaps Avro) for
>>>> snapshotting state in streaming programs.
>>>> However, I believe this would be better suited as a separate discussion
>>>> thread.
>>>>
>>>> The good news is that with Flink 1.7, state schema evolution is fully
>>>> supported out of the box for Avro types, such as GenericRecord or code
>>>> generated SpecificRecords.
>>>> If you want to have evolvable schema for your state types, then it is
>>>> recommended to use Avro as state types.
>>>> Support for evolving schema of other data types such as POJOs and Scala
>>>> case classes is also on the radar for future releases.
>>>>
>>>> Does this help answer your question?
>>>>
>>>> By the way, the slides your are looking at I would consider quite
>>>> outdated for the topic, since Flink 1.7 was released with much smoother
>>>> support for state schema evolution.
>>>> An updated version of the slides is not yet publicly available, but if
>>>> you want I can send you one privately.
>>>> Otherwise, the Flink docs for 1.7 would also be equally helpful.
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>>
>>>> On Fri, Dec 21, 2018, 8:11 PM Padarn Wilson >>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I am trying to understand the situation with state serialization in
>>>>> flink. I'm looking at a number of sources, but slide 35 from here
>>>>> crystalizes my confusion:
>>>>>
>>>>>
>>>>> https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink
>>>>>
>>>>> So, I understand that if 'Flink's own serialization stack' is unable
>>>>> to serialize a type you define, then it will fall back on Kryo generics. 
>>>>>

Re: Clarification around versioning and flink's state serialization

2018-12-21 Thread Padarn Wilson
Yes that helps a lot!

Just to clarify:
- If using Avro types in 1.7, no explicit declaration of serializers needs
to be done to have state evolution. But all other evolvable types (e.g
Protobuf) still need to be registered and evolved manually?
- If specifying `disableGenericTypes` on my execution context, anything
that falls back to Kryo will cause an error.

Would love to see more updated slides if you don't mind.

Thanks for taking the time,
Padarn


On Fri, Dec 21, 2018 at 10:04 PM Tzu-Li (Gordon) Tai 
wrote:

> For the documents I would recommend reading through:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/schema_evolution.html
>
>
> On Fri, Dec 21, 2018, 9:55 PM Tzu-Li (Gordon) Tai  wrote:
>
>> Hi,
>>
>> Yes, if Flink does not recognize your registered state type, it will by
>> default use Kryo for the serialization.
>> And generally speaking, Kryo does not have good support for evolvable
>> schemas compared to other serialization frameworks such as Avro or Protobuf.
>>
>> The reason why Flink defaults to Kryo for unrecognizable types has some
>> historical reasons due to the original use of Flink's type serialization
>> stack being used on the batch side, but IMO the short answer is that it
>> would make sense to have a different default serializer (perhaps Avro) for
>> snapshotting state in streaming programs.
>> However, I believe this would be better suited as a separate discussion
>> thread.
>>
>> The good news is that with Flink 1.7, state schema evolution is fully
>> supported out of the box for Avro types, such as GenericRecord or code
>> generated SpecificRecords.
>> If you want to have evolvable schema for your state types, then it is
>> recommended to use Avro as state types.
>> Support for evolving schema of other data types such as POJOs and Scala
>> case classes is also on the radar for future releases.
>>
>> Does this help answer your question?
>>
>> By the way, the slides your are looking at I would consider quite
>> outdated for the topic, since Flink 1.7 was released with much smoother
>> support for state schema evolution.
>> An updated version of the slides is not yet publicly available, but if
>> you want I can send you one privately.
>> Otherwise, the Flink docs for 1.7 would also be equally helpful.
>>
>> Cheers,
>> Gordon
>>
>>
>> On Fri, Dec 21, 2018, 8:11 PM Padarn Wilson > wrote:
>>
>>> Hi all,
>>>
>>> I am trying to understand the situation with state serialization in
>>> flink. I'm looking at a number of sources, but slide 35 from here
>>> crystalizes my confusion:
>>>
>>>
>>> https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink
>>>
>>> So, I understand that if 'Flink's own serialization stack' is unable to
>>> serialize a type you define, then it will fall back on Kryo generics. In
>>> this case, I believe what I'm being told is that state compatibility is
>>> difficult to ensure, and schema evolution in your jobs is not possible.
>>>
>>> However on this slide, they say
>>> "
>>>Kryo is generally not  recommended ...
>>>
>>>Serialization frameworks with schema evolution support is
>>> recommended: Avro, Thrift
>>> "
>>> So is this implying that Flink's non-default serialization stack does
>>> not support schema evolution? In this case is it best practice to register
>>> custom serializers whenever possible.
>>>
>>> Thanks
>>>
>>>

Clarification around versioning and flink's state serialization

2018-12-21 Thread Padarn Wilson
Hi all,

I am trying to understand the situation with state serialization in Flink.
I'm looking at a number of sources, but slide 35 from here crystalizes my
confusion:

https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink

So, I understand that if 'Flink's own serialization stack' is unable to
serialize a type you define, then it will fall back on Kryo generics. In
this case, I believe what I'm being told is that state compatibility is
difficult to ensure, and schema evolution in your jobs is not possible.

However on this slide, they say
"
   Kryo is generally not  recommended ...

   Serialization frameworks with schema evolution support is
recommended: Avro, Thrift
"
So is this implying that Flink's non-default serialization stack does not
support schema evolution? In this case is it best practice to register
custom serializers whenever possible?
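
For concreteness, this is the kind of configuration I am weighing up (a
minimal Scala sketch; the Protobuf class and the chill-protobuf serializer
are placeholders for whatever custom serializer would be registered):

import org.apache.flink.streaming.api.scala._

object SerializerConfigSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Fail fast whenever a type would silently fall back to Kryo generics, so
    // any type without an evolvable serializer shows up as an error right away.
    env.getConfig.disableGenericTypes()

    // Alternative (instead of disabling generic types): keep Kryo but register
    // an explicit serializer for an evolvable non-Avro type. `MyProtoMessage`
    // is a placeholder for a generated Protobuf class, and ProtobufSerializer
    // would come from the chill-protobuf dependency.
    // env.getConfig.registerTypeWithKryoSerializer(
    //   classOf[MyProtoMessage],
    //   classOf[com.twitter.chill.protobuf.ProtobufSerializer])
  }
}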

Thanks


Re: Checkpointing when reading from files?

2018-05-27 Thread Padarn Wilson
I'm a bit confused about this too actually. I think the above would work as
a solution if you want to continuously monitor a directory, but for a
"PROCESS_ONCE" readFile source I don't think you will get a checkpoint
emitted indicating the end of the stream.

Trying to dig into the java code I found this:

case PROCESS_ONCE:
  synchronized (checkpointLock) {

    // the following check guarantees that if we restart
    // after a failure and we managed to have a successful
    // checkpoint, we will not reprocess the directory.

    if (globalModificationTime == Long.MIN_VALUE) {
      monitorDirAndForwardSplits(fileSystem, context);
      globalModificationTime = Long.MAX_VALUE;
    }
    isRunning = false;
  }
  break;

My understanding of this is that there can be no checkpoints created while
the file directory is being read, and then once it has been read the
isRunning flag is set to false, which means no new checkpoints are emitted.

Is this correct? If so, is it possible to somehow force a checkpoint
to be emitted on the completion of the source?
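
For reference, the continuous-monitoring variant that does keep checkpointing
looks roughly like this (a minimal Scala sketch with a made-up path, just to
contrast the two modes):

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

object ContinuousFileReadSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(60000) // checkpoint every minute

    val inputPath = "s3://my-bucket/input" // placeholder path
    val format = new TextInputFormat(new Path(inputPath))

    // PROCESS_CONTINUOUSLY keeps the monitoring source running, so the job
    // keeps taking checkpoints; with PROCESS_ONCE the source finishes after a
    // single scan and the behaviour described above applies.
    val lines: DataStream[String] =
      env.readFile(format, inputPath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L)

    lines.print()
    env.execute("continuous file read")
  }
}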



On Tue, May 22, 2018 at 3:24 AM Amit Jain  wrote:

> Hi Alex,
>
> StreamingExecutionEnvironment#readFile is a helper function to create
> file reader data streaming source. It uses
> ContinuousFileReaderOperator and ContinuousFileMonitoringFunction
> internally.
>
> As both file reader operator and monitoring function uses
> checkpointing so is readFile [1], you can go with first approach.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readFile-org.apache.flink.api.common.io.FileInputFormat-java.lang.String-org.apache.flink.streaming.api.functions.source.FileProcessingMode-long-org.apache.flink.api.common.typeinfo.TypeInformation-
>
>
> --
> Thanks,
> Amit
>
>
> On Mon, May 21, 2018 at 9:39 PM, NEKRASSOV, ALEXEI  wrote:
> > I want to add checkpointing to my program that reads from a set of files
> in
> > a directory. Without checkpointing I use readFile():
> >
> >
> >
> >   DataStream text = env.readFile(
> >
> >new TextInputFormat(new Path(inputPath)),
> >
> >inputPath,
> >
> >   inputProcessingMode,
> >
> >   1000);
> >
> >
> >
> > Should I use ContinuousFileMonitoringFunction /
> ContinuousFileReaderOperator
> > to add checkpointing? Or is there an easier way?
> >
> >
> >
> > How do I go from splits (that ContinuousFileMonitoringFunction provides)
> to
> > actual strings? I’m not clear how ContinuousFileReaderOperator can be
> used.
> >
> >
> >
> >   DataStreamSource split =
> > env.addSource(
> >
> >new ContinuousFileMonitoringFunction(
> >
> >  new TextInputFormat(new
> > Path(inputPath)),
> >
> >  inputProcessingMode,
> >
> >  1,
> >
> >  1000)
> >
> >   );
> >
> >
> >
> > Thanks,
> > Alex
>


AvroInputFormat Serialisation Issue

2018-05-14 Thread Padarn Wilson
Hi all - sorry this seems like a silly question, but I can't figure it out.

I'm using an AvroInputFormat in order to read an Avro file like this:

val textInputFormat = new AvroInputFormat[GenericRecord](infile, classOf[GenericRecord])
val lines = env.readFile(textInputFormat, path)

This works fine in local mode, but when submitted to a Flink cluster I get
serialisation errors that look like this:

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$StringSchema
Serialization trace:
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
elementType (org.apache.avro.Schema$ArraySchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
    at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
    ... 7 more
Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class org.apache.avro.Schema$StringSchema with modifiers "public"
    at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
    at
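
For reference, a minimal sketch of one possible workaround for this kind of Kryo-on-Avro failure: forcing Flink's Avro-based serializer for POJO types instead of the Kryo fallback. This assumes the flink-avro dependency is on the classpath, and whether it applies depends on how the type information for the records is derived.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ForceAvroSerialization {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Force Flink to serialize POJO types with its Avro serializer rather than
        // Kryo; Kryo fails on org.apache.avro.Schema subclasses because it cannot
        // reflectively access a no-arg constructor for them.
        env.getConfig().enableForceAvro();

        // ... build and execute the pipeline as usual ...
    }
}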

Batch writing from Flink streaming job

2018-05-13 Thread Padarn Wilson
Hi all,

I am writing some jobs intended to run using the DataStream API with a
Kafka source. However, we also have a lot of data in Avro archives (of the
same Kafka source). I would like to be able to run the processing code over
parts of the archive so I can generate some "example output".

I've written the transformations needed to read and process the data from
the archives, but now I'm trying to figure out the best way to write the
results to storage.

At the moment I can easily write to JSON or CSV using the bucketing sink
(although I'm curious about using the watermark time rather than system
time to name the buckets), but I'd really like to write to something more
compact, like Avro.

However, I'm not sure this makes sense. Writing to a compressed file format
in this way from a streaming job doesn't feel intuitively right. What would
make the most sense? I could write to some temporary database and then pipe
that into an archive, but this seems like a lot of trouble. Is there a way
to pipe the output directly into the batch API of Flink?

Thanks
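
For reference, a minimal sketch of writing a stream of Avro GenericRecords
out as Avro files with Flink's bulk file sink (assuming the flink-avro
module and a Flink release that provides StreamingFileSink; the class,
method, and path names below are placeholders, not anything from the job
above):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class AvroFileSinkSketch {

    /** Attaches a bulk Avro file sink to an already-built stream of GenericRecords. */
    public static void writeAsAvro(DataStream<GenericRecord> records,
                                   Schema schema,
                                   String outputPath) {
        StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(new Path(outputPath), AvroWriters.forGenericRecord(schema))
                .build();
        records.addSink(sink);
    }
}

With bulk formats the sink rolls part files on checkpoints, so each part
file ends up as a self-contained Avro container rather than an append-only
text file.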


Window over events defined by a time range

2018-05-03 Thread Padarn Wilson
Hi all,

I'm trying to figure out the "Flink" way of achieving what I'd like to do.

Imagine I have the following three events in my stream

event1: {"start_time": 0, "end_time": 1, "value": "a"}
event2: {"start_time": 0, "end_time": 2, "value": "b"}
event3: {"start_time": 1, "end_time": 2, "value": "c"}

From this I would like to create tumbling windows of length 1 that collect
all the events which overlap each window and make a list of the "values"
seen, so the result would be two windows:

window1 [0, 1] = {"a", "b"}
window2 [1, 2] = {"b", "c"}

However my understanding is that because my original stream only has three
events, no matter how I create windows, I cannot have event 2 in both
windows.

I can think of how this could be done by mapping each event into a separate
event for each window that it should fall into, i.e. we would split event 2
into:

event2a: {"start_time": 0, "end_time": 1, "value": "b"}
event2b: {"start_time": 1, "end_time": 2, "value": "b"}

But this seems awfully cumbersome when the logic is more complicated.

Is there a natural way to handle these overlapping windows?
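
A minimal sketch of the splitting approach described above (a flatMap that
re-emits an event once per unit-length window it overlaps; the Event type
and its field names are assumptions for the sketch, not anything defined
above):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class SplitByWindow implements FlatMapFunction<SplitByWindow.Event, Tuple2<Long, String>> {

    // An event covering [startTime, endTime) is re-emitted once per unit-length
    // tumbling window it overlaps, tagged with that window's start time.
    @Override
    public void flatMap(Event event, Collector<Tuple2<Long, String>> out) {
        for (long windowStart = event.startTime; windowStart < event.endTime; windowStart++) {
            out.collect(Tuple2.of(windowStart, event.value));
        }
    }

    /** Minimal event type used only for this sketch. */
    public static class Event {
        public long startTime;
        public long endTime;
        public String value;
    }
}

Keying the split stream by the window start (field f0) and applying a
tumbling event-time window of size 1 then comes down to collecting the
values, which reproduces window1 = {"a", "b"} and window2 = {"b", "c"} from
the example.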
