Re: checkpoint notifier not found?

2016-12-09 Thread Abhishek R. Singh
I was following the official documentation: 
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
 


Looks like this is the right one to be using: import org.apache.flink.runtime.state.CheckpointListener;
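
For reference, a minimal sketch of using the renamed interface (the sink class and its commit logic are hypothetical, not from the original thread):

import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical sink that defers side effects until a checkpoint completes.
public class BufferingSink extends RichSinkFunction<String> implements CheckpointListener {

    @Override
    public void invoke(String value) throws Exception {
        // buffer the value until the next completed checkpoint (buffering omitted)
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // invoked once the checkpoint with this id has completed; a safe point
        // to flush or commit buffered side effects
    }
}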

-Abhishek-

> On Dec 9, 2016, at 4:30 PM, Abhishek R. Singh 
>  wrote:
> 
> I can’t seem to find CheckpointNotifier. Appreciate help !
> 
> CheckpointNotifier is not a member of package 
> org.apache.flink.streaming.api.checkpoint
> 
> From my pom.xml:
> 
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-scala_2.11</artifactId>
>     <version>1.1.3</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-streaming-scala_2.11</artifactId>
>     <version>1.1.3</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-clients_2.11</artifactId>
>     <version>1.1.3</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>     <version>1.1.3</version>
> </dependency>



checkpoint notifier not found?

2016-12-09 Thread Abhishek R. Singh
I can’t seem to find CheckpointNotifier. Appreciate any help!

CheckpointNotifier is not a member of package 
org.apache.flink.streaming.api.checkpoint

From my pom.xml:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.1.3</version>
</dependency>

Testing Flink Streaming applications - controlling the clock

2016-12-09 Thread Rohit Agarwal
Hi,

I am writing tests for my flink streaming application. I mostly use
event-time. But there are some aspects which are still controlled by
wall-clock time. For example, I am using AssignerWithPeriodicWatermarks and
so watermarks are triggered based on wall-clock time. Similarly,
checkpoints are also triggered based on wall-clock time. Is there a way I
can manually control the clock which Flink uses from my tests?
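
Not a full answer to the clock question, but one way to make watermark emission deterministic in tests is a punctuated assigner, so watermarks are driven by the elements themselves rather than by wall-clock timers (MyEvent and its timestamp getter are hypothetical):

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Emits a watermark after every element, so tests control event time entirely
// through the records they feed in; no wall-clock timers are involved.
public class PerElementWatermarks implements AssignerWithPunctuatedWatermarks<MyEvent> {

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getTimestamp();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        return new Watermark(extractedTimestamp);
    }
}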

--
Rohit Agarwal


Re: Reg. custom sinks in Flink

2016-12-09 Thread Meghashyam Sandeep V
Thanks a lot for the quick reply Shannon.

1. I will create a class that extends SinkFunction and write my connection
logic there. My only question here is: will a dbSession be created for each
message/partition, which might affect performance? That's the reason why I
added this line, to create the connection once and use it along the
datastream: if (dbSession == null && store != null) { dbSession = getSession(); }
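
(For reference, a minimal sketch of the usual pattern: a RichSinkFunction that opens one session per parallel sink instance in open() and closes it in close(). The DataStax driver is assumed and the contact point is a placeholder.)

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// One session per parallel sink instance: open() runs once per subtask,
// invoke() runs once per record, close() runs on shutdown or failure.
public class CassandraJsonSink extends RichSinkFunction<JsonNode> {

    private transient Session dbSession;

    @Override
    public void open(Configuration parameters) {
        // placeholder connection logic; SSL/auth setup would go here
        dbSession = Cluster.builder().addContactPoint("127.0.0.1").build().connect();
    }

    @Override
    public void invoke(JsonNode message) {
        dbSession.execute("QUERY_TO_EXEC");
    }

    @Override
    public void close() {
        if (dbSession != null) {
            dbSession.close();
        }
    }
}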

2. I couldn't use flink-connector-cassandra as I have SSL enabled for my C*
cluster and I couldn't get it to work with all my SSL config (truststore,
keystore, etc.) added to the cluster building. I didn't find a proper
example of flink-connector-cassandra with SSL enabled.


Thanks




On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey  wrote:

> You haven't really added a sink in Flink terminology, you're just
> performing a side effect within a map operator. So while it may work, if
> you want to add a sink proper you need have an object that extends
> SinkFunction or RichSinkFunction. The method call on the stream should be
> ".addSink(…)".
>
> Also, the dbSession isn't really Flink state as it will not vary based on
> the position in or content in the stream. It's a necessary helper object,
> yes, but you don't need Flink to checkpoint it.
>
> You can still use the sinks provided with flink-connector-cassandra and
> customize the cluster building by passing your own ClusterBuilder into the
> constructor.
>
> -Shannon
>
> From: Meghashyam Sandeep V
> Date: Friday, December 9, 2016 at 12:26 PM
> To: user@flink.apache.org
> Subject: Reg. custom sinks in Flink
>
> Hi there,
>
> I have a flink streaming app where my source is Kafka and a custom sink to
> Cassandra(I can't use standard C* sink that comes with flink as I have
> customized auth to C*). I'm currently have the following:
>
> messageStream
>     .rebalance()
>     .map(s -> {
>         return mapper.readValue(s, JsonNode.class);
>     })
>     .filter(/* filter some messages */)
>     .map((MapFunction) message -> {
>         getDbSession().execute("QUERY_TO_EXEC");
>     });
>
> private static Session getDbSession() {
>     if (dbSession == null && store != null) {
>         dbSession = getSession();
>     }
>     return dbSession;
> }
>
> 1. Is this the right way to add a custom sink? As you can see, I have 
> dbSession as a class variable here and I'm storing its state.
>
> 2. This setup works fine in a standalone flink (java -jar MyJar.jar). When I 
> run using flink with YARN on EMR I get a NPE at the session which is kind of 
> weird.
>
>
> Thanks
>
>


Re: Reg. custom sinks in Flink

2016-12-09 Thread Shannon Carey
You haven't really added a sink in Flink terminology; you're just performing a 
side effect within a map operator. So while it may work, if you want to add a 
proper sink you need to have an object that extends SinkFunction or 
RichSinkFunction. The method call on the stream should be ".addSink(…)".

Also, the dbSession isn't really Flink state as it will not vary based on the 
position in or content in the stream. It's a necessary helper object, yes, but 
you don't need Flink to checkpoint it.

You can still use the sinks provided with flink-connector-cassandra and 
customize the cluster building by passing your own ClusterBuilder into the 
constructor.
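
For illustration, a rough sketch of wiring SSL into the provided sink via a ClusterBuilder (the contact point is a placeholder; JdkSSLOptions from the DataStax 3.x driver is assumed, and a specific truststore/keystore would be configured through its builder):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.JdkSSLOptions;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

CassandraSink.addSink(messageStream)
    .setQuery("QUERY_TO_EXEC")
    .setClusterBuilder(new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder
                .addContactPoint("cassandra-host")         // placeholder host
                .withSSL(JdkSSLOptions.builder().build())  // default JVM SSL context
                .build();
        }
    })
    .build();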

-Shannon

From: Meghashyam Sandeep V
Date: Friday, December 9, 2016 at 12:26 PM
To: user@flink.apache.org
Subject: Reg. custom sinks in Flink

Hi there,

I have a flink streaming app where my source is Kafka and a custom sink to 
Cassandra(I can't use standard C* sink that comes with flink as I have 
customized auth to C*). I'm currently have the following:


messageStream
    .rebalance()
    .map(s -> {
        return mapper.readValue(s, JsonNode.class);
    })
    .filter(/* filter some messages */)
    .map((MapFunction) message -> {
        getDbSession().execute("QUERY_TO_EXEC");
    });

private static Session getDbSession() {
    if (dbSession == null && store != null) {
        dbSession = getSession();
    }
    return dbSession;
}

1. Is this the right way to add a custom sink? As you can see, I have dbSession 
as a class variable here and I'm storing its state.

2. This setup works fine in a standalone flink (java -jar MyJar.jar). When I 
run using flink with YARN on EMR I get a NPE at the session which is kind of 
weird.


Thanks


How to retrieve values from yarn.taskmanager.env in a Job?

2016-12-09 Thread Shannon Carey
This thread describes the impetus for the addition of yarn.taskmanager.env:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/passing-environment-variables-to-flink-program-td3337.html

I have configured a value within yarn.taskmanager.env, and I see it appearing 
in the Flink web UI in the list underneath Job Manager -> Configuration. 
However, I can't figure out how to retrieve the value from within a Flink job. 
It doesn't appear in the environment, the system properties, or my 
ParameterTool instance, and I can't figure out how I would get to it via the 
StreamExecutionEnvironment. Can anyone point me in the right direction?

All I want to do is inform my Flink jobs which environment they're running on, 
so that programmers don't have to specify the environment as a job parameter 
every time they run it. I also see that there is an "env.java.opts" 
configuration… does that work in YARN apps (would my jobs be able to see it)?
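
One avenue that may be worth trying (an assumption on my part, not verified here): values set via yarn.taskmanager.env should end up in the TaskManager process environment, so they would be readable with System.getenv() from code that runs on the TaskManagers, e.g. in a rich function's open(), but not from the main() that builds the job graph on the client.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Reads a variable that yarn.taskmanager.env.ENVIRONMENT (hypothetical key)
// would place into the TaskManager's process environment.
public class EnvAwareMapper extends RichMapFunction<String, String> {

    private String environmentName;

    @Override
    public void open(Configuration parameters) {
        environmentName = System.getenv("ENVIRONMENT");
    }

    @Override
    public String map(String value) {
        return environmentName + ": " + value;
    }
}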

Thanks!
Shannon


Reg. custom sinks in Flink

2016-12-09 Thread Meghashyam Sandeep V
Hi there,

I have a Flink streaming app where my source is Kafka and a custom sink to
Cassandra (I can't use the standard C* sink that comes with Flink as I have
customized auth to C*). I currently have the following:

messageStream
    .rebalance()
    .map(s -> {
        return mapper.readValue(s, JsonNode.class);
    })
    .filter(/* filter some messages */)
    .map((MapFunction) message -> {
        getDbSession().execute("QUERY_TO_EXEC");
    });

private static Session getDbSession() {
    if (dbSession == null && store != null) {
        dbSession = getSession();
    }
    return dbSession;
}

1. Is this the right way to add a custom sink? As you can see, I have
dbSession as a class variable here and I'm storing its state.

2. This setup works fine in standalone Flink (java -jar MyJar.jar).
When I run using Flink with YARN on EMR I get an NPE at the session,
which is kind of weird.


Thanks


Re: [DISCUSS] "Who's hiring on Flink" monthly thread in the mailing lists?

2016-12-09 Thread Greg Hogan
Google indexes the mailing list. Anyone can filter the messages to trash in
a few clicks.

This will also be a means for the community to better understand which
companies are using Flink, and how.

On Fri, Dec 9, 2016 at 8:27 AM, Felix Neutatz 
wrote:

> Hi,
>
> I wonder whether a mailing list is a good choice for that in general. If I
> am looking for a job I won't register for a mailing list or browse through
> the archive of one but rather search it via Google. So what about putting
> it on a dedicated site on the Web Page. This feels more intuitive to me and
> gives a better overview.
>
> Best regards,
> Felix
>
> On Dec 9, 2016 14:20, "Ufuk Celebi"  wrote:
>
>
>
>
> On 9 December 2016 at 14:13:14, Robert Metzger (rmetz...@apache.org)
> wrote:
> > I'm against using the news@ list for that.
> > The promise of the news@ list is that its low-traffic and only for
> news. If
> > we now start having job offers (and potentially some questions on them
> > etc.) it'll be a list with more than some announcements.
> > That's also the reason why the news@ list is completely moderated.
>
> I agree with Robert. I would consider that to be spam if posted to news@.
>
>
>
>


Re: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device

2016-12-09 Thread Fabian Hueske
It looks like the result you are trying to fetch with collect() is too
large.
collect() only works for results up to 10 MB.

I would write the result to a file and read that file in.
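
A minimal sketch of that approach (the path and the Tuple2 result type are assumptions, not taken from the job at hand):

import org.apache.flink.core.fs.FileSystem.WriteMode;

// write the result as part of the distributed job instead of pulling it to the client
result.writeAsCsv("hdfs:///tmp/label-propagation-result", WriteMode.OVERWRITE);
env.execute("write label propagation result");

// read it back later for further processing
DataSet<Tuple2<Long, Long>> reloaded = env
    .readCsvFile("hdfs:///tmp/label-propagation-result")
    .types(Long.class, Long.class);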

Best, Fabian

2016-12-09 16:30 GMT+01:00 Miguel Coimbra :

> Hello Fabian,
>
> So if I want to have 10 nodes with one working thread each, I would just
> set this, I assume:
>
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 10
>
> There is progress, albeit little.
> I am now running on a directory with more space.
> For 10 iterations of label propagation, I am getting this error at the end
> (on the TaskManager).
> I thought the execution was taking too much time, so I checked CPU usage
> of the TaskManager and it was really low.
> Checking the log on the TaskManager, I found this error at the bottom in
> bold:
>
>
> 2016-12-09 09:46:00,305 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c | org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76)) (1/1)
> 2016-12-09 09:46:00,305 INFO  org.apache.flink.runtime.taskmanager.Task - DataSink (collect()) (1/1) switched to FINISHED
> 2016-12-09 09:46:00,305 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for DataSink (collect()) (1/1)
> 2016-12-09 09:46:00,317 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FINISHED to JobManager for task IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c | org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76)) (89eb2508cbda679502c2e0b258068274)
> 2016-12-09 09:46:00,317 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FINISHED to JobManager for task DataSink (collect()) (26b8f3950f4e736b0798d28c4bf967ed)
> 2016-12-09 09:46:04,080 ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@172.18.0.2:6123/user/jobmanager#1638751963]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage was 79885809 bytes.
>
> Do you have any idea what this might be?
>
> Kind regards,
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com 
> Skype: miguel.e.coimbra
>
> On 6 December 2016 at 19:57, Fabian Hueske  wrote:
>
>> Hi Miguel,
>>
>> estimating the space requirements is not trivial. It depends of course on
>> the algorithm and the data itself. I'm not an expert for graph algorithms
>> and don't know your datasets.
>>
>> But have you tried to run the algorithm in a non dockerized environment?
>> That might help to figure out if this is an issue with the Docker
>> configuration rather than Flink.
>>
>> Btw. If you want to run with a parallelism of 3 you need at least three
>> slots, either 3 three slots in one TM or 1 slot in each of three TMs.
>>
>> Best,
>> Fabian
>>
>> 2016-12-05 17:20 GMT+01:00 Miguel Coimbra :
>>
>>> Hello Fabian,
>>>
>>> Thanks for the attention. Still haven't solved this.
>>> I did set up a cron job to clean the Docker images daily - thanks for
>>> that hint.
>>> As a last resort, I am going to look into a 2 TB NAS to see if this
>>> works.
>>>
>>> What is confusing me is that this happens also for the
>>> com-orkut.ungraph.txt dataset which is much smaller than
>>> com-friendster.ungraph.txt but not that much bigger than the
>>> com-dblp.ungraph.txt.
>>>
>>> DBLP - I am able to run the DBLP on one TaskManager container.
>>> https://snap.stanford.edu/data/com-DBLP.html
>>> Nodes 317080  ~0.3 M
>>> Edges 1049866 ~ 1 M
>>>
>>> Orkut - no disk space error.
>>> https://snap.stanford.edu/data/com-Orkut.html
>>> Nodes 3072441 ~3 M
>>> Edges 117185083 ~ 117 M
>>>
>>> Friendster - no disk space error.
>>> https://snap.stanford.edu/data/com-Friendster.html
>>> Nodes 65608366 ~65 M
>>> Edges 1806067135 ~ 1800 M
>>>
>>> For testing purposes, I'm using a JobManager (in its own Docker
>>> container), a single TaskManager (in its own Docker container) with the
>>> following config parameters:
>>>
>>> Heap is currently configured to 6 GB:
>>> taskmanager.heap.mb: 6000
>>>
>>> Parallelism is set as such:
>>>
>>> taskmanager.numberOfTaskSlots: 1
>>> parallelism.default: 1
>>>
>>> It is my understanding that if I want to test for example N = 3
>>> TaskManagers (each in its own Docker 

Re: How to analyze space usage of Flink algorithms

2016-12-09 Thread Greg Hogan
This does sound like a nice feature, both per-job and per-taskmanager bytes
written to and read from disk.

On Fri, Dec 9, 2016 at 8:51 AM, Chesnay Schepler  wrote:

> We do not measure how much data we are spilling to disk.
>
>
> On 09.12.2016 14:43, Fabian Hueske wrote:
>
> Hi,
>
> the heap mem usage should be available via Flink's metrics system.
> Not sure if that also captures spilled data. Chesnay (in CC) should know
> that.
>
> If the spilled data is not available as a metric, you can try to write a
> small script that monitors the directories to which Flink spills (Config
> parameter: taskmanager.tmp.dirs [1]).
> The script would repeatedly list all files and keep for each file the max
> size (files are deleted once the are not used anymore). This is not super
> precise but might be good enough.
>
> Hope this helps,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.1/setup/config.html#jobmanager-amp-taskmanager
>
> 2016-12-09 14:12 GMT+01:00 otherwise777 :
>
>> Currently i'm doing some analysis for some algorithms that i use in Flink,
>> I'm interested in the Space and time it takes to execute them. For the
>> Time
>> i used getNetRuntime() in the executionenvironment, but I have no idea how
>> to analyse the amount of space an algorithm uses.
>> Space can mean different things here, like Heap space, disk space, overal
>> memory or allocated memory. I would like to analyze some of these.
>>
>>
>>
>>
>
>
>


Re: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device

2016-12-09 Thread Miguel Coimbra
Hello Fabian,

So if I want to have 10 nodes with one working thread each, I would just
set this, I assume:

taskmanager.numberOfTaskSlots: 1
parallelism.default: 10

There is progress, albeit little.
I am now running on a directory with more space.
For 10 iterations of label propagation, I am getting this error at the end
(on the TaskManager).
I thought the execution was taking too much time, so I checked CPU usage of
the TaskManager and it was really low.
Checking the log on the TaskManager, I found this error at the bottom in
bold:


2016-12-09 09:46:00,305 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c | org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76)) (1/1)
2016-12-09 09:46:00,305 INFO  org.apache.flink.runtime.taskmanager.Task - DataSink (collect()) (1/1) switched to FINISHED
2016-12-09 09:46:00,305 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for DataSink (collect()) (1/1)
2016-12-09 09:46:00,317 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FINISHED to JobManager for task IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c | org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76)) (89eb2508cbda679502c2e0b258068274)
2016-12-09 09:46:00,317 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FINISHED to JobManager for task DataSink (collect()) (26b8f3950f4e736b0798d28c4bf967ed)
2016-12-09 09:46:04,080 ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@172.18.0.2:6123/user/jobmanager#1638751963]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage was 79885809 bytes.

Do you have any idea what this might be?

Kind regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com 
Skype: miguel.e.coimbra

On 6 December 2016 at 19:57, Fabian Hueske  wrote:

> Hi Miguel,
>
> estimating the space requirements is not trivial. It depends of course on
> the algorithm and the data itself. I'm not an expert for graph algorithms
> and don't know your datasets.
>
> But have you tried to run the algorithm in a non dockerized environment?
> That might help to figure out if this is an issue with the Docker
> configuration rather than Flink.
>
> Btw. If you want to run with a parallelism of 3 you need at least three
> slots, either 3 three slots in one TM or 1 slot in each of three TMs.
>
> Best,
> Fabian
>
> 2016-12-05 17:20 GMT+01:00 Miguel Coimbra :
>
>> Hello Fabian,
>>
>> Thanks for the attention. Still haven't solved this.
>> I did set up a cron job to clean the Docker images daily - thanks for
>> that hint.
>> As a last resort, I am going to look into a 2 TB NAS to see if this works.
>>
>> What is confusing me is that this happens also for the
>> com-orkut.ungraph.txt dataset which is much smaller than
>> com-friendster.ungraph.txt but not that much bigger than the
>> com-dblp.ungraph.txt.
>>
>> DBLP - I am able to run the DBLP on one TaskManager container.
>> https://snap.stanford.edu/data/com-DBLP.html
>> Nodes 317080  ~0.3 M
>> Edges 1049866 ~ 1 M
>>
>> Orkut - no disk space error.
>> https://snap.stanford.edu/data/com-Orkut.html
>> Nodes 3072441 ~3 M
>> Edges 117185083 ~ 117 M
>>
>> Friendster - no disk space error.
>> https://snap.stanford.edu/data/com-Friendster.html
>> Nodes 65608366 ~65 M
>> Edges 1806067135 ~ 1800 M
>>
>> For testing purposes, I'm using a JobManager (in its own Docker
>> container), a single TaskManager (in its own Docker container) with the
>> following config parameters:
>>
>> Heap is currently configured to 6 GB:
>> taskmanager.heap.mb: 6000
>>
>> Parallelism is set as such:
>>
>> taskmanager.numberOfTaskSlots: 1
>> parallelism.default: 1
>>
>> It is my understanding that if I want to test for example N = 3
>> TaskManagers (each in its own Docker container) with minimum parallelism
>> within each, I would use:
>>
>> taskmanager.numberOfTaskSlots: 1
>> parallelism.default: 3
>>
>>
>> Fabian, do you think you could help estimate how much disk space would be
>> required to compute the Orkut data set for example?
>> I am running a Flink 1.1.3 Docker cluster with a single TaskManager.
>> This is the code I am using to read SNAP datasets and to test with Orkut,
>> Friendster and DBLP, in case you have a minute 

Re: [DISCUSS] "Who's hiring on Flink" monthly thread in the mailing lists?

2016-12-09 Thread Timur Shenkao
Hi there,
1) it's a perfect idea
2) as I understand, such information can be placed neither on
flink.apache.org nor on data-artisans.com
3) there are tons of sites; who would take care of a dedicated site? How
would we not forget about its existence?

So, 1 - 4 letters a month on the dev / user mailing lists would not spoil
anything, and all interested people will get the info.

On Fri, Dec 9, 2016 at 4:27 PM, Felix Neutatz 
wrote:

> Hi,
>
> I wonder whether a mailing list is a good choice for that in general. If I
> am looking for a job I won't register for a mailing list or browse through
> the archive of one but rather search it via Google. So what about putting
> it on a dedicated site on the Web Page. This feels more intuitive to me and
> gives a better overview.
>
> Best regards,
> Felix
>
> On Dec 9, 2016 14:20, "Ufuk Celebi"  wrote:
>
>
>
>
> On 9 December 2016 at 14:13:14, Robert Metzger (rmetz...@apache.org)
> wrote:
> > I'm against using the news@ list for that.
> > The promise of the news@ list is that its low-traffic and only for
> news. If
> > we now start having job offers (and potentially some questions on them
> > etc.) it'll be a list with more than some announcements.
> > That's also the reason why the news@ list is completely moderated.
>
> I agree with Robert. I would consider that to be spam if posted to news@.
>
>
>
>


Re: How to analyze space usage of Flink algorithms

2016-12-09 Thread Chesnay Schepler

We do not measure how much data we are spilling to disk.

On 09.12.2016 14:43, Fabian Hueske wrote:

Hi,

the heap mem usage should be available via Flink's metrics system.
Not sure if that also captures spilled data. Chesnay (in CC) should 
know that.


If the spilled data is not available as a metric, you can try to write 
a small script that monitors the directories to which Flink spills 
(Config parameter: taskmanager.tmp.dirs [1]).
The script would repeatedly list all files and keep for each file the 
max size (files are deleted once they are not used anymore). This is 
not super precise but might be good enough.


Hope this helps,
Fabian

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html#jobmanager-amp-taskmanager


2016-12-09 14:12 GMT+01:00 otherwise777 >:


Currently i'm doing some analysis for some algorithms that i use
in Flink,
I'm interested in the Space and time it takes to execute them. For
the Time
i used getNetRuntime() in the executionenvironment, but I have no
idea how
to analyse the amount of space an algorithm uses.
Space can mean different things here, like Heap space, disk space,
overal
memory or allocated memory. I would like to analyze some of these.









Re: How to analyze space usage of Flink algorithms

2016-12-09 Thread Fabian Hueske
Hi,

the heap mem usage should be available via Flink's metrics system.
Not sure if that also captures spilled data. Chesnay (in CC) should know
that.

If the spilled data is not available as a metric, you can try to write a
small script that monitors the directories to which Flink spills (Config
parameter: taskmanager.tmp.dirs [1]).
The script would repeatedly list all files and keep for each file the max
size (files are deleted once they are not used anymore). This is not super
precise but might be good enough.
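
For illustration, a rough sketch of such a monitor in Java (the path argument would be one of the taskmanager.tmp.dirs entries; the one-second polling interval is arbitrary):

import java.io.File;
import java.util.HashMap;
import java.util.Map;

// Polls a spill directory and remembers the maximum size seen per file,
// since Flink deletes spill files once they are no longer needed.
public class SpillDirMonitor {
    public static void main(String[] args) throws InterruptedException {
        File dir = new File(args.length > 0 ? args[0] : "/tmp"); // taskmanager.tmp.dirs entry
        Map<String, Long> maxSizes = new HashMap<>();
        while (true) {
            File[] files = dir.listFiles();
            if (files != null) {
                for (File f : files) {
                    maxSizes.merge(f.getName(), f.length(), Math::max);
                }
            }
            long total = maxSizes.values().stream().mapToLong(Long::longValue).sum();
            System.out.println("max bytes spilled so far (sum of per-file maxima): " + total);
            Thread.sleep(1000);
        }
    }
}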

Hope this helps,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html#jobmanager-amp-taskmanager

2016-12-09 14:12 GMT+01:00 otherwise777 :

> Currently i'm doing some analysis for some algorithms that i use in Flink,
> I'm interested in the Space and time it takes to execute them. For the Time
> i used getNetRuntime() in the executionenvironment, but I have no idea how
> to analyse the amount of space an algorithm uses.
> Space can mean different things here, like Heap space, disk space, overal
> memory or allocated memory. I would like to analyze some of these.
>
>
>
>


Re: Flink survey by data Artisans

2016-12-09 Thread Mike Winters
Hi everyone,

A quick heads-up that we'll be closing the Flink user survey to new
responses this coming Monday 12 Dec around 9am EST.

If you'd still like to respond before Monday, you can do so here:
http://www.surveygizmo.com/s3/3166399/181bdb611f22.

We've seen more than 100 responses so far. Thank you to all who have
participated.

Best,
Mike

On Fri, Nov 18, 2016 at 7:55 PM, Shannon Carey  wrote:

> There's a newline that disrupts the URL.
>
> http://www.surveygizmo.com/s3/3166399/181bdb611f22
>
> Not:
>
> http://www.surveygizmo.com/s3/
> 3166399/181bdb611f22
>
>


-- 
-Mike


Re: [DISCUSS] "Who's hiring on Flink" monthly thread in the mailing lists?

2016-12-09 Thread Felix Neutatz
Hi,

I wonder whether a mailing list is a good choice for that in general. If I
am looking for a job I won't register for a mailing list or browse through
the archive of one, but rather search via Google. So what about putting
it on a dedicated page on the web site? This feels more intuitive to me and
gives a better overview.

Best regards,
Felix

On Dec 9, 2016 14:20, "Ufuk Celebi"  wrote:




On 9 December 2016 at 14:13:14, Robert Metzger (rmetz...@apache.org) wrote:
> I'm against using the news@ list for that.
> The promise of the news@ list is that its low-traffic and only for news.
If
> we now start having job offers (and potentially some questions on them
> etc.) it'll be a list with more than some announcements.
> That's also the reason why the news@ list is completely moderated.

I agree with Robert. I would consider that to be spam if posted to news@.


How to analyze space usage of Flink algorithms

2016-12-09 Thread otherwise777
Currently I'm doing some analysis of some algorithms that I use in Flink;
I'm interested in the space and time it takes to execute them. For the time
I used getNetRuntime() in the ExecutionEnvironment, but I have no idea how
to analyse the amount of space an algorithm uses.
Space can mean different things here, like heap space, disk space, overall
memory or allocated memory. I would like to analyze some of these.





Re: [DISCUSS] "Who's hiring on Flink" monthly thread in the mailing lists?

2016-12-09 Thread Ufuk Celebi



On 9 December 2016 at 14:13:14, Robert Metzger (rmetz...@apache.org) wrote:
> I'm against using the news@ list for that.
> The promise of the news@ list is that its low-traffic and only for news. If
> we now start having job offers (and potentially some questions on them
> etc.) it'll be a list with more than some announcements.
> That's also the reason why the news@ list is completely moderated.

I agree with Robert. I would consider that to be spam if posted to news@.




Re: [DISCUSS] "Who's hiring on Flink" monthly thread in the mailing lists?

2016-12-09 Thread Robert Metzger
I'm against using the news@ list for that.
The promise of the news@ list is that it's low-traffic and only for news. If
we now start having job offers (and potentially some questions on them
etc.) it'll be a list with more than some announcements.
That's also the reason why the news@ list is completely moderated.

On Fri, Dec 9, 2016 at 9:36 AM, Kostas Tzoumas  wrote:

> I appreciate the concern Kanstantsin!
>
> We do have a news@ mailing list, but it has been under-utilized so far.
> Perhaps revamping that one would do it?
>
> My only concern is that subscribing to a new mailing list is an overhead.
> As a temp solution, we could cc the dev and user list in the first few
> (say, 3) threads and encourage folks in these threads to sign up for the
> news@ list.
>
> On Thu, Dec 8, 2016 at 10:07 AM, Robert Metzger 
> wrote:
>
>> Thank you for speaking up Kanstantsin. I really don't want to downgrade
>> the experience on the user@ list.
>>
>> I wonder if jobs@flink would be a too narrowly-scoped mailing list.
>> Maybe we could also start a community@flink (alternatively also general@)
>> mailing list for everything relating to the broader Flink community,
>> including job offers, meetups, conferences and everything else that is
>> important for the community to grow.
>>
>> On Thu, Dec 8, 2016 at 3:10 AM, Radu Tudoran 
>> wrote:
>>
>>> Hi,
>>>
>>>
>>>
>>> I think the idea of having such a monthly thread is very good and it
>>> might even help to further attract new people in the community.
>>>
>>> In the same time I do not think that 1 extra mail per month is necessarily
>>> spam :-)
>>>
>>> In the same time – we can also consider a jobs@flink mailing list
>>>
>>>
>>>
>>>
>>>
>>> Dr. Radu Tudoran
>>>
>>> Senior Research Engineer - Big Data Expert
>>>
>>> IT R&D Division
>>>
>>>
>>>
>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>> European Research Center
>>> Riesstrasse 25, 80992 München
>>>
>>>
>>>
>>> From: Kanstantsin Kamkou [mailto:kkam...@gmail.com]
>>> Sent: Wednesday, December 07, 2016 9:57 PM
>>> To: user@flink.apache.org
>>> Subject: Re: [DISCUSS] "Who's hiring on Flink" monthly thread in the
>>> mailing lists?
>>>
>>>
>>>
>>> Is it possible to avoid such a spam here? If I need a new job, I could
>>> search it. The same way I might want to subscribe to a different thread,
>>> like jobs@flink. * The idea itself is great.
>>>
>>>
>>>
>>> On Tue, 6 Dec 2016 at 14:04, Kostas Tzoumas  wrote:
>>>
>>> yes, of course!
>>>
>>>
>>>
>>> On Tue, Dec 6, 2016 at 12:54 PM, Márton Balassi <
>>> balassi.mar...@gmail.com> wrote:
>>>
>>> +1. It keeps it both organized and to a reasonable minimum overhead.
>>>
>>>
>>>
>>> Would you volunteer for starting the mail thread each month then, Kostas?
>>>
>>>
>>>
>>> Best,
>>>
>>>
>>>
>>> Marton
>>>
>>>
>>>
>>> On Tue, Dec 6, 2016 at 6:42 AM, Kostas Tzoumas 
>>> wrote:
>>>
>>> Hi folks,
>>>
>>>
>>>
>>>
>>>
>>> I'd like to see how the community feels about a monthly "Who is hiring on
>>>
>>>
>>> Flink" email thread on the dev@ and user@ mailing lists where folks can
>>>
>>>
>>> post job positions related to Flink.
>>>
>>>
>>>
>>>
>>>
>>> I personally think that posting individual job offerings in the mailing
>>>
>>>
>>> list is off-topic (hence I have refrained to do that wearing my company
>>>
>>>
>>> hat, and I have discouraged others when they asked for my opinion on
>>> this),
>>>
>>>
>>> but I thought that a monthly thread like this would be both helpful to
>>> the
>>>
>>>
>>> community and not cause overhead.
>>>
>>>
>>>
>>>
>>>
>>> Cheers,
>>>
>>>
>>> Kostas
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


RE: OutOfMemory when looping on dataset filter

2016-12-09 Thread LINZ, Arnaud
Hi,
It works with a local cluster; I am indeed using a YARN cluster here.

Pushing user code to the lib folder of every datanode is not convenient; it's 
hard to maintain and operate.

If I cannot make the treatment serializable to put everything in a group reduce 
function, I think I'll try materializing the day-split dataset on HDFS 
and then looping on re-reading it in the job manager. That's probably even 
faster than looping on the full select.

Arnaud

From: Stephan Ewen [mailto:se...@apache.org]
Sent: Friday, December 9, 2016 11:57 AM
To: user@flink.apache.org
Subject: Re: OutOfMemory when looping on dataset filter

Hi Arnaud!

I assume you are using either a standalone setup, or a YARN session?

This looks to me as if classes cannot be properly garbage collected. Since each 
job (each day is executed as a separate job), loads the classes again, the 
PermGen space runs over if classes are not properly collected.

The can be many reasons why classes are not properly collected, most 
prominently some user code or libraries create threads that hold onto objects.

A quick workaround could be to simply add the relevant libraries directly to 
the "lib" folder when starting the YARN session, and not having them in the 
user code jar file. That way, they need not be reloaded for each job.

Greetings,
Stephan



On Fri, Dec 9, 2016 at 11:30 AM, LINZ, Arnaud 
> wrote:
Hi,
Caching could have been a solution. Another one is using a “group reduce” by 
day, but for that I need to make the “applyComplexNonDistributedTreatment” 
serializable, and that’s not an easy task.

1 & 2 - The OOM in my current test occurs in the 8th iteration (7 were 
successful). In this current test, only the first day have data, in others days 
the filter() returns an empty dataset.
3 – The OOM is in a task manager, during the “select” phase.

Digging further, I see it’s a PermGen OOM occurring during deserialization, not 
a heap one.

2016-12-08 17:38:27,835 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed.
java.lang.OutOfMemoryError: PermGen space
        at sun.misc.Unsafe.defineClass(Native Method)
        at sun.reflect.ClassDefiner.defineClass(ClassDefiner.java:63)
        at sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399)
        at sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395)
        at sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113)
        at sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331)
        at java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
        at java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
        at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.hive.hcatalog.common.HCatUtil.deserialize(HCatUtil.java:117)
        at org.apache.hive.hcatalog.mapreduce.HCatSplit.readFields(HCatSplit.java:139)
        at org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit.readObject(HadoopInputSplit.java:102)

Re: separation of JVMs for different applications

2016-12-09 Thread Till Rohrmann
Hi Manu,

afaik there is no JIRA for standalone v2.0 yet, so feel free to open a
JIRA for it.

Just a small correction, FLIP-6 is not almost finished yet. But we're
working on it and are happy for every helping hand :-)

Cheers,
Till

On Fri, Dec 9, 2016 at 2:27 AM, Manu Zhang  wrote:

> If there are not any existing jira for standalone v2.0, may I open a new
> one ?
>
> Thanks,
> Manu
>
> On Wed, Dec 7, 2016 at 12:39 PM Manu Zhang 
> wrote:
>
>> Good to know that.
>>
>> Is it the "standalone setup v2.0" section ? The wiki page has no
>> Google-Doc-like change histories.
>> Any jiras opened for that ? Not sure that will be noticed given FLIP-6 is
>> almost finished.
>>
>> Thanks,
>> Manu
>>
>> On Tue, Dec 6, 2016 at 11:55 PM Stephan Ewen  wrote:
>>
>> Hi!
>>
>> We are currently changing the resource and process model quite a bit:
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
>> As part of that, I think it makes sense to introduce something like that.
>>
>> What you can do today is to set TaskManagers to use one slot only, and
>> then start multiple TaskManagers per machine. That makes sure that JVMs are
>> never shared across machines.
>> If you use the "start-cluster.sh" script from Flink, you can enter the
>> same hostname multiple times in the workers file, and it will start
>> multiple TaskManagers on a machine.
>>
>> Best,
>> Stephan
>>
>>
>>
>> On Tue, Dec 6, 2016 at 3:51 AM, Manu Zhang 
>> wrote:
>>
>> Thanks Stephan,
>>
>> They don't use YARN now but I think they will consider it.  Do you think
>> it would be beneficial to provide such an option as "separate-jvm" in
>> stand-alone mode for streaming processor and long running services ? Or do
>> you think it would introduce too much complexity ?
>>
>> Manu
>>
>> On Tue, Dec 6, 2016 at 1:04 AM Stephan Ewen  wrote:
>>
>> Hi!
>>
>> Are your customers using YARN? In that case, the default configuration
>> will start a new YARN application per Flink job, no JVMs are shared between
>> jobs. By default, even each slot has its own JVM.
>>
>> Greetings,
>> Stephan
>>
>> PS: I think the "spawning new JVMs" is what Till referred to when saying
>> "spinning up a new cluster". Keep in mind that Flink is also a batch
>> processor, and it handles sequences of short batch jobs (as issued for
>> example by interactive shells) and it pre-allocates and manages a lot of
>> memory for batch jobs.
>>
>>
>>
>> On Mon, Dec 5, 2016 at 3:48 PM, Manu Zhang 
>> wrote:
>>
>> The pro for the multi-tenant cluster mode is that you can share data
>> between jobs and you don't have to spin up a new cluster for each job.
>>
>>
>> I don't think we have to spin up a new cluster for each job if every job
>> gets its own JVMs. For examples, Storm will launch a new worker(JVM) for a
>> new job when free slots are available. How can we share data between jobs
>> and why ?
>>
>>
>>
>> On Mon, Dec 5, 2016 at 6:27 PM, Till Rohrmann 
>> wrote:
>>
>> The pro for the multi-tenant cluster mode is that you can share data
>> between jobs and you don't have to spin up a new cluster for each job. This
>> might be helpful for scenarios where you want to run many short-lived and
>> light-weight jobs.
>>
>> But the important part is that you don't have to use this method. You can
>> also start a new Flink cluster per job which will then execute the job
>> isolated from any other jobs (given that you don't submit other jobs to
>> this cluster).
>>
>> Cheers,
>> Till
>>
>> On Sat, Dec 3, 2016 at 2:50 PM, Manu Zhang 
>> wrote:
>>
>> Thanks Fabian and Till.
>>
>> We have customers who are interested in using Flink but very concerned
>> about that "multiple jobs share the same set of TMs". I've just joined the
>> community recently so I'm not sure whether there has been a discussion over
>> the "multi-tenant cluster mode" before.
>>
>> The cons are one job/user's failure may crash another, which is
>> unacceptable in a multi-tenant scenario.
>> What are the pros ? Do the pros overweigh the cons ?
>>
>> Manu
>>
>> On Fri, Dec 2, 2016 at 7:06 PM Till Rohrmann 
>> wrote:
>>
>> Hi Manu,
>>
>> with Flip-6 we will be able to support stricter application isolation by
>> starting for each job a dedicated JobManager which will execute its tasks
>> on TM reserved solely for this job. But at the same time we will continue
>> supporting the multi-tenant cluster mode where tasks belonging to multiple
>> jobs share the same set of TMs and, thus, might share information between
>> them.
>>
>> Cheers,
>> Till
>>
>> On Fri, Dec 2, 2016 at 11:19 AM, Fabian Hueske  wrote:
>>
>> Hi Manu,
>>
>> As far as I know, there are not plans to change the stand-alone
>> deployment.
>> FLIP-6 is focusing on deployments via resource providers (YARN, Mesos,
>> etc.) which allow to start 

Re: OutOfMemory when looping on dataset filter

2016-12-09 Thread Stephan Ewen
Hi Arnaud!

I assume you are using either a standalone setup, or a YARN session?

This looks to me as if classes cannot be properly garbage collected. Since
each job loads the classes again (each day is executed as a separate job),
the PermGen space overflows if classes are not properly collected.

There can be many reasons why classes are not properly collected, most
prominently some user code or libraries creating threads that hold onto
objects.

A quick workaround could be to simply add the relevant libraries directly
to the "lib" folder when starting the YARN session, and not having them in
the user code jar file. That way, they need not be reloaded for each job.
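
If the leak cannot be found right away, another stopgap (an assumption on my part, since it only raises the limit rather than fixing the root cause) is to give the JVMs a larger PermGen via the env.java.opts setting in flink-conf.yaml:

env.java.opts: -XX:MaxPermSize=256m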

Greetings,
Stephan



On Fri, Dec 9, 2016 at 11:30 AM, LINZ, Arnaud 
wrote:

> Hi,
>
> Caching could have been a solution. Another one is using a “group reduce”
> by day, but for that I need to make the “applyComplexNonDistributedTreatment”
> serializable, and that’s not an easy task.
>
>
>
> 1 & 2 - The OOM in my current test occurs in the 8th iteration (7 were
> successful). In this current test, only the first day have data, in others
> days the filter() returns an empty dataset.
>
> 3 – The OOM is in a task manager, during the “select” phase.
>
>
>
> Digging further, I see it’s a PermGen OOM occurring during
> deserialization, not a heap one.
>
>
>
> 2016-12-08 17:38:27,835 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed.
>
> java.lang.OutOfMemoryError: PermGen space
>         at sun.misc.Unsafe.defineClass(Native Method)
>         at sun.reflect.ClassDefiner.defineClass(ClassDefiner.java:63)
>         at sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399)
>         at sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395)
>         at sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113)
>         at sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331)
>         at java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
>         at java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
>         at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
>         at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
>         at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
>         at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at org.apache.hive.hcatalog.common.HCatUtil.deserialize(HCatUtil.java:117)
>         at org.apache.hive.hcatalog.mapreduce.HCatSplit.readFields(HCatSplit.java:139)
>         at org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit.readObject(HadoopInputSplit.java:102)
>
>
>
>
>
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Friday, December 9, 2016 10:51 AM
> To: user@flink.apache.org
> Subject: Re: OutOfMemory when looping on dataset filter
>
>
>
> Hi Arnaud,
>
> Flink does not cache data at the moment.
>
> What happens is that for every day, the complete program is executed,
> i.e., also the program that computes 

Re: conditional dataset output

2016-12-09 Thread lars . bachmann

Hi Chesnay,

I actually thought about the same but like you said it seems a bit hacky 
;-). Anyway thank you!


Regards,

Lars

Am 08.12.2016 16:47 schrieb Chesnay Schepler:

Hello Lars,

The only other way I can think of how this could be done is by wrapping 
the used output format in a custom format, which calls open on the wrapped 
output format when you receive the first record.

This should work but is quite hacky though, as it interferes with the
format life-cycle.
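
A minimal sketch of that wrapping approach (the class name is made up; it implements Flink's OutputFormat interface and defers the real open() until the first record):

import java.io.IOException;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;

// Wraps another OutputFormat and only opens it once the first record arrives,
// so empty partitions never create (empty) output files.
public class LazyOpenOutputFormat<T> implements OutputFormat<T> {

    private final OutputFormat<T> wrapped;
    private int taskNumber;
    private int numTasks;
    private boolean opened;

    public LazyOpenOutputFormat(OutputFormat<T> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void configure(Configuration parameters) {
        wrapped.configure(parameters);
    }

    @Override
    public void open(int taskNumber, int numTasks) {
        // remember the parameters; defer the real open until the first record
        this.taskNumber = taskNumber;
        this.numTasks = numTasks;
    }

    @Override
    public void writeRecord(T record) throws IOException {
        if (!opened) {
            wrapped.open(taskNumber, numTasks);
            opened = true;
        }
        wrapped.writeRecord(record);
    }

    @Override
    public void close() throws IOException {
        if (opened) {
            wrapped.close();
        }
    }
}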

Regards,
Chesnay

On 08.12.2016 16:39, lars.bachm...@posteo.de wrote:

Hi,

let's assume I have a dataset, and depending on the input data and 
different filter operations this dataset can be empty. Now I want to 
output the dataset to HD, but I want files to be created only if 
the dataset is not empty. If the dataset is empty I don't want any 
files. The default way, dataset.write(...), will always create as many 
files as the configured parallelism of this operator - in case of 
an empty dataset all files would be empty as well. I thought about 
doing something like:


if (dataset.count() > 0) {
   dataset.write(...)
}

but I don't think that's the way to go, because dataset.count() 
triggers an execution of the (sub)program.


Is there a simple way to avoid creating empty files for empty 
datasets?


Regards,

Lars



RE: OutOfMemory when looping on dataset filter

2016-12-09 Thread LINZ, Arnaud
Hi,
Caching could have been a solution. Another one is using a “group reduce” by 
day, but for that I need to make the “applyComplexNonDistributedTreatment” 
serializable, and that’s not an easy task.

1 & 2 - The OOM in my current test occurs in the 8th iteration (7 were 
successful). In this current test, only the first day has data; on the other 
days the filter() returns an empty dataset.
3 – The OOM is in a task manager, during the “select” phase.

Digging further, I see it’s a PermGen OOM occurring during deserialization, not 
a heap one.

2016-12-08 17:38:27,835 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed.
java.lang.OutOfMemoryError: PermGen space
        at sun.misc.Unsafe.defineClass(Native Method)
        at sun.reflect.ClassDefiner.defineClass(ClassDefiner.java:63)
        at sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399)
        at sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395)
        at sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113)
        at sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331)
        at java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
        at java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
        at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.hive.hcatalog.common.HCatUtil.deserialize(HCatUtil.java:117)
        at org.apache.hive.hcatalog.mapreduce.HCatSplit.readFields(HCatSplit.java:139)
        at org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit.readObject(HadoopInputSplit.java:102)


From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Friday, December 9, 2016 10:51 AM
To: user@flink.apache.org
Subject: Re: OutOfMemory when looping on dataset filter

Hi Arnaud,
Flink does not cache data at the moment.
What happens is that for every day, the complete program is executed, i.e., 
also the program that computes wholeSet.
Each execution should be independent from each other and all temporary data be 
cleaned up.
Since Flink executes programs in a pipelined (or streaming) fashion, wholeSet 
is not kept in memory.
There is also no manual way to pin a DataSet in memory at the moment.

One think you could try is to push the day filter as close to the original 
source as possible.
This would reduce the size of intermediate results.
In general, Flink's DataSet API is implemented to work on managed memory. The 
most common reason for OOMs are user function that collect data on the heap.
However, this should not accumulate and be cleaned up after a job finished.
Collect can be a bit fragile here, because it moves all data to the client 
process.

I also have a few questions:
1. After how many iterations of the for loop is the OOM happening.
2. Is the data for all days of the same size?
3. Is the OOM happening in Flink or in the client process which fetches the 
result?
Best, Fabian


2016-12-09 10:35 GMT+01:00 LINZ, Arnaud 

Re: OutOfMemory when looping on dataset filter

2016-12-09 Thread Fabian Hueske
Hi Arnaud,

Flink does not cache data at the moment.
What happens is that for every day, the complete program is executed, i.e.,
also the program that computes wholeSet.
Each execution should be independent of the others, and all temporary data
should be cleaned up.
Since Flink executes programs in a pipelined (or streaming) fashion,
wholeSet is not kept in memory.
There is also no manual way to pin a DataSet in memory at the moment.

One thing you could try is to push the day filter as close to the original
source as possible.
This would reduce the size of intermediate results.
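
For example, a sketch of filtering right at the source (Event, getDay(), eventsFormat and currentDay are placeholders, not from the job at hand):

final int day = currentDay; // the loop's day, effectively final for the lambda
List<Event> dayData = env.createInput(eventsFormat)
    .filter(e -> e.getDay() == day)  // filter directly on the source, before expensive operators
    .collect();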

In general, Flink's DataSet API is implemented to work on managed memory.
The most common reason for OOMs is user functions that collect data on the
heap.
However, this should not accumulate, and it should be cleaned up after a job finishes.
Collect can be a bit fragile here, because it moves all data to the client
process.

I also have a few questions:
1. After how many iterations of the for loop is the OOM happening?
2. Is the data for all days of the same size?
3. Is the OOM happening in Flink or in the client process which fetches the
result?

Best, Fabian


2016-12-09 10:35 GMT+01:00 LINZ, Arnaud :

> Hello,
>
>
>
> I have a non-distributed treatment to apply to a DataSet of timed events,
> one day after another in a flink batch.
>
> My algorithm is:
>
>
>
> // wholeSet is too big to fit in RAM with a collect(), so we cut it in
> pieces
>
> DataSet wholeSet = [Select WholeSet];
>
> for (day 1 to 31) {
>
> List<> dayData = wholeSet.filter(day).collect();
>
> applyComplexNonDistributedTreatment(dayData);
>
> }
>
>
>
> Even if each day can perfectly fit in RAM (I’ve made a test where only the
> first day have data), I quickly get a OOM in a task manager at one point in
> the loop, so I guess that the “wholeSet” si keeped several times times in
> Ram.
>
>
>
> Two questions :
>
> 1)  Is there a better way of handling it where the “select wholeset”
> is made only once ?
>
> 2)  Even when the “select wholeset” is made at each iteration, how
> can I completely remove the old set so that I don’t get an OOM ?
>
>
>
> Thanks,
>
> Arnaud
>
>


OutOfMemory when looping on dataset filter

2016-12-09 Thread LINZ, Arnaud
Hello,

I have a non-distributed treatment to apply to a DataSet of timed events, one 
day after another, in a Flink batch job.
My algorithm is:

// wholeSet is too big to fit in RAM with a collect(), so we cut it in pieces
DataSet wholeSet = [Select WholeSet];
for (day 1 to 31) {
List<> dayData = wholeSet.filter(day).collect();
applyComplexNonDistributedTreatment(dayData);
}

Even if each day can perfectly fit in RAM (I’ve made a test where only the 
first day has data), I quickly get an OOM in a task manager at one point in the 
loop, so I guess that the “wholeSet” is kept several times in RAM.

Two questions:

1)  Is there a better way of handling it where the “select wholeset” is 
made only once?

2)  Even when the “select wholeset” is made at each iteration, how can I 
completely remove the old set so that I don’t get an OOM?

Thanks,
Arnaud





Re: [DISCUSS] "Who's hiring on Flink" monthly thread in the mailing lists?

2016-12-09 Thread Kostas Tzoumas
I appreciate the concern Kanstantsin!

We do have a news@ mailing list, but it has been under-utilized so far.
Perhaps revamping that one would do it?

My only concern is that subscribing to a new mailing list is an overhead.
As a temp solution, we could cc the dev and user list in the first few
(say, 3) threads and encourage folks in these threads to sign up for the
news@ list.

On Thu, Dec 8, 2016 at 10:07 AM, Robert Metzger  wrote:

> Thank you for speaking up Kanstantsin. I really don't want to downgrade
> the experience on the user@ list.
>
> I wonder if jobs@flink would be a too narrowly-scoped mailing list.
> Maybe we could also start a community@flink (alternatively also general@)
> mailing list for everything relating to the broader Flink community,
> including job offers, meetups, conferences and everything else that is
> important for the community to grow.
>
> On Thu, Dec 8, 2016 at 3:10 AM, Radu Tudoran 
> wrote:
>
>> Hi,
>>
>>
>>
>> I think the idea of having such a monthly thread is very good and it
>> might even help to further attract new people in the community.
>>
>> In the same time I do not think that 1 extra mail per month is necessarily
>> spam :-)
>>
>> In the same time – we can also consider a jobs@flink mailing list
>>
>>
>>
>>
>>
>> Dr. Radu Tudoran
>>
>> Senior Research Engineer - Big Data Expert
>>
>> IT R&D Division
>>
>>
>>
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> European Research Center
>> Riesstrasse 25, 80992 München
>>
>>
>>
>> From: Kanstantsin Kamkou [mailto:kkam...@gmail.com]
>> Sent: Wednesday, December 07, 2016 9:57 PM
>> To: user@flink.apache.org
>> Subject: Re: [DISCUSS] "Who's hiring on Flink" monthly thread in the
>> mailing lists?
>>
>>
>>
>> Is it possible to avoid such a spam here? If I need a new job, I could
>> search it. The same way I might want to subscribe to a different thread,
>> like jobs@flink. * The idea itself is great.
>>
>>
>>
>> On Tue, 6 Dec 2016 at 14:04, Kostas Tzoumas  wrote:
>>
>> yes, of course!
>>
>>
>>
>> On Tue, Dec 6, 2016 at 12:54 PM, Márton Balassi > > wrote:
>>
>> +1. It keeps it both organized and to a reasonable minimum overhead.
>>
>>
>>
>> Would you volunteer for starting the mail thread each month then, Kostas?
>>
>>
>>
>> Best,
>>
>>
>>
>> Marton
>>
>>
>>
>> On Tue, Dec 6, 2016 at 6:42 AM, Kostas Tzoumas 
>> wrote:
>>
>> Hi folks,
>>
>>
>>
>>
>>
>> I'd like to see how the community feels about a monthly "Who is hiring on
>>
>>
>> Flink" email thread on the dev@ and user@ mailing lists where folks can
>>
>>
>> post job positions related to Flink.
>>
>>
>>
>>
>>
>> I personally think that posting individual job offerings in the mailing
>>
>>
>> list is off-topic (hence I have refrained to do that wearing my company
>>
>>
>> hat, and I have discouraged others when they asked for my opinion on
>> this),
>>
>>
>> but I thought that a monthly thread like this would be both helpful to the
>>
>>
>> community and not cause overhead.
>>
>>
>>
>>
>>
>> Cheers,
>>
>>
>> Kostas
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>