Re: Re: Flink batch job memory/disk leak when invoking set method on a static Configuration object.
Hi,

I've run it on a standalone Flink cluster. No YARN involved.

From: Haibo Sun
Sent: Friday, June 28, 2019 6:13 AM
To: Vadim Vararu
Cc: user@flink.apache.org
Subject: Re: Flink batch job memory/disk leak when invoking set method on a static Configuration object.

Hi, Vadim

A similar issue occurred in earlier versions, see https://issues.apache.org/jira/browse/FLINK-4485. Are you running a Flink session cluster on YARN? I think it might be a bug. I'll see if I can reproduce it with the master branch code, and if so, I'll try to investigate it. If someone already knows the cause of the problem, that's best, and it won't need to be re-investigated.

Best,
Haibo

At 2019-06-28 00:46:43, "Vadim Vararu" wrote:

> Hi guys, I have a simple batch job with a custom output formatter that writes to a local file. [...]
Flink batch job memory/disk leak when invoking set method on a static Configuration object.
Hi guys,

I have a simple batch job with a custom output formatter that writes to a local file.

    public class JobHadoop {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            env.fromCollection(Sets.newHashSet("line1", "line2", "line3"))
                    .map(line -> line + "dd")
                    .write(new HadoopUsageOutputFormat(), "file:///tmp/out");

            env.execute(JobHadoop.class.getName());
        }
    }

    public class HadoopUsageOutputFormat extends FileOutputFormat<String> implements OutputFormat<String> {

        private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new Configuration();

        public static final String DEFAULT_LINE_DELIMITER = "\n";

        private Writer writer;

        static {
            DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
        }

        @Override
        public void open(int taskNumber, int numTasks) throws IOException {
            super.open(taskNumber, numTasks);
            writer = new OutputStreamWriter(new BufferedOutputStream(stream));
        }

        @Override
        public void writeRecord(String record) throws IOException {
            writer.write(record);
            writer.write(DEFAULT_LINE_DELIMITER);
        }

        @Override
        public void close() throws IOException {
            if (writer != null) {
                this.writer.flush();
                this.writer.close();
            }
            super.close();
        }
    }

The problem is that after the job finishes, something keeps holding a reference to the job's blobStore files in the JobManager process, so they are never actually deleted from disk. The number of such "deleted" files grows after each job run: they are marked as deleted, but the open reference in the JobManager prevents their actual removal.

[inline screenshot attachment]

Also, the problem reproduces only if I actually invoke the set method of Configuration:

    static {
        DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
    }

From my observations, if I change

    private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new Configuration();

to a non-static field, the problem does not reproduce any more. However, I'm interested in whether that's normal behaviour or a bug/leak somewhere in Flink itself.

Thanks,
Vadim
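A minimal plain-Java sketch of why the static field matters (class and field names are invented for illustration; this models only the lifetime difference, not the actual Hadoop Configuration or Flink classes): state mutated in a static initializer lives as long as the class, i.e. as long as its classloader, while instance state becomes garbage together with the format object once the job finishes.

```java
import java.util.HashMap;
import java.util.Map;

// Class-level state: initialized once per classloader and kept alive with it.
class StaticConfHolder {
    static final Map<String, String> CONF = new HashMap<>();
    static {
        CONF.put("just.a.prop", "/var/test1");
    }
}

// Instance-level state: each object carries its own copy, collected
// together with the object when it is no longer referenced.
class InstanceConfHolder {
    final Map<String, String> conf = new HashMap<>();
    InstanceConfHolder() {
        conf.put("just.a.prop", "/var/test1");
    }
}
```

If something keeps a class with mutated static state reachable, the user-code classloader cannot be unloaded, and any resources that class (transitively) holds open stay pinned. That would be consistent with the "marked as deleted but never removed" symptom described above.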
Broadcast state before events stream consumption
Hi all,

I need to use the broadcast state mechanism (https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html) for the following scenario: I have a reference data stream (slow) and an events stream (fast running), and I want to do a kind of lookup in the reference stream for each event. The broadcast state mechanism seems to fit the scenario perfectly.

From the documentation: "As an example where broadcast state can emerge as a natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all elements coming from another stream."

However, I am not sure what the correct way is to delay the consumption of the fast running stream until the slow one is fully read (in the case of a file) or until a marker is emitted (in the case of some other source). Is there any way to accomplish that? It doesn't seem to be a rare use case.

Thanks,
Vadim
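The buffering logic asked about here can be sketched in plain Java (class and method names are invented for illustration; in Flink this would typically live in a KeyedBroadcastProcessFunction, with the backlog kept in state): events from the fast stream are held back until the slow side signals that the reference data is complete, then the backlog is flushed and later events are processed directly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of "delay the fast stream until the broadcast side is ready".
class ReferenceJoiner {
    private final Map<String, String> reference = new HashMap<>();
    private final List<String> backlog = new ArrayList<>();
    private final List<String> output = new ArrayList<>();
    private boolean referenceComplete = false;

    // Slow (broadcast) side: reference entries arrive one by one.
    void onReference(String key, String value) {
        reference.put(key, value);
    }

    // Slow side: end-of-reference marker; flush everything buffered so far.
    void onReferenceComplete() {
        referenceComplete = true;
        for (String event : backlog) {
            output.add(enrich(event));
        }
        backlog.clear();
    }

    // Fast side: buffer until the reference data is complete.
    void onEvent(String event) {
        if (referenceComplete) {
            output.add(enrich(event));
        } else {
            backlog.add(event);
        }
    }

    private String enrich(String event) {
        return event + "->" + reference.getOrDefault(event, "unknown");
    }

    List<String> output() {
        return output;
    }
}
```

One design note: since Flink gives no ordering guarantee between the two inputs of a broadcast connect, the end-of-reference signal has to be an explicit marker record on the slow stream rather than an assumption about timing.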
Flink streaming. Broadcast reference data map across nodes
Hi all,

I would like to do something similar to Spark's broadcast mechanism. Basically, I have a big dictionary of reference data that has to be accessible from all the nodes (in order to join each log line with its reference line). I have not yet found a way to do it. Any ideas?
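A plain-Java sketch of the lookup pattern being asked for (names invented for illustration; in Flink's batch DataSet API the closest analogue of Spark's broadcast is a broadcast variable, registered with withBroadcastSet and read via getRuntimeContext().getBroadcastVariable(...)): every parallel task gets a read-only view of the dictionary and enriches its lines locally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Toy model of a broadcast lookup join: each "task" holds a read-only view
// of the reference dictionary and enriches its log lines by key lookup.
class BroadcastLookupTask {
    private final Map<String, String> dictionary;

    BroadcastLookupTask(Map<String, String> dictionary) {
        // Read-only view, mirroring the fact that broadcast data
        // should be treated as immutable by the consuming tasks.
        this.dictionary = Collections.unmodifiableMap(dictionary);
    }

    List<String> enrich(List<String> logLines) {
        List<String> out = new ArrayList<>();
        for (String line : logLines) {
            out.add(line + " -> " + dictionary.getOrDefault(line, "unknown"));
        }
        return out;
    }
}
```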
Re: Is it OK to have very many session windows?
It's something like:

    DataStreamSource stream = env.addSource(getKafkaConsumer(parameterTool));

    stream
        .map(getEventToDomainMapper())
        .keyBy(getKeySelector())
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(90)))
        .reduce(getReducer())
        .map(getToJsonMapper())
        .addSink(getKafkaProducer(parameterTool));

Each new event may be reduced against the existing state of the window, so normally it's okay to have only one row in memory per window. I'm new to Flink and have not yet reached the "incremental aggregates" topic but, if I understand correctly, it fits my case.

Vadim.

On 20.02.2017 17:47, Timo Walther wrote:

Hi Vadim,

this of course depends on your use case. The question is how large your state per pane is and how much memory is available for Flink. Are you using incremental aggregates such that only the aggregated value per pane has to be kept in memory?

Regards,
Timo

On 20/02/17 at 16:34, Vadim Vararu wrote:

Hi guys,

Is it okay to have very many (tens of thousands or hundreds of thousands) of session windows?

Thanks,
Vadim.
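The "only one row in memory" point can be sketched in plain Java (names invented for illustration; in Flink the equivalent is a ReduceFunction applied eagerly as elements arrive in the window): each event is immediately folded into a single per-key aggregate, so the number of stored rows equals the number of keys, not the number of events.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of an eager (incremental) reduce: one aggregate per key,
// updated in place as events arrive, instead of buffering all events.
class IncrementalReduce {
    private final Map<String, String> perKeyAggregate = new HashMap<>();

    void onEvent(String key, String value) {
        // Fold the new value into the existing aggregate for this key.
        perKeyAggregate.merge(key, value, (acc, v) -> acc + "|" + v);
    }

    String aggregate(String key) {
        return perKeyAggregate.get(key);
    }

    int storedRows() {
        return perKeyAggregate.size();
    }
}
```

This is why very many session windows can be workable: with an eager reduce, memory grows with the number of open sessions, not with the number of events inside them.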
Is it OK to have very many session windows?
Hi guys,

Is it okay to have very many (tens of thousands or hundreds of thousands) of session windows?

Thanks,
Vadim.