Re: JobManager trying to re-submit jobs after failover

2016-07-27 Thread Hironori Ogibayashi
Thank you for telling me about the cause.
It recovered after restarting jobmanager-5 and jobmanager-1.

I restarted jobmanager-1 because, when I restarted jobmanager-5,
checkpointing started to fail with the following message.


2016-07-28 10:42:28,217 WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed
to trigger checkpoint (19 consecutive failed attempts so far)
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode
= NoNode for 
/flink/flink_prod/checkpoint-counter/978ef000cca5a3aa6f3461274102f82c
at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.setData(ZooKeeper.java:1270)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.SetDataBuilderImpl$4.call(SetDataBuilderImpl.java:274)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.SetDataBuilderImpl$4.call(SetDataBuilderImpl.java:270)
at 
org.apache.flink.shaded.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.SetDataBuilderImpl.pathInForeground(SetDataBuilderImpl.java:267)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.SetDataBuilderImpl.forPath(SetDataBuilderImpl.java:253)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.SetDataBuilderImpl.forPath(SetDataBuilderImpl.java:41)
at 
org.apache.flink.shaded.org.apache.curator.framework.recipes.shared.SharedValue.trySetValue(SharedValue.java:168)
at 
org.apache.flink.shaded.org.apache.curator.framework.recipes.shared.SharedCount.trySetCount(SharedCount.java:111)
at 
org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter.getAndIncrement(ZooKeeperCheckpointIDCounter.java:121)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerCheckpoint(CheckpointCoordinator.java:411)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerCheckpoint(CheckpointCoordinator.java:339)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$ScheduledTrigger.run(CheckpointCoordinator.java:928)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)


Anyway, thank you so much for your advice.
I think it would be great if the fix could be backported to 1.0.4.

Regards,
Hironori

2016-07-28 0:08 GMT+09:00 Ufuk Celebi :
> Thanks for the logs. Looking through them it's caused by this issue:
> https://issues.apache.org/jira/browse/FLINK-3800. The ExecutionGraph
> (Flink's internal scheduling structure) is not terminated properly and
> tries to restart the job over and over again.
>
> This is fixed for 1.1.0. Is it an option for you to upgrade to 1.1
> when it's out? We might need to backport this fix for 1.0.4. The work
> around is as I've described, just restart jobmanager-5.
>
>
>
> On Wed, Jul 27, 2016 at 2:55 PM, Hironori Ogibayashi
>  wrote:
>> Thank you so much for your quick response.
>> I am running Flink 1.0.3.
>>
>> I have attached jobmanager logs. The failover happened around 7/26 21:13 JST.
>>
>> Regards,
>> Hironori
>>
>> 2016-07-27 21:21 GMT+09:00 Ufuk Celebi :
>>> Which version of Flink are you running on? I think this might have
>>> been fixed for the 1.1 release
>>> (http://people.apache.org/~uce/flink-1.1.0-rc1/).
>>>
>>> It looks like the ExecutionGraph is still trying to restart although
>>> the JobManager is not the leader anymore. If you could provide the
>>> complete logs of both JobManagers, that would be helpful to be sure
>>> what is happening.
>>>
>>> You can recover from this by restarting the respective JobManager
>>> process (by running "jobmanager.sh stop" script on that machine and
>>> starting again via "jobmanager.sh start cluster") .
>>>
>>> – Ufuk
>>>
>>> On Wed, Jul 27, 2016 at 2:00 PM, Hironori Ogibayashi
>>>  wrote:
 Hello,

 I have a standalone Flink cluster with JobManager HA.
 Last night, the JobManager failed over because of a connection timeout to
 ZooKeeper.
 The job is running successfully under the new leader JobManager, but
 looking at the old leader JobManager's log, it has been trying to re-submit the
 job and getting errors (for almost 24 hours now).

 Here is the log.

 -
 2016-07-27 20:56:09,218 WARN
 org.apache.flink.runtime.jobmanager.JobManager-
 Discard message
 LeaderSessionMessage(54757d58-64d0-4118-a4d3-5f089287f1e4,07/27/2016
 20:56:09 Job execution switched to status RESTARTING.) because the
 expected leader session ID None did not equal the received leader
 session ID Some(54757d58-64d0-4118-a4d3-5f089287f1e4).
 2016-07-27 20:56:19,218 INFO
 org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore

how to start tuning to prevent OutOfMemory

2016-07-27 Thread Istvan Soos
Hi,

We see an occasional OOM issue with our Flink jobs. Maybe the
input got more diverse and the grouping has many more keys; I'm not
really sure about that part.

How do you usually tackle these issues? We are running with
parallelism between 5 and 30. Would it help if we turned that down?

We do set taskmanager.heap.mb explicitly, but not the advanced config options.

Thanks,
  Istvan
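
As an illustration, a minimal sketch of the flink-conf.yaml entries that are usually involved in memory tuning (key names as in the Flink 1.0/1.1 configuration docs; the values below are placeholders, not recommendations):

    # Total JVM heap per TaskManager.
    taskmanager.heap.mb: 4096

    # Fraction of the free heap that Flink's managed memory (used e.g. for
    # sorting/grouping in batch jobs) may take; the rest stays available for user code.
    taskmanager.memory.fraction: 0.7

    # Fewer slots per TaskManager means fewer parallel tasks competing for the same heap.
    taskmanager.numberOfTaskSlots: 2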


Re: JobManager trying to re-submit jobs after failover

2016-07-27 Thread Ufuk Celebi
Thanks for the logs. Looking through them it's caused by this issue:
https://issues.apache.org/jira/browse/FLINK-3800. The ExecutionGraph
(Flink's internal scheduling structure) is not terminated properly and
tries to restart the job over and over again.

This is fixed for 1.1.0. Is it an option for you to upgrade to 1.1
when it's out? We might need to backport this fix for 1.0.4. The
workaround is as I've described: just restart jobmanager-5.



On Wed, Jul 27, 2016 at 2:55 PM, Hironori Ogibayashi
 wrote:
> Thank you so much for your quick response.
> I am running Flink 1.0.3.
>
> I have attached jobmanager logs. The failover happened around 7/26 21:13 JST.
>
> Regards,
> Hironori
>
> 2016-07-27 21:21 GMT+09:00 Ufuk Celebi :
>> Which version of Flink are you running on? I think this might have
>> been fixed for the 1.1 release
>> (http://people.apache.org/~uce/flink-1.1.0-rc1/).
>>
>> It looks like the ExecutionGraph is still trying to restart although
>> the JobManager is not the leader anymore. If you could provide the
>> complete logs of both JobManagers, that would be helpful to be sure
>> what is happening.
>>
>> You can recover from this by restarting the respective JobManager
>> process (by running "jobmanager.sh stop" script on that machine and
>> starting again via "jobmanager.sh start cluster") .
>>
>> – Ufuk
>>
>> On Wed, Jul 27, 2016 at 2:00 PM, Hironori Ogibayashi
>>  wrote:
>>> Hello,
>>>
>>> I have a standalone Flink cluster with JobManager HA.
>>> Last night, the JobManager failed over because of a connection timeout to
>>> ZooKeeper.
>>> The job is running successfully under the new leader JobManager, but
>>> looking at the old leader JobManager's log, it has been trying to re-submit the
>>> job and getting errors (for almost 24 hours now).
>>>
>>> Here is the log.
>>>
>>> -
>>> 2016-07-27 20:56:09,218 WARN
>>> org.apache.flink.runtime.jobmanager.JobManager-
>>> Discard message
>>> LeaderSessionMessage(54757d58-64d0-4118-a4d3-5f089287f1e4,07/27/2016
>>> 20:56:09 Job execution switched to status RESTARTING.) because the
>>> expected leader session ID None did not equal the received leader
>>> session ID Some(54757d58-64d0-4118-a4d3-5f089287f1e4).
>>> 2016-07-27 20:56:19,218 INFO
>>> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>>> - Recovering checkpoints from ZooKeeper.
>>> 2016-07-27 20:56:19,218 WARN
>>> org.apache.flink.runtime.jobmanager.JobManager-
>>> Discard message
>>> LeaderSessionMessage(54757d58-64d0-4118-a4d3-5f089287f1e4,07/27/2016
>>> 20:56:19 Job execution switched to status CREATED.) because the
>>> expected leader session ID None did not equal the received leader
>>> session ID Some(54757d58-64d0-4118-a4d3-5f089287f1e4).
>>> 2016-07-27 20:56:19,219 INFO
>>> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>>> - Found 1 checkpoints in ZooKeeper.
>>> 2016-07-27 20:56:19,221 INFO
>>> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>>> - Initialized with Checkpoint 40229 @ 1469620528216 for
>>> 978ef000cca5a3aa6f3461274102f82c. Removing all older checkpoints.
>>> 2016-07-27 20:56:19,222 WARN
>>> org.apache.flink.runtime.jobmanager.JobManager-
>>> Discard message
>>> LeaderSessionMessage(54757d58-64d0-4118-a4d3-5f089287f1e4,07/27/2016
>>> 20:56:19 Job execution switched to status RUNNING.) because the
>>> expected leader session ID None did not equal the received leader
>>> session ID Some(54757d58-64d0-4118-a4d3-5f089287f1e4).
>>> 2016-07-27 20:56:19,222 INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph-
>>> Source: Custom Source (1/3) (bbdf55db0c19cc881c188bc6925929d0)
>>> switched from CREATED to SCHEDULED
>>> 2016-07-27 20:56:19,223 INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph-
>>> Source: Custom Source (1/3) (bbdf55db0c19cc881c188bc6925929d0)
>>> switched from SCHEDULED to CANCELED
>>> 2016-07-27 20:56:19,223 INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph-
>>> Source: Custom Source (2/3) (4c795c671ec7b548b5faac5b141c331c)
>>> switched from CREATED to CANCELED
>>> 2016-07-27 20:56:19,223 WARN
>>> org.apache.flink.runtime.jobmanager.JobManager-
>>> Discard message
>>> LeaderSessionMessage(54757d58-64d0-4118-a4d3-5f089287f1e4,07/27/2016
>>> 20:56:19 Job execution switched to status FAILING.) because the
>>> expected leader session ID None did not equal the received leader
>>> session ID Some(54757d58-64d0-4118-a4d3-5f089287f1e4).
>>> 2016-07-27 20:56:19,223 INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph-
>>> Source: Custom Source (3/3) (fce3b243e5b25041aafabbd93a266dbc)
>>> switched from CREATED to CANCELED
>>> 2016-07-27 20:56:19,223 INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph-
>>> Source: Custom Source (1/3) (e1e5154f506901539e12b0fe8c140503)
>>> switched from 

Re: .so linkage error in Cluster

2016-07-27 Thread Debaditya Roy
Hello users,

I rebuilt the project, and now on doing mvn clean package I get two jar
files, and I can run the fat jar in the local JVM properly. However,
when executing in the cluster I get the following error:

Source: Custom Source -> Flat Map -> Sink: Unnamed(1/1) switched to FAILED
java.lang.UnsatisfiedLinkError: no jniopencv_core in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:726)
at org.bytedeco.javacpp.Loader.load(Loader.java:501)
at org.bytedeco.javacpp.Loader.load(Loader.java:418)
at org.bytedeco.javacpp.opencv_core.(opencv_core.java:10)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.bytedeco.javacpp.Loader.load(Loader.java:473)
at org.bytedeco.javacpp.Loader.load(Loader.java:418)
at
org.bytedeco.javacpp.helper.opencv_core$AbstractArray.(opencv_core.java:109)
at org.myorg.quickstart.FlinkStreamSource.run(FlinkStreamSource.java:33)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsatisfiedLinkError:
/tmp/javacpp5798629402792/libjniopencv_core.so: libgomp.so.1: cannot open
shared object file: No such file or directory
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:709)
... 14 more

07/27/2016 14:38:42Job execution switched to status FAILING.
java.lang.UnsatisfiedLinkError: no jniopencv_core in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:726)
at org.bytedeco.javacpp.Loader.load(Loader.java:501)
at org.bytedeco.javacpp.Loader.load(Loader.java:418)
at org.bytedeco.javacpp.opencv_core.(opencv_core.java:10)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.bytedeco.javacpp.Loader.load(Loader.java:473)
at org.bytedeco.javacpp.Loader.load(Loader.java:418)
at
org.bytedeco.javacpp.helper.opencv_core$AbstractArray.(opencv_core.java:109)
at org.myorg.quickstart.FlinkStreamSource.run(FlinkStreamSource.java:33)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsatisfiedLinkError:
/tmp/javacpp5798629402792/libjniopencv_core.so: libgomp.so.1: cannot open
shared object file: No such file or directory
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:709)
... 14 more

Do I need to do some more tweaking to run it in the cluster?

Regards,
Debaditya

On Tue, Jul 26, 2016 at 8:56 PM, Debaditya Roy  wrote:

> Hi,
>
> For the error I get this when I run the .jar made by mvn clean package
>
> java.lang.NoClassDefFoundError: org/bytedeco/javacpp/opencv_core$Mat
> at loc.video.Job.main(Job.java:29)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> at 

OffHeap support in Flink

2016-07-27 Thread Aakash Agrawal
Hello,

I am comparing Flink, Spark and some other streaming frameworks to find the 
best fit for my project.

Currently we have a system which works on a single server and uses off-heap
memory to save data. We now want to go distributed with streaming support.
So we have designed a rough data flow, and in that data flow some operations
need to cache data (for some purpose) before streaming it to the next
operation.
We prefer caching data in off-heap memory instead of in-heap.

I will stream in-heap tuples, and after some operation I want to store that
data in an off-heap table.
I want to know if I can achieve this in Flink.
From the documentation, I understand I can write data to a file/DB through sinks.

So my questions are:
1. Can I write data off-heap (using, say, the 'unsafe' library) through a sink?
2. If yes, do I have to add a sink (and then a source to stream to the next operation)
after each such operation where I want caching in my data flow?
3. Is there another/better way than (2) to solve my problem?

I hope my problem is understandable. Let me know if not.

- Aakash Agrawal.
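
Not an authoritative answer, but as a rough illustration of question (1): a custom sink can write incoming records into off-heap memory, for example into a direct ByteBuffer. A minimal sketch follows; the byte[] element type, buffer size, and overflow handling are placeholder assumptions, not from this thread:

    import java.nio.ByteBuffer;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class OffHeapCachingSink extends RichSinkFunction<byte[]> {

        private transient ByteBuffer offHeapBuffer;

        @Override
        public void open(Configuration parameters) {
            // Allocated outside the JVM heap; 64 MiB is an arbitrary placeholder size.
            offHeapBuffer = ByteBuffer.allocateDirect(64 * 1024 * 1024);
        }

        @Override
        public void invoke(byte[] serializedRecord) {
            // Append the record to the off-heap buffer; a real implementation would
            // handle overflow, indexing, and hand-off to the next operation.
            offHeapBuffer.put(serializedRecord);
        }
    }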





Re: No output when using event time with multiple Kafka partitions

2016-07-27 Thread Yassin Marzouki
I just tried playing with the source parallelism setting, and I got a very
strange result:

If I specify the source parallelism
using env.addSource(kafka).setParallelism(N), results are printed correctly
for any number N except N=4. I guess that's related to the number of
task slots, since I have 4 CPU cores, but what is the explanation for that?
So I suppose that if I don't specify the source parallelism, it is set
automatically to 4. Isn't it supposed to be set to the number of topic
partitions (= 2) by default?


On Wed, Jul 27, 2016 at 2:33 PM, Yassin Marzouki 
wrote:

> Hi Kostas,
>
> When I remove the window and the apply() and put print() after 
> assignTimestampsAndWatermarks,
> the messages are printed correctly:
>
> 2> Request{ts=2015-01-01, 06:15:34:000}
> 2> Request{ts=2015-01-02, 16:38:10:000}
> 2> Request{ts=2015-01-02, 18:58:41:000}
> 2> Request{ts=2015-01-02, 19:10:00:000}
> 2> Request{ts=2015-01-02, 23:36:51:000}
> 2> Request{ts=2015-01-03, 17:38:47:000}
> ...
>
> But strangely using only one task. If I set the source parallelism to 1
> using env.addSource(kafka).setParallelism(1) (the window and the apply()
> still removed), results are printed using all available slots (number of
> CPU cores):
>
> 4> Request{ts=2015-01-01, 06:15:34:000}
> 4> Request{ts=2015-01-02, 16:38:10:000}
> 2> Request{ts=2015-01-02, 19:10:00:000}
> 4> Request{ts=2015-01-02, 23:36:51:000}
> 1> Request{ts=2015-01-02, 18:58:41:000}
> 2> Request{ts=2015-01-03, 17:38:47:000}
> 3> Request{ts=2015-01-03, 17:56:42:000}
> ...
>
> Now if I keep the window and apply() without specifying the source
> parallelism, no messages are printed (only regular Kafka consumer and Flink
> logs), and if the source parallelism is set to 1, messages are printed
> correctly:
>
> 1> Window: TimeWindow{start=142007040, end=142015680}
> 2> Request{ts=2015-01-01, 06:15:34:000}
> 1> Request{ts=2015-01-02, 16:38:10:000}
> 4> Request{ts=2015-01-02, 19:10:00:000}
> 3> Window: TimeWindow{start=142015680, end=142024320}
> 3> Request{ts=2015-01-02, 18:58:41:000}
> 2> Request{ts=2015-01-02, 23:36:51:000}
> 3> Window: TimeWindow{start=142041600, end=142050240}
> 2> Request{ts=2015-01-03, 17:38:47:000}
> 4> Window: TimeWindow{start=142024320, end=142032960}
> 1> Request{ts=2015-01-03, 17:56:42:000}
> 1> Request{ts=2015-01-05, 17:13:45:000}
> 4> Request{ts=2015-01-05, 01:25:55:000}
> 2> Request{ts=2015-01-05, 14:27:45:000}
> ...
>
> On Wed, Jul 27, 2016 at 1:41 PM, Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> Hi Yassine,
>>
>> Could you just remove the window and the apply, and  just put a print()
>> after the:
>>
>> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor()
>> {
>> @Override
>> public long extractAscendingTimestamp(Request req) {
>> return req.ts;
>> }
>> })
>>
>>
>> This at least will tell us if reading from Kafka works as expected.
>>
>> Kostas
>>
>> On Jul 25, 2016, at 3:39 PM, Yassin Marzouki 
>> wrote:
>>
>> Hi everyone,
>>
>> I am reading messages from a Kafka topic with 2 partitions and using
>> event time. This is my code:
>>
>> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor()
>> {
>> @Override
>> public long extractAscendingTimestamp(Request req) {
>> return req.ts;
>> }
>> })
>> .windowAll(TumblingEventTimeWindows.of(Time.days(1)))
>> .apply((TimeWindow window, Iterable iterable, Collector
>> collector) -> {
>> collector.collect("Window: " + window.toString());
>> for (Request req : iterable) {
>> collector.collect(req.toString());
>> }
>> })
>> .print()
>>
>> I could get an output only when setting the kafka source parallelism to
>> 1. I guess that is because messages from multiple partitions arrive
>> out-of-order to the timestamp extractor according to this thread
>> ,
>> correct?
>> So I replaced the AscendingTimestampExtractor with a
>> BoundedOutOfOrdernessGenerator as in the documentation example
>> 
>>  (with
>> a higher delay) in order to handle out-of-order events, but I still can't
>> get any output. Why is that?
>>
>> Best,
>> Yassine
>>
>>
>>
>
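
For reference, a minimal sketch of the kind of periodic out-of-order watermark assigner discussed in the quoted question (along the lines of the BoundedOutOfOrdernessGenerator example in the docs). The Request type is taken from this thread; the 10-second bound is an arbitrary placeholder:

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class BoundedOutOfOrdernessAssigner implements AssignerWithPeriodicWatermarks<Request> {

        private static final long MAX_OUT_OF_ORDERNESS = 10_000L; // 10 s, placeholder

        private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS;

        @Override
        public long extractTimestamp(Request req, long previousElementTimestamp) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, req.ts);
            return req.ts;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // The watermark trails the highest timestamp seen so far by the allowed lateness.
            return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS);
        }
    }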


Re: No output when using event time with multiple Kafka partitions

2016-07-27 Thread Yassin Marzouki
Hi Kostas,

When I remove the window and the apply() and put print() after
assignTimestampsAndWatermarks,
the messages are printed correctly:

2> Request{ts=2015-01-01, 06:15:34:000}
2> Request{ts=2015-01-02, 16:38:10:000}
2> Request{ts=2015-01-02, 18:58:41:000}
2> Request{ts=2015-01-02, 19:10:00:000}
2> Request{ts=2015-01-02, 23:36:51:000}
2> Request{ts=2015-01-03, 17:38:47:000}
...

But strangely, it uses only one task. If I set the source parallelism to 1
using env.addSource(kafka).setParallelism(1) (the window and the apply()
still removed), results are printed using all available slots (number of
CPU cores):

4> Request{ts=2015-01-01, 06:15:34:000}
4> Request{ts=2015-01-02, 16:38:10:000}
2> Request{ts=2015-01-02, 19:10:00:000}
4> Request{ts=2015-01-02, 23:36:51:000}
1> Request{ts=2015-01-02, 18:58:41:000}
2> Request{ts=2015-01-03, 17:38:47:000}
3> Request{ts=2015-01-03, 17:56:42:000}
...

Now if I keep the window and apply() without specifying the source
parallelism, no messages are printed (only regular Kafka consumer and Flink
logs), and if the source parallelism is set to 1, messages are printed
correctly:

1> Window: TimeWindow{start=142007040, end=142015680}
2> Request{ts=2015-01-01, 06:15:34:000}
1> Request{ts=2015-01-02, 16:38:10:000}
4> Request{ts=2015-01-02, 19:10:00:000}
3> Window: TimeWindow{start=142015680, end=142024320}
3> Request{ts=2015-01-02, 18:58:41:000}
2> Request{ts=2015-01-02, 23:36:51:000}
3> Window: TimeWindow{start=142041600, end=142050240}
2> Request{ts=2015-01-03, 17:38:47:000}
4> Window: TimeWindow{start=142024320, end=142032960}
1> Request{ts=2015-01-03, 17:56:42:000}
1> Request{ts=2015-01-05, 17:13:45:000}
4> Request{ts=2015-01-05, 01:25:55:000}
2> Request{ts=2015-01-05, 14:27:45:000}
...

On Wed, Jul 27, 2016 at 1:41 PM, Kostas Kloudas  wrote:

> Hi Yassine,
>
> Could you just remove the window and the apply, and  just put a print()
> after the:
>
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {
> @Override
> public long extractAscendingTimestamp(Request req) {
> return req.ts;
> }
> })
>
>
> This at least will tell us if reading from Kafka works as expected.
>
> Kostas
>
> On Jul 25, 2016, at 3:39 PM, Yassin Marzouki  wrote:
>
> Hi everyone,
>
> I am reading messages from a Kafka topic with 2 partitions and using event
> time. This is my code:
>
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {
> @Override
> public long extractAscendingTimestamp(Request req) {
> return req.ts;
> }
> })
> .windowAll(TumblingEventTimeWindows.of(Time.days(1)))
> .apply((TimeWindow window, Iterable iterable, Collector
> collector) -> {
> collector.collect("Window: " + window.toString());
> for (Request req : iterable) {
> collector.collect(req.toString());
> }
> })
> .print()
>
> I could get an output only when setting the kafka source parallelism to 1. I
> guess that is because messages from multiple partitions arrive out-of-order
> to the timestamp extractor according to this thread
> ,
> correct?
> So I replaced the AscendingTimestampExtractor with a
> BoundedOutOfOrdernessGenerator as in the documentation example
> 
>  (with
> a higher delay) in order to handle out-of-order events, but I still can't
> get any output. Why is that?
>
> Best,
> Yassine
>
>
>


Re: JobManager trying to re-submit jobs after failover

2016-07-27 Thread Ufuk Celebi
Which version of Flink are you running on? I think this might have
been fixed for the 1.1 release
(http://people.apache.org/~uce/flink-1.1.0-rc1/).

It looks like the ExecutionGraph is still trying to restart although
the JobManager is not the leader anymore. If you could provide the
complete logs of both JobManagers, that would be helpful to be sure
what is happening.

You can recover from this by restarting the respective JobManager
process (by running the "jobmanager.sh stop" script on that machine and
starting it again via "jobmanager.sh start cluster").

– Ufuk

On Wed, Jul 27, 2016 at 2:00 PM, Hironori Ogibayashi
 wrote:
> Hello,
>
> I have a standalone Flink cluster with JobManager HA.
> Last night, the JobManager failed over because of a connection timeout to
> ZooKeeper.
> The job is running successfully under the new leader JobManager, but
> looking at the old leader JobManager's log, it has been trying to re-submit the
> job and getting errors (for almost 24 hours now).
>
> Here is the log.
>
> -
> 2016-07-27 20:56:09,218 WARN
> org.apache.flink.runtime.jobmanager.JobManager-
> Discard message
> LeaderSessionMessage(54757d58-64d0-4118-a4d3-5f089287f1e4,07/27/2016
> 20:56:09 Job execution switched to status RESTARTING.) because the
> expected leader session ID None did not equal the received leader
> session ID Some(54757d58-64d0-4118-a4d3-5f089287f1e4).
> 2016-07-27 20:56:19,218 INFO
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Recovering checkpoints from ZooKeeper.
> 2016-07-27 20:56:19,218 WARN
> org.apache.flink.runtime.jobmanager.JobManager-
> Discard message
> LeaderSessionMessage(54757d58-64d0-4118-a4d3-5f089287f1e4,07/27/2016
> 20:56:19 Job execution switched to status CREATED.) because the
> expected leader session ID None did not equal the received leader
> session ID Some(54757d58-64d0-4118-a4d3-5f089287f1e4).
> 2016-07-27 20:56:19,219 INFO
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Found 1 checkpoints in ZooKeeper.
> 2016-07-27 20:56:19,221 INFO
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Initialized with Checkpoint 40229 @ 1469620528216 for
> 978ef000cca5a3aa6f3461274102f82c. Removing all older checkpoints.
> 2016-07-27 20:56:19,222 WARN
> org.apache.flink.runtime.jobmanager.JobManager-
> Discard message
> LeaderSessionMessage(54757d58-64d0-4118-a4d3-5f089287f1e4,07/27/2016
> 20:56:19 Job execution switched to status RUNNING.) because the
> expected leader session ID None did not equal the received leader
> session ID Some(54757d58-64d0-4118-a4d3-5f089287f1e4).
> 2016-07-27 20:56:19,222 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> Source: Custom Source (1/3) (bbdf55db0c19cc881c188bc6925929d0)
> switched from CREATED to SCHEDULED
> 2016-07-27 20:56:19,223 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> Source: Custom Source (1/3) (bbdf55db0c19cc881c188bc6925929d0)
> switched from SCHEDULED to CANCELED
> 2016-07-27 20:56:19,223 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> Source: Custom Source (2/3) (4c795c671ec7b548b5faac5b141c331c)
> switched from CREATED to CANCELED
> 2016-07-27 20:56:19,223 WARN
> org.apache.flink.runtime.jobmanager.JobManager-
> Discard message
> LeaderSessionMessage(54757d58-64d0-4118-a4d3-5f089287f1e4,07/27/2016
> 20:56:19 Job execution switched to status FAILING.) because the
> expected leader session ID None did not equal the received leader
> session ID Some(54757d58-64d0-4118-a4d3-5f089287f1e4).
> 2016-07-27 20:56:19,223 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> Source: Custom Source (3/3) (fce3b243e5b25041aafabbd93a266dbc)
> switched from CREATED to CANCELED
> 2016-07-27 20:56:19,223 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> Source: Custom Source (1/3) (e1e5154f506901539e12b0fe8c140503)
> switched from CREATED to CANCELED
> 2016-07-27 20:56:19,223 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> Source: Custom Source (2/3) (f95eb0ad8fcc50e6bb9046e8700e8778)
> switched from CREATED to CANCELED
> 2016-07-27 20:56:19,223 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> Source: Custom Source (3/3) (0e30de47933282533cf6dda3a22e7ddc)
> switched from CREATED to CANCELED
> 2016-07-27 20:56:19,223 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Flat
> Map (1/3) (ea260b7740d4ac8262c6500429b0ee6b) switched from CREATED to
> CANCELED
> 2016-07-27 20:56:19,223 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Flat
> Map (2/3) (cc5ab4fc296238d32078d2b4a8eb5062) switched from CREATED to
> CANCELED
> 2016-07-27 20:56:19,223 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Flat
> Map (3/3) (9694ae32fc12ec416197308f6a8cb3c1) switched from CREATED to
> 

Re: Sporadic exceptions when checkpointing to S3

2016-07-27 Thread Ufuk Celebi
Hey Gary,

your configuration looks good to me. I think that it could be an issue
with S3, as you suggest. It might help to increase the checkpointing
interval (if your use case requirements allow for this) in order to
have less interaction with S3. In general, your program should still
continue as expected even when checkpoints fail intermittently.
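
As a minimal sketch of that suggestion (the path and interval are placeholders, not values from this thread):

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointIntervalExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Filesystem state backend writing to S3 (assumes the S3 filesystem is configured).
            env.setStateBackend(new FsStateBackend("s3://my-bucket/flink/checkpoints"));

            // Checkpoint every 5 minutes instead of every minute -> fewer S3 requests.
            env.enableCheckpointing(5 * 60 * 1000L);

            // ... build and execute the job as usual ...
        }
    }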

Did you ever try DEBUG logging for
org.apache.hadoop.fs.s3a.S3AFileSystem? Maybe those logs give some hints
about what's happening there. If you have time to provide those logs,
I would like to take a look at them.

– Ufuk


On Wed, Jul 27, 2016 at 10:56 AM, Gary Yao  wrote:
> Hi all,
>
> I am using the filesystem state backend with checkpointing to S3.
> From the JobManager logs, I can see that it works most of the time, e.g.,
>
> 2016-07-26 17:49:07,311 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 3 @ 1469555347310
> 2016-07-26 17:49:11,128 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 3 (in 3335 ms)
>
> However, taking the checkpoint fails with the
> following exception from time to time:
>
> 2016-07-26 17:50:07,310 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 4 @ 1469555407310
> 2016-07-26 17:50:12,225 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- 
> TriggerWindow(SlidingEventTimeWindows(360, 1000), 
> ListStateDescriptor{name=window-contents, defaultValue=null, 
> serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@103b8046},
>  EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:226)) -> Sink: 
> Unnamed (1/1) (0ec242b46c49039f673dc902fd983f49) switched from RUNNING to 
> FAILED
> 2016-07-26 17:50:12,227 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Status of job bd2930a4d6e7cf8d04d3bbafe22e386b ([...]) 
> changed to FAILING.
> java.lang.RuntimeException: Error triggering a checkpoint as the result of 
> receiving checkpoint barrier
> #011at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:701)
> #011at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:691)
> #011at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
> #011at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
> #011at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
> #011at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
> #011at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> #011at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> #011at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Failed to fetch state handle size
> #011at 
> org.apache.flink.runtime.taskmanager.RuntimeEnvironment.acknowledgeCheckpoint(RuntimeEnvironment.java:234)
> #011at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:528)
> #011at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:695)
> #011... 8 more
> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 
> 403, AWS Service: Amazon S3, AWS Request ID: [...], AWS Error Code: null, AWS 
> Error Message: Forbidden, S3 Extended Request ID: [...]
> #011at 
> com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
> #011at 
> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
> #011at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
> #011at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
> #011at 
> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
> #011at 
> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
> #011at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
> #011at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
> #011at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351)
> #011at 
> org.apache.flink.runtime.state.filesystem.AbstractFileStateHandle.getFileSize(AbstractFileStateHandle.java:93)
> #011at 
> org.apache.flink.runtime.state.filesystem.FileStreamStateHandle.getStateSize(FileStreamStateHandle.java:58)
> #011at 
> org.apache.flink.runtime.state.AbstractStateBackend$DataInputViewHandle.getStateSize(AbstractStateBackend.java:428)
> #011at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskStateList.getStateSize(StreamTaskStateList.java:77)
> #011at 
> org.apache.flink.runtime.taskmanager.RuntimeEnvironment.acknowledgeCheckpoint(RuntimeEnvironment.java:231)
> #011... 10 more
>
> All logs are from the 

Re: If I chain two windows, what event-time would the second window have?

2016-07-27 Thread Yassin Marzouki
Hi Kostas,

Thank you very much for the explanation.

Best,
Yassine

On Wed, Jul 27, 2016 at 1:09 PM, Kostas Kloudas  wrote:

> Hi Yassine,
>
> When the WindowFunction is applied to the content of a window, the
> timestamp of the resulting record
> is the window.maxTimestamp, which is the endOfWindow-1.
>
> You can imagine if you have a Tumbling window from 0 to 2000, the result
> will have a timestamp of 1999.
> Window boundaries are closed in the start and open at the end timestamp,
> or [start, end).
>
> If you want to play around, I would suggest checking out the tests in the
> WindowOperatorTest class.
>
> There you can do experiments and figure out how Flink’s windowOperator
> works internally and what is the
> interplay between windowAssigners, triggers, and the windowOperator.
>
> Hope this helps,
> Kostas
>
> On Jul 27, 2016, at 8:41 AM, Yassin Marzouki  wrote:
>
> Hi all,
>
> Say I assign timestamps to a stream and then apply a transformation like
> this:
>
>
> stream.keyBy(0).timeWindow(Time.hours(5)).reduce(count).timeWindowAll(Time.days(1)).apply(transformation)
>
> Now, when the first window is applied, events are aggregated based on
> their timestamps, but I don't understand what timestamp will be assigned to
> the aggregated result of the reduce operation for the second window to
> process it. Could you please explain it? Thank you.
>
> Best,
> Yassine
>
>
>


JobManager trying to re-submit jobs after failover

2016-07-27 Thread Hironori Ogibayashi
Hello,

I have a standalone Flink cluster with JobManager HA.
Last night, the JobManager failed over because of a connection timeout to
ZooKeeper.
The job is running successfully under the new leader JobManager, but
looking at the old leader JobManager's log, it has been trying to re-submit the
job and getting errors (for almost 24 hours now).

Here is the log.

-
2016-07-27 20:56:09,218 WARN
org.apache.flink.runtime.jobmanager.JobManager-
Discard message
LeaderSessionMessage(54757d58-64d0-4118-a4d3-5f089287f1e4,07/27/2016
20:56:09 Job execution switched to status RESTARTING.) because the
expected leader session ID None did not equal the received leader
session ID Some(54757d58-64d0-4118-a4d3-5f089287f1e4).
2016-07-27 20:56:19,218 INFO
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
- Recovering checkpoints from ZooKeeper.
2016-07-27 20:56:19,218 WARN
org.apache.flink.runtime.jobmanager.JobManager-
Discard message
LeaderSessionMessage(54757d58-64d0-4118-a4d3-5f089287f1e4,07/27/2016
20:56:19 Job execution switched to status CREATED.) because the
expected leader session ID None did not equal the received leader
session ID Some(54757d58-64d0-4118-a4d3-5f089287f1e4).
2016-07-27 20:56:19,219 INFO
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
- Found 1 checkpoints in ZooKeeper.
2016-07-27 20:56:19,221 INFO
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
- Initialized with Checkpoint 40229 @ 1469620528216 for
978ef000cca5a3aa6f3461274102f82c. Removing all older checkpoints.
2016-07-27 20:56:19,222 WARN
org.apache.flink.runtime.jobmanager.JobManager-
Discard message
LeaderSessionMessage(54757d58-64d0-4118-a4d3-5f089287f1e4,07/27/2016
20:56:19 Job execution switched to status RUNNING.) because the
expected leader session ID None did not equal the received leader
session ID Some(54757d58-64d0-4118-a4d3-5f089287f1e4).
2016-07-27 20:56:19,222 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph-
Source: Custom Source (1/3) (bbdf55db0c19cc881c188bc6925929d0)
switched from CREATED to SCHEDULED
2016-07-27 20:56:19,223 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph-
Source: Custom Source (1/3) (bbdf55db0c19cc881c188bc6925929d0)
switched from SCHEDULED to CANCELED
2016-07-27 20:56:19,223 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph-
Source: Custom Source (2/3) (4c795c671ec7b548b5faac5b141c331c)
switched from CREATED to CANCELED
2016-07-27 20:56:19,223 WARN
org.apache.flink.runtime.jobmanager.JobManager-
Discard message
LeaderSessionMessage(54757d58-64d0-4118-a4d3-5f089287f1e4,07/27/2016
20:56:19 Job execution switched to status FAILING.) because the
expected leader session ID None did not equal the received leader
session ID Some(54757d58-64d0-4118-a4d3-5f089287f1e4).
2016-07-27 20:56:19,223 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph-
Source: Custom Source (3/3) (fce3b243e5b25041aafabbd93a266dbc)
switched from CREATED to CANCELED
2016-07-27 20:56:19,223 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph-
Source: Custom Source (1/3) (e1e5154f506901539e12b0fe8c140503)
switched from CREATED to CANCELED
2016-07-27 20:56:19,223 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph-
Source: Custom Source (2/3) (f95eb0ad8fcc50e6bb9046e8700e8778)
switched from CREATED to CANCELED
2016-07-27 20:56:19,223 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph-
Source: Custom Source (3/3) (0e30de47933282533cf6dda3a22e7ddc)
switched from CREATED to CANCELED
2016-07-27 20:56:19,223 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Flat
Map (1/3) (ea260b7740d4ac8262c6500429b0ee6b) switched from CREATED to
CANCELED
2016-07-27 20:56:19,223 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Flat
Map (2/3) (cc5ab4fc296238d32078d2b4a8eb5062) switched from CREATED to
CANCELED
2016-07-27 20:56:19,223 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Flat
Map (3/3) (9694ae32fc12ec416197308f6a8cb3c1) switched from CREATED to
CANCELED
2016-07-27 20:56:19,223 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph-
TriggerWindow(GlobalWindows(),
FoldingStateDescriptor{name=window-contents,
defaultValue=ViewerCountHll(0,0,,com.clearspring.analytics.stream.cardinality.HyperLogLogPlus@1),
serializer=null}, LiveContinuousProcessingTimeTriggerGlobal(1),
WindowedStream.fold(WindowedStream.java:207)) -> Filter -> Map ->
Filter -> Sink: Unnamed (1/3) (9c6b27873b6ddec58ce3f82f62632152)
switched from CREATED to CANCELED
2016-07-27 20:56:19,223 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph-
TriggerWindow(GlobalWindows(),
FoldingStateDescriptor{name=window-contents,
defaultValue=ViewerCountHll(0,0,,com.clearspring.analytics.stream.cardinality.HyperLogLogPlus@1),
serializer=null}, 

Re: fault tolerance: suspend and resume?

2016-07-27 Thread Ufuk Celebi
Yes, the back-pressure behaviour you describe is correct. With
checkpointing enabled, the job should resume as soon as the sink can
contact the backend service again (you would see the job fail
many times until the service is live again, but in the end it should
work). You can control the restart behaviour via the restart
strategies 
(https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/fault_tolerance.html).

For jobs that are in state RUNNING you can trigger savepoints
(https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html).
This does not work for jobs in any other state (e.g.
finished/failed/cancelled etc.) yet, so it wouldn't work after your
job has thrown the Exception. Savepoints are triggered from the
outside of your application via the CLI. You might want to write a
script that triggers savepoints periodically and which monitors your
Flink job and backend service. If the Flink job fails, the script
would wait for the backend service to be live again and then re-submit
the job from the most recent savepoint. Maybe this helps as a work
around.

– Ufuk

On Wed, Jul 27, 2016 at 10:10 AM, Istvan Soos  wrote:
> Hi,
>
> I was wondering how Flink's fault tolerance works, because this page
> is short on the details:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/fault_tolerance.html
>
> My environment has a backend service that may be out for a couple of
> hours (sad, but working on fixing that). I have a sink that would like
> to write to that service, and in such cases it throws an exception.
> This brings the process down and I need to manually intervene to get
> it up and running again.
>
> I was thinking of rewriting the sink to loop until it is able to write
> the data (with a multi-hour tolerance before it throws an
> exception). I hope that this will create backpressure on the process,
> "suspend" the processing, and "resume" it when the backend service comes
> back up again.
>
> Am I right with that assumption? Is there a better way to make
> suspending and resuming automatic?
>
> Thanks,
>   Istvan


Re: No output when using event time with multiple Kafka partitions

2016-07-27 Thread Kostas Kloudas
Hi Yassine,

Could you just remove the window and the apply(), and just put a print() after
the:
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
> @Override
> public long extractAscendingTimestamp(Request req) {
> return req.ts;
> }
> })

This at least will tell us if reading from Kafka works as expected.

Kostas

> On Jul 25, 2016, at 3:39 PM, Yassin Marzouki  wrote:
> 
> Hi everyone,
> 
> I am reading messages from a Kafka topic with 2 partitions and using event 
> time. This is my code:
> 
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
> @Override
> public long extractAscendingTimestamp(Request req) {
> return req.ts;
> }
> })
> .windowAll(TumblingEventTimeWindows.of(Time.days(1)))
> .apply((TimeWindow window, Iterable<Request> iterable, Collector<String>
> collector) -> {
> collector.collect("Window: " + window.toString());
> for (Request req : iterable) {
> collector.collect(req.toString());
> }
> })
> .print()
> 
> I could get an output only when setting the kafka source parallelism to 1. I 
> guess that is because messages from multiple partitions arrive out-of-order 
> to the timestamp extractor according to this thread
> ,
>  correct?
> So I replaced the AscendingTimestampExtractor with a 
> BoundedOutOfOrdernessGenerator as in the documentation example 
> 
>  (with a higher delay) in order to handle out-of-order events, but I still 
> can't get any output. Why is that?
> 
> Best,
> Yassine
> 



Re: If I chain two windows, what event-time would the second window have?

2016-07-27 Thread Kostas Kloudas
Hi Yassine,

When the WindowFunction is applied to the content of a window, the timestamp of 
the resulting record
is the window.maxTimestamp, which is the endOfWindow-1.

You can imagine if you have a Tumbling window from 0 to 2000, the result will
have a timestamp of 1999.
Window boundaries are closed in the start and open at the end timestamp, or 
[start, end).
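
A small worked example with the windows from the original question (placeholder start time 0):

    long fiveHours    = 5 * 60 * 60 * 1000L;          // Time.hours(5) in milliseconds
    long windowStart  = 0L;                            // first 5-hour window covers [0, 18_000_000)
    long maxTimestamp = windowStart + fiveHours - 1;   // 17_999_999 = end-of-window minus 1
    // The reduced record is emitted with timestamp 17_999_999, so the downstream
    // timeWindowAll(Time.days(1)) assigns it to the day window [0, 86_400_000).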

If you want to play around, I would suggest checking out the tests in the 
WindowOperatorTest class.

There you can do experiments and figure out how Flink’s windowOperator works 
internally and what is the 
interplay between windowAssigners, triggers, and the windowOperator.

Hope this helps,
Kostas

> On Jul 27, 2016, at 8:41 AM, Yassin Marzouki  wrote:
> 
> Hi all,
> 
> Say I assign timestamps to a stream and then apply a transformation like this:
> 
> stream.keyBy(0).timeWindow(Time.hours(5)).reduce(count).timeWindowAll(Time.days(1)).apply(transformation)
> 
> Now, when the first window is applied, events are aggregated based on their 
> timestamps, but I don't understand what timestamp will be assigned to the 
> aggregated result of the reduce operation for the second window to process 
> it. Could you please explain it? Thank you.
> 
> Best,
> Yassine



Re: Performance issues with GroupBy?

2016-07-27 Thread Paschek, Robert
Hi Gábor, hi Ufuk, hi Greg,

thank you for your very helpful responses!


> You can try to make your `RichGroupReduceFunction` implement the

> `GroupCombineFunction` interface, so that Flink can do combining

> before the shuffle, which might significantly reduce the network load.

> (How much the combiner helps the performance can greatly depend on how

> large are your groups on average.)

While implementing my reducers I didn't think that combining was applicable,
because each mapper will produce each key only one time. I didn't consider that
some mappers run on the same machine and will therefore benefit from
pre-combining before shuffling.
After I implemented the combiner mentioned here
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/dataset_transformations.html#combinable-groupreducefunctions
the difference in runtime between these algorithms with and without a
combiner decreased. Thank you for the hint!
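
For reference, a minimal sketch of such a combinable group reduce (a generic word-count shape with Tuple2<String, Integer>, not the actual reducer from this thread):

    import org.apache.flink.api.common.functions.GroupCombineFunction;
    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class CombinableCount
            implements GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>,
                       GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

        @Override
        public void reduce(Iterable<Tuple2<String, Integer>> values,
                           Collector<Tuple2<String, Integer>> out) {
            String key = null;
            int sum = 0;
            for (Tuple2<String, Integer> value : values) {
                key = value.f0;
                sum += value.f1;
            }
            out.collect(new Tuple2<String, Integer>(key, sum));
        }

        @Override
        public void combine(Iterable<Tuple2<String, Integer>> values,
                            Collector<Tuple2<String, Integer>> out) {
            // Pre-aggregation has the same shape as the final reduce here, so it delegates to it.
            reduce(values, out);
        }
    }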

One curious thing: my reducer receives broadcast variables and accesses them in the
open() method.
When the combine() method is called, the broadcast variable does not seem to be set yet:
I get an explicit error message within the open() method. Is this a potential
bug? I am using Apache Flink 1.0.3.

To avoid changing my reducers, I'm wondering if I should instead implement a
GroupCombineFunction independent of the reducer:
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/dataset_transformations.html#groupcombine-on-a-grouped-dataset

As far as I understand, this will work similarly?

-  The following transformation - especially the groupBy() - will run
first on each local machine:

     combinedWords = input
         .groupBy(0)
         .combineGroup(new GroupCombineFunction());

-  And then, the following transformation will shuffle the data within
the 2nd groupBy() over the network:

     output = combinedWords
         .groupBy(0)
         .reduceGroup(new GroupReduceFunction());



> Alternatively, if you can reformulate your algorithm to use a `reduce`

> instead of a `reduceGroup` that might also improve the performance.

> Also, if you are using a `reduce`, then you can try calling

> `.setCombineHint(CombineHint.HASH)` after the reduce. (The combine

> hint is a relatively new feature, so you need the current master for

> this.)
I have to iterate through each tuple multiple times, and the final result can
only be emitted after the last tuple is processed, so I think I can't use a
reduce.


> This could be further amplified by the blocking intermediate results, which 
> have a very simplistic implementation writing out many different files, which 
> can lead to a lot of random I/O.

Thank you for these technical explanations. I will mention them in my
evaluation!

> Are you able to simplify your function input/output types? Flink
> aggressively serializes the data stream and complex types such as ArrayList 
> and BitSet will be much slower to process. Are you able to reconstruct the 
> lists to be groupings on elements?
For my intention, to simulate an Apache Hadoop MapReduce like behaviour, I 
would say that my current implementation fits.
I will think about rewriting the code after the first Benchmarks to potential 
reveal advantages of Apache Flink in comparison for Hadoop MapReduce for the 
algorithms I implemented.

Thanks again!
Robert


From: Greg Hogan [mailto:c...@greghogan.com]
Sent: Tuesday, 26 July 2016 18:57
To: user@flink.apache.org
Subject: Re: Performance issues with GroupBy?

Hi Robert,
Are you able to simplify your function input/output types? Flink
aggressively serializes the data stream and complex types such as ArrayList and 
BitSet will be much slower to process. Are you able to reconstruct the lists to 
be groupings on elements?
Greg


-----Original Message-----
From: Ufuk Celebi [mailto:u...@apache.org]
Sent: Tuesday, 26 July 2016 11:53
To: user@flink.apache.org
Subject: Re: Performance issues with GroupBy?



+1 to what Gábor said. The hash combine will be part of the upcoming

1.1. release, too.



This could be further amplified by the blocking intermediate results, which 
have a very simplistic implementation writing out many different files, which 
can lead to a lot of random I/O.



– Ufuk



On Tue, Jul 26, 2016 at 11:41 AM, Gábor Gévay 
> wrote:

> Hello Robert,

>

>> Is there something I might could do to optimize the grouping?

>

> You can try to make your `RichGroupReduceFunction` implement the

> `GroupCombineFunction` interface, so that Flink can do combining

> before the shuffle, which might significantly reduce the network load.

> (How much the combiner helps the performance can greatly depend on how

> large are your groups on average.)

>

> Alternatively, if you can reformulate your algorithm to 

fault tolerance: suspend and resume?

2016-07-27 Thread Istvan Soos
Hi,

I was wondering how Flink's fault tolerance works, because this page
is short on the details:
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/fault_tolerance.html

My environment has a backend service that may be out for a couple of
hours (sad, but working on fixing that). I have a sink that would like
to write to that service, and in such cases it throws an exception.
This brings the process down and I need to manually intervene to get
it up and running again.

I was thinking of rewriting the sink to loop until it is able to write
the data (with a multi-hour tolerance before it throws an
exception). I hope that this will create backpressure on the process,
"suspend" the processing, and "resume" it when the backend service comes
back up again.

Am I right with that assumption? Is there a better way to make
suspending and resuming automatic?

Thanks,
  Istvan
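
For reference, a rough sketch of the "loop until the write succeeds" sink described above (the String element type, the backend call, and all timing constants are placeholder assumptions):

    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class RetryingSink extends RichSinkFunction<String> {

        private static final long RETRY_DELAY_MS = 30_000L;           // 30 s between attempts
        private static final long MAX_WAIT_MS = 4 * 60 * 60 * 1000L;  // tolerate outages up to 4 h

        @Override
        public void invoke(String value) throws Exception {
            long deadline = System.currentTimeMillis() + MAX_WAIT_MS;
            while (true) {
                try {
                    writeToBackend(value);
                    return;
                } catch (Exception e) {
                    if (System.currentTimeMillis() > deadline) {
                        // Give up eventually so the job fails visibly instead of hanging forever.
                        throw e;
                    }
                    Thread.sleep(RETRY_DELAY_MS); // back pressure builds up upstream while we wait
                }
            }
        }

        private void writeToBackend(String value) throws Exception {
            // Placeholder: call the external backend service here.
        }
    }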


If I chain two windows, what event-time would the second window have?

2016-07-27 Thread Yassin Marzouki
Hi all,

Say I assign timestamps to a stream and then apply a transformation like
this:

stream.keyBy(0).timeWindow(Time.hours(5)).reduce(count).timeWindowAll(Time.days(1)).apply(transformation)

Now, when the first window is applied, events are aggregated based on their
timestamps, but I don't understand what timestamp will be assigned to the
aggregated result of the reduce operation for the second window to process
it. Could you please explain it? Thank you.

Best,
Yassine