Re: [DISCUSS] Flink 1.6 features

2018-06-05 Thread Rico Bergmann
+1 on K8s integration 



> Am 06.06.2018 um 00:01 schrieb Hao Sun :
> 
> adding my vote to K8S Job mode, maybe it is this?
> > Smoothen the integration in Container environment, like "Flink as a 
> > Library", and easier integration with Kubernetes services and other proxies.
> 
> 
> 
>> On Mon, Jun 4, 2018 at 11:01 PM Ben Yan  wrote:
>> Hi Stephan,
>> 
>> Will  [ https://issues.apache.org/jira/browse/FLINK-5479 ]  (Per-partition 
>> watermarks in FlinkKafkaConsumer should consider idle partitions) be 
>> included in 1.6? We are seeing more users with this issue on the mailing 
>> lists.
>> 
>> Thanks.
>> Ben
>> 
>> 2018-06-05 5:29 GMT+08:00 Che Lui Shum :
>>> Hi Stephan,
>>> 
>>> Will FLINK-7129 (Support dynamically changing CEP patterns) be included in 
>>> 1.6? There were discussions about possibly including it in 1.6: 
>>> http://mail-archives.apache.org/mod_mbox/flink-user/201803.mbox/%3cCAMq=ou7gru2o9jtowxn1lc1f7nkcxayn6a3e58kxctb4b50...@mail.gmail.com%3e
>>> 
>>> Thanks,
>>> Shirley Shum
>>> 
>>> Stephan Ewen ---06/04/2018 02:21:47 AM---Hi Flink Community! The release of 
>>> Apache Flink 1.5 has happened (yay!) - so it is a good time
>>> 
>>> From: Stephan Ewen 
>>> To: d...@flink.apache.org, user 
>>> Date: 06/04/2018 02:21 AM
>>> Subject: [DISCUSS] Flink 1.6 features
>>> 
>>> 
>>> 
>>> 
>>> Hi Flink Community!
>>> 
>>> The release of Apache Flink 1.5 has happened (yay!) - so it is a good time 
>>> to start talking about what to do for release 1.6.
>>> 
>>> == Suggested release timeline ==
>>> 
>>> I would propose to release around end of July (that is 8-9 weeks from now).
>>> 
>>> The rationale behind that: There was a lot of effort in release testing 
>>> automation (end-to-end tests, scripted stress tests) as part of release 
>>> 1.5. You may have noticed the big set of new modules under 
>>> "flink-end-to-end-tests" in the Flink repository. It delayed the 1.5 
>>> release a bit, and needs to continue as part of the coming release cycle, 
>>> but should help make releasing more lightweight from now on.
>>> 
>>> (Side note: There are also some nightly stress tests that we created and 
>>> run at data Artisans, and where we are looking whether and in which way it 
>>> would make sense to contribute them to Flink.)
>>> 
>>> == Features and focus areas ==
>>> 
>>> We had a lot of big and heavy features in Flink 1.5, with FLIP-6, the new 
>>> network stack, recovery, SQL joins and client, ... Following something like 
>>> a "tick-tock-model", I would suggest to focus the next release more on 
>>> integrations, tooling, and reducing user friction. 
>>> 
>>> Of course, this does not mean that no other pull request gets reviewed, and 
>>> no other topic will be examined - it is simply meant as a help to 
>>> understand where to expect more activity during the next release cycle. 
>>> Note that these are really the coarse focus areas - don't read this as a 
>>> comprehensive list.
>>> 
>>> This list is my first suggestion, based on discussions with committers, 
>>> users, and mailing list questions.
>>> 
>>>   - Support Java 9 and Scala 2.12
>>>   
>>>   - Smoothen the integration in Container environment, like "Flink as a 
>>> Library", and easier integration with Kubernetes services and other proxies.
>>>   
>>>   - Polish the remaining parts of the FLIP-6 rewrite
>>> 
>>>   - Improve state backends with asynchronous timer snapshots, efficient 
>>> timer deletes, state TTL, and broadcast state support in RocksDB.
>>> 
>>>   - Extend Streaming Sinks:
>>>  - Bucketing Sink should support S3 properly (compensate for eventual 
>>> consistency), work with Flink's shaded S3 file systems, and efficiently 
>>> support formats that compress/index across individual rows (Parquet, ORC, 
>>> ...)
>>>  - Support ElasticSearch's new REST API
>>> 
>>>   - Smoothen State Evolution to support type conversion on snapshot restore
>>>   
>>>   - Enhance Stream SQL and CEP
>>>  - Add support for "update by key" Table Sources
>>>  - Add more table sources and sinks (Kafka, Kinesis, Files, K/V stores)
>>>  - Expand SQL client
>>>  - Integrate CEP and SQL, through MATCH_RECOGNIZE clause
>>>  - Improve CEP Performance of SharedBuffer on RocksDB
>>> 
>>> 
>>> 
>> 


Re: Flink 1.4.2 in Zeppelin Notebook

2018-04-10 Thread Rico Bergmann
FYI: I finally managed to get the new Flink version running in Zeppelin.
Besides adding the parameters mentioned below, you have to build Zeppelin
with the scala-2.11 profile and against the new Flink version 1.4.2.


Best,

Rico.
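
For reference, the corresponding build invocation looks roughly like this (a
sketch: the scala-2.11 profile is the one mentioned above, while overriding the
Flink dependency via -Dflink.version is an assumption and may differ between
Zeppelin versions):

    mvn clean package -DskipTests -Pscala-2.11 -Dflink.version=1.4.2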


Am 09.04.2018 um 14:43 schrieb Rico Bergmann:
>
> The error message is:
>
> org.apache.flink.client.program.ProgramInvocationException: The
> program execution failed: Communication with JobManager failed: Lost
> connection to the JobManager.
>
>
>
> Am 09.04.2018 um 14:12 schrieb kedar mhaswade:
>> Hmm. What error do you see on the Zeppelin console when you click the
>> run (flink code) button after making these changes for flink
>> interpreter config (I assume you restart the interpreter)?
>>
>> Regards,
>> Kedar
>>
>> On Mon, Apr 9, 2018 at 12:50 AM, Rico Bergmann <i...@ricobergmann.de
>> <mailto:i...@ricobergmann.de>> wrote:
>>
>> Hi. 
>>
>> Thanks for your reply. But this also didn’t work for me. 
>>
>> In the JM log I get an akka Error („dropping message for
>> non-local recipient“). 
>>
>> My setup: I have Flink running on Kubernetes cluster, version
>> 1.4.2. zeppelin is version 0.8 using the flink interpreter
>> compiled against flink 1.1.3. 
>> When submitting a job with the CLI tool everything is working
>> fine. The CLI tool is version 1.4.2 ...
>>
>> Any other suggestions?
>>
>> Thanks a lot. 
>> Best,
>> Rico. 
>>
>>
>> Am 06.04.2018 um 18:44 schrieb kedar mhaswade
>> <kedar.mhasw...@gmail.com <mailto:kedar.mhasw...@gmail.com>>:
>>
>>> Yes. You need to add the two properties for the job manager (I
>>> agree, it is confusing because the properties named "host" and
>>> "port" already available, but the names of the useful properties
>>> are different):
>>>
>>> Could you please try this and let us know if it works for you?
>>>
>>> Regards,
>>> Kedar
>>>
>>>
>>> On Fri, Apr 6, 2018 at 5:51 AM, Dipl.-Inf. Rico Bergmann
>>> <i...@ricobergmann.de <mailto:i...@ricobergmann.de>> wrote:
>>>
>>> Hi!
>>>
>>> Has someone successfully integrated Flink 1.4.2 into
>>> Zeppelin notebook (using Flink in cluster mode, not local mode)?
>>>
>>> Best,
>>>
>>> Rico.
>>>
>>>
>>
>



Re: Flink 1.4.2 in Zeppelin Notebook

2018-04-09 Thread Rico Bergmann
The error message is:

org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Communication with JobManager failed: Lost connection
to the JobManager.



Am 09.04.2018 um 14:12 schrieb kedar mhaswade:
> Hmm. What error do you see on the Zeppelin console when you click the
> run (flink code) button after making these changes for flink
> interpreter config (I assume you restart the interpreter)?
>
> Regards,
> Kedar
>
> On Mon, Apr 9, 2018 at 12:50 AM, Rico Bergmann <i...@ricobergmann.de
> <mailto:i...@ricobergmann.de>> wrote:
>
> Hi. 
>
> Thanks for your reply. But this also didn’t work for me. 
>
> In the JM log I get an akka Error („dropping message for non-local
> recipient“). 
>
> My setup: I have Flink running on Kubernetes cluster, version
> 1.4.2. zeppelin is version 0.8 using the flink interpreter
> compiled against flink 1.1.3. 
> When submitting a job with the CLI tool everything is working
> fine. The CLI tool is version 1.4.2 ...
>
> Any other suggestions?
>
> Thanks a lot. 
> Best,
> Rico. 
>
>
> Am 06.04.2018 um 18:44 schrieb kedar mhaswade
> <kedar.mhasw...@gmail.com <mailto:kedar.mhasw...@gmail.com>>:
>
>> Yes. You need to add the two properties for the job manager (I
>> agree, it is confusing because the properties named "host" and
>> "port" already available, but the names of the useful properties
>> are different):
>>
>> Could you please try this and let us know if it works for you?
>>
>> Regards,
>> Kedar
>>
>>
>> On Fri, Apr 6, 2018 at 5:51 AM, Dipl.-Inf. Rico Bergmann
>> <i...@ricobergmann.de <mailto:i...@ricobergmann.de>> wrote:
>>
>> Hi!
>>
>> Has someone successfully integrated Flink 1.4.2 into Zeppelin
>> notebook (using Flink in cluster mode, not local mode)?
>>
>> Best,
>>
>> Rico.
>>
>>
>



Re: Flink 1.4.2 in Zeppelin Notebook

2018-04-09 Thread Rico Bergmann
Hi. 

Thanks for your reply. But this also didn’t work for me. 

In the JM log I get an Akka error („dropping message for non-local recipient“). 

My setup: I have Flink running on a Kubernetes cluster, version 1.4.2. Zeppelin 
is version 0.8, using the Flink interpreter compiled against Flink 1.1.3. 
When submitting a job with the CLI tool everything works fine. The CLI 
tool is version 1.4.2 ...

Any other suggestions?

Thanks a lot. 
Best,
Rico. 


> Am 06.04.2018 um 18:44 schrieb kedar mhaswade <kedar.mhasw...@gmail.com>:
> 
> Yes. You need to add the two properties for the job manager (I agree, it is 
> confusing because the properties named "host" and "port" are already available, 
> but the names of the useful properties are different):
> 
> 
> Could you please try this and let us know if it works for you?
> 
> Regards,
> Kedar
> 
> 
>> On Fri, Apr 6, 2018 at 5:51 AM, Dipl.-Inf. Rico Bergmann 
>> <i...@ricobergmann.de> wrote:
>> Hi!
>> 
>> Has someone successfully integrated Flink 1.4.2 into Zeppelin notebook 
>> (using Flink in cluster mode, not local mode)?
>> 
>> Best,
>> 
>> Rico.
>> 
> 


Flink 1.4.2 in Zeppelin Notebook

2018-04-06 Thread Dipl.-Inf. Rico Bergmann

Hi!

Has someone successfully integrated Flink 1.4.2 into Zeppelin notebook 
(using Flink in cluster mode, not local mode)?


Best,

Rico.



Re: Performance Issue

2015-09-24 Thread Rico Bergmann
I took a first glance. 

I ran 2 test setups. One used a limited test data generator that outputs around 
200 events per second. In this setting the new implementation keeps up with the 
incoming message rate. 

The other setup generated data without limit (at the highest possible rate). There 
the same problem as before can be observed: after 2 minutes of runtime the output 
of my program is more than a minute behind ... and the lag keeps increasing over 
time. But I don't know whether this could be a setup problem. I noticed the OS 
load of my test system was around 90%, so it might be more of a setup problem ...

Thanks for your support so far. 

Cheers. Rico. 





> Am 24.09.2015 um 09:33 schrieb Aljoscha Krettek <aljos...@apache.org>:
> 
> Hi Rico,
> you should be able to get it with these steps:
> 
> git clone https://github.com/StephanEwen/incubator-flink.git flink
> cd flink
> git checkout -t origin/windows
> 
> This will get you on Stephan's windowing branch. Then you can do a
> 
> mvn clean install -DskipTests
> 
> to build it.
> 
> I will merge his stuff later today, then you should also be able to use it by 
> running the 0.10-SNAPSHOT version.
> 
> Cheers,
> Aljoscha
> 
> 
>> On Thu, 24 Sep 2015 at 09:11 Rico Bergmann <i...@ricobergmann.de> wrote:
>> Hi!
>> 
>> Sounds great. How can I get the source code before it's merged to the master 
>> branch? Unfortunately I only have 2 days left for trying this out ...
>> 
>> Greets. Rico. 
>> 
>> 
>> 
>>> Am 24.09.2015 um 00:57 schrieb Stephan Ewen <se...@apache.org>:
>>> 
>>> Hi Rico!
>>> 
>>> We have finished the first part of the Window API reworks. You can find the 
>>> code here: https://github.com/apache/flink/pull/1175
>>> 
>>> It should fix the issues and offer vastly improved performance (up to 50x 
>>> faster). For now, it supports time windows, but we will support the other 
>>> cases in the next days.
>>> 
>>> I'll ping you once it is merged, I'd be curious if it fixes your issue. 
>>> Sorry that you ran into this problem...
>>> 
>>> Greetings,
>>> Stephan
>>> 
>>> 
>>>> On Mon, Sep 7, 2015 at 12:00 PM, Rico Bergmann <i...@ricobergmann.de> 
>>>> wrote:
>>>> Hi!
>>>> 
>>>> While working with grouping and windowing I encountered a strange 
>>>> behavior. I'm doing:
>>>>> dataStream.groupBy(KeySelector).window(Time.of(x, 
>>>>> TimeUnit.SECONDS)).mapWindow(toString).flatten()
>>>> 
>>>> When I run the program containing this snippet it initially outputs data 
>>>> at a rate around 150 events per sec. (That is roughly the input rate for 
>>>> the program). After about 10-30 minutes the rate drops down below 5 events 
>>>> per sec. This leads to event delivery offsets getting bigger and bigger 
>>>> ... 
>>>> 
>>>> Any explanation for this? I know you are reworking the streaming API. But 
>>>> it would be useful to know, why this happens ...
>>>> 
>>>> Cheers. Rico. 


Re: Performance Issue

2015-09-24 Thread Rico Bergmann
Hi!

Sounds great. How can I get the source code before it's merged to the master 
branch? Unfortunately I only have 2 days left for trying this out ...

Greets. Rico. 



> Am 24.09.2015 um 00:57 schrieb Stephan Ewen <se...@apache.org>:
> 
> Hi Rico!
> 
> We have finished the first part of the Window API reworks. You can find the 
> code here: https://github.com/apache/flink/pull/1175
> 
> It should fix the issues and offer vastly improved performance (up to 50x 
> faster). For now, it supports time windows, but we will support the other 
> cases in the next days.
> 
> I'll ping you once it is merged, I'd be curious if it fixes your issue. Sorry 
> that you ran into this problem...
> 
> Greetings,
> Stephan
> 
> 
>> On Mon, Sep 7, 2015 at 12:00 PM, Rico Bergmann <i...@ricobergmann.de> wrote:
>> Hi!
>> 
>> While working with grouping and windowing I encountered a strange behavior. 
>> I'm doing:
>>> dataStream.groupBy(KeySelector).window(Time.of(x, 
>>> TimeUnit.SECONDS)).mapWindow(toString).flatten()
>> 
>> When I run the program containing this snippet it initially outputs data at 
>> a rate around 150 events per sec. (That is roughly the input rate for the 
>> program). After about 10-30 minutes the rate drops down below 5 events per 
>> sec. This leads to event delivery offsets getting bigger and bigger ... 
>> 
>> Any explanation for this? I know you are reworking the streaming API. But it 
>> would be useful to know, why this happens ...
>> 
>> Cheers. Rico. 
> 


Re: Performance Issue

2015-09-24 Thread Rico Bergmann
The test data is generated in a Flink program running in a separate JVM. The 
generated data is then written to a Kafka topic from which my program reads 
the data ...



> Am 24.09.2015 um 14:53 schrieb Aljoscha Krettek <aljos...@apache.org>:
> 
> Hi Rico,
> are you generating the data directly in your flink program or some external 
> queue, such as Kafka?
> 
> Cheers,
> Aljoscha
> 
>> On Thu, 24 Sep 2015 at 13:47 Rico Bergmann <i...@ricobergmann.de> wrote:
>> And as side note:
>> 
>> The problem with duplicates seems also to be solved!
>> 
>> Cheers Rico. 
>> 
>> 
>> 
>>> Am 24.09.2015 um 12:21 schrieb Rico Bergmann <i...@ricobergmann.de>:
>>> 
>>> I took a first glance. 
>>> 
>>> I ran 2 test setups. One with a limited test data generator, the outputs 
>>> around 200 events per second. In this setting the new implementation keeps 
>>> up with the incoming message rate. 
>>> 
>>> The other setup had an unlimited generation (at highest possible rate). 
>>> There the same problem as before can be observed. After 2 minutes runtime 
>>> the output of my program is more than a minute behind ... And increasing 
>>> over time. But I don't know whether this could be a setup problem. I 
>>> noticed the os load of my testsystem was around 90%. So it might be more a 
>>> setup problem ...
>>> 
>>> Thanks for your support so far. 
>>> 
>>> Cheers. Rico. 
>>> 
>>> 
>>> 
>>> 
>>> 
>>>> Am 24.09.2015 um 09:33 schrieb Aljoscha Krettek <aljos...@apache.org>:
>>>> 
>>>> Hi Rico,
>>>> you should be able to get it with these steps:
>>>> 
>>>> git clone https://github.com/StephanEwen/incubator-flink.git flink
>>>> cd flink
>>>> git checkout -t origin/windows
>>>> 
>>>> This will get you on Stephan's windowing branch. Then you can do a
>>>> 
>>>> mvn clean install -DskipTests
>>>> 
>>>> to build it.
>>>> 
>>>> I will merge his stuff later today, then you should also be able to use it 
>>>> by running the 0.10-SNAPSHOT version.
>>>> 
>>>> Cheers,
>>>> Aljoscha
>>>> 
>>>> 
>>>>> On Thu, 24 Sep 2015 at 09:11 Rico Bergmann <i...@ricobergmann.de> wrote:
>>>>> Hi!
>>>>> 
>>>>> Sounds great. How can I get the source code before it's merged to the 
>>>>> master branch? Unfortunately I only have 2 days left for trying this out 
>>>>> ...
>>>>> 
>>>>> Greets. Rico. 
>>>>> 
>>>>> 
>>>>> 
>>>>>> Am 24.09.2015 um 00:57 schrieb Stephan Ewen <se...@apache.org>:
>>>>>> 
>>>>>> Hi Rico!
>>>>>> 
>>>>>> We have finished the first part of the Window API reworks. You can find 
>>>>>> the code here: https://github.com/apache/flink/pull/1175
>>>>>> 
>>>>>> It should fix the issues and offer vastly improved performance (up to 
>>>>>> 50x faster). For now, it supports time windows, but we will support the 
>>>>>> other cases in the next days.
>>>>>> 
>>>>>> I'll ping you once it is merged, I'd be curious if it fixes your issue. 
>>>>>> Sorry that you ran into this problem...
>>>>>> 
>>>>>> Greetings,
>>>>>> Stephan
>>>>>> 
>>>>>> 
>>>>>>> On Mon, Sep 7, 2015 at 12:00 PM, Rico Bergmann <i...@ricobergmann.de> 
>>>>>>> wrote:
>>>>>>> Hi!
>>>>>>> 
>>>>>>> While working with grouping and windowing I encountered a strange 
>>>>>>> behavior. I'm doing:
>>>>>>>> dataStream.groupBy(KeySelector).window(Time.of(x, 
>>>>>>>> TimeUnit.SECONDS)).mapWindow(toString).flatten()
>>>>>>> 
>>>>>>> When I run the program containing this snippet it initially outputs 
>>>>>>> data at a rate around 150 events per sec. (That is roughly the input 
>>>>>>> rate for the program). After about 10-30 minutes the rate drops down 
>>>>>>> below 5 events per sec. This leads to event delivery offsets getting 
>>>>>>> bigger and bigger ... 
>>>>>>> 
>>>>>>> Any explanation for this? I know you are reworking the streaming API. 
>>>>>>> But it would be useful to know, why this happens ...
>>>>>>> 
>>>>>>> Cheers. Rico. 


Re: Performance Issue

2015-09-08 Thread Rico Bergmann
Hi!

I also think it's a GC problem. In the KeySelector I don't instantiate any 
object. It's a simple toString method call. 
In the mapWindow I create new objects. But I'm doing the same in other map 
operators, too. They don't slow down the execution. Only with this construct 
is the execution slowed down. 

I watched the memory footprint of my program, once with the code construct I 
wrote and once without. The memory characteristics were the same. The CPU 
usage, too ... 

I don't have an explanation. But I don't think it comes from my operator 
functions ...

Cheers Rico. 



> Am 07.09.2015 um 22:43 schrieb Martin Neumann <mneum...@sics.se>:
> 
> Hej,
> 
> This sounds like it could be a garbage collection problem. Do you instantiate 
> any classes inside any of the operators (e.g. in the KeySelector). You can 
> also try to run it locally and use something like jstat to rule this out.
> 
> cheers Martin
> 
>> On Mon, Sep 7, 2015 at 12:00 PM, Rico Bergmann <i...@ricobergmann.de> wrote:
>> Hi!
>> 
>> While working with grouping and windowing I encountered a strange behavior. 
>> I'm doing:
>>> dataStream.groupBy(KeySelector).window(Time.of(x, 
>>> TimeUnit.SECONDS)).mapWindow(toString).flatten()
>> 
>> When I run the program containing this snippet it initially outputs data at 
>> a rate around 150 events per sec. (That is roughly the input rate for the 
>> program). After about 10-30 minutes the rate drops down below 5 events per 
>> sec. This leads to event delivery offsets getting bigger and bigger ... 
>> 
>> Any explanation for this? I know you are reworking the streaming API. But it 
>> would be useful to know, why this happens ...
>> 
>> Cheers. Rico. 
> 
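
(As a side note on the jstat suggestion quoted above: a typical invocation is

    jstat -gcutil <taskmanager-pid> 1000

which prints heap-occupancy percentages plus GC counts and accumulated GC times
once per second; the PID placeholder is, of course, an assumption.)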


Re: Performance Issue

2015-09-08 Thread Rico Bergmann
Where can I find this information? I can see the memory usage and CPU load. 
But where is the information on the GC?



> Am 08.09.2015 um 09:34 schrieb Robert Metzger <rmetz...@apache.org>:
> 
> The webinterface of Flink has a tab for the TaskManagers. There, you can also 
> see how much time the JVM spend with garbage collection.
> Can you check whether the number of GC calls + the time spend goes up after 
> 30 minutes?
> 
>> On Tue, Sep 8, 2015 at 8:37 AM, Rico Bergmann <i...@ricobergmann.de> wrote:
>> Hi!
>> 
>> I also think it's a GC problem. In the KeySelector I don't instantiate any 
>> object. It's a simple toString method call. 
>> In the mapWindow I create new objects. But I'm doing the same in other map 
>> operators, too. They don't slow down the execution. Only with this construct 
>> the execution is slowed down. 
>> 
>> I watched on the memory footprint of my program. Once with the code 
>> construct I wrote and once without. The memory characteristic were the same. 
>> The CPU usage also ... 
>> 
>> I don't have an explanation. But I don't think it comes from my operator 
>> functions ...
>> 
>> Cheers Rico. 
>> 
>> 
>> 
>>> Am 07.09.2015 um 22:43 schrieb Martin Neumann <mneum...@sics.se>:
>>> 
>>> Hej,
>>> 
>>> This sounds like it could be a garbage collection problem. Do you 
>>> instantiate any classes inside any of the operators (e.g. in the 
>>> KeySelector). You can also try to run it locally and use something like 
>>> jstat to rule this out.
>>> 
>>> cheers Martin
>>> 
>>>> On Mon, Sep 7, 2015 at 12:00 PM, Rico Bergmann <i...@ricobergmann.de> 
>>>> wrote:
>>>> Hi!
>>>> 
>>>> While working with grouping and windowing I encountered a strange 
>>>> behavior. I'm doing:
>>>>> dataStream.groupBy(KeySelector).window(Time.of(x, 
>>>>> TimeUnit.SECONDS)).mapWindow(toString).flatten()
>>>> 
>>>> When I run the program containing this snippet it initially outputs data 
>>>> at a rate around 150 events per sec. (That is roughly the input rate for 
>>>> the program). After about 10-30 minutes the rate drops down below 5 events 
>>>> per sec. This leads to event delivery offsets getting bigger and bigger 
>>>> ... 
>>>> 
>>>> Any explanation for this? I know you are reworking the streaming API. But 
>>>> it would be useful to know, why this happens ...
>>>> 
>>>> Cheers. Rico. 
> 


Re: Performance Issue

2015-09-08 Thread Rico Bergmann
The marksweep value is very high, the scavenge very low. If this helps ;-)




> Am 08.09.2015 um 11:27 schrieb Robert Metzger <rmetz...@apache.org>:
> 
> It is in the "Information" column: http://i.imgur.com/rzxxURR.png
> In the screenshot, the two GCs only spend 84 and 25 ms.
> 
>> On Tue, Sep 8, 2015 at 10:34 AM, Rico Bergmann <i...@ricobergmann.de> wrote:
>> Where can I find these information? I can see the memory usage and cpu load. 
>> But where are the information on the GC?
>> 
>> 
>> 
>>> Am 08.09.2015 um 09:34 schrieb Robert Metzger <rmetz...@apache.org>:
>>> 
>>> The webinterface of Flink has a tab for the TaskManagers. There, you can 
>>> also see how much time the JVM spend with garbage collection.
>>> Can you check whether the number of GC calls + the time spend goes up after 
>>> 30 minutes?
>>> 
>>>> On Tue, Sep 8, 2015 at 8:37 AM, Rico Bergmann <i...@ricobergmann.de> wrote:
>>>> Hi!
>>>> 
>>>> I also think it's a GC problem. In the KeySelector I don't instantiate any 
>>>> object. It's a simple toString method call. 
>>>> In the mapWindow I create new objects. But I'm doing the same in other map 
>>>> operators, too. They don't slow down the execution. Only with this 
>>>> construct the execution is slowed down. 
>>>> 
>>>> I watched on the memory footprint of my program. Once with the code 
>>>> construct I wrote and once without. The memory characteristic were the 
>>>> same. The CPU usage also ... 
>>>> 
>>>> I don't have an explanation. But I don't think it comes from my operator 
>>>> functions ...
>>>> 
>>>> Cheers Rico. 
>>>> 
>>>> 
>>>> 
>>>>> Am 07.09.2015 um 22:43 schrieb Martin Neumann <mneum...@sics.se>:
>>>>> 
>>>>> Hej,
>>>>> 
>>>>> This sounds like it could be a garbage collection problem. Do you 
>>>>> instantiate any classes inside any of the operators (e.g. in the 
>>>>> KeySelector). You can also try to run it locally and use something like 
>>>>> jstat to rule this out.
>>>>> 
>>>>> cheers Martin
>>>>> 
>>>>>> On Mon, Sep 7, 2015 at 12:00 PM, Rico Bergmann <i...@ricobergmann.de> 
>>>>>> wrote:
>>>>>> Hi!
>>>>>> 
>>>>>> While working with grouping and windowing I encountered a strange 
>>>>>> behavior. I'm doing:
>>>>>>> dataStream.groupBy(KeySelector).window(Time.of(x, 
>>>>>>> TimeUnit.SECONDS)).mapWindow(toString).flatten()
>>>>>> 
>>>>>> When I run the program containing this snippet it initially outputs data 
>>>>>> at a rate around 150 events per sec. (That is roughly the input rate for 
>>>>>> the program). After about 10-30 minutes the rate drops down below 5 
>>>>>> events per sec. This leads to event delivery offsets getting bigger and 
>>>>>> bigger ... 
>>>>>> 
>>>>>> Any explanation for this? I know you are reworking the streaming API. 
>>>>>> But it would be useful to know, why this happens ...
>>>>>> 
>>>>>> Cheers. Rico. 
> 


Re: Performance Issue

2015-09-08 Thread Rico Bergmann
I also see in the TM overview that the CPU load is still around 25% although 
there has been no input to the program for minutes. The CPU load is decreasing 
very slowly. 

The memory consumption is still fluctuating at a high level. It does not go 
down. 

In my test I generated test input for 1 minute. Now 10 minutes are over ... 

I think there must be something wrong with Flink ...



> Am 08.09.2015 um 13:32 schrieb Rico Bergmann <i...@ricobergmann.de>:
> 
> The marksweep value is very high, the scavenge very low. If this helps ;-)
> 
> 
> 
> 
>> Am 08.09.2015 um 11:27 schrieb Robert Metzger <rmetz...@apache.org>:
>> 
>> It is in the "Information" column: http://i.imgur.com/rzxxURR.png
>> In the screenshot, the two GCs only spend 84 and 25 ms.
>> 
>>> On Tue, Sep 8, 2015 at 10:34 AM, Rico Bergmann <i...@ricobergmann.de> wrote:
>>> Where can I find these information? I can see the memory usage and cpu 
>>> load. But where are the information on the GC?
>>> 
>>> 
>>> 
>>>> Am 08.09.2015 um 09:34 schrieb Robert Metzger <rmetz...@apache.org>:
>>>> 
>>>> The webinterface of Flink has a tab for the TaskManagers. There, you can 
>>>> also see how much time the JVM spend with garbage collection.
>>>> Can you check whether the number of GC calls + the time spend goes up 
>>>> after 30 minutes?
>>>> 
>>>>> On Tue, Sep 8, 2015 at 8:37 AM, Rico Bergmann <i...@ricobergmann.de> 
>>>>> wrote:
>>>>> Hi!
>>>>> 
>>>>> I also think it's a GC problem. In the KeySelector I don't instantiate 
>>>>> any object. It's a simple toString method call. 
>>>>> In the mapWindow I create new objects. But I'm doing the same in other 
>>>>> map operators, too. They don't slow down the execution. Only with this 
>>>>> construct the execution is slowed down. 
>>>>> 
>>>>> I watched on the memory footprint of my program. Once with the code 
>>>>> construct I wrote and once without. The memory characteristic were the 
>>>>> same. The CPU usage also ... 
>>>>> 
>>>>> I don't have an explanation. But I don't think it comes from my operator 
>>>>> functions ...
>>>>> 
>>>>> Cheers Rico. 
>>>>> 
>>>>> 
>>>>> 
>>>>>> Am 07.09.2015 um 22:43 schrieb Martin Neumann <mneum...@sics.se>:
>>>>>> 
>>>>>> Hej,
>>>>>> 
>>>>>> This sounds like it could be a garbage collection problem. Do you 
>>>>>> instantiate any classes inside any of the operators (e.g. in the 
>>>>>> KeySelector). You can also try to run it locally and use something like 
>>>>>> jstat to rule this out.
>>>>>> 
>>>>>> cheers Martin
>>>>>> 
>>>>>>> On Mon, Sep 7, 2015 at 12:00 PM, Rico Bergmann <i...@ricobergmann.de> 
>>>>>>> wrote:
>>>>>>> Hi!
>>>>>>> 
>>>>>>> While working with grouping and windowing I encountered a strange 
>>>>>>> behavior. I'm doing:
>>>>>>>> dataStream.groupBy(KeySelector).window(Time.of(x, 
>>>>>>>> TimeUnit.SECONDS)).mapWindow(toString).flatten()
>>>>>>> 
>>>>>>> When I run the program containing this snippet it initially outputs 
>>>>>>> data at a rate around 150 events per sec. (That is roughly the input 
>>>>>>> rate for the program). After about 10-30 minutes the rate drops down 
>>>>>>> below 5 events per sec. This leads to event delivery offsets getting 
>>>>>>> bigger and bigger ... 
>>>>>>> 
>>>>>>> Any explanation for this? I know you are reworking the streaming API. 
>>>>>>> But it would be useful to know, why this happens ...
>>>>>>> 
>>>>>>> Cheers. Rico. 
>> 


Re: Performance Issue

2015-09-08 Thread Rico Bergmann
Yes. The keys are constantly changing. Indeed each unique event has its own key 
(the event itself). The purpose was to do an event deduplication ...



> Am 08.09.2015 um 20:05 schrieb Aljoscha Krettek <aljos...@apache.org>:
> 
> Hi Rico,
> I have a suspicion. What is the distribution of your keys? That is, are there 
> many unique keys, do the keys keep evolving, i.e. is it always new and 
> different keys?
> 
> Cheers,
> Aljoscha
> 
>> On Tue, 8 Sep 2015 at 13:44 Rico Bergmann <i...@ricobergmann.de> wrote:
>> I also see in the TM overview the CPU load is still around 25% although 
>> there is no input to the program since minutes. The CPU load is degrading 
>> very slowly. 
>> 
>> The memory consumption is still fluctuating at a high level. It does not 
>> degrade. 
>> 
>> In my test I generated test input for 1 minute. Now 10 minutes are over ... 
>> 
>> I think there must be something with flink...
>> 
>> 
>> 
>>> Am 08.09.2015 um 13:32 schrieb Rico Bergmann <i...@ricobergmann.de>:
>>> 
>>> The marksweep value is very high, the scavenge very low. If this helps ;-)
>>> 
>>> 
>>> 
>>> 
>>>> Am 08.09.2015 um 11:27 schrieb Robert Metzger <rmetz...@apache.org>:
>>>> 
>>>> It is in the "Information" column: http://i.imgur.com/rzxxURR.png
>>>> In the screenshot, the two GCs only spend 84 and 25 ms.
>>>> 
>>>>> On Tue, Sep 8, 2015 at 10:34 AM, Rico Bergmann <i...@ricobergmann.de> 
>>>>> wrote:
>>>>> Where can I find these information? I can see the memory usage and cpu 
>>>>> load. But where are the information on the GC?
>>>>> 
>>>>> 
>>>>> 
>>>>>> Am 08.09.2015 um 09:34 schrieb Robert Metzger <rmetz...@apache.org>:
>>>>>> 
>>>>>> The webinterface of Flink has a tab for the TaskManagers. There, you can 
>>>>>> also see how much time the JVM spend with garbage collection.
>>>>>> Can you check whether the number of GC calls + the time spend goes up 
>>>>>> after 30 minutes?
>>>>>> 
>>>>>>> On Tue, Sep 8, 2015 at 8:37 AM, Rico Bergmann <i...@ricobergmann.de> 
>>>>>>> wrote:
>>>>>>> Hi!
>>>>>>> 
>>>>>>> I also think it's a GC problem. In the KeySelector I don't instantiate 
>>>>>>> any object. It's a simple toString method call. 
>>>>>>> In the mapWindow I create new objects. But I'm doing the same in other 
>>>>>>> map operators, too. They don't slow down the execution. Only with this 
>>>>>>> construct the execution is slowed down. 
>>>>>>> 
>>>>>>> I watched on the memory footprint of my program. Once with the code 
>>>>>>> construct I wrote and once without. The memory characteristic were the 
>>>>>>> same. The CPU usage also ... 
>>>>>>> 
>>>>>>> I don't have an explanation. But I don't think it comes from my 
>>>>>>> operator functions ...
>>>>>>> 
>>>>>>> Cheers Rico. 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>>> Am 07.09.2015 um 22:43 schrieb Martin Neumann <mneum...@sics.se>:
>>>>>>>> 
>>>>>>>> Hej,
>>>>>>>> 
>>>>>>>> This sounds like it could be a garbage collection problem. Do you 
>>>>>>>> instantiate any classes inside any of the operators (e.g. in the 
>>>>>>>> KeySelector). You can also try to run it locally and use something 
>>>>>>>> like jstat to rule this out.
>>>>>>>> 
>>>>>>>> cheers Martin
>>>>>>>> 
>>>>>>>>> On Mon, Sep 7, 2015 at 12:00 PM, Rico Bergmann <i...@ricobergmann.de> 
>>>>>>>>> wrote:
>>>>>>>>> Hi!
>>>>>>>>> 
>>>>>>>>> While working with grouping and windowing I encountered a strange 
>>>>>>>>> behavior. I'm doing:
>>>>>>>>>> dataStream.groupBy(KeySelector).window(Time.of(x, 
>>>>>>>>>> TimeUnit.SECONDS)).mapWindow(toString).flatten()
>>>>>>>>> 
>>>>>>>>> When I run the program containing this snippet it initially outputs 
>>>>>>>>> data at a rate around 150 events per sec. (That is roughly the input 
>>>>>>>>> rate for the program). After about 10-30 minutes the rate drops down 
>>>>>>>>> below 5 events per sec. This leads to event delivery offsets getting 
>>>>>>>>> bigger and bigger ... 
>>>>>>>>> 
>>>>>>>>> Any explanation for this? I know you are reworking the streaming API. 
>>>>>>>>> But it would be useful to know, why this happens ...
>>>>>>>>> 
>>>>>>>>> Cheers. Rico. 


Performance Issue

2015-09-07 Thread Rico Bergmann
Hi!

While working with grouping and windowing I encountered a strange behavior. I'm 
doing:
> dataStream.groupBy(KeySelector).window(Time.of(x, 
> TimeUnit.SECONDS)).mapWindow(toString).flatten()

When I run the program containing this snippet it initially outputs data at a 
rate around 150 events per sec. (That is roughly the input rate for the 
program). After about 10-30 minutes the rate drops down below 5 events per sec. 
This leads to event delivery offsets getting bigger and bigger ... 

Any explanation for this? I know you are reworking the streaming API. But it 
would be useful to know, why this happens ...

Cheers. Rico. 

Re: Duplicates in Flink

2015-09-03 Thread Rico Bergmann
Hi!

Testing it with the current 0.10 snapshot is not easily possible atm

But I deactivated checkpointing in my program and still get duplicates in my 
output. So it does not seem to come only from the checkpointing feature, does it?

Maybe the KafkaSink is responsible for this? (Just my guess)

Cheers Rico. 



> Am 01.09.2015 um 15:37 schrieb Aljoscha Krettek <aljos...@apache.org>:
> 
> Hi Rico,
> unfortunately the 0.9 branch still seems to have problems with exactly once 
> processing and checkpointed operators. We reworked how the checkpoints are 
> handled for the 0.10 release so it should work well there. 
> 
> Could you maybe try running on the 0.10-SNAPSHOT release and see if the 
> problems persist there?
> 
> Cheers,
> Aljoscha
> 
>> On Tue, 1 Sep 2015 at 14:38 Dipl.-Inf. Rico Bergmann <i...@ricobergmann.de> 
>> wrote:
>> Hi!
>> 
>> I still have an issue... I was now using 0.9.1 and the new
>> KafkaConnector. But I still get duplicates in my flink prog. Here's the
>> relevant part:
>> 
>>  final FlinkKafkaConsumer082 kafkaSrc = new
>> FlinkKafkaConsumer082(
>>  kafkaTopicIn, new SimpleStringSchema(), properties);
>> 
>>  DataStream start = env.addSource(kafkaSrc)
>>  .setParallelism(numReadPartitions); //numReadPartitions = 2
>> 
>>  DataStream jsonized = start
>>  .flatMap(new ExtractAndFilterJSON());
>> 
>>  DataStream sessions = jsonized
>>  .partitionByHash(new KeySelector<JSONObject, String>() {
>>  /**
>>   * partition by session id
>>   */
>>  @Override
>>  public String getKey(JSONObject value)
>>  throws Exception {
>>  try {
>>  return /*session id*/;
>>  } catch (Exception e) {
>>  LOG.error("no session could be retrieved", e);
>>  }
>>  return "";
>>  }
>>  }).flatMap(new StatefulSearchSessionizer());
>> 
>> In the StatefulSearchSessionizer I receive duplicates sporadically. I'm
>> sure that the kafka topic I'm reading from does not contain any
>> duplicates. So it must be in the flink program ...
>> 
>> Any ideas?
>> 
>> Cheers, Rico.


Re: Duplicates in Flink

2015-09-03 Thread Rico Bergmann
No. I mean the KafkaSink. 

A bit more insight into my program: I read from a Kafka topic with 
FlinkKafkaConsumer082, then hash-partition the data, then I do a deduplication 
(which does not eliminate all duplicates though). Then some computation, and 
afterwards again deduplication (group by message in a window over the last 2 seconds). 

Of course the last deduplication is not perfect.

Cheers. Rico. 



> Am 03.09.2015 um 13:29 schrieb Stephan Ewen <se...@apache.org>:
> 
> Do you mean the KafkaSource?
> 
> Which KafkaSource are you using? The 0.9.1 FlinkKafkaConsumer082 or the 
> KafkaSource?
> 
>> On Thu, Sep 3, 2015 at 1:10 PM, Rico Bergmann <i...@ricobergmann.de> wrote:
>> Hi!
>> 
>> Testing it with the current 0.10 snapshot is not easily possible atm
>> 
>> But I deactivated checkpointing in my program and still get duplicates in my 
>> output. So it seems not only to come from the checkpointing feature, or?
>> 
>> May be the KafkaSink is responsible for this? (Just my guess)
>> 
>> Cheers Rico. 
>> 
>> 
>> 
>>> Am 01.09.2015 um 15:37 schrieb Aljoscha Krettek <aljos...@apache.org>:
>>> 
>>> Hi Rico,
>>> unfortunately the 0.9 branch still seems to have problems with exactly once 
>>> processing and checkpointed operators. We reworked how the checkpoints are 
>>> handled for the 0.10 release so it should work well there. 
>>> 
>>> Could you maybe try running on the 0.10-SNAPSHOT release and see if the 
>>> problems persist there?
>>> 
>>> Cheers,
>>> Aljoscha
>>> 
>>>> On Tue, 1 Sep 2015 at 14:38 Dipl.-Inf. Rico Bergmann 
>>>> <i...@ricobergmann.de> wrote:
>>>> Hi!
>>>> 
>>>> I still have an issue... I was now using 0.9.1 and the new
>>>> KafkaConnector. But I still get duplicates in my flink prog. Here's the
>>>> relevant part:
>>>> 
>>>>  final FlinkKafkaConsumer082 kafkaSrc = new
>>>> FlinkKafkaConsumer082(
>>>>  kafkaTopicIn, new SimpleStringSchema(), properties);
>>>> 
>>>>  DataStream start = env.addSource(kafkaSrc)
>>>>  .setParallelism(numReadPartitions); //numReadPartitions = 2
>>>> 
>>>>  DataStream jsonized = start
>>>>  .flatMap(new ExtractAndFilterJSON());
>>>> 
>>>>  DataStream sessions = jsonized
>>>>  .partitionByHash(new KeySelector<JSONObject, String>() {
>>>>  /**
>>>>   * partition by session id
>>>>   */
>>>>  @Override
>>>>  public String getKey(JSONObject value)
>>>>  throws Exception {
>>>>  try {
>>>>  return /*session id*/;
>>>>  } catch (Exception e) {
>>>>  LOG.error("no session could be retrieved", e);
>>>>  }
>>>>  return "";
>>>>  }
>>>>  }).flatMap(new StatefulSearchSessionizer());
>>>> 
>>>> In the StatefulSearchSessionizer I receive duplicates sporadically. I'm
>>>> sure that the kafka topic I'm reading from does not contain any
>>>> duplicates. So it must be in the flink program ...
>>>> 
>>>> Any ideas?
>>>> 
>>>> Cheers, Rico.
> 


Re: Duplicates in Flink

2015-09-03 Thread Rico Bergmann
The KafkaSink is the last step in my program after the 2nd deduplication. 

I could not yet track down where duplicates show up. That's a bit difficult to 
find out... But I'm trying to find it...



> Am 03.09.2015 um 14:14 schrieb Stephan Ewen <se...@apache.org>:
> 
> Can you tell us where the KafkaSink comes into play? At what point do the 
> duplicates come up?
> 
>> On Thu, Sep 3, 2015 at 2:09 PM, Rico Bergmann <i...@ricobergmann.de> wrote:
>> No. I mean the KafkaSink. 
>> 
>> A bit more insight to my program: I read from a Kafka topic with 
>> flinkKafkaConsumer082, then hashpartition the data, then I do a 
>> deduplication (does not eliminate all duplicates though). Then some 
>> computation, afterwards again deduplication (group by message in a window of 
>> last 2 seconds). 
>> 
>> Of course the last deduplication is not perfect.
>> 
>> Cheers. Rico. 
>> 
>> 
>> 
>>> Am 03.09.2015 um 13:29 schrieb Stephan Ewen <se...@apache.org>:
>>> 
>>> Do you mean the KafkaSource?
>>> 
>>> Which KafkaSource are you using? The 0.9.1 FlinkKafkaConsumer082 or the 
>>> KafkaSource?
>>> 
>>>> On Thu, Sep 3, 2015 at 1:10 PM, Rico Bergmann <i...@ricobergmann.de> wrote:
>>>> Hi!
>>>> 
>>>> Testing it with the current 0.10 snapshot is not easily possible atm
>>>> 
>>>> But I deactivated checkpointing in my program and still get duplicates in 
>>>> my output. So it seems not only to come from the checkpointing feature, or?
>>>> 
>>>> May be the KafkaSink is responsible for this? (Just my guess)
>>>> 
>>>> Cheers Rico. 
>>>> 
>>>> 
>>>> 
>>>>> Am 01.09.2015 um 15:37 schrieb Aljoscha Krettek <aljos...@apache.org>:
>>>>> 
>>>>> Hi Rico,
>>>>> unfortunately the 0.9 branch still seems to have problems with exactly 
>>>>> once processing and checkpointed operators. We reworked how the 
>>>>> checkpoints are handled for the 0.10 release so it should work well 
>>>>> there. 
>>>>> 
>>>>> Could you maybe try running on the 0.10-SNAPSHOT release and see if the 
>>>>> problems persist there?
>>>>> 
>>>>> Cheers,
>>>>> Aljoscha
>>>>> 
>>>>>> On Tue, 1 Sep 2015 at 14:38 Dipl.-Inf. Rico Bergmann 
>>>>>> <i...@ricobergmann.de> wrote:
>>>>>> Hi!
>>>>>> 
>>>>>> I still have an issue... I was now using 0.9.1 and the new
>>>>>> KafkaConnector. But I still get duplicates in my flink prog. Here's the
>>>>>> relevant part:
>>>>>> 
>>>>>>  final FlinkKafkaConsumer082 kafkaSrc = new
>>>>>> FlinkKafkaConsumer082(
>>>>>>  kafkaTopicIn, new SimpleStringSchema(), properties);
>>>>>> 
>>>>>>  DataStream start = env.addSource(kafkaSrc)
>>>>>>  .setParallelism(numReadPartitions); //numReadPartitions = 2
>>>>>> 
>>>>>>  DataStream jsonized = start
>>>>>>  .flatMap(new ExtractAndFilterJSON());
>>>>>> 
>>>>>>  DataStream sessions = jsonized
>>>>>>  .partitionByHash(new KeySelector<JSONObject, String>() {
>>>>>>  /**
>>>>>>   * partition by session id
>>>>>>   */
>>>>>>  @Override
>>>>>>  public String getKey(JSONObject value)
>>>>>>  throws Exception {
>>>>>>  try {
>>>>>>  return /*session id*/;
>>>>>>  } catch (Exception e) {
>>>>>>  LOG.error("no session could be retrieved", e);
>>>>>>  }
>>>>>>  return "";
>>>>>>  }
>>>>>>  }).flatMap(new StatefulSearchSessionizer());
>>>>>> 
>>>>>> In the StatefulSearchSessionizer I receive duplicates sporadically. I'm
>>>>>> sure that the kafka topic I'm reading from does not contain any
>>>>>> duplicates. So it must be in the flink program ...
>>>>>> 
>>>>>> Any ideas?
>>>>>> 
>>>>>> Cheers, Rico.
> 


Re: Problem with Windowing

2015-09-01 Thread Rico Bergmann
Hi Stephan,

Thanks for the advice. It works ...

Cheers. Rico. 
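
Concretely, the advice amounts to swapping the first two calls in the snippet
from the original mail. Schematically (keySelector and typeConversion stand for
the custom functions from that snippet, so this is an illustration of the
ordering rather than runnable code):

    // window before groupBy -> global windows (the problematic variant)
    ds.window(Time.of(2, TimeUnit.SECONDS)).groupBy(keySelector).mapWindow(typeConversion).flatten()

    // groupBy before window -> local, per-key windows (the variant that works)
    ds.groupBy(keySelector).window(Time.of(2, TimeUnit.SECONDS)).mapWindow(typeConversion).flatten()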



> Am 31.08.2015 um 20:14 schrieb Stephan Ewen <se...@apache.org>:
> 
> Hey Rico!
> 
> Parts of the "global windows" are still not super stable, and we are heavily 
> reworking them for the 0.10 release.
> 
> What you can try is reversing the order of the "window" and "groupby" 
> statement. If you group before windowing, you get local windows, if you 
> window before grouping, you get global windows. Local windows work better.
> 
> Greetings,
> Stephan
> 
> 
>> On Mon, Aug 31, 2015 at 6:40 PM, Matthias J. Sax 
>> <mj...@informatik.hu-berlin.de> wrote:
>> Maybe you could include some log statements in you user code to see
>> which parts of the program receive data and which not. To narrow down
>> the problematic part...
>> 
>> On 08/31/2015 06:03 PM, Rico Bergmann wrote:
>> > The part is exactly as I wrote. ds is assigned a data flow that computes 
>> > some stuff. Then the deduplication code as written in my first mail is 
>> > assigned to a new variable called output. Then output.addSink(.) is called.
>> >
>> >
>> >> Am 31.08.2015 um 17:45 schrieb Matthias J. Sax 
>> >> <mj...@informatik.hu-berlin.de>:
>> >>
>> >> Can you post your whole program (both versions if possible)?
>> >>
>> >> Otherwise I have only a wild guess: A common mistake is not to assign
>> >> the stream variable properly:
>> >>
>> >> DataStream ds = ...
>> >>
>> >> ds = ds.APPLY_FUNCTIONS
>> >>
>> >> ds.APPLY_MORE_FUNCTIONS
>> >>
>> >> In your code example, the assignment is missing -- but maybe it just
>> >> missing in your email.
>> >>
>> >> -Matthias
>> >>
>> >>
>> >>> On 08/31/2015 04:38 PM, Dipl.-Inf. Rico Bergmann wrote:
>> >>> Hi!
>> >>>
>> >>> I have a problem that I cannot really track down. I'll try to describe
>> >>> the issue.
>> >>>
>> >>> My streaming flink program computes something. At the end I'm doing the
>> >>> follwing on my DataStream ds
>> >>> ds.window(2, TimeUnit.SECONDS)
>> >>> .groupBy(/*custom KeySelector converting input to a String
>> >>> representation*/)
>> >>> .mapWindow(/*TypeConversion*/)
>> >>> .flatten()
>> >>>
>> >>> Then the result is written to a Kafka topic.
>> >>>
>> >>> The purpose of this is output deduplication within a 2 seconds window...
>> >>>
>> >>> Without the above the program works fine. But with the above I don't get
>> >>> any output and no error appears in the log. The program keeps running.
>> >>> Am I doing something wrong?
>> >>>
>> >>> I would be happy for help!
>> >>>
>> >>> Cheers, Rico.
>> >>
>> >
> 


Duplicates in Flink

2015-09-01 Thread Dipl.-Inf. Rico Bergmann

Hi!

I still have an issue... I was now using 0.9.1 and the new 
KafkaConnector. But I still get duplicates in my flink prog. Here's the 
relevant part:


final FlinkKafkaConsumer082<String> kafkaSrc = new FlinkKafkaConsumer082<String>(
        kafkaTopicIn, new SimpleStringSchema(), properties);

DataStream<String> start = env.addSource(kafkaSrc)
        .setParallelism(numReadPartitions); // numReadPartitions = 2

DataStream<JSONObject> jsonized = start
        .flatMap(new ExtractAndFilterJSON());

DataStream sessions = jsonized
        .partitionByHash(new KeySelector<JSONObject, String>() {
            /**
             * partition by session id
             */
            @Override
            public String getKey(JSONObject value) throws Exception {
                try {
                    return /*session id*/;
                } catch (Exception e) {
                    LOG.error("no session could be retrieved", e);
                }
                return "";
            }
        }).flatMap(new StatefulSearchSessionizer());

In the StatefulSearchSessionizer I receive duplicates sporadically. I'm 
sure that the kafka topic I'm reading from does not contain any 
duplicates. So it must be in the flink program ...


Any ideas?

Cheers, Rico.



Re: Problem with Windowing

2015-08-31 Thread Rico Bergmann
The part is exactly as I wrote. ds is assigned a data flow that computes some 
stuff. Then the deduplication code as written in my first mail is assigned to 
a new variable called output. Then output.addSink(.) is called. 


> Am 31.08.2015 um 17:45 schrieb Matthias J. Sax 
> <mj...@informatik.hu-berlin.de>:
> 
> Can you post your whole program (both versions if possible)?
> 
> Otherwise I have only a wild guess: A common mistake is not to assign
> the stream variable properly:
> 
> DataStream ds = ...
> 
> ds = ds.APPLY_FUNCTIONS
> 
> ds.APPLY_MORE_FUNCTIONS
> 
> In your code example, the assignment is missing -- but maybe it just
> missing in your email.
> 
> -Matthias
> 
> 
>> On 08/31/2015 04:38 PM, Dipl.-Inf. Rico Bergmann wrote:
>> Hi!
>> 
>> I have a problem that I cannot really track down. I'll try to describe
>> the issue.
>> 
>> My streaming flink program computes something. At the end I'm doing the
>> follwing on my DataStream ds
>> ds.window(2, TimeUnit.SECONDS)
>> .groupBy(/*custom KeySelector converting input to a String
>> representation*/)
>> .mapWindow(/*TypeConversion*/)
>> .flatten()
>> 
>> Then the result is written to a Kafka topic.
>> 
>> The purpose of this is output deduplication within a 2 seconds window...
>> 
>> Without the above the program works fine. But with the above I don't get
>> any output and no error appears in the log. The program keeps running.
>> Am I doing something wrong?
>> 
>> I would be happy for help!
>> 
>> Cheers, Rico.
> 


Re: Flink to ingest from Kafka to HDFS?

2015-08-25 Thread Rico Bergmann
Hi!

Sorry, I won't be able to implement this soon. I just shared my ideas on this. 

Greets. Rico. 



 Am 25.08.2015 um 17:52 schrieb Stephan Ewen se...@apache.org:
 
 Hi Rico!
 
 Can you give us an update on your status here? We actually need something 
 like this as well (and pretty urgent), so we would jump in
 and implement this, unless you have something already.
 
 Stephan
 
 
 On Thu, Aug 20, 2015 at 12:13 PM, Stephan Ewen se...@apache.org wrote:
 BTW: This is becoming a dev discussion, maybe should move to that list...
 
 On Thu, Aug 20, 2015 at 12:12 PM, Stephan Ewen se...@apache.org wrote:
 Yes, one needs exactly a mechanism to seek the output stream back to the 
 last checkpointed position, in order to overwrite duplicates.
 
 I think HDFS is not going to make this easy, there is basically no seek for 
 write. Not sure how to solve this, other then writing to tmp files and 
 copying upon success.
 
 Apache Flume must have solved this issue in some way, it may be a worth 
 looking into how they solved it.
 
 On Thu, Aug 20, 2015 at 11:58 AM, Rico Bergmann i...@ricobergmann.de 
 wrote:
 My ideas for checkpointing:
 
 I think writing to the destination should not depend on the checkpoint 
 mechanism (otherwise the output would never be written to the destination 
 if checkpointing is disabled). Instead I would keep the offsets of written 
 and Checkpointed records. When recovering you would then somehow delete or 
 overwrite the records after that offset. (But I don't really know whether 
 this is as simple as I wrote it ;-) ). 
 
 Regarding the rolling files I would suggest making the values of the 
 user-defined partitioning function part of the path or file name. Writing 
 records is then basically:
 Extract the partition to write to, then add the record to a queue for this 
 partition. Each queue has an output format assigned to it. On flushing the 
 output file is opened, the content of the queue is written to it, and then 
 closed.
 
 Does this sound reasonable?
 
 
 
 Am 20.08.2015 um 10:40 schrieb Aljoscha Krettek aljos...@apache.org:
 
 Yes, this seems like a good approach. We should probably not reuse the 
 KeySelector for this but maybe a more use-case specific type of function 
 that can create a desired filename from an input object.
 
 This is only the first part, though. The hard bit would be implementing 
 rolling files and also integrating it with Flink's checkpointing 
 mechanism. For integration with checkpointing you could maybe use 
 staging-files: all elements are put into a staging file. And then, when 
 the notification about a completed checkpoint is received the contents of 
 this file would be moved (or appended) to the actual destination.
 
 Do you have any Ideas about the rolling files/checkpointing?
 
 On Thu, 20 Aug 2015 at 09:44 Rico Bergmann i...@ricobergmann.de wrote:
 I'm thinking about implementing this. 
 
 After looking into the flink code I would basically subclass 
 FileOutputFormat in let's say KeyedFileOutputFormat, that gets an 
 additional KeySelector object. The path in the file system is then 
 appended by the string, the KeySelector returns. 
 
 U think this is a good approach?
 
 Greets. Rico. 
 
 
 
 Am 16.08.2015 um 19:56 schrieb Stephan Ewen se...@apache.org:
 
 If you are up for it, this would be a very nice addition to Flink, a 
 great contribution :-)
 
 On Sun, Aug 16, 2015 at 7:56 PM, Stephan Ewen se...@apache.org wrote:
 Hi!
 
 This should definitely be possible in Flink. Pretty much exactly like 
 you describe it.
 
 You need a custom version of the HDFS sink with some logic when to 
 roll over to a new file.
 
 You can also make the sink exactly once by integrating it with the 
 checkpointing. For that, you would probably need to keep the current 
 path and output stream offsets as of the last checkpoint, so you can 
 resume from that offset and overwrite records to avoid duplicates. If 
 that is not possible, you would probably buffer records between 
 checkpoints and only write on checkpoints.
 
 Greetings,
 Stephan
 
 
 
 On Sun, Aug 16, 2015 at 7:09 PM, Hans-Peter Zorn hpz...@gmail.com 
 wrote:
 Hi,
 
 Did anybody think of (mis-) using Flink streaming as an alternative 
 to Apache Flume just for ingesting data from Kafka (or other 
 streaming sources) to HDFS? Knowing that Flink can read from Kafka 
 and write to hdfs I assume it should be possible, but Is this a good 
 idea to do? 
 
 Flume basically is about consuming data from somewhere, peeking into 
 each record and then directing it to a specific directory/file in 
 HDFS reliably. I've seen there is a FlumeSink, but would it be 
 possible to get the same functionality with
 Flink alone?
 
 I've skimmed through the documentation and found the option to split 
 the output by key and the possibility to add multiple sinks. As I 
 understand, Flink programs are generally static, so it would not be 
 possible to add/remove sinks at runtime?
 So you would need to implement a custom

Re: Flink to ingest from Kafka to HDFS?

2015-08-20 Thread Rico Bergmann
My ideas for checkpointing:

I think writing to the destination should not depend on the checkpoint 
mechanism (otherwise the output would never be written to the destination if 
checkpointing is disabled). Instead I would keep the offsets of written and 
checkpointed records. When recovering, you would then somehow delete or 
overwrite the records after that offset. (But I don't really know whether this 
is as simple as I wrote it ;-) ). 

Regarding the rolling files, I would suggest making the values of the 
user-defined partitioning function part of the path or file name. Writing 
records is then basically:
Extract the partition to write to, then add the record to a queue for this 
partition. Each queue has an output format assigned to it. On flushing, the 
output file is opened, the content of the queue is written to it, and then 
closed.

Does this sound reasonable?
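
To make the queue-per-partition idea above concrete, here is a minimal,
framework-independent sketch in plain Java (the class name, the Function-based
partitioner, and the toString-based serialization are illustrative choices, not
the actual Flink FileOutputFormat API):

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Buffers records per partition and appends each queue to its own file on flush. */
public class PartitionedBufferingWriter<T> {

    private final Path basePath;
    private final Function<T, String> partitioner; // the user-defined partitioning function
    private final Map<String, List<T>> queues = new HashMap<>();

    public PartitionedBufferingWriter(Path basePath, Function<T, String> partitioner) {
        this.basePath = basePath;
        this.partitioner = partitioner;
    }

    /** Extract the partition for the record and add it to that partition's queue. */
    public void write(T record) {
        String partition = partitioner.apply(record);
        queues.computeIfAbsent(partition, k -> new ArrayList<>()).add(record);
    }

    /** For each partition: open its file, append the queued records, close it again. */
    public void flush() throws IOException {
        Files.createDirectories(basePath);
        for (Map.Entry<String, List<T>> entry : queues.entrySet()) {
            // the partition value becomes part of the file name
            Path file = basePath.resolve(entry.getKey());
            try (BufferedWriter out = Files.newBufferedWriter(file, StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
                for (T record : entry.getValue()) {
                    out.write(record.toString());
                    out.newLine();
                }
            }
            entry.getValue().clear();
        }
    }
}

A checkpoint-aware variant would call flush() from the checkpoint notification
and, per partition, remember the offset up to which records are known to be
written, so that a recovery could truncate or overwrite everything behind that
offset, as described above.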



 Am 20.08.2015 um 10:40 schrieb Aljoscha Krettek aljos...@apache.org:
 
 Yes, this seems like a good approach. We should probably not reuse the 
 KeySelector for this but maybe a more use-case specific type of function that 
 can create a desired filename from an input object.
 
 This is only the first part, though. The hard bit would be implementing 
 rolling files and also integrating it with Flink's checkpointing mechanism. 
 For integration with checkpointing you could maybe use staging-files: all 
 elements are put into a staging file. And then, when the notification about a 
 completed checkpoint is received the contents of this file would be moved (or 
 appended) to the actual destination.
 
 Do you have any Ideas about the rolling files/checkpointing?
 
 On Thu, 20 Aug 2015 at 09:44 Rico Bergmann i...@ricobergmann.de wrote:
 I'm thinking about implementing this. 
 
 After looking into the flink code I would basically subclass 
 FileOutputFormat in let's say KeyedFileOutputFormat, that gets an additional 
 KeySelector object. The path in the file system is then appended by the 
 string, the KeySelector returns. 
 
 U think this is a good approach?
 
 Greets. Rico. 
 
 
 
 Am 16.08.2015 um 19:56 schrieb Stephan Ewen se...@apache.org:
 
 If you are up for it, this would be a very nice addition to Flink, a great 
 contribution :-)
 
 On Sun, Aug 16, 2015 at 7:56 PM, Stephan Ewen se...@apache.org wrote:
 Hi!
 
 This should definitely be possible in Flink. Pretty much exactly like you 
 describe it.
 
 You need a custom version of the HDFS sink with some logic when to roll 
 over to a new file.
 
 You can also make the sink exactly once by integrating it with the 
 checkpointing. For that, you would probably need to keep the current path 
 and output stream offsets as of the last checkpoint, so you can resume 
 from that offset and overwrite records to avoid duplicates. If that is not 
 possible, you would probably buffer records between checkpoints and only 
 write on checkpoints.
 
 Greetings,
 Stephan
 
 
 
 On Sun, Aug 16, 2015 at 7:09 PM, Hans-Peter Zorn hpz...@gmail.com wrote:
 Hi,
 
 Did anybody think of (mis-) using Flink streaming as an alternative to 
 Apache Flume just for ingesting data from Kafka (or other streaming 
 sources) to HDFS? Knowing that Flink can read from Kafka and write to 
 hdfs I assume it should be possible, but Is this a good idea to do? 
 
 Flume basically is about consuming data from somewhere, peeking into each 
 record and then directing it to a specific directory/file in HDFS 
 reliably. I've seen there is a FlumeSink, but would it be possible to get 
 the same functionality with
 Flink alone?
 
 I've skimmed through the documentation and found the option to split the 
 output by key and the possibility to add multiple sinks. As I understand, 
 Flink programs are generally static, so it would not be possible to 
 add/remove sinks at runtime?
 So you would need to implement a custom sink directing the records to 
 different files based on a key (e.g. date)? Would it be difficult to 
 implement things like rolling outputs etc? Or better just use Flume?
 
 Best, 
 Hans-Peter


Re: Flink to ingest from Kafka to HDFS?

2015-08-20 Thread Rico Bergmann
I'm thinking about implementing this. 

After looking into the Flink code I would basically subclass FileOutputFormat 
into, let's say, a KeyedFileOutputFormat that gets an additional KeySelector 
object. The path in the file system is then extended by the string the 
KeySelector returns. 

Do you think this is a good approach?

Greets. Rico. 



 Am 16.08.2015 um 19:56 schrieb Stephan Ewen se...@apache.org:
 
 If you are up for it, this would be a very nice addition to Flink, a great 
 contribution :-)
 
 On Sun, Aug 16, 2015 at 7:56 PM, Stephan Ewen se...@apache.org wrote:
 Hi!
 
 This should definitely be possible in Flink. Pretty much exactly like you 
 describe it.
 
 You need a custom version of the HDFS sink with some logic when to roll over 
 to a new file.
 
 You can also make the sink exactly once by integrating it with the 
 checkpointing. For that, you would probably need to keep the current path 
 and output stream offsets as of the last checkpoint, so you can resume from 
 that offset and overwrite records to avoid duplicates. If that is not 
 possible, you would probably buffer records between checkpoints and only 
 write on checkpoints.
 
 Greetings,
 Stephan
 
 
 
 On Sun, Aug 16, 2015 at 7:09 PM, Hans-Peter Zorn hpz...@gmail.com wrote:
 Hi,
 
 Did anybody think of (mis-) using Flink streaming as an alternative to 
 Apache Flume just for ingesting data from Kafka (or other streaming 
 sources) to HDFS? Knowing that Flink can read from Kafka and write to HDFS, 
 I assume it should be possible, but is this a good idea? 
 
 Flume basically is about consuming data from somewhere, peeking into each 
 record and then directing it to a specific directory/file in HDFS reliably. 
 I've seen there is a FlumeSink, but would it be possible to get the same 
 functionality with
 Flink alone?
 
 I've skimmed through the documentation and found the option to split the 
 output by key and the possibility to add multiple sinks. As I understand, 
 Flink programs are generally static, so it would not be possible to 
 add/remove sinks at runtime?
 So you would need to implement a custom sink directing the records to 
 different files based on a key (e.g. date)? Would it be difficult to 
 implement things like rolling outputs etc? Or better just use Flume?
 
 Best, 
 Hans-Peter
 


Re: when use broadcast variable and run on bigdata display this error please help

2015-08-20 Thread Rico Bergmann
As you can see from the exceptions, your broadcast variable is too large to fit 
into main memory. 

I think storing that amount of data in a broadcast variable is not the best 
approach. I would suggest using a DataSet for this instead. 
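
As a hedged illustration of that suggestion (file paths and record schema are made up), the large side can be joined as a regular DataSet instead of being shipped to every task as a broadcast variable:

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

public class JoinInsteadOfBroadcast {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The large "lookup" data that previously went into the broadcast variable.
        DataSet<Tuple2<Long, String>> lookup = env.readCsvFile("hdfs:///path/to/lookup.csv")
                .types(Long.class, String.class);

        // The main data to enrich.
        DataSet<Tuple2<Long, Double>> events = env.readCsvFile("hdfs:///path/to/events.csv")
                .types(Long.class, Double.class);

        // Join on the first field instead of probing a broadcast variable in a map function.
        DataSet<Tuple3<Long, Double, String>> enriched = events
                .join(lookup)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Long, Double>, Tuple2<Long, String>, Tuple3<Long, Double, String>>() {
                    @Override
                    public Tuple3<Long, Double, String> join(Tuple2<Long, Double> event,
                                                             Tuple2<Long, String> lookupEntry) {
                        return new Tuple3<>(event.f0, event.f1, lookupEntry.f1);
                    }
                });

        enriched.writeAsCsv("hdfs:///path/to/output");
        env.execute("Join instead of broadcast");
    }
}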



 Am 20.08.2015 um 11:56 schrieb hagersaleh loveallah1...@yahoo.com:
 
 please help
 
 
 


Re: Custom Class for state checkpointing

2015-08-18 Thread Rico Bergmann
Hi Marton. 

I think this is more a class loader issue. My custom class implements 
Serializable and so do all contained member classes. 

Greets. Rico. 



 Am 18.08.2015 um 11:45 schrieb Márton Balassi balassi.mar...@gmail.com:
 
 Hey Rico,
 
 Currently the Checkpointed interface has the limitation that the return type of 
 the snapshotState method (the generic parameter of Checkpointed) has to be 
 Java Serializable. I suspect that is the problem here. This is a limitation 
 that we plan to lift soon.
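
For illustration, a snapshot type that satisfies this restriction could look roughly like the following (sketched against the Checkpointed interface as it exists in this Flink version; the counting function itself is made up):

import java.util.HashMap;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.util.Collector;

/** Sketch: the snapshot type (a HashMap, which is java.io.Serializable) satisfies the current
 *  restriction. A custom state class would likewise have to implement Serializable, and all of
 *  its fields must be serializable as well. */
public class CountPerKeyFunction extends RichFlatMapFunction<String, String>
        implements Checkpointed<HashMap<String, Long>> {

    private HashMap<String, Long> countsPerKey = new HashMap<>();

    @Override
    public void flatMap(String key, Collector<String> out) {
        long newCount = countsPerKey.getOrDefault(key, 0L) + 1;
        countsPerKey.put(key, newCount);
        out.collect(key + "=" + newCount);
    }

    @Override
    public HashMap<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        return countsPerKey;
    }

    @Override
    public void restoreState(HashMap<String, Long> state) {
        countsPerKey = state;
    }
}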
 
 Marton
 
 On Tue, Aug 18, 2015 at 11:32 AM, Rico Bergmann i...@ricobergmann.de wrote:
 Hi!
 Is it possible to use your own class?
 I'm using the file state handler at the Jobmanager and implemented the 
 Checkpointed interface. 
 
 I tried this and got an exception:
 
 Error: java.lang.RuntimeException: Failed to deserialize state handle and 
 setup initial operator state.
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:544)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.lang.ClassNotFoundException: 
 com.ottogroup.bi.searchlab.searchsessionizer.OperatorState
 at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:348)
 at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626)
 at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
 at 
 org.apache.flink.runtime.state.ByteStreamStateHandle.getState(ByteStreamStateHandle.java:63)
 at 
 org.apache.flink.runtime.state.ByteStreamStateHandle.getState(ByteStreamStateHandle.java:33)
 at 
 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreInitialState(AbstractUdfStreamOperator.java:83)
 at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.setInitialState(StreamTask.java:276)
 at 
 org.apache.flink.runtime.state.StateUtils.setOperatorState(StateUtils.java:51)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:541)
 


Custom Class for state checkpointing

2015-08-18 Thread Rico Bergmann
Hi!
Is it possible to use your own class?
I'm using the file state handler at the Jobmanager and implemented the 
Checkpointed interface. 

I tried this and got an exception:

Error: java.lang.RuntimeException: Failed to deserialize state handle and setup 
initial operator state.
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:544)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: 
com.ottogroup.bi.searchlab.searchsessionizer.OperatorState
 at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:348)
 at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626)
 at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
 at 
 org.apache.flink.runtime.state.ByteStreamStateHandle.getState(ByteStreamStateHandle.java:63)
 at 
 org.apache.flink.runtime.state.ByteStreamStateHandle.getState(ByteStreamStateHandle.java:33)
 at 
 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreInitialState(AbstractUdfStreamOperator.java:83)
 at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.setInitialState(StreamTask.java:276)
 at 
 org.apache.flink.runtime.state.StateUtils.setOperatorState(StateUtils.java:51)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:541)


Re: Custom Class for state checkpointing

2015-08-18 Thread Rico Bergmann
Hi!

Using TupleX is not possible since the state is very big (a Hashtable). 

How would I do the serialization into a byte array?

Greets. Rico. 
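
For what it's worth, the byte[] route with plain Java serialization could look roughly like this (illustrative helper, not from the thread; it assumes the map and everything stored in it is serializable):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Hashtable;

/** Illustrative helper: turn a Hashtable-based state into a byte[] and back with Java serialization. */
public class StateBytes {

    public static byte[] serialize(Hashtable<String, Long> state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        }
        return bytes.toByteArray();
    }

    @SuppressWarnings("unchecked")
    public static Hashtable<String, Long> deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (Hashtable<String, Long>) in.readObject();
        }
    }
}

Since a byte[] can be deserialized without any user classes on the classpath, this might also sidestep the ClassNotFoundException seen in the related messages, because the user code only unpacks the bytes again inside restoreState.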



 Am 18.08.2015 um 11:44 schrieb Robert Metzger rmetz...@apache.org:
 
 Hi Rico,
 
 I'm pretty sure that this is a valid bug you've found, since this case is not 
 yet tested (afaik).
 We'll fix the issue asap. Until then, are you able to encapsulate your state 
 in something that is available in Flink, for example a TupleX, or just 
 serialize it yourself into a byte[]?
 
 On Tue, Aug 18, 2015 at 11:32 AM, Rico Bergmann i...@ricobergmann.de wrote:
 Hi!
 Is it possible to use your own class?
 I'm using the file state handler at the Jobmanager and implemented the 
 Checkpointed interface. 
 
 I tried this and got an exception:
 
 Error: java.lang.RuntimeException: Failed to deserialize state handle and 
 setup initial operator state.
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:544)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.lang.ClassNotFoundException: 
 com.ottogroup.bi.searchlab.searchsessionizer.OperatorState
 at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:348)
 at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626)
 at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
 at 
 org.apache.flink.runtime.state.ByteStreamStateHandle.getState(ByteStreamStateHandle.java:63)
 at 
 org.apache.flink.runtime.state.ByteStreamStateHandle.getState(ByteStreamStateHandle.java:33)
 at 
 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreInitialState(AbstractUdfStreamOperator.java:83)
 at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.setInitialState(StreamTask.java:276)
 at 
 org.apache.flink.runtime.state.StateUtils.setOperatorState(StateUtils.java:51)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:541)
 


optimal deployment model for Flink Streaming programs

2015-07-30 Thread Dipl.-Inf. Rico Bergmann

Hi!

We want to build an infrastructure for automated deployment of Flink 
Streaming programs to a dedicated environment. This includes automated 
tests (unit and integration) via Jenkins, and in case of a successful 
build and test run the program should be deployed to the execution environment.


Since streaming programs run infinitely, the problem is to switch from 
the running program to the newly deployed one. The CLI has some features 
that would make this possible (list, cancel). Is there another way of 
somehow restarting a streaming program? Do you have a suggested way to 
handle the (automated!) deployment?


Regards
Rico B.