Re: Re: Flink batch job memory/disk leak when invoking set method on a static Configuration object.

2019-06-28 Thread Vadim Vararu
Hi,

I've run it on a standalone Flink cluster. No YARN involved.

From: Haibo Sun 
Sent: Friday, June 28, 2019 6:13 AM
To: Vadim Vararu
Cc: user@flink.apache.org
Subject: Re: Flink batch job memory/disk leak when invoking set method on a static Configuration object.

Hi, Vadim

A similar issue occurred in earlier versions; see 
https://issues.apache.org/jira/browse/FLINK-4485.
Are you running a Flink session cluster on YARN? I think this might be a bug. 
I'll see if I can reproduce it with the master branch code, and if so, I will 
try to investigate it.

If someone already knows the cause of the problem, even better, as it won't 
need to be re-investigated.

Best,
Haibo


At 2019-06-28 00:46:43, "Vadim Vararu"  wrote:
Hi guys,

I have a simple batch job with a custom output formatter that writes to a local 
file.


public class JobHadoop {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.fromCollection(Sets.newHashSet("line1", "line2", "line3"))
                .map(line -> line + "dd")
                .write(new HadoopUsageOutputFormat(), "file:///tmp/out");

        env.execute(JobHadoop.class.getName());
    }
}

public class HadoopUsageOutputFormat extends FileOutputFormat<String> {

    private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new Configuration();
    public static final String DEFAULT_LINE_DELIMITER = "\n";

    private Writer writer;

    static {
        DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks);
        writer = new OutputStreamWriter(new BufferedOutputStream(stream));
    }

    @Override
    public void writeRecord(String record) throws IOException {
        writer.write(record);
        writer.write(DEFAULT_LINE_DELIMITER);
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            this.writer.flush();
            this.writer.close();
        }

        super.close();
    }
}

The problem is that after the job finishes, there is a leak somewhere that 
prevents the job's blob store files from being deleted. The number of such 
"deleted" files grows with every job run: even though they are marked as 
deleted, something in the JobManager process still holds a reference to them 
and keeps them from actually being removed.

[inline screenshot: the JobManager process still holding open file handles to blob files marked as deleted]


Also, the problem reproduces only if I actually invoke the set method of 
Configuration:

static {
    DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
}

From my observations, if I change

private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new Configuration();

to a non-static field, then the problem does not reproduce any more.
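
That is, a variant like the sketch below (the same class with the configuration 
moved to an instance field; open(), writeRecord() and close() stay exactly as 
above) does not leave the blob files behind:

public class HadoopUsageOutputFormat extends FileOutputFormat<String> {

    // Instance field instead of a static one: the Configuration is created
    // per output format instance and can be garbage-collected together with
    // the job's user-code classloader.
    private final Configuration hadoopConfiguration = new Configuration();

    public HadoopUsageOutputFormat() {
        hadoopConfiguration.set("just.a.prop", "/var/test1");
    }

    // open(), writeRecord() and close() unchanged from the version above.
}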


However, I'd like to know whether that is normal behaviour or a bug/leak 
somewhere in Flink itself.

Thanks,
Vadim.




Broadcast state before events stream consumption

2019-02-08 Thread Vadim Vararu
Hi all,

I need to use the broadcast state mechanism 
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html)
for the following scenario.

I have a reference data stream (slow) and an events stream (fast-running), and 
I want to do a kind of lookup in the reference stream for each event. The 
broadcast state mechanism seems to fit the scenario perfectly.

From the documentation:
As an example where broadcast state can emerge as a natural fit, one can 
imagine a low-throughput stream containing a set of rules which we want to 
evaluate against all elements coming from another stream.

However, I am not sure what the correct way is to delay the consumption of the 
fast-running stream until the slow one has been fully read (in the case of a 
file) or until a marker is emitted (in the case of some other source). Is 
there any way to accomplish that? It doesn't seem like a rare use case.
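
For concreteness, here is a minimal sketch of the only workaround I see so 
far: buffer the fast-stream events in keyed state until the broadcast side 
signals completion. Event, RefData, the key() accessors and the isEndMarker() 
convention are placeholders for my actual types, and the refComplete flag is 
not checkpointed, so this is a sketch rather than a proper solution:

public class BufferingLookup
        extends KeyedBroadcastProcessFunction<String, Event, RefData, String> {

    public static final MapStateDescriptor<String, RefData> REF_STATE =
            new MapStateDescriptor<>("refData", String.class, RefData.class);

    private static final ListStateDescriptor<Event> BUFFER =
            new ListStateDescriptor<>("buffered-events", Event.class);

    // Per-subtask flag; every subtask sees the broadcast end marker.
    private transient boolean refComplete;
    private transient ListState<Event> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(BUFFER);
    }

    @Override
    public void processElement(Event event, ReadOnlyContext ctx,
            Collector<String> out) throws Exception {
        if (!refComplete) {
            buffer.add(event);  // reference data not ready yet: park the event
        } else {
            out.collect(lookup(event, ctx.getBroadcastState(REF_STATE).get(event.key())));
        }
    }

    @Override
    public void processBroadcastElement(RefData ref, Context ctx,
            Collector<String> out) throws Exception {
        if (ref.isEndMarker()) {
            refComplete = true;
            // Flush everything buffered so far, key by key.
            ctx.applyToKeyedState(BUFFER, (key, state) -> {
                for (Event e : state.get()) {
                    out.collect(lookup(e, ctx.getBroadcastState(REF_STATE).get(e.key())));
                }
                state.clear();
            });
        } else {
            ctx.getBroadcastState(REF_STATE).put(ref.key(), ref);
        }
    }

    private String lookup(Event event, RefData ref) {
        return event + " -> " + ref;
    }
}

wired up as:

events.keyBy(e -> e.key())
        .connect(referenceStream.broadcast(BufferingLookup.REF_STATE))
        .process(new BufferingLookup());

The obvious drawback is that a restart loses the refComplete flag, which is 
why I'm asking whether there is a proper way.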

Thanks, Vadim.


Flink streaming. Broadcast reference data map across nodes

2017-02-21 Thread Vadim Vararu

Hi all,


I would like to do something similar to Spark's broadcast mechanism.

Basically, I have a big dictionary of reference data that has to be 
accessible from all the nodes (in order to join log lines with reference 
lines).


I have not yet found a way to do it for a streaming job.
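
On the batch side, the DataSet API's broadcast variables do roughly what I 
want. A minimal sketch of that pattern is below (logLines is assumed to be a 
DataSet<String>, and keyOf() is just a placeholder for extracting the join 
key); what I'm missing is the equivalent for a DataStream job:

DataSet<Tuple2<String, String>> reference = env.fromElements(
        Tuple2.of("key1", "ref1"), Tuple2.of("key2", "ref2"));

logLines
        .map(new RichMapFunction<String, String>() {
            private Map<String, String> refMap;

            @Override
            public void open(Configuration parameters) {
                // Materialize the broadcast data set once per task.
                refMap = new HashMap<>();
                for (Tuple2<String, String> t : getRuntimeContext()
                        .<Tuple2<String, String>>getBroadcastVariable("ref")) {
                    refMap.put(t.f0, t.f1);
                }
            }

            @Override
            public String map(String line) {
                return line + " | " + refMap.getOrDefault(keyOf(line), "n/a");
            }
        })
        .withBroadcastSet(reference, "ref");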


Any ideas?



Re: Is it OK to have very many session windows?

2017-02-20 Thread Vadim Vararu

It's something like:

DataStreamSource stream = env.addSource(getKafkaConsumer(parameterTool));

stream
        .map(getEventToDomainMapper())
        .keyBy(getKeySelector())
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(90)))
        .reduce(getReducer())
        .map(getToJsonMapper())
        .addSink(getKafkaProducer(parameterTool));


Each new event can be reduced against the existing state of the window, so 
normally it's okay to keep only one row in memory.

I'm new to Flink and have not yet gotten to the "incremental aggregates" part 
but, if I understand correctly, it fits my case.


Vadim.



On 20.02.2017 17:47, Timo Walther wrote:

Hi Vadim,

this of course depends on your use case. The question is how large your state 
per pane is and how much memory is available to Flink. Are you using 
incremental aggregates, such that only the aggregated value per pane has to 
be kept in memory?
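
For example, with a ReduceFunction only the running aggregate is stored per 
session pane, not the individual elements. A minimal sketch (events stands 
for a keyed input of (key, count) tuples, which is an assumption on my side):

DataStream<Tuple2<String, Long>> aggregated = events
        .keyBy(0)
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(90)))
        .reduce(new ReduceFunction<Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> reduce(Tuple2<String, Long> a,
                    Tuple2<String, Long> b) {
                // Only this merged value is kept per window pane.
                return Tuple2.of(a.f0, a.f1 + b.f1);
            }
        });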


Regards,
Timo


On 20/02/17 at 16:34, Vadim Vararu wrote:

Hi guys,

Is it okay to have very many session windows (tens or hundreds of thousands)?



Thanks, Vadim.






