Re: What is the best way to load/add patterns dynamically (at runtime) with Flink?

2016-11-03 Thread PedroMrChaves
Hi,

Thank you for the response.

Can you give me an example?
I'm new to Flink and I still don't understand all the constructs.

I also read this article
https://techblog.king.com/rbea-scalable-real-time-analytics-king/. They use
a similar approach, but I still don't understand how to assign windows.

Regards,
Pedro Chaves



On Thu, Nov 3, 2016 at 6:02 PM, Aljoscha Krettek [via Apache Flink User
Mailing List archive.]  wrote:

> Hi Pedro,
> you can have dynamic windows by assigning the windows to elements in your
> Processor (so you would need to extend that type to have a field for the
> window). Then, you can write a custom WindowAssigner that will simply get
> the window from an event and assign that as the internal window.
>
> Please let me know if you need more details.
>
> Cheers,
> Aljoscha
>
> On Thu, 3 Nov 2016 at 18:40 PedroMrChaves <[hidden email]
> > wrote:
>
>> Hello,
>>
>> Your tip was very helpful and I took a similar approach.
>>
>> I have something like this:
>> class Processor extends RichCoFlatMapFunction {
>> public void flatMap1(Event event, Collector out) {
>> process(event,out); // run the javascript (rules) against the
>> incoming events
>> }
>>
>> public void flatMap2(Rule rule , Collector out) {
>>   // We add the rule to the list of existing rules
>>   addNewRule(rule)
>> }
>> }
>>
>> But now I face a new challenge: I don't have access to the windowed
>> constructs of Flink and I can't dynamically create new window aggregations
>> inside the flatMap. At least not that I know of.
>>
>> Did you face a similar problem? Any Ideas?
>>
>> Thank you and regards,
>> Pedro Chaves
>>
>>
>>
>>
>
>





Re: Release Process

2016-11-03 Thread Fabian Hueske
Hi Dominik,

the discussion about the 1.2 release was started on the dev mailing list
[1] about 2 weeks ago.

So far, the proposed timeline is to have a release in mid-December.

Best, Fabian

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Schedule-and-Scope-for-Flink-1-2-tp14062.html

2016-11-03 22:50 GMT+01:00 Dominik Bruhn :

> Hey everyone,
> about three months ago, I made a PR [1] to the Flink GitHub project
> containing a small change for the RabbitMQ source. This PR was merged and
> the code is in the master.
>
> But: This code never made it into a release. In JIRA [2], it is meant to
> be released with 1.2. How is the policy here? When can I expect to see this
> in an official release? How are the rules here?
>
> Thanks for a clarification,
> Dominik
>
> [1]: https://github.com/apache/flink/pull/2373
> [2]: https://issues.apache.org/jira/browse/FLINK-4394
>


Release Process

2016-11-03 Thread Dominik Bruhn

Hey everyone,
about three months ago, I made a PR [1] to the Flink GitHub project
containing a small change for the RabbitMQ source. This PR was merged 
and the code is in the master.


But: This code never made it into a release. In JIRA [2], it is meant to 
be released with 1.2. How is the policy here? When can I expect to see 
this in an official release? How are the rules here?


Thanks for a clarification,
Dominik

[1]: https://github.com/apache/flink/pull/2373
[2]: https://issues.apache.org/jira/browse/FLINK-4394


Re: Flink Application on YARN failed on losing Job Manager | No recovery | Need help debug the cause from logs

2016-11-03 Thread Anchit Jatana
Hi Maximilian,

Thanks for your response. Since I'm running the application on a YARN cluster
in 'yarn-cluster' mode, i.e. using the 'flink run -m yarn-cluster ..' command,
is there anything more that I need to configure apart from setting the
'yarn.application-attempts: 10' property inside conf/flink-conf.yaml?

Just wanted to confirm whether there is anything more I need to configure to
set up HA in 'yarn-cluster' mode.

Thank you

Regards,
Anchit
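For reference, a hedged sketch of the HA-related entries in conf/flink-conf.yaml
for a Flink 1.1-era YARN setup; the ZooKeeper quorum and storage directory below
are placeholder values, and the exact keys should be double-checked against the
documentation of the version in use:

recovery.mode: zookeeper
recovery.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181,zk-host-3:2181
recovery.zookeeper.storageDir: hdfs:///flink/recovery
yarn.application-attempts: 10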





Re: ExpiredIteratorException when reading from a Kinesis stream

2016-11-03 Thread Stephan Ewen
Is it possible that you have stalls in your topology?

Reasons could be:

  - The data sink blocks or becomes slow for some periods (where are you
sending the data to?)

  - If you are using large state and a state backend that only supports
synchronous checkpointing, there may be a delay introduced by the checkpoint


On Thu, Nov 3, 2016 at 7:21 PM, Scott Kidder  wrote:

> Hi Steffen & Josh,
>
> For what it's worth, I've been using the Kinesis connector with very good
> results on Flink 1.1.2 and 1.1.3. I updated the Flink Kinesis connector KCL
> and AWS SDK dependencies to the following versions:
>
> aws.sdk.version: 1.11.34
> aws.kinesis-kcl.version: 1.7.0
>
> My customizations are visible in this commit on my fork:
> https://github.com/apache/flink/commit/6d69f99d7cd52b3c2f039cb4d37518
> 859e159b32
>
> It might be worth testing with newer AWS SDK & KCL libraries to see if the
> problem persists.
>
> Best,
>
> --Scott Kidder
>
>
> On Thu, Nov 3, 2016 at 7:08 AM, Josh  wrote:
>
>> Hi Gordon,
>>
>> Thanks for the fast reply!
>> You're right about the expired iterator exception occurring just before
>> each spike. I can't see any signs of long GC on the task managers... CPU
>> has been <15% the whole time when the spikes were taking place and I can't
>> see anything unusual in the task manager logs.
>>
>> But actually I just noticed that the Flink UI showed no successful
>> checkpoints during the time of the problem even though my checkpoint
>> interval is 15 minutes. So I guess this is probably some kind of Flink
>> problem rather than a problem with the Kinesis consumer. Unfortunately I
>> can't find anything useful in the logs so not sure what happened!
>>
>> Josh
>>
>>
>>
>> On Thu, Nov 3, 2016 at 12:44 PM, Tzu-Li (Gordon) Tai > > wrote:
>>
>>> Hi Josh,
>>>
>>> That warning message was added as part of FLINK-4514. It pops out
>>> whenever a shard iterator was used after 5 minutes it was returned from
>>> Kinesis.
>>> The only time spent between after a shard iterator was returned and
>>> before it was used to fetch the next batch of records, is on deserializing
>>> and emitting of the records of the last fetched batch.
>>> So unless processing of the last fetched batch took over 5 minutes, this
>>> normally shouldn’t happen.
>>>
>>> Have you noticed any sign of long, constant full GC for your Flink task
>>> managers? From your description and check in code, the only possible guess
>>> I can come up with now is that
>>> the source tasks completely ceased to be running for a period of time,
>>> and when it came back, the shard iterator was unexpectedly found to be
>>> expired. According to the graph you attached,
>>> when the iterator was refreshed and tasks successfully fetched a few
>>> more batches, the source tasks again halted, and so on.
>>> So you should see that same warning message right before every small
>>> peak within the graph.
>>>
>>> Best Regards,
>>> Gordon
>>>
>>>
>>> On November 3, 2016 at 7:46:42 PM, Josh (jof...@gmail.com) wrote:
>>>
>>> Hey Gordon,
>>>
>>> I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514)
>>> with no problems, but yesterday the Kinesis consumer started behaving
>>> strangely... My Kinesis data stream is fairly constant at around 1.5MB/sec,
>>> however the Flink Kinesis consumer started to stop consuming for periods of
>>> time (see the spikes in graph attached which shows data consumed by the
>>> Flink Kinesis consumer)
>>>
>>> Looking in the task manager logs, there are no exceptions however there
>>> is this log message which I believe is related to the problem:
>>>
>>> 2016-11-03 09:27:53,782 WARN  org.apache.flink.streaming.co
>>> nnectors.kinesis.internals.ShardConsumer  - Encountered an unexpected
>>> expired iterator AAF8OJyh+X3yBnbtzUgIfXv+phS7PK
>>> ppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKc
>>> SvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8q
>>> kHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore
>>> for shard KinesisStreamShard{streamName='stream001', shard='{ShardId:
>>> shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
>>> 85070511730234615865841151857942042863},SequenceNumberRange:
>>> {StartingSequenceNumber: 495665429169236488921642479266
>>> 79091159472198219567464450,}}'}; refreshing the iterator ...
>>>
>>> Having restarted the job from my last savepoint, it's consuming the
>>> stream fine again with no problems.
>>>
>>> Do you have any idea what might be causing this, or anything I should do
>>> to investigate further?
>>>
>>> Cheers,
>>>
>>> Josh
>>>
>>> On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai >> > wrote:
>>>
 Hi Steffen,

 Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included
 in the release (I’ll update the resolve version in JIRA to 1.1.3, thanks
 for noticing this!).
 The Flink community is going to release 

Re: ExpiredIteratorException when reading from a Kinesis stream

2016-11-03 Thread Scott Kidder
Hi Steffen & Josh,

For what it's worth, I've been using the Kinesis connector with very good
results on Flink 1.1.2 and 1.1.3. I updated the Flink Kinesis connector KCL
and AWS SDK dependencies to the following versions:

aws.sdk.version: 1.11.34
aws.kinesis-kcl.version: 1.7.0

My customizations are visible in this commit on my fork:
https://github.com/apache/flink/commit/6d69f99d7cd52b3c2f039cb4d37518859e159b32

It might be worth testing with newer AWS SDK & KCL libraries to see if the
problem persists.

Best,

--Scott Kidder


On Thu, Nov 3, 2016 at 7:08 AM, Josh  wrote:

> Hi Gordon,
>
> Thanks for the fast reply!
> You're right about the expired iterator exception occurring just before
> each spike. I can't see any signs of long GC on the task managers... CPU
> has been <15% the whole time when the spikes were taking place and I can't
> see anything unusual in the task manager logs.
>
> But actually I just noticed that the Flink UI showed no successful
> checkpoints during the time of the problem even though my checkpoint
> interval is 15 minutes. So I guess this is probably some kind of Flink
> problem rather than a problem with the Kinesis consumer. Unfortunately I
> can't find anything useful in the logs so not sure what happened!
>
> Josh
>
>
>
> On Thu, Nov 3, 2016 at 12:44 PM, Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Josh,
>>
>> That warning message was added as part of FLINK-4514. It pops out
>> whenever a shard iterator was used after 5 minutes it was returned from
>> Kinesis.
>> The only time spent between after a shard iterator was returned and
>> before it was used to fetch the next batch of records, is on deserializing
>> and emitting of the records of the last fetched batch.
>> So unless processing of the last fetched batch took over 5 minutes, this
>> normally shouldn’t happen.
>>
>> Have you noticed any sign of long, constant full GC for your Flink task
>> managers? From your description and check in code, the only possible guess
>> I can come up with now is that
>> the source tasks completely ceased to be running for a period of time,
>> and when it came back, the shard iterator was unexpectedly found to be
>> expired. According to the graph you attached,
>> when the iterator was refreshed and tasks successfully fetched a few more
>> batches, the source tasks again halted, and so on.
>> So you should see that same warning message right before every small peak
>> within the graph.
>>
>> Best Regards,
>> Gordon
>>
>>
>> On November 3, 2016 at 7:46:42 PM, Josh (jof...@gmail.com) wrote:
>>
>> Hey Gordon,
>>
>> I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514)
>> with no problems, but yesterday the Kinesis consumer started behaving
>> strangely... My Kinesis data stream is fairly constant at around 1.5MB/sec,
>> however the Flink Kinesis consumer started to stop consuming for periods of
>> time (see the spikes in graph attached which shows data consumed by the
>> Flink Kinesis consumer)
>>
>> Looking in the task manager logs, there are no exceptions however there
>> is this log message which I believe is related to the problem:
>>
>> 2016-11-03 09:27:53,782 WARN  org.apache.flink.streaming.co
>> nnectors.kinesis.internals.ShardConsumer  - Encountered an unexpected
>> expired iterator AAF8OJyh+X3yBnbtzUgIfXv+phS7PK
>> ppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmu
>> wY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex
>> 3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore for shard
>> KinesisStreamShard{streamName='stream001', shard='{ShardId:
>> shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
>> 85070511730234615865841151857942042863},SequenceNumberRange:
>> {StartingSequenceNumber: 495665429169236488921642479266
>> 79091159472198219567464450,}}'}; refreshing the iterator ...
>>
>> Having restarted the job from my last savepoint, it's consuming the
>> stream fine again with no problems.
>>
>> Do you have any idea what might be causing this, or anything I should do
>> to investigate further?
>>
>> Cheers,
>>
>> Josh
>>
>> On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Hi Steffen,
>>>
>>> Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in
>>> the release (I’ll update the resolve version in JIRA to 1.1.3, thanks for
>>> noticing this!).
>>> The Flink community is going to release 1.1.3 asap, which will include
>>> the fix.
>>> If you don’t want to wait for the release and want to try the fix now,
>>> you can also build on the current “release-1.1” branch, which already has
>>> FLINK-4514 merged.
>>> Sorry for the inconvenience. Let me know if you bump into any other
>>> problems afterwards.
>>>
>>> Best Regards,
>>> Gordon
>>>
>>>
>>> On October 5, 2016 at 2:56:21 AM, Steffen Hausmann (
>>> stef...@hausmann-family.de) wrote:
>>>
>>> Hi there,
>>>
>>> I'm running a Flink 1.1.2 job on EMR and Yarn that is 

Re: Freeing resources in SourceFunction

2016-11-03 Thread Maximilian Michels
For your use case you should use the close() method which is always
called upon shutdown of your source. The cancel() is only called when
you explicitly cancel your job.

-Max
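As a hedged illustration of that split (cancel() only flips a flag, close()
releases the resources), a minimal RichSourceFunction sketch; the ConnectionPool
type and its methods are placeholders for whatever pool the job actually manages:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class PooledSource extends RichSourceFunction<String> {

    private transient ConnectionPool pool;   // placeholder type
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        pool = ConnectionPool.create();      // acquire resources when the task starts
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String record = pool.take();     // placeholder fetch call
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(record);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;                     // only signals run() to stop
    }

    @Override
    public void close() throws Exception {
        if (pool != null) {
            pool.close();                    // runs on every shutdown path, not just cancel
        }
        super.close();
    }
}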


On Thu, Nov 3, 2016 at 2:45 PM, Yury Ruchin  wrote:
> Hello,
>
> I'm writing a custom source function for my streaming job. The source
> function manages some connection pool. I want to close that pool once my job
> is "finished" (since the stream is unbounded, the only way I see is to
> cancel the streaming job). Since I inherit RichSourceFunction, there are two
> candidates: cancel() and close(). I'm wondering which one should be picked.
> Looking for best practices, I resorted to the existing sources. One example
> is FlinkKafkaConsumerBase which has both callbacks implemented identically
> (one delegating to the other). Counterexample is InputFormatSourceFunction
> which uses cancel() only to reset a flag, while actual cleanup is done in
> close(). Which of these approaches is a better fit in the described case?
>
> Just FYI, Flink version I use is 1.1.2.
>
> Thanks,
> Yury


Re: What is the best way to load/add patterns dynamically (at runtime) with Flink?

2016-11-03 Thread Aljoscha Krettek
Hi Pedro,
you can have dynamic windows by assigning the windows to elements in your
Processor (so you would need to extend that type to have a field for the
window). Then, you can write a custom WindowAssigner that will simply get
the window from an event and assign that as the internal window.
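A rough sketch of what such a WindowAssigner could look like, assuming a
hypothetical EventWithWindow type that carries its own window start and end;
the method signatures follow the current 1.2-SNAPSHOT windowing API, so older
versions may differ slightly:

import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class PerEventWindowAssigner extends WindowAssigner<Object, TimeWindow> {

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp,
                                                WindowAssignerContext context) {
        // the element already carries the window it belongs to
        EventWithWindow event = (EventWithWindow) element;
        return Collections.singletonList(
                new TimeWindow(event.getWindowStart(), event.getWindowEnd()));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}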

Please let me know if you need more details.

Cheers,
Aljoscha

On Thu, 3 Nov 2016 at 18:40 PedroMrChaves  wrote:

> Hello,
>
> Your tip was very helpful and I took a similar approach.
>
> I have something like this:
> class Processor extends RichCoFlatMapFunction {
> public void flatMap1(Event event, Collector out) {
> process(event,out); // run the javascript (rules) against the
> incoming events
> }
>
> public void flatMap2(Rule rule , Collector out) {
>   // We add the rule to the list of existing rules
>   addNewRule(rule)
> }
> }
>
> But now I face a new challenge: I don't have access to the windowed
> constructs of Flink and I can't dynamically create new window aggregations
> inside the flatMap. At least not that I know of.
>
> Did you face a similar problem? Any Ideas?
>
> Thank you and regards,
> Pedro Chaves
>
>
>
>


Re: emit partial state in window (streaming)

2016-11-03 Thread Luis Mariano Guerra
On Thu, Nov 3, 2016 at 7:05 PM, Kostas Kloudas 
wrote:

> Hi Luis,
>
> You cannot have event-time early firings on both chained window operators.
> The reason is that each early result from the first window operator will
> have a timestamp equal to window.maxTimestamp-1.
> So in the second windowing operator, they will be buffered until the
> watermark signaling the end of the window arrives.
>

so, how do I get windowAll and partial results? do I have to remove the
partial calculations and do it all in one node/thread? is there another way?


>
> Now for the second point, I think that what you have understood is correct.
> The "ctx.registerEventTimeTimer(window.getEnd())” registers a timer to
> call the onEventTime().
>

then what else calls onEventTime? Because if I register the event time
timer inside of it, something else must call it in the first place.



>
> Cheers,
> Kostas
>
>
> > On Nov 3, 2016, at 3:29 PM, Luis Mariano Guerra <
> mari...@event-fabric.com> wrote:
> >
> > On Thu, Nov 3, 2016 at 2:06 PM, Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
> > Hi Luis,
> >
> > Can you try to comment out the whole final windowing and see if this
> works?
> > This includes the following lines:
> >
> >   .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
> >   .trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit,
> windowTime, timeUnit))
> >   .apply(creator.create(), windowAllFold, windowAllMerge);
> >
> >
> > commenting it out makes it emit on fire; how do I make the trigger "go through" the
> windowAll, or if not possible, how can I join the substreams in one stream
> and respect the trigger?
> >
> > An additional note is that I would go for registering an event time
> timer at the onEventTime
> > instead of checking the timestamp on the onElement(). This is because
> with your implementation,
> > in order to fire a computation, you always have to wait for an element
> outside the partial window interval to arrive.
> >
> > then I think I understood the purpose of registering the event time
> timer wrong, isn't "ctx.registerEventTimeTimer(window.getEnd())" called
> to register a timer to call onEventTime?
> >
> >
> > Cheers,
> > Kostas
> >
> >> On Nov 3, 2016, at 11:31 AM, Luis Mariano Guerra <
> mari...@event-fabric.com> wrote:
> >>
> >>   .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime,
> timeUnit)))
> >> //.trigger(new PartialWindowTrigger<>(partialWindowTime,
> timeUnit, windowTime, timeUnit))
> >> .apply(creator.create(), windowAllFold, windowAllMerge);
> >
> >
>
>


Re: Testing DataStreams

2016-11-03 Thread Maximilian Michels
Hi Juan,

StreamingMultipleProgramsTestBase is in the testing scope. Thus, it is
not bundled in the normal jars. You would have to add the
flink-test-utils_2.10 module.

It is true that there is no guide. There is
https://github.com/ottogroup/flink-spector for testing streaming
pipelines.

For unit tests and integration tests please have a look at the Flink
source code which contains many such tests.

-Max
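For completeness, a sketch of the extra dependency this refers to, mirroring the
style of the POM snippet quoted below (artifact name taken from the
flink-test-utils_2.10 module mentioned above; adjust the version property to your
setup):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils_2.10</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
</dependency>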


On Wed, Nov 2, 2016 at 4:58 PM, Juan Rodríguez Hortalá
 wrote:
> Hi,
>
> I'm new to Flink, and I'm trying to write my first unit test  for a simple
> DataStreams job. In
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/util/package-summary.html
> I see several promising classes, but for example I cannot import
> org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase from the
> artifacts obtained by the following Maven dependencies:
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-java</artifactId>
>     <version>${flink.version}</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-streaming-java_2.10</artifactId>
>     <version>${flink.version}</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-clients_2.10</artifactId>
>     <version>${flink.version}</version>
> </dependency>
>
> I also see that the page
> https://cwiki.apache.org/confluence/display/FLINK/Testing+Utilities+and+Mini+Clusters
> is empty. Is there any documentation or tutorial about writing simple unit
> tests running in local mode? I'm looking for something similar to
> http://blog.cloudera.com/blog/2015/09/making-apache-spark-testing-easy-with-spark-testing-base/,
> where you can specify the expected output as a collection to define an
> assertion, but for Flink.
>
> By the way I have also implemented source function similar to
> StreamExecutionEnvironment.fromElements but that allows to add time gaps
> between the generated elements, that I think could be useful for testing, in
> case someone is interested
> https://github.com/juanrh/flink-state-eviction/blob/master/src/main/java/com/github/juanrh/streaming/source/ElementsWithGapsSource.java.
>
> Thanks,
>
> Juan


Re: emit partial state in window (streaming)

2016-11-03 Thread Kostas Kloudas
Hi Luis,

You cannot have event-time early firings on both chained window operators. 
The reason is that each early result from the first window operator will have a 
timestamp equal to window.maxTimestamp-1.
So in the second windowing operator, they will be buffered until the watermark 
signaling the end of the window arrives.

Now for the second point, I think that what you have understood is correct. 
The "ctx.registerEventTimeTimer(window.getEnd())” registers a timer to call the 
onEventTime().

Cheers,
Kostas
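To make the timer round-trip concrete, here is a minimal custom Trigger sketch
(method names follow the Trigger API of recent Flink versions): onElement()
registers an event-time timer for the end of the window, and the window operator
later calls onEventTime() once the watermark passes that time.

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EndOfWindowTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        // ask the operator to call onEventTime() when the watermark passes the window end
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // this is the callback for the timer registered above
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}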


> On Nov 3, 2016, at 3:29 PM, Luis Mariano Guerra  
> wrote:
> 
> On Thu, Nov 3, 2016 at 2:06 PM, Kostas Kloudas  
> wrote:
> Hi Luis,
> 
> Can you try to comment out the whole final windowing and see if this works?
> This includes the following lines:
> 
>   .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
>   .trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit, 
> windowTime, timeUnit))
>   .apply(creator.create(), windowAllFold, windowAllMerge);
> 
> 
> commenting it out makes it emit on fire; how do I make the trigger "go through" the
> windowAll, or if not possible, how can I join the substreams in one stream 
> and respect the trigger?
>  
> An additional note is that I would go for registering an event time timer at 
> the onEventTime 
> instead of checking the timestamp on the onElement(). This is because with 
> your implementation,
> in order to fire a computation, you always have to wait for an element 
> outside the partial window interval to arrive.
> 
> then I think I understood the purpose of registering the event time timer 
> wrong, isn't "ctx.registerEventTimeTimer(window.getEnd())" called to register 
> a timer to call onEventTime?
>  
> 
> Cheers,
> Kostas
> 
>> On Nov 3, 2016, at 11:31 AM, Luis Mariano Guerra  
>> wrote:
>> 
>>   .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
>> //.trigger(new PartialWindowTrigger<>(partialWindowTime, 
>> timeUnit, windowTime, timeUnit))
>> .apply(creator.create(), windowAllFold, windowAllMerge);
> 
> 



Re: BoundedOutOfOrdernessTimestampExtractor and timestamps in the future

2016-11-03 Thread Maximilian Michels
The BoundedOutOfOrdernessTimestampExtractor is not really useful if
you have outliers, because it always sets the watermark to the largest
timestamp seen so far minus the out-of-orderness. If your data is
of such nature, you will have to implement a custom Watermark
extractor to deal with these elements.

-Max
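A minimal sketch of such a custom extractor, in the spirit of the gist linked in
the quoted mail below: elements keep their original timestamps, but readings from
the future are simply not allowed to advance the watermark. MyEvent and its
getTimestamp() accessor are placeholders:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class FutureTolerantWatermarkAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrdernessMs;
    private long maxNonFutureTimestamp;

    public FutureTolerantWatermarkAssigner(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // start low enough that the first watermark is Long.MIN_VALUE, avoiding underflow
        this.maxNonFutureTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long ts = element.getTimestamp();          // placeholder accessor
        // only timestamps that are not in the future may advance the watermark
        if (ts <= System.currentTimeMillis()) {
            maxNonFutureTimestamp = Math.max(maxNonFutureTimestamp, ts);
        }
        return ts;                                 // the element keeps its original timestamp
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(maxNonFutureTimestamp - maxOutOfOrdernessMs);
    }
}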


On Tue, Nov 1, 2016 at 10:02 PM, Konstantin Knauf
 wrote:
> Hi Dominik,
>
> out of curiosity, how come that you receive timestamps from the future? ;)
>
> Depending on the semantics of these future events, it might also make
> sense to already "floor" the timestamp to processing time in the
> extractTimestamp()-Method.
>
> I am not sure, if I understand your follow up question correctly, but
> afaik Flink does not have a notion of future and past. Events just have
> just timestamps and the general assumption is that time runs forward (at
> least in the long run). "Future" events can potentially advance the
> current watermark. So (event time) windows might be closed "too early"
> w.r.t. to the rest of the events (These events can be processed with
> "allowed lateness".) There are some sections in the documentation, which
> might help you
> (https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html).
> Depending on the particular problem, you might be able to develop a
> fancy watermarking mechanism, which mitigates the effect of these future
> timestamps. Does this answer your question in any way? :)
>
> Cheers,
>
> Konstantin
>
>
> On 01.11.2016 15:05, Dominik Bruhn wrote:
>> Hey,
>> I'm using a BoundedOutOfOrdernessTimestampExtractor for assigning my
>> timestamps and discarding to old events (which happens sometimes).
>>
>> Now my problem is that some events, by accident have timestamps in the
>> future. If the timestamps are more in the future than my
>> `maxOutOfOrderness`, I'm discarding valid events. So I need a way of
>> saying that the
>> BoundedOutOfOrdernessTimestampExtractor should exclude timestamps from
>> the future for the watermark calculation. I still want to keep the
>> events if they are in the future and assign them to the right watermarks.
>>
>> How can I achieve this? I thought about checking whether the potential
>> timestamp is in the future before considering it for a watermark. I
>> cloned the BoundedOutOfOrdernessTimestampExtractor and added the idea
>> https://gist.github.com/theomega/090054032e0b3c3b9fb06767f0fec7e7
>>
>> Does this make sense? Or is there a better approach?
>>
>> In general, how does Flink handle readings from the future?
>>
>> Thanks,
>> Dominik
>>
>
> --
> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>


Re: What is the best way to load/add patterns dynamically (at runtime) with Flink?

2016-11-03 Thread PedroMrChaves
Hello,

Your tip was very helpful and I took a similar approach.

I have something like this:
class Processor extends RichCoFlatMapFunction {
public void flatMap1(Event event, Collector out) {
 process(event,out); // run the javascript (rules) against the
incoming events
}

public void flatMap2(Rule rule , Collector out) {
  // We add the rule to the list of existing rules
  addNewRule(rule)
}
}

But now I face a new challenge: I don't have access to the windowed
constructs of Flink and I can't dynamically create new window aggregations
inside the flatMap. At least not that I know of. 

Did you face a similar problem? Any Ideas?

Thank you and regards,
Pedro Chaves





Re: Flink Kafka 0.10.0.0 connector

2016-11-03 Thread Dominik Safaric
Hi Robert,

Thanks for sharing this insight. 

However, the Flink Kafka 010 connector is only compatible with the 
1.2-SNAPSHOT. 

Despite that, I've managed to get the Flink Kafka 09 connector to use Kafka
version 0.10.0.1. Only minor changes to the test code had to be made, mostly in
regard to ZooKeeper utilities.

Thanks for your help though!
Dominik

> On 3 Nov 2016, at 13:59, Robert Metzger  wrote:
> 
> Hi,
> I just tried the Kafka 0.10 connector again, and I could not reproduce the 
> issue you are reporting.
> 
> This is my test job:
> 
> // parse input arguments
> final ParameterTool parameterTool = ParameterTool.fromArgs(args);
> 
> if(parameterTool.getNumberOfParameters() < 4) {
>System.out.println("Missing parameters!\nUsage: Kafka --topic  " +
>  "--bootstrap.servers  --zookeeper.connect  
> --group.id  ");
>return;
> }
> 
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().disableSysoutLogging();
> env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 
> 1));
> env.enableCheckpointing(5000); // create a checkpoint every 5 secodns
> env.getConfig().setGlobalJobParameters(parameterTool); // make parameters 
> available in the web interface
> 
> DataStream messageStream = env
>   .addSource(new FlinkKafkaConsumer010<>(
> parameterTool.getRequired("topic"),
> new SimpleStringSchema(),
> parameterTool.getProperties()));
> 
> // write kafka stream to standard out.
> messageStream.print();
> 
> env.execute("Read from Kafka example");
> 
> On Thu, Nov 3, 2016 at 1:48 PM, Dominik Safaric  > wrote:
> Hi Robert,
> 
>> I think the easiest way to get Kafka 0.10 running with Flink is to use the 
>> Kafka 0.10 connector in the current Flink master.
> 
> 
> Well, I've already built the Kafka 0.10 connector from the master, but
> unfortunately I keep getting the error of the type checker that the type of 
> the FlinkKafkaConsumer10 and the one required by StreamExecutionEnvironment 
> are not compatible - that is, addSource requires a subclass of the 
> SourceFunction, whereas the instance of the FlinkKafkaConsumer10 class is 
> of type FlinkKafkaConsumer10. 
> 
> Which I find quite strange because I would assume that the FlinkKafkaConsumer 
> instance should be of type SourceFunction. However, the same thing even happened
> while building the FlinkKafkaConsumer09. 
> 
> Any hint what might be going on?
> 
> I’ve build the jar distribution as a clean maven package (without running the 
> tests). 
> 
> Thanks,
> Dominik
> 
>> On 3 Nov 2016, at 13:29, Robert Metzger > > wrote:
>> 
>> Hi Dominik,
>> 
>> Some of Kafka's APIs changed between Kafka 0.9 and 0.10, so you can not 
>> compile the Kafka 0.9 against Kafka 0.10 dependencies.
>> 
>> I think the easiest way to get Kafka 0.10 running with Flink is to use the 
>> Kafka 0.10 connector in the current Flink master.
>> You can probably copy the connector's code into your own project and use the 
>> new connector from there.
>> 
>> Regards,
>> Robert
>> 
>> 
>> On Thu, Nov 3, 2016 at 8:05 AM, Dominik Safaric > > wrote:
>> Dear all,
>> 
>> Although the Flink 1.2 version will rollout a Flink Kafka 0.10.x connector, 
>> I want to use the Flink 0.9 connector in conjunction with the 0.10.x 
>> versions. 
>> 
>> The reason behind this is because we are currently evaluating Flink part of 
>> an empirical research, hence a stable release is required. In addition, the 
>> reason why we have the requirement of using the Kafka 0.10.x versions is 
>> because since the 0.10.0 Kafka supports consumer and producer interceptors 
>> and message timestamps.
>> 
>> To make the 0.9 connector support Kafka version e.g. 0.10.0 for example, so 
>> far I’ve changed the Flink Kafka 0.9 connector dependency to the required 
>> Kafka version and build the project. However, as I imported the jar and 
>> added the source to the StreamExecutionEnvironment a type error occurred 
>> stating that the addSource function requires a class deriving from the 
>> SourceFunction interface. 
>> 
>> Hence, what have gone wrong during the build? I assume a dependency issue.
>> 
>> Next, I’ve tried just simply overriding the dependencies of the Flink Kafka 
>> connector within the project pom.xml, however there is obviously a slight 
>> API mismatch hence this cannot be done. 
>> 
>> I would really appreciate if anyone could provide some guidance once how to 
>> successfully build the Flink Kafka connector supporting Kafka 0.10.x 
>> versions. 
>> 
>> Thanks in advance,
>> Dominik 
>> 
> 
> 



Re: NotSerializableException: jdk.nashorn.api.scripting.NashornScriptEngine

2016-11-03 Thread Greg Hogan
Hi Pedro,

Which problem are you having, the NotSerializableException or not seeing
open() called on a RichFunction?

Greg

On Wed, Nov 2, 2016 at 10:47 AM, PedroMrChaves 
wrote:

> Hello,
>
> I'm having the exact same problem.
> I'm using a filter function on a datastream.
> My flink version is 1.1.3.
>
> What could be the problem?
>
>
> Regards,
> Pedro Chaves.
>
>
>
>


Re: Best Practices/Advice - Execution of jobs

2016-11-03 Thread PedroMrChaves
Thank you.





Re: Using Flink with Accumulo

2016-11-03 Thread Josh Elser

Hi Oliver,

Cool stuff. I wish I knew more about Flink to make some better 
suggestions. Some points inline, and sorry in advance if I suggest 
something outright wrong. Hopefully someone from the Flink side can help 
give context where necessary :)


Oliver Swoboda wrote:

Hello,

I'm using Flink with Accumulo and wanted to read data from the database
by using the createHadoopInput function. Therefore I configure an
AccumuloInputFormat. The source code you can find here:
https://github.com/OSwoboda/masterthesis/blob/master/aggregation.flink/src/main/java/de/oswoboda/aggregation/Main.java


I'm using a 5 Node Cluster (1 Master, 4 Worker).
Accumulo is installed with Ambari and has 1 Master Server on the Master
Node and 4 Tablet Servers (one on each Worker).
Flink is installed standalone with the Jobmanager on the Master Node and
4 Taskmanagers (one on each Worker). Every Taskmanager can have 4 Tasks,
so there are 32 in total.

First problem I have:
If I start several Flink jobs, the client count for ZooKeeper in the
Accumulo Overview is constantly increasing. I assume that the used
scanner isn't correctly closed. The client count only decreases to
normal values when I restart Flink.


Hrm, this does seem rather bad. Eventually, you'll saturate the 
connections to ZK and ZK itself will start limiting new connections (per 
the maxClientCnxns property).


This sounds somewhat familiar to 
https://issues.apache.org/jira/browse/ACCUMULO-2113. The lack of a 
proper "close()" method on the Instance interface is a known deficiency. 
I'm not sure how Flink execution happens, so I am kind of just guessing.


You might be able to try to use the CleanUp[1] utility to close out the 
thread pools/connections when your Flink "task" is done.



Second problem I have:
I want to compare aggregations on time series data with Accumulo (with
Iterators) and with Flink. Unfortunately, the results vary inexplicably
when I'm using Flink. I wanted to compare the results for a full table
scan (called baseline in the code), but sometimes it takes 17-18 minutes
and sometimes its between 30 and 60 minutes. In the longer case I can
see in the Accumulo Overview that after some time only one worker is
left with running scans and there are just a few entries/s scanned (4
million at the beginning when all workers are running to 200k when the
one worker is left). Because there are 2.5 billion records to scan and
almost 500 million left it takes really long.
This problem doesn't occur with Accumulo using Iterators and a batch
scanner on the master node: each scan has an almost identical duration, and
the graphs in the Accumulo Overview for entries/s, MB/s scanned and seeks
are the same for each scan.


It sounds like maybe your partitioning was sub-optimal and caused one 
task to get a majority of the data? Having the autoAdjustRanges=true (as 
you do by default) should help get many batches of work based on the 
tablet boundaries in Accumulo. I'm not sure how Flink actually executes 
them though.



Yours faithfully,
Oliver Swoboda



[1] 
https://github.com/apache/accumulo/blob/e900e67425d950bd4c0c5288a6270d7b362ac458/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java#L36




Re: Best Practices/Advice - Execution of jobs

2016-11-03 Thread Aljoscha Krettek
Hi Pedro,
I think it would be better to have two jobs and keep all the rules in one
place. If it's not too many sources you might even consider having
everything in one job so you don't have to duplicate the rules.

There's a tradeoff, though, if it becomes too much stuff then splitting up
will be beneficial because it makes the jobs easier to maintain/monitor.

Cheers,
Aljoscha

On Wed, 2 Nov 2016 at 10:26 PedroMrChaves  wrote:

> Hello,
>
> I'm trying to build a stream event correlation engine with Flink and I have
> some questions regarding the execution of jobs.
>
> In my architecture I need to have different sources of data, let's say for
> instance:
> firewallStream = environment.addSource([FirewalLogsSource]);
> proxyStream = environment.addSource([ProxyLogsSource]);
> and for each of these sources, I need to apply a set of rules.
> So lets say I have a job that has as a source the proxy stream data with
> the
> following rules:
>
> ///Abnormal Request Method
> stream.[RuleLogic].addSink([output])
> //Web Service on Non-Typical Port
> stream.[RuleLogic].addSink([output])
> //Possible Brute Force
> stream.[RuleLogic].addSink([output])/
>
> These rules will probably scale to be in the order of 15 to 20 rules.
>
> What is the best approach in this case:
> 1. Should I create 2 jobs one for each source and each job would have the
> 15-20 rules?
> 2. Should I split the rules into several jobs?
> 3. Other options?
>
>
> Thank you and Regards,
> Pedro Chaves.
>
>
>
>
>
>


Re: Why are externalized checkpoints deleted on Job Manager exit?

2016-11-03 Thread Ufuk Celebi
A fix is pending here: https://github.com/apache/flink/pull/2750

The behaviour on graceful shut down/suspension respects the
cancellation behaviour with this change.

On Thu, Nov 3, 2016 at 3:23 PM, Ufuk Celebi  wrote:
> I don't need the logs. Externalized checkpoints have been configured
> to be deleted when the job is suspended, too. When the YARN session is
> terminated, all jobs are suspended.
>
> The behaviour seems like a bug. As a work around you have to cancel
> the job before you shut down the YARN session. Let me think for a
> minute whether there is a good reason to discard externalized
> checkpoints on suspension, but I don't think so.
>
> On Thu, Nov 3, 2016 at 3:00 PM, Ufuk Celebi  wrote:
>> They should actually be not deleted.
>>
>> Could you please share the logs with me? In the mean time, I will try
>> to reproduce this.
>>
>> On Thu, Nov 3, 2016 at 2:04 PM, Aljoscha Krettek  wrote:
>>> +Ufuk
>>>
>>> Ufuk recently worked on that, if I'm not mistaken. Do you have an Idea what
>>> could be going on here?
>>>
>>>
>>> On Wed, 2 Nov 2016 at 21:52 Clifford Resnick  wrote:

 Testing externalized checkpoints in a YARN-based cluster, configured with:


 env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

 I can confirm that checkpoint is retained between cancelled jobs, however
 it’s deleted when the Job Manager session is gracefully shutdown. We’d
 really like for the persistent checkpoint to be treated like a Savepoint 
 and
 not be deleted. Is there a way to enable this?


>>>


Re: emit partial state in window (streaming)

2016-11-03 Thread Luis Mariano Guerra
On Thu, Nov 3, 2016 at 2:06 PM, Kostas Kloudas 
wrote:

> Hi Luis,
>
> Can you try to comment out the whole final windowing and see if this works?
> This includes the following lines:
>
>   .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
>   .trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit,
> windowTime, timeUnit))
>   .apply(creator.create(), windowAllFold, windowAllMerge);
>
>
commenting it out makes it emit on fire; how do I make the trigger "go through" the
windowAll, or if not possible, how can I join the substreams in one stream
and respect the trigger?


> An additional note is that I would go for registering an event time timer
> at the onEventTime
> instead of checking the timestamp on the onElement(). This is because with
> your implementation,
> in order to fire a computation, you always have to wait for an element
> outside the partial window interval to arrive.
>

then I think I understood the purpose of registering the event time timer
wrong, isn't "ctx.registerEventTimeTimer(window.getEnd())" called to
register a timer to call onEventTime?


>
> Cheers,
> Kostas
>
> On Nov 3, 2016, at 11:31 AM, Luis Mariano Guerra 
> wrote:
>
>   .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
> //.trigger(new PartialWindowTrigger<>(partialWindowTime,
> timeUnit, windowTime, timeUnit))
> .apply(creator.create(), windowAllFold, windowAllMerge);
>
>
>


Re: Why are externalized checkpoints deleted on Job Manager exit?

2016-11-03 Thread Ufuk Celebi
I don't need the logs. Externalized checkpoints have been configured
to be deleted when the job is suspended, too. When the YARN session is
terminated, all jobs are suspended.

The behaviour seems like a bug. As a work around you have to cancel
the job before you shut down the YARN session. Let me think for a
minute whether there is a good reason to discard externalized
checkpoints on suspension, but I don't think so.

On Thu, Nov 3, 2016 at 3:00 PM, Ufuk Celebi  wrote:
> They should actually be not deleted.
>
> Could you please share the logs with me? In the mean time, I will try
> to reproduce this.
>
> On Thu, Nov 3, 2016 at 2:04 PM, Aljoscha Krettek  wrote:
>> +Ufuk
>>
>> Ufuk recently worked on that, if I'm not mistaken. Do you have an Idea what
>> could be going on here?
>>
>>
>> On Wed, 2 Nov 2016 at 21:52 Clifford Resnick  wrote:
>>>
>>> Testing externalized checkpoints in a YARN-based cluster, configured with:
>>>
>>>
>>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>
>>> I can confirm that checkpoint is retained between cancelled jobs, however
>>> it’s deleted when the Job Manager session is gracefully shutdown. We’d
>>> really like for the persistent checkpoint to be treated like a Savepoint and
>>> not be deleted. Is there a way to enable this?
>>>
>>>
>>


Re: Best way of doing some global initialization

2016-11-03 Thread Satish Chandra Gupta
Hi Aljoscha,

Thanks for the quick reply. I didn't quite understand your suggestion. Say I
have three Kafka stream sources that my Flink program consumes. How can I
modify those three sources to be a Kafka source as well as a consumer of this
single element?

Thanks,
+satish

On Thu, Nov 3, 2016 at 6:37 PM, Aljoscha Krettek 
wrote:

> Hi,
> I'm afraid this is not possible right now if you don't want to go with
> completely custom sources/operators.
>
> If you want to go the custom source route you would have only one true
> source in your job that does the global initialisation and then emits one
> element. Your other sources would be operators that have this one source as
> input and only start producing data once that one element arrived. In this
> way, you would block all other sources until the first source has done the
> initialisation.
>
> Cheers,
> Aljoscha
>
> On Thu, 3 Nov 2016 at 09:26 Satish Chandra Gupta 
> wrote:
>
>> Hi,
>>
>> I need to set/initialize some config of a framework/util that is used
>> in my Flink stream processing app. Basically, a piece of code that needs to
>> be executed exactly once before anything else. Clearly doing it in the main
>> Flink processor function will not suffice, as apart from the client, the
>> same needs to be done on the other nodes before the topology is executed.
>>
>> I have gone through the Flink best practices,
>> and one way I can think of is to check whether init has been done in the
>> open() of all Rich Functions, and if not, then call the initialization code.
>> But that doesn't seem to be "right": basically, to add any operator, I must
>> add this initialization-call boilerplate code.
>>
>> Is there anyway to define a global initializations in Flink, or to define
>> an operator that is guaranteed to be called the first thing on all nodes?
>>
>> Thanks,
>> +satish
>>
>


Re: ExpiredIteratorException when reading from a Kinesis stream

2016-11-03 Thread Josh
Hi Gordon,

Thanks for the fast reply!
You're right about the expired iterator exception occurring just before
each spike. I can't see any signs of long GC on the task managers... CPU
has been <15% the whole time when the spikes were taking place and I can't
see anything unusual in the task manager logs.

But actually I just noticed that the Flink UI showed no successful
checkpoints during the time of the problem even though my checkpoint
interval is 15 minutes. So I guess this is probably some kind of Flink
problem rather than a problem with the Kinesis consumer. Unfortunately I
can't find anything useful in the logs so not sure what happened!

Josh



On Thu, Nov 3, 2016 at 12:44 PM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Josh,
>
> That warning message was added as part of FLINK-4514. It pops out whenever
> a shard iterator was used after 5 minutes it was returned from Kinesis.
> The only time spent between after a shard iterator was returned and before
> it was used to fetch the next batch of records, is on deserializing and
> emitting of the records of the last fetched batch.
> So unless processing of the last fetched batch took over 5 minutes, this
> normally shouldn’t happen.
>
> Have you noticed any sign of long, constant full GC for your Flink task
> managers? From your description and check in code, the only possible guess
> I can come up with now is that
> the source tasks completely ceased to be running for a period of time, and
> when it came back, the shard iterator was unexpectedly found to be expired.
> According to the graph you attached,
> when the iterator was refreshed and tasks successfully fetched a few more
> batches, the source tasks again halted, and so on.
> So you should see that same warning message right before every small peak
> within the graph.
>
> Best Regards,
> Gordon
>
>
> On November 3, 2016 at 7:46:42 PM, Josh (jof...@gmail.com) wrote:
>
> Hey Gordon,
>
> I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514)
> with no problems, but yesterday the Kinesis consumer started behaving
> strangely... My Kinesis data stream is fairly constant at around 1.5MB/sec,
> however the Flink Kinesis consumer started to stop consuming for periods of
> time (see the spikes in graph attached which shows data consumed by the
> Flink Kinesis consumer)
>
> Looking in the task manager logs, there are no exceptions however there is
> this log message which I believe is related to the problem:
>
> 2016-11-03 09:27:53,782 WARN  org.apache.flink.streaming.
> connectors.kinesis.internals.ShardConsumer  - Encountered an unexpected
> expired iterator AAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/
> tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/
> EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//
> Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore for
> shard KinesisStreamShard{streamName='stream001', shard='{ShardId:
> shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
> 85070511730234615865841151857942042863},SequenceNumberRange:
> {StartingSequenceNumber: 495665429169236488921642479266
> 79091159472198219567464450,}}'}; refreshing the iterator ...
>
> Having restarted the job from my last savepoint, it's consuming the stream
> fine again with no problems.
>
> Do you have any idea what might be causing this, or anything I should do
> to investigate further?
>
> Cheers,
>
> Josh
>
> On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Steffen,
>>
>> Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in
>> the release (I’ll update the resolve version in JIRA to 1.1.3, thanks for
>> noticing this!).
>> The Flink community is going to release 1.1.3 asap, which will include
>> the fix.
>> If you don’t want to wait for the release and want to try the fix now,
>> you can also build on the current “release-1.1” branch, which already has
>> FLINK-4514 merged.
>> Sorry for the inconvenience. Let me know if you bump into any other
>> problems afterwards.
>>
>> Best Regards,
>> Gordon
>>
>>
>> On October 5, 2016 at 2:56:21 AM, Steffen Hausmann (
>> stef...@hausmann-family.de) wrote:
>>
>> Hi there,
>>
>> I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events
>> from a Kinesis stream. However, after a while (the exact duration varies
>> and is in the order of minutes) the Kinesis source doesn't emit any
>> further events and hence Flink doesn't produce any further output.
>> Eventually, an ExpiredIteratorException occurs in one of the task,
>> causing the entire job to fail:
>>
>> > com.amazonaws.services.kinesis.model.ExpiredIteratorException:
>> Iterator expired. The iterator was created at time Mon Oct 03 18:40:30 UTC
>> 2016 while right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in
>> the future than the tolerated delay of 30 milliseconds. (Service:
>> AmazonKinesis; Status Code: 400; Error Code: ExpiredIteratorException;
>> Request ID: 

Re: Why are externalized checkpoints deleted on Job Manager exit?

2016-11-03 Thread Ufuk Celebi
They should actually be not deleted.

Could you please share the logs with me? In the mean time, I will try
to reproduce this.

On Thu, Nov 3, 2016 at 2:04 PM, Aljoscha Krettek  wrote:
> +Ufuk
>
> Ufuk recently worked on that, if I'm not mistaken. Do you have an Idea what
> could be going on here?
>
>
> On Wed, 2 Nov 2016 at 21:52 Clifford Resnick  wrote:
>>
>> Testing externalized checkpoints in a YARN-based cluster, configured with:
>>
>>
>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>
>> I can confirm that checkpoint is retained between cancelled jobs, however
>> it’s deleted when the Job Manager session is gracefully shutdown. We’d
>> really like for the persistent checkpoint to be treated like a Savepoint and
>> not be deleted. Is there a way to enable this?
>>
>>
>


Re: Reg. custom sink for Flink streaming

2016-11-03 Thread Fabian Hueske
Hi,

a MapFunction should be the way to go for this use case.
What exactly is not working? Do you get an exception? Is the map method not
called?

Best, Fabian
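One hedged way to structure that map, assuming a hypothetical DbSession helper
with connect(), execute(String) and close() standing in for the real driver:
using a RichMapFunction lets each parallel task open its session once in open()
and release it in close().

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class DbWritingMap extends RichMapFunction<String, String> {

    private transient DbSession session;           // placeholder session type

    @Override
    public void open(Configuration parameters) throws Exception {
        session = DbSession.connect();             // one connection per parallel task
    }

    @Override
    public String map(String value) throws Exception {
        // placeholder statement; proper escaping/binding is left out for brevity
        session.execute("insert into TABLE_XXX (key, event_timeuuid, data) values ("
                + value + ", null, '" + value + "')");
        return value;
    }

    @Override
    public void close() throws Exception {
        if (session != null) {
            session.close();
        }
    }
}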

2016-11-03 0:00 GMT+01:00 Sandeep Vakacharla :

> Hi there,
>
>
>
> I have the following use case-
>
>
>
> I have data coming from Kafka which I need to stream and write each
> message to a database. I'm using the Kafka-Flink connector for streaming data
> from Kafka. I don't want to use Flink sinks to write data from the stream.
>
>
>
> I’m doing the following which doesn’t seem to work-
>
>
>
> messageStream
>     .rebalance()
>     .map(new MapFunction<String, String>() {
>         @Override
>         public String map(String value) {
>             getDbSession().execute("insert into TABLE_XXX (key, event_timeuuid, data) " +
>                     "VALUES (" + i + ",null, value); ");
>             return value;
>         }
>     })
>
>
>
> How can I iterate over each message in the stream and do something with
> that message?
>
>
>
> Thanks
>
>
>


Freeing resources in SourceFunction

2016-11-03 Thread Yury Ruchin
Hello,

I'm writing a custom source function for my streaming job. The source
function manages some connection pool. I want to close that pool once my
job is "finished" (since the stream is unbounded, the only way I see is to
cancel the streaming job). Since I inherit RichSourceFunction, there are
two candidates: cancel() and close(). I'm wondering which one should be
picked. Looking for best practices, I resorted to the existing sources. One
example is FlinkKafkaConsumerBase which has both callbacks implemented
identically (one delegating to the other). Counterexample is
InputFormatSourceFunction which uses cancel() only to reset a flag, while
actual cleanup is done in close(). Which of these approaches is a better
fit in the described case?

Just FYI, Flink version I use is 1.1.2.

Thanks,
Yury


Re: emit partial state in window (streaming)

2016-11-03 Thread Manu Zhang
Hi Luis,

You may try ContinuousEventTimeTrigger, which continuously fires on a given
time interval, instead of writing your own.
Note that we recently fixed a bug for this trigger, so I think only the
trunk version is working.

Cheers,
Manu
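For illustration, wiring it into the pipeline from this thread would look roughly
like the fragment below; the window size and firing interval are arbitrary
placeholder values:

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;

stream
    .windowAll(TumblingEventTimeWindows.of(Time.minutes(10)))
    .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))  // early partial firing every minute of event time
    .apply(creator.create(), windowAllFold, windowAllMerge);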

On Thu, Nov 3, 2016 at 9:07 PM Kostas Kloudas 
wrote:

> Hi Luis,
>
> Can you try to comment out the whole final windowing and see if this works?
> This includes the following lines:
>
>   .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
>
>   .trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit,
> windowTime, timeUnit))
>   .apply(creator.create(), windowAllFold, windowAllMerge);
>
> An additional note is that I would go for registering an event time timer
> at the onEventTime
> instead of checking the timestamp on the onElement(). This is because with
> your implementation,
> in order to fire a computation, you always have to wait for an element
> outside the partial window interval to arrive.
>
> Cheers,
> Kostas
>
> On Nov 3, 2016, at 11:31 AM, Luis Mariano Guerra 
> wrote:
>
>   .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
> //.trigger(new PartialWindowTrigger<>(partialWindowTime,
> timeUnit, windowTime, timeUnit))
> .apply(creator.create(), windowAllFold, windowAllMerge);
>
>
>


Re: Best way of doing some global initialization

2016-11-03 Thread Aljoscha Krettek
Hi,
I'm afraid this is not possible right now if you don't want to go with
completely custom sources/operators.

If you want to go the custom source route you would have only one true
source in your job that does the global initialisation and then emits one
element. Your other sources would be operators that have this one source as
input and only start producing data once that one element arrived. In this
way, you would block all other sources until the first source has done the
initialisation.

Cheers,
Aljoscha
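A small sketch of the open()-guard alternative described in the quoted message
below, assuming the initialization is safe to run once per task manager JVM; the
class and method names here are made up for illustration:

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// hypothetical helper: the first operator instance in each JVM runs the init
final class GlobalInit {
    private static final AtomicBoolean DONE = new AtomicBoolean(false);

    static void ensureInitialized() {
        if (DONE.compareAndSet(false, true)) {
            // framework/util configuration goes here
        }
    }
}

class MyMapper extends RichMapFunction<String, String> {
    @Override
    public void open(Configuration parameters) {
        GlobalInit.ensureInitialized();   // boilerplate call, but cheap after the first time
    }

    @Override
    public String map(String value) {
        return value;
    }
}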

On Thu, 3 Nov 2016 at 09:26 Satish Chandra Gupta 
wrote:

> Hi,
>
> I need to set/initialize some config of a framework/util that is used
> in my Flink stream processing app. Basically, a piece of code that needs to
> be executed exactly once before anything else. Clearly doing it in the main
> Flink processor function will not suffice, as apart from the client, the
> same needs to be done on the other nodes before the topology is executed.
>
> I have gone through the Flink best practices,
> and one way I can think of is to check whether init has been done in the
> open() of all Rich Functions, and if not, then call the initialization code.
> But that doesn't seem to be "right": basically, to add any operator, I must
> add this initialization-call boilerplate code.
>
> Is there anyway to define a global initializations in Flink, or to define
> an operator that is guaranteed to be called the first thing on all nodes?
>
> Thanks,
> +satish
>


Re: emit partial state in window (streaming)

2016-11-03 Thread Kostas Kloudas
Hi Luis,

Can you try to comment out the whole final windowing and see if this works?
This includes the following lines:

  .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
  .trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit, windowTime, 
timeUnit))
  .apply(creator.create(), windowAllFold, windowAllMerge);

An additional note is that I would go for registering an event time timer and
firing in onEventTime() instead of checking the timestamp in onElement(). This is
because, with your implementation, in order to fire a computation, you always have
to wait for an element outside the partial window interval to arrive.
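
To illustrate, a rough sketch of such a timer-based periodic trigger (written
against the Flink 1.1/1.2 Trigger API; the interval bookkeeping is simplified,
so treat it as a starting point rather than a drop-in replacement for your
PartialWindowTrigger):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class PeriodicEventTimeTrigger extends Trigger<Object, TimeWindow> {

    private final long intervalMillis;   // partial-result interval

    public PeriodicEventTimeTrigger(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        // register timers instead of comparing timestamps here
        long nextFire = timestamp - (timestamp % intervalMillis) + intervalMillis;
        if (nextFire < window.maxTimestamp()) {
            ctx.registerEventTimeTimer(nextFire);
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        if (time == window.maxTimestamp()) {
            return TriggerResult.FIRE_AND_PURGE;   // final result for the window
        }
        long nextFire = time + intervalMillis;
        if (nextFire < window.maxTimestamp()) {
            ctx.registerEventTimeTimer(nextFire);
        }
        return TriggerResult.FIRE;                 // partial result
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}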

Cheers,
Kostas

> On Nov 3, 2016, at 11:31 AM, Luis Mariano Guerra  
> wrote:
> 
>   .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
> //.trigger(new PartialWindowTrigger<>(partialWindowTime, 
> timeUnit, windowTime, timeUnit))
> .apply(creator.create(), windowAllFold, windowAllMerge);



Re: Why are externalized checkpoints deleted on Job Manager exit?

2016-11-03 Thread Aljoscha Krettek
+Ufuk

Ufuk recently worked on that, if I'm not mistaken. Do you have an idea what
could be going on here?


On Wed, 2 Nov 2016 at 21:52 Clifford Resnick  wrote:

> Testing externalized checkpoints in a YARN-based cluster, configured with:
>
>
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
> I can confirm that the checkpoint is retained between cancelled jobs; however,
> it's deleted when the Job Manager session is gracefully shut down. We'd
> really like the persistent checkpoint to be treated like a Savepoint
> and not be deleted. Is there a way to enable this?
>
>
>


Re: Accessing StateBackend snapshots outside of Flink

2016-11-03 Thread Aljoscha Krettek
Hi,
there are two open issues about this:
 * https://issues.apache.org/jira/browse/FLINK-3946
 * https://issues.apache.org/jira/browse/FLINK-3089

no work has been done on this yet. You can, however, simulate TTL for state by
using a TimelyFlatMapFunction (available in Flink 1.2-SNAPSHOT) and manually
setting a timer for clearing out state.

Cheers,
Aljoscha

On Thu, 3 Nov 2016 at 01:30 bwong247  wrote:

> We're currently investigating Flink, and one of the features that we'd like
> to have is a TTL feature to time out older values in state.  I saw this
> thread and it sounds like the functionality was being considered.  Is there
> any update?
>
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Accessing-StateBackend-snapshots-outside-of-Flink-tp6116p9846.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Flink Kafka 0.10.0.0 connector

2016-11-03 Thread Robert Metzger
Hi,
I just tried the Kafka 0.10 connector again, and I could not reproduce the
issue you are reporting.

This is my test job:

// parse input arguments
final ParameterTool parameterTool = ParameterTool.fromArgs(args);

if (parameterTool.getNumberOfParameters() < 4) {
    System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " +
            "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
    return;
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1));
env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface

DataStream<String> messageStream = env
        .addSource(new FlinkKafkaConsumer010<>(
                parameterTool.getRequired("topic"),
                new SimpleStringSchema(),
                parameterTool.getProperties()));

// write kafka stream to standard out.
messageStream.print();

env.execute("Read from Kafka example");


On Thu, Nov 3, 2016 at 1:48 PM, Dominik Safaric 
wrote:

> Hi Robert,
>
> I think the easiest way to get Kafka 0.10 running with Flink is to use the
> Kafka 0.10 connector in the current Flink master.
>
>
> Well, I’ve already built the Kafka 0.10 connector from the master, but
> unfortunately I keep getting a type checker error saying that the type of
> the FlinkKafkaConsumer10 and the one required by StreamExecutionEnvironment
> are not compatible - that is, addSource requires a subclass of the
> SourceFunction, whereas the instance of the FlinkKafkaConsumer10 class
> is of type FlinkKafkaConsumer10.
>
> Which I find quite strange, because I would assume that the
> FlinkKafkaConsumer instance should be of type SourceFunction. However, the
> same thing happened even while building the FlinkKafkaConsumer09.
>
> Any hint what might be going on?
>
> I’ve built the jar distribution as a clean Maven package (without running
> the tests).
>
> Thanks,
> Dominik
>
> On 3 Nov 2016, at 13:29, Robert Metzger  wrote:
>
> Hi Dominik,
>
> Some of Kafka's APIs changed between Kafka 0.9 and 0.10, so you can not
> compile the Kafka 0.9 against Kafka 0.10 dependencies.
>
> I think the easiest way to get Kafka 0.10 running with Flink is to use the
> Kafka 0.10 connector in the current Flink master.
> You can probably copy the connector's code into your own project and use
> the new connector from there.
>
> Regards,
> Robert
>
>
> On Thu, Nov 3, 2016 at 8:05 AM, Dominik Safaric 
> wrote:
>
>> Dear all,
>>
>> Although the Flink 1.2 version will roll out a Flink Kafka 0.10.x
>> connector, I want to use the Flink 0.9 connector in conjunction with the
>> 0.10.x versions.
>>
>> The reason behind this is that we are currently evaluating Flink as part
>> of an empirical research project, hence a stable release is required. In
>> addition, the reason we require the Kafka 0.10.x versions is that, since
>> 0.10.0, Kafka supports consumer and producer interceptors and message
>> timestamps.
>>
>> To make the 0.9 connector support a Kafka version such as 0.10.0, so far
>> I’ve changed the Flink Kafka 0.9 connector dependency to the required
>> Kafka version and built the project. However, as I imported the jar and
>> added the source to the StreamExecutionEnvironment, a type error occurred
>> stating that the addSource function requires a class deriving from
>> the SourceFunction interface.
>>
>> *Hence, what has gone wrong during the build?* I assume a dependency
>> issue.
>>
>> Next, I’ve tried simply overriding the dependencies of the Flink
>> Kafka connector within the project pom.xml, however there is obviously a
>> slight API mismatch, hence this cannot be done.
>>
>> I would really appreciate it if anyone could provide some guidance on how
>> to successfully build the Flink Kafka connector supporting Kafka 0.10.x
>> versions.
>>
>> Thanks in advance,
>> Dominik
>>
>
>
>


Using Flink with Accumulo

2016-11-03 Thread Oliver Swoboda
Hello,

I'm using Flink with Accumulo and wanted to read data from the database by
using the createHadoopInput function. Therefore I configure an
AccumuloInputFormat. You can find the source code here:
https://github.com/OSwoboda/masterthesis/blob/master/aggregation.flink/src/main/java/de/oswoboda/aggregation/Main.java

I'm using a 5-node cluster (1 master, 4 workers).
Accumulo is installed with Ambari and has 1 Master Server on the master
node and 4 Tablet Servers (one on each worker).
Flink is installed standalone with the JobManager on the master node and 4
TaskManagers (one on each worker). Every TaskManager can have 4 tasks, so
there are 32 in total.

First problem I have:
If I start several Flink jobs, the client count for ZooKeeper in the
Accumulo Overview is constantly increasing. I assume that the scanner used
isn't correctly closed. The client count only decreases to normal values
when I restart Flink.

Second problem I have:
I want to compare aggregations on time series data with Accumulo (with
Iterators) and with Flink. Unfortunately, the results vary inexplicably
when I'm using Flink. I wanted to compare the results for a full table scan
(called baseline in the code), but sometimes it takes 17-18 minutes and
sometimes it's between 30 and 60 minutes. In the longer case I can see in
the Accumulo Overview that after some time only one worker is left with
running scans and there are just a few entries/s scanned (4 million at the
beginning when all workers are running, down to 200k when only one worker is
left). Because there are 2.5 billion records to scan and almost 500 million
left, it takes really long.
This problem doesn't occur with Accumulo using Iterators and a batch
scanner on the master node: each scan has an almost identical duration, and
the graphs in the Accumulo Overview for entries/s, MB/s scanned, and seeks
are the same for each scan.

Yours faithfully,
Oliver Swoboda


Re: Flink Kafka 0.10.0.0 connector

2016-11-03 Thread Dominik Safaric
Hi Robert,

> I think the easiest way to get Kafka 0.10 running with Flink is to use the 
> Kafka 0.10 connector in the current Flink master.


Well, I’ve already built the Kafka 0.10 connector from the master, but
unfortunately I keep getting a type checker error saying that the type of the
FlinkKafkaConsumer10 and the one required by StreamExecutionEnvironment are not
compatible - that is, addSource requires a subclass of the SourceFunction,
whereas the instance of the FlinkKafkaConsumer10 class is of type
FlinkKafkaConsumer10.

Which I find quite strange, because I would assume that the FlinkKafkaConsumer
instance should be of type SourceFunction. However, the same thing happened even
while building the FlinkKafkaConsumer09.

Any hint what might be going on?

I’ve built the jar distribution as a clean Maven package (without running the
tests).

Thanks,
Dominik

> On 3 Nov 2016, at 13:29, Robert Metzger  wrote:
> 
> Hi Dominik,
> 
> Some of Kafka's APIs changed between Kafka 0.9 and 0.10, so you can not 
> compile the Kafka 0.9 against Kafka 0.10 dependencies.
> 
> I think the easiest way to get Kafka 0.10 running with Flink is to use the 
> Kafka 0.10 connector in the current Flink master.
> You can probably copy the connector's code into your own project and use the 
> new connector from there.
> 
> Regards,
> Robert
> 
> 
> On Thu, Nov 3, 2016 at 8:05 AM, Dominik Safaric  > wrote:
> Dear all,
> 
> Although the Flink 1.2 version will roll out a Flink Kafka 0.10.x connector, I
> want to use the Flink 0.9 connector in conjunction with the 0.10.x versions.
>
> The reason behind this is that we are currently evaluating Flink as part of
> an empirical research project, hence a stable release is required. In addition,
> the reason we require the Kafka 0.10.x versions is that, since 0.10.0, Kafka
> supports consumer and producer interceptors and message timestamps.
>
> To make the 0.9 connector support a Kafka version such as 0.10.0, so far
> I’ve changed the Flink Kafka 0.9 connector dependency to the required
> Kafka version and built the project. However, as I imported the jar and added
> the source to the StreamExecutionEnvironment, a type error occurred stating
> that the addSource function requires a class deriving from the SourceFunction
> interface.
>
> Hence, what has gone wrong during the build? I assume a dependency issue.
>
> Next, I’ve tried simply overriding the dependencies of the Flink Kafka
> connector within the project pom.xml, however there is obviously a slight API
> mismatch, hence this cannot be done.
>
> I would really appreciate it if anyone could provide some guidance on how to
> successfully build the Flink Kafka connector supporting Kafka 0.10.x
> versions.
> 
> Thanks in advance,
> Dominik 
> 



Re: ExpiredIteratorException when reading from a Kinesis stream

2016-11-03 Thread Tzu-Li (Gordon) Tai
Hi Josh,

That warning message was added as part of FLINK-4514. It pops up whenever a
shard iterator is used more than 5 minutes after it was returned from Kinesis.
The only time spent between when a shard iterator is returned and when it is
used to fetch the next batch of records is on deserializing and emitting
the records of the previously fetched batch.
So unless processing of the last fetched batch took over 5 minutes, this
normally shouldn’t happen.

Have you noticed any sign of long, constant full GC in your Flink task
managers? From your description and a check of the code, the only possible
guess I can come up with now is that
the source tasks completely ceased to run for a period of time, and when
they came back, the shard iterator was unexpectedly found to be expired.
According to the graph you attached,
when the iterator was refreshed and the tasks successfully fetched a few more
batches, the source tasks halted again, and so on.
So you should see that same warning message right before every small peak
in the graph.

Best Regards,
Gordon


On November 3, 2016 at 7:46:42 PM, Josh (jof...@gmail.com) wrote:

Hey Gordon,

I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514) with no 
problems, but yesterday the Kinesis consumer started behaving strangely... My 
Kinesis data stream is fairly constant at around 1.5MB/sec, however the Flink 
Kinesis consumer started to stop consuming for periods of time (see the spikes 
in graph attached which shows data consumed by the Flink Kinesis consumer)

Looking in the task manager logs, there are no exceptions; however, there is this
log message which I believe is related to the problem:
2016-11-03 09:27:53,782 WARN  
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer  - 
Encountered an unexpected expired iterator 
AAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore
 for shard KinesisStreamShard{streamName='stream001', shard='{ShardId: 
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 
85070511730234615865841151857942042863},SequenceNumberRange: 
{StartingSequenceNumber: 
49566542916923648892164247926679091159472198219567464450,}}'}; refreshing the 
iterator ...

Having restarted the job from my last savepoint, it's consuming the stream fine 
again with no problems.

Do you have any idea what might be causing this, or anything I should do to 
investigate further?

Cheers,

Josh


On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai  wrote:
Hi Steffen,

Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in the 
release (I’ll update the resolve version in JIRA to 1.1.3, thanks for noticing 
this!).
The Flink community is going to release 1.1.3 asap, which will include the fix.
If you don’t want to wait for the release and want to try the fix now, you can 
also build on the current “release-1.1” branch, which already has FLINK-4514 
merged.
Sorry for the inconvenience. Let me know if you bump into any other problems 
afterwards.

Best Regards,
Gordon


On October 5, 2016 at 2:56:21 AM, Steffen Hausmann (stef...@hausmann-family.de) 
wrote:

Hi there,

I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events
from a Kinesis stream. However, after a while (the exact duration varies
and is in the order of minutes) the Kinesis source doesn't emit any
further events and hence Flink doesn't produce any further output.
Eventually, an ExpiredIteratorException occurs in one of the tasks,
causing the entire job to fail:

> com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator 
> expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016 while 
> right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the future 
> than the tolerated delay of 30 milliseconds. (Service: AmazonKinesis; 
> Status Code: 400; Error Code: ExpiredIteratorException; Request ID: 
> dace9532-9031-54bc-8aa2-3cbfb136d590)

This seems to be related to FLINK-4514, which is marked as resolved for
Flink 1.1.2. In contrast to what is described in the ticket, the job I'm
running isn't suspended but hangs just a few minutes after the job has
been started.

I've attached a log file showing the described behavior.

Any idea what may be wrong?

Thanks,
Steffen



Re: Flink Kafka 0.10.0.0 connector

2016-11-03 Thread Robert Metzger
Hi Dominik,

Some of Kafka's APIs changed between Kafka 0.9 and 0.10, so you can not
compile the Kafka 0.9 against Kafka 0.10 dependencies.

I think the easiest way to get Kafka 0.10 running with Flink is to use the
Kafka 0.10 connector in the current Flink master.
You can probably copy the connector's code into your own project and use
the new connector from there.

Regards,
Robert


On Thu, Nov 3, 2016 at 8:05 AM, Dominik Safaric 
wrote:

> Dear all,
>
> Although the Flink 1.2 version will roll out a Flink Kafka 0.10.x
> connector, I want to use the Flink 0.9 connector in conjunction with the
> 0.10.x versions.
>
> The reason behind this is that we are currently evaluating Flink as part
> of an empirical research project, hence a stable release is required. In
> addition, the reason we require the Kafka 0.10.x versions is that, since
> 0.10.0, Kafka supports consumer and producer interceptors and message
> timestamps.
>
> To make the 0.9 connector support a Kafka version such as 0.10.0, so far
> I’ve changed the Flink Kafka 0.9 connector dependency to the required
> Kafka version and built the project. However, as I imported the jar and
> added the source to the StreamExecutionEnvironment, a type error occurred
> stating that the addSource function requires a class deriving from
> the SourceFunction interface.
>
> *Hence, what has gone wrong during the build?* I assume a dependency
> issue.
>
> Next, I’ve tried simply overriding the dependencies of the Flink
> Kafka connector within the project pom.xml, however there is obviously a
> slight API mismatch, hence this cannot be done.
>
> I would really appreciate it if anyone could provide some guidance on how
> to successfully build the Flink Kafka connector supporting Kafka 0.10.x
> versions.
>
> Thanks in advance,
> Dominik
>


Re: ExpiredIteratorException when reading from a Kinesis stream

2016-11-03 Thread Josh
Hey Gordon,

I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514) with
no problems, but yesterday the Kinesis consumer started behaving
strangely... My Kinesis data stream is fairly constant at around 1.5MB/sec,
however the Flink Kinesis consumer started to stop consuming for periods of
time (see the spikes in graph attached which shows data consumed by the
Flink Kinesis consumer)

Looking in the task manager logs, there are no exceptions; however, there is
this log message which I believe is related to the problem:

2016-11-03 09:27:53,782 WARN
 org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer  -
Encountered an unexpected expired iterator
AAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore
for shard KinesisStreamShard{streamName='stream001', shard='{ShardId:
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
85070511730234615865841151857942042863},SequenceNumberRange:
{StartingSequenceNumber:
49566542916923648892164247926679091159472198219567464450,}}'}; refreshing
the iterator ...

Having restarted the job from my last savepoint, it's consuming the stream
fine again with no problems.

Do you have any idea what might be causing this, or anything I should do to
investigate further?

Cheers,

Josh

On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Steffen,
>
> Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in
> the release (I’ll update the resolve version in JIRA to 1.1.3, thanks for
> noticing this!).
> The Flink community is going to release 1.1.3 asap, which will include the
> fix.
> If you don’t want to wait for the release and want to try the fix now, you
> can also build on the current “release-1.1” branch, which already has
> FLINK-4514 merged.
> Sorry for the inconvenience. Let me know if you bump into any other
> problems afterwards.
>
> Best Regards,
> Gordon
>
>
> On October 5, 2016 at 2:56:21 AM, Steffen Hausmann (
> stef...@hausmann-family.de) wrote:
>
> Hi there,
>
> I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events
> from a Kinesis stream. However, after a while (the exact duration varies
> and is in the order of minutes) the Kinesis source doesn't emit any
> further events and hence Flink doesn't produce any further output.
> Eventually, an ExpiredIteratorException occurs in one of the tasks,
> causing the entire job to fail:
>
> > com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator
> expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016
> while right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the
> future than the tolerated delay of 30 milliseconds. (Service:
> AmazonKinesis; Status Code: 400; Error Code: ExpiredIteratorException;
> Request ID: dace9532-9031-54bc-8aa2-3cbfb136d590)
>
> This seems to be related to FLINK-4514, which is marked as resolved for
> Flink 1.1.2. In contrast to what is described in the ticket, the job I'm
> running isn't suspended but hangs just a few minutes after the job has
> been started.
>
> I've attached a log file showing the described behavior.
>
> Any idea what may be wrong?
>
> Thanks,
> Steffen
>
>


Re: emit partial state in window (streaming)

2016-11-03 Thread Luis Mariano Guerra
On Thu, Oct 27, 2016 at 4:37 PM, Fabian Hueske  wrote:

> Hi Luis,
>
> these blogposts should help you with the periodic partial result trigger
> [1] [2].
>

Hi, thanks for the links. I read them and tried to implement what I need.
Everything seems to work as expected except for the fact that the partial
results aren't emitted. I created a gist with my PartialWindowTrigger
implementation and the relevant part of the job:

https://gist.github.com/anonymous/041987821e37ee8f862ce1857bb074ea

Is the problem in the trigger?
do I have to create a window assigner too?
is it because of the windowAll?

I reproduce part of the readme from the gist here for convenience; please
see the readme for the PartialWindowTrigger implementation and the rest of
the logs:

What the job does (or I think it does) is to:

   - KeyBy the first string of a field
   - Create a tumbling window of 10 seconds
   - Register my PartialWindowTrigger that will trigger every 2 seconds
   (FIRE) and after the 10 second window (FIRE_AND_PURGE)
   - Fold on each partition to create a partial accumulation
   - Join all the partial results into a unique place through windowAll
   - Aggregate the partial aggregations into one result

Here is the job's relevant part:

return input.keyBy(keySelector)
        .timeWindow(Time.of(windowTime, timeUnit))
        .trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit, windowTime, timeUnit))
        .apply(creator.create(), timeWindowFold, timeWindowMerge)
        .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
        //.trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit, windowTime, timeUnit))
        .apply(creator.create(), windowAllFold, windowAllMerge);

The problem is that the triggers FIRE correctly but no partial results
(every 2 seconds) are emitted, only the final result (every 10 seconds) is
emitted.

Even if, instead of returning FIRE in onElement(), I do:

ctx.registerEventTimeTimer(timestamp);

and return FIRE or FIRE_AND_PURGE in onEventTime(), it still doesn't emit the
partial values.

There's a commented-out line in the job that registers a PartialWindowTrigger
for the windowAll window, but it still doesn't work if uncommented.

I added println's on the trigger and on the job steps, this is the output:

2016-11-03T11:07:04.180+01:00 onElement FIRE
2016-11-03T11:07:04.232+01:00 multiCountWindowFn1
2016-11-03T11:07:04.305+01:00 windowAllFold
2016-11-03T11:07:04.733+01:00 timeWindowFold
2016-11-03T11:07:04.681+01:00 onElement CONTINUE
2016-11-03T11:07:05.234+01:00 timeWindowFold
2016-11-03T11:07:05.182+01:00 onElement CONTINUE
2016-11-03T11:07:05.735+01:00 timeWindowFold
2016-11-03T11:07:05.682+01:00 onElement CONTINUE
2016-11-03T11:07:06.236+01:00 timeWindowFold
2016-11-03T11:07:06.183+01:00 onElement FIRE
<3 more blocks like the one above here>

2016-11-03T11:07:12.246+01:00 multiCountWindowFn1
2016-11-03T11:07:12.317+01:00 windowAllFold
2016-11-03T11:07:12.746+01:00 timeWindowFold
2016-11-03T11:07:12.693+01:00 onElement CONTINUE
2016-11-03T11:07:13.247+01:00 timeWindowFold
2016-11-03T11:07:13.194+01:00 onElement CONTINUE
2016-11-03T11:07:13.748+01:00 timeWindowFold
2016-11-03T11:07:13.695+01:00 onElement CONTINUE
2016-11-03T11:07:09.999+01:00 onEventTime FIRE_AND_PURGE
2016-11-03T11:07:13.948+01:00 multiCountWindowFn1
2016-11-03T11:07:14.020+01:00 windowAllFold
2016-11-03T11:07:14.020+01:00 allWindowMerger   1

{"blue":{"foo":{"v":65}},"$":{"ts":1478167634020}}



> Regarding the second question:
> Time windows are by default aligned to 1970-01-01-00:00:00.
> So a 24 hour window will always start at 00:00.
>
> Best, Fabian
>
> [1] http://flink.apache.org/news/2015/12/04/Introducing-windows.html
> [2] https://www.mapr.com/blog/essential-guide-streaming-
> first-processing-apache-flink
>
> 2016-10-27 16:31 GMT+02:00 Luis Mariano Guerra :
>
>> hi,
>>
>>  I need to calculate some counts for the day but also emit the partial
>> counts periodically. I think triggers may help me; I'm searching around but
>> there's not much content about it. Any tip?
>>
>> For example, I'm counting access by location to different services. I want
>> to accumulate access during the whole day, but I want to emit the partial
>> count every 5 minutes.
>>
>> One slightly related question: is there a way to align a window to a day?
>> For example, a 24-hour window that starts at 00:00.
>>
>> thanks.
>>
>
>


Re: Looping over a DataSet and accesing another DataSet

2016-11-03 Thread otherwise777
I just found out that I am able to use arrays in tuple values, nvm about that
question



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Looping-over-a-DataSet-and-accesing-another-DataSet-tp9778p9850.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Best way of doing some global initialization

2016-11-03 Thread Satish Chandra Gupta
Hi,

I need to set/initialize some config of a framework/util that is used in
my Flink stream processing app. Basically, a piece of code that needs to be
executed exactly once before anything else. Clearly doing it in the main
Flink processor function will not suffice, as apart from the client, the
same needs to be done on the other nodes before the topology is executed.

I have gone through the Flink best practices,
and one way I can think of is to check whether init has been done in the
open() of all Rich Functions, and if not, then call the initialization code.
But that doesn't seem right: basically, to add any operator, I must add this
initialization-call boilerplate code.
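
For illustration, the boilerplate I have in mind looks roughly like this
(MyFrameworkConfig.initialize() is a placeholder for the actual util, and the
static flag only guards the init per JVM/TaskManager, not globally across the
cluster):

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class MyMapper extends RichMapFunction<String, String> {

    // shared across all instances loaded in the same JVM (i.e. the same TaskManager)
    private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);

    @Override
    public void open(Configuration parameters) throws Exception {
        if (INITIALIZED.compareAndSet(false, true)) {
            MyFrameworkConfig.initialize();   // hypothetical one-time setup
        }
    }

    @Override
    public String map(String value) {
        return value;
    }
}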

Is there any way to define a global initialization in Flink, or to define
an operator that is guaranteed to be called first on all nodes?

Thanks,
+satish


Flink Kafka 0.10.0.0 connector

2016-11-03 Thread Dominik Safaric
Dear all,

Although the Flink 1.2 version will roll out a Flink Kafka 0.10.x connector, I
want to use the Flink 0.9 connector in conjunction with the 0.10.x versions.

The reason behind this is that we are currently evaluating Flink as part of an
empirical research project, hence a stable release is required. In addition,
the reason we require the Kafka 0.10.x versions is that, since 0.10.0, Kafka
supports consumer and producer interceptors and message timestamps.

To make the 0.9 connector support a Kafka version such as 0.10.0, so far
I’ve changed the Flink Kafka 0.9 connector dependency to the required Kafka
version and built the project. However, as I imported the jar and added the
source to the StreamExecutionEnvironment, a type error occurred stating that the
addSource function requires a class deriving from the SourceFunction interface.

Hence, what has gone wrong during the build? I assume a dependency issue.

Next, I’ve tried simply overriding the dependencies of the Flink Kafka
connector within the project pom.xml, however there is obviously a slight API
mismatch, hence this cannot be done.

I would really appreciate it if anyone could provide some guidance on how to
successfully build the Flink Kafka connector supporting Kafka 0.10.x versions.

Thanks in advance,
Dominik 

Re: Flink Metrics - InfluxDB + Grafana | Help with query influxDB query for Grafana to plot 'numRecordsIn' & 'numRecordsOut' for each operator/operation

2016-11-03 Thread Philipp Bussche
Hi there,
I am using Graphite, and querying it in Grafana is super easy. You just
select fields and they come up automatically for you to select from,
depending on how your metric structure in Graphite looks. You can also
use wildcards.
The only thing I had to do, because I am also using containers to run my
Flink components, was to define a rather static naming scheme for the
jobmanager and task managers so that I wouldn't have too many new entities
in my graphs when I restart my task manager containers in particular.
Thanks
Philipp



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Metrics-InfluxDB-Grafana-Help-with-query-influxDB-query-for-Grafana-to-plot-numRecordsIn-numRen-tp9775p9847.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.