Re: Trigger Firing for Late Window Elements

2018-10-20 Thread Scott Kidder
That makes sense, thank you, Hequn. I can see the tradeoff between using
allowedLateness on a window to trigger multiple firings, versus a window
with a watermark lagging some amount of time (e.g. 3 hours) that has only a
single firing.
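
For anyone following along, a rough sketch (untested Java, assuming the Flink
1.5 APIs and a placeholder Record type) of the single-firing variant is below:
assign watermarks that lag the data by 3 hours and drop allowedLateness(), so
each window fires exactly once when the lagging watermark passes its end.

chunkSource
    .assignTimestampsAndWatermarks(
        // emit watermarks that trail the latest event timestamp by 3 hours
        new BoundedOutOfOrdernessTimestampExtractor<Record>(Time.hours(3)) {
            @Override
            public long extractTimestamp(Record record) {
                return record.getTimestamp(); // placeholder accessor
            }
        })
    .keyBy(record -> record.getRecord().getEnvironmentId())
    .timeWindow(Time.hours(1))
    // no allowedLateness(): the window fires once and is then discarded
    .aggregate(new EnvironmentAggregateWatchTimeFunction())
    .addSink(new EnvironmentWatchTimeDBSink());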

Thanks again,

--
Scott Kidder

On Fri, Oct 19, 2018 at 7:51 PM Hequn Cheng  wrote:

> Hi Scott,
>
> Yes, the window trigger fires for every single late element.
>
> If you only want the window to be triggered once, you can:
> - Remove the allowedLateness()
> - Use BoundedOutOfOrdernessTimestampExtractor to emit Watermarks that
> lag behind the element.
>
> The code(scala) looks like:
>
>> class TimestampExtractor[T1, T2]
>>   extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](
>>     Time.hours(3)) {
>>   override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
>>     element._3.getTime
>>   }
>> }
>
>
> Note that this will increase latency, since the window fires only once,
> after the lagging watermark passes the end of the window.
>
> Best, Hequn
>
> On Sat, Oct 20, 2018 at 1:15 AM Scott Kidder 
> wrote:
>
>> I'm using event-time windows of 1 hour that have an allowed lateness of
>> several hours. This supports the processing of access logs that can be
>> delayed by several hours. The windows aggregate data over the 1 hour period
>> and write to a database sink. Pretty straightforward.
>>
>> Will the event-time trigger lead to the window trigger firing for every
>> single late element? Suppose thousands of late elements arrive
>> simultaneously; I'd like to avoid having that lead to thousands of database
>> updates in a short period of time. Ideally, I could batch up the late
>> window changes and have it trigger when the window is finally closed or
>> some processing-time duration passes (e.g. once per minute).
>>
>> For reference, here's what the aggregate window definition looks like
>> with Flink 1.5.3:
>>
>> chunkSource.keyBy(record -> record.getRecord().getEnvironmentId())
>>     .timeWindow(Time.hours(1))
>>     .allowedLateness(Time.hours(3))
>>     .aggregate(new EnvironmentAggregateWatchTimeFunction())
>>     .uid("env-watchtime-stats")
>>     .name("Env Watch-Time Stats")
>>     .addSink(new EnvironmentWatchTimeDBSink());
>>
>>
>> Thank you,
>>
>> --
>> Scott Kidder
>>
>


Trigger Firing for Late Window Elements

2018-10-19 Thread Scott Kidder
I'm using event-time windows of 1 hour that have an allowed lateness of
several hours. This supports the processing of access logs that can be
delayed by several hours. The windows aggregate data over the 1 hour period
and write to a database sink. Pretty straightforward.

Will the event-time trigger lead to the window trigger firing for every
single late element? Suppose thousands of late elements arrive
simultaneously; I'd like to avoid having that lead to thousands of database
updates in a short period of time. Ideally, I could batch up the late
window changes and have it trigger when the window is finally closed or
some processing-time duration passes (e.g. once per minute).
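
To make the batching idea concrete, here is a rough, untested sketch of the
kind of custom trigger I have in mind (the class name and interval are
hypothetical): it behaves like the default event-time trigger for the main
firing, but coalesces late elements into at most one firing per
processing-time interval.

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class BatchingLateFiringTrigger extends Trigger<Object, TimeWindow> {

    private final long batchIntervalMs;

    public BatchingLateFiringTrigger(long batchIntervalMs) {
        this.batchIntervalMs = batchIntervalMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // Late element: schedule at most one processing-time firing per interval.
            // Timers registered for the same timestamp are de-duplicated.
            long nextFiring =
                (ctx.getCurrentProcessingTime() / batchIntervalMs + 1) * batchIntervalMs;
            ctx.registerProcessingTimeTimer(nextFiring);
        } else {
            // On-time element: fire when the watermark passes the end of the window.
            ctx.registerEventTimeTimer(window.maxTimestamp());
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        // A production version would also track and delete any pending
        // processing-time timer here.
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

It would be installed with .trigger(new BatchingLateFiringTrigger(60_000)) right
after the allowedLateness(...) call in the pipeline below, though I'd still need
to check how it interacts with the final cleanup at the end of the allowed
lateness.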

For reference, here's what the aggregate window definition looks like with
Flink 1.5.3:

chunkSource.keyBy(record -> record.getRecord().getEnvironmentId())
    .timeWindow(Time.hours(1))
    .allowedLateness(Time.hours(3))
    .aggregate(new EnvironmentAggregateWatchTimeFunction())
    .uid("env-watchtime-stats")
    .name("Env Watch-Time Stats")
    .addSink(new EnvironmentWatchTimeDBSink());


Thank you,

--
Scott Kidder


Re: Committing Kafka Transactions during Savepoint

2018-07-27 Thread Scott Kidder
Thank you, Aljoscha! Are Kafka transactions committed when a running job
has been instructed to cancel with a savepoint (e.g. `flink cancel -s
`)? This is my primary use for savepoints. I would expect that when a
new job is submitted with the savepoint, as in the case of an application
upgrade, Flink will create a new Kafka transaction and processing will be
exactly-once.

--Scott Kidder

On Fri, Jul 27, 2018 at 5:09 AM Aljoscha Krettek 
wrote:

> Hi,
>
> this has been in the back of my head for a while now. I finally created a
> Jira issue: https://issues.apache.org/jira/browse/FLINK-9983
>
> In there, I also outline a better fix that will take a bit longer to
> implement.
>
> Best,
> Aljoscha
>
> On 26. Jul 2018, at 23:04, Scott Kidder  wrote:
>
> I recently began using the exactly-once processing semantic with the Kafka
> 0.11 producer in Flink 1.4.2. It's been working great!
>
> Are Kafka transactions committed when creating a Flink savepoint? How does
> this affect the recovery behavior in Flink if, before the completion of the
> next checkpoint, the application is restarted and restores from a
> checkpoint taken before the savepoint? It seems like this might lead to the
> Kafka producer writing a message multiple times with different committed
> Kafka transactions.
>
> --
> Scott Kidder
>
>
>


Committing Kafka Transactions during Savepoint

2018-07-26 Thread Scott Kidder
I recently began using the exactly-once processing semantic with the Kafka
0.11 producer in Flink 1.4.2. It's been working great!
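
For context, the producer setup looks roughly like this (a simplified sketch;
the broker address, topic, schema, and the upstream stream variable are
placeholders):

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "kafka:9092");
// Must not exceed the broker's transaction.max.timeout.ms
producerProps.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
    "output-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    producerProps,
    FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);

stream.addSink(producer);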

Are Kafka transactions committed when creating a Flink savepoint? How does
this affect the recovery behavior in Flink if, before the completion of the
next checkpoint, the application is restarted and restores from a
checkpoint taken before the savepoint? It seems like this might lead to the
Kafka producer writing a message multiple times with different committed
Kafka transactions.

--
Scott Kidder


Re: FW: high availability with automated disaster recovery using zookeeper

2018-07-16 Thread Scott Kidder
Hi Tovi, we run all services (Flink, Zookeeper, Hadoop HDFS, and Consul) in
a Kubernetes cluster in each data center. Kubernetes will automatically
restart/reschedule any services that crash or become unhealthy. This is a
little outside the scope of Flink, and I'd be happy to discuss it further
off-list.

Best,

--Scott Kidder

On Mon, Jul 16, 2018 at 5:11 AM Sofer, Tovi  wrote:

> Thank you Scott,
>
> Looks like a very elegant solution.
>
>
>
> How did you manage high availability in single data center?
>
>
>
> Thanks,
>
> Tovi
>
>
>
> *From:* Scott Kidder 
> *Sent:* Friday, July 13, 2018 01:13
> *To:* Sofer, Tovi [ICG-IT] 
> *Cc:* user@flink.apache.org
> *Subject:* Re: high availability with automated disaster recovery using
> zookeeper
>
>
>
> I've used a multi-datacenter Consul cluster to coordinate service
> discovery. When a service starts up in the primary DC, it registers
> itself in Consul with a key that has a TTL that must be periodically
> renewed. If the service shuts down or terminates abruptly, the key expires
> and is removed from Consul. A standby service in another DC can be started
> automatically after detecting the absence of the key in Consul in the
> primary DC. This could lead to submitting a job to the standby Flink
> cluster from the most recent savepoint that was copied by the offline
> process you mentioned. It should be pretty easy to automate all of this. I
> would not recommend setting up a multi-datacenter Zookeeper cluster; in my
> experience, Consul is much easier to work with.
>
>
>
> Best,
>
>
>
> --
>
> Scott Kidder
>
>
>
>
>
> On Mon, Jul 9, 2018 at 4:48 AM Sofer, Tovi  wrote:
>
> Hi all,
>
>
>
> We are now examining how to achieve high availability for Flink, and also
> how to support automatic recovery in a disaster scenario, when an entire
> data center goes down.
>
> We have DC1, where we usually want work to be done, and DC2, which is more
> remote and should take over work only when DC1 is down.
>
>
>
> We examined a few options and would be glad to hear feedback or a
> suggestion for another way to achieve this.
>
> · Two separate ZooKeeper and Flink clusters, one in each of the two
> data centers.
>
> Only the cluster in DC1 is running, and state is copied to DC2 in an
> offline process.
>
> To achieve automatic recovery we need to use some kind of watchdog which
> will check DC1 availability, and if it is down, will start DC2 (and
> likewise later if DC2 is down).
>
> Is there a recommended tool for this?
>
> · A ZooKeeper “stretch cluster” across data centers – with 2 nodes
> in DC1, 2 nodes in DC2, and one observer node.
>
> Also a Flink cluster with jobmanager1 in DC1 and jobmanager2 in DC2.
>
> This way, when DC1 is down, ZooKeeper will notice this automatically and
> will transfer work to jobmanager2 in DC2.
>
> However, we would like the ZooKeeper leader and the Flink JobManager
> leader (the primary one) to be in DC1 – unless it is down.
>
> Is there a way to achieve this?
>
>
>
> Thanks and regards,
>
>
> *Tovi Sofer*
>
> Software Engineer
> +972 (3) 7405756
>
>
>
>
>


Re: high availability with automated disaster recovery using zookeeper

2018-07-12 Thread Scott Kidder
I've used a multi-datacenter Consul cluster to coordinate service
discovery. When a service starts up in the primary DC, it registers
itself in Consul with a key that has a TTL that must be periodically
renewed. If the service shuts down or terminates abruptly, the key expires
and is removed from Consul. A standby service in another DC can be started
automatically after detecting the absence of the key in Consul in the
primary DC. This could lead to submitting a job to the standby Flink
cluster from the most recent savepoint that was copied by the offline
process you mentioned. It should be pretty easy to automate all of this. I
would not recommend setting up a multi-datacenter Zookeeper cluster; in my
experience, Consul is much easier to work with.
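
A minimal sketch of the watchdog on the standby side might look like the
following (plain Java with the JDK 11 HttpClient; the Consul address, key
name, thresholds, and the job-submission hook are all hypothetical):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PrimaryDcWatchdog {

    // TTL-backed key that the primary-DC service keeps renewing in Consul.
    private static final String KEY_URL =
        "http://consul.dc1.example.com:8500/v1/kv/service/flink/primary-leader";

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        int misses = 0;
        while (true) {
            HttpRequest request = HttpRequest.newBuilder(URI.create(KEY_URL)).GET().build();
            HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
            // Consul returns 404 once the TTL key has expired and been removed.
            misses = (response.statusCode() == 404) ? misses + 1 : 0;
            if (misses >= 3) {
                startStandbyJobFromLatestSavepoint();
                return;
            }
            Thread.sleep(10_000);
        }
    }

    private static void startStandbyJobFromLatestSavepoint() {
        // Submit the job to the standby Flink cluster using the most recent
        // savepoint copied over by the offline replication process.
    }
}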

Best,

--
Scott Kidder


On Mon, Jul 9, 2018 at 4:48 AM Sofer, Tovi  wrote:

> Hi all,
>
>
>
> We are now examining how to achieve high availability for Flink, and also
> how to support automatic recovery in a disaster scenario, when an entire
> data center goes down.
>
> We have DC1, where we usually want work to be done, and DC2, which is more
> remote and should take over work only when DC1 is down.
>
>
>
> We examined a few options and would be glad to hear feedback or a
> suggestion for another way to achieve this.
>
> · Two separate ZooKeeper and Flink clusters, one in each of the two
> data centers.
>
> Only the cluster in DC1 is running, and state is copied to DC2 in an
> offline process.
>
> To achieve automatic recovery we need to use some kind of watchdog which
> will check DC1 availability, and if it is down, will start DC2 (and
> likewise later if DC2 is down).
>
> Is there a recommended tool for this?
>
> · A ZooKeeper “stretch cluster” across data centers – with 2 nodes
> in DC1, 2 nodes in DC2, and one observer node.
>
> Also a Flink cluster with jobmanager1 in DC1 and jobmanager2 in DC2.
>
> This way, when DC1 is down, ZooKeeper will notice this automatically and
> will transfer work to jobmanager2 in DC2.
>
> However, we would like the ZooKeeper leader and the Flink JobManager
> leader (the primary one) to be in DC1 – unless it is down.
>
> Is there a way to achieve this?
>
>
>
> Thanks and regards,
>
>
> *Tovi Sofer*
>
> Software Engineer
> +972 (3) 7405756
>
>
>
>


Re: Processing-Time Timers and Checkpoint State

2017-01-09 Thread Scott Kidder
I too was able to confirm that the issue was fixed in the 1.2
release-candidate 0 (zero) tag. I was unable to get it working with 1.1.4,
but with 1.2 it worked without any additional modifications.

Best,

--Scott Kidder

On Mon, Jan 9, 2017 at 7:03 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> I just verified that this does indeed work on master and the Flink 1.2
> branch. I'm also pushing a test that verifies this.
>
> Stephan is right that this doesn't work on Flink 1.1. Is that critical for
> you? If yes, then we should think about pushing out another Flink 1.x
> bugfix release.
>
> Cheers,
> Aljoscha
>
> On Fri, 30 Dec 2016 at 19:05 Scott Kidder <kidder.sc...@gmail.com> wrote:
>
>> Excellent, thank you Stephan!
>>
>> On Fri, Dec 30, 2016 at 6:58 AM, Stephan Ewen <se...@apache.org> wrote:
>>
>> Hi!
>>
>> Yes, the processing time timers are persisted in the checkpoint and
>> restored on recovery. If a timer is in the past, it should fire immediately
>> after recovery.
>>
> I think there may be a bug in the 1.1 branch that requires at least one
> more timer to be registered before the restored timers fire. 1.2 should
>> behave well.
>>
>> The issue for the bug: https://issues.apache.org/jira/browse/FLINK-4651
>>
>> + Aljoscha and Kostas - for more details
>>
>> Best,
>> Stephan
>>
>>
>> On Thu, Dec 29, 2016 at 10:36 PM, Scott Kidder <kidder.sc...@gmail.com>
>> wrote:
>>
>> Are processing-time timers retained in checkpoint & savepoint state and
>> run if a job is restarted? It's possible that the time at which a timer was
>> intended to trigger passes while the job is in the middle of being
>> restarted. Are processing-time timers persisted at all?
>>
>> Thanks!
>>
>> --Scott Kidder
>>
>>
>>
>>


Processing-Time Timers and Checkpoint State

2016-12-29 Thread Scott Kidder
Are processing-time timers retained in checkpoint & savepoint state and run
if a job is restarted? It's possible that the time at which a timer was
intended to trigger passes while the job is in the middle of being
restarted. Are processing-time timers persisted at all?
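
For a concrete picture, this is the kind of usage I mean (a sketch with
placeholder Event and Alert types and a placeholder keyed stream named
events, assuming the ProcessFunction API from the 1.2 branch):

events
    .keyBy(Event::getKey)
    .process(new ProcessFunction<Event, Alert>() {
        @Override
        public void processElement(Event event, Context ctx, Collector<Alert> out)
                throws Exception {
            // Schedule a follow-up check 10 minutes from now in processing time.
            long triggerAt = ctx.timerService().currentProcessingTime() + 10 * 60 * 1000;
            ctx.timerService().registerProcessingTimeTimer(triggerAt);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out)
                throws Exception {
            // If the job restores from a checkpoint and this timestamp is already
            // in the past, does the timer fire immediately after recovery?
            out.collect(new Alert(timestamp));
        }
    });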

Thanks!

--Scott Kidder


Re: Alert on state change.

2016-12-19 Thread Scott Kidder
Hi Rudra,

You could accomplish this with a rolling-fold on the stream of stock
prices. The accumulator argument to the fold can track the last price that
triggered an alert and the timestamp of the alert. When evaluating a new
stock price it can compare the the price against the last one that
triggered an alert. If it meets your alerting criteria, then send an alert
and update the accumulator state to reflect the most recent stock price &
timestamp.
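
A rough sketch of what I mean (untested; PriceUpdate, AlertState, and the
sendAlert call are placeholders):

prices
    .keyBy(PriceUpdate::getSymbol)
    .fold(new AlertState(), new FoldFunction<PriceUpdate, AlertState>() {
        @Override
        public AlertState fold(AlertState state, PriceUpdate update) {
            // Alert when the price has risen 20% or more since the last alert.
            if (state.lastAlertPrice == 0.0
                    || update.getPrice() >= state.lastAlertPrice * 1.20) {
                sendAlert(update); // placeholder side effect
                state.lastAlertPrice = update.getPrice();
                state.lastAlertTimestamp = update.getTimestamp();
            }
            return state;
        }
    });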

Best,

--Scott Kidder



On Mon, Dec 19, 2016 at 7:15 AM, Rudra Tripathy <rudra...@gmail.com> wrote:

> Hi All,
>
> I have a use case where I am monitoring price change.
>
> Let's say the original price is $100;
> in case of a 20% rise, send the alert.
>
> In the stream I am getting updated prices.
>
> If $200 comes in the next data, send the alert.
>
> Next, if I get $230, I would keep it but send no alert.
> When I get $240, I would send the alert.
>
> Is it possible to achieve this using Flink?
>
> Thanks in advance,
> Rudra
>
>


Re: S3 checkpointing in AWS in Frankfurt

2016-11-23 Thread Scott Kidder
Hi Jonathan,

You might be better off creating a small Hadoop HDFS cluster just for the
purpose of storing Flink checkpoint & savepoint data. Like you, I tried
using S3 to persist Flink state, but encountered AWS SDK issues and felt
like I was going down an ill-advised path. I then created a small 3-node
HDFS cluster in the same region as my Flink hosts but distributed across 3
AZs. The checkpointing is very fast and, most importantly, just works.

Is there a firm requirement to use S3, or could you use HDFS instead?
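
For reference, switching the checkpoint backend over to HDFS is a small
change (sketch; the namenode address and path are placeholders):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000); // e.g. checkpoint every 10 seconds
// Filesystem state backend writing checkpoint data to the small HDFS cluster
env.setStateBackend(new FsStateBackend("hdfs://namenode.internal:8020/flink/checkpoints"));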

Best,

--Scott Kidder

On Tue, Nov 22, 2016 at 11:52 PM, Jonathan Share <jon.sh...@gmail.com>
wrote:

> Hi,
>
> I'm interested in hearing if anyone else has experience with using Amazon
> S3 as a state backend in the Frankfurt region. For political reasons we've
> been asked to keep all European data in Amazon's Frankfurt region. This
> causes a problem as the S3 endpoint in Frankfurt requires the use of AWS
> Signature Version 4 "This new Region supports only Signature Version 4"
> [1] and this doesn't appear to work with the Hadoop version that Flink is
> built against [2].
>
> After some hacking we have managed to create a docker image with a build
> of Flink 1.2 master, copying over jar files from the hadoop
> 3.0.0-alpha1 package and this appears to work, for the most part but we
> still suffer from some classpath problems (conflicts between AWS API used
> in Hadoop and those we want to use in our streams for interacting with
> Kinesis) and the whole thing feels a little fragile. Has anyone else tried
> this? Is there a simpler solution?
>
> As a follow-up question, we saw that with checkpointing on three
> relatively simple streams set to 1 second, our S3 costs were higher than
> the EC2 costs for our entire infrastructure. This seems slightly
> disproportionate. For now we have reduced checkpointing interval to 10
> seconds and that has greatly improved the cost projections graphed via
> Amazon Cloud Watch, but I'm interested in hearing other peoples experience
> with this. Is that the kind of billing level we can expect or is this a
> symptom of a mis-configuration? Is this a setup others are using? As we are
> using Kinesis as the source for all streams I don't see a huge risk with
> larger checkpoint intervals and our Sinks are designed to mostly tolerate
> duplicates (some improvements can be made).
>
> Thanks in advance
> Jonathan
>
>
> [1] https://aws.amazon.com/blogs/aws/aws-region-germany/
> [2] https://issues.apache.org/jira/browse/HADOOP-13324
>


Creating Job Savepoints at Regular Intervals

2016-11-22 Thread Scott Kidder
I'd like to create job savepoints at regular intervals to be used in the
event of a total job failure where it's not possible to restore from a
checkpoint. I'm aware that automatic savepoints are planned as part of
FLIP-10, but I need something more immediate (using Flink 1.1.3).

I'm curious how other Flink users are doing this. CRON job on the active
job-manager? Mostly just looking for a sanity-check.

Thanks!

--Scott Kidder


Re: Proper way of adding external jars

2016-11-14 Thread Scott Kidder
Hi Gyula,

I've typically added external library dependencies to my own application
JAR as shaded-dependencies. This ensures that all dependencies are included
with my application while being distributed to Flink Job Manager & Task
Manager instances.

Another approach is to place these external JARs in the 'lib' sub-directory
of your Flink installation. Keep in mind that the external JARs must be
installed on every Flink node where your application is expected to run.
This works well for dependencies that are large in size or used by multiple
Flink applications in your cluster (avoid duplication of dependencies).

Best,

--Scott Kidder

On Mon, Nov 14, 2016 at 7:59 AM, Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi,
>
> I have been trying to use the -C flag to add external jars with user code
> and I have observed some strange behaviour.
>
> What I am trying to do is the following:
> I have 2 jars, JarWithMain.jar contains the main class and UserJar.jar
> contains some classes that the main method will eventually execute and also
> depends on classes from JarWithMain.
>
> Running this works:
> flink run  -C UserJar.jar -c MainMethod JarWithMain.jar args...
>
> Running this leads to NoClassDefFoundError errors in the StreamTask
> initialization where it reads the functions from the config:
> flink run  -C JarWithMain.jar -c MainMethod UserJar.jar  args...
>
> Did I miss something?
>
> Cheers,
> Gyula
>


Re: ExpiredIteratorException when reading from a Kinesis stream

2016-11-03 Thread Scott Kidder
Hi Steffen & Josh,

For what it's worth, I've been using the Kinesis connector with very good
results on Flink 1.1.2 and 1.1.3. I updated the Flink Kinesis connector KCL
and AWS SDK dependencies to the following versions:

aws.sdk.version: 1.11.34
aws.kinesis-kcl.version: 1.7.0

My customizations are visible in this commit on my fork:
https://github.com/apache/flink/commit/6d69f99d7cd52b3c2f039cb4d37518859e159b32

It might be worth testing with newer AWS SDK & KCL libraries to see if the
problem persists.

Best,

--Scott Kidder


On Thu, Nov 3, 2016 at 7:08 AM, Josh <jof...@gmail.com> wrote:

> Hi Gordon,
>
> Thanks for the fast reply!
> You're right about the expired iterator exception occurring just before
> each spike. I can't see any signs of long GC on the task managers... CPU
> has been <15% the whole time when the spikes were taking place and I can't
> see anything unusual in the task manager logs.
>
> But actually I just noticed that the Flink UI showed no successful
> checkpoints during the time of the problem even though my checkpoint
> interval is 15 minutes. So I guess this is probably some kind of Flink
> problem rather than a problem with the Kinesis consumer. Unfortunately I
> can't find anything useful in the logs so not sure what happened!
>
> Josh
>
>
>
> On Thu, Nov 3, 2016 at 12:44 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi Josh,
>>
>> That warning message was added as part of FLINK-4514. It pops out
>> whenever a shard iterator is used more than 5 minutes after it was
>> returned from Kinesis.
>> The only time spent between when a shard iterator is returned and when it
>> is used to fetch the next batch of records is on deserializing and
>> emitting the records of the last fetched batch.
>> So unless processing of the last fetched batch took over 5 minutes, this
>> normally shouldn’t happen.
>>
>> Have you noticed any sign of long, constant full GC for your Flink task
>> managers? From your description and a check of the code, the only possible
>> guess I can come up with now is that the source tasks completely ceased
>> running for a period of time, and when they came back, the shard iterator
>> was unexpectedly found to have expired. According to the graph you attached,
>> when the iterator was refreshed and tasks successfully fetched a few more
>> batches, the source tasks again halted, and so on.
>> So you should see that same warning message right before every small peak
>> within the graph.
>>
>> Best Regards,
>> Gordon
>>
>>
>> On November 3, 2016 at 7:46:42 PM, Josh (jof...@gmail.com) wrote:
>>
>> Hey Gordon,
>>
>> I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514)
>> with no problems, but yesterday the Kinesis consumer started behaving
>> strangely... My Kinesis data stream is fairly constant at around 1.5MB/sec,
>> however the Flink Kinesis consumer started to stop consuming for periods of
>> time (see the spikes in graph attached which shows data consumed by the
>> Flink Kinesis consumer)
>>
>> Looking in the task manager logs, there are no exceptions however there
>> is this log message which I believe is related to the problem:
>>
>> 2016-11-03 09:27:53,782 WARN  org.apache.flink.streaming.co
>> nnectors.kinesis.internals.ShardConsumer  - Encountered an unexpected
>> expired iterator AAF8OJyh+X3yBnbtzUgIfXv+phS7PK
>> ppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmu
>> wY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex
>> 3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore for shard
>> KinesisStreamShard{streamName='stream001', shard='{ShardId:
>> shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
>> 85070511730234615865841151857942042863},SequenceNumberRange:
>> {StartingSequenceNumber: 495665429169236488921642479266
>> 79091159472198219567464450,}}'}; refreshing the iterator ...
>>
>> Having restarted the job from my last savepoint, it's consuming the
>> stream fine again with no problems.
>>
>> Do you have any idea what might be causing this, or anything I should do
>> to investigate further?
>>
>> Cheers,
>>
>> Josh
>>
>> On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>> wrote:
>>
>>> Hi Steffen,
>>>
>>> Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in
>>> the release (I’ll update the resolve version in JIRA to 1.1.3, thanks for
>>> noticing this!).
>>> The Flink community is going