RE: YARN : Different cutoff for job and task managers

2019-11-20 Thread Gwenhael Pasquiers
This works. I had some issues modifying my scripts, but it’s OK, and I could 
confirm via JMX that env.java.opts.jobmanager took priority over the “normal” 
heap size (calculated from the cutoff).
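
For reference, the relevant part of our flink-conf.yaml now looks roughly like 
this (the values are illustrative, not our real ones):

    # large cut-off, sized for RocksDB on the taskmanagers
    containerized.heap-cutoff-ratio: 0.8
    # overrides the jobmanager heap that would otherwise be derived from the cut-off
    env.java.opts.jobmanager: -Xms3072m -Xmx3072m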

Thanks ! 

From: Yang Wang 
Sent: Wednesday, 20 November 2019 03:52
To: Gwenhael Pasquiers 
Cc: user@flink.apache.org
Subject: Re: YARN : Different cutoff for job and task managers

Hi Gwenhael,

I'm afraid we cannot set different cut-offs for the jobmanager and the 
taskmanager. As a workaround, you could set the JVM args manually, for example 
'env.java.opts.jobmanager=-Xms3072m -Xmx3072m'.
In most JVM implementations, the rightmost -Xms/-Xmx takes effect, so I think 
it should work.
Please have a try.


Best,
Yang

Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> 
wrote on Tuesday, 19 November 2019 at 22:56:

Hello,



In a setup where we allocate most of the memory to rocksdb (off-heap), we have 
a large cutoff.



Our issue is that the same cutoff applies to both task and job managers : the 
heap size of the job manager then becomes too low.



Is there a way to apply different cutoffs to job and task managers ?



Regards,


RE: YARN : Different cutoff for job and task managers

2019-11-20 Thread Gwenhael Pasquiers
I see, good idea. I’ll try that and tell you the result.

Thanks,

From: Yang Wang 
Sent: Wednesday, 20 November 2019 03:52
To: Gwenhael Pasquiers 
Cc: user@flink.apache.org
Subject: Re: YARN : Different cutoff for job and task managers

Hi Gwenhael,

I'm afraid we cannot set different cut-offs for the jobmanager and the 
taskmanager. As a workaround, you could set the JVM args manually, for example 
'env.java.opts.jobmanager=-Xms3072m -Xmx3072m'.
In most JVM implementations, the rightmost -Xms/-Xmx takes effect, so I think 
it should work.
Please have a try.


Best,
Yang

Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> 
wrote on Tuesday, 19 November 2019 at 22:56:

Hello,



In a setup where we allocate most of the memory to rocksdb (off-heap), we have 
a large cutoff.



Our issue is that the same cutoff applies to both task and job managers : the 
heap size of the job manager then becomes too low.



Is there a way to apply different cutoffs to job and task managers ?



Regards,


YARN : Different cutoff for job and task managers

2019-11-19 Thread Gwenhael Pasquiers
Hello,



In a setup where we allocate most of the memory to rocksdb (off-heap), we have 
a large cutoff.



Our issue is that the same cutoff applies to both task and job managers : the 
heap size of the job manager then becomes too low.



Is there a way to apply different cutoffs to job and task managers ?



Regards,


RE: Streaming : a way to "key by partition id" without redispatching data

2017-11-13 Thread Gwenhael Pasquiers
From what I understood, in your case you might solve your issue by using 
specific key classes instead of Strings.

Maybe you could create key classes with a user-specified hashcode that takes 
the previous key's hashcode as its value. That way your data shouldn't be sent 
over the wire and should stay in the same partition, and thus on the same 
taskmanager.
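
To illustrate the idea, a rough, hypothetical sketch ("StickyKey" and its fields 
are names I just made up, and I have no guarantee about how Flink's internal 
key-group hashing will behave with it):

    public class StickyKey {
        public String value;       // the "real" key
        public int inheritedHash;  // hashCode taken from the previous key

        public StickyKey() {}      // no-arg constructor so Flink can treat it as a POJO

        public StickyKey(String value, int inheritedHash) {
            this.value = value;
            this.inheritedHash = inheritedHash;
        }

        @Override
        public int hashCode() {
            // partitioning is driven by this value, inherited from the previous key
            return inheritedHash;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof StickyKey)) {
                return false;
            }
            StickyKey other = (StickyKey) o;
            // compare both fields to keep equals() consistent with hashCode()
            return value.equals(other.value) && inheritedHash == other.inheritedHash;
        }
    }

You would then keyBy() on that class instead of on the String itself.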


RE: Streaming : a way to "key by partition id" without redispatching data

2017-11-10 Thread Gwenhael Pasquiers
I think I finally found a way to "simulate" a Timer thanks to the 
processWatermark function of the AbstractStreamOperator.

Sorry for the monologue.
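
For the record, the operator roughly looks like the sketch below (written from 
memory against the Flink 1.2 API, so signatures may be slightly off; MyEvent and 
the simple buffering stand in for my real folding logic):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    // Folds elements locally and flushes them when the watermark advances, which
    // "simulates" an event-time timer without needing a keyed stream.
    public class FoldingOperator extends AbstractStreamOperator<MyEvent>
            implements OneInputStreamOperator<MyEvent, MyEvent> {

        private transient List<MyEvent> buffer;

        @Override
        public void open() throws Exception {
            super.open();
            buffer = new ArrayList<>();
        }

        @Override
        public void processElement(StreamRecord<MyEvent> element) throws Exception {
            buffer.add(element.getValue()); // the real code folds instead of buffering
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            // event time has advanced: emit what was folded so far
            for (MyEvent e : buffer) {
                output.collect(new StreamRecord<>(e, mark.getTimestamp()));
            }
            buffer.clear();
            super.processWatermark(mark); // forwards the watermark downstream
        }
    }

It is plugged in with something like 
stream.transform("folding", outTypeInfo, new FoldingOperator()).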

From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: Friday, 10 November 2017 16:02
To: 'user@flink.apache.org' <user@flink.apache.org>
Subject: RE: Streaming : a way to "key by partition id" without redispatching 
data

Hello,

Finally, even after creating my operator, I still get the error : "Timers can 
only be used on keyed operators".

Isn't there any way around this ? A way to "key" my stream without shuffling 
the data ?

From: Gwenhael Pasquiers
Sent: Friday, 10 November 2017 11:42
To: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>; 'user@flink.apache.org' <user@flink.apache.org>
Subject: RE: Streaming : a way to "key by partition id" without redispatching 
data

Maybe you don't need to bother with that question.

I'm currently discovering AbstractStreamOperator, OneInputStreamOperator and 
Triggerable.

That should do it :-)

From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: Thursday, 9 November 2017 18:00
To: 'user@flink.apache.org' <user@flink.apache.org>
Subject: Streaming : a way to "key by partition id" without redispatching data

Hello,

(Flink 1.2.1)

For performance reasons I'm trying to reduce the volume of data of my stream 
as soon as possible by windowing/folding it for 15 minutes before continuing to 
the rest of the chain that contains keyBys and windows that will transfer data 
everywhere.

Because of the huge volume of data, I want to avoid "moving" the data between 
partitions as much as possible (not like a naïve KeyBy does). I wanted to 
create a custom ProcessFunction (using timer and state to fold data for X 
minutes) in order to fold my data over itself before keying the stream but even 
ProcessFunction needs a keyed stream...

Is there a specific "key" value that would ensure that my data won't be 
moved to another taskmanager (i.e. that its hashcode will match the partition it 
is already in)? I thought about the subtask id but I doubt I'd be that lucky :-)

Suggestions

· Wouldn't it be useful to be able to do a "partitionnedKeyBy" that 
would not move data between nodes, for windowing operations that can be 
parallelized.

o   Something like kafka => partitionnedKeyBy(0) => first folding => keyBy(0) 
=> second folding => 

· Finally, aren't all streams keyed ? Even if they're keyed by a 
totally arbitrary partition id until the user chooses its own key, shouldn't we 
be able to do a window (not windowAll) or process over any normal Stream's 
partition ?

B.R.

Gwenhaël PASQUIERS


RE: Streaming : a way to "key by partition id" without redispatching data

2017-11-10 Thread Gwenhael Pasquiers
Hello,

Finally, even after creating my operator, I still get the error : "Timers can 
only be used on keyed operators".

Isn't there any way around this ? A way to "key" my stream without shuffling 
the data ?

From: Gwenhael Pasquiers
Sent: Friday, 10 November 2017 11:42
To: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>; 
'user@flink.apache.org' <user@flink.apache.org>
Subject: RE: Streaming : a way to "key by partition id" without redispatching 
data

Maybe you don't need to bother with that question.

I'm currently discovering AbstractStreamOperator, OneInputStreamOperator and 
Triggerable.

That should do it :-)

From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: Thursday, 9 November 2017 18:00
To: 'user@flink.apache.org' <user@flink.apache.org>
Subject: Streaming : a way to "key by partition id" without redispatching data

Hello,

(Flink 1.2.1)

For performance reasons I'm trying to reduce the volume of data of my stream 
as soon as possible by windowing/folding it for 15 minutes before continuing to 
the rest of the chain that contains keyBys and windows that will transfer data 
everywhere.

Because of the huge volume of data, I want to avoid "moving" the data between 
partitions as much as possible (not like a naïve KeyBy does). I wanted to 
create a custom ProcessFunction (using timer and state to fold data for X 
minutes) in order to fold my data over itself before keying the stream but even 
ProcessFunction needs a keyed stream...

Is there a specific "key" value that would ensure that my data won't be 
moved to another taskmanager (i.e. that its hashcode will match the partition it 
is already in)? I thought about the subtask id but I doubt I'd be that lucky :-)

Suggestions

· Wouldn't it be useful to be able to do a "partitionnedKeyBy" that 
would not move data between nodes, for windowing operations that can be 
parallelized.

o   Something like kafka => partitionnedKeyBy(0) => first folding => keyBy(0) 
=> second folding => 

· Finally, aren't all streams keyed ? Even if they're keyed by a 
totally arbitrary partition id until the user chooses its own key, shouldn't we 
be able to do a window (not windowAll) or process over any normal Stream's 
partition ?

B.R.

Gwenhaël PASQUIERS


RE: Streaming : a way to "key by partition id" without redispatching data

2017-11-10 Thread Gwenhael Pasquiers
Maybe you don't need to bother with that question.

I'm currently discovering AbstractStreamOperator, OneInputStreamOperator and 
Triggerable.

That should do it :-)

From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: Thursday, 9 November 2017 18:00
To: 'user@flink.apache.org' <user@flink.apache.org>
Subject: Streaming : a way to "key by partition id" without redispatching data

Hello,

(Flink 1.2.1)

For performance reasons I'm trying to reduce the volume of data of my stream 
as soon as possible by windowing/folding it for 15 minutes before continuing to 
the rest of the chain that contains keyBys and windows that will transfer data 
everywhere.

Because of the huge volume of data, I want to avoid "moving" the data between 
partitions as much as possible (not like a naïve KeyBy does). I wanted to 
create a custom ProcessFunction (using timer and state to fold data for X 
minutes) in order to fold my data over itself before keying the stream but even 
ProcessFunction needs a keyed stream...

Is there a specific "key" value that would ensure that my data won't be 
moved to another taskmanager (i.e. that its hashcode will match the partition it 
is already in)? I thought about the subtask id but I doubt I'd be that lucky :-)

Suggestions

· Wouldn't it be useful to be able to do a "partitionnedKeyBy" that 
would not move data between nodes, for windowing operations that can be 
parallelized.

o   Something like kafka => partitionnedKeyBy(0) => first folding => keyBy(0) 
=> second folding => 

· Finally, aren't all streams keyed ? Even if they're keyed by a 
totally arbitrary partition id until the user chooses its own key, shouldn't we 
be able to do a window (not windowAll) or process over any normal Stream's 
partition ?

B.R.

Gwenhaël PASQUIERS


Streaming : a way to "key by partition id" without redispatching data

2017-11-09 Thread Gwenhael Pasquiers
Hello,

(Flink 1.2.1)

For performance reasons I'm trying to reduce the volume of data of my stream 
as soon as possible by windowing/folding it for 15 minutes before continuing to 
the rest of the chain that contains keyBys and windows that will transfer data 
everywhere.

Because of the huge volume of data, I want to avoid "moving" the data between 
partitions as much as possible (not like a naïve KeyBy does). I wanted to 
create a custom ProcessFunction (using timer and state to fold data for X 
minutes) in order to fold my data over itself before keying the stream but even 
ProcessFunction needs a keyed stream...

Is there a specific "key" value that would ensure that my data won't be 
moved to another taskmanager (i.e. that its hashcode will match the partition it 
is already in)? I thought about the subtask id but I doubt I'd be that lucky :-)

Suggestions

· Wouldn't it be useful to be able to do a "partitionnedKeyBy" that 
would not move data between nodes, for windowing operations that can be 
parallelized.

o   Something like kafka => partitionnedKeyBy(0) => first folding => keyBy(0) 
=> second folding => 

· Finally, aren't all streams keyed ? Even if they're keyed by a 
totally arbitrary partition id until the user chooses its own key, shouldn't we 
be able to do a window (not windowAll) or process over any normal Stream's 
partition ?

B.R.

Gwenhaël PASQUIERS


RE: Great number of jobs and numberOfBuffers

2017-08-31 Thread Gwenhael Pasquiers
Hi,

Well yes, I could probably make it work with a constant number of operators 
(and consequently buffers) by developing specific input and output classes, and 
that way I'd have a workaround for that buffers issue.

The size of my job is input-dependent mostly because my code creates one full 
processing chain per input folder (there is one folder per hour of data) and 
that processing chain has many functions. The number of input folders is an 
argument to the software and can vary from 1 to hundreds (when we re-compute 
something like a month of data).

for (String folder : folders) {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.readTextFile(folder).[..].writeAsText(folder + "_processed");
    env.execute();
}

There is one full processing chain per folder (a loop is creating them) because 
the name of the output is the same as the name of the input, and I did not want 
to create a specific "rolling-output" with bucketers.

So, yes, I could develop a source that supports a list of folders and puts the 
source name into the produced data. My processing could also handle tuples 
where one field is the source folder and use it as a key where appropriate, and 
finally yes I could also create a sink that will "dispatch" the datasets to the 
correct folder according to that field of the tuple.

But at first that seemed too complicated (premature optimization) so I coded my 
job the "naïve" way, then split it, because I thought that there would be some 
sort of recycling of those buffers between jobs.

If the recycling works in normal conditions, maybe the question is whether it 
also works when multiple jobs are run from within the same jar vs. running the 
jar multiple times ?
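
To make that idea concrete, the single-job variant I have in mind would look 
roughly like this inside the main() (just a skeleton; the real processing chain 
and the folder-dispatching output format are elided):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<String, String>> all = null;
    for (final String folder : folders) {
        DataSet<Tuple2<String, String>> tagged = env.readTextFile(folder)
            .map(new MapFunction<String, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> map(String line) {
                    // keep the source folder with each line so it can later be
                    // used as a key and to choose the output folder
                    return new Tuple2<>(folder, line);
                }
            });
        all = (all == null) ? tagged : all.union(tagged);
    }

    // ... processing grouped by f0 (the folder) where needed ...
    // ... a custom output format dispatching on f0 to folder + "_processed" ...

    env.execute(); // a single job, so the network buffers are only sized once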




PS: I don't have log files at hand, the app is run on a separate and secured 
platform. Sure, I could try to reproduce the issue with a mock app but I'm not 
sure it would help the discussion.


yarn and checkpointing

2017-08-29 Thread Gwenhael Pasquiers
Hi,

Is it possible to use checkpointing to restore the state of an app after a 
restart on yarn ?

From what I've seen it looks like checkpointing only works within a flink 
cluster's lifetime. However the yarn mode has one cluster per app, and (unless 
the app crashes and is automatically restarted by the restart-strategy) the 
on-yarn cluster has the same lifetime as the app, so when we stop the app, we 
stop the cluster, which then cleans up its checkpoints. 

So when the app is stopped, the cluster dies and cleans the checkpoints folder. 
Then of course it won't be able to restore the state at the next run.

When running flink on yarn are we supposed to cancel with savepoint and then 
restore from savepoint ?
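
Concretely, by "cancel with savepoint" I mean something like this (CLI written 
from memory, paths and ids are placeholders):

    # stop the job and write a savepoint
    bin/flink cancel -s hdfs:///flink/savepoints <jobId>

    # later, start the app again, restoring from that savepoint
    bin/flink run -m yarn-cluster -s hdfs:///flink/savepoints/savepoint-XXXX myapp.jar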


RE: Great number of jobs and numberOfBuffers

2017-08-29 Thread Gwenhael Pasquiers
Hello,

Sorry to ask you again, but no idea on this ?

-Original Message-
From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com] 
Sent: Monday, 21 August 2017 12:04
To: Nico Kruber <n...@data-artisans.com>
Cc: Ufuk Celebi <u...@apache.org>; user@flink.apache.org
Subject: RE: Great number of jobs and numberOfBuffers

Hi,

1/ Yes, the loop is part of the application I run on yarn. Something like :
public class MyFlinkApp {
public static void main(String[] args){
// parse arguments etc
for(String datehour:datehours){
ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
env.readTextFile(datehour)
.union(env.readTextFile(datehour-1))
.union(env.readTextFile(datehour-2))
.map()
.groupBy()
.sortGroup()
.reduceGroup()
...

// other steps, unions, processing, inputs, outputs

JobExecutionResult result = env.execute();

// read accumulators and send some statsd statistics at 
the end of batch
}
}
}

2/ The prod settings are something like 6 nodes with 8 taskslots each, 32Gib 
per node.

3/ I remember that we had the same error (not enough buffers) right at startup. 
I guess it was trying to allocate all buffers at startup, whereas it now does so 
progressively (but still fails at the same limit).

4/ The program has many steps, it has about 5 inputs (readTextFile) and 2 
outputs (TextHadoopOutputFormat, one in the middle of the processing, the other 
at the end), it is composed of multiple union, flatmap, map, groupby, 
sortGroup, reduceGroup, filter, for each "batch". And if we start the flink app 
on a whole week of data, we will have to start (24 * 7) batches. Parallelism 
has the default value except for the output writers (32 and 4) in order to 
limit the numbers of files on HDFS.



-Original Message-
From: Nico Kruber [mailto:n...@data-artisans.com]
Sent: Friday, 18 August 2017 14:58
To: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Cc: Ufuk Celebi <u...@apache.org>; user@flink.apache.org
Subject: Re: Great number of jobs and numberOfBuffers

Hi Gwenhael,
the effect you describe sounds a bit strange. Just to clarify your setup:

1) Is the loop you were posting part of the application you run on yarn?
2) How many nodes are you running with?
3) What is the error you got when you tried to run the full program without 
splitting it?
4) can you give a rough sketch of what your program is composed of (operators, 
parallelism,...)? 


Nico

On Thursday, 17 August 2017 11:53:25 CEST Gwenhael Pasquiers wrote:
> Hello,
> 
> This bug was met in flink 1.0.1 over yarn (maybe the yarn behavior is 
> different ?). We've been having this issue for a long time and we were 
> careful not to schedule too many jobs.
 
> I'm currently upgrading the application towards flink 1.2.1 and I'd 
> like to try to solve this issue.
 
> I'm not submitting individual jobs to a standalone cluster.
> 
> I'm starting a single application that has a loop in its main function :
> for(. . .) {
>   Environment env = Environment.getExectionEnvironment();
>   env. . . .;
>   env.execute();
> }
> 
> 
> The job fails at some point later during execution with the following
> error:
 java.io.IOException: Insufficient number of network buffers:
> required 96, but only 35 available. The total number of network 
> buffers is currently set to 36864. You can increase this number by 
> setting the configuration key 'taskmanager.network.numberOfBuffers'.
> at
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
> Before splitting the job in multiple sub-jobs it failed right at startup.
> 
> Each "batch" job takes 10 to 30 minutes and it fails after about dozen 
> of them (the first ones should have had enough time to be recycled).
 
> We've already increased the jobmanager and "numberOfBuffers" values 
> quite a bit. That way we can handle days of data, but not weeks or 
> months. This is not very scalable. And as you say, I felt that those 
> buffers should be recycled and that way we should have no limit as 
> long as each batch is small enough.
 
> If I start my command again (removing the datehours that were 
> successfully
> processed) it will work since it's a fresh new cluster.
 
> -Original Message-
> From: Ufuk Celebi [mailto:u...@apache.org]
> Sent: Thursday, 17 August 2017 11:24
> To: Ufuk Celebi <u...@apache.org>

RE: Great number of jobs and numberOfBuffers

2017-08-21 Thread Gwenhael Pasquiers
Hi,

1/ Yes, the loop is part of the application I run on yarn. Something like :
public class MyFlinkApp {
public static void main(String[] args){
// parse arguments etc
for(String datehour:datehours){
ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
env.readTextFile(datehour)
.union(env.readTextFile(datehour-1))
.union(env.readTextFile(datehour-2))
.map()
.groupBy()
.sortGroup()
.reduceGroup()
...

// other steps, unions, processing, inputs, outputs

JobExecutionResult result = env.execute();

// read accumulators and send some statsd statistics at 
the end of batch
}
}
}

2/ The prod settings are something like 6 nodes with 8 taskslots each, 32Gib 
per node.

3/ I remember that we had the same error (not enough buffers) right at startup. 
I guess it was trying to allocate all buffers at startup, whereas it now does so 
progressively (but still fails at the same limit).

4/ The program has many steps, it has about 5 inputs (readTextFile) and 2 
outputs (TextHadoopOutputFormat, one in the middle of the processing, the other 
at the end), it is composed of multiple union, flatmap, map, groupby, 
sortGroup, reduceGroup, filter, for each "batch". And if we start the flink app 
on a whole week of data, we will have to start (24 * 7) batches. Parallelism 
has the default value except for the output writers (32 and 4) in order to 
limit the numbers of files on HDFS.



-Original Message-
From: Nico Kruber [mailto:n...@data-artisans.com] 
Sent: Friday, 18 August 2017 14:58
To: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Cc: Ufuk Celebi <u...@apache.org>; user@flink.apache.org
Subject: Re: Great number of jobs and numberOfBuffers

Hi Gwenhael,
the effect you describe sounds a bit strange. Just to clarify your setup:

1) Is the loop you were posting part of the application you run on yarn?
2) How many nodes are you running with?
3) What is the error you got when you tried to run the full program without 
splitting it?
4) can you give a rough sketch of what your program is composed of (operators, 
parallelism,...)? 


Nico

On Thursday, 17 August 2017 11:53:25 CEST Gwenhael Pasquiers wrote:
> Hello,
> 
> This bug was met in flink 1.0.1 over yarn (maybe the yarn behavior is 
> different ?). We've been having this issue for a long time and we were 
> careful not to schedule too many jobs.
 
> I'm currently upgrading the application towards flink 1.2.1 and I'd 
> like to try to solve this issue.
 
> I'm not submitting individual jobs to a standalone cluster.
> 
> I'm starting a single application that has a loop in its main function :
> for(. . .) {
>   Environment env = Environment.getExectionEnvironment();
>   env. . . .;
>   env.execute();
> }
> 
> 
> The job fails at some point later during execution with the following
> error:
 java.io.IOException: Insufficient number of network buffers:
> required 96, but only 35 available. The total number of network 
> buffers is currently set to 36864. You can increase this number by 
> setting the configuration key 'taskmanager.network.numberOfBuffers'. 
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
> Before splitting the job in multiple sub-jobs it failed right at startup.
> 
> Each "batch" job takes 10 to 30 minutes and it fails after about dozen 
> of them (the first ones should have had enough time to be recycled).
 
> We've already increased the jobmanager and "numberOfBuffers" values 
> quite a bit. That way we can handle days of data, but not weeks or 
> months. This is not very scalable. And as you say, I felt that those 
> buffers should be recycled and that way we should have no limit as 
> long as each batch is small enough.
 
> If I start my command again (removing the datehours that were 
> successfully
> processed) it will work since it's a fresh new cluster.
 
> -Original Message-
> From: Ufuk Celebi [mailto:u...@apache.org]
> Sent: Thursday, 17 August 2017 11:24
> To: Ufuk Celebi <u...@apache.org>
> Cc: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>;
> user@flink.apache.org; Nico Kruber <n...@data-artisans.com>
 Subject: Re:
> Great number of jobs and numberOfBuffers
> 
> PS: Also pulling in Nico (CC'd) who is working on the network stack.
> 
> On Thu, Aug 17, 2017 at 11:23 

RE: Great number of jobs and numberOfBuffers

2017-08-17 Thread Gwenhael Pasquiers
Hello,

This bug was met in flink 1.0.1 over yarn (maybe the yarn behavior is different 
?). We've been having this issue for a long time and we were careful not to 
schedule too many jobs.

I'm currently upgrading the application towards flink 1.2.1 and I'd like to try 
to solve this issue.

I'm not submitting individual jobs to a standalone cluster.

I'm starting a single application that has a loop in its main function :
for (. . .) {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env. . . .;
    env.execute();
}


The job fails at some point later during execution with the following error:
java.io.IOException: Insufficient number of network buffers: required 96, but 
only 35 available. The total number of network buffers is currently set to 
36864. You can increase this number by setting the configuration key 
'taskmanager.network.numberOfBuffers'.
  at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)

Before splitting the job in multiple sub-jobs it failed right at startup.

Each "batch" job takes 10 to 30 minutes and it fails after about dozen of them 
(the first ones should have had enough time to be recycled).

We've already increased the jobmanager and "numberOfBuffers" values quite a 
bit. That way we can handle days of data, but not weeks or months. This is not 
very scalable. And as you say, I felt that those buffers should be recycled and 
that way we should have no limit as long as each batch is small enough.

If I start my command again (removing the datehours that were successfully 
processed) it will work since it's a fresh new cluster.

-Original Message-
From: Ufuk Celebi [mailto:u...@apache.org] 
Sent: Thursday, 17 August 2017 11:24
To: Ufuk Celebi <u...@apache.org>
Cc: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>; 
user@flink.apache.org; Nico Kruber <n...@data-artisans.com>
Subject: Re: Great number of jobs and numberOfBuffers

PS: Also pulling in Nico (CC'd) who is working on the network stack.

On Thu, Aug 17, 2017 at 11:23 AM, Ufuk Celebi <u...@apache.org> wrote:
> Hey Gwenhael,
>
> the network buffers are recycled automatically after a job terminates.
> If this does not happen, it would be quite a major bug.
>
> To help debug this:
>
> - Which version of Flink are you using?
> - Does the job fail immediately after submission or later during execution?
> - Is the following correct: the batch job that eventually fails 
> because of missing network buffers runs without problems if you submit 
> it to a fresh cluster with the same memory
>
> The network buffers are recycled after the task managers report the 
> task being finished. If you immediately submit the next batch there is 
> a slight chance that the buffers are not recycled yet. As a possible 
> temporary work around, could you try waiting for a short amount of 
> time before submitting the next batch?
>
> I think we should also be able to run the job without splitting it up 
> after increasing the network memory configuration. Did you already try 
> this?
>
> Best,
>
> Ufuk
>
>
> On Thu, Aug 17, 2017 at 10:38 AM, Gwenhael Pasquiers 
> <gwenhael.pasqui...@ericsson.com> wrote:
>> Hello,
>>
>>
>>
>> We’re meeting a limit with the numberOfBuffers.
>>
>>
>>
>> In a quite complex job we do a lot of operations, with a lot of 
>> operators, on a lot of folders (datehours).
>>
>>
>>
>> In order to split the job into smaller “batches” (to limit the 
>> necessary
>> “numberOfBuffers”) I’ve done a loop over the batches (handle the 
>> datehours 3 by 3), for each batch I create a new env then call the execute() 
>> method.
>>
>>
>>
>> However it looks like there is no cleanup : after a while, if the 
>> number of batches is too big, there is an error saying that the 
>> numberOfBuffers isn’t high enough. It kinds of looks like some leak. 
>> Is there a way to clean them up ?


Great number of jobs and numberOfBuffers

2017-08-17 Thread Gwenhael Pasquiers
Hello,

We're meeting a limit with the numberOfBuffers.

In a quite complex job we do a lot of operations, with a lot of operators, on a 
lot of folders (datehours).

In order to split the job into smaller "batches" (to limit the necessary 
"numberOfBuffers") I've done a loop over the batches (handle the datehours 3 by 
3), for each batch I create a new env then call the execute() method.

However it looks like there is no cleanup: after a while, if the number of 
batches is too big, there is an error saying that the numberOfBuffers isn't 
high enough. It kind of looks like a leak. Is there a way to clean them up?


RE: Event-time and first watermark

2017-08-04 Thread Gwenhael Pasquiers
We're using an AssignerWithPunctuatedWatermarks that extracts a timestamp from 
the data. It keeps the highest timestamp it has ever seen and returns a new 
Watermark every time that value grows.

I know it's bad for performance, but that's not the issue for the moment; I want 
the most up-to-date watermark possible.
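
Roughly, the assigner looks like this (sketch; MyEvent and its getTimestamp() 
accessor are placeholders for our real type):

    import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class MaxTimestampWatermarks implements AssignerWithPunctuatedWatermarks<MyEvent> {

        private long maxTimestamp = Long.MIN_VALUE;

        @Override
        public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
            return element.getTimestamp();
        }

        @Override
        public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
            if (extractedTimestamp > maxTimestamp) {
                maxTimestamp = extractedTimestamp;
                // emit a new watermark each time the maximum grows
                return new Watermark(maxTimestamp);
            }
            return null; // no new watermark for this element
        }
    }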

-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Friday, 4 August 2017 12:22
To: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Cc: Nico Kruber <n...@data-artisans.com>; user@flink.apache.org
Subject: Re: Event-time and first watermark

Hi,

How are you defining the watermark, i.e. what kind of watermark extractor are 
you using?

Best,
Aljoscha

> On 3. Aug 2017, at 17:45, Gwenhael Pasquiers 
> <gwenhael.pasqui...@ericsson.com> wrote:
> 
> We're not using a Window but a more basic ProcessFunction to handle sessions. 
> We made this choice because we have to handle (millions of) sessions that can 
> last from 10 seconds to 24 hours so we wanted to handle things manually using 
> the State class.
> 
> We're using the watermark as an event-time "clock" to:
> * compute "lateness" of a message relatively to the watermark (most 
> recent message from the stream)
> * fire timer events
> 
> We're using event-time instead of processing time because our stream will be 
> late and data arrive by hourly bursts.
> 
> Maybe we're misusing the watermark ?
> 
> B.R.
> 
> -Original Message-
> From: Nico Kruber [mailto:n...@data-artisans.com]
> Sent: Thursday, 3 August 2017 16:30
> To: user@flink.apache.org
> Cc: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
> Subject: Re: Event-time and first watermark
> 
> Hi Gwenhael,
> "A Watermark(t) declares that event time has reached time t in that 
> stream, meaning that there should be no more elements from the stream 
> with a timestamp t’ <= t (i.e. events with timestamps older or equal 
> to the watermark)." [1]
> 
> Therefore, they should be behind the actual event with timestamp t.
> 
> What is it that you want to achieve in the end? What do you want to use the 
> watermark for? They are basically a means to defining when an event time 
> window ends.
> 
> 
> Nico
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/
> event_time.html#event-time-and-watermarks
> 
> On Thursday, 3 August 2017 10:24:35 CEST Gwenhael Pasquiers wrote:
>> Hi,
>> 
>> From my tests it seems that the initial watermark value is 
>> Long.MIN_VALUE even though my first data passed through the timestamp 
>> extractor before arriving into my ProcessFunction. It looks like the 
>> watermark "lags" behind the data by one message.
>> 
>> Is there a way to have a watermark more "up to date" ? Or is the only 
>> way to compute it myself into my ProcessFunction ?
>> 
>> Thanks.
> 



RE: Event-time and first watermark

2017-08-03 Thread Gwenhael Pasquiers
We're not using a Window but a more basic ProcessFunction to handle sessions. 
We made this choice because we have to handle (millions of) sessions that can 
last from 10 seconds to 24 hours so we wanted to handle things manually using 
the State class.
 
We're using the watermark as an event-time "clock" to:
* compute "lateness" of a message relatively to the watermark (most recent 
message from the stream)
* fire timer events 

We're using event-time instead of processing time because our stream will be 
late and data arrive by hourly bursts.
 
Maybe we're misusing the watermark ?

B.R.

-Original Message-
From: Nico Kruber [mailto:n...@data-artisans.com] 
Sent: Thursday, 3 August 2017 16:30
To: user@flink.apache.org
Cc: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Subject: Re: Event-time and first watermark

Hi Gwenhael,
"A Watermark(t) declares that event time has reached time t in that stream, 
meaning that there should be no more elements from the stream with a timestamp 
t’ <= t (i.e. events with timestamps older or equal to the watermark)." [1]

Therefore, they should be behind the actual event with timestamp t.

What is it that you want to achieve in the end? What do you want to use the 
watermark for? They are basically a means to defining when an event time window 
ends.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/
event_time.html#event-time-and-watermarks

On Thursday, 3 August 2017 10:24:35 CEST Gwenhael Pasquiers wrote:
> Hi,
> 
> From my tests it seems that the initial watermark value is 
> Long.MIN_VALUE even though my first data passed through the timestamp 
> extractor before arriving into my ProcessFunction. It looks like the 
> watermark "lags" behind the data by one message.
> 
> Is there a way to have a watermark more "up to date" ? Or is the only 
> way to compute it myself into my ProcessFunction ?
> 
> Thanks.



Event-time and first watermark

2017-08-03 Thread Gwenhael Pasquiers
Hi,

From my tests it seems that the initial watermark value is Long.MIN_VALUE even 
though my first data passed through the timestamp extractor before arriving 
into my ProcessFunction. It looks like the watermark "lags" behind the data by 
one message.

Is there a way to have a watermark more "up to date" ? Or is the only way to 
compute it myself into my ProcessFunction ?

Thanks.


RE: A way to purge an empty session

2017-06-26 Thread Gwenhael Pasquiers
Thanks !

I didn’t know of this function and indeed it seems to match my needs better 
than Windows. And I’ll be able to clear my state once it’s empty (and re-create 
it when necessary).
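
What I have in mind is roughly the following (hedged sketch; Event, Session and 
their methods are placeholders for my real types, and the timeout handling is 
simplified):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    public class SessionFunction extends ProcessFunction<Event, Session> {

        private static final long TIMEOUT = 24L * 3600 * 1000; // 24h in ms

        private transient ValueState<Session> session;

        @Override
        public void open(Configuration parameters) {
            session = getRuntimeContext().getState(
                new ValueStateDescriptor<>("session", Session.class));
        }

        @Override
        public void processElement(Event event, Context ctx, Collector<Session> out) throws Exception {
            Session s = session.value();
            if (s == null) {
                s = new Session(); // re-created if new events arrive after a purge
            }
            s.add(event);
            session.update(s);
            // event-time timer: fires once the watermark passes lastEvent + TIMEOUT
            ctx.timerService().registerEventTimeTimer(event.getTimestamp() + TIMEOUT);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Session> out) throws Exception {
            Session s = session.value();
            if (s != null && s.getLastTimestamp() + TIMEOUT <= timestamp) {
                out.collect(s);  // emit the finished session
                session.clear(); // and free the state right away
            }
        }
    }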

B.R.

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Monday, 26 June 2017 12:13
To: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: A way to purge an empty session

Hi Gwenhael,
have you considered to use a ProcessFunction? With a ProcessFunction you have 
direct access to state (that you register yourself) and you can register timers 
that trigger a callback function when they expire.
So you can cleanup state when you receive an element or when a timer expires.

Best, Fabian

2017-06-23 18:46 GMT+02:00 Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>:
Hello,

This may be premature optimization for memory usage but here is my question :

I have to do an app that will have to monitor sessions of (millions of) users. 
I don’t know when the session starts nor ends, nor a reasonable maximum 
duration.
I want to have a maximum duration (timeout) of 24H.

However I’d like to be able to PURGE sessions that ended as soon as possible to 
free memory.

I use a Trigger to trigger my WindowFunction for each input EVENT. It will (when 
relevant) output a SESSION (with start and end timestamps). I use the Evictor 
in order to remove the EVENT used to build the SESSION  (they have a Boolean 
field “removable” set to true from the WindowFunction so that the Evictor knows 
it can remove them in the evictAfter method)… That way at least I can clean the 
content of the windows.
However, from what I'm seeing, it looks like the window instance will still stay 
alive (even if empty) until it reaches its maximum duration (24 hours) even if 
the session it represents lasted 2 minutes: at the end of the day I might have 
millions of sessions in memory when in reality only thousands are really alive 
at a given time. That might also really slow down the backups and restores of 
the application if it needs to store millions of empty windows.

I’m aware that my need looks like the session windows. But the session window 
works mainly by merging windows that overlap within a session gap. My session 
gap can be multiple hours long, so I’m afraid that it would not help me…

So my question is: is there a way to inform the “Trigger” that the window has 
no more elements and that it can be PURGED. Or a way for a WindowFunction to 
“kill” the window it’s being applied on ? Of course my window might be 
re-created if new events arrive later for the same key.

My other option is to simply use a flatmap operator that will hold a HashMap 
of sessions, that way I might be able to clean it up when I close my sessions, 
but I think it would be prettier to rely on Flink’s Windows ;-)

Thanks in advance,



A way to purge an empty session

2017-06-23 Thread Gwenhael Pasquiers
Hello,

This may be premature optimization for memory usage but here is my question :

I have to do an app that will have to monitor sessions of (millions of) users. 
I don’t know when the session starts nor ends, nor a reasonable maximum 
duration.

I want to have a maximum duration (timeout) of 24H.

However I’d like to be able to PURGE sessions that ended as soon as possible to 
free memory.

I use a Trigger to trigger my WindowFunction for each input EVENT. It will (when 
relevant) output a SESSION (with start and end timestamps). I use the Evictor 
in order to remove the EVENT used to build the SESSION  (they have a Boolean 
field “removable” set to true from the WindowFunction so that the Evictor knows 
it can remove them in the evictAfter method)… That way at least I can clean the 
content of the windows.
However, from what I'm seeing, it looks like the window instance will still stay 
alive (even if empty) until it reaches its maximum duration (24 hours) even if 
the session it represents lasted 2 minutes: at the end of the day I might have 
millions of sessions in memory when in reality only thousands are really alive 
at a given time. That might also really slow down the backups and restores of 
the application if it needs to store millions of empty windows.

I’m aware that my need looks like the session windows. But the session window 
works mainly by merging windows that overlap within a session gap. My session 
gap can be multiple hours long, so I’m afraid that it would not help me…

So my question is: is there a way to inform the “Trigger” that the window has 
no more elements and that it can be PURGED. Or a way for a WindowFunction to 
“kill” the window it’s being applied on ? Of course my window might be 
re-created if new events arrive later for the same key.

My other option is to simply use a flatmap operator that will hold a HashMap 
of sessions, that way I might be able to clean it up when I close my sessions, 
but I think it would be prettier to rely on Flink’s Windows ;-)

Thanks in advance,


Kafka 0.10 jaas multiple clients

2017-04-26 Thread Gwenhael Pasquiers
Hello,
Up to now we’ve been using kafka with jaas (plain login/password) the following 
way:

-  yarnship the jaas file

-  add the jaas file name into “flink-conf.yaml” using the property 
“env.java.opts” (rough sketch below)
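
Roughly, the current single-client setup looks like this (paths and credentials 
are of course placeholders):

    flink-conf.yaml:
        env.java.opts: -Djava.security.auth.login.config=./jaas.conf

    jaas.conf (the yarnshipped file):
        KafkaClient {
            org.apache.kafka.common.security.plain.PlainLoginModule required
            username="someUser"
            password="somePassword";
        };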

How can we support multiple secured kafka 0.10 consumers and producers (with 
different logins and passwords of course)?
From what I saw in the kafka sources, the entry name “KafkaClient” is hardcoded…
Best Regards,

Gwenhaël PASQUIERS


RE: Kafka offset commits

2017-04-21 Thread Gwenhael Pasquiers
We need more tests, but we think we found the cause of the loss of our kafka 
consumer offsets in kafka 0.10. It might be the server-side parameter 
“offsets.topic.retention.minutes”, which defaults to 1440 minutes (1 day), and 
our flink consumer was “off” for more than a day before restarting.

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]
Sent: Wednesday, 19 April 2017 11:39
To: user@flink.apache.org
Subject: Re: Kafka offset commits

Thanks for the clarification Aljoscha!
Yes, you cannot restore from a 1.0 savepoint in Flink 1.2 (sorry, I missed the 
“1.0” part on my first reply).

@Gwenhael, I’ll try to reclarify some of the questions you asked:

Does that mean that flink does not rely on the offset written to zookeeper 
anymore, but relies on the snapshot data, implying that it’s crucial to keep 
the same snapshot folder before and after the migration to Flink 1.2 ?

For the case of 1.0 —> 1.2, you’ll have to rely on committed offsets in Kafka / 
ZK for the migration. State migration from 1.0 to 1.2 is not possible.

As Aljoscha pointed out, if you are using the same “group.id”, then there 
shouldn’t be a problem w.r.t. retaining the offset position. You just have to 
keep in mind of [1], as you would need to manually increase all committed 
offsets in Kafka / ZK by 1 for that consumer group.

Note that there is no state migration happening here, but just simply relying 
on offsets committed in Kafka / ZK to define the starting position when you’re 
starting the job in 1.2.

We were also wondering if the flink consumer was able to restore its offset 
from Zookeeper.

For FlinkKafkaConsumer08, the starting offset is actually always read from ZK.
Again, this isn’t a “restore”, but just defining start position using committed 
offsets.

Another question : is there an expiration to the snapshots ? We’ve been having 
issues with an app that we forgot to restart. We did it after a couple of days, 
but it looks like it did not restore correctly the offset and it started 
consuming from the oldest offset, creating duplicated data (the kafka queue has 
over a week of buffer).

There is no expiration to the offsets stored in the snapshots. The only issue 
would be if Kafka has expired that offset due to data retention settings.
If you’re sure that at the time of the restore the data hasn’t expired yet, 
there might be something weird going on.
AFAIK, the only issue that was previously known to possibly cause this was [2].
Could you check if that issue may be the case?

[1] https://issues.apache.org/jira/browse/FLINK-4723
[2] https://issues.apache.org/jira/browse/FLINK-6006

On 19 April 2017 at 5:14:35 PM, Aljoscha Krettek (aljos...@apache.org) wrote:
Hi,
AFAIK, restoring a Flink 1.0 savepoint should not be possible on Flink 1.2. 
Only restoring from Flink 1.1 savepoints is supported.

@Gordon If the consumer group stays the same the new Flink job should pick up 
where the old one stopped, right?

Best,
Aljoscha

On 18. Apr 2017, at 16:19, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:

Thanks for your answer.
Does that mean that flink does not rely on the offset written to zookeeper 
anymore, but relies on the snapshot data, implying that it’s crucial to keep 
the same snapshot folder before and after the migration to Flink 1.2 ?
We were also wondering if the flink consumer was able to restore its offset 
from Zookeeper.
Another question : is there an expiration to the snapshots ? We’ve been having 
issues with an app that we forgot to restart. We did it after a couple of days, 
but it looks like it did not restore correctly the offset and it started 
consuming from the oldest offset, creating duplicated data (the kafka queue has 
over a week of buffer).
B.R.

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]
Sent: Monday, 17 April 2017 07:40
To: user@flink.apache.org
Subject: Re: Kafka offset commits

Hi,

The FlinkKafkaConsumer in 1.2 is able to restore from older version state 
snapshots and bridge the migration, so there should be no problem in reading 
the offsets from older state. The smallest or highest offsets will only be used 
if the offset no longer exists due to Kafka data retention settings.

Besides this, there was once fix related to the Kafka 0.8 offsets for Flink 
1.2.0 [1]. Shortly put, before the fix, the committed offsets to ZK was off by 
1 (wrt to how Kafka itself defines the committed offsets).
However, this should not affect the behavior of restoring from offsets in 
savepoints, so it should be fine.

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-4723


On 13 April 2017 at 10:55:40 PM, Gwenhael Pasquiers (gwenhael.pasqui...@ericsson.com) wrote:
Hello,

We’re going to migrate some applications that consume data from a Kafka 0.8 
from Flink 1.

shaded version of legacy kafka connectors

2017-03-20 Thread Gwenhael Pasquiers
Hi,

Before doing it myself I thought it would be better to ask.
We need to consume from kafka 0.8 and produce to kafka 0.10 in a flink app.
I guess there will be class and package name conflicts for a lot of 
dependencies of both connectors.

The obvious solution is to make a “shaded” version of the kafka 0.8 connector 
so that it can coexist with the 0.10 version.

Does it already exists ?


RE: Cross operation on two huge datasets

2017-03-03 Thread Gwenhael Pasquiers
Maybe I won’t try to broadcast my dataset after all : I finally found again 
what made me implement it with my own cloning flatmap + partitioning :

Quoted from 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#broadcast-variables

Note: As the content of broadcast variables is kept in-memory on each node, it 
should not become too large. For simpler things like scalar values you can 
simply make parameters part of the closure of a function, or use the 
withParameters(...) method to pass in a configuration.

From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: Friday, 3 March 2017 18:10
To: user@flink.apache.org
Subject: RE: Cross operation on two huge datasets

To answer Ankit,

It is a batch application.

Yes, I admit I did broadcasting by hand. I did it that way because the only 
other way I found to “broadcast” a DataSet was to use “withBroadcast”, and I 
was afraid that “withBroadcast” would make flink load the whole dataset in 
memory before broadcasting it rather than sending its elements 1 by 1.

I’ll try to use it, I’ll take anything that will make my code cleaner !

From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: Friday, 3 March 2017 17:55
To: user@flink.apache.org
Subject: RE: Cross operation on two huge datasets

I tried putting my structure in a dataset but, when serializing, Kryo went into 
an infinite recursive loop (crashed with a StackOverflowError). So I'm staying 
with the static reference.

As for the partitioning, there is always the case of shapes overlapping both the 
right and left sections; I think it would take quite a bit of effort to 
implement. And it's always better if I don't have to manually set a frontier 
between the two (n) zones.

From: Xingcan Cui [mailto:xingc...@gmail.com]
Sent: Friday, 3 March 2017 02:40
To: user@flink.apache.org
Subject: Re: Cross operation on two huge datasets

Hi Gwen,

in my view, indexing and searching are two isolated processes and they should 
be separated. Maybe you should take the RTree structure as a new dataset 
(fortunately it's static, right?) and store it to a distributed cache or DFS 
that can be accessed by operators from any nodes. That will make the mapping 
from index partition to operator consistent (regardless of the locality 
problem).

Besides, you can make a "weak" index first, e.g., partitioning the points and 
shapes to "left" and "right", and in that way you do not need to broadcast the 
points to all index nodes (only left to left and right to right).

Best,
Xingcan

On Fri, Mar 3, 2017 at 1:49 AM, Jain, Ankit <ankit.j...@here.com> wrote:
If I understood correctly, you have just implemented flink broadcasting by hand 
☺.

You are still sending out the whole points dataset to each shape partition – 
right?

I think this could be optimized by using a keyBy or custom partition which is 
common across shapes & points – that should make sure a given point always go 
to same shape node.

I didn’t understand why Till Rohrmann said “you don’t know where Flink will 
schedule the new operator instance” – new operators are created when flink job 
is started – right? So, there should be no more new operators once the job is 
running and if you use consistent hash partitioning, same input should always 
end at same task manager node.

You could store the output as Flink State – that would be more fault tolerant 
but storing it as cache in JVM should work too.

Is this a batch job or streaming?

By the way, I am a newbie to Flink, still only learning – so take my suggestions 
with caution ☺

Thanks
Ankit

From: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Date: Thursday, March 2, 2017 at 7:28 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: RE: Cross operation on two huge datasets

I made it so that I don’t care where the next operator will be scheduled.

I configured taskslots = 1 and parallelism = yarnnodes so that :

• Each node contains 1/N th  of the shapes (simple repartition() of the 
shapes dataset).

• The points will be cloned so that each partition of the points 
dataset will contain the whole original dataset

o   Flatmap creates “#parallelism” clones of each entry

o   Custom partitioning so that each clone of each entry is sent to a different 
partition

That way, whatever flink chooses to do, each point will be compared to each 
shape. That’s why I think that in my case I can keep it in the JVM without 
issues. I’d prefer to avoid ser/deser-ing that structure.

I tried to use join (all items have same key) but it looks like flink tried to 
serialize the RTree anyway and it went in StackOverflowError (locally with only 
1 partition,

RE: Cross operation on two huge datasets

2017-03-03 Thread Gwenhael Pasquiers
To answer Ankit,

It is a batch application.

Yes, I admit I did broadcasting by hand. I did it that way because the only 
other way I found to “broadcast” a DataSet was to use “withBroadcast”, and I 
was afraid that “withBroadcast” would make flink load the whole dataset in 
memory before broadcasting it rather than sending its elements 1 by 1.

I’ll try to use it, I’ll take anything that will make my code cleaner !
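
For reference, the actual DataSet API call seems to be withBroadcastSet; what I 
plan to try looks roughly like this (hedged sketch; Shape, Point, Match and 
contains() are placeholders for my types):

    // Broadcast the shapes DataSet to every parallel instance of the flatMap.
    DataSet<Shape> shapes = ...;   // parsed shapes
    DataSet<Point> points = ...;   // points to test

    points.flatMap(new RichFlatMapFunction<Point, Match>() {
        private List<Shape> broadcastShapes;

        @Override
        public void open(Configuration parameters) {
            // this is where the whole broadcast set is materialized on each node,
            // which is exactly the memory concern mentioned above
            broadcastShapes = getRuntimeContext().getBroadcastVariable("shapes");
        }

        @Override
        public void flatMap(Point p, Collector<Match> out) {
            for (Shape s : broadcastShapes) {
                if (s.contains(p)) {
                    out.collect(new Match(p, s));
                }
            }
        }
    }).withBroadcastSet(shapes, "shapes");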

From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: Friday, 3 March 2017 17:55
To: user@flink.apache.org
Subject: RE: Cross operation on two huge datasets

I tried putting my structure in a dataset but, when serializing, Kryo went into 
an infinite recursive loop (crashed with a StackOverflowError). So I'm staying 
with the static reference.

As for the partitioning, there is always the case of shapes overlapping both the 
right and left sections; I think it would take quite a bit of effort to 
implement. And it's always better if I don't have to manually set a frontier 
between the two (n) zones.

From: Xingcan Cui [mailto:xingc...@gmail.com]
Sent: Friday, 3 March 2017 02:40
To: user@flink.apache.org
Subject: Re: Cross operation on two huge datasets

Hi Gwen,

in my view, indexing and searching are two isolated processes and they should 
be separated. Maybe you should take the RTree structure as a new dataset 
(fortunately it's static, right?) and store it to a distributed cache or DFS 
that can be accessed by operators from any nodes. That will make the mapping 
from index partition to operator consistent (regardless of the locality 
problem).

Besides, you can make a "weak" index first, e.g., partitioning the points and 
shapes to "left" and "right", and in that way you do not need to broadcast the 
points to all index nodes (only left to left and right to right).

Best,
Xingcan

On Fri, Mar 3, 2017 at 1:49 AM, Jain, Ankit <ankit.j...@here.com> wrote:
If I understood correctly, you have just implemented flink broadcasting by hand 
☺.

You are still sending out the whole points dataset to each shape partition – 
right?

I think this could be optimized by using a keyBy or custom partition which is 
common across shapes & points – that should make sure a given point always go 
to same shape node.

I didn’t understand why Till Rohrmann said “you don’t know where Flink will 
schedule the new operator instance” – new operators are created when flink job 
is started – right? So, there should be no more new operators once the job is 
running and if you use consistent hash partitioning, same input should always 
end at same task manager node.

You could store the output as Flink State – that would be more fault tolerant 
but storing it as cache in JVM should work too.

Is this a batch job or streaming?

By the way, I am a newbie to Flink, still only learning – so take my suggestions 
with caution ☺

Thanks
Ankit

From: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Date: Thursday, March 2, 2017 at 7:28 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: RE: Cross operation on two huge datasets

I made it so that I don’t care where the next operator will be scheduled.

I configured taskslots = 1 and parallelism = yarnnodes so that :

• Each node contains 1/N th  of the shapes (simple repartition() of the 
shapes dataset).

• The points will be cloned so that each partition of the points 
dataset will contain the whole original dataset

o   Flatmap creates “#parallelism” clones of each entry

o   Custom partitioning so that each clone of each entry is sent to a different 
partition

That way, whatever flink chooses to do, each point will be compared to each 
shape. That’s why I think that in my case I can keep it in the JVM without 
issues. I’d prefer to avoid ser/deser-ing that structure.

I tried to use join (all items have same key) but it looks like flink tried to 
serialize the RTree anyway and it went in StackOverflowError (locally with only 
1 partition, not even on yarn).


From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Thursday, 2 March 2017 15:40
To: user@flink.apache.org
Subject: Re: Cross operation on two huge datasets


Yes you’re right about the “split” and broadcasting.

Storing it in the JVM is not a good approach, since you don’t know where Flink 
will schedule the new operator instance. It might be the case that an operator 
responsible for another partition gets scheduled to this JVM and then it has 
the wrong RTree information. Maybe you can model the set of RTrees as a 
DataSet[(PartitionKey, RTree)] and then join with the partitioned point data 
set.

Cheers,
Till

On Thu, Mar 2, 2017 at 3:29 PM, Gwe

RE: Cross operation on two huge datasets

2017-03-03 Thread Gwenhael Pasquiers
I tried putting my structure in a dataset but, when serializing, Kryo went into 
an infinite recursive loop (crashed with a StackOverflowError). So I'm staying 
with the static reference.

As for the partitioning, there is always the case of shapes overlapping both the 
right and left sections; I think it would take quite a bit of effort to 
implement. And it's always better if I don't have to manually set a frontier 
between the two (n) zones.

From: Xingcan Cui [mailto:xingc...@gmail.com]
Sent: vendredi 3 mars 2017 02:40
To: user@flink.apache.org
Subject: Re: Cross operation on two huge datasets

Hi Gwen,

in my view, indexing and searching are two isolated processes and they should 
be separated. Maybe you should take the RTree structure as a new dataset 
(fortunately it's static, right?) and store it to a distributed cache or DFS 
that can be accessed by operators from any nodes. That will make the mapping 
from index partition to operator consistent (regardless of the locality 
problem).

Besides, you can make a "weak" index first, e.g., partitioning the points and 
shapes to "left" and "right", and in that way you do not need to broadcast the 
points to all index nodes (only left to left and right to right).

Best,
Xingcan

On Fri, Mar 3, 2017 at 1:49 AM, Jain, Ankit <ankit.j...@here.com> wrote:
If I understood correctly, you have just implemented flink broadcasting by hand 
☺.

You are still sending out the whole points dataset to each shape partition – 
right?

I think this could be optimized by using a keyBy or custom partition which is 
common across shapes & points – that should make sure a given point always go 
to same shape node.

I didn’t understand why Till Rohrmann said “you don’t know where Flink will 
schedule the new operator instance” – new operators are created when flink job 
is started – right? So, there should be no more new operators once the job is 
running and if you use consistent hash partitioning, same input should always 
end at same task manager node.

You could store the output as Flink State – that would be more fault tolerant 
but storing it as cache in JVM should work too.

Is this a batch job or streaming?

By the way, I am a newbie to Flink, still only learning – so take my suggestions 
with caution ☺

Thanks
Ankit

From: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Date: Thursday, March 2, 2017 at 7:28 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: RE: Cross operation on two huge datasets

I made it so that I don’t care where the next operator will be scheduled.

I configured taskslots = 1 and parallelism = yarnnodes so that :

• Each node contains 1/Nth of the shapes (simple repartition() of the
shapes dataset).

• The points will be cloned so that each partition of the points 
dataset will contain the whole original dataset

o   Flatmap creates “#parallelism” clones of each entry

o   Custom partitioning so that each clone of each entry is sent to a different 
partition

That way, whatever Flink chooses to do, each point will be compared to each
shape. That’s why I think that in my case I can keep it in the JVM without
issues. I’d prefer to avoid ser/deser-ing that structure.

I tried to use join (all items have the same key) but it looks like Flink tried
to serialize the RTree anyway and it went into a StackOverflowError (locally
with only 1 partition, not even on YARN).
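
For reference, a rough sketch of the clone-and-dispatch step described above
(the Point type, the Tuple2 envelope and all names are illustrative
placeholders, not the actual code; it assumes an ExecutionEnvironment "env" and
a DataSet<Point> "points"):

final int parallelism = env.getParallelism();

DataSet<Tuple2<Integer, Point>> clonedPoints = points
    .flatMap(new FlatMapFunction<Point, Tuple2<Integer, Point>>() {
        @Override
        public void flatMap(Point p, Collector<Tuple2<Integer, Point>> out) {
            // emit one copy of the point per target partition
            for (int i = 0; i < parallelism; i++) {
                out.collect(new Tuple2<>(i, p));
            }
        }
    })
    .partitionCustom(new Partitioner<Integer>() {
        @Override
        public int partition(Integer key, int numPartitions) {
            // the clone index directly selects the partition
            return key % numPartitions;
        }
    }, 0);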


From: Till Rohrmann [mailto:trohrm...@apache.org<mailto:trohrm...@apache.org>]
Sent: jeudi 2 mars 2017 15:40
To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Cross operation on two huge datasets


Yes you’re right about the “split” and broadcasting.

Storing it in the JVM is not a good approach, since you don’t know where Flink 
will schedule the new operator instance. It might be the case that an operator 
responsible for another partition gets scheduled to this JVM and then it has 
the wrong RTree information. Maybe you can model the set of RTrees as a 
DataSet[(PartitionKey, RTree)] and then join with the partitioned point data 
set.

Cheers,
Till

On Thu, Mar 2, 2017 at 3:29 PM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
The best for me would be to make it “persist” inside of the JVM heap in some 
map since I don’t even know if the structure is Serializable (I could try). But 
I understand.

As for broadcasting, wouldn’t broadcasting the variable cancel the efforts I 
did to “split” the dataset parsing over the nodes ?


From: Till Rohrmann [mailto:trohrm...@apache.org<mailto:trohrm...@apache.org>]
Sent: jeudi 2 mars 2017 14:42

To: user@flink.apache.o

RE: Cross operation on two huge datasets

2017-03-03 Thread Gwenhael Pasquiers
I managed to avoid the class reloading by controlling the order of operations
using “.withBroadcastSet”.

My first task (shapes parsing) now outputs an empty “synchro” DataSet.

Then whenever I need to wait for that synchro dataset to be ready (and mainly
for the operations prior to that dataset to be done), I use
“.withBroadcastSet(synchro, “synchro”)” and I do a get for that broadcast
variable in my open method.

That way I’m sure that I won’t begin testing my points against an incomplete 
static RTree. And also, since it’s a single job again, my static RTree remains 
valid ☺

Seems to be good for now even if the static thingie is a bit dirty.
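
For reference, a rough sketch of that pattern (Shape, Point, EnrichedPoint and
the static ShapeIndexHolder are hypothetical placeholders for the static RTree
holder, not the actual code):

// Phase 1: parse the shapes and fill a static index on each task manager.
// The returned data set stays empty on purpose; it only carries the dependency.
DataSet<Integer> synchro = shapes
    .mapPartition(new MapPartitionFunction<Shape, Integer>() {
        @Override
        public void mapPartition(Iterable<Shape> values, Collector<Integer> out) {
            for (Shape s : values) {
                ShapeIndexHolder.add(s); // static RTree local to this JVM
            }
        }
    });

// Phase 2: enrich the points; reading the broadcast variable in open() forces
// Flink to fully produce "synchro" (and thus finish the shape parsing) first.
points
    .flatMap(new RichFlatMapFunction<Point, EnrichedPoint>() {
        @Override
        public void open(Configuration parameters) {
            getRuntimeContext().getBroadcastVariable("synchro");
        }

        @Override
        public void flatMap(Point p, Collector<EnrichedPoint> out) {
            for (Shape s : ShapeIndexHolder.query(p)) {
                out.collect(new EnrichedPoint(p, s));
            }
        }
    })
    .withBroadcastSet(synchro, "synchro");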

However I’m surprised that reading 20 MB of Parquet becomes 21 GB of “bytes
sent” by the Flink reader.


From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: jeudi 2 mars 2017 16:28
To: user@flink.apache.org
Subject: RE: Cross operation on two huge datasets

I made it so that I don’t care where the next operator will be scheduled.

I configured taskslots = 1 and parallelism = yarnnodes so that :

· Each node contains 1/Nth of the shapes (simple repartition() of the
shapes dataset).

· The points will be cloned so that each partition of the points 
dataset will contain the whole original dataset

o   Flatmap creates “#parallelism” clones of each entry

o   Custom partitioning so that each clone of each entry is sent to a different 
partition

That way, whatever Flink chooses to do, each point will be compared to each
shape. That’s why I think that in my case I can keep it in the JVM without
issues. I’d prefer to avoid ser/deser-ing that structure.

I tried to use join (all items have the same key) but it looks like Flink tried
to serialize the RTree anyway and it went into a StackOverflowError (locally
with only 1 partition, not even on YARN).


From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: jeudi 2 mars 2017 15:40
To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Cross operation on two huge datasets


Yes you’re right about the “split” and broadcasting.

Storing it in the JVM is not a good approach, since you don’t know where Flink 
will schedule the new operator instance. It might be the case that an operator 
responsible for another partition gets scheduled to this JVM and then it has 
the wrong RTree information. Maybe you can model the set of RTrees as a 
DataSet[(PartitionKey, RTree)] and then join with the partitioned point data 
set.

Cheers,
Till

On Thu, Mar 2, 2017 at 3:29 PM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
The best for me would be to make it “persist” inside of the JVM heap in some 
map since I don’t even know if the structure is Serializable (I could try). But 
I understand.

As for broadcasting, wouldn’t broadcasting the variable cancel the efforts I 
did to “split” the dataset parsing over the nodes ?


From: Till Rohrmann [mailto:trohrm...@apache.org<mailto:trohrm...@apache.org>]
Sent: jeudi 2 mars 2017 14:42

To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Cross operation on two huge datasets


Hi Gwenhael,

if you want to persist operator state, then you would have to persist it (e.g. 
writing to a shared directory or emitting the model and using one of Flink’s 
sinks) and when creating the new operators you have to reread it from there 
(usually in the open method or from a Flink source as part of a broadcasted 
data set).

If you want to give a data set to all instances of an operator, then you should 
broadcast this data set. You can do something like

DataSet<Integer> input = ...
DataSet<Integer> broadcastSet = ...

input.flatMap(new RichFlatMapFunction<Integer, Integer>() {
    List<Integer> broadcastSet;

    @Override
    public void open(Configuration configuration) {
        broadcastSet = getRuntimeContext().getBroadcastVariable("broadcast");
    }

    @Override
    public void flatMap(Integer integer, Collector<Integer> collector) throws Exception {

    }
}).withBroadcastSet(broadcastSet, "broadcast");

Cheers,
Till
​

On Thu, Mar 2, 2017 at 12:12 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote:
I (almost) made it work the following way:

1st job : Read all the shapes, repartition() them equally on my N nodes, then
on each node fill a static RTree (thanks for the tip).

2nd job : Read all the points, use a flatmap + custom partitioner to “clone” 
the dataset to all nodes, then apply a simple flatmap that will use the 
previously initialized static RTree, adding the Shape information to the point. 
Then do a groupBy to merge the points that were inside of multiple shapes.

This works very well in a local runtime but fails on yarn because it seems that 
the taskmanager reloads the jar file between two jo

RE: Cross operation on two huge datasets

2017-03-02 Thread Gwenhael Pasquiers
I made it so that I don’t care where the next operator will be scheduled.

I configured taskslots = 1 and parallelism = yarnnodes so that :

· Each node contains 1/Nth of the shapes (simple repartition() of the
shapes dataset).

· The points will be cloned so that each partition of the points 
dataset will contain the whole original dataset

o   Flatmap creates “#parallelism” clones of each entry

o   Custom partitioning so that each clone of each entry is sent to a different 
partition

That way, whatever Flink chooses to do, each point will be compared to each
shape. That’s why I think that in my case I can keep it in the JVM without
issues. I’d prefer to avoid ser/deser-ing that structure.

I tried to use join (all items have the same key) but it looks like Flink tried
to serialize the RTree anyway and it went into a StackOverflowError (locally
with only 1 partition, not even on YARN).


From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: jeudi 2 mars 2017 15:40
To: user@flink.apache.org
Subject: Re: Cross operation on two huge datasets


Yes you’re right about the “split” and broadcasting.

Storing it in the JVM is not a good approach, since you don’t know where Flink 
will schedule the new operator instance. It might be the case that an operator 
responsible for another partition gets scheduled to this JVM and then it has 
the wrong RTree information. Maybe you can model the set of RTrees as a 
DataSet[(PartitionKey, RTree)] and then join with the partitioned point data 
set.

Cheers,
Till

On Thu, Mar 2, 2017 at 3:29 PM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
The best for me would be to make it “persist” inside of the JVM heap in some 
map since I don’t even know if the structure is Serializable (I could try). But 
I understand.

As for broadcasting, wouldn’t broadcasting the variable cancel the efforts I 
did to “split” the dataset parsing over the nodes ?


From: Till Rohrmann [mailto:trohrm...@apache.org<mailto:trohrm...@apache.org>]
Sent: jeudi 2 mars 2017 14:42

To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Cross operation on two huge datasets


Hi Gwenhael,

if you want to persist operator state, then you would have to persist it (e.g. 
writing to a shared directory or emitting the model and using one of Flink’s 
sinks) and when creating the new operators you have to reread it from there 
(usually in the open method or from a Flink source as part of a broadcasted 
data set).

If you want to give a data set to all instances of an operator, then you should 
broadcast this data set. You can do something like

DataSet<Integer> input = ...
DataSet<Integer> broadcastSet = ...

input.flatMap(new RichFlatMapFunction<Integer, Integer>() {
    List<Integer> broadcastSet;

    @Override
    public void open(Configuration configuration) {
        broadcastSet = getRuntimeContext().getBroadcastVariable("broadcast");
    }

    @Override
    public void flatMap(Integer integer, Collector<Integer> collector) throws Exception {

    }
}).withBroadcastSet(broadcastSet, "broadcast");

Cheers,
Till
​

On Thu, Mar 2, 2017 at 12:12 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote:
I (almost) made it work the following way:

1st job : Read all the shapes, repartition() them equally on my N nodes, then
on each node fill a static RTree (thanks for the tip).

2nd job : Read all the points, use a flatmap + custom partitioner to “clone” 
the dataset to all nodes, then apply a simple flatmap that will use the 
previously initialized static RTree, adding the Shape information to the point. 
Then do a groupBy to merge the points that were inside of multiple shapes.

This works very well in a local runtime but fails on yarn because it seems that 
the taskmanager reloads the jar file between two jobs, making me lose my static 
RTree (I guess that newly loaded class overwrites the older one).

I have two questions :

-  Is there a way to avoid that jar reload // can I store my RTree 
somewhere in jdk or flink, locally to the taskmanager, in a way that it 
wouldn’t be affected by the jar reload (since it would not be stored in any 
class from MY jar)?

o   I could also try to do it in a single job, but I don’t know how to ensure 
that some operations are done (parsing of shape) BEFORE starting others 
handling the points.

-  Is there a way to do that in a clean way using flink operators ? I’d 
need to be able to use the result of the iteration of a dataset inside of my 
map.

Something like :

datasetA.flatmap(new MyMapOperator(datasetB))…

And In my implementation I would be able to iterate the whole datasetB BEFORE 
doing any operation in datasetA. That way I could parse all my shapes in an 
RTree before handling my points

RE: Cross operation on two huge datasets

2017-03-02 Thread Gwenhael Pasquiers
The best for me would be to make it “persist” inside of the JVM heap in some 
map since I don’t even know if the structure is Serializable (I could try). But 
I understand.

As for broadcasting, wouldn’t broadcasting the variable cancel the efforts I 
did to “split” the dataset parsing over the nodes ?


From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: jeudi 2 mars 2017 14:42
To: user@flink.apache.org
Subject: Re: Cross operation on two huge datasets


Hi Gwenhael,

if you want to persist operator state, then you would have to persist it (e.g. 
writing to a shared directory or emitting the model and using one of Flink’s 
sinks) and when creating the new operators you have to reread it from there 
(usually in the open method or from a Flink source as part of a broadcasted 
data set).

If you want to give a data set to all instances of an operator, then you should 
broadcast this data set. You can do something like

DataSet<Integer> input = ...
DataSet<Integer> broadcastSet = ...

input.flatMap(new RichFlatMapFunction<Integer, Integer>() {
    List<Integer> broadcastSet;

    @Override
    public void open(Configuration configuration) {
        broadcastSet = getRuntimeContext().getBroadcastVariable("broadcast");
    }

    @Override
    public void flatMap(Integer integer, Collector<Integer> collector) throws Exception {

    }
}).withBroadcastSet(broadcastSet, "broadcast");

Cheers,
Till
​

On Thu, Mar 2, 2017 at 12:12 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote:
I (almost) made it work the following way:

1st job : Read all the shapes, repartition() them equally on my N nodes, then
on each node fill a static RTree (thanks for the tip).

2nd job : Read all the points, use a flatmap + custom partitioner to “clone” 
the dataset to all nodes, then apply a simple flatmap that will use the 
previously initialized static RTree, adding the Shape information to the point. 
Then do a groupBy to merge the points that were inside of multiple shapes.

This works very well in a local runtime but fails on yarn because it seems that 
the taskmanager reloads the jar file between two jobs, making me lose my static 
RTree (I guess that newly loaded class overwrites the older one).

I have two questions :

-  Is there a way to avoid that jar reload // can I store my RTree 
somewhere in jdk or flink, locally to the taskmanager, in a way that it 
wouldn’t be affected by the jar reload (since it would not be stored in any 
class from MY jar)?

o   I could also try to do it in a single job, but I don’t know how to ensure 
that some operations are done (parsing of shape) BEFORE starting others 
handling the points.

-  Is there a way to do that in a clean way using flink operators ? I’d 
need to be able to use the result of the iteration of a dataset inside of my 
map.

Something like :

datasetA.flatmap(new MyMapOperator(datasetB))…

And In my implementation I would be able to iterate the whole datasetB BEFORE 
doing any operation in datasetA. That way I could parse all my shapes in an 
RTree before handling my points, without relying on static

Or any other way that would allow me to do something similar.

Thanks in advance for your insight.

Gwen’

From: Jain, Ankit [mailto:ankit.j...@here.com<mailto:ankit.j...@here.com>]
Sent: jeudi 23 février 2017 19:21
To: user@flink.apache.org<mailto:user@flink.apache.org>
Cc: Fabian Hueske <fhue...@gmail.com<mailto:fhue...@gmail.com>>

Subject: Re: Cross operation on two huge datasets

Hi Gwen,
I would recommend looking into a data structure called RTree that is designed 
specifically for this use case, i.e. matching a point to a region.

Thanks
Ankit

From: Fabian Hueske <fhue...@gmail.com<mailto:fhue...@gmail.com>>
Date: Wednesday, February 22, 2017 at 2:41 PM
To: <user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: Cross operation on two huge datasets

Hi Gwen,
Flink usually performs a block nested loop join to cross two data sets.
This algorithm spills one input to disk and streams the other input. For each 
input it fills a memory buffer to perform the cross. Then the buffer of the 
spilled input is refilled with spilled records and records are again crossed. 
This is done until one iteration over the spill records is done. Then the other 
buffer of the streamed input is filled with the next records.
You should be aware that cross is a super expensive operation, especially if 
you evaluate a complex condition for each pair of records. So cross can be 
easily too expensive to compute.
For such use cases it is usually better to apply a coarse-grained spatial 
partitioning and do a key-based join on the partitions. Within each partition 
you'd perform a cross.
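
For reference, a rough sketch of that idea (the grid-cell helpers cellOf() and
cellsCoveredBy() and the Point/Shape types are illustrative placeholders, not
Flink APIs):

DataSet<Tuple2<Long, Point>> keyedPoints = points
    .map(new MapFunction<Point, Tuple2<Long, Point>>() {
        @Override
        public Tuple2<Long, Point> map(Point p) {
            return new Tuple2<>(cellOf(p), p); // coarse grid cell containing the point
        }
    });

DataSet<Tuple2<Long, Shape>> keyedShapes = shapes
    .flatMap(new FlatMapFunction<Shape, Tuple2<Long, Shape>>() {
        @Override
        public void flatMap(Shape s, Collector<Tuple2<Long, Shape>> out) {
            for (long cell : cellsCoveredBy(s)) { // a shape may span several cells
                out.collect(new Tuple2<>(cell, s));
            }
        }
    });

DataSet<Tuple2<Point, Shape>> matches = keyedPoints
    .join(keyedShapes)
    .where(0).equalTo(0)
    .with(new FlatJoinFunction<Tuple2<Long, Point>, Tuple2<Long, Shape>, Tuple2<Point, Shape>>() {
        @Override
        public void join(Tuple2<Long, Point> p, Tuple2<Long, Shape> s,
                         Collector<Tuple2<Point, Shape>> out) {
            if (s.f1.contains(p.f1)) { // the exact test only runs within a shared cell
                out.collect(new Tuple2<>(p.f1, s.f1));
            }
        }
    });
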
Best, Fabian


2017-02-21 18:34 GMT+01:00 Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>>:
Hi,

I need (o

RE: Cross operation on two huge datasets

2017-03-02 Thread Gwenhael Pasquiers
I (almost) made it work the following way:

1st job : Read all the shapes, repartition() them equally on my N nodes, then
on each node fill a static RTree (thanks for the tip).

2nd job : Read all the points, use a flatmap + custom partitioner to “clone” 
the dataset to all nodes, then apply a simple flatmap that will use the 
previously initialized static RTree, adding the Shape information to the point. 
Then do a groupBy to merge the points that were inside of multiple shapes.

This works very well in a local runtime but fails on yarn because it seems that 
the taskmanager reloads the jar file between two jobs, making me lose my static 
RTree (I guess that newly loaded class overwrites the older one).

I have two questions :

-  Is there a way to avoid that jar reload // can I store my RTree 
somewhere in jdk or flink, locally to the taskmanager, in a way that it 
wouldn’t be affected by the jar reload (since it would not be stored in any 
class from MY jar)?

o   I could also try to do it in a single job, but I don’t know how to ensure 
that some operations are done (parsing of shape) BEFORE starting others 
handling the points.

-  Is there a way to do that in a clean way using flink operators ? I’d 
need to be able to use the result of the iteration of a dataset inside of my 
map.

Something like :

datasetA.flatmap(new MyMapOperator(datasetB))…

And In my implementation I would be able to iterate the whole datasetB BEFORE 
doing any operation in datasetA. That way I could parse all my shapes in an 
RTree before handling my points, without relying on static

Or any other way that would allow me to do something similar.

Thanks in advance for your insight.

Gwen’

From: Jain, Ankit [mailto:ankit.j...@here.com]
Sent: jeudi 23 février 2017 19:21
To: user@flink.apache.org
Cc: Fabian Hueske <fhue...@gmail.com>
Subject: Re: Cross operation on two huge datasets

Hi Gwen,
I would recommend looking into a data structure called RTree that is designed 
specifically for this use case, i.e. matching a point to a region.

Thanks
Ankit

From: Fabian Hueske <fhue...@gmail.com<mailto:fhue...@gmail.com>>
Date: Wednesday, February 22, 2017 at 2:41 PM
To: <user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: Cross operation on two huge datasets

Hi Gwen,
Flink usually performs a block nested loop join to cross two data sets.
This algorithm spills one input to disk and streams the other input. For each 
input it fills a memory buffer to perform the cross. Then the buffer of the 
spilled input is refilled with spilled records and records are again crossed. 
This is done until one iteration over the spill records is done. Then the other 
buffer of the streamed input is filled with the next records.
You should be aware that cross is a super expensive operation, especially if 
you evaluate a complex condition for each pair of records. So cross can be 
easily too expensive to compute.
For such use cases it is usually better to apply a coarse-grained spatial 
partitioning and do a key-based join on the partitions. Within each partition 
you'd perform a cross.
Best, Fabian


2017-02-21 18:34 GMT+01:00 Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>>:
Hi,

I need (or at least I think I do) to do a cross operation between two huge 
datasets. One dataset is a list of points. The other one is a list of shapes 
(areas).

I want to know, for each point, the areas (they might overlap so a point can be 
in multiple areas) it belongs to so I thought I’d “cross” my points and areas 
since I need to test each point against each area.

I tried it and my job seems to work for some seconds then, at some point, it
gets stuck.

I’m wondering if Flink, for cross operations, tries to load one of the two 
datasets into RAM or if it’s able to split the job in multiple iterations (even 
if it means reading one of the two datasets multiple times).

Or maybe I’m going at it the wrong way, or missing some parameters, feel free 
to correct me ☺

I’m using flink 1.0.1.

Thanks in advance

Gwen’



RE: Cross operation on two huge datasets

2017-02-23 Thread Gwenhael Pasquiers
Hi and thanks for your answers !

I’m not sure I can define any index to split the workload since in my case any 
point could be in any zone...
I think I’m currently trying to do it the way you call “theta-join”:

1-  Trying to split one dataset over the cluster and prepare it for work
against the other one (ex: parse the shapes)

a.   Either using partitioning

b.   Either using N sources + filtering based on hash so I get 
complementary datasets

2-  Make my other dataset go “through” all the “splits” of the first one 
and enrich / filter it

a.   The dataset would probably have to be entirely read multiple times 
from hdfs (one time per “split”)

I have other ideas but I don’t know if it’s doable in flink.

Question:

Is there a way for an object (key selector, flatmap) to obtain (and wait for) 
the result of a previous dataset ? Only way I can think of is a “cross” between 
my one-record-dataset (the result) and the other dataset. But maybe that’s very 
bad regarding resources ?

I’d like to try using a flatmap that clones the dataset in N parts (adding a 
partition key 0 to N-1 to each record), then use partitioning to “dispatch” 
each clone of the dataset to a matching “shape matcher” partition; then I’d use 
cross to do the work, then group back the results together (in case N clones of 
a point were inside different shapes). Maybe that would split the workload of
the cross by dividing the size of one of the two datasets involved in that cross …

sorry for my rambling if I’m not clear.

B.R.


From: Xingcan Cui [mailto:xingc...@gmail.com]
Sent: jeudi 23 février 2017 06:00
To: user@flink.apache.org
Subject: Re: Cross operation on two huge datasets

Hi all,

@Gwen From the database's point of view, the only way to avoid a Cartesian
product in a join is to use an index, which manifests as key grouping in Flink.
However, it only supports many-to-one mapping now, i.e., a shape or a point can 
only be distributed to a single group. Only points and shapes belonging to the 
same group can be joined and that could reduce the inherent pair comparisons 
(compared with a Cartesian product). It's perfectly suitable for equi-join.

@Fabian I saw this thread when I was just considering theta-join (which 
will eventually be supported) in Flink. Since it's impossible to group (index) 
a dataset for an arbitrary theta-join, I think we may need some duplication 
mechanism here. For example, split a dataset into n parts and send the other 
dataset to all of these parts. This could be more useful in stream join. BTW, 
it seems that I've seen another thread discussing about this, but can not find 
it now. What do you think?

Best,
Xingcan

On Thu, Feb 23, 2017 at 6:41 AM, Fabian Hueske 
<fhue...@gmail.com<mailto:fhue...@gmail.com>> wrote:
Hi Gwen,
Flink usually performs a block nested loop join to cross two data sets.
This algorithm spills one input to disk and streams the other input. For each 
input it fills a memory buffer to perform the cross. Then the buffer of the 
spilled input is refilled with spilled records and records are again crossed. 
This is done until one iteration over the spill records is done. Then the other 
buffer of the streamed input is filled with the next records.
You should be aware that cross is a super expensive operation, especially if 
you evaluate a complex condition for each pair of records. So cross can be 
easily too expensive to compute.
For such use cases it is usually better to apply a coarse-grained spatial 
partitioning and do a key-based join on the partitions. Within each partition 
you'd perform a cross.
Best, Fabian


2017-02-21 18:34 GMT+01:00 Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>>:
Hi,

I need (or at least I think I do) to do a cross operation between two huge 
datasets. One dataset is a list of points. The other one is a list of shapes 
(areas).

I want to know, for each point, the areas (they might overlap so a point can be 
in multiple areas) it belongs to so I thought I’d “cross” my points and areas 
since I need to test each point against each area.

I tried it and my job seems to work for some seconds then, at some point, it
gets stuck.

I’m wondering if Flink, for cross operations, tries to load one of the two 
datasets into RAM or if it’s able to split the job in multiple iterations (even 
if it means reading one of the two datasets multiple times).

Or maybe I’m going at it the wrong way, or missing some parameters, feel free 
to correct me ☺

I’m using flink 1.0.1.

Thanks in advance

Gwen’




RE: Making batches of small messages

2017-01-12 Thread Gwenhael Pasquiers
Thanks,

We are waiting for the 1.2 release eagerly ☺


From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: mercredi 11 janvier 2017 18:32
To: user@flink.apache.org
Subject: Re: Making batches of small messages

Hi,
I think this is a case for the ProcessFunction that was recently added and will 
be included in Flink 1.2.
ProcessFunction allows you to register timers (so the 5 secs timeout can be 
addressed). You can maintain the fault tolerance guarantees if you collect the 
records in managed state. That way they will be included in checkpoints and 
restored in case of a failure.
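
For reference, a rough sketch of the idea (written against the later
ProcessFunction API where the function itself gives access to managed state and
timers; the exact class names varied slightly in the earliest releases). It has
to run on a keyed stream, e.g. stream.keyBy(...).process(new BatchingFunction()),
and all names and the 1000 / 5 s thresholds are only placeholders:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class BatchingFunction extends ProcessFunction<String, List<String>> {

    private static final int MAX_BATCH_SIZE = 1000;
    private static final long TIMEOUT_MS = 5000L;

    private transient ListState<String> batch;

    @Override
    public void open(Configuration parameters) {
        batch = getRuntimeContext().getListState(
                new ListStateDescriptor<>("batch", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<List<String>> out)
            throws Exception {
        batch.add(value);
        List<String> buffered = new ArrayList<>();
        for (String s : batch.get()) {
            buffered.add(s);
        }
        if (buffered.size() >= MAX_BATCH_SIZE) {
            out.collect(buffered);
            batch.clear();
        } else if (buffered.size() == 1) {
            // first element of a new batch: schedule the timeout flush
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + TIMEOUT_MS);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<String>> out)
            throws Exception {
        // a stale timer simply flushes whatever is buffered at that moment
        List<String> buffered = new ArrayList<>();
        for (String s : batch.get()) {
            buffered.add(s);
        }
        if (!buffered.isEmpty()) {
            out.collect(buffered);
            batch.clear();
        }
    }
}
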
If you are on Flink 1.1.x, you will need to implement a custom operator which 
is a much more low-level interface.
Best, Fabian

2017-01-11 17:16 GMT+01:00 Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>>:
Hi,

Sorry if this was already asked.

For performance reasons (streaming as well as batch) I’d like to “group” 
messages (let’s say by batches of 1000) before sending them to my sink (kafka, 
but mainly ES) so that I have a smaller overhead.

I’ve seen the “countWindow” operation but if I’m not wrong the parallelism of 
such an operation is 1. Moreover I’d need some “timeout” (send the current 
batch to next operator after 5s if it did not reach 1000 messages before that).

I could also create a flatMap “String to List<String>” that accumulates messages 
until it reaches 1000 and then sends them to output, however that does not 
solve the timeout issue (not sure I could call out.collect() from a Timer 
thread), and even more importantly I’m afraid that that would screw up the 
exactly-once policy (flink could not know that I was stacking messages, I could 
very well be filtering them) in case of a crash.

My Sink could also create the chunks, with its own timer / counter, but I’m
also afraid that it would break the exactly-once thingie since in case of a
crash there is no way that Flink would know if the message was really sent or
stacked …

Is there a proper way to do what I want ?

Thanks in advance,

Gwenhaël PASQUIERS



Making batches of small messages

2017-01-11 Thread Gwenhael Pasquiers
Hi,

Sorry if this was already asked.

For performance reasons (streaming as well as batch) I'd like to "group" 
messages (let's say by batches of 1000) before sending them to my sink (kafka, 
but mainly ES) so that I have a smaller overhead.

I've seen the "countWindow" operation but if I'm not wrong the parallelism of 
such an operation is 1. Moreover I'd need some "timeout" (send the current 
batch to next operator after 5s if it did not reach 1000 messages before that).

I could also create a flatMap "String to List<String>" that accumulates messages 
until it reaches 1000 and then sends them to output, however that does not 
solve the timeout issue (not sure I could call out.collect() from a Timer 
thread), and even more importantly I'm afraid that that would screw up the 
exactly-once policy (flink could not know that I was stacking messages, I could 
very well be filtering them) in case of a crash.

My Sink could also create the chunks, with its own timer / counter, but I'm
also afraid that it would break the exactly-once thingie since in case of a
crash there is no way that Flink would know if the message was really sent or stacked 
...

Is there a proper way to do what I want ?

Thanks in advance,

Gwenhaël PASQUIERS


RE: Programmatically get live values of accumulators

2017-01-03 Thread Gwenhael Pasquiers
Indeed, that looks like what we need. We currently rely on Flink 1.0.1; that
feature might be a good reason to update our Flink version. I’ll test it. Many
thanks !

From: Jamie Grier [mailto:ja...@data-artisans.com]
Sent: lundi 2 janvier 2017 20:56
To: user@flink.apache.org
Subject: Re: Programmatically get live values of accumulators

Hi Gwenhael,

I think what you actually want is to use the Apache Flink metrics interface.  
See the following: 
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/metrics.html

Sending metrics to StatsD is supported out-of-the-box.
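
For reference, a rough sketch of registering a metric from user code (assuming
Flink 1.1+; the counter name and the mapper are only illustrative, and the
StatsD reporter itself is configured in flink-conf.yaml as described on the
linked page):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter processed;

    @Override
    public void open(Configuration parameters) {
        // registered under the operator's metric group and picked up by the
        // configured reporter (e.g. StatsD)
        processed = getRuntimeContext().getMetricGroup().counter("processedRecords");
    }

    @Override
    public String map(String value) {
        processed.inc();
        return value;
    }
}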

-Jamie


On Mon, Jan 2, 2017 at 1:34 AM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Hi, and best wishes for the year to come ☺

I’d like to be able to programmatically get the (live) values of accumulators 
in order to send them using a statsd (or another) client in the JobManager of a 
yarn-deployed application. I say live because I’d like to use that in streaming 
(24/7) applications, and send live stats; I cannot wait for the application to
end.

I’ve seen that there is a JSON API (I’d prefer not to have my app poll itself).
I’ve seen some code on github (tests files) where it’s done using the 
underlying akka framework, I don’t mind doing it the same way and creating an 
actor to get notifications messages, but I don’t know the best way, and there 
probably is a better one.

Thanks in advance,

Gwenhaël PASQUIERS



--

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier<https://twitter.com/jamiegrier>
ja...@data-artisans.com<mailto:ja...@data-artisans.com>



Programmatically get live values of accumulators

2017-01-02 Thread Gwenhael Pasquiers
Hi, and best wishes for the year to come :)

I'd like to be able to programmatically get the (live) values of accumulators 
in order to send them using a statsd (or another) client in the JobManager of a 
yarn-deployed application. I say live because I'd like to use that in streaming 
(24/7) applications, and send live stats; I cannot wait for the application to
end.

I've seen that there is a JSON API (I'd prefer not to have my app poll itself).
I've seen some code on github (tests files) where it's done using the 
underlying akka framework, I don't mind doing it the same way and creating an 
actor to get notifications messages, but I don't know the best way, and there 
probably is a better one.

Thanks in advance,

Gwenhaël PASQUIERS


RE: Generate _SUCCESS (map-reduce style) when folder has been written

2016-12-20 Thread Gwenhael Pasquiers
No, don’t worry, I think it’s totally compliant with Hadoop’s behavior but I 
wanted it to behave more like Flink (to totally clean the destination folder 
before outputting new files).

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: mardi 20 décembre 2016 16:41
To: user@flink.apache.org
Subject: Re: Generate _SUCCESS (map-reduce style) when folder has been written

Great to hear!
Do you mean that the behavior of Flink's HadoopOutputFormat is not consistent 
with Hadoop's behavior?
If that's the case, could you open a JIRA ticket to report this and maybe also 
contribute your changes back?
Thanks a lot,
Fabian

2016-12-20 16:37 GMT+01:00 Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>>:
Thanks, it is working properly now.
NB : Had to delete the folder by code because Hadoop’s OutputFormats will only 
overwrite file by file, not the whole folder.

From: Fabian Hueske [mailto:fhue...@gmail.com<mailto:fhue...@gmail.com>]
Sent: mardi 20 décembre 2016 14:21
To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Generate _SUCCESS (map-reduce style) when folder has been written

Hi Gwenhael,
The _SUCCESS files were originally generated by Hadoop for successful jobs. 
AFAIK, Spark leverages Hadoop's Input and OutputFormats and seems to have 
followed this approach as well to be compatible.
You could use Flink's HadoopOutputFormat which is a wrapper for Hadoop 
OutputFormats (both mapred and mapreduce APIs).
The wrapper does also produce the _SUCCESS files. In fact, you might be able to 
use exactly the same OutputFormat as your Spark job.
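
For reference, a rough sketch of wrapping a Hadoop (mapreduce API) OutputFormat
this way (the Text/IntWritable types, the output path and the "result" data set
are placeholders, and exception handling is omitted):

Job job = Job.getInstance();
TextOutputFormat.setOutputPath(job, new Path("hdfs:///path/to/output"));

HadoopOutputFormat<Text, IntWritable> hadoopOF =
        new HadoopOutputFormat<>(new TextOutputFormat<Text, IntWritable>(), job);

// "result" is whatever DataSet<Tuple2<Text, IntWritable>> the job produces
result.output(hadoopOF);
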
Best,
Fabian

2016-12-20 14:00 GMT+01:00 Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>>:
Hi,

Sorry if it’s already been asked but is there an embedded way for flink to 
generate a _SUCCESS file in the folders it’s been writing into (using the write 
method with OutputFormat) ?

We are replacing a spark job that was generating those files (and further 
operations rely on it).

Best regards,

Gwenhaël PASQUIERS




Generate _SUCCESS (map-reduce style) when folder has been written

2016-12-20 Thread Gwenhael Pasquiers
Hi,

Sorry if it's already been asked but is there an embedded way for flink to 
generate a _SUCCESS file in the folders it's been writing into (using the write 
method with OutputFormat) ?

We are replacing a spark job that was generating those files (and further 
operations rely on it).

Best regards,

Gwenhaël PASQUIERS


RE: Read a given list of HDFS folder

2016-03-21 Thread Gwenhael Pasquiers
Hi and thanks, I'm not sure that recursive traversal is what I need.

Let's say I have the following dir tree :

/data/2016_03_21_13/*.gz
/data/2016_03_21_12/*.gz
/data/2016_03_21_11/*.gz
/data/2016_03_21_10/*.gz
/data/2016_03_21_09/*.gz
/data/2016_03_21_08/*.gz
/data/2016_03_21_07/*.gz


I want my DataSet to include (and nothing else) :

/data/2016_03_21_13/*.gz
/data/2016_03_21_12/*.gz
/data/2016_03_21_11/*.gz

And I do not want to include any of the other folders (and their files).

Can I create a DataSet that would only contain those folders ? 
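
For what it's worth, a minimal sketch of one way to do it, assuming the folder
names can be listed or computed explicitly (readTextFile reads every file of
the given directory, and .gz files are decompressed transparently):

DataSet<String> lastThreeHours = env.readTextFile("hdfs:///data/2016_03_21_13")
        .union(env.readTextFile("hdfs:///data/2016_03_21_12"))
        .union(env.readTextFile("hdfs:///data/2016_03_21_11"));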

-Original Message-
From: Ufuk Celebi [mailto:u...@apache.org] 
Sent: lundi 21 mars 2016 13:39
To: user@flink.apache.org
Subject: Re: Read a given list of HDFS folder

Hey Gwenhaël,

see here for recursive traversal of input paths:
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#recursive-traversal-of-the-input-path-directory

Regarding the phases: the best way to exchange data between batch jobs is via 
files. You can then execute two programs one after the other, the first one 
produces the files, which the second job uses as input.
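
For reference, a minimal sketch of that file hand-over (paths and names are
placeholders):

// first program: produce the intermediate result
firstResult.writeAsText("hdfs:///tmp/intermediate");
env.execute("phase 1");

// second program (or a second execute() later in the same program): read it back
DataSet<String> intermediate = env.readTextFile("hdfs:///tmp/intermediate");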

– Ufuk



On Mon, Mar 21, 2016 at 12:14 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com> wrote:
> Hello,
>
> Sorry if this has been already asked or is already in the docs, I did not 
> find the answer :
>
> Is there a way to read a given set of folders in Flink batch ? Let's say we 
> have one folder per hour of data, written by flume, and we'd like to read 
> only the N last hours (or any other pattern or arbitrary list of folders).
>
> And while I'm at it I have another question :
>
> Let's say that in my batch task I need to sequence two "phases" and that the 
> second phase needs the final result from the first one.
>  - Do I have to create, in the TaskManager, one Execution environment per 
> task and execute them one after the other ?
>  - Can my TaskManagers send back some data (other than counters) to the 
> JobManager or do I have to use a file to store the result from phase one and 
> use it in phase Two ?
>
> Thanks in advance for your answers,
>
> Gwenhaël


Read a given list of HDFS folder

2016-03-21 Thread Gwenhael Pasquiers
Hello,

Sorry if this has been already asked or is already in the docs, I did not find 
the answer :

Is there a way to read a given set of folders in Flink batch ? Let's say we 
have one folder per hour of data, written by flume, and we'd like to read only 
the N last hours (or any other pattern or arbitrary list of folders).

And while I'm at it I have another question :

Let's say that in my batch task I need to sequence two "phases" and that the 
second phase needs the final result from the first one.
 - Do I have to create, in the TaskManager, one Execution environment per task 
and execute them one after the other ?
 - Can my TaskManagers send back some data (other than counters) to the 
JobManager or do I have to use a file to store the result from phase one and 
use it in phase Two ?

Thanks in advance for your answers,

Gwenhaël


RE: Distribution of sinks among the nodes

2016-02-11 Thread Gwenhael Pasquiers
Thanks,
One more thing to expect from the next version !

-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: lundi 8 février 2016 13:18
To: user@flink.apache.org
Subject: Re: Distribution of sinks among the nodes

Hi,
I just merged the new feature, so once this makes it into the 1.0-SNAPSHOT 
builds you should be able to use:

env.setParallelism(4);

env
.addSource(kafkaSource)
.rescale()
.map(mapper).setParallelism(16)
.rescale()
.addSink(kafkaSink);

to get your desired behavior. For this to work, the parallelism should be set 
to 16, with 4 nodes. Then each node will have one source, 4 mappers and 1 sink. 
The source will only be connected to the 4 mappers while the 4 mappers will be 
the only ones connected to the sink.

Cheers,
Aljoscha

> On 04 Feb 2016, at 18:29, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> I added a new Ticket: https://issues.apache.org/jira/browse/FLINK-3336
> 
> This will implement the data shipping pattern that you mentioned in your 
> initial mail. I also have the Pull request almost ready.
> 
>> On 04 Feb 2016, at 16:25, Gwenhael Pasquiers 
>> <gwenhael.pasqui...@ericsson.com> wrote:
>> 
>> Okay ;
>> 
>> Then I guess that the best we can do is to disable chaining (we really want 
>> one thread per operator since they are doing long operations) and have the 
>> same parallelism for sinks as mapping : that way each map will have its own 
>> sink and there will be no exchanges between flink instances.
>> 
>> From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf 
>> Of Stephan Ewen
>> Sent: jeudi 4 février 2016 15:13
>> To: user@flink.apache.org
>> Subject: Re: Distribution of sinks among the nodes
>> 
>> To your other question, there are two things in Flink:
>> 
>> (1) Chaining. Tasks are folded together into one task, run by one thread.
>> 
>> (2) Resource groups: Tasks stay separate, have separate threads, but share a 
>> slot (which means share memory resources). See the link in my previous mail 
>> for an explanation concerning those.
>> 
>> Greetings,
>> Stephan
>> 
>> 
>> On Thu, Feb 4, 2016 at 3:10 PM, Stephan Ewen <se...@apache.org> wrote:
>> Hi Gwen!
>> 
>> You actually need not 24 slots, but only as many as the highest parallelism 
>> is (16). Slots do not hold individual tasks, but "pipelines". 
>> 
>> Here is an illustration how that works.
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/co
>> nfig.html#configuring-taskmanager-processing-slots
>> 
>> You can control whether a task can share the slot with the previous task 
>> with the function "startNewResourceGroup()" in the streaming API. Sharing 
>> lots makes a few things easier to reason about, especially when adding 
>> operators to a program, you need not immediately add new machines.
>> 
>> 
>> How to solve your program case
>> 
>> 
>> We can actually make a pretty simple addition to Flink that will cause the 
>> tasks to be locally connected, which in turn will cause the scheduler to 
>> distribute them like you intend.
>> Rather than let the 4 sources rebalance across all 16 mappers, each one 
>> should redistribute to 4 local mappers, and these 4 mappers should send data 
>> to one local sink each.
>> 
>> We'll try and add that today and ping you once it is in.
>> 
>> The following would be sample code to use this:
>> 
>> env.setParallelism(4);
>> 
>> env
>>.addSource(kafkaSource)
>>.partitionFan()
>>.map(mapper).setParallelism(16)
>>.partitionFan()
>>.addSink(kafkaSink);
>> 
>> 
>> 
>> A bit of background why the mechanism is the way that it is right now
>> -
>> -
>> 
>> You can think of a slot as a slice of resources. In particular, an amount of 
>> memory from the memory manager, but also memory in the network stack.
>> 
>> What we want to do quite soon is to make streaming programs more elastic. 
>> Consider for example the case that you have 16 slots on 4 machines, a 
>> machine fails, and you have no spare resources. In that case Flink should 
>> recognize that no spare resource can be acquired, and scale the job in. 
>> Since you have only 12 slots left, the parallelism of the mappers is reduced 
>> to 12, and the source task that was on the failed machine is moved to a slot 
>> on another machine

RE: Internal buffers supervision and yarn vCPUs

2016-02-04 Thread Gwenhael Pasquiers
Ok thanks !

All that’s left is to wait then.

B.R.

From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan 
Ewen
Sent: jeudi 4 février 2016 11:19
To: user@flink.apache.org
Subject: Re: Internal buffers supervision and yarn vCPUs

Concerning the first question:

What you are looking for is backpressure monitoring. If a task cannot push its 
data to the next task, it is backpressured.

This pull request adds a first version of backpressure monitoring: 
https://github.com/apache/flink/pull/1578

We will try and get it merged soon!


On Thu, Feb 4, 2016 at 11:03 AM, Robert Metzger 
<rmetz...@apache.org<mailto:rmetz...@apache.org>> wrote:
Hi Gwen,

let me answer the second question: There is a JIRA to reintroduce the 
configuration parameter: https://issues.apache.org/jira/browse/FLINK-2213. I 
will try to get a fix for this into the 1.0 release.

I think I removed it back then because users were unable to define the number of 
vcores independently of the number of slots ... and too many users were running 
into issues with the yarn scheduler (containers were not started because there 
were no CPU resources available anymore).



On Thu, Feb 4, 2016 at 10:56 AM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Hi,

I’ve got two more questions on different topic…

First one :
Is there a way to monitor the buffers’ status? In order to find bottlenecks in
our application we thought it could be useful to be able to have a look at the
different exchange buffers’ status, to know if they are full (or, as an example,
if a mapper had to wait before being able to push its data into the buffer).
That way we can know where the bottleneck is.

Second one :
One type of resource on YARN is vCPU. In Flink 0.8 there was a “-tmc” argument
that allowed us to specify the number of vCPUs per task manager. We cannot find
it anymore. Was it removed ? Is there another way to set the number of vCPUs ?
Or did it become useless ?

Thanks in advance.

Gwen’




Internal buffers supervision and yarn vCPUs

2016-02-04 Thread Gwenhael Pasquiers
Hi,

I’ve got two more questions on different topic…

First one :
Is there a way to monitor the buffers’ status? In order to find bottlenecks in
our application we thought it could be useful to be able to have a look at the
different exchange buffers’ status, to know if they are full (or, as an example,
if a mapper had to wait before being able to push its data into the buffer).
That way we can know where the bottleneck is.

Second one :
One type of resource on YARN is vCPU. In Flink 0.8 there was a “-tmc” argument
that allowed us to specify the number of vCPUs per task manager. We cannot find
it anymore. Was it removed ? Is there another way to set the number of vCPUs ?
Or did it become useless ?

Thanks in advance.

Gwen’


RE: Distribution of sinks among the nodes

2016-02-04 Thread Gwenhael Pasquiers
Sorry I was confused about the number of slots, it’s good now.

However, is disableChaining or disableOperatorChaining working properly ?
I called those methods everywhere I could but it still seems that some of my
operators are being chained together: I can’t go over 16 used slots where I
should be at 24 if there was no chaining …
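
For reference, the two flavours being referred to (a sketch; "stream" and
"mapper" are placeholders):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();            // disables chaining for the whole job

stream.map(mapper).disableChaining();     // disables chaining for this operator only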



From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: jeudi 4 février 2016 09:55
To: user@flink.apache.org
Subject: RE: Distribution of sinks among the nodes

Don’t we need to set the number of slots to 24 (4 sources + 16 mappers + 4 
sinks) ?

Or is there a way to set the number of slots per TaskManager instead of
globally so that they are at least equally dispatched among the nodes ?

As for the sink deployment : that’s not good news ; I mean we will have a
non-negligible overhead : all the data generated by 3 of the 4 nodes will be
sent to a third node instead of being sent to the “local” sink. Network I/O
has a price.

Do you have some sort of “topology” feature coming in the roadmap ? Maybe a
listener on the JobManager / env that would be triggered, asking us on which
node we would prefer each task to be deployed. That way you keep the standard
behavior, don’t have to make a complicated generic-optimized algorithm, and let
the user make their choices. Should I create a JIRA ?

For the time being we could start the application 4 times : one time per node,
but that’s not pretty at all ☺

B.R.

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: mercredi 3 février 2016 17:58
To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Distribution of sinks among the nodes


Hi Gwenhäel,

if you set the number of slots for each TaskManager to 4, then all of your 
mapper will be evenly spread out. The sources should also be evenly spread out. 
However, for the sinks since they depend on all mappers, it will be most likely 
random where they are deployed. So you might end up with 4 sink tasks on one 
machine.

Cheers,
Till
​

On Wed, Feb 3, 2016 at 4:31 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote:
It is one type of mapper with a parallelism of 16
It's the same for the sinks and sources (parallelism of 4)

The settings are
Env.setParallelism(4)
Mapper.setPrallelism(env.getParallelism() * 4)

We mean to have X mapper tasks per source / sink

The mapper is doing some heavy computation and we have only 4 kafka partitions. 
That's why we need more mappers than sources / sinks


-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org<mailto:aljos...@apache.org>]
Sent: mercredi 3 février 2016 16:26
To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Distribution of sinks among the nodes

Hi Gwenhäel,
when you say 16 maps, are we talking about one mapper with parallelism 16 or 16 
unique map operators?

Regards,
Aljoscha
> On 03 Feb 2016, at 15:48, Gwenhael Pasquiers 
> <gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> 
> wrote:
>
> Hi,
>
> We try to deploy an application with the following “architecture” :
>
> 4 kafka sources => 16 maps => 4 kafka sinks, on 4 nodes, with 24 slots (we 
> disabled operator chaining).
>
> So we’d like on each node :
> 1x source => 4x map => 1x sink
>
> That way there are no exchanges between different instances of flink and 
> performances would be optimal.
>
> But we get (according to the flink GUI and the Host column when looking at 
> the details of each task) :
>
> Node 1 : 1 source =>  2 map
> Node 2 : 1 source =>  1 map
> Node 3 : 1 source =>  1 map
> Node 4 : 1 source =>  12 maps => 4 sinks
>
> (I think no comments are needed ☺)
>
> The Web UI says that there are 24 slots and they are all used but they 
> don’t seem evenly dispatched …
>
> How could we make Flink deploy the tasks the way we want ?
>
> B.R.
>
> Gwen’



RE: Distribution of sinks among the nodes

2016-02-04 Thread Gwenhael Pasquiers
Okay ;

Then I guess that the best we can do is to disable chaining (we really want one 
thread per operator since they are doing long operations) and have the same 
parallelism for sinks as mapping : that way each map will have its own sink 
and there will be no exchanges between flink instances.

From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan 
Ewen
Sent: jeudi 4 février 2016 15:13
To: user@flink.apache.org
Subject: Re: Distribution of sinks among the nodes

To your other question, there are two things in Flink:

(1) Chaining. Tasks are folded together into one task, run by one thread.

(2) Resource groups: Tasks stay separate, have separate threads, but share a 
slot (which means share memory resources). See the link in my previous mail for 
an explanation concerning those.

Greetings,
Stephan


On Thu, Feb 4, 2016 at 3:10 PM, Stephan Ewen 
<se...@apache.org<mailto:se...@apache.org>> wrote:
Hi Gwen!

You actually need not 24 slots, but only as many as the highest parallelism is 
(16). Slots do not hold individual tasks, but "pipelines".

Here is an illustration how that works.
https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#configuring-taskmanager-processing-slots

You can control whether a task can share the slot with the previous task with 
the function "startNewResourceGroup()" in the streaming API. Sharing lots makes 
a few things easier to reason about, especially when adding operators to a 
program, you need not immediately add new machines.


How to solve your program case


We can actually make a pretty simple addition to Flink that will cause the 
tasks to be locally connected, which in turn will cause the scheduler to 
distribute them like you intend.
Rather than let the 4 sources rebalance across all 16 mappers, each one should 
redistribute to 4 local mappers, and these 4 mappers should send data to one 
local sink each.

We'll try and add that today and ping you once it is in.

The following would be sample code to use this:

env.setParallelism(4);

env
.addSource(kafkaSource)
.partitionFan()
.map(mapper).setParallelism(16)
.partitionFan()
.addSink(kafkaSink);



A bit of background why the mechanism is the way that it is right now
--

You can think of a slot as a slice of resources. In particular, an amount of 
memory from the memory manager, but also memory in the network stack.

What we want to do quite soon is to make streaming programs more elastic. 
Consider for example the case that you have 16 slots on 4 machines, a machine 
fails, and you have no spare resources. In that case Flink should recognize 
that no spare resource can be acquired, and scale the job in. Since you have 
only 12 slots left, the parallelism of the mappers is reduced to 12, and the 
source task that was on the failed machine is moved to a slot on another 
machine.

It is important that the guaranteed resources for each task do not change when 
scaling in, to keep behavior predictable. In this case, each slot will still at 
most host 1 source, 1 mapper, and 1 sink, as did some of the slots before. That 
is also the reason why the slots are per TaskManager, and not global, to 
associate them with a constant set of resources (mainly memory).


Greetings,
Stephan



On Thu, Feb 4, 2016 at 9:54 AM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Don’t we need to set the number of slots to 24 (4 sources + 16 mappers + 4 
sinks) ?

Or is there a way to set the number of slots per TaskManager instead of
globally so that they are at least equally dispatched among the nodes ?

As for the sink deployment : that’s not good news ; I mean we will have a
non-negligible overhead : all the data generated by 3 of the 4 nodes will be
sent to a third node instead of being sent to the “local” sink. Network I/O
has a price.

Do you have some sort of “topology” feature coming in the roadmap ? Maybe a
listener on the JobManager / env that would be triggered, asking us on which
node we would prefer each task to be deployed. That way you keep the standard
behavior, don’t have to make a complicated generic-optimized algorithm, and let
the user make their choices. Should I create a JIRA ?

For the time being we could start the application 4 times : one time per node,
but that’s not pretty at all ☺

B.R.

From: Till Rohrmann [mailto:trohrm...@apache.org<mailto:trohrm...@apache.org>]
Sent: mercredi 3 février 2016 17:58

To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Distribution of sinks among the nodes


Hi Gwenhäel,

if you set the number of slots for each TaskManager to 4, then all of your 
mapper will be evenly spread out. The sources should also be evenly spread out. 

RE: Distribution of sinks among the nodes

2016-02-04 Thread Gwenhael Pasquiers
Don’t we need to set the number of slots to 24 (4 sources + 16 mappers + 4 
sinks) ?

Or is there a way to set the number of slots per TaskManager instead of
globally so that they are at least equally dispatched among the nodes ?

As for the sink deployment : that’s not good news ; I mean we will have a
non-negligible overhead : all the data generated by 3 of the 4 nodes will be
sent to a third node instead of being sent to the “local” sink. Network I/O
has a price.

Do you have some sort of “topology” feature coming in the roadmap ? Maybe a
listener on the JobManager / env that would be triggered, asking us on which
node we would prefer each task to be deployed. That way you keep the standard
behavior, don’t have to make a complicated generic-optimized algorithm, and let
the user make their choices. Should I create a JIRA ?

For the time being we could start the application 4 times : one time per node,
but that’s not pretty at all ☺

B.R.

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: mercredi 3 février 2016 17:58
To: user@flink.apache.org
Subject: Re: Distribution of sinks among the nodes


Hi Gwenhäel,

if you set the number of slots for each TaskManager to 4, then all of your 
mapper will be evenly spread out. The sources should also be evenly spread out. 
However, for the sinks since they depend on all mappers, it will be most likely 
random where they are deployed. So you might end up with 4 sink tasks on one 
machine.

Cheers,
Till
​

On Wed, Feb 3, 2016 at 4:31 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote:
It is one type of mapper with a parallelism of 16
It's the same for the sinks and sources (parallelism of 4)

The settings are
Env.setParallelism(4)
Mapper.setParallelism(env.getParallelism() * 4)

We mean to have X mapper tasks per source / sink

The mapper is doing some heavy computation and we have only 4 kafka partitions. 
That's why we need more mappers than sources / sinks


-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org<mailto:aljos...@apache.org>]
Sent: mercredi 3 février 2016 16:26
To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Distribution of sinks among the nodes

Hi Gwenhäel,
when you say 16 maps, are we talking about one mapper with parallelism 16 or 16 
unique map operators?

Regards,
Aljoscha
> On 03 Feb 2016, at 15:48, Gwenhael Pasquiers 
> <gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> 
> wrote:
>
> Hi,
>
> We try to deploy an application with the following “architecture” :
>
> 4 kafka sources => 16 maps => 4 kafka sinks, on 4 nodes, with 24 slots (we 
> disabled operator chaining).
>
> So we’d like on each node :
> 1x source => 4x map => 1x sink
>
> That way there are no exchanges between different instances of flink and 
> performance would be optimal.
>
> But we get (according to the flink GUI and the Host column when looking at 
> the details of each task) :
>
> Node 1 : 1 source =>  2 map
> Node 2 : 1 source =>  1 map
> Node 3 : 1 source =>  1 map
> Node 4 : 1 source =>  12 maps => 4 sinks
>
> (I think no comments are needed J)
>
> The Web UI says that there are 24 slots and they are all used but they 
> don’t seem evenly dispatched …
>
> How could we make Flink deploy the tasks the way we want ?
>
> B.R.
>
> Gwen’



Distribution of sinks among the nodes

2016-02-03 Thread Gwenhael Pasquiers
Hi,

We try to deploy an application with the following “architecture” :

4 kafka sources => 16 maps => 4 kafka sinks, on 4 nodes, with 24 slots (we 
disabled operator chaining).

So we’d like on each node :
1x source => 4x map => 1x sink

That way there are no exchanges between different instances of flink and 
performance would be optimal.

But we get (according to the flink GUI and the Host column when looking at the 
details of each task) :

Node 1 : 1 source =>  2 map
Node 2 : 1 source =>  1 map
Node 3 : 1 source =>  1 map
Node 4 : 1 source =>  12 maps => 4 sinks

(I think no comments are needed ☺)

The Web UI says that there are 24 slots and they are all used but they 
don’t seem evenly dispatched …

How could we make Flink deploy the tasks the way we want ?

B.R.

Gwen’


RE: Distribution of sinks among the nodes

2016-02-03 Thread Gwenhael Pasquiers
It is one type of mapper with a parallelism of 16
It's the same for the sinks and sources (parallelism of 4)

The settings are 
Env.setParallelism(4)
Mapper.setParallelism(env.getParallelism() * 4)

We mean to have X mapper tasks per source / sink

The mapper is doing some heavy computation and we have only 4 kafka partitions. 
That's why we need more mappers than sources / sinks
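
For reference, a minimal sketch of how this wiring looks in the DataStream API (written against a recent Flink API ; the source, sink and mapper below are only placeholders for our real Kafka connectors and heavy computation) :

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class ParallelismSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);          // one source / sink per node (4 kafka partitions)
        env.disableOperatorChaining();  // so that source, map and sink are separate tasks

        env.addSource(new DummySource()).setParallelism(env.getParallelism())
           .map(new MapFunction<String, String>() {
               @Override
               public String map(String value) {
                   return value.toUpperCase(); // stand-in for the heavy computation
               }
           })
           .setParallelism(env.getParallelism() * 4)   // 16 mapper tasks
           .addSink(new DummySink()).setParallelism(env.getParallelism());

        env.execute("parallelism sketch");
    }

    // trivial placeholders standing in for the Kafka source / sink
    private static class DummySource implements SourceFunction<String> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                ctx.collect("event");
                Thread.sleep(100);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    private static class DummySink implements SinkFunction<String> {
        @Override
        public void invoke(String value) {
            // no-op, stand-in for the Kafka producer
        }
    }
}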


-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: mercredi 3 février 2016 16:26
To: user@flink.apache.org
Subject: Re: Distribution of sinks among the nodes

Hi Gwenhäel,
when you say 16 maps, are we talking about one mapper with parallelism 16 or 16 
unique map operators?

Regards,
Aljoscha
> On 03 Feb 2016, at 15:48, Gwenhael Pasquiers 
> <gwenhael.pasqui...@ericsson.com> wrote:
> 
> Hi,
>  
> We try to deploy an application with the following “architecture” :
>  
> 4 kafka sources => 16 maps => 4 kafka sinks, on 4 nodes, with 24 slots (we 
> disabled operator chaining).
>  
> So we’d like on each node :
> 1x source => 4x map => 1x sink
>  
> That way there are no exchanges between different instances of flink and 
> performance would be optimal.
>  
> But we get (according to the flink GUI and the Host column when looking at 
> the details of each task) :
>  
> Node 1 : 1 source =>  2 map
> Node 2 : 1 source =>  1 map
> Node 3 : 1 source =>  1 map
> Node 4 : 1 source =>  12 maps => 4 sinks
>  
> (I think no comments are needed J)
>  
> The Web UI says that there are 24 slots and they are all used but they 
> don’t seem evenly dispatched …
>  
> How could we make Flink deploy the tasks the way we want ?
>  
> B.R.
>  
> Gwen’



RE: about blob.storage.dir and .buffer files

2016-01-29 Thread Gwenhael Pasquiers
Hi !

Here are the answers :
  - How much data is in the blob-store directory, versus in the buffer files? :
    around 20 MB per application versus 20 GB (probably one specific windowing app).

  - How many buffer files do you have and how large are they on average? :
    standard apps have no buffer files (or really small ones) ; the one app has
    around 10 files of 2 GB each (ex :
    /tmp_flink/flink-io-*/b3177f8e1910a686ccb48c1da533581d06305a2a370734b493327b117757.1441.buffer)






From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan 
Ewen
Sent: jeudi 28 janvier 2016 15:35
To: user@flink.apache.org
Subject: Re: about blob.storage.dir and .buffer files

Hi Gwenhael!

Let's look into this and fix anything we find. Can you briefly tell us:

  - How much data is in the blob-store directory, versus in the buffer files?

  - How many buffer files do you have and how large are they on average?

Greetings,
Stephan




On Thu, Jan 28, 2016 at 10:18 AM, Till Rohrmann 
<trohrm...@apache.org<mailto:trohrm...@apache.org>> wrote:

Hi Gwenhael,

in theory the blob storage files can be any binary data. At the moment, this is 
however only used to distribute the user code jars. The jars are kept around as 
long as the job is running. Every library-cache-manager.cleanup.interval 
interval the files are checked and those which are no longer referenced are 
deleted. In the case of a termination of Flink all files should be purged. If 
this is not the case, then we have to check what the problem is.

Could you check the size of your user jars?

Cheers,
Till
​

On Wed, Jan 27, 2016 at 4:38 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Hello,

We got a question about blob.storage.dir and its .buffer files :

What are they ? And are they cleaned or is there a way to limit their size and 
to evaluate the necessary space ?
We got a node root volume disk filled by those files (~20GB) and it crashed.

Well, the root was filled because we changed the path to /tmp_flink and it was 
on the root filesystem, our bad. We changed it in a hurry because the default 
path (/tmp) was being periodically cleaned by the OS and that made flink crash.

B.R.





about blob.storage.dir and .buffer files

2016-01-27 Thread Gwenhael Pasquiers
Hello,

We got a question about blob.storage.dir and its .buffer files :

What are they ? And are they cleaned or is there a way to limit their size and 
to evaluate the necessary space ?
We got a node root volume disk filled by those files (~20GB) and it crashed.

Well, the root was filled because we changed the path to /tmp_flink and it was 
on the root filesystem, our bad. We changed it in a hurry because the default 
path (/tmp) was being periodically cleaned by the OS and that made flink crash.

B.R.



RE: Reading Parquet/Hive

2015-12-18 Thread Gwenhael Pasquiers
I'll answer myself :)

I think I've managed to make it work by creating my "WrappingReadSupport" that 
wraps the DataWritableReadSupport but I also insert my "WrappingMaterializer" 
that converts the ArrayWritable produced by the original Materializer to 
String[]. Then later on, the String[] poses no issues with Tuple and it seems 
to be OK.

Now ... Let's write those String[] in parquet too :)
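
For the record, here is a rough sketch of the two wrappers (assuming the old parquet.* package names bundled with Hive at that time, error handling stripped, and the String conversion simplified) :

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;

import parquet.hadoop.api.InitContext;
import parquet.hadoop.api.ReadSupport;
import parquet.io.api.GroupConverter;
import parquet.io.api.RecordMaterializer;
import parquet.schema.MessageType;

// ReadSupport that delegates to Hive's DataWritableReadSupport but produces String[]
public class WrappingReadSupport extends ReadSupport<String[]> {

    private final DataWritableReadSupport delegate = new DataWritableReadSupport();

    @Override
    public ReadContext init(InitContext context) {
        return delegate.init(context);
    }

    @Override
    public RecordMaterializer<String[]> prepareForRead(Configuration configuration,
            Map<String, String> keyValueMetaData, MessageType fileSchema, ReadContext readContext) {
        RecordMaterializer<ArrayWritable> wrapped =
                delegate.prepareForRead(configuration, keyValueMetaData, fileSchema, readContext);
        return new WrappingMaterializer(wrapped);
    }

    // Materializer that converts the ArrayWritable produced by Hive into a plain String[]
    private static class WrappingMaterializer extends RecordMaterializer<String[]> {

        private final RecordMaterializer<ArrayWritable> wrapped;

        WrappingMaterializer(RecordMaterializer<ArrayWritable> wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public String[] getCurrentRecord() {
            Writable[] fields = wrapped.getCurrentRecord().get();
            String[] result = new String[fields.length];
            for (int i = 0; i < fields.length; i++) {
                result[i] = fields[i] == null ? null : fields[i].toString();
            }
            return result;
        }

        @Override
        public GroupConverter getRootConverter() {
            return wrapped.getRootConverter();
        }
    }
}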


From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: vendredi 18 décembre 2015 10:04
To: user@flink.apache.org
Subject: Reading Parquet/Hive

Hi,

I'm trying to read Parquet/Hive data using parquet's ParquetInputFormat and 
hive's DataWritableReadSupport.

I have an error when the TupleSerializer tries to create an instance of 
ArrayWritable, using reflection because ArrayWritable has no no-args 
constructor.

I've been able to make it work when executing in a local cluster by copying the 
ArrayWritable class in my own sources and adding the constructor. I guess that 
the classpath built by maven puts my code first and allows me to override the 
original class. However when running into the real cluster (yarn@cloudera) the 
exception comes back (I guess that the original class is first in the 
classpath).

So, do you have an idea of how I could make it work ?

I think I'm tied to the ArrayWritable type because of the 
DataWritableReadSupport that extends ReadSupport.

Would it be possible (and not too complicated) to make a DataSource that would 
not generate Tuples and allow me to convert the ArrayWritable to a more 
friendly type like String[] ... Or if you have any other ideas, they are welcome 
!

B.R.

Gwenhaël PASQUIERS


RE: YARN High Availability

2015-11-23 Thread Gwenhael Pasquiers
OK, I understand.

Maybe we are not really using flink as you intended. The way we are using it, 
one cluster equals one job. That way we are sure to isolate the different jobs 
as much as possible and in case of crashes / bugs / (etc) can completely kill 
one cluster without interfering with the other jobs.

That future behavior seems good :-)

Instead of the manual flink commands, is there a way to manually delete those old 
jobs before launching my job ? They probably are somewhere in hdfs, aren't they 
?

B.R.


-Original Message-
From: Ufuk Celebi [mailto:u...@apache.org] 
Sent: lundi 23 novembre 2015 12:12
To: user@flink.apache.org
Subject: Re: YARN High Availability

Hey Gwenhaël,

the restarting jobs are most likely old job submissions. They are not cleaned 
up when you shut down the cluster, but only when they finish (either regular 
finish or after cancelling).

The workaround is to use the command line frontend:

bin/flink cancel JOBID

for each RESTARTING job. Sorry about the inconvenience!

We are in an active discussion about addressing this. The future behaviour will 
be that the startup or shutdown of a cluster cleans up everything and an option 
to skip this step.

The reasoning for the initial solution (not removing anything) was to make sure 
that no jobs are deleted by accident. But it looks like this is more confusing 
than helpful.

– Ufuk

> On 23 Nov 2015, at 11:45, Gwenhael Pasquiers 
> <gwenhael.pasqui...@ericsson.com> wrote:
> 
> Hi again !
> 
> On the same topic I'm still trying to start my streaming job with HA.
> The HA part seems to be more or less OK (I killed the JobManager and it came 
> back), however I have an issue with the TaskManagers.
> I configured my job to have only one TaskManager and 1 slot that does 
> [source=>map=>sink].
> The issue I'm encountering is that other instances of my job appear and are 
> in the RESTARTING status since there is only one task slot.
> 
> Do you know of this, or have an idea of where to look in order to understand 
> what's happening ?
> 
> B.R.
> 
> Gwenhaël PASQUIERS
> 
> -Original Message-
> From: Maximilian Michels [mailto:m...@apache.org] 
> Sent: jeudi 19 novembre 2015 13:36
> To: user@flink.apache.org
> Subject: Re: YARN High Availability
> 
> The docs have been updated.
> 
> On Thu, Nov 19, 2015 at 12:36 PM, Ufuk Celebi <u...@apache.org> wrote:
>> I’ve added a note about this to the docs and asked Max to trigger a new 
>> build of them.
>> 
>> Regarding Aljoscha’s idea: I like it. It is essentially a shortcut for 
>> configuring the root path.
>> 
>> In any case, it is orthogonal to Till’s proposals. That one we need to 
>> address as well (see FLINK-2929). The motivation for the current behaviour 
>> was to be rather defensive when removing state in order to not loose data 
>> accidentally. But it can be confusing, indeed.
>> 
>> – Ufuk
>> 
>>> On 19 Nov 2015, at 12:08, Till Rohrmann <trohrm...@apache.org> wrote:
>>> 
>>> You mean an additional start-up parameter for the `start-cluster.sh` script 
>>> for the HA case? That could work.
>>> 
>>> On Thu, Nov 19, 2015 at 11:54 AM, Aljoscha Krettek <aljos...@apache.org> 
>>> wrote:
>>> Maybe we could add a user parameter to specify a cluster name that is used 
>>> to make the paths unique.
>>> 
>>> 
>>> On Thu, Nov 19, 2015, 11:24 Till Rohrmann <trohrm...@apache.org> wrote:
>>> I agree that this would make the configuration easier. However, it entails 
>>> also that the user has to retrieve the randomized path from the logs if he 
>>> wants to restart jobs after the cluster has crashed or intentionally 
>>> restarted. Furthermore, the system won't be able to clean up old checkpoint 
>>> and job handles in case that the cluster stop was intentional.
>>> 
>>> Thus, the question is how do we define the behaviour in order to retrieve 
>>> handles and to clean up old handles so that ZooKeeper won't be cluttered 
>>> with old handles?
>>> 
>>> There are basically two modes:
>>> 
>>> 1. Keep state handles when shutting down the cluster. Provide a mean to 
>>> define a fixed path when starting the cluster and also a mean to purge old 
>>> state handles. Furthermore, add a shutdown mode where the handles under the 
>>> current path are directly removed. This mode would guarantee to always have 
>>> the state handles available if not explicitly told differently. However, 
>>> the downside is that ZooKeeper will be cluttered most certainly.
>>> 
>>> 2. Remove the state handles when shutting down the 

RE: YARN High Availability

2015-11-23 Thread Gwenhael Pasquiers
We are not yet using HA in our cluster instances.
But yes, we will have to change the zookeeper.path.root ☺

We package our jobs with their own config folder (we don’t rely on flink’s 
config folder); we can put the maven project name into this property then they 
will have different values ☺


From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: lundi 23 novembre 2015 14:51
To: user@flink.apache.org
Subject: Re: YARN High Availability

The problem is the execution graph handle which is stored in ZooKeeper. You can 
manually remove it via the ZooKeeper shell by simply deleting everything below 
your `recovery.zookeeper.path.root` ZNode. But you should be sure that the 
cluster has been stopped before.
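
As an alternative to the ZooKeeper shell, here is a small sketch doing the same cleanup programmatically with Curator (which Flink already uses internally) ; the quorum address and root path below are placeholders that must match your recovery.zookeeper.* configuration :

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ZkCleanup {

    public static void main(String[] args) throws Exception {
        // placeholders: use your recovery.zookeeper.quorum and recovery.zookeeper.path.root values
        String quorum = "host1:3181,host2:3181,host3:3181";
        String rootPath = "/flink";

        CuratorFramework client =
                CuratorFrameworkFactory.newClient(quorum, new ExponentialBackoffRetry(1000, 3));
        client.start();
        try {
            if (client.checkExists().forPath(rootPath) != null) {
                // recursively removes the handles stored under the root ZNode
                client.delete().deletingChildrenIfNeeded().forPath(rootPath);
            }
        } finally {
            client.close();
        }
    }
}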

Do you start the different clusters with different 
`recovery.zookeeper.path.root` values? If not, then you should run into 
troubles when running multiple clusters at the same time. The reason is that 
then all clusters will think that they belong together.

Cheers,
Till

On Mon, Nov 23, 2015 at 2:15 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote:
OK, I understand.

Maybe we are not really using flink as you intended. The way we are using it, 
one cluster equals one job. That way we are sure to isolate the different jobs 
as much as possible and in case of crashes / bugs / (etc) can completely kill 
one cluster without interfering with the other jobs.

That future behavior seems good :-)

Instead of the manual flink commands, is there a way to manually delete those old 
jobs before launching my job ? They probably are somewhere in hdfs, aren't they 
?

B.R.


-Original Message-
From: Ufuk Celebi [mailto:u...@apache.org<mailto:u...@apache.org>]
Sent: lundi 23 novembre 2015 12:12
To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: YARN High Availability

Hey Gwenhaël,

the restarting jobs are most likely old job submissions. They are not cleaned 
up when you shut down the cluster, but only when they finish (either regular 
finish or after cancelling).

The workaround is to use the command line frontend:

bin/flink cancel JOBID

for each RESTARTING job. Sorry about the inconvenience!

We are in an active discussion about addressing this. The future behaviour will 
be that the startup or shutdown of a cluster cleans up everything and an option 
to skip this step.

The reasoning for the initial solution (not removing anything) was to make sure 
that no jobs are deleted by accident. But it looks like this is more confusing 
than helpful.

– Ufuk

> On 23 Nov 2015, at 11:45, Gwenhael Pasquiers 
> <gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> 
> wrote:
>
> Hi again !
>
> On the same topic I'm still trying to start my streaming job with HA.
> The HA part seems to be more or less OK (I killed the JobManager and it came 
> back), however I have an issue with the TaskManagers.
> I configured my job to have only one TaskManager and 1 slot that does 
> [source=>map=>sink].
> The issue I'm encountering is that other instances of my job appear and are 
> in the RESTARTING status since there is only one task slot.
>
> Do you know of this, or have an idea of where to look in order to understand 
> what's happening ?
>
> B.R.
>
> Gwenhaël PASQUIERS
>
> -Original Message-
> From: Maximilian Michels [mailto:m...@apache.org<mailto:m...@apache.org>]
> Sent: jeudi 19 novembre 2015 13:36
> To: user@flink.apache.org<mailto:user@flink.apache.org>
> Subject: Re: YARN High Availability
>
> The docs have been updated.
>
> On Thu, Nov 19, 2015 at 12:36 PM, Ufuk Celebi 
> <u...@apache.org<mailto:u...@apache.org>> wrote:
>> I’ve added a note about this to the docs and asked Max to trigger a new 
>> build of them.
>>
>> Regarding Aljoscha’s idea: I like it. It is essentially a shortcut for 
>> configuring the root path.
>>
>> In any case, it is orthogonal to Till’s proposals. That one we need to 
>> address as well (see FLINK-2929). The motivation for the current behaviour 
>> was to be rather defensive when removing state in order to not loose data 
>> accidentally. But it can be confusing, indeed.
>>
>> – Ufuk
>>
>>> On 19 Nov 2015, at 12:08, Till Rohrmann 
>>> <trohrm...@apache.org<mailto:trohrm...@apache.org>> wrote:
>>>
>>> You mean an additional start-up parameter for the `start-cluster.sh` script 
>>> for the HA case? That could work.
>>>
>>> On Thu, Nov 19, 2015 at 11:54 AM, Aljoscha Krettek 
>>> <aljos...@apache.org<mailto:aljos...@apache.org>> wrote:
>>> Maybe we could add a user parameter to specify a cluster name that is used 
>>> to make the paths unique.
>>>
&

RE: YARN High Availability

2015-11-18 Thread Gwenhael Pasquiers
Nevermind,

Looking at the logs I saw that it was having issues trying to connect to ZK.
To make it short, it had the wrong port.

It is now starting.

Tomorrow I’ll try to kill some JobManagers *evil*.

Another question : if I have multiple HA flink jobs, are there some points to 
check in order to be sure that they won’t collide on hdfs or ZK ?

B.R.

Gwenhaël PASQUIERS

From: Till Rohrmann [mailto:till.rohrm...@gmail.com]
Sent: mercredi 18 novembre 2015 18:01
To: user@flink.apache.org
Subject: Re: YARN High Availability

Hi Gwenhaël,

do you have access to the yarn logs?

Cheers,
Till

On Wed, Nov 18, 2015 at 5:55 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Hello,

We’re trying to set up high availability using an existing zookeeper quorum 
already running in our Cloudera cluster.

So, as per the doc we’ve changed the max attempt in yarn’s config as well as 
the flink.yaml.

recovery.mode: zookeeper
recovery.zookeeper.quorum: host1:3181,host2:3181,host3:3181
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
recovery.zookeeper.storageDir: hdfs:///flink/recovery/
yarn.application-attempts: 1000

Everything is ok as long as recovery.mode is commented.
As soon as I uncomment recovery.mode the deployment on yarn is stuck on :

“Deploying cluster, current state ACCEPTED”.
“Deployment took more than 60 seconds….”
Every second.

And I have more than enough resources available on my yarn cluster.

Do you have any idea of what could cause this, and/or what logs I should look 
for in order to understand ?

B.R.

Gwenhaël PASQUIERS



MaxPermSize on yarn

2015-11-16 Thread Gwenhael Pasquiers
Hi,

We're having some OOM permgen exceptions when running on yarn.

We're not yet sure whether it is a consequence or a cause of our crashes, but 
we've been trying to increase that value... and we could not find how to do it.

I've seen that the yarn-daemon.sh sets a 256m value.
It looks to me that it's also possible to customize the YarnClient JVM args, 
but it will only be for the client, not for the TaskManagers.

Do you know of a way to do it ?

B.R.

Gwenhaël PASQUIERS


RE: Building Flink with hadoop 2.6

2015-10-14 Thread Gwenhael Pasquiers
The first class that it can not find is :
org.apache.log4j.Level

The org.apache.log4j package is not present in the fat jar I get from the mvn 
command, but it is in the one you distributed on your website.

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: mercredi 14 octobre 2015 16:54
To: user@flink.apache.org
Subject: Re: Building Flink with hadoop 2.6

Great.
Which classes can it not find at runtime?

I'll try to build and run Flink with exactly the command you've provided.

On Wed, Oct 14, 2015 at 4:49 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Hi Robert !

I’m using ” mvn clean install -DskipTests -Dhadoop.version=2.6.0 “.


From: Robert Metzger [mailto:rmetz...@apache.org<mailto:rmetz...@apache.org>]
Sent: mercredi 14 octobre 2015 16:47
To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Building Flink with hadoop 2.6

Hi Gwen,

can you tell us the "mvn" command you're using for building Flink?



On Wed, Oct 14, 2015 at 4:37 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Hi ;

We need to test some things with flink and hadoop 2.6 (the trunc method).

We’ve set up a build task on our Jenkins and everything seems okay.

However when we replace the original jar from your 0.10-SNAPSHOT distribution 
by ours there are some missing dependencies (log4j, curator, and maybe others) 
and we get some ClassNotFoundException at runtime.

Are we missing some build parameters ?

Thanks in advance,

B.R.




Building Flink with hadoop 2.6

2015-10-14 Thread Gwenhael Pasquiers
Hi ;

We need to test some things with flink and hadoop 2.6 (the trunc method).

We've set up a build task on our Jenkins and everything seems okay.

However when we replace the original jar from your 0.10-SNAPSHOT distribution 
by ours there are some missing dependencies (log4j, curator, and maybe others) 
and we get some ClassNotFoundException at runtime.

Are we missing some build parameters ?

Thanks in advance,

B.R.


RE: Building Flink with hadoop 2.6

2015-10-14 Thread Gwenhael Pasquiers
Yes, we’re onto the exactly-once sinks ; trying to write RCFiles (Parquet and 
ORCFiles are not compatible because of their footer).

It seems to be working perfectly.

As expected, Flink is falling back to .valid-length metadata on HDFS 2.6 (and 
2.3).
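
In case it helps someone, this is roughly how the sink is wired (a sketch against the flink-connector-filesystem RollingSink API ; the input, paths and the StringWriter are placeholders, our real job plugs its own RCFile Writer in there) :

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.StringWriter;

public class RollingSinkSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // exactly-once needs checkpointing

        DataStream<String> records = env.fromElements("a", "b", "c"); // placeholder input

        RollingSink<String> sink = new RollingSink<String>("hdfs:///data/output") // placeholder path
                .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HH"))
                .setWriter(new StringWriter<String>())   // replace with the custom RCFile Writer
                .setBatchSize(1024 * 1024 * 128);        // roll files at ~128 MB

        records.addSink(sink);

        env.execute("rolling sink sketch");
    }
}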

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: mercredi 14 octobre 2015 17:23
To: user@flink.apache.org
Subject: Re: Building Flink with hadoop 2.6

Great. We are shading curator now into a different location, that's why you 
can't find it anymore.

I suspect you're trying out our new exactly-once filesystem sinks. Please let 
us know how well its working for you and if you're missing something.
Its a pretty new feature :)
Also note that you can use the fs sinks with hadoop versions below 2.7.0, then 
we'll write some metadata containing the valid offsets.

On Wed, Oct 14, 2015 at 5:18 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Yes … You’re right.

Anyway, adding the log4j jar solved the issue and our app is working properly, 
thanks !

About curator, I just observed that it was not there anymore when comparing the 
old and new fatjars. But it’s probably now in another dependency, anyway there 
is no curator-related error so it just probably moved.

Thanks !

Gwen’

From: Robert Metzger [mailto:rmetz...@apache.org<mailto:rmetz...@apache.org>]
Sent: mercredi 14 octobre 2015 17:06

To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Building Flink with hadoop 2.6

One more thing regarding the truncate method: Its supported as of Hadoop 2.7.0 
(https://issues.apache.org/jira/browse/HDFS-3107)

On Wed, Oct 14, 2015 at 5:00 PM, Robert Metzger 
<rmetz...@apache.org<mailto:rmetz...@apache.org>> wrote:
Ah, I know what's causing this issue.
In the latest 0.10-SNAPSHOT, we have removed log4j from the fat jar.

Can you copy everything from the lib/ folder from your maven build into the 
lib/ folder of your flink installation?
Log4j is now in a separate jar in the lib/ folder .

What about the curator dependency issue?

On Wed, Oct 14, 2015 at 4:56 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote:
The first class that it can not find is :
org.apache.log4j.Level

The org.apache.log4j package is not present in the fat jar I get from the mvn 
command, but it is in the one you distributed on your website.

From: Robert Metzger [mailto:rmetz...@apache.org<mailto:rmetz...@apache.org>]
Sent: mercredi 14 octobre 2015 16:54
To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Building Flink with hadoop 2.6

Great.
Which classes can it not find at runtime?

I'll try to build and run Flink with exactly the command you've provided.

On Wed, Oct 14, 2015 at 4:49 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Hi Robert !

I’m using ” mvn clean install -DskipTests -Dhadoop.version=2.6.0 “.


From: Robert Metzger [mailto:rmetz...@apache.org<mailto:rmetz...@apache.org>]
Sent: mercredi 14 octobre 2015 16:47
To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Building Flink with hadoop 2.6

Hi Gwen,

can you tell us the "mvn" command you're using for building Flink?



On Wed, Oct 14, 2015 at 4:37 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Hi ;

We need to test some things with flink and hadoop 2.6 (the trunc method).

We’ve set up a build task on our Jenkins and everything seems okay.

However when we replace the original jar from your 0.10-SNAPSHOT distribution 
by ours there are some missing dependencies (log4j, curator, and maybe others) 
and we get some ClassNotFoundException at runtime.

Are we missing some build parameters ?

Thanks in advance,

B.R.







RE: Building Flink with hadoop 2.6

2015-10-14 Thread Gwenhael Pasquiers
Hi Robert !

I’m using ” mvn clean install -DskipTests -Dhadoop.version=2.6.0 “.


From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: mercredi 14 octobre 2015 16:47
To: user@flink.apache.org
Subject: Re: Building Flink with hadoop 2.6

Hi Gwen,

can you tell us the "mvn" command you're using for building Flink?



On Wed, Oct 14, 2015 at 4:37 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Hi ;

We need to test some things with flink and hadoop 2.6 (the trunc method).

We’ve set up a build task on our Jenkins and everything seems okay.

However when we replace the original jar from your 0.10-SNAPSHOT distribution 
by ours there are some missing dependencies (log4j, curator, and maybe others) 
and we get some ClassNotFoundException at runtime.

Are we missing some build parameters ?

Thanks in advance,

B.R.



Flink 0.9.1 Kafka 0.8.1

2015-09-10 Thread Gwenhael Pasquiers
Hi everyone,

We're trying to consume from a 0.8.1 Kafka on Flink 0.9.1 and we've run into the 
following issue :

My offset became OutOfRange however now when I start my job, it loops on the 
OutOfRangeException, no matter what the value of auto.offset.reset is... 
(earliest, latest, largest, smallest)
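
For reference, the consumer properties we pass look roughly like this (host names and group id are placeholders ; note that the Kafka 0.8.x high-level consumer only accepts "smallest" / "largest" here, the "earliest" / "latest" values belong to newer clients) :

import java.util.Properties;

public class KafkaConsumerProperties {

    // builds the properties handed to the Flink Kafka consumer of whichever
    // connector version is in use, e.g. env.addSource(new <consumer>(topic, schema, props))
    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zkhost:2181"); // placeholder
        props.setProperty("group.id", "my-flink-job");         // placeholder
        // Kafka 0.8.x only understands "smallest" / "largest" for this setting
        props.setProperty("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}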

Looks like it doesn't fix the invalid offset and immediately goes into error... 
Then Flink restarts the job, and fails again ... etc ...

Do you have an idea of what is wrong, or could it be an issue in flink ?

B.R.

Gwenhaël PASQUIERS


RE: Application-specific loggers configuration

2015-08-26 Thread Gwenhael Pasquiers
Hi !

Yes, we’re starting our job with  “flink run --jobmanager yarn-cluster”

So it’s perfect, we’ll use your fix and, when it’s out, we’ll switch to flink 
0.9.1.

B.R.

From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: mardi 25 août 2015 19:25
To: user@flink.apache.org
Subject: Re: Application-specific loggers configuration

Hi Gwenhaël,
are you using the one-yarn-cluster-per-job mode of Flink? I.e., you are 
starting your Flink job with (from the doc):

flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 
./examples/flink-java-examples-0.10-SNAPSHOT-WordCount.jar

If you are, then this is almost possible on the current version of Flink. What 
you have to do is copy the conf directory of Flink to a separate directory that 
is specific to your job. There you make your modifications to the log 
configuration etc. Then, when you start your job you do this instead:

export FLINK_CONF_DIR=/path/to/my/conf
flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 
./examples/flink-java-examples-0.10-SNAPSHOT-WordCount.jar

You can easily put this into your startup script, of course.

I said almost possible because this requires a small fix in bin/flink. Around 
line 130 this line:
FLINK_CONF_DIR=$FLINK_ROOT_DIR_MANGLED/conf
needs to be replaced by this line:
if [ -z $FLINK_CONF_DIR ]; then FLINK_CONF_DIR=$FLINK_ROOT_DIR_MANGLED/conf; 
fi

(We will fix this in the upcoming version and the 0.9.1 bugfix release.)

Does this help? Let us know if you are not using the one-yarn-cluster-per-job 
mode, then we'll have to try to find another solution.

Best,
Aljoscha



On Tue, 25 Aug 2015 at 16:22 Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote:
Hi,

We’re developing the first of (we hope) many flink streaming app.

We’d like to package the logging configuration (log4j) together with the jar. 
Meaning, different applications will probably have different logging 
configurations (ex: to different logstash ports) …

Is there a way to “override” the many log4j properties files that are in 
flink/conf./*.properties ?

In our environment, the flink binaries would be on the PATH, and our apps would 
be :

-  Jar file

-  App configuration files

-  Log configuration files

-  Startup script

B.R.

Gwenhaël PASQUIERS


Application-specific loggers configuration

2015-08-25 Thread Gwenhael Pasquiers
Hi,

We're developing the first of (we hope) many flink streaming app.

We'd like to package the logging configuration (log4j) together with the jar. 
Meaning, different applications will probably have different logging 
configurations (ex: to different logstash ports) ...

Is there a way to override the many log4j properties files that are in 
flink/conf./*.properties ?

In our environment, the flink binaries would be on the PATH, and our apps would 
be :

-  Jar file

-  App configuration files

-  Log configuration files

-  Startup script

B.R.

Gwenhaël PASQUIERS