RE: YARN : Different cutoff for job and task managers
This works; I had some issues modifying my scripts, but it's OK and I could confirm by JMX that env.java.opts.jobmanager had priority over the "normal" heap size (calculated from the cutoff). Thanks!

From: Yang Wang
Sent: Wednesday, 20 November 2019 03:52
To: Gwenhael Pasquiers
Cc: user@flink.apache.org
Subject: Re: YARN : Different cutoff for job and task managers

Hi Gwenhael,

I'm afraid we cannot set different cut-offs for the jobmanager and the taskmanager. You could set the JVM args manually as a workaround, for example 'env.java.opts.jobmanager=-Xms3072m -Xmx3072m'. In most JVM implementations the rightmost -Xmx/-Xms takes effect, so I think it should work. Please have a try.

Best,
Yang

Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote on Tuesday, 19 November 2019 at 22:56:

Hello,

In a setup where we allocate most of the memory to RocksDB (off-heap) we have a large cutoff. Our issue is that the same cutoff applies to both task and job managers: the heap size of the job manager then becomes too low.

Is there a way to apply different cutoffs to job and task managers?

Regards,
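For reference, the workaround can be sketched as a flink-conf.yaml fragment (the 3072m value is only an example; pick a heap size that suits your JobManager):

```yaml
# Pin the JobManager heap regardless of the cutoff-derived value.
# The JVM honours the right-most -Xms/-Xmx, so these flags override
# the ones Flink computes from containerized.heap-cutoff-ratio.
env.java.opts.jobmanager: -Xms3072m -Xmx3072m
```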
RE: YARN : Different cutoff for job and task managers
I see, good idea, I'll try that and tell you the result. Thanks,

From: Yang Wang
Sent: Wednesday, 20 November 2019 03:52
To: Gwenhael Pasquiers
Cc: user@flink.apache.org
Subject: Re: YARN : Different cutoff for job and task managers

Hi Gwenhael,

I'm afraid we cannot set different cut-offs for the jobmanager and the taskmanager. You could set the JVM args manually as a workaround, for example 'env.java.opts.jobmanager=-Xms3072m -Xmx3072m'. In most JVM implementations the rightmost -Xmx/-Xms takes effect, so I think it should work. Please have a try.

Best,
Yang

Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote on Tuesday, 19 November 2019 at 22:56:

Hello,

In a setup where we allocate most of the memory to RocksDB (off-heap) we have a large cutoff. Our issue is that the same cutoff applies to both task and job managers: the heap size of the job manager then becomes too low.

Is there a way to apply different cutoffs to job and task managers?

Regards,
YARN : Different cutoff for job and task managers
Hello,

In a setup where we allocate most of the memory to RocksDB (off-heap) we have a large cutoff. Our issue is that the same cutoff applies to both task and job managers: the heap size of the job manager then becomes too low.

Is there a way to apply different cutoffs to job and task managers?

Regards,
RE: Streaming : a way to "key by partition id" without redispatching data
From what I understood, in your case you might solve your issue by using specific key classes instead of Strings. You could create key classes that have a user-specified hashcode, which could take the previous key's hashcode as a value. That way your data shouldn't be sent over the wire and would stay in the same partition, thus on the same taskmanager.
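The suggestion above can be sketched in plain Java (illustrative names, not Flink API). Note that Flink's actual placement also runs the hashcode through further hashing and key-group assignment, so an identical hashcode is necessary but not by itself a guarantee; treat this as a sketch of the intent only:

```java
// Sketch: a key whose hashCode is supplied by the caller, e.g. copied
// from the previous key, so that consecutive keyBy() calls hash the
// same way and records tend to stay where they already are.
public class CarriedHashKey {
    private final String key;      // the logical key
    private final int carriedHash; // hash inherited from the previous key

    public CarriedHashKey(String key, int carriedHash) {
        this.key = key;
        this.carriedHash = carriedHash;
    }

    @Override
    public int hashCode() {
        return carriedHash; // ignore the logical key's own hash
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof CarriedHashKey)) return false;
        CarriedHashKey other = (CarriedHashKey) o;
        // include carriedHash so equals/hashCode stay consistent
        return key.equals(other.key) && carriedHash == other.carriedHash;
    }
}
```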
RE: Streaming : a way to "key by partition id" without redispatching data
I think I finally found a way to "simulate" a timer thanks to the processWatermark function of AbstractStreamOperator. Sorry for the monologue.

From: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Sent: Friday, 10 November 2017 16:02
To: user@flink.apache.org
Subject: RE: Streaming : a way to "key by partition id" without redispatching data

Hello,

Finally, even after creating my operator, I still get the error: "Timers can only be used on keyed operators". Isn't there any way around this? A way to "key" my stream without shuffling the data?

From: Gwenhael Pasquiers
Sent: Friday, 10 November 2017 11:42
To: user@flink.apache.org
Subject: RE: Streaming : a way to "key by partition id" without redispatching data

Maybe you don't need to bother with that question. I'm currently discovering AbstractStreamOperator, OneInputStreamOperator and Triggerable. That should do it :-)

From: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Sent: Thursday, 9 November 2017 18:00
To: user@flink.apache.org
Subject: Streaming : a way to "key by partition id" without redispatching data

Hello,

(Flink 1.2.1)

For performance reasons I'm trying to reduce the volume of data of my stream as soon as possible by windowing/folding it for 15 minutes before continuing to the rest of the chain, which contains keyBys and windows that will transfer data everywhere. Because of the huge volume of data, I want to avoid "moving" the data between partitions as much as possible (not like a naive keyBy does). I wanted to create a custom ProcessFunction (using a timer and state to fold data for X minutes) in order to fold my data over itself before keying the stream, but even ProcessFunction needs a keyed stream...

Is there a specific "key" value that would ensure that my data won't be moved to another taskmanager (i.e. that its hashcode will match the partition it is already in)? I thought about the subtask id but I doubt I'd be that lucky :-)

Suggestions:
- Wouldn't it be useful to be able to do a "partitionedKeyBy" that would not move data between nodes, for windowing operations that can be parallelized?
  - Something like kafka => partitionedKeyBy(0) => first folding => keyBy(0) => second folding => ...
- Finally, aren't all streams keyed? Even if they're keyed by a totally arbitrary partition id until the user chooses his own key, shouldn't we be able to do a window (not windowAll) or process over any normal stream's partition?

B.R.

Gwenhaël PASQUIERS
RE: Streaming : a way to "key by partition id" without redispatching data
Hello,

Finally, even after creating my operator, I still get the error: "Timers can only be used on keyed operators". Isn't there any way around this? A way to "key" my stream without shuffling the data?

From: Gwenhael Pasquiers
Sent: Friday, 10 November 2017 11:42
To: user@flink.apache.org
Subject: RE: Streaming : a way to "key by partition id" without redispatching data

Maybe you don't need to bother with that question. I'm currently discovering AbstractStreamOperator, OneInputStreamOperator and Triggerable. That should do it :-)

From: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Sent: Thursday, 9 November 2017 18:00
To: user@flink.apache.org
Subject: Streaming : a way to "key by partition id" without redispatching data

Hello,

(Flink 1.2.1)

For performance reasons I'm trying to reduce the volume of data of my stream as soon as possible by windowing/folding it for 15 minutes before continuing to the rest of the chain, which contains keyBys and windows that will transfer data everywhere. Because of the huge volume of data, I want to avoid "moving" the data between partitions as much as possible (not like a naive keyBy does). I wanted to create a custom ProcessFunction (using a timer and state to fold data for X minutes) in order to fold my data over itself before keying the stream, but even ProcessFunction needs a keyed stream...

Is there a specific "key" value that would ensure that my data won't be moved to another taskmanager (i.e. that its hashcode will match the partition it is already in)? I thought about the subtask id but I doubt I'd be that lucky :-)

Suggestions:
- Wouldn't it be useful to be able to do a "partitionedKeyBy" that would not move data between nodes, for windowing operations that can be parallelized?
  - Something like kafka => partitionedKeyBy(0) => first folding => keyBy(0) => second folding => ...
- Finally, aren't all streams keyed? Even if they're keyed by a totally arbitrary partition id until the user chooses his own key, shouldn't we be able to do a window (not windowAll) or process over any normal stream's partition?

B.R.

Gwenhaël PASQUIERS
RE: Streaming : a way to "key by partition id" without redispatching data
Maybe you don't need to bother with that question. I'm currently discovering AbstractStreamOperator, OneInputStreamOperator and Triggerable. That should do it :-)

From: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Sent: Thursday, 9 November 2017 18:00
To: user@flink.apache.org
Subject: Streaming : a way to "key by partition id" without redispatching data

Hello,

(Flink 1.2.1)

For performance reasons I'm trying to reduce the volume of data of my stream as soon as possible by windowing/folding it for 15 minutes before continuing to the rest of the chain, which contains keyBys and windows that will transfer data everywhere. Because of the huge volume of data, I want to avoid "moving" the data between partitions as much as possible (not like a naive keyBy does). I wanted to create a custom ProcessFunction (using a timer and state to fold data for X minutes) in order to fold my data over itself before keying the stream, but even ProcessFunction needs a keyed stream...

Is there a specific "key" value that would ensure that my data won't be moved to another taskmanager (i.e. that its hashcode will match the partition it is already in)? I thought about the subtask id but I doubt I'd be that lucky :-)

Suggestions:
- Wouldn't it be useful to be able to do a "partitionedKeyBy" that would not move data between nodes, for windowing operations that can be parallelized?
  - Something like kafka => partitionedKeyBy(0) => first folding => keyBy(0) => second folding => ...
- Finally, aren't all streams keyed? Even if they're keyed by a totally arbitrary partition id until the user chooses his own key, shouldn't we be able to do a window (not windowAll) or process over any normal stream's partition?

B.R.

Gwenhaël PASQUIERS
Streaming : a way to "key by partition id" without redispatching data
Hello,

(Flink 1.2.1)

For performance reasons I'm trying to reduce the volume of data of my stream as soon as possible by windowing/folding it for 15 minutes before continuing to the rest of the chain, which contains keyBys and windows that will transfer data everywhere. Because of the huge volume of data, I want to avoid "moving" the data between partitions as much as possible (not like a naive keyBy does). I wanted to create a custom ProcessFunction (using a timer and state to fold data for X minutes) in order to fold my data over itself before keying the stream, but even ProcessFunction needs a keyed stream...

Is there a specific "key" value that would ensure that my data won't be moved to another taskmanager (i.e. that its hashcode will match the partition it is already in)? I thought about the subtask id but I doubt I'd be that lucky :-)

Suggestions:
- Wouldn't it be useful to be able to do a "partitionedKeyBy" that would not move data between nodes, for windowing operations that can be parallelized?
  - Something like kafka => partitionedKeyBy(0) => first folding => keyBy(0) => second folding => ...
- Finally, aren't all streams keyed? Even if they're keyed by a totally arbitrary partition id until the user chooses his own key, shouldn't we be able to do a window (not windowAll) or process over any normal stream's partition?

B.R.

Gwenhaël PASQUIERS
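To see why an arbitrary key almost never keeps a record on its current subtask, here is a toy model of hash partitioning (real Flink routes keys through murmur-hashed key groups, so this is a deliberate simplification):

```java
public class PartitionSketch {
    // Simplified placement rule: partition = |hash| mod parallelism.
    // A record sitting on subtask i stays put only if its new key's
    // hash happens to map back to i -- a 1-in-parallelism chance.
    static int partitionFor(Object key, int parallelism) {
        return Math.abs(key.hashCode() % parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        System.out.println("key 'someKey' lands on subtask "
                + partitionFor("someKey", parallelism));
    }
}
```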
RE: Great number of jobs and numberOfBuffers
Hi,

Well yes, I could probably make it work with a constant number of operators (and consequently buffers) by developing specific input and output classes, and that way I'd have a workaround for that buffers issue.

The size of my job is input-dependent mostly because my code creates one full processing chain per input folder (there is one folder per hour of data) and that processing chain has many functions. The number of input folders is an argument to the software and can vary from 1 to hundreds (when we re-compute something like a month of data).

for (String folder : folders) {
    env = Environment.getExecutionEnvironment();
    env.readText(folder).[..].writeText(folder + "_processed");
    env.execute();
}

There is one full processing chain per folder (a loop is creating them) because the name of the output is the same as the name of the input, and I did not want to create a specific "rolling output" with bucketers.

So, yes, I could develop a source that supports a list of folders and puts the source name into the produced data. My processing could also handle tuples where one field is the source folder and use it as a key where appropriate, and finally yes, I could also create a sink that will "dispatch" the datasets to the correct folder according to that field of the tuple. But at first that seemed too complicated (premature optimization), so I coded my job the "naive" way and then split it, because I thought that there would be some sort of recycling of those buffers between jobs.

If the recycling works in normal conditions, maybe the question is whether it also works when multiple jobs are run from within the same jar vs. running the jar multiple times?

PS: I don't have log files at hand; the app runs on a separate and secured platform. Sure, I could try to reproduce the issue with a mock app but I'm not sure it would help the discussion.
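The tag-and-dispatch idea described above can be sketched independently of Flink (hypothetical names; a real implementation would be a custom InputFormat/OutputFormat): tag each record with its source folder, run one shared pipeline, then bucket the results back into per-folder outputs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DispatchSketch {
    // One job handles many folders: records carry their source folder as a
    // field, so one sink can route them back to "<folder>_processed".
    static Map<String, List<String>> dispatch(Map<String, List<String>> inputs) {
        Map<String, List<String>> outputs = new HashMap<>();
        for (Map.Entry<String, List<String>> e : inputs.entrySet()) {
            List<String> processed = new ArrayList<>();
            for (String line : e.getValue()) {
                processed.add(line.toUpperCase()); // stand-in for the real pipeline
            }
            outputs.put(e.getKey() + "_processed", processed);
        }
        return outputs;
    }
}
```

With this shape the operator count (and hence the buffer requirement) is constant regardless of how many folders are recomputed.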
yarn and checkpointing
Hi,

Is it possible to use checkpointing to restore the state of an app after a restart on YARN? From what I've seen, checkpointing only works within a Flink cluster's lifetime. However, the YARN mode has one cluster per app, so (unless the app crashes and is automatically restarted by the restart-strategy) the per-app cluster has the same lifetime as the app: when we stop the app, the cluster dies and cleans its checkpoints folder, so it won't be able to restore the state at the next run.

When running Flink on YARN, are we supposed to cancel with a savepoint and then restore from that savepoint?
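For completeness, the cancel-with-savepoint / resume workflow looks roughly like this (job id and paths are placeholders, and the exact flags vary by Flink version; check `flink cancel --help` / `flink run --help` for your release):

```shell
# Take a savepoint and stop the job in one step:
flink cancel -s hdfs:///flink/savepoints <jobId>
# Later, start a fresh per-job YARN cluster resuming from that savepoint:
flink run -m yarn-cluster -s hdfs:///flink/savepoints/savepoint-<id> myJob.jar
```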
RE: Great number of jobs and numberOfBuffers
Hello,

Sorry to ask you again, but no idea on this?

-----Original Message-----
From: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Sent: Monday, 21 August 2017 12:04
To: Nico Kruber <n...@data-artisans.com>
Cc: Ufuk Celebi <u...@apache.org>; user@flink.apache.org
Subject: RE: Great number of jobs and numberOfBuffers

Hi,

1/ Yes, the loop is part of the application I run on yarn. Something like:

public class MyFlinkApp {
    public static void main(String[] args) {
        // parse arguments etc.
        for (String datehour : datehours) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.readText(datehour)
                .union(env.readText(datehour-1))
                .union(env.readText(datehour-2))
                .map()
                .groupBy()
                .sortGroup()
                .reduceGroup()
                ... // other steps, unions, processing, inputs, outputs

            JobExecutionResult result = env.execute();
            // read accumulators and send some statsd statistics at the end of the batch
        }
    }
}

2/ The prod settings are something like 6 nodes with 8 task slots each, 32 GiB per node.

3/ I remember that we had the same error (not enough buffers) right at startup. I guess that it was trying to allocate all buffers at startup, whereas it now does so progressively (but still fails at the same limit).

4/ The program has many steps; it has about 5 inputs (readTextFile) and 2 outputs (TextHadoopOutputFormat, one in the middle of the processing, the other at the end). It is composed of multiple union, flatMap, map, groupBy, sortGroup, reduceGroup and filter operations for each "batch". And if we start the Flink app on a whole week of data, we will have to start 24 * 7 batches. Parallelism has the default value except for the output writers (32 and 4) in order to limit the number of files on HDFS.

-----Original Message-----
From: Nico Kruber <n...@data-artisans.com>
Sent: Friday, 18 August 2017 14:58
To: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Cc: Ufuk Celebi <u...@apache.org>; user@flink.apache.org
Subject: Re: Great number of jobs and numberOfBuffers

Hi Gwenhael,

the effect you describe sounds a bit strange. Just to clarify your setup:
1) Is the loop you were posting part of the application you run on yarn?
2) How many nodes are you running with?
3) What is the error you got when you tried to run the full program without splitting it?
4) Can you give a rough sketch of what your program is composed of (operators, parallelism, ...)?

Nico

On Thursday, 17 August 2017 11:53:25 CEST Gwenhael Pasquiers wrote:
> Hello,
>
> This bug was met in flink 1.0.1 over yarn (maybe the yarn behavior is different?). We've been having this issue for a long time and we were careful not to schedule too many jobs. I'm currently upgrading the application towards flink 1.2.1 and I'd like to try to solve this issue. I'm not submitting individual jobs to a standalone cluster.
>
> I'm starting a single application that has a loop in its main function:
>
> for (. . .) {
>     Environment env = Environment.getExecutionEnvironment();
>     env. . . .;
>     env.execute();
> }
>
> The job fails at some point later during execution with the following error:
>
> java.io.IOException: Insufficient number of network buffers: required 96, but only 35 available. The total number of network buffers is currently set to 36864. You can increase this number by setting the configuration key 'taskmanager.network.numberOfBuffers'.
>     at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
>
> Before splitting the job in multiple sub-jobs it failed right at startup.
>
> Each "batch" job takes 10 to 30 minutes and it fails after about a dozen of them (the first ones should have had enough time to be recycled).
>
> We've already increased the jobmanager and "numberOfBuffers" values quite a bit. That way we can handle days of data, but not weeks or months. This is not very scalable. And as you say, I felt that those buffers should be recycled, and that way we should have no limit as long as each batch is small enough.
>
> If I start my command again (removing the datehours that were successfully processed) it will work since it's a fresh new cluster.

-----Original Message-----
From: Ufuk Celebi [mailto:u...@apache.org]
Sent: Thursday, 17 August 2017 11:24
To: Ufuk Celebi <u...@apache.org>
RE: Great number of jobs and numberOfBuffers
Hi,

1/ Yes, the loop is part of the application I run on yarn. Something like:

public class MyFlinkApp {
    public static void main(String[] args) {
        // parse arguments etc.
        for (String datehour : datehours) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.readText(datehour)
                .union(env.readText(datehour-1))
                .union(env.readText(datehour-2))
                .map()
                .groupBy()
                .sortGroup()
                .reduceGroup()
                ... // other steps, unions, processing, inputs, outputs

            JobExecutionResult result = env.execute();
            // read accumulators and send some statsd statistics at the end of the batch
        }
    }
}

2/ The prod settings are something like 6 nodes with 8 task slots each, 32 GiB per node.

3/ I remember that we had the same error (not enough buffers) right at startup. I guess that it was trying to allocate all buffers at startup, whereas it now does so progressively (but still fails at the same limit).

4/ The program has many steps; it has about 5 inputs (readTextFile) and 2 outputs (TextHadoopOutputFormat, one in the middle of the processing, the other at the end). It is composed of multiple union, flatMap, map, groupBy, sortGroup, reduceGroup and filter operations for each "batch". And if we start the Flink app on a whole week of data, we will have to start 24 * 7 batches. Parallelism has the default value except for the output writers (32 and 4) in order to limit the number of files on HDFS.

-----Original Message-----
From: Nico Kruber <n...@data-artisans.com>
Sent: Friday, 18 August 2017 14:58
To: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Cc: Ufuk Celebi <u...@apache.org>; user@flink.apache.org
Subject: Re: Great number of jobs and numberOfBuffers

Hi Gwenhael,

the effect you describe sounds a bit strange. Just to clarify your setup:
1) Is the loop you were posting part of the application you run on yarn?
2) How many nodes are you running with?
3) What is the error you got when you tried to run the full program without splitting it?
4) Can you give a rough sketch of what your program is composed of (operators, parallelism, ...)?

Nico

On Thursday, 17 August 2017 11:53:25 CEST Gwenhael Pasquiers wrote:
> Hello,
>
> This bug was met in flink 1.0.1 over yarn (maybe the yarn behavior is different?). We've been having this issue for a long time and we were careful not to schedule too many jobs. I'm currently upgrading the application towards flink 1.2.1 and I'd like to try to solve this issue. I'm not submitting individual jobs to a standalone cluster.
>
> I'm starting a single application that has a loop in its main function:
>
> for (. . .) {
>     Environment env = Environment.getExecutionEnvironment();
>     env. . . .;
>     env.execute();
> }
>
> The job fails at some point later during execution with the following error:
>
> java.io.IOException: Insufficient number of network buffers: required 96, but only 35 available. The total number of network buffers is currently set to 36864. You can increase this number by setting the configuration key 'taskmanager.network.numberOfBuffers'.
>     at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
>
> Before splitting the job in multiple sub-jobs it failed right at startup.
>
> Each "batch" job takes 10 to 30 minutes and it fails after about a dozen of them (the first ones should have had enough time to be recycled).
>
> We've already increased the jobmanager and "numberOfBuffers" values quite a bit. That way we can handle days of data, but not weeks or months. This is not very scalable. And as you say, I felt that those buffers should be recycled, and that way we should have no limit as long as each batch is small enough.
>
> If I start my command again (removing the datehours that were successfully processed) it will work since it's a fresh new cluster.

-----Original Message-----
From: Ufuk Celebi [mailto:u...@apache.org]
Sent: Thursday, 17 August 2017 11:24
To: Ufuk Celebi <u...@apache.org>
Cc: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>; user@flink.apache.org; Nico Kruber <n...@data-artisans.com>
Subject: Re: Great number of jobs and numberOfBuffers

PS: Also pulling in Nico (CC'd) who is working on the network stack.

On Thu, Aug 17, 2017 at 11:23
RE: Great number of jobs and numberOfBuffers
Hello,

This bug was met in flink 1.0.1 over yarn (maybe the yarn behavior is different?). We've been having this issue for a long time and we were careful not to schedule too many jobs. I'm currently upgrading the application towards flink 1.2.1 and I'd like to try to solve this issue. I'm not submitting individual jobs to a standalone cluster.

I'm starting a single application that has a loop in its main function:

for (. . .) {
    Environment env = Environment.getExecutionEnvironment();
    env. . . .;
    env.execute();
}

The job fails at some point later during execution with the following error:

java.io.IOException: Insufficient number of network buffers: required 96, but only 35 available. The total number of network buffers is currently set to 36864. You can increase this number by setting the configuration key 'taskmanager.network.numberOfBuffers'.
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)

Before splitting the job in multiple sub-jobs it failed right at startup.

Each "batch" job takes 10 to 30 minutes and it fails after about a dozen of them (the first ones should have had enough time to be recycled).

We've already increased the jobmanager and "numberOfBuffers" values quite a bit. That way we can handle days of data, but not weeks or months. This is not very scalable. And as you say, I felt that those buffers should be recycled, and that way we should have no limit as long as each batch is small enough.

If I start my command again (removing the datehours that were successfully processed) it will work since it's a fresh new cluster.

-----Original Message-----
From: Ufuk Celebi [mailto:u...@apache.org]
Sent: Thursday, 17 August 2017 11:24
To: Ufuk Celebi <u...@apache.org>
Cc: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>; user@flink.apache.org; Nico Kruber <n...@data-artisans.com>
Subject: Re: Great number of jobs and numberOfBuffers

PS: Also pulling in Nico (CC'd) who is working on the network stack.

On Thu, Aug 17, 2017 at 11:23 AM, Ufuk Celebi <u...@apache.org> wrote:
> Hey Gwenhael,
>
> the network buffers are recycled automatically after a job terminates. If this does not happen, it would be quite a major bug.
>
> To help debug this:
>
> - Which version of Flink are you using?
> - Does the job fail immediately after submission or later during execution?
> - Is the following correct: the batch job that eventually fails because of missing network buffers runs without problems if you submit it to a fresh cluster with the same memory?
>
> The network buffers are recycled after the task managers report the task being finished. If you immediately submit the next batch there is a slight chance that the buffers are not recycled yet. As a possible temporary workaround, could you try waiting for a short amount of time before submitting the next batch?
>
> I think we should also be able to run the job without splitting it up after increasing the network memory configuration. Did you already try this?
>
> Best,
>
> Ufuk
>
> On Thu, Aug 17, 2017 at 10:38 AM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
>> Hello,
>>
>> We're meeting a limit with the numberOfBuffers.
>>
>> In a quite complex job we do a lot of operations, with a lot of operators, on a lot of folders (datehours).
>>
>> In order to split the job into smaller "batches" (to limit the necessary "numberOfBuffers") I've done a loop over the batches (handling the datehours 3 by 3); for each batch I create a new env then call the execute() method.
>>
>> However it looks like there is no cleanup: after a while, if the number of batches is too big, there is an error saying that the numberOfBuffers isn't high enough. It kind of looks like some leak. Is there a way to clean them up?
Great number of jobs and numberOfBuffers
Hello,

We're meeting a limit with the numberOfBuffers.

In a quite complex job we do a lot of operations, with a lot of operators, on a lot of folders (datehours).

In order to split the job into smaller "batches" (to limit the necessary "numberOfBuffers") I've done a loop over the batches (handling the datehours 3 by 3); for each batch I create a new env and then call the execute() method.

However it looks like there is no cleanup: after a while, if the number of batches is too big, there is an error saying that the numberOfBuffers isn't high enough. It kind of looks like some leak. Is there a way to clean them up?
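As a rough sizing aid, the Flink documentation of that era gives a rule of thumb for taskmanager.network.numberOfBuffers of slots-per-TM² × #TMs × 4. Note this covers one job's simultaneous shuffles; it says nothing about a sequence of jobs whose buffers are never returned, which is the leak suspected in this thread. A quick sketch of the arithmetic:

```java
public class BufferEstimate {
    // Rule of thumb from the Flink docs: slotsPerTm^2 * taskManagers * 4.
    static int recommendedBuffers(int slotsPerTm, int taskManagers) {
        return slotsPerTm * slotsPerTm * taskManagers * 4;
    }

    public static void main(String[] args) {
        // The production setup mentioned later in this thread:
        // 6 nodes with 8 task slots each.
        System.out.println(recommendedBuffers(8, 6)); // prints 1536
    }
}
```

The configured 36864 buffers are already far above this estimate, which supports the suspicion that buffers are leaking across the successive env.execute() calls rather than being undersized for any single job.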
RE: Event-time and first watermark
We're using an AssignerWithPunctuatedWatermarks that extracts a timestamp from the data. It keeps and returns the highest timestamp it has ever seen and returns a new Watermark every time the value grows. I know it's bad for performance, but for the moment that's not the issue; I want the most up-to-date watermark possible.

-----Original Message-----
From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: Friday, 4 August 2017 12:22
To: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Cc: Nico Kruber <n...@data-artisans.com>; user@flink.apache.org
Subject: Re: Event-time and first watermark

Hi,

How are you defining the watermark, i.e. what kind of watermark extractor are you using?

Best,
Aljoscha

> On 3 Aug 2017, at 17:45, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
>
> We're not using a Window but a more basic ProcessFunction to handle sessions. We made this choice because we have to handle (millions of) sessions that can last from 10 seconds to 24 hours, so we wanted to handle things manually using the State class.
>
> We're using the watermark as an event-time "clock" to:
> * compute the "lateness" of a message relative to the watermark (most recent message from the stream)
> * fire timer events
>
> We're using event time instead of processing time because our stream will be late and data arrive in hourly bursts.
>
> Maybe we're misusing the watermark?
>
> B.R.
>
> -----Original Message-----
> From: Nico Kruber [mailto:n...@data-artisans.com]
> Sent: Thursday, 3 August 2017 16:30
> To: user@flink.apache.org
> Cc: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
> Subject: Re: Event-time and first watermark
>
> Hi Gwenhael,
>
> "A Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t' <= t (i.e. events with timestamps older or equal to the watermark)." [1]
>
> Therefore, they should be behind the actual event with timestamp t.
>
> What is it that you want to achieve in the end? What do you want to use the watermark for? They are basically a means of defining when an event-time window ends.
>
> Nico
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html#event-time-and-watermarks
>
> On Thursday, 3 August 2017 10:24:35 CEST Gwenhael Pasquiers wrote:
>> Hi,
>>
>> From my tests it seems that the initial watermark value is Long.MIN_VALUE even though my first data passed through the timestamp extractor before arriving into my ProcessFunction. It looks like the watermark "lags" behind the data by one message.
>>
>> Is there a way to have a watermark more "up to date"? Or is the only way to compute it myself in my ProcessFunction?
>>
>> Thanks.
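The extractor described above can be sketched in plain Java, without depending on Flink (method names mimic the AssignerWithPunctuatedWatermarks contract). It also shows why the watermark "lags" by one message: the watermark is only evaluated after an element's timestamp has been extracted, so the operator processing element n has, at best, the watermark derived from the elements up to n, emitted behind them in the stream.

```java
public class PunctuatedAssignerSketch {
    private long maxSeen = Long.MIN_VALUE;     // highest timestamp observed
    private long lastEmitted = Long.MIN_VALUE; // last watermark we produced

    // Mimics extractTimestamp(): remember the highest timestamp seen so far.
    long extractTimestamp(long elementTimestamp) {
        if (elementTimestamp > maxSeen) {
            maxSeen = elementTimestamp;
        }
        return elementTimestamp;
    }

    // Mimics checkAndGetNextWatermark(): called after each element; emit a
    // new watermark only when the maximum has grown, otherwise null.
    Long checkAndGetNextWatermark() {
        if (maxSeen > lastEmitted) {
            lastEmitted = maxSeen;
            return maxSeen;
        }
        return null;
    }
}
```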
RE: Event-time and first watermark
We're not using a Window but a more basic ProcessFunction to handle sessions. We made this choice because we have to handle (millions of) sessions that can last from 10 seconds to 24 hours, so we wanted to handle things manually using the State class.

We're using the watermark as an event-time "clock" to:
* compute the "lateness" of a message relative to the watermark (most recent message from the stream)
* fire timer events

We're using event time instead of processing time because our stream will be late and data arrive in hourly bursts.

Maybe we're misusing the watermark?

B.R.

-----Original Message-----
From: Nico Kruber [mailto:n...@data-artisans.com]
Sent: Thursday, 3 August 2017 16:30
To: user@flink.apache.org
Cc: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Subject: Re: Event-time and first watermark

Hi Gwenhael,

"A Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t' <= t (i.e. events with timestamps older or equal to the watermark)." [1]

Therefore, they should be behind the actual event with timestamp t.

What is it that you want to achieve in the end? What do you want to use the watermark for? They are basically a means of defining when an event-time window ends.

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html#event-time-and-watermarks

On Thursday, 3 August 2017 10:24:35 CEST Gwenhael Pasquiers wrote:
> Hi,
>
> From my tests it seems that the initial watermark value is Long.MIN_VALUE even though my first data passed through the timestamp extractor before arriving into my ProcessFunction. It looks like the watermark "lags" behind the data by one message.
>
> Is there a way to have a watermark more "up to date"? Or is the only way to compute it myself in my ProcessFunction?
>
> Thanks.
Event-time and first watermark
Hi,

From my tests it seems that the initial watermark value is Long.MIN_VALUE even though my first data passed through the timestamp extractor before arriving into my ProcessFunction. It looks like the watermark "lags" behind the data by one message.

Is there a way to have a watermark more "up to date"? Or is the only way to compute it myself in my ProcessFunction?

Thanks.
RE: A way to purge an empty session
Thanks! I didn’t know of this function and indeed it seems to match my needs better than Windows. And I’ll be able to clear my state once it’s empty (and re-create it when necessary). B.R. From: Fabian Hueske [mailto:fhue...@gmail.com] Sent: lundi 26 juin 2017 12:13 To: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> Cc: user@flink.apache.org Subject: Re: A way to purge an empty session Hi Gwenhael, have you considered using a ProcessFunction? With a ProcessFunction you have direct access to state (that you register yourself) and you can register timers that trigger a callback function when they expire. So you can clean up state when you receive an element or when a timer expires. Best, Fabian 2017-06-23 18:46 GMT+02:00 Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>>: Hello, This may be premature optimization for memory usage but here is my question: I have to do an app that will have to monitor sessions of (millions of) users. I don’t know when a session starts or ends, nor a reasonable maximum duration. I want to have a maximum duration (timeout) of 24H. However I’d like to be able to PURGE sessions that ended as soon as possible to free memory. I use a Trigger to trigger my WindowFunction for each input EVENT. It will (when relevant) output a SESSION (with start and end timestamps). I use the Evictor in order to remove the EVENTs used to build the SESSION (they have a Boolean field “removable” set to true from the WindowFunction so that the Evictor knows it can remove them in the evictAfter method)… That way at least I can clean the content of the windows. However, from what I’m seeing it looks like the window instance will still stay alive (even if empty) until it reaches its maximum duration (24 hours) even if the session it represents lasted 2 minutes: at the end of the day I might have millions of sessions in memory when in reality only thousands are really alive at a given time.
That might also really slow down the backups and restores of the application if it needs to store millions of empty windows. I’m aware that my need looks like the session windows. But the session window works mainly by merging windows that overlap within a session gap. My session gap can be multiple hours long, so I’m afraid that it would not help me… So my question is: is there a way to inform the “Trigger” that the window has no more elements and that it can be PURGED? Or a way for a WindowFunction to “kill” the window it’s being applied on? Of course my window might be re-created if new events arrive later for the same key. My other option is to simply use a flatmap operator that will hold a HashMap of sessions; that way I might be able to clean it up when I close my sessions, but I think it would be prettier to rely on Flink’s Windows ;-) Thanks in advance,
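Fabian's suggestion can be sketched roughly as follows against the Flink 1.2-era ProcessFunction API. The Event, Session and SessionState types, the accessors and the 24h timeout are illustrative assumptions, not anything from the thread; the point is only the shape: per-key state, one event-time timer per session end, and `state.clear()` as soon as the session expires.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative types: Event is the keyed input, Session the emitted result,
// SessionState the per-key state POJO (none of these are from the thread).
public class SessionProcess extends ProcessFunction<Event, Session> {

    private static final long TIMEOUT = 24 * 60 * 60 * 1000L; // 24h cap

    private transient ValueState<SessionState> state;

    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("session", SessionState.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Session> out) throws Exception {
        SessionState s = state.value();
        if (s == null) {
            s = new SessionState(event.getTimestamp()); // session starts here
        }
        s.update(event);
        state.update(s);
        // Timers are deduplicated per timestamp, so re-registering is cheap.
        ctx.timerService().registerEventTimeTimer(s.lastEventTime() + TIMEOUT);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Session> out) throws Exception {
        SessionState s = state.value();
        if (s != null && timestamp >= s.lastEventTime() + TIMEOUT) {
            out.collect(s.toSession());
            state.clear(); // state is purged as soon as the session expires
        }
    }
}
```

Unlike the empty-window situation described above, no state at all remains for a key after `clear()`; it is re-created lazily if a new event arrives for that key.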
A way to purge an empty session
Hello, This may be premature optimization for memory usage but here is my question: I have to do an app that will have to monitor sessions of (millions of) users. I don’t know when a session starts or ends, nor a reasonable maximum duration. I want to have a maximum duration (timeout) of 24H. However I’d like to be able to PURGE sessions that ended as soon as possible to free memory. I use a Trigger to trigger my WindowFunction for each input EVENT. It will (when relevant) output a SESSION (with start and end timestamps). I use the Evictor in order to remove the EVENTs used to build the SESSION (they have a Boolean field “removable” set to true from the WindowFunction so that the Evictor knows it can remove them in the evictAfter method)… That way at least I can clean the content of the windows. However, from what I’m seeing it looks like the window instance will still stay alive (even if empty) until it reaches its maximum duration (24 hours) even if the session it represents lasted 2 minutes: at the end of the day I might have millions of sessions in memory when in reality only thousands are really alive at a given time. That might also really slow down the backups and restores of the application if it needs to store millions of empty windows. I’m aware that my need looks like the session windows. But the session window works mainly by merging windows that overlap within a session gap. My session gap can be multiple hours long, so I’m afraid that it would not help me… So my question is: is there a way to inform the “Trigger” that the window has no more elements and that it can be PURGED? Or a way for a WindowFunction to “kill” the window it’s being applied on? Of course my window might be re-created if new events arrive later for the same key.
My other option is to simply use a flatmap operator that will hold a HashMap of sessions; that way I might be able to clean it up when I close my sessions, but I think it would be prettier to rely on Flink’s Windows ;-) Thanks in advance,
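For reference, the "flatmap holding a HashMap of sessions" fallback can be sketched in plain Java, outside any Flink API. All names here (SessionTracker, SESSION_TIMEOUT_MS) are illustrative; the point is that a session's entry is removed as soon as it times out, which is exactly what the windows above could not do:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Minimal in-JVM session bookkeeping: create on first event, update per
// event, and purge eagerly once a session has been idle past the timeout.
class SessionTracker {
    static final long SESSION_TIMEOUT_MS = 24 * 60 * 60 * 1000L; // 24h cap

    static class Session {
        long lastEventTime;
        int eventCount;
    }

    final Map<String, Session> sessions = new HashMap<>();

    /** Record an event for a user at the given event-time timestamp. */
    void onEvent(String userId, long eventTime) {
        Session s = sessions.computeIfAbsent(userId, k -> new Session());
        s.lastEventTime = eventTime;
        s.eventCount++;
    }

    /** Purge every session idle longer than the timeout; returns purged count. */
    int purge(long watermark) {
        int purged = 0;
        Iterator<Map.Entry<String, Session>> it = sessions.entrySet().iterator();
        while (it.hasNext()) {
            if (watermark - it.next().getValue().lastEventTime > SESSION_TIMEOUT_MS) {
                it.remove();
                purged++;
            }
        }
        return purged;
    }
}
```

In a Flink flatmap this map would live as operator state, with `purge` driven by the watermark; the trade-off versus Windows is that checkpointing and fault tolerance of the map become the operator's own concern.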
Kafka 0.10 jaas multiple clients
Hello, Up to now we’ve been using kafka with jaas (plain login/password) the following way: - yarnship the jaas file - add the jaas file name into “flink-conf.yaml” using property “env.java.opts” How can we support multiple secured kafka 0.10 consumers and producers (with different logins and passwords, of course)? From what I saw in the kafka sources, the entry name “KafkaClient” is hardcoded… Best Regards, Gwenhaël PASQUIERS
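For a single secured client, the setup described above looks roughly like the fragment below (paths and credentials are placeholders). Because the Kafka 0.10 client always looks up the hardcoded "KafkaClient" entry, only one login/password can be configured per JVM this way; as far as I know, later client versions (0.10.2+, via KIP-85) added a per-client `sasl.jaas.config` property that lifts this limitation.

```
# flink-conf.yaml — point the JVM at the yarnshipped JAAS file:
env.java.opts: -Djava.security.auth.login.config=./kafka_client_jaas.conf

# kafka_client_jaas.conf — the entry name "KafkaClient" is fixed by the client:
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="user1"
  password="secret1";
};
```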
RE: Kafka offset commits
We need more tests but we think we found the cause for the loss of our kafka consumer offset in kafka 0.10. It might be because of the server-side parameter “offsets.topic.retention.minutes” that defaults to 1440 minutes (1 day). And our flink consumer was “off” for more than a day before restarting. From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org] Sent: mercredi 19 avril 2017 11:39 To: user@flink.apache.org Subject: Re: Kafka offset commits Thanks for the clarification Aljoscha! Yes, you cannot restore from a 1.0 savepoint in Flink 1.2 (sorry, I missed the “1.0” part on my first reply). @Gwenhael, I’ll try to reclarify some of the questions you asked: Does that mean that flink does not rely on the offsets written to zookeeper anymore, but relies on the snapshot data, implying that it’s crucial to keep the same snapshot folder before and after the migration to Flink 1.2? For the case of 1.0 —> 1.2, you’ll have to rely on committed offsets in Kafka / ZK for the migration. State migration from 1.0 to 1.2 is not possible. As Aljoscha pointed out, if you are using the same “group.id”, then there shouldn’t be a problem w.r.t. retaining the offset position. You just have to keep [1] in mind, as you would need to manually increase all committed offsets in Kafka / ZK by 1 for that consumer group. Note that there is no state migration happening here, but just simply relying on offsets committed in Kafka / ZK to define the starting position when you’re starting the job in 1.2. We were also wondering if the flink consumer was able to restore its offset from Zookeeper. For FlinkKafkaConsumer08, the starting offset is actually always read from ZK. Again, this isn’t a “restore”, but just defining the start position using committed offsets. Another question: is there an expiration to the snapshots? We’ve been having issues with an app that we forgot to restart.
We did it after a couple of days, but it looks like it did not correctly restore the offset and it started consuming from the oldest offset, creating duplicated data (the kafka queue has over a week of buffer). There is no expiration to the offsets stored in the snapshots. The only issue would be if Kafka has expired that offset due to data retention settings. If you’re sure that at the time of the restore the data hasn’t expired yet, there might be something weird going on. AFAIK, the only issue that was previously known to possibly cause this was [2]. Could you check if that issue may be the case? [1] https://issues.apache.org/jira/browse/FLINK-4723 [2] https://issues.apache.org/jira/browse/FLINK-6006 On 19 April 2017 at 5:14:35 PM, Aljoscha Krettek (aljos...@apache.org<mailto:aljos...@apache.org>) wrote: Hi, AFAIK, restoring a Flink 1.0 savepoint should not be possible on Flink 1.2. Only restoring from Flink 1.1 savepoints is supported. @Gordon If the consumer group stays the same the new Flink job should pick up where the old one stopped, right? Best, Aljoscha On 18. Apr 2017, at 16:19, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote: Thanks for your answer. Does that mean that flink does not rely on the offsets written to zookeeper anymore, but relies on the snapshot data, implying that it’s crucial to keep the same snapshot folder before and after the migration to Flink 1.2? We were also wondering if the flink consumer was able to restore its offset from Zookeeper. Another question: is there an expiration to the snapshots? We’ve been having issues with an app that we forgot to restart. We did it after a couple of days, but it looks like it did not correctly restore the offset and it started consuming from the oldest offset, creating duplicated data (the kafka queue has over a week of buffer). B.R.
From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org] Sent: lundi 17 avril 2017 07:40 To: user@flink.apache.org<mailto:user@flink.apache.org> Subject: Re: Kafka offset commits Hi, The FlinkKafkaConsumer in 1.2 is able to restore from older version state snapshots and bridge the migration, so there should be no problem in reading the offsets from older state. The smallest or highest offsets will only be used if the offset no longer exists due to Kafka data retention settings. Besides this, there was one fix related to the Kafka 0.8 offsets for Flink 1.2.0 [1]. Shortly put, before the fix, the committed offsets to ZK were off by 1 (w.r.t. how Kafka itself defines the committed offsets). However, this should not affect the behavior of restoring from offsets in savepoints, so it should be fine. Cheers, Gordon [1] https://issues.apache.org/jira/browse/FLINK-4723 On 13 April 2017 at 10:55:40 PM, Gwenhael Pasquiers (gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>) wrote: Hello, We’re going to migrate some applications that consume data from a Kafka 0.8 from Flink 1.
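If offset expiry is indeed the root cause diagnosed at the top of this thread, the fix is a broker-side setting. Note that in Kafka's broker configuration reference the property is named `offsets.retention.minutes`; the value below is illustrative:

```
# server.properties (Kafka broker) — committed consumer-group offsets are
# deleted once idle longer than this window; a job stopped for longer will
# restart from auto.offset.reset (e.g. the oldest offset, as observed above):
offsets.retention.minutes=10080   # e.g. one week instead of the 1-day default
```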
shaded version of legacy kafka connectors
Hi, Before doing it myself I thought it would be better to ask. We need to consume from kafka 0.8 and produce to kafka 0.10 in a flink app. I guess there will be class and package name conflicts for a lot of dependencies of both connectors. The obvious solution is to make a “shaded” version of the kafka 0.8 connector so that it can coexist with the 0.10 version. Does it already exist?
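A shaded 0.8 connector could be built with the maven-shade-plugin by relocating the Kafka packages; the sketch below is an assumption about what such a build might look like (the relocation prefix `shaded.kafka08` is made up, and Kafka's Scala classes live under the bare `kafka` package as well as `org.apache.kafka`):

```xml
<!-- Hypothetical shading of the Kafka 0.8 connector so it can coexist
     with the 0.10 connector in the same job jar. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>kafka</pattern>
            <shadedPattern>shaded.kafka08.kafka</shadedPattern>
          </relocation>
          <relocation>
            <pattern>org.apache.kafka</pattern>
            <shadedPattern>shaded.kafka08.org.apache.kafka</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```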
RE: Cross operation on two huge datasets
Maybe I won’t try to broadcast my dataset after all: I finally found again what made me implement it with my own cloning flatmap + partitioning. Quoted from https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#broadcast-variables Note: As the content of broadcast variables is kept in-memory on each node, it should not become too large. For simpler things like scalar values you can simply make parameters part of the closure of a function, or use the withParameters(...) method to pass in a configuration. From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com] Sent: vendredi 3 mars 2017 18:10 To: user@flink.apache.org Subject: RE: Cross operation on two huge datasets To answer Ankit, It is a batch application. Yes, I admit I did broadcasting by hand. I did it that way because the only other way I found to “broadcast” a DataSet was to use “withBroadcast”, and I was afraid that “withBroadcast” would make flink load the whole dataset in memory before broadcasting it rather than sending its elements 1 by 1. I’ll try to use it, I’ll take anything that will make my code cleaner! From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com] Sent: vendredi 3 mars 2017 17:55 To: user@flink.apache.org<mailto:user@flink.apache.org> Subject: RE: Cross operation on two huge datasets I tried putting my structure in a dataset but when serializing kryo went into an infinite recursive loop (crashed with a StackOverflowError). So I’m staying with the static reference. As for the partitioning, there is always the case of shapes overlapping on both right and left sections; I think it would take quite a bit of effort to implement.
And it’s always better if I don’t have to manually set a frontier between the two (n) zones From: Xingcan Cui [mailto:xingc...@gmail.com] Sent: vendredi 3 mars 2017 02:40 To: user@flink.apache.org<mailto:user@flink.apache.org> Subject: Re: Cross operation on two huge datasets Hi Gwen, in my view, indexing and searching are two isolated processes and they should be separated. Maybe you should take the RTree structure as a new dataset (fortunately it's static, right?) and store it to a distributed cache or DFS that can be accessed by operators from any nodes. That will make the mapping from index partition to operator consistent (regardless of the locality problem). Besides, you can make a "weak" index first, e.g., partitioning the points and shapes to "left" and "right", and in that way you do not need to broadcast the points to all index nodes (only left to left and right to right). Best, Xingcan On Fri, Mar 3, 2017 at 1:49 AM, Jain, Ankit <ankit.j...@here.com<mailto:ankit.j...@here.com>> wrote: If I understood correctly, you have just implemented flink broadcasting by hand ☺. You are still sending out the whole points dataset to each shape partition – right? I think this could be optimized by using a keyBy or custom partition which is common across shapes & points – that should make sure a given point always go to same shape node. I didn’t understand why Till Rohrmann said “you don’t know where Flink will schedule the new operator instance” – new operators are created when flink job is started – right? So, there should be no more new operators once the job is running and if you use consistent hash partitioning, same input should always end at same task manager node. You could store the output as Flink State – that would be more fault tolerant but storing it as cache in JVM should work too. Is this a batch job or streaming? 
By the way, I am a newbie to Flink, still only learning – so take my suggestions with caution ☺ Thanks Ankit From: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> Date: Thursday, March 2, 2017 at 7:28 AM To: "user@flink.apache.org<mailto:user@flink.apache.org>" <user@flink.apache.org<mailto:user@flink.apache.org>> Subject: RE: Cross operation on two huge datasets I made it so that I don’t care where the next operator will be scheduled. I configured taskslots = 1 and parallelism = yarnnodes so that: • Each node contains 1/Nth of the shapes (simple repartition() of the shapes dataset). • The points will be cloned so that each partition of the points dataset will contain the whole original dataset o Flatmap creates “#parallelism” clones of each entry o Custom partitioning so that each clone of each entry is sent to a different partition That way, whatever flink chooses to do, each point will be compared to each shape. That’s why I think that in my case I can keep it in the JVM without issues. I’d prefer to avoid ser/deser-ing that structure. I tried to use join (all items have same key) but it looks like flink tried to serialize the RTree anyway and it went into a StackOverflowError (locally with only 1 partition,
RE: Cross operation on two huge datasets
To answer Ankit, It is a batch application. Yes, I admit I did broadcasting by hand. I did it that way because the only other way I found to “broadcast” a DataSet was to use “withBroadcast”, and I was afraid that “withBroadcast” would make flink load the whole dataset in memory before broadcasting it rather than sending its elements 1 by 1. I’ll try to use it, I’ll take anything that will make my code cleaner! From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com] Sent: vendredi 3 mars 2017 17:55 To: user@flink.apache.org Subject: RE: Cross operation on two huge datasets I tried putting my structure in a dataset but when serializing kryo went into an infinite recursive loop (crashed with a StackOverflowError). So I’m staying with the static reference. As for the partitioning, there is always the case of shapes overlapping on both right and left sections; I think it would take quite a bit of effort to implement. And it’s always better if I don’t have to manually set a frontier between the two (n) zones. From: Xingcan Cui [mailto:xingc...@gmail.com] Sent: vendredi 3 mars 2017 02:40 To: user@flink.apache.org<mailto:user@flink.apache.org> Subject: Re: Cross operation on two huge datasets Hi Gwen, in my view, indexing and searching are two isolated processes and they should be separated. Maybe you should take the RTree structure as a new dataset (fortunately it's static, right?) and store it to a distributed cache or DFS that can be accessed by operators from any nodes. That will make the mapping from index partition to operator consistent (regardless of the locality problem). Besides, you can make a "weak" index first, e.g., partitioning the points and shapes to "left" and "right", and in that way you do not need to broadcast the points to all index nodes (only left to left and right to right).
Best, Xingcan On Fri, Mar 3, 2017 at 1:49 AM, Jain, Ankit <ankit.j...@here.com<mailto:ankit.j...@here.com>> wrote: If I understood correctly, you have just implemented flink broadcasting by hand ☺. You are still sending out the whole points dataset to each shape partition – right? I think this could be optimized by using a keyBy or custom partition which is common across shapes & points – that should make sure a given point always goes to the same shape node. I didn’t understand why Till Rohrmann said “you don’t know where Flink will schedule the new operator instance” – new operators are created when the flink job is started – right? So, there should be no more new operators once the job is running and if you use consistent hash partitioning, the same input should always end at the same task manager node. You could store the output as Flink State – that would be more fault tolerant but storing it as a cache in the JVM should work too. Is this a batch job or streaming? By the way, I am a newbie to Flink, still only learning – so take my suggestions with caution ☺ Thanks Ankit From: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> Date: Thursday, March 2, 2017 at 7:28 AM To: "user@flink.apache.org<mailto:user@flink.apache.org>" <user@flink.apache.org<mailto:user@flink.apache.org>> Subject: RE: Cross operation on two huge datasets I made it so that I don’t care where the next operator will be scheduled. I configured taskslots = 1 and parallelism = yarnnodes so that: • Each node contains 1/Nth of the shapes (simple repartition() of the shapes dataset). • The points will be cloned so that each partition of the points dataset will contain the whole original dataset o Flatmap creates “#parallelism” clones of each entry o Custom partitioning so that each clone of each entry is sent to a different partition That way, whatever flink chooses to do, each point will be compared to each shape.
That’s why I think that in my case I can keep it in the JVM without issues. I’d prefer to avoid ser/deser-ing that structure. I tried to use join (all items have same key) but it looks like flink tried to serialize the RTree anyway and it went into a StackOverflowError (locally with only 1 partition, not even on yarn). From: Till Rohrmann [mailto:trohrm...@apache.org<mailto:trohrm...@apache.org>] Sent: jeudi 2 mars 2017 15:40 To: user@flink.apache.org<mailto:user@flink.apache.org> Subject: Re: Cross operation on two huge datasets Yes you’re right about the “split” and broadcasting. Storing it in the JVM is not a good approach, since you don’t know where Flink will schedule the new operator instance. It might be the case that an operator responsible for another partition gets scheduled to this JVM and then it has the wrong RTree information. Maybe you can model the set of RTrees as a DataSet[(PartitionKey, RTree)] and then join with the partitioned point data set. Cheers, Till On Thu, Mar 2, 2017 at 3:29 PM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote: The best for me would be to make it “persist” inside of the JVM heap in some map since I don’t even know if the structure is Serializable (I could try). But I understand. As for broadcasting, wouldn’t broadcasting the variable cancel the efforts I did to “split” the dataset parsing over the nodes? From: Till Rohrmann [mailto:trohrm...@apache.org<mailto:trohrm...@apache.org>] Sent: jeudi 2 mars 2017 14:42 To: user@flink.apache.o
RE: Cross operation on two huge datasets
I tried putting my structure in a dataset but when serializing kryo went into an infinite recursive loop (crashed with a StackOverflowError). So I’m staying with the static reference. As for the partitioning, there is always the case of shapes overlapping on both right and left sections; I think it would take quite a bit of effort to implement. And it’s always better if I don’t have to manually set a frontier between the two (n) zones. From: Xingcan Cui [mailto:xingc...@gmail.com] Sent: vendredi 3 mars 2017 02:40 To: user@flink.apache.org Subject: Re: Cross operation on two huge datasets Hi Gwen, in my view, indexing and searching are two isolated processes and they should be separated. Maybe you should take the RTree structure as a new dataset (fortunately it's static, right?) and store it to a distributed cache or DFS that can be accessed by operators from any nodes. That will make the mapping from index partition to operator consistent (regardless of the locality problem). Besides, you can make a "weak" index first, e.g., partitioning the points and shapes to "left" and "right", and in that way you do not need to broadcast the points to all index nodes (only left to left and right to right). Best, Xingcan On Fri, Mar 3, 2017 at 1:49 AM, Jain, Ankit <ankit.j...@here.com<mailto:ankit.j...@here.com>> wrote: If I understood correctly, you have just implemented flink broadcasting by hand ☺. You are still sending out the whole points dataset to each shape partition – right? I think this could be optimized by using a keyBy or custom partition which is common across shapes & points – that should make sure a given point always goes to the same shape node. I didn’t understand why Till Rohrmann said “you don’t know where Flink will schedule the new operator instance” – new operators are created when the flink job is started – right?
So, there should be no more new operators once the job is running and if you use consistent hash partitioning, the same input should always end at the same task manager node. You could store the output as Flink State – that would be more fault tolerant but storing it as a cache in the JVM should work too. Is this a batch job or streaming? By the way, I am a newbie to Flink, still only learning – so take my suggestions with caution ☺ Thanks Ankit From: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> Date: Thursday, March 2, 2017 at 7:28 AM To: "user@flink.apache.org<mailto:user@flink.apache.org>" <user@flink.apache.org<mailto:user@flink.apache.org>> Subject: RE: Cross operation on two huge datasets I made it so that I don’t care where the next operator will be scheduled. I configured taskslots = 1 and parallelism = yarnnodes so that: • Each node contains 1/Nth of the shapes (simple repartition() of the shapes dataset). • The points will be cloned so that each partition of the points dataset will contain the whole original dataset o Flatmap creates “#parallelism” clones of each entry o Custom partitioning so that each clone of each entry is sent to a different partition That way, whatever flink chooses to do, each point will be compared to each shape. That’s why I think that in my case I can keep it in the JVM without issues. I’d prefer to avoid ser/deser-ing that structure. I tried to use join (all items have same key) but it looks like flink tried to serialize the RTree anyway and it went into a StackOverflowError (locally with only 1 partition, not even on yarn). From: Till Rohrmann [mailto:trohrm...@apache.org<mailto:trohrm...@apache.org>] Sent: jeudi 2 mars 2017 15:40 To: user@flink.apache.org<mailto:user@flink.apache.org> Subject: Re: Cross operation on two huge datasets Yes you’re right about the “split” and broadcasting.
Storing it in the JVM is not a good approach, since you don’t know where Flink will schedule the new operator instance. It might be the case that an operator responsible for another partition gets scheduled to this JVM and then it has the wrong RTree information. Maybe you can model the set of RTrees as a DataSet[(PartitionKey, RTree)] and then join with the partitioned point data set. Cheers, Till On Thu, Mar 2, 2017 at 3:29 PM, Gwenhael Pasquiers [gwenhael.pasqui...@ericsson.com](mailto:gwenhael.pasqui...@ericsson.com)<http://mailto:%5bgwenhael.pasqui...@ericsson.com%5D(mailto:gwenhael.pasqui...@ericsson.com)> wrote: The best for me would be to make it “persist” inside of the JVM heap in some map since I don’t even know if the structure is Serializable (I could try). But I understand. As for broadcasting, wouldn’t broadcasting the variable cancel the efforts I did to “split” the dataset parsing over the nodes ? From: Till Rohrmann [mailto:trohrm...@apache.org<mailto:trohrm...@apache.org>] Sent: jeudi 2 mars 2017 14:42 To: user@flink.apache.o
RE: Cross operation on two huge datasets
I managed to avoid the classes reload by controlling the order of operations using “.withBroadcastSet”. My first task (shapes parsing) now outputs an empty “synchro” DataSet. Then whenever I need to wait for that synchro dataset to be ready (and mainly the operations prior to that dataset to be done), I use “.withBroadcastSet(synchro, “synchro”)” and I do a get for that broadcast variable in my open method. That way I’m sure that I won’t begin testing my points against an incomplete static RTree. And also, since it’s a single job again, my static RTree remains valid ☺ Seems to be good for now even if the static thingie is a bit dirty. However I’m surprised that reading 20 MB of parquet became 21 GB of “bytes sent” by the flink reader. From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com] Sent: jeudi 2 mars 2017 16:28 To: user@flink.apache.org Subject: RE: Cross operation on two huge datasets I made it so that I don’t care where the next operator will be scheduled. I configured taskslots = 1 and parallelism = yarnnodes so that: · Each node contains 1/Nth of the shapes (simple repartition() of the shapes dataset). · The points will be cloned so that each partition of the points dataset will contain the whole original dataset o Flatmap creates “#parallelism” clones of each entry o Custom partitioning so that each clone of each entry is sent to a different partition That way, whatever flink chooses to do, each point will be compared to each shape. That’s why I think that in my case I can keep it in the JVM without issues. I’d prefer to avoid ser/deser-ing that structure. I tried to use join (all items have same key) but it looks like flink tried to serialize the RTree anyway and it went into a StackOverflowError (locally with only 1 partition, not even on yarn).
From: Till Rohrmann [mailto:trohrm...@apache.org] Sent: jeudi 2 mars 2017 15:40 To: user@flink.apache.org<mailto:user@flink.apache.org> Subject: Re: Cross operation on two huge datasets Yes you’re right about the “split” and broadcasting. Storing it in the JVM is not a good approach, since you don’t know where Flink will schedule the new operator instance. It might be the case that an operator responsible for another partition gets scheduled to this JVM and then it has the wrong RTree information. Maybe you can model the set of RTrees as a DataSet[(PartitionKey, RTree)] and then join with the partitioned point data set. Cheers, Till On Thu, Mar 2, 2017 at 3:29 PM, Gwenhael Pasquiers [gwenhael.pasqui...@ericsson.com](mailto:gwenhael.pasqui...@ericsson.com)<http://mailto:[gwenhael.pasqui...@ericsson.com](mailto:gwenhael.pasqui...@ericsson.com)> wrote: The best for me would be to make it “persist” inside of the JVM heap in some map since I don’t even know if the structure is Serializable (I could try). But I understand. As for broadcasting, wouldn’t broadcasting the variable cancel the efforts I did to “split” the dataset parsing over the nodes ? From: Till Rohrmann [mailto:trohrm...@apache.org<mailto:trohrm...@apache.org>] Sent: jeudi 2 mars 2017 14:42 To: user@flink.apache.org<mailto:user@flink.apache.org> Subject: Re: Cross operation on two huge datasets Hi Gwenhael, if you want to persist operator state, then you would have to persist it (e.g. writing to a shared directory or emitting the model and using one of Flink’s sinks) and when creating the new operators you have to reread it from there (usually in the open method or from a Flink source as part of a broadcasted data set). If you want to give a data set to all instances of an operator, then you should broadcast this data set. You can do something like DataSet input = ... DataSet broadcastSet = ... 
input.flatMap(new RichFlatMapFunction<Integer, Integer>() {
    List broadcastSet;

    @Override
    public void open(Configuration configuration) {
        broadcastSet = getRuntimeContext().getBroadcastVariable("broadcast");
    }

    @Override
    public void flatMap(Integer integer, Collector collector) throws Exception {
    }
}).withBroadcastSet(broadcastSet, "broadcast");

Cheers, Till On Thu, Mar 2, 2017 at 12:12 PM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote: I (almost) made it work the following way: 1st job: Read all the shapes, repartition() them equally on my N nodes, then on each node fill a static RTree (thanks for the tip). 2nd job: Read all the points, use a flatmap + custom partitioner to “clone” the dataset to all nodes, then apply a simple flatmap that will use the previously initialized static RTree, adding the Shape information to the point. Then do a groupBy to merge the points that were inside of multiple shapes. This works very well in a local runtime but fails on yarn because it seems that the taskmanager reloads the jar file between two jo
RE: Cross operation on two huge datasets
I made it so that I don’t care where the next operator will be scheduled. I configured taskslots = 1 and parallelism = yarnnodes so that: · Each node contains 1/Nth of the shapes (simple repartition() of the shapes dataset). · The points will be cloned so that each partition of the points dataset will contain the whole original dataset o Flatmap creates “#parallelism” clones of each entry o Custom partitioning so that each clone of each entry is sent to a different partition That way, whatever flink chooses to do, each point will be compared to each shape. That’s why I think that in my case I can keep it in the JVM without issues. I’d prefer to avoid ser/deser-ing that structure. I tried to use join (all items have same key) but it looks like flink tried to serialize the RTree anyway and it went into a StackOverflowError (locally with only 1 partition, not even on yarn). From: Till Rohrmann [mailto:trohrm...@apache.org] Sent: jeudi 2 mars 2017 15:40 To: user@flink.apache.org Subject: Re: Cross operation on two huge datasets Yes you’re right about the “split” and broadcasting. Storing it in the JVM is not a good approach, since you don’t know where Flink will schedule the new operator instance. It might be the case that an operator responsible for another partition gets scheduled to this JVM and then it has the wrong RTree information. Maybe you can model the set of RTrees as a DataSet[(PartitionKey, RTree)] and then join with the partitioned point data set. Cheers, Till On Thu, Mar 2, 2017 at 3:29 PM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote: The best for me would be to make it “persist” inside of the JVM heap in some map since I don’t even know if the structure is Serializable (I could try). But I understand.
As for broadcasting, wouldn’t broadcasting the variable cancel the efforts I did to “split” the dataset parsing over the nodes? From: Till Rohrmann [mailto:trohrm...@apache.org<mailto:trohrm...@apache.org>] Sent: jeudi 2 mars 2017 14:42 To: user@flink.apache.org<mailto:user@flink.apache.org> Subject: Re: Cross operation on two huge datasets Hi Gwenhael, if you want to persist operator state, then you would have to persist it (e.g. writing to a shared directory or emitting the model and using one of Flink’s sinks) and when creating the new operators you have to reread it from there (usually in the open method or from a Flink source as part of a broadcasted data set). If you want to give a data set to all instances of an operator, then you should broadcast this data set. You can do something like

DataSet input = ...
DataSet broadcastSet = ...

input.flatMap(new RichFlatMapFunction<Integer, Integer>() {
    List broadcastSet;

    @Override
    public void open(Configuration configuration) {
        broadcastSet = getRuntimeContext().getBroadcastVariable("broadcast");
    }

    @Override
    public void flatMap(Integer integer, Collector collector) throws Exception {
    }
}).withBroadcastSet(broadcastSet, "broadcast");

Cheers, Till On Thu, Mar 2, 2017 at 12:12 PM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com<mailto:gwenhael.pasqui...@ericsson.com>> wrote: I (almost) made it work the following way: 1st job: Read all the shapes, repartition() them equally on my N nodes, then on each node fill a static RTree (thanks for the tip). 2nd job: Read all the points, use a flatmap + custom partitioner to “clone” the dataset to all nodes, then apply a simple flatmap that will use the previously initialized static RTree, adding the Shape information to the point. Then do a groupBy to merge the points that were inside of multiple shapes.
This works very well in a local runtime, but fails on YARN because it seems that the taskmanager reloads the jar file between two jobs, making me lose my static RTree (I guess the newly loaded class overwrites the older one). I have two questions:

- Is there a way to avoid that jar reload? Or can I store my RTree somewhere in the JDK or Flink, locally to the taskmanager, in a way that it wouldn't be affected by the jar reload (since it would not be stored in any class from MY jar)?
  - I could also try to do it in a single job, but I don't know how to ensure that some operations (parsing of the shapes) are done BEFORE others start handling the points.
- Is there a way to do that cleanly using Flink operators? I'd need to be able to use the result of the iteration of one dataset inside of my map, something like datasetA.flatmap(new MyMapOperator(datasetB))… In my implementation I would then be able to iterate the whole datasetB BEFORE doing any operation on datasetA. That way I could parse all my shapes into an RTree before handling my points.
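For what it's worth, the clone-and-partition step described above can be sketched in plain Java (class and method names here are ours, for illustration only; the actual job would use a Flink flatMap plus a custom partitioner):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ClonePartitionSketch {
    // Emulates the flatMap that emits one clone of each point per target
    // partition, plus the custom partitioner that routes clone i to partition i.
    static Map<Integer, List<String>> cloneToPartitions(List<String> points, int parallelism) {
        Map<Integer, List<String>> partitions = new HashMap<>();
        for (int p = 0; p < parallelism; p++) partitions.put(p, new ArrayList<>());
        for (String point : points) {
            // flatMap: emit (partitionKey, point) for every partition key 0..N-1
            for (int p = 0; p < parallelism; p++) {
                partitions.get(p).add(point); // partitioner: key p -> partition p
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> parts = cloneToPartitions(Arrays.asList("p1", "p2", "p3"), 4);
        // every partition now holds the complete points dataset
        System.out.println(parts.size() + " partitions, each with " + parts.get(0).size() + " points");
    }
}
```

This is what guarantees that each point meets each shape regardless of where Flink schedules the downstream operators.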
RE: Cross operation on two huge datasets
The best for me would be to make it "persist" inside of the JVM heap in some map, since I don't even know if the structure is Serializable (I could try). But I understand.

As for broadcasting, wouldn't broadcasting the variable cancel the efforts I made to "split" the dataset parsing over the nodes?

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: jeudi 2 mars 2017 14:42
To: user@flink.apache.org
Subject: Re: Cross operation on two huge datasets

Hi Gwenhael,

if you want to persist operator state, then you would have to persist it (e.g. by writing it to a shared directory, or by emitting the model and using one of Flink's sinks) and, when creating the new operators, re-read it from there (usually in the open method, or from a Flink source as part of a broadcasted data set). If you want to give a data set to all instances of an operator, then you should broadcast this data set. You can do something like:

    DataSet<Integer> input = ...
    DataSet<Integer> broadcastSet = ...

    input.flatMap(new RichFlatMapFunction<Integer, Integer>() {
        List<Integer> broadcastSet;

        @Override
        public void open(Configuration configuration) {
            broadcastSet = getRuntimeContext().getBroadcastVariable("broadcast");
        }

        @Override
        public void flatMap(Integer integer, Collector<Integer> collector) throws Exception {
        }
    }).withBroadcastSet(broadcastSet, "broadcast");

Cheers,
Till

On Thu, Mar 2, 2017 at 12:12 PM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:

I (almost) made it work the following way:

First job: read all the shapes, repartition() them equally on my N nodes, then on each node fill a static RTree (thanks for the tip).

Second job: read all the points, use a flatMap + custom partitioner to "clone" the dataset to all nodes, then apply a simple flatMap that uses the previously initialized static RTree, adding the Shape information to each point. Then do a groupBy to merge the points that were inside multiple shapes.
This works very well in a local runtime, but fails on YARN because it seems that the taskmanager reloads the jar file between two jobs, making me lose my static RTree (I guess the newly loaded class overwrites the older one). I have two questions:

- Is there a way to avoid that jar reload? Or can I store my RTree somewhere in the JDK or Flink, locally to the taskmanager, in a way that it wouldn't be affected by the jar reload (since it would not be stored in any class from MY jar)?
  - I could also try to do it in a single job, but I don't know how to ensure that some operations (parsing of the shapes) are done BEFORE others start handling the points.
- Is there a way to do that cleanly using Flink operators? I'd need to be able to use the result of the iteration of one dataset inside of my map, something like datasetA.flatmap(new MyMapOperator(datasetB))… In my implementation I would then be able to iterate the whole datasetB BEFORE doing any operation on datasetA. That way I could parse all my shapes into an RTree before handling my points, without relying on statics.

Or is there any other way that would allow me to do something similar? Thanks in advance for your insight.

Gwen'

From: Jain, Ankit [mailto:ankit.j...@here.com]
Sent: jeudi 23 février 2017 19:21
To: user@flink.apache.org
Cc: Fabian Hueske <fhue...@gmail.com>
Subject: Re: Cross operation on two huge datasets

Hi Gwen,

I would recommend looking into a data structure called an RTree that is designed specifically for this use case, i.e. matching a point to a region.

Thanks
Ankit

From: Fabian Hueske <fhue...@gmail.com>
Date: Wednesday, February 22, 2017 at 2:41 PM
To: <user@flink.apache.org>
Subject: Re: Cross operation on two huge datasets

Hi Gwen,

Flink usually performs a block nested loop join to cross two data sets. This algorithm spills one input to disk and streams the other input.
For each input it fills a memory buffer to perform the cross. Then the buffer of the spilled input is refilled with spilled records and the records are crossed again. This is done until one pass over the spilled records is complete; then the buffer of the streamed input is filled with the next records. You should be aware that cross is a very expensive operation, especially if you evaluate a complex condition for each pair of records, so cross can easily be too expensive to compute. For such use cases it is usually better to apply a coarse-grained spatial partitioning and do a key-based join on the partitions; within each partition you'd perform a cross.

Best,
Fabian
RE: Cross operation on two huge datasets
I (almost) made it work the following way:

First job: read all the shapes, repartition() them equally on my N nodes, then on each node fill a static RTree (thanks for the tip).

Second job: read all the points, use a flatMap + custom partitioner to "clone" the dataset to all nodes, then apply a simple flatMap that uses the previously initialized static RTree, adding the Shape information to each point. Then do a groupBy to merge the points that were inside multiple shapes.

This works very well in a local runtime, but fails on YARN because it seems that the taskmanager reloads the jar file between two jobs, making me lose my static RTree (I guess the newly loaded class overwrites the older one). I have two questions:

- Is there a way to avoid that jar reload? Or can I store my RTree somewhere in the JDK or Flink, locally to the taskmanager, in a way that it wouldn't be affected by the jar reload (since it would not be stored in any class from MY jar)?
  - I could also try to do it in a single job, but I don't know how to ensure that some operations (parsing of the shapes) are done BEFORE others start handling the points.
- Is there a way to do that cleanly using Flink operators? I'd need to be able to use the result of the iteration of one dataset inside of my map, something like datasetA.flatmap(new MyMapOperator(datasetB))… In my implementation I would then be able to iterate the whole datasetB BEFORE doing any operation on datasetA. That way I could parse all my shapes into an RTree before handling my points, without relying on statics.

Or is there any other way that would allow me to do something similar? Thanks in advance for your insight.
Gwen'

From: Jain, Ankit [mailto:ankit.j...@here.com]
Sent: jeudi 23 février 2017 19:21
To: user@flink.apache.org
Cc: Fabian Hueske <fhue...@gmail.com>
Subject: Re: Cross operation on two huge datasets

Hi Gwen,

I would recommend looking into a data structure called an RTree that is designed specifically for this use case, i.e. matching a point to a region.

Thanks
Ankit

From: Fabian Hueske <fhue...@gmail.com>
Date: Wednesday, February 22, 2017 at 2:41 PM
To: <user@flink.apache.org>
Subject: Re: Cross operation on two huge datasets

Hi Gwen,

Flink usually performs a block nested loop join to cross two data sets. This algorithm spills one input to disk and streams the other input. For each input it fills a memory buffer to perform the cross. Then the buffer of the spilled input is refilled with spilled records and the records are crossed again. This is done until one pass over the spilled records is complete; then the buffer of the streamed input is filled with the next records. You should be aware that cross is a very expensive operation, especially if you evaluate a complex condition for each pair of records, so cross can easily be too expensive to compute. For such use cases it is usually better to apply a coarse-grained spatial partitioning and do a key-based join on the partitions; within each partition you'd perform a cross.

Best,
Fabian

2017-02-21 18:34 GMT+01:00 Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>:

Hi,

I need (or at least I think I do) to do a cross operation between two huge datasets. One dataset is a list of points; the other one is a list of shapes (areas). I want to know, for each point, the areas it belongs to (they might overlap, so a point can be in multiple areas), so I thought I'd "cross" my points and areas since I need to test each point against each area.
I tried it and my job seems to work for some seconds, then, at some point, it gets stuck. I'm wondering if Flink, for cross operations, tries to load one of the two datasets into RAM, or if it's able to split the job into multiple iterations (even if it means reading one of the two datasets multiple times). Or maybe I'm going at it the wrong way, or missing some parameters; feel free to correct me ☺ I'm using Flink 1.0.1.

Thanks in advance

Gwen'
RE: Cross operation on two huge datasets
Hi and thanks for your answers!

I'm not sure I can define any index to split the workload, since in my case any point could be in any zone... I think I'm currently trying to do it the way you call "theta-join":

1- Split one dataset over the cluster and prepare it for work against the other one (e.g. parse the shapes):
   a. either using partitioning,
   b. or using N sources + filtering based on a hash so I get complementary datasets.
2- Make the other dataset go "through" all the "splits" of the first one and enrich/filter it:
   a. the dataset would probably have to be entirely read multiple times from HDFS (one time per "split").

I have other ideas but I don't know if they are doable in Flink. Question: is there a way for an object (key selector, flatmap) to obtain (and wait for) the result of a previous dataset? The only way I can think of is a "cross" between my one-record dataset (the result) and the other dataset, but maybe that's very bad regarding resources?

I'd like to try using a flatMap that clones the dataset into N parts (adding a partition key 0 to N-1 to each record), then use partitioning to "dispatch" each clone of the dataset to a matching "shape matcher" partition; then I'd use cross to do the work, then group the results back together (in case N clones of a point were inside different shapes). Maybe that would split the workload of the cross by dividing the size of one of the two datasets in that cross... sorry for my rambling if I'm not clear.

B.R.

From: Xingcan Cui [mailto:xingc...@gmail.com]
Sent: jeudi 23 février 2017 06:00
To: user@flink.apache.org
Subject: Re: Cross operation on two huge datasets

Hi all,

@Gwen From the database's point of view, the only way to avoid a Cartesian product in a join is to use an index, which exhibits as key grouping in Flink. However, it only supports many-to-one mapping now, i.e., a shape or a point can only be distributed to a single group.
Only points and shapes belonging to the same group can be joined, and that reduces the inherent pair comparisons (compared with a Cartesian product). It's perfectly suitable for equi-joins.

@Fabian I saw this thread when I was considering theta-joins (which will eventually be supported) in Flink. Since it's impossible to group (index) a dataset for an arbitrary theta-join, I think we may need some duplication mechanism here. For example, split one dataset into n parts and send the other dataset to all of these parts. This could be even more useful in stream joins. BTW, it seems that I've seen another thread discussing this, but I cannot find it now. What do you think?

Best,
Xingcan

On Thu, Feb 23, 2017 at 6:41 AM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Gwen,

Flink usually performs a block nested loop join to cross two data sets. This algorithm spills one input to disk and streams the other input. For each input it fills a memory buffer to perform the cross. Then the buffer of the spilled input is refilled with spilled records and the records are crossed again. This is done until one pass over the spilled records is complete; then the buffer of the streamed input is filled with the next records. You should be aware that cross is a very expensive operation, especially if you evaluate a complex condition for each pair of records, so cross can easily be too expensive to compute. For such use cases it is usually better to apply a coarse-grained spatial partitioning and do a key-based join on the partitions; within each partition you'd perform a cross.

Best,
Fabian

2017-02-21 18:34 GMT+01:00 Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>:

Hi,

I need (or at least I think I do) to do a cross operation between two huge datasets. One dataset is a list of points; the other one is a list of shapes (areas).
I want to know, for each point, the areas it belongs to (they might overlap, so a point can be in multiple areas), so I thought I'd "cross" my points and areas since I need to test each point against each area. I tried it and my job seems to work for some seconds, then, at some point, it gets stuck. I'm wondering if Flink, for cross operations, tries to load one of the two datasets into RAM, or if it's able to split the job into multiple iterations (even if it means reading one of the two datasets multiple times). Or maybe I'm going at it the wrong way, or missing some parameters; feel free to correct me ☺ I'm using Flink 1.0.1.

Thanks in advance

Gwen'
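Fabian's suggestion (coarse-grained spatial partitioning followed by a key-based join) can be illustrated with a simple grid scheme. The cell size, names, and key encoding below are our assumptions for the sketch, not something from the thread:

```java
import java.util.HashSet;
import java.util.Set;

public class GridPartitionSketch {
    // Fixed-size grid cells; each cell id becomes a join key.
    static final double CELL = 10.0;

    // Key for the cell containing point (x, y): pack the two cell indices into one long.
    static long cellKey(double x, double y) {
        return (((long) Math.floor(x / CELL)) << 32) ^ (((long) Math.floor(y / CELL)) & 0xffffffffL);
    }

    // A shape's bounding box is replicated to every cell it overlaps, so a
    // key-based join per cell (followed by a local cross) sees all candidates.
    static Set<Long> cellsForBox(double minX, double minY, double maxX, double maxY) {
        Set<Long> cells = new HashSet<>();
        for (double x = Math.floor(minX / CELL) * CELL; x <= maxX; x += CELL)
            for (double y = Math.floor(minY / CELL) * CELL; y <= maxY; y += CELL)
                cells.add(cellKey(x, y));
        return cells;
    }

    public static void main(String[] args) {
        System.out.println("box (0,0)-(15,5) overlaps " + cellsForBox(0, 0, 15, 5).size() + " cells");
    }
}
```

Points keyed by cellKey and shapes keyed by cellsForBox can then be joined on the cell id; only pairs sharing a cell are compared, instead of the full Cartesian product.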
RE: Making batches of small messages
Thanks,

We are waiting for the 1.2 release eagerly ☺

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: mercredi 11 janvier 2017 18:32
To: user@flink.apache.org
Subject: Re: Making batches of small messages

Hi,

I think this is a case for the ProcessFunction that was recently added and will be included in Flink 1.2. ProcessFunction allows you to register timers (so the 5 secs timeout can be addressed). You can maintain the fault-tolerance guarantees if you collect the records in managed state; that way they will be included in checkpoints and restored in case of a failure. If you are on Flink 1.1.x, you will need to implement a custom operator, which is a much more low-level interface.

Best,
Fabian

2017-01-11 17:16 GMT+01:00 Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>:

Hi,

Sorry if this was already asked. For performance reasons (streaming as well as batch) I'd like to "group" messages (let's say in batches of 1000) before sending them to my sink (Kafka, but mainly ES) so that I have a smaller overhead. I've seen the "countWindow" operation, but if I'm not wrong the parallelism of such an operation is 1. Moreover I'd need some "timeout" (send the current batch to the next operator after 5 s if it did not reach 1000 messages before that). I could also create a flatMap "String to List" that accumulates messages until it reaches 1000 and then sends them to the output; however, that does not solve the timeout issue (I'm not sure I could call out.collect() from a Timer thread) and, even more importantly, I'm afraid that would break the exactly-once policy (Flink could not know that I was stacking messages; I could very well be filtering them) in case of a crash.
My sink could also create the chunks, with its own timer/counter, but I'm also afraid that would break the exactly-once guarantee, since in case of a crash there is no way for Flink to know whether the message was really sent or stacked... Is there a proper way to do what I want?

Thanks in advance,

Gwenhaël PASQUIERS
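The count-or-timeout flushing logic that a ProcessFunction-based solution would implement can be sketched in plain Java (Flink-free; all names here are ours):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchBufferSketch {
    // Flush when the buffer reaches maxSize, or when the oldest buffered
    // element has waited maxDelayMs (the timer path in a ProcessFunction).
    final int maxSize;
    final long maxDelayMs;
    final List<String> buffer = new ArrayList<>();
    long firstElementTs = -1;

    BatchBufferSketch(int maxSize, long maxDelayMs) {
        this.maxSize = maxSize;
        this.maxDelayMs = maxDelayMs;
    }

    // Called per element; returns a batch to emit, or null to keep buffering.
    List<String> add(String element, long nowMs) {
        if (buffer.isEmpty()) firstElementTs = nowMs;
        buffer.add(element);
        return buffer.size() >= maxSize ? flush() : null;
    }

    // Called from the timer path: flush if the oldest element waited too long.
    List<String> onTimer(long nowMs) {
        if (!buffer.isEmpty() && nowMs - firstElementTs >= maxDelayMs) return flush();
        return null;
    }

    private List<String> flush() {
        List<String> out = new ArrayList<>(buffer);
        buffer.clear();
        firstElementTs = -1;
        return out;
    }
}
```

In a real ProcessFunction the buffer would live in managed (checkpointed) state, which is what preserves the exactly-once guarantee discussed above.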
Making batches of small messages
Hi,

Sorry if this was already asked. For performance reasons (streaming as well as batch) I'd like to "group" messages (let's say in batches of 1000) before sending them to my sink (Kafka, but mainly ES) so that I have a smaller overhead. I've seen the "countWindow" operation, but if I'm not wrong the parallelism of such an operation is 1. Moreover I'd need some "timeout" (send the current batch to the next operator after 5 s if it did not reach 1000 messages before that). I could also create a flatMap "String to List" that accumulates messages until it reaches 1000 and then sends them to the output; however, that does not solve the timeout issue (I'm not sure I could call out.collect() from a Timer thread) and, even more importantly, I'm afraid that would break the exactly-once policy (Flink could not know that I was stacking messages; I could very well be filtering them) in case of a crash. My sink could also create the chunks, with its own timer/counter, but I'm also afraid that would break the exactly-once guarantee, since in case of a crash there is no way for Flink to know whether the message was really sent or stacked... Is there a proper way to do what I want?

Thanks in advance,

Gwenhaël PASQUIERS
RE: Programmatically get live values of accumulators
Indeed, that looks like what we need. We currently rely on Flink 1.0.1; that feature might be a good reason to update our Flink version. I'll test it. Many thanks!

From: Jamie Grier [mailto:ja...@data-artisans.com]
Sent: lundi 2 janvier 2017 20:56
To: user@flink.apache.org
Subject: Re: Programmatically get live values of accumulators

Hi Gwenhael,

I think what you actually want is to use the Apache Flink metrics interface. See the following: https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/metrics.html

Sending metrics to StatsD is supported out-of-the-box.

-Jamie

On Mon, Jan 2, 2017 at 1:34 AM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:

Hi, and best wishes for the year to come ☺

I'd like to be able to programmatically get the (live) values of accumulators in order to send them using a StatsD (or another) client in the JobManager of a YARN-deployed application. I say "live" because I'd like to use this in streaming (24/7) applications and send live stats; I cannot wait for the application to end. I've seen that there is a JSON API (I'd prefer not to have my app poll itself). I've seen some code on GitHub (test files) where it's done using the underlying Akka framework; I don't mind doing it the same way and creating an actor to get notification messages, but I don't know the best way, and there probably is a better one.

Thanks in advance,

Gwenhaël PASQUIERS

--
Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier
ja...@data-artisans.com
Programmatically get live values of accumulators
Hi, and best wishes for the year to come :)

I'd like to be able to programmatically get the (live) values of accumulators in order to send them using a StatsD (or another) client in the JobManager of a YARN-deployed application. I say "live" because I'd like to use this in streaming (24/7) applications and send live stats; I cannot wait for the application to end. I've seen that there is a JSON API (I'd prefer not to have my app poll itself). I've seen some code on GitHub (test files) where it's done using the underlying Akka framework; I don't mind doing it the same way and creating an actor to get notification messages, but I don't know the best way, and there probably is a better one.

Thanks in advance,

Gwenhaël PASQUIERS
RE: Generate _SUCCESS (map-reduce style) when folder has been written
No, don't worry, I think it's totally compliant with Hadoop's behavior, but I wanted it to behave more like Flink (to totally clean the destination folder before outputting new files).

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: mardi 20 décembre 2016 16:41
To: user@flink.apache.org
Subject: Re: Generate _SUCCESS (map-reduce style) when folder has been written

Great to hear! Do you mean that the behavior of Flink's HadoopOutputFormat is not consistent with Hadoop's behavior? If that's the case, could you open a JIRA ticket to report this, and maybe also contribute your changes back?

Thanks a lot,
Fabian

2016-12-20 16:37 GMT+01:00 Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>:

Thanks, it is working properly now. NB: I had to delete the folder by code because Hadoop's OutputFormats will only overwrite file by file, not the whole folder.

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: mardi 20 décembre 2016 14:21
To: user@flink.apache.org
Subject: Re: Generate _SUCCESS (map-reduce style) when folder has been written

Hi Gwenhael,

The _SUCCESS files were originally generated by Hadoop for successful jobs. AFAIK, Spark leverages Hadoop's Input- and OutputFormats and seems to have followed this approach as well to be compatible. You could use Flink's HadoopOutputFormat, which is a wrapper for Hadoop OutputFormats (both mapred and mapreduce APIs). The wrapper also produces the _SUCCESS files. In fact, you might be able to use exactly the same OutputFormat as your Spark job.

Best,
Fabian

2016-12-20 14:00 GMT+01:00 Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>:

Hi,

Sorry if it's already been asked, but is there an embedded way for Flink to generate a _SUCCESS file in the folders it's been writing into (using the write method with an OutputFormat)?
We are replacing a Spark job that was generating those files (and further operations rely on them).

Best regards,

Gwenhaël PASQUIERS
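The workaround described in this thread (clean the whole destination folder, then drop a _SUCCESS marker once the output is complete) can be sketched with java.nio on a local path; a real job would go through the Hadoop FileSystem API against HDFS instead:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class SuccessMarkerSketch {
    // Delete the whole output folder (Hadoop OutputFormats only overwrite
    // file by file), then recreate it empty.
    static void cleanOutputDir(Path dir) throws IOException {
        if (Files.exists(dir)) {
            try (Stream<Path> walk = Files.walk(dir)) {
                // delete children before their parent directories
                walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                    try { Files.delete(p); } catch (IOException e) { throw new RuntimeException(e); }
                });
            }
        }
        Files.createDirectories(dir);
    }

    // Empty marker file, map-reduce style, written once the job has finished.
    static void writeSuccessMarker(Path dir) throws IOException {
        Files.createFile(dir.resolve("_SUCCESS"));
    }
}
```

The Hadoop equivalents would be FileSystem.delete(path, true) and FileSystem.create(new Path(dir, "_SUCCESS")), but as the thread notes, Flink's HadoopOutputFormat wrapper already produces the _SUCCESS file itself.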
Generate _SUCCESS (map-reduce style) when folder has been written
Hi,

Sorry if it's already been asked, but is there an embedded way for Flink to generate a _SUCCESS file in the folders it's been writing into (using the write method with an OutputFormat)? We are replacing a Spark job that was generating those files (and further operations rely on them).

Best regards,

Gwenhaël PASQUIERS
RE: Read a given list of HDFS folder
Hi and thanks, I'm not sure that recursive traversal is what I need. Let's say I have the following dir tree:

/data/2016_03_21_13/*.gz
/data/2016_03_21_12/*.gz
/data/2016_03_21_11/*.gz
/data/2016_03_21_10/*.gz
/data/2016_03_21_09/*.gz
/data/2016_03_21_08/*.gz
/data/2016_03_21_07/*.gz

I want my DataSet to include (and nothing else):

/data/2016_03_21_13/*.gz
/data/2016_03_21_12/*.gz
/data/2016_03_21_11/*.gz

And I do not want to include any of the other folders (and their files). Can I create a DataSet that would only contain those folders?

-----Original Message-----
From: Ufuk Celebi [mailto:u...@apache.org]
Sent: lundi 21 mars 2016 13:39
To: user@flink.apache.org
Subject: Re: Read a given list of HDFS folder

Hey Gwenhaël,

see here for recursive traversal of input paths: https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#recursive-traversal-of-the-input-path-directory

Regarding the phases: the best way to exchange data between batch jobs is via files. You can then execute two programs one after the other; the first one produces the files, which the second job uses as input.

– Ufuk

On Mon, Mar 21, 2016 at 12:14 PM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
> Hello,
>
> Sorry if this has already been asked or is already in the docs, I did not
> find the answer:
>
> Is there a way to read a given set of folders in Flink batch? Let's say we
> have one folder per hour of data, written by Flume, and we'd like to read
> only the N last hours (or any other pattern or arbitrary list of folders).
>
> And while I'm at it, I have another question:
>
> Let's say that in my batch task I need to sequence two "phases" and that the
> second phase needs the final result from the first one.
> - Do I have to create, in the TaskManager, one execution environment per
> task and execute them one after the other?
> - Can my TaskManagers send back some data (other than counters) to the
> JobManager, or do I have to use a file to store the result from phase one and
> use it in phase two?
>
> Thanks in advance for your answers,
>
> Gwenhaël
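One way to read only the N last hourly folders, as asked above, is to build the folder paths explicitly and register one input per path (then union the resulting DataSets). A sketch of the path generation, assuming a yyyy_MM_dd_HH folder pattern like the one in the example:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class HourFolderSketch {
    // Folder naming convention taken from the dir tree above.
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy_MM_dd_HH");

    // Paths for the current hour and the n-1 hours before it, newest first.
    static List<String> lastHours(LocalDateTime now, int n) {
        List<String> paths = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            paths.add("/data/" + now.minusHours(i).format(FMT));
        }
        return paths;
    }

    public static void main(String[] args) {
        for (String p : lastHours(LocalDateTime.now(), 3)) System.out.println(p);
    }
}
```

Each generated path can then be passed to env.readTextFile(path) (or any input format) and the results combined with union, so no recursive traversal is needed.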
Read a given list of HDFS folder
Hello,

Sorry if this has already been asked or is already in the docs, I did not find the answer:

Is there a way to read a given set of folders in Flink batch? Let's say we have one folder per hour of data, written by Flume, and we'd like to read only the N last hours (or any other pattern or arbitrary list of folders).

And while I'm at it, I have another question: let's say that in my batch task I need to sequence two "phases" and that the second phase needs the final result from the first one.

- Do I have to create, in the TaskManager, one execution environment per task and execute them one after the other?
- Can my TaskManagers send back some data (other than counters) to the JobManager, or do I have to use a file to store the result from phase one and use it in phase two?

Thanks in advance for your answers,

Gwenhaël
RE: Distribution of sinks among the nodes
Thanks,

One more thing to expect from the next version!

-----Original Message-----
From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: lundi 8 février 2016 13:18
To: user@flink.apache.org
Subject: Re: Distribution of sinks among the nodes

Hi,

I just merged the new feature, so once this makes it into the 1.0-SNAPSHOT builds you should be able to use:

    env.setParallelism(4);

    env
        .addSource(kafkaSource)
        .rescale()
        .map(mapper).setParallelism(16)
        .rescale()
        .addSink(kafkaSink);

to get your desired behavior. For this to work, the parallelism should be set to 16, with 4 nodes. Then each node will have one source, 4 mappers and 1 sink. The source will only be connected to the 4 mappers, while the 4 mappers will be the only ones connected to the sink.

Cheers,
Aljoscha

> On 04 Feb 2016, at 18:29, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> I added a new ticket: https://issues.apache.org/jira/browse/FLINK-3336
>
> This will implement the data shipping pattern that you mentioned in your initial mail. I also have the pull request almost ready.
>
>> On 04 Feb 2016, at 16:25, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
>>
>> Okay;
>>
>> Then I guess that the best we can do is to disable chaining (we really want one thread per operator since they are doing long operations) and have the same parallelism for sinks as for mappers: that way each map will have its own sink and there will be no exchanges between Flink instances.
>>
>> From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan Ewen
>> Sent: jeudi 4 février 2016 15:13
>> To: user@flink.apache.org
>> Subject: Re: Distribution of sinks among the nodes
>>
>> To your other question, there are two things in Flink:
>>
>> (1) Chaining. Tasks are folded together into one task, run by one thread.
>>
>> (2) Resource groups: Tasks stay separate, have separate threads, but share a slot (which means they share memory resources).
>> See the link in my previous mail for an explanation concerning those.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Thu, Feb 4, 2016 at 3:10 PM, Stephan Ewen <se...@apache.org> wrote:
>> Hi Gwen!
>>
>> You actually need not 24 slots, but only as many as the highest parallelism is (16). Slots do not hold individual tasks, but "pipelines".
>>
>> Here is an illustration of how that works:
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#configuring-taskmanager-processing-slots
>>
>> You can control whether a task can share the slot with the previous task with the function "startNewResourceGroup()" in the streaming API. Sharing slots makes a few things easier to reason about; especially when adding operators to a program, you need not immediately add new machines.
>>
>> How to solve your program case:
>>
>> We can actually make a pretty simple addition to Flink that will cause the tasks to be locally connected, which in turn will cause the scheduler to distribute them like you intend.
>> Rather than letting the 4 sources rebalance across all 16 mappers, each one should redistribute to 4 local mappers, and these 4 mappers should send data to one local sink each.
>>
>> We'll try and add that today and ping you once it is in.
>>
>> The following would be sample code to use this:
>>
>> env.setParallelism(4);
>>
>> env
>>     .addSource(kafkaSource)
>>     .partitionFan()
>>     .map(mapper).setParallelism(16)
>>     .partitionFan()
>>     .addSink(kafkaSink);
>>
>> A bit of background on why the mechanism is the way it is right now:
>>
>> You can think of a slot as a slice of resources. In particular, an amount of memory from the memory manager, but also memory in the network stack.
>>
>> What we want to do quite soon is to make streaming programs more elastic. Consider for example the case that you have 16 slots on 4 machines, a machine fails, and you have no spare resources.
>> In that case Flink should recognize that no spare resources can be acquired, and scale the job in. Since you have only 12 slots left, the parallelism of the mappers is reduced to 12, and the source task that was on the failed machine is moved to a slot on another machine.
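The local grouping that rescale() gives in the 4-sources / 16-mappers example above can be pictured by computing which mappers each source feeds; this is our illustration of the intended wiring, not Flink internals:

```java
public class RescaleSketch {
    // With rescale(), each of the numSources sources feeds a disjoint group of
    // numMappers / numSources downstream mappers, so data stays on the node:
    // 1 source, 4 mappers and 1 sink per node in the 4 -> 16 -> 4 example.
    static int[] mappersForSource(int sourceIndex, int numSources, int numMappers) {
        int fan = numMappers / numSources;
        int[] group = new int[fan];
        for (int i = 0; i < fan; i++) {
            group[i] = sourceIndex * fan + i;
        }
        return group;
    }

    public static void main(String[] args) {
        for (int s = 0; s < 4; s++) {
            int[] g = mappersForSource(s, 4, 16);
            System.out.println("source " + s + " -> mappers " + g[0] + ".." + g[g.length - 1]);
        }
    }
}
```

Contrast this with the default rebalance behavior, where every source would be connected to all 16 mappers and records would cross node boundaries.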
RE: Internal buffers supervision and yarn vCPUs
Ok, thanks! All that's left is to wait then.

B.R.

From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan Ewen
Sent: jeudi 4 février 2016 11:19
To: user@flink.apache.org
Subject: Re: Internal buffers supervision and yarn vCPUs

Concerning the first question: what you are looking for is backpressure monitoring. If a task cannot push its data to the next task, it is backpressured. This pull request adds a first version of backpressure monitoring: https://github.com/apache/flink/pull/1578

We will try and get it merged soon!

On Thu, Feb 4, 2016 at 11:03 AM, Robert Metzger <rmetz...@apache.org> wrote:

Hi Gwen,

let me answer the second question: there is a JIRA ticket to reintroduce the configuration parameter: https://issues.apache.org/jira/browse/FLINK-2213. I will try to get a fix for this into the 1.0 release. I think I removed it back then because users were unable to define the number of vcores independently of the number of slots ... and too many users were running into issues with the YARN scheduler (containers were not started because there were no CPU resources available anymore).

On Thu, Feb 4, 2016 at 10:56 AM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:

Hi,

I've got two more questions on different topics…

First one: is there a way to monitor the buffers' status? In order to find the bottleneck in our application, we thought it could be useful to be able to have a look at the different exchange buffers' status, to know if they are full (or, as an example, if a mapper had to wait before being able to push its data into the buffer). That way we can know where the bottleneck is.

Second one: one type of resource on YARN is vCPUs. In Flink 0.8 there was a "-tmc" argument that allowed to specify the number of vCPUs per task manager. We cannot find it anymore. Was it removed? Is there another way to set the number of vCPUs, or did it become useless?

Thanks in advance.

Gwen'
Internal buffers supervision and yarn vCPUs
Hi, I’ve got two more questions on different topics.

First one: is there a way to monitor the buffers’ status? In order to find the bottleneck in our application, we thought it could be useful to look at the different exchange buffers’ status, to know if they are full (or, for example, if a mapper had to wait before being able to push its data into the buffer). That way we can know where the bottleneck is.

Second one: one type of resource on YARN is vCPU. In Flink 0.8 there was a “-tmc” argument that allowed us to specify the number of vCPUs per task manager. We cannot find it anymore. Was it removed? Is there another way to set the number of vCPUs, or did it become useless?

Thanks in advance, Gwen’
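[Editor's note] For reference, the fix tracked in FLINK-2213 later exposed the vCPU count as a configuration key. Assuming a Flink version that includes that fix (the key name below is the one the JIRA converged on; verify against your version's documentation), a flink-conf.yaml sketch might look like:

```yaml
# Hypothetical flink-conf.yaml fragment: request 4 vCPUs per YARN container
# (TaskManager). Only meaningful if YARN's scheduler actually enforces CPU
# resources (e.g. with the DominantResourceCalculator).
yarn.containers.vcores: 4
taskmanager.numberOfTaskSlots: 4
```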
RE: Distribution of sinks among the nodes
Sorry, I was confused about the number of slots; it’s good now. However, are disableChaining and disableOperatorChaining working properly? I called those methods everywhere I could, but it still seems that some of my operators are being chained together: I can’t go over 16 used slots, where I should be at 24 if there was no chaining…

From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: jeudi 4 février 2016 09:55
To: user@flink.apache.org
Subject: RE: Distribution of sinks among the nodes

Don’t we need to set the number of slots to 24 (4 sources + 16 mappers + 4 sinks)? Or is there a way to set the number of slots per TaskManager instead of globally, so that they are at least equally dispatched among the nodes?

As for the sink deployment: that’s not good news; we will have a non-negligible overhead: all the data generated by 3 of the 4 nodes will be sent to a third node instead of being sent to the “local” sink. Network I/O has a price.

Do you have some sort of “topology” feature coming in the roadmap? Maybe a listener on the JobManager / env that would be triggered, asking us on which node we would prefer each task to be deployed. That way you keep the standard behavior, don’t have to make a complicated generic optimized algorithm, and let the user make his own choices. Should I create a JIRA?

For the time being we could start the application 4 times: once per node, but that’s not pretty at all ☺

B.R.

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: mercredi 3 février 2016 17:58
To: user@flink.apache.org
Subject: Re: Distribution of sinks among the nodes

Hi Gwenhäel, if you set the number of slots for each TaskManager to 4, then all of your mappers will be evenly spread out. The sources should also be evenly spread out. However, for the sinks, since they depend on all mappers, it will be mostly random where they are deployed. So you might end up with 4 sink tasks on one machine.
Cheers, Till

On Wed, Feb 3, 2016 at 4:31 PM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
It is one type of mapper with a parallelism of 16. It's the same for the sinks and sources (parallelism of 4). The settings are:

env.setParallelism(4)
mapper.setParallelism(env.getParallelism() * 4)

We mean to have X mapper tasks per source / sink. The mapper is doing some heavy computation and we have only 4 Kafka partitions. That's why we need more mappers than sources / sinks.

-----Original Message-----
From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: mercredi 3 février 2016 16:26
To: user@flink.apache.org
Subject: Re: Distribution of sinks among the nodes

Hi Gwenhäel, when you say 16 maps, are we talking about one mapper with parallelism 16 or 16 unique map operators?

Regards, Aljoscha

> On 03 Feb 2016, at 15:48, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
>
> Hi,
>
> We try to deploy an application with the following “architecture”:
>
> 4 Kafka sources => 16 maps => 4 Kafka sinks, on 4 nodes, with 24 slots (we disabled operator chaining).
>
> So we’d like, on each node:
> 1x source => 4x map => 1x sink
>
> That way there are no exchanges between different instances of Flink and performance would be optimal.
>
> But we get (according to the Flink GUI and the Host column when looking at the details of each task):
>
> Node 1 : 1 source => 2 maps
> Node 2 : 1 source => 1 map
> Node 3 : 1 source => 1 map
> Node 4 : 1 source => 12 maps => 4 sinks
>
> (I think no comments are needed ☺)
>
> The Web UI says that there are 24 slots and they are all used, but they don’t seem evenly dispatched…
>
> How could we make Flink deploy the tasks the way we want?
>
> B.R.
>
> Gwen’
RE: Distribution of sinks among the nodes
Okay; then I guess the best we can do is to disable chaining (we really want one thread per operator since they are doing long operations) and have the same parallelism for sinks as for mappers: that way each map will have its own sink and there will be no exchanges between Flink instances.

From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan Ewen
Sent: jeudi 4 février 2016 15:13
To: user@flink.apache.org
Subject: Re: Distribution of sinks among the nodes

To your other question, there are two things in Flink:

(1) Chaining. Tasks are folded together into one task, run by one thread.
(2) Resource groups: tasks stay separate, have separate threads, but share a slot (which means they share memory resources). See the link in my previous mail for an explanation concerning those.

Greetings, Stephan

On Thu, Feb 4, 2016 at 3:10 PM, Stephan Ewen <se...@apache.org> wrote:
Hi Gwen!

You actually need not 24 slots, but only as many as the highest parallelism (16). Slots do not hold individual tasks, but "pipelines". Here is an illustration of how that works: https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#configuring-taskmanager-processing-slots

You can control whether a task can share a slot with the previous task with the function "startNewResourceGroup()" in the streaming API. Sharing slots makes a few things easier to reason about; especially when adding operators to a program, you need not immediately add new machines.

How to solve your case: we can actually make a pretty simple addition to Flink that will cause the tasks to be locally connected, which in turn will cause the scheduler to distribute them like you intend. Rather than letting the 4 sources rebalance across all 16 mappers, each one should redistribute to 4 local mappers, and these 4 mappers should send data to one local sink each. We'll try and add that today and ping you once it is in.
The following would be sample code to use this:

env.setParallelism(4);

env
    .addSource(kafkaSource)
    .partitionFan()
    .map(mapper).setParallelism(16)
    .partitionFan()
    .addSink(kafkaSink);

A bit of background why the mechanism is the way it is right now: you can think of a slot as a slice of resources; in particular, an amount of memory from the memory manager, but also memory in the network stack.

What we want to do quite soon is to make streaming programs more elastic. Consider for example the case that you have 16 slots on 4 machines, a machine fails, and you have no spare resources. In that case Flink should recognize that no spare resource can be acquired and scale the job in. Since you have only 12 slots left, the parallelism of the mappers is reduced to 12, and the source task that was on the failed machine is moved to a slot on another machine.

It is important that the guaranteed resources for each task do not change when scaling in, to keep behavior predictable. In this case, each slot will still host at most 1 source, 1 mapper, and 1 sink, as did some of the slots before. That is also the reason why the slots are per TaskManager, and not global: to associate them with a constant set of resources (mainly memory).

Greetings, Stephan

On Thu, Feb 4, 2016 at 9:54 AM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
Don’t we need to set the number of slots to 24 (4 sources + 16 mappers + 4 sinks)? Or is there a way to set the number of slots per TaskManager instead of globally, so that they are at least equally dispatched among the nodes?

As for the sink deployment: that’s not good news; we will have a non-negligible overhead: all the data generated by 3 of the 4 nodes will be sent to a third node instead of being sent to the “local” sink. Network I/O has a price.

Do you have some sort of “topology” feature coming in the roadmap?
Maybe a listener on the JobManager / env that would be triggered, asking us on which node we would prefer each task to be deployed. That way you keep the standard behavior, don’t have to make a complicated generic optimized algorithm, and let the user make his own choices. Should I create a JIRA?

For the time being we could start the application 4 times: once per node, but that’s not pretty at all ☺

B.R.

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: mercredi 3 février 2016 17:58
To: user@flink.apache.org
Subject: Re: Distribution of sinks among the nodes

Hi Gwenhäel, if you set the number of slots for each TaskManager to 4, then all of your mappers will be evenly spread out. The sources should also be evenly spread out.
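[Editor's note] The chaining / resource-group distinction Stephan describes above can be illustrated outside of Flink. The sketch below is plain Java, not the Flink API: a chained pair of mappers is simply function composition running in one thread, while unchained operators each get their own thread connected by a buffer, which is what you want when every operator does long, heavy work.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class ChainingSketch {
    public static void main(String[] args) throws Exception {
        Function<Integer, Integer> map1 = x -> x + 1;
        Function<Integer, Integer> map2 = x -> x * 2;

        // "Chained": one thread runs map2(map1(x)) for each record.
        int chained = map1.andThen(map2).apply(5);

        // "Unchained": each operator has its own thread;
        // records cross a bounded buffer between them.
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(16);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> { buffer.put(map1.apply(5)); return null; });
        Future<Integer> out = pool.submit(() -> map2.apply(buffer.take()));

        System.out.println(chained + " " + out.get()); // prints: 12 12
        pool.shutdown();
    }
}
```

Chaining avoids the buffer hop entirely, which is why Flink chains by default; disabling it pays that hop in exchange for one dedicated thread per operator.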
RE: Distribution of sinks among the nodes
Don’t we need to set the number of slots to 24 (4 sources + 16 mappers + 4 sinks)? Or is there a way to set the number of slots per TaskManager instead of globally, so that they are at least equally dispatched among the nodes?

As for the sink deployment: that’s not good news; we will have a non-negligible overhead: all the data generated by 3 of the 4 nodes will be sent to a third node instead of being sent to the “local” sink. Network I/O has a price.

Do you have some sort of “topology” feature coming in the roadmap? Maybe a listener on the JobManager / env that would be triggered, asking us on which node we would prefer each task to be deployed. That way you keep the standard behavior, don’t have to make a complicated generic optimized algorithm, and let the user make his own choices. Should I create a JIRA?

For the time being we could start the application 4 times: once per node, but that’s not pretty at all ☺

B.R.

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: mercredi 3 février 2016 17:58
To: user@flink.apache.org
Subject: Re: Distribution of sinks among the nodes

Hi Gwenhäel, if you set the number of slots for each TaskManager to 4, then all of your mappers will be evenly spread out. The sources should also be evenly spread out. However, for the sinks, since they depend on all mappers, it will be mostly random where they are deployed. So you might end up with 4 sink tasks on one machine.

Cheers, Till

On Wed, Feb 3, 2016 at 4:31 PM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
It is one type of mapper with a parallelism of 16. It's the same for the sinks and sources (parallelism of 4). The settings are:

env.setParallelism(4)
mapper.setParallelism(env.getParallelism() * 4)

We mean to have X mapper tasks per source / sink. The mapper is doing some heavy computation and we have only 4 Kafka partitions.
That's why we need more mappers than sources / sinks.

-----Original Message-----
From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: mercredi 3 février 2016 16:26
To: user@flink.apache.org
Subject: Re: Distribution of sinks among the nodes

Hi Gwenhäel, when you say 16 maps, are we talking about one mapper with parallelism 16 or 16 unique map operators?

Regards, Aljoscha

> On 03 Feb 2016, at 15:48, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
>
> Hi,
>
> We try to deploy an application with the following “architecture”:
>
> 4 Kafka sources => 16 maps => 4 Kafka sinks, on 4 nodes, with 24 slots (we disabled operator chaining).
>
> So we’d like, on each node:
> 1x source => 4x map => 1x sink
>
> That way there are no exchanges between different instances of Flink and performance would be optimal.
>
> But we get (according to the Flink GUI and the Host column when looking at the details of each task):
>
> Node 1 : 1 source => 2 maps
> Node 2 : 1 source => 1 map
> Node 3 : 1 source => 1 map
> Node 4 : 1 source => 12 maps => 4 sinks
>
> (I think no comments are needed ☺)
>
> The Web UI says that there are 24 slots and they are all used, but they don’t seem evenly dispatched…
>
> How could we make Flink deploy the tasks the way we want?
>
> B.R.
>
> Gwen’
Distribution of sinks among the nodes
Hi,

We try to deploy an application with the following “architecture”:

4 Kafka sources => 16 maps => 4 Kafka sinks, on 4 nodes, with 24 slots (we disabled operator chaining).

So we’d like, on each node:

1x source => 4x map => 1x sink

That way there are no exchanges between different instances of Flink and performance would be optimal.

But we get (according to the Flink GUI and the Host column when looking at the details of each task):

Node 1 : 1 source => 2 maps
Node 2 : 1 source => 1 map
Node 3 : 1 source => 1 map
Node 4 : 1 source => 12 maps => 4 sinks

(I think no comments are needed ☺)

The Web UI says that there are 24 slots and they are all used, but they don’t seem evenly dispatched…

How could we make Flink deploy the tasks the way we want?

B.R.

Gwen’
RE: Distribution of sinks among the nodes
It is one type of mapper with a parallelism of 16. It's the same for the sinks and sources (parallelism of 4). The settings are:

env.setParallelism(4)
mapper.setParallelism(env.getParallelism() * 4)

We mean to have X mapper tasks per source / sink. The mapper is doing some heavy computation and we have only 4 Kafka partitions. That's why we need more mappers than sources / sinks.

-----Original Message-----
From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: mercredi 3 février 2016 16:26
To: user@flink.apache.org
Subject: Re: Distribution of sinks among the nodes

Hi Gwenhäel, when you say 16 maps, are we talking about one mapper with parallelism 16 or 16 unique map operators?

Regards, Aljoscha

> On 03 Feb 2016, at 15:48, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
>
> Hi,
>
> We try to deploy an application with the following “architecture”:
>
> 4 Kafka sources => 16 maps => 4 Kafka sinks, on 4 nodes, with 24 slots (we disabled operator chaining).
>
> So we’d like, on each node:
> 1x source => 4x map => 1x sink
>
> That way there are no exchanges between different instances of Flink and performance would be optimal.
>
> But we get (according to the Flink GUI and the Host column when looking at the details of each task):
>
> Node 1 : 1 source => 2 maps
> Node 2 : 1 source => 1 map
> Node 3 : 1 source => 1 map
> Node 4 : 1 source => 12 maps => 4 sinks
>
> (I think no comments are needed ☺)
>
> The Web UI says that there are 24 slots and they are all used, but they don’t seem evenly dispatched…
>
> How could we make Flink deploy the tasks the way we want?
>
> B.R.
>
> Gwen’
RE: about blob.storage.dir and .buffer files
Hi! Here are the answers:

- How much data is in the blob-store directory, versus in the buffer files? Around 20 MB per application, versus 20 GB (one specific windowing app, maybe).
- How many buffer files do you have and how large are they on average? Standard apps: no buffer files, or really small ones. The one app: around 10 files of 2 GB each (e.g. /tmp_flink/flink-io-*/b3177f8e1910a686ccb48c1da533581d06305a2a370734b493327b117757.1441.buffer).

From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan Ewen
Sent: jeudi 28 janvier 2016 15:35
To: user@flink.apache.org
Subject: Re: about blob.storage.dir and .buffer files

Hi Gwenhael! Let's look into this and fix anything we find. Can you briefly tell us:

- How much data is in the blob-store directory, versus in the buffer files?
- How many buffer files do you have and how large are they on average?

Greetings, Stephan

On Thu, Jan 28, 2016 at 10:18 AM, Till Rohrmann <trohrm...@apache.org> wrote:
Hi Gwenhael, in theory the blob storage files can be any binary data. At the moment, this is however only used to distribute the user code jars. The jars are kept around as long as the job is running. Every library-cache-manager.cleanup.interval, the files are checked and those which are no longer referenced are deleted. In the case of a termination of Flink, all files should be purged. If this is not the case, then we have to check what the problem is. Could you check the size of your user jars?

Cheers, Till

On Wed, Jan 27, 2016 at 4:38 PM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
Hello, we have a question about blob.storage.dir and its .buffer files: what are they? And are they cleaned, or is there a way to limit their size and to evaluate the necessary space? We got a node's root volume filled by those files (~20 GB) and it crashed.
Well, the root was filled because we changed the path to /tmp_flink and it was on the root filesystem; our bad. We changed it in a hurry because the default path (/tmp) was being periodically cleaned by the OS, and that made Flink crash.

B.R.
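[Editor's note] Assuming the configuration keys discussed in this thread, a flink-conf.yaml sketch that moves both the blob store and the spill/buffer directories off /tmp (so the OS cleaner leaves them alone, and a large volume backs them) might look like this; the paths and the interval value are illustrative, check your version's defaults:

```yaml
# Put BLOB storage on a volume with enough space, outside /tmp
blob.storage.dir: /data/flink/blobs
# The flink-io-* .buffer spill files follow the TaskManager temp dirs
taskmanager.tmp.dirs: /data/flink/tmp
# How often (seconds) unreferenced library/blob files are checked and deleted
library-cache-manager.cleanup.interval: 3600
```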
about blob.storage.dir and .buffer files
Hello,

We have a question about blob.storage.dir and its .buffer files: what are they? And are they cleaned, or is there a way to limit their size and to evaluate the necessary space? We got a node's root volume filled by those files (~20 GB) and it crashed.

Well, the root was filled because we changed the path to /tmp_flink and it was on the root filesystem; our bad. We changed it in a hurry because the default path (/tmp) was being periodically cleaned by the OS, and that made Flink crash.

B.R.
RE: Reading Parquet/Hive
I'll answer to myself :)

I think I've managed to make it work by creating my "WrappingReadSupport" that wraps the DataWritableReadSupport, but I also insert my "WrappingMaterializer" that converts the ArrayWritable produced by the original materializer to String[]. Then, later on, the String[] poses no issues with Tuple and it seems to be OK. Now... let's write those String[] in Parquet too :)

From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: vendredi 18 décembre 2015 10:04
To: user@flink.apache.org
Subject: Reading Parquet/Hive

Hi,

I'm trying to read Parquet/Hive data using Parquet's ParquetInputFormat and Hive's DataWritableReadSupport. I get an error when the TupleSerializer tries to create an instance of ArrayWritable using reflection, because ArrayWritable has no no-args constructor.

I've been able to make it work, when executing in a local cluster, by copying the ArrayWritable class into my own sources and adding the constructor. I guess that the classpath built by Maven puts my code first and allows me to override the original class. However, when running in the real cluster (YARN @ Cloudera) the exception comes back (I guess the original class is first in the classpath).

Do you have an idea of how I could make it work? I think I'm tied to the ArrayWritable type because of the DataWritableReadSupport that extends ReadSupport. Would it be possible (and not too complicated) to make a DataSource that would not generate Tuples and allow me to convert the ArrayWritable to a more friendly type like String[]? Or if you have any other ideas, they are welcome!

B.R.

Gwenhaël PASQUIERS
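[Editor's note] The wrapping approach described above can be sketched in isolation. This is not the actual Parquet/Hive API: `Materializer` below is a stand-in for Parquet's RecordMaterializer, and Object[] stands in for ArrayWritable. It only illustrates the delegate-and-convert pattern: the wrapper forwards record materialization to the Hive implementation and converts each record to String[] before Flink's serializer ever sees an ArrayWritable.

```java
import java.util.Arrays;

public class WrappingMaterializerSketch {
    // Stand-in for Parquet's RecordMaterializer<T>: yields the current record.
    interface Materializer<T> {
        T getCurrentRecord();
    }

    // Wraps a delegate that produces Object[] (standing in for ArrayWritable)
    // and converts every record to String[] before anything downstream sees it.
    static Materializer<String[]> wrap(Materializer<Object[]> delegate) {
        return () -> Arrays.stream(delegate.getCurrentRecord())
                .map(v -> v == null ? null : v.toString())
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        // Hypothetical delegate standing in for the Hive-produced record.
        Materializer<Object[]> hiveLike = () -> new Object[]{1, "a", 2.5};
        String[] row = wrap(hiveLike).getCurrentRecord();
        System.out.println(String.join(",", row)); // prints: 1,a,2.5
    }
}
```

In the real WrappingReadSupport, prepareForRead would return such a wrapper around the materializer obtained from DataWritableReadSupport, with the conversion reading the ArrayWritable's fields instead of a plain array.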
RE: YARN High Availability
OK, I understand.

Maybe we are not really using Flink as you intended. The way we are using it, one cluster equals one job. That way we are sure to isolate the different jobs as much as possible, and in case of crashes / bugs / etc. we can completely kill one cluster without interfering with the other jobs.

That future behavior seems good :-)

Instead of the manual flink commands, is there a way to manually delete those old jobs before launching my job? They are probably somewhere in HDFS, aren't they?

B.R.

-----Original Message-----
From: Ufuk Celebi [mailto:u...@apache.org]
Sent: lundi 23 novembre 2015 12:12
To: user@flink.apache.org
Subject: Re: YARN High Availability

Hey Gwenhaël,

the restarting jobs are most likely old job submissions. They are not cleaned up when you shut down the cluster, but only when they finish (either regular finish or after cancelling). The workaround is to use the command line frontend:

bin/flink cancel JOBID

for each RESTARTING job. Sorry about the inconvenience! We are in an active discussion about addressing this. The future behaviour will be that the startup or shutdown of a cluster cleans up everything, plus an option to skip this step. The reasoning for the initial solution (not removing anything) was to make sure that no jobs are deleted by accident. But it looks like this is more confusing than helpful.

– Ufuk

> On 23 Nov 2015, at 11:45, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
>
> Hi again!
>
> On the same topic, I'm still trying to start my streaming job with HA.
> The HA part seems to be more or less OK (I killed the JobManager and it came back), however I have an issue with the TaskManagers.
> I configured my job to have only one TaskManager and 1 slot that does [source => map => sink].
> The issue I'm encountering is that other instances of my job appear and are in the RESTARTING status since there is only one task slot.
>
> Do you know of this, or have an idea of where to look in order to understand what's happening?
>
> B.R.
>
> Gwenhaël PASQUIERS
>
> -----Original Message-----
> From: Maximilian Michels [mailto:m...@apache.org]
> Sent: jeudi 19 novembre 2015 13:36
> To: user@flink.apache.org
> Subject: Re: YARN High Availability
>
> The docs have been updated.
>
> On Thu, Nov 19, 2015 at 12:36 PM, Ufuk Celebi <u...@apache.org> wrote:
>> I’ve added a note about this to the docs and asked Max to trigger a new build of them.
>>
>> Regarding Aljoscha’s idea: I like it. It is essentially a shortcut for configuring the root path.
>>
>> In any case, it is orthogonal to Till’s proposals. That one we need to address as well (see FLINK-2929). The motivation for the current behaviour was to be rather defensive when removing state, in order to not lose data accidentally. But it can be confusing, indeed.
>>
>> – Ufuk
>>
>>> On 19 Nov 2015, at 12:08, Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>> You mean an additional start-up parameter for the `start-cluster.sh` script for the HA case? That could work.
>>>
>>> On Thu, Nov 19, 2015 at 11:54 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> Maybe we could add a user parameter to specify a cluster name that is used to make the paths unique.
>>>
>>> On Thu, Nov 19, 2015, 11:24 Till Rohrmann <trohrm...@apache.org> wrote:
>>> I agree that this would make the configuration easier. However, it also entails that the user has to retrieve the randomized path from the logs if he wants to restart jobs after the cluster has crashed or was intentionally restarted. Furthermore, the system won't be able to clean up old checkpoint and job handles in case the cluster stop was intentional.
>>>
>>> Thus, the question is: how do we define the behaviour in order to retrieve handles and to clean up old handles, so that ZooKeeper won't be cluttered with old handles?
>>>
>>> There are basically two modes:
>>>
>>> 1. Keep state handles when shutting down the cluster. Provide a means to define a fixed path when starting the cluster, and also a means to purge old state handles. Furthermore, add a shutdown mode where the handles under the current path are directly removed. This mode would guarantee to always have the state handles available, if not explicitly told differently. However, the downside is that ZooKeeper will most certainly be cluttered.
>>>
>>> 2. Remove the state handles when shutting down the
RE: YARN High Availability
We are not yet using HA in our cluster instances. But yes, we will have to change the zookeeper.path.root ☺

We package our jobs with their own config folder (we don’t rely on Flink’s config folder); we can put the Maven project name into this property, and then they will have different values ☺

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: lundi 23 novembre 2015 14:51
To: user@flink.apache.org
Subject: Re: YARN High Availability

The problem is the execution graph handle, which is stored in ZooKeeper. You can manually remove it via the ZooKeeper shell by simply deleting everything below your `recovery.zookeeper.path.root` ZNode. But you should be sure that the cluster has been stopped before.

Do you start the different clusters with different `recovery.zookeeper.path.root` values? If not, then you will run into trouble when running multiple clusters at the same time. The reason is that all clusters will then think that they belong together.

Cheers, Till

On Mon, Nov 23, 2015 at 2:15 PM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
OK, I understand.

Maybe we are not really using Flink as you intended. The way we are using it, one cluster equals one job. That way we are sure to isolate the different jobs as much as possible, and in case of crashes / bugs / etc. we can completely kill one cluster without interfering with the other jobs.

That future behavior seems good :-)

Instead of the manual flink commands, is there a way to manually delete those old jobs before launching my job? They are probably somewhere in HDFS, aren't they?

B.R.

-----Original Message-----
From: Ufuk Celebi [mailto:u...@apache.org]
Sent: lundi 23 novembre 2015 12:12
To: user@flink.apache.org
Subject: Re: YARN High Availability

Hey Gwenhaël,

the restarting jobs are most likely old job submissions.
They are not cleaned up when you shut down the cluster, but only when they finish (either regular finish or after cancelling). The workaround is to use the command line frontend:

bin/flink cancel JOBID

for each RESTARTING job. Sorry about the inconvenience! We are in an active discussion about addressing this. The future behaviour will be that the startup or shutdown of a cluster cleans up everything, plus an option to skip this step. The reasoning for the initial solution (not removing anything) was to make sure that no jobs are deleted by accident. But it looks like this is more confusing than helpful.

– Ufuk

> On 23 Nov 2015, at 11:45, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
>
> Hi again!
>
> On the same topic, I'm still trying to start my streaming job with HA.
> The HA part seems to be more or less OK (I killed the JobManager and it came back), however I have an issue with the TaskManagers.
> I configured my job to have only one TaskManager and 1 slot that does [source => map => sink].
> The issue I'm encountering is that other instances of my job appear and are in the RESTARTING status since there is only one task slot.
>
> Do you know of this, or have an idea of where to look in order to understand what's happening?
>
> B.R.
>
> Gwenhaël PASQUIERS
>
> -----Original Message-----
> From: Maximilian Michels [mailto:m...@apache.org]
> Sent: jeudi 19 novembre 2015 13:36
> To: user@flink.apache.org
> Subject: Re: YARN High Availability
>
> The docs have been updated.
>
> On Thu, Nov 19, 2015 at 12:36 PM, Ufuk Celebi <u...@apache.org> wrote:
>> I’ve added a note about this to the docs and asked Max to trigger a new build of them.
>>
>> Regarding Aljoscha’s idea: I like it. It is essentially a shortcut for configuring the root path.
>>
>> In any case, it is orthogonal to Till’s proposals.
>> That one we need to address as well (see FLINK-2929). The motivation for the current behaviour was to be rather defensive when removing state, in order to not lose data accidentally. But it can be confusing, indeed.
>>
>> – Ufuk
>>
>>> On 19 Nov 2015, at 12:08, Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>> You mean an additional start-up parameter for the `start-cluster.sh` script for the HA case? That could work.
>>>
>>> On Thu, Nov 19, 2015 at 11:54 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> Maybe we could add a user parameter to specify a cluster name that is used to make the paths unique.
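[Editor's note] Since each job runs in its own cluster here, isolating them in ZooKeeper comes down to giving each packaged config its own root path. A sketch, using the 0.10/1.0-era recovery.* keys that appear in this thread (the path values are illustrative, e.g. derived from the Maven project name; the per-job storageDir suffix is an assumption, not something the thread prescribes):

```yaml
# Per-job flink-conf.yaml: a distinct ZooKeeper namespace per cluster/job,
# so concurrent clusters don't think they belong together.
recovery.mode: zookeeper
recovery.zookeeper.quorum: host1:3181,host2:3181,host3:3181
recovery.zookeeper.path.root: /flink/job-a
recovery.zookeeper.storageDir: hdfs:///flink/recovery/job-a
```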
RE: YARN High Availability
Nevermind, looking at the logs I saw that it was having issues trying to connect to ZK. To make it short: it had the wrong port. It is now starting. Tomorrow I'll try to kill some JobManagers *evil*.

Another question: if I have multiple HA Flink jobs, are there some points to check in order to be sure that they won't collide on HDFS or ZK?

B.R.

Gwenhaël PASQUIERS

From: Till Rohrmann [mailto:till.rohrm...@gmail.com]
Sent: mercredi 18 novembre 2015 18:01
To: user@flink.apache.org
Subject: Re: YARN High Availability

Hi Gwenhaël, do you have access to the yarn logs?

Cheers, Till

On Wed, Nov 18, 2015 at 5:55 PM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
Hello,

We’re trying to set up high availability using an existing ZooKeeper quorum already running in our Cloudera cluster. So, as per the doc, we’ve changed the max attempts in YARN’s config as well as the flink.yaml:

recovery.mode: zookeeper
recovery.zookeeper.quorum: host1:3181,host2:3181,host3:3181
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
recovery.zookeeper.storageDir: hdfs:///flink/recovery/
yarn.application-attempts: 1000

Everything is OK as long as recovery.mode is commented out. As soon as I uncomment recovery.mode, the deployment on YARN is stuck on “Deploying cluster, current state ACCEPTED”, with “Deployment took more than 60 seconds…” every second. And I have more than enough resources available on my YARN cluster.

Do you have any idea of what could cause this, and/or what logs I should look at in order to understand?

B.R.

Gwenhaël PASQUIERS
MaxPermSize on yarn
Hi,

We're having some OOM PermGen exceptions when running on YARN. We're not yet sure whether it is a consequence or a cause of our crashes, but we've been trying to increase that value... and we did not find how to do it.

I've seen that yarn-daemon.sh sets a 256m value. It also looks possible to customize the YarnClient JVM args, but that will only apply to the client, not to the TaskManagers.

Do you know of a way to do it?

B.R.

Gwenhaël PASQUIERS
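[Editor's note] One option that reaches the JVMs Flink launches in the YARN containers (rather than only the client) is the env.java.opts key in flink-conf.yaml, which Flink passes to the JobManager and TaskManager JVMs. A sketch, assuming a pre-Java-8 JVM where PermGen still exists; the value is illustrative:

```yaml
# flink-conf.yaml: raise PermGen on the launched JVMs
# (JobManager and TaskManagers), not just the YARN client.
env.java.opts: -XX:MaxPermSize=512m
```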
RE: Building Flink with hadoop 2.6
The first class that it cannot find is org.apache.log4j.Level. The org.apache.log4j package is not present in the fat jar I get from the mvn command, but it is in the one you distributed on your website.

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: mercredi 14 octobre 2015 16:54
To: user@flink.apache.org
Subject: Re: Building Flink with hadoop 2.6

Great. Which classes can it not find at runtime? I'll try to build and run Flink with exactly the command you've provided.

On Wed, Oct 14, 2015 at 4:49 PM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
Hi Robert!

I’m using "mvn clean install -DskipTests -Dhadoop.version=2.6.0".

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: mercredi 14 octobre 2015 16:47
To: user@flink.apache.org
Subject: Re: Building Flink with hadoop 2.6

Hi Gwen, can you tell us the "mvn" command you're using for building Flink?

On Wed, Oct 14, 2015 at 4:37 PM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
Hi,

We need to test some things with Flink and Hadoop 2.6 (the truncate method). We’ve set up a build task on our Jenkins and everything seems okay. However, when we replace the original jar from your 0.10-SNAPSHOT distribution with ours, there are some missing dependencies (log4j, curator, and maybe others) and we get some ClassNotFoundExceptions at runtime. Are we missing some build parameters?

Thanks in advance, B.R.
Building Flink with hadoop 2.6
Hi; We need to test some things with Flink and Hadoop 2.6 (the truncate method). We've set up a build task on our Jenkins and everything seems okay. However, when we replace the original jar from your 0.10-SNAPSHOT distribution with ours, there are some missing dependencies (log4j, curator, and maybe others) and we get some ClassNotFoundException at runtime. Are we missing some build parameters? Thanks in advance, B.R.
RE: Building Flink with hadoop 2.6
Yes, we're onto the exactly-once; trying to write RCFiles (Parquet and ORCFiles are not compatible because of their footer). It seems to be working perfectly. As expected, Flink falls back to .valid-length metadata on HDFS 2.6 (and 2.3). From: Robert Metzger [mailto:rmetz...@apache.org] Sent: mercredi 14 octobre 2015 17:23 To: user@flink.apache.org Subject: Re: Building Flink with hadoop 2.6 Great. We are now shading curator into a different location, that's why you can't find it anymore. I suspect you're trying out our new exactly-once filesystem sinks. Please let us know how well it's working for you and if you're missing something. It's a pretty new feature :) Also note that you can use the fs sinks with Hadoop versions below 2.7.0; then we'll write some metadata containing the valid offsets. On Wed, Oct 14, 2015 at 5:18 PM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote: Yes … You're right. Anyway, adding the log4j jar solved the issue and our app is working properly, thanks! About curator, I just observed that it was not there anymore when comparing the old and new fat jars. But it's probably now in another dependency; anyway, there is no curator-related error, so it probably just moved. Thanks! Gwen' From: Robert Metzger [mailto:rmetz...@apache.org] Sent: mercredi 14 octobre 2015 17:06 To: user@flink.apache.org Subject: Re: Building Flink with hadoop 2.6 One more thing regarding the truncate method: it's supported as of Hadoop 2.7.0 (https://issues.apache.org/jira/browse/HDFS-3107) On Wed, Oct 14, 2015 at 5:00 PM, Robert Metzger <rmetz...@apache.org> wrote: Ah, I know what's causing this issue. In the latest 0.10-SNAPSHOT, we have removed log4j from the fat jar. Can you copy everything from the lib/ folder of your maven build into the lib/ folder of your flink installation?
Log4j is now in a separate jar in the lib/ folder. What about the curator dependency issue?
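Robert's instruction to copy the build's lib/ jars into the installation can be sketched as below. The directory names are illustrative assumptions, and the dummy jar only exists to make the sketch self-contained:

```shell
#!/bin/sh
# Sketch: after `mvn clean install -DskipTests -Dhadoop.version=2.6.0`,
# copy the jars that are no longer shaded into the fat jar (log4j, ...)
# from the build's lib/ folder into the installation's lib/ folder.
# Directory names below are illustrative.
BUILD_LIB=build/lib
INSTALL_LIB=flink-install/lib
mkdir -p "$BUILD_LIB" "$INSTALL_LIB"
touch "$BUILD_LIB/log4j-1.2.17.jar"   # stand-in for the real build output
cp "$BUILD_LIB"/*.jar "$INSTALL_LIB"/
ls "$INSTALL_LIB"
```

After this, the ClassNotFoundException for org.apache.log4j.Level should disappear, since the log4j classes are on the classpath again.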
RE: Building Flink with hadoop 2.6
Hi Robert! I'm using "mvn clean install -DskipTests -Dhadoop.version=2.6.0". From: Robert Metzger [mailto:rmetz...@apache.org] Sent: mercredi 14 octobre 2015 16:47 To: user@flink.apache.org Subject: Re: Building Flink with hadoop 2.6 Hi Gwen, can you tell us the "mvn" command you're using for building Flink?
Flink 0.9.1 Kafka 0.8.1
Hi everyone, We're trying to consume from a 0.8.1 Kafka with Flink 0.9.1 and we've run into the following issue: my offset became OutOfRange; however, now when I start my job, it loops on the OutOfRangeException, no matter what the value of auto.offset.reset is (earliest, latest, largest, smallest). It looks like it doesn't fix the invalid offset and immediately goes into error. Then Flink restarts the job, fails again, etc. Do you have an idea of what is wrong, or could it be an issue in Flink? B.R. Gwenhaël PASQUIERS
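One detail worth ruling out, offered only as a guess: if I remember correctly, the 0.8.x high-level consumer understands only the legacy auto.offset.reset values "smallest" and "largest"; "earliest"/"latest" belong to the later consumer API and would be invalid there. That would not explain the failures with smallest/largest, so the restart loop may still be a Flink-side issue, but it narrows the test matrix. A sketch of the consumer properties, with illustrative connection values:

```properties
# Kafka 0.8.x high-level consumer properties (sketch; the zookeeper address
# and group id are illustrative). The legacy consumer expects "smallest"
# or "largest", not "earliest"/"latest".
zookeeper.connect=zkhost:2181
group.id=my-flink-group
auto.offset.reset=smallest
```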
RE: Application-specific loggers configuration
Hi! Yes, we're starting our job with "flink run --jobmanager yarn-cluster". So it's perfect; we'll use your fix and, when it's out, we'll switch to Flink 0.9.1. B.R. From: Aljoscha Krettek [mailto:aljos...@apache.org] Sent: mardi 25 août 2015 19:25 To: user@flink.apache.org Subject: Re: Application-specific loggers configuration Hi Gwenhaël, are you using the one-yarn-cluster-per-job mode of Flink? I.e., you are starting your Flink job with (from the doc): flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/flink-java-examples-0.10-SNAPSHOT-WordCount.jar If you are, then this is almost possible in the current version of Flink. What you have to do is copy the conf directory of Flink to a separate directory that is specific to your job. There you make your modifications to the log configuration etc. Then, when you start your job, you do this instead: export FLINK_CONF_DIR=/path/to/my/conf flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/flink-java-examples-0.10-SNAPSHOT-WordCount.jar You can easily put this into your startup script, of course. I said almost possible because this requires a small fix in bin/flink. Around line 130, this line: FLINK_CONF_DIR=$FLINK_ROOT_DIR_MANGLED/conf needs to be replaced by this line: if [ -z $FLINK_CONF_DIR ]; then FLINK_CONF_DIR=$FLINK_ROOT_DIR_MANGLED/conf; fi (We will fix this in the upcoming version and the 0.9.1 bugfix release.) Does this help? Let us know if you are not using the one-yarn-cluster-per-job mode; then we'll have to try to find another solution. Best, Aljoscha
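Aljoscha's suggestion can be sketched as a small per-job startup script. The paths, jar name, and memory settings are illustrative assumptions, not taken from the thread, and the flink invocation is left commented out so the sketch stands alone:

```shell
#!/bin/sh
# Per-job startup script sketch (paths and jar name are illustrative).
# The job ships its own conf/ directory containing its log4j .properties files.
JOB_HOME=${JOB_HOME:-$PWD}

# Point Flink at the job-specific configuration instead of flink/conf.
export FLINK_CONF_DIR="$JOB_HOME/conf"

# Launch on a one-yarn-cluster-per-job (commented out in this sketch):
# flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 "$JOB_HOME/my-streaming-job.jar"
echo "Using FLINK_CONF_DIR=$FLINK_CONF_DIR"
```

With the bin/flink fix from the reply applied, the exported FLINK_CONF_DIR is no longer overwritten by the script's own default, so each job picks up its own logging configuration.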
Application-specific loggers configuration
Hi, We're developing the first of (we hope) many Flink streaming apps. We'd like to package the logging configuration (log4j) together with the jar. Meaning, different applications will probably have different logging configurations (e.g. to different logstash ports)... Is there a way to override the many log4j properties files that are in flink/conf/*.properties? In our environment, the Flink binaries would be on the PATH, and our apps would be: - Jar file - App configuration files - Log configuration files - Startup script B.R. Gwenhaël PASQUIERS