Re: [flink-connector-filesystem] OutOfMemory in checkpointless environment

2018-06-08 Thread Rinat
Chesnay, thx for your reply, I’ve created one: https://issues.apache.org/jira/browse/FLINK-9558



> On 8 Jun 2018, at 12:58, Chesnay Schepler wrote:
> 
> I agree: if the sink doesn't work properly without checkpointing, we should 
> make sure that it fails early if it is used that way.
> 
> It would be great if you could open a JIRA.

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: [flink-connector-filesystem] OutOfMemory in checkpointless environment

2018-06-08 Thread Chesnay Schepler
I agree: if the sink doesn't work properly without checkpointing, we 
should make sure that it fails early if it is used that way.


It would be great if you could open a JIRA.
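
For illustration, a fail-early guard could look roughly like the sketch below: 
a sink that refuses to start when checkpointing is disabled. The class name is 
hypothetical; StreamingRuntimeContext#isCheckpointingEnabled is an existing 
Flink method, but where exactly such a check should live in BucketingSink is a 
design question for the JIRA.

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

// Hypothetical sink that fails early instead of leaking state at runtime.
public class CheckpointGuardedSink extends RichSinkFunction<String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RuntimeContext ctx = getRuntimeContext();
        if (ctx instanceof StreamingRuntimeContext
                && !((StreamingRuntimeContext) ctx).isCheckpointingEnabled()) {
            throw new IllegalStateException(
                "This sink relies on checkpoint notifications to commit pending "
                + "files and prune its bucket state; enable checkpointing to use it.");
        }
    }

    @Override
    public void invoke(String value) throws Exception {
        // actual writing logic would go here
    }
}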

On 08.06.2018 10:08, Rinat wrote:
> Piotr, thx for your reply, for now everything is pretty clear. But from 
> my point of view, it would be better to add some information to the 
> BucketingSink documentation about the leak when checkpointing is disabled.







Re: [flink-connector-filesystem] OutOfMemory in checkpointless environment

2018-06-08 Thread Rinat
Piotr, thx for your reply, for now everything is pretty clear. But from my 
point of view, it would be better to add some information to the BucketingSink 
documentation about the leak when checkpointing is disabled.

> On 8 Jun 2018, at 10:35, Piotr Nowojski wrote:
> 
> Hi,
> 
> BucketingSink is designed to provide exactly-once writes to the file system, 
> which is inherently tied to checkpointing. As you just saw, without 
> checkpointing, BucketingSink is never notified that it can commit pending 
> files.
> 
> If you do not want to use checkpointing for some reason, you could always 
> use, for example, 
> org.apache.flink.streaming.api.datastream.DataStream#writeUsingOutputFormat 
> and write your own simple `OutputFormat`, or check whether one of the 
> existing ones meets your needs.
> 
> Piotrek

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: [flink-connector-filesystem] OutOfMemory in checkpointless environment

2018-06-08 Thread Piotr Nowojski
Hi,

BucketingSink is designed to provide exactly-once writes to the file system, which 
is inherently tied to checkpointing. As you just saw, without checkpointing, 
BucketingSink is never notified that it can commit pending files.

If you do not want to use checkpointing for some reason, you could always use, 
for example, 
org.apache.flink.streaming.api.datastream.DataStream#writeUsingOutputFormat and 
write your own simple `OutputFormat`, or check whether one of the existing ones 
meets your needs.
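
For illustration, a minimal custom OutputFormat could look like the sketch 
below. The class name is made up, and this toy version only prints records; a 
real one would open an HDFS output stream in open() and close it in close(). 
Note that a plain OutputFormat gives no exactly-once guarantee; that is the 
trade-off of skipping checkpointing.

import java.io.IOException;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;

// Toy OutputFormat that writes each record to stdout.
public class PrintlnOutputFormat<T> implements OutputFormat<T> {

    @Override
    public void configure(Configuration parameters) {
        // nothing to configure in this sketch
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // a real format would open its file/stream here
    }

    @Override
    public void writeRecord(T record) throws IOException {
        System.out.println(record);
    }

    @Override
    public void close() throws IOException {
        // a real format would flush and close its stream here
    }
}

It would then be attached to a stream with something like 
stream.writeUsingOutputFormat(new PrintlnOutputFormat<>());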

Piotrek

> On 7 Jun 2018, at 14:23, Rinat wrote:
> [...]



[flink-connector-filesystem] OutOfMemory in checkpointless environment

2018-06-07 Thread Rinat
Hi mates, we have some Flink jobs that write data from Kafka into HDFS using 
BucketingSink.
For various reasons, those jobs run without checkpointing. For now, it is not a 
big problem for us if some files remain open when a job is restarted.

Periodically, those jobs fail with an OutOfMemoryError, and it seems that I 
have found a strange thing in the implementation of BucketingSink.
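
A heap dump like this can be captured automatically with standard JVM flags, 
for example via env.java.opts in flink-conf.yaml (the dump path below is 
illustrative):

# flink-conf.yaml: dump the heap when an OOM occurs
env.java.opts: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/flink-dumps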

During the sink lifecycle, we have a state object, implemented as a map, where 
the key is a bucket path and the value is a per-bucket state that contains 
information about the currently open file and the list of pending files.
After inspecting the heap dump, I found that this state holds information about 
~1,000 buckets, and all of it weighs ~120 MB.
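
Roughly, the structure looks like the following sketch; the field names are 
taken from the code excerpt below, but the exact types are my reconstruction:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the sink state: one entry per bucket path. Entries are only
// ever removed in notifyCheckpointComplete (shown below), which never
// fires when checkpointing is disabled.
class State<T> {
    final Map<String, BucketState<T>> bucketStates = new HashMap<>();
}

class BucketState<T> {
    boolean isWriterOpen;                                 // an in-progress file is open
    final List<String> pendingFiles = new ArrayList<>();  // closed, not yet committed
    final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
}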

I’ve looked through the code and found that buckets are only removed from the 
state in the notifyCheckpointComplete method.

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
  Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt =
      state.bucketStates.entrySet().iterator();
  while (bucketStatesIt.hasNext()) {
    BucketState<T> bucketState = bucketStatesIt.next().getValue();
    // (excerpt: the code that commits pendingFilesPerCheckpoint entries is omitted)
    if (!bucketState.isWriterOpen &&
        bucketState.pendingFiles.isEmpty() &&
        bucketState.pendingFilesPerCheckpoint.isEmpty()) {

      // We've dealt with all the pending files and the writer for this
      // bucket is not currently open.
      // Therefore this bucket is currently inactive and we can remove it
      // from our state.
      bucketStatesIt.remove();
    }
  }
}

So this looks like an issue when you use this sink in a checkpointless 
environment, because data is always added to the state but never removed.
Of course, we could enable checkpointing and use one of the available state 
backends, but to me this seems like unexpected behaviour: I am allowed to run 
the job without checkpointing, but if I do so, I eventually get an error in the 
sink component.
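
For completeness, the workaround is a one-liner when building the job; with 
checkpointing on, notifyCheckpointComplete fires and the bucket state gets 
pruned. The class name is a placeholder and the 60s interval is an arbitrary 
illustration:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobWithCheckpointing {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        // Enables checkpointing, which also unblocks BucketingSink's cleanup.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        // ... build the Kafka -> BucketingSink pipeline here, then env.execute()
    }
}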

What do you think about this? Has anyone run into the same problem, and how 
have you solved it?

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever