[jira] [Comment Edited] (FLINK-9592) Notify on moving file into pending/ final state

2018-10-11 Thread Rinat Sharipov (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646230#comment-16646230
 ] 

Rinat Sharipov edited comment on FLINK-9592 at 10/11/18 11:17 AM:
--

Hi [~kkl0u], while migrating to the latest Flink, we've decided to try to 
contribute functionality that adds the ability to hook into state changes in the 
bucketing sink, so a PR with this feature is available.

We are very interested in having this feature in BucketingSink and are always 
open to discussion.

Thx!


was (Author: kent2171):
Hi [~kkl0u], while migrating to the latest Flink, we've decided to try to 
contribute functionality that adds the ability to hook into state changes in the 
bucketing sink, so a PR with this feature is available.

We are very interested in having those features in BucketingSink and are always 
open to discussion.

Thx!

> Notify on moving file into pending/ final state
> ---
>
> Key: FLINK-9592
> URL: https://issues.apache.org/jira/browse/FLINK-9592
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Rinat Sharipov
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
>
> Hi mates, I have a proposal about the functionality of BucketingSink.
>  
> During the implementation of one of our tasks we ran into the following need: 
> create a meta-file, with the path and additional information about the file 
> created by BucketingSink, once it has been moved into its final place.
> Unfortunately, such behaviour is currently not available to us. 
>  
> We’ve implemented our own Sink that provides an opportunity to register 
> notifiers that will be called when the file state changes, but the current API 
> doesn’t allow us to add such behaviour using inheritance ...
>  
> It seems that such functionality could be useful and could be a part of the 
> BucketingSink API.
> What do you think, should I make a PR?
> Sincerely yours,
>  *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>  
> email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
> mobile: +7 (925) 416-37-26
> Clever{color:#4f8f00}DATA{color}
> make your data clever
>  
> 
>  
> Hi,
> I see that this could be a useful feature. What exactly is preventing you from 
> inheriting from BucketingSink? Maybe it would be enough to make 
> BucketingSink easier to extend.
> One thing that could collide with such a feature is that Kostas is currently 
> working on a larger BucketingSink rework/refactor. 
> Piotrek
> 
>  
> Hi guys, thx for your reply. 
> The following code info applies to the *release-1.5.0 tag, 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*.
>  
> For now, BucketingSink has the following file lifecycle.
>  
> When moving files from the opened to the pending state:
>  # on each item (*method* *invoke:434* *line*), we check that a suitable bucket 
> exists and contains an opened file; if the opened file doesn’t exist, we 
> create one and write the item to it
>  # on each item (*method* *invoke:434* *line*), we check that the opened 
> file doesn’t exceed the limits, and if the limits are exceeded, we close it and 
> move it into the pending state using *closeCurrentPartFile:568 line - private method*
>  # on each timer request (*onProcessingTime:482 line*), we check whether items 
> haven't been added to the opened file for longer than the specified period of 
> time; if so, we close it, using the same private method *closeCurrentPartFile:588 line*
>  
> So, the only way that we have is to call our hook from 
> *closeCurrentPartFile*, which is private, so we copy-pasted the current impl 
> and injected our logic there.
>  
>  
> Files move from the pending state into the final state during the checkpointing 
> lifecycle, in *notifyCheckpointComplete:657 line*, which is public and 
> contains a lot of logic, including discovery of files in pending states, 
> synchronization of state access and its modification, etc … 
>  
> So we couldn’t override it, or call the super method and add some logic, because 
> when the current impl changes the state of files, it removes them from the state, 
> and we don’t have any opportunity to know 
> for which files the state has been changed.
>  
> To solve this problem, we've created the following interface:
>  
> /**
>  * The {@code FileStateChangeCallback} is used to perform any additional 
> operations, when {@link BucketingSink}
>  * moves file from one state to another. For more information about state 
> management of {@code BucketingSink}, look
>  * through its official documentation.
>  */
> 

[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state

2018-10-11 Thread Rinat Sharipov (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646230#comment-16646230
 ] 

Rinat Sharipov commented on FLINK-9592:
---

Hi [~kkl0u], while migrating to the latest Flink, we've decided to try to 
contribute functionality that adds the ability to hook into state changes in the 
bucketing sink, so a PR with this feature is available.

We are very interested in having this feature in BucketingSink and are always 
open to discussion.

Thx!

> Notify on moving file into pending/ final state
> ---
>
> Key: FLINK-9592
> URL: https://issues.apache.org/jira/browse/FLINK-9592
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Rinat Sharipov
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
>
> Hi mates, I have a proposal about the functionality of BucketingSink.
>  
> During the implementation of one of our tasks we ran into the following need: 
> create a meta-file, with the path and additional information about the file 
> created by BucketingSink, once it has been moved into its final place.
> Unfortunately, such behaviour is currently not available to us. 
>  
> We’ve implemented our own Sink that provides an opportunity to register 
> notifiers that will be called when the file state changes, but the current API 
> doesn’t allow us to add such behaviour using inheritance ...
>  
> It seems that such functionality could be useful and could be a part of the 
> BucketingSink API.
> What do you think, should I make a PR?
> Sincerely yours,
>  *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>  
> email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
> mobile: +7 (925) 416-37-26
> Clever{color:#4f8f00}DATA{color}
> make your data clever
>  
> 
>  
> Hi,
> I see that this could be a useful feature. What exactly is preventing you from 
> inheriting from BucketingSink? Maybe it would be enough to make 
> BucketingSink easier to extend.
> One thing that could collide with such a feature is that Kostas is currently 
> working on a larger BucketingSink rework/refactor. 
> Piotrek
> 
>  
> Hi guys, thx for your reply. 
> The following code info applies to the *release-1.5.0 tag, 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*.
>  
> For now, BucketingSink has the following file lifecycle.
>  
> When moving files from the opened to the pending state:
>  # on each item (*method* *invoke:434* *line*), we check that a suitable bucket 
> exists and contains an opened file; if the opened file doesn’t exist, we 
> create one and write the item to it
>  # on each item (*method* *invoke:434* *line*), we check that the opened 
> file doesn’t exceed the limits, and if the limits are exceeded, we close it and 
> move it into the pending state using *closeCurrentPartFile:568 line - private method*
>  # on each timer request (*onProcessingTime:482 line*), we check whether items 
> haven't been added to the opened file for longer than the specified period of 
> time; if so, we close it, using the same private method *closeCurrentPartFile:588 line*
>  
> So, the only way that we have is to call our hook from 
> *closeCurrentPartFile*, which is private, so we copy-pasted the current impl 
> and injected our logic there.
>  
>  
> Files move from the pending state into the final state during the checkpointing 
> lifecycle, in *notifyCheckpointComplete:657 line*, which is public and 
> contains a lot of logic, including discovery of files in pending states, 
> synchronization of state access and its modification, etc … 
>  
> So we couldn’t override it, or call the super method and add some logic, because 
> when the current impl changes the state of files, it removes them from the state, 
> and we don’t have any opportunity to know 
> for which files the state has been changed.
>  
> To solve this problem, we've created the following interface:
>  
> /**
>  * The {@code FileStateChangeCallback} is used to perform any additional 
> operations, when {@link BucketingSink}
>  * moves file from one state to another. For more information about state 
> management of {@code BucketingSink}, look
>  * through its official documentation.
>  */
> public interface FileStateChangeCallback extends Serializable {
> 
>     /**
>      * Used to perform any additional operations, related with moving of file 
>      * into next state.
>      *
>      * @param fs provides access for working with file system
>      * @param path path to the file, moved into next state
>      *
>      * @throws IOException if something went wrong, while performing any 
>      * operations with file system
>      */
>     void call(FileSystem fs, Path path) throws IOException;
> }

[jira] [Updated] (FLINK-10525) Deserialization schema, skip data, that couldn't be properly deserialized

2018-10-10 Thread Rinat Sharipov (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinat Sharipov updated FLINK-10525:
---
Description: 
Hi mates, in accordance with the contract of 
*org.apache.flink.api.common.serialization.DeserializationSchema*, it should 
return *null* when the content couldn’t be deserialized.

But in most cases (e.g. 
*org.apache.flink.formats.avro.AvroDeserializationSchema*) the method fails if the 
data doesn't satisfy the expected schema. 
  
 We’ve implemented our own SerDe class that returns *null* if the data doesn’t 
satisfy the schema, but it’s rather hard to maintain this functionality during 
migration to the latest Flink version. 

I think it would be a useful feature if Flink supported optional skipping of 
failed records in Avro and other deserializers.
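For illustration, the kind of wrapper we maintain today looks roughly like this 
(a sketch; the class name is ours and not part of Flink):

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/** Delegates to an inner schema and maps any failure to null, so the record is skipped. */
public class SkipOnFailureDeserializationSchema<T> implements DeserializationSchema<T> {

    private final DeserializationSchema<T> inner;

    public SkipOnFailureDeserializationSchema(DeserializationSchema<T> inner) {
        this.inner = inner;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        try {
            return inner.deserialize(message);
        } catch (Exception e) {
            // Content couldn't be deserialized: honour the contract and return null.
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return nextElement != null && inner.isEndOfStream(nextElement);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return inner.getProducedType();
    }
}
{code}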

  was:
Hi mates, in accordance with the contract of 
*org.apache.flink.api.common.serialization.DeserializationSchema*, it should 
return *null* when the content couldn’t be deserialized.

But in most cases (e.g. 
*org.apache.flink.formats.avro.AvroDeserializationSchema*) the method fails if the 
data is corrupted. 
  
 We’ve implemented our own SerDe class that returns *null* if the data doesn’t 
satisfy the Avro schema, but it’s rather hard to maintain this functionality during 
migration to the latest Flink version. 

I think it would be a useful feature if Flink supported optional skipping of 
failed records in Avro and other deserializers.


> Deserialization schema, skip data, that couldn't be properly deserialized
> -
>
> Key: FLINK-10525
> URL: https://issues.apache.org/jira/browse/FLINK-10525
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Rinat Sharipov
>Priority: Minor
>
> Hi mates, in accordance with the contract of 
> *org.apache.flink.api.common.serialization.DeserializationSchema*, it should 
> return *null* when the content couldn’t be deserialized.
> But in most cases (e.g. 
> *org.apache.flink.formats.avro.AvroDeserializationSchema*) the method fails if 
> the data doesn't satisfy the expected schema. 
>   
>  We’ve implemented our own SerDe class that returns *null* if the data doesn’t 
> satisfy the schema, but it’s rather hard to maintain this functionality during 
> migration to the latest Flink version. 
> I think it would be a useful feature if Flink supported optional skipping of 
> failed records in Avro and other deserializers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10525) Deserialization schema, skip data, that couldn't be properly deserialized

2018-10-10 Thread Rinat Sharipov (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinat Sharipov updated FLINK-10525:
---
Description: 
Hi mates, in accordance with the contract of 
*org.apache.flink.api.common.serialization.DeserializationSchema*, it should 
return *null* when the content couldn’t be deserialized.

But in most cases (e.g. 
*org.apache.flink.formats.avro.AvroDeserializationSchema*) the method fails if the 
data is corrupted. 
  
 We’ve implemented our own SerDe class that returns *null* if the data doesn’t 
satisfy the Avro schema, but it’s rather hard to maintain this functionality during 
migration to the latest Flink version. 

I think it would be a useful feature if Flink supported optional skipping of 
failed records in Avro and other deserializers.

  was:
Hi mates, in accordance with the contract of 
*org.apache.flink.api.common.serialization.DeserializationSchema*, it should 
return *null* when the content couldn’t be deserialized.

But in most cases (e.g. 
*org.apache.flink.formats.avro.AvroDeserializationSchema*) the method fails if the 
data is corrupted. 
  
 We’ve implemented our own SerDe class that returns *null* if the data doesn’t 
satisfy the Avro schema, but it’s rather hard to maintain this functionality during 
migration to the latest Flink version. 

I think it would be useful if Flink supported optional skipping of failed 
records in Avro and other deserializers in the source code.


> Deserialization schema, skip data, that couldn't be properly deserialized
> -
>
> Key: FLINK-10525
> URL: https://issues.apache.org/jira/browse/FLINK-10525
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Rinat Sharipov
>Priority: Minor
>
> Hi mates, in accordance with the contract of 
> *org.apache.flink.api.common.serialization.DeserializationSchema*, it should 
> return *null* when the content couldn’t be deserialized.
> But in most cases (e.g. 
> *org.apache.flink.formats.avro.AvroDeserializationSchema*) the method fails if 
> the data is corrupted. 
>   
>  We’ve implemented our own SerDe class that returns *null* if the data doesn’t 
> satisfy the Avro schema, but it’s rather hard to maintain this functionality 
> during migration to the latest Flink version. 
> I think it would be a useful feature if Flink supported optional skipping of 
> failed records in Avro and other deserializers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10525) Deserialization schema, skip data, that couldn't be properly deserialized

2018-10-10 Thread Rinat Sharipov (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinat Sharipov updated FLINK-10525:
---
Description: 
Hi mates, in accordance with the contract of 
*org.apache.flink.api.common.serialization.DeserializationSchema*, it should 
return *null* when the content couldn’t be deserialized.

But in most cases (e.g. 
*org.apache.flink.formats.avro.AvroDeserializationSchema*) the method fails if the 
data is corrupted. 
  
 We’ve implemented our own SerDe class that returns *null* if the data doesn’t 
satisfy the Avro schema, but it’s rather hard to maintain this functionality during 
migration to the latest Flink version. 

I think it would be useful if Flink supported optional skipping of failed 
records in Avro and other deserializers in the source code.

  was:
Hi mates, in accordance with the contract of 
org.apache.flink.formats.avro.DeserializationSchema, it should return *null* 
when the content couldn’t be deserialized.
But in most cases (for example 
org.apache.flink.formats.avro.AvroDeserializationSchema) the method fails if the 
data is corrupted. 
 
We’ve implemented our own SerDe class that returns null if the data doesn’t 
satisfy the Avro schema, but it’s rather hard to maintain this functionality during 
migration to the latest Flink version.


> Deserialization schema, skip data, that couldn't be properly deserialized
> -
>
> Key: FLINK-10525
> URL: https://issues.apache.org/jira/browse/FLINK-10525
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Rinat Sharipov
>Priority: Minor
>
> Hi mates, in accordance with the contract of 
> *org.apache.flink.api.common.serialization.DeserializationSchema*, it should 
> return *null* when the content couldn’t be deserialized.
> But in most cases (e.g. 
> *org.apache.flink.formats.avro.AvroDeserializationSchema*) the method fails if 
> the data is corrupted. 
>   
>  We’ve implemented our own SerDe class that returns *null* if the data doesn’t 
> satisfy the Avro schema, but it’s rather hard to maintain this functionality 
> during migration to the latest Flink version. 
> I think it would be useful if Flink supported optional skipping of failed 
> records in Avro and other deserializers in the source code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10525) Deserialization schema, skip data, that couldn't be properly deserialized

2018-10-10 Thread Rinat Sharipov (JIRA)
Rinat Sharipov created FLINK-10525:
--

 Summary: Deserialization schema, skip data, that couldn't be 
properly deserialized
 Key: FLINK-10525
 URL: https://issues.apache.org/jira/browse/FLINK-10525
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Rinat Sharipov


Hi mates, in accordance with the contract of 
org.apache.flink.formats.avro.DeserializationSchema, it should return *null* 
when the content couldn’t be deserialized.
But in most cases (for example 
org.apache.flink.formats.avro.AvroDeserializationSchema) the method fails if the 
data is corrupted. 
 
We’ve implemented our own SerDe class that returns null if the data doesn’t 
satisfy the Avro schema, but it’s rather hard to maintain this functionality during 
migration to the latest Flink version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9603) Incorrect indexing of part files, when part suffix is specified (FileAlreadyExistsException)

2018-06-17 Thread Rinat Sharipov (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16515084#comment-16515084
 ] 

Rinat Sharipov commented on FLINK-9603:
---

I've created a PR that fixes this problem; please have a look: 
https://github.com/apache/flink/pull/6176/files
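
The essence of the fix, roughly (a sketch of the idea, reusing the fields from the 
BucketingSink code quoted below; the PR is the authoritative version): the suffix 
has to take part in the existence checks, e.g.

{code:java}
// Sketch: append the configured suffix before probing for existing files,
// so the probed path matches what is actually written and the counter advances.
String suffix = partSuffix != null ? partSuffix : "";
Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter + suffix);
while (fs.exists(partPath) ||
  fs.exists(getPendingPathFor(partPath)) ||
  fs.exists(getInProgressPathFor(partPath))) {
   bucketState.partCounter++;
   partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter + suffix);
}
bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
{code}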

> Incorrect indexing of part files, when part suffix is specified 
> (FileAlreadyExistsException)
> 
>
> Key: FLINK-9603
> URL: https://issues.apache.org/jira/browse/FLINK-9603
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.5.0
>Reporter: Rinat Sharipov
>Assignee: vinoyang
>Priority: Major
>
> Hi mates, since the 1.5 release, BucketingSink has the ability to configure the 
> suffix of the part file. It’s very useful when it’s necessary to set a specific 
> extension for the file.
>   
>  During usage, I’ve found an issue: when a new part file is created, it 
> has the same part index as the index of the just-closed file. 
>  So, when Flink tries to move it into the final state, we get a 
> FileAlreadyExistsException.
>   
>  This problem is related to the following code:
>  *{color:#e32400}Here we are trying to find the max index of a part file that 
> doesn’t exist in the bucket directory; the problem is that the partSuffix is not 
> involved in the path assembly. This means that the path never 
> exists{color}*
>  *{color:#e32400}and partCounter is never incremented.{color}*
>   
> {code:java}
> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
> bucketState.partCounter);
> while (fs.exists(partPath) ||
>   fs.exists(getPendingPathFor(partPath)) ||
>   fs.exists(getInProgressPathFor(partPath))) {
>bucketState.partCounter++;
>partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
> bucketState.partCounter);
> }
> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
> {code}
> *{color:#e32400}Before creating the writer, we append the partSuffix here, 
> but it should already have been appended, before the index checks{color}*
> {code:java}
> if (partSuffix != null) {
>partPath = partPath.suffix(partSuffix);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9603) Incorrect indexing of part files, when part suffix is specified (FileAlreadyExistsException)

2018-06-17 Thread Rinat Sharipov (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16515072#comment-16515072
 ] 

Rinat Sharipov commented on FLINK-9603:
---

Hi mates, I have the following proposal for fixing this issue (see the sketch 
after the list):
 * build the path using a single shared method (or some kind of builder), instead 
of repeating the same logic in multiple places across the code (DRY)
 * test that the part index is properly incremented when a part suffix is specified and
 ** an in-progress file exists
 ** a pending file exists
 ** a file in the final state exists
 * and test the same cases when the part suffix is not specified
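
For the first point, something along these lines could work (a sketch; the method 
name is tentative):

{code:java}
// One place that knows how a part path is assembled, suffix included (DRY),
// used both for the existence checks and for creating the writer.
private Path assemblePartPath(Path bucket, int subtaskIndex, int partIndex) {
    String localPartSuffix = partSuffix != null ? partSuffix : "";
    return new Path(bucket, String.format("%s-%s-%s%s", partPrefix, subtaskIndex, partIndex, localPartSuffix));
}
{code}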

 

 

> Incorrect indexing of part files, when part suffix is specified 
> (FileAlreadyExistsException)
> 
>
> Key: FLINK-9603
> URL: https://issues.apache.org/jira/browse/FLINK-9603
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.5.0
>Reporter: Rinat Sharipov
>Assignee: vinoyang
>Priority: Major
>
> Hi mates, since the 1.5 release, BucketingSink has the ability to configure the 
> suffix of the part file. It’s very useful when it’s necessary to set a specific 
> extension for the file.
>   
>  During usage, I’ve found an issue: when a new part file is created, it 
> has the same part index as the index of the just-closed file. 
>  So, when Flink tries to move it into the final state, we get a 
> FileAlreadyExistsException.
>   
>  This problem is related to the following code:
>  *{color:#e32400}Here we are trying to find the max index of a part file that 
> doesn’t exist in the bucket directory; the problem is that the partSuffix is not 
> involved in the path assembly. This means that the path never 
> exists{color}*
>  *{color:#e32400}and partCounter is never incremented.{color}*
>   
> {code:java}
> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
> bucketState.partCounter);
> while (fs.exists(partPath) ||
>   fs.exists(getPendingPathFor(partPath)) ||
>   fs.exists(getInProgressPathFor(partPath))) {
>bucketState.partCounter++;
>partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
> bucketState.partCounter);
> }
> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
> {code}
> *{color:#e32400}Before creating the writer, we append the partSuffix here, 
> but it should already have been appended, before the index checks{color}*
> {code:java}
> if (partSuffix != null) {
>partPath = partPath.suffix(partSuffix);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9603) Incorrect indexing of part files, when part suffix is specified (FileAlreadyExistsException)

2018-06-16 Thread Rinat Sharipov (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinat Sharipov updated FLINK-9603:
--
Description: 
Hi mates, since the 1.5 release, BucketingSink has the ability to configure the 
suffix of the part file. It’s very useful when it’s necessary to set a specific 
extension for the file.
  
 During usage, I’ve found an issue: when a new part file is created, it has 
the same part index as the index of the just-closed file. 
 So, when Flink tries to move it into the final state, we get a 
FileAlreadyExistsException.
  
 This problem is related to the following code:
 *{color:#e32400}Here we are trying to find the max index of a part file that 
doesn’t exist in the bucket directory; the problem is that the partSuffix is not 
involved in the path assembly. This means that the path never exists{color}*
 *{color:#e32400}and partCounter is never incremented.{color}*
  
{code:java}
Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
bucketState.partCounter);
while (fs.exists(partPath) ||
  fs.exists(getPendingPathFor(partPath)) ||
  fs.exists(getInProgressPathFor(partPath))) {
   bucketState.partCounter++;
   partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
bucketState.partCounter);
}

bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
{code}
*{color:#e32400}Before creating the writer, we append the partSuffix here, 
but it should already have been appended, before the index checks{color}*
{code:java}
if (partSuffix != null) {
   partPath = partPath.suffix(partSuffix);
}
{code}

  was:
Hi mates, since the 1.5 release, BucketingSink has the ability to configure the 
suffix of the part file. It’s very useful when it’s necessary to set a specific 
extension for the file.
 
During usage, I’ve found an issue: when a new part file is created, it has 
the same part index as the index of the just-closed file. 
So, when Flink tries to move it into the final state, we get a 
FileAlreadyExistsException.
 
This problem is related to the following code:
*{color:#e32400}Here we are trying to find the max index of a part file that 
doesn’t exist in the bucket directory; the problem is that the partSuffix is not 
involved in the path assembly. This means that the path never exists{color}*
*{color:#e32400}and partCounter is never incremented.{color}*
 
{code:java}
Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
bucketState.partCounter);
while (fs.exists(partPath) ||
  fs.exists(getPendingPathFor(partPath)) ||
  fs.exists(getInProgressPathFor(partPath))) {
   bucketState.partCounter++;
   partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
bucketState.partCounter);
}

bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
{code}
*{color:#e32400}Before creating the writer, we append the partSuffix here, 
but it should already have been appended, before the index checks{color}*
{code:java}
if (partSuffix != null) {
   partPath = partPath.suffix(partSuffix);
}
{code}


> Incorrect indexing of part files, when part suffix is specified 
> (FileAlreadyExistsException)
> 
>
> Key: FLINK-9603
> URL: https://issues.apache.org/jira/browse/FLINK-9603
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.5.0
>Reporter: Rinat Sharipov
>Assignee: vinoyang
>Priority: Major
>
> Hi mates, since the 1.5 release, BucketingSink has the ability to configure the 
> suffix of the part file. It’s very useful when it’s necessary to set a specific 
> extension for the file.
>   
>  During usage, I’ve found an issue: when a new part file is created, it 
> has the same part index as the index of the just-closed file. 
>  So, when Flink tries to move it into the final state, we get a 
> FileAlreadyExistsException.
>   
>  This problem is related to the following code:
>  *{color:#e32400}Here we are trying to find the max index of a part file that 
> doesn’t exist in the bucket directory; the problem is that the partSuffix is not 
> involved in the path assembly. This means that the path never 
> exists{color}*
>  *{color:#e32400}and partCounter is never incremented.{color}*
>   
> {code:java}
> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
> bucketState.partCounter);
> while (fs.exists(partPath) ||
>   fs.exists(getPendingPathFor(partPath)) ||
>   fs.exists(getInProgressPathFor(partPath))) {
>bucketState.partCounter++;
>partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
> bucketState.partCounter);
> }
> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
> {code}
> *{color:#e32400}Before creating the writer, we append the partSuffix here, 
> but it should already have been appended, before the index checks{color}*

[jira] [Updated] (FLINK-9603) Incorrect indexing of part files, when part suffix is specified (FileAlreadyExistsException)

2018-06-16 Thread Rinat Sharipov (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinat Sharipov updated FLINK-9603:
--
Description: 
Hi mates, since the 1.5 release, BucketingSink has the ability to configure the 
suffix of the part file. It’s very useful when it’s necessary to set a specific 
extension for the file.
 
During usage, I’ve found an issue: when a new part file is created, it has 
the same part index as the index of the just-closed file. 
So, when Flink tries to move it into the final state, we get a 
FileAlreadyExistsException.
 
This problem is related to the following code:
*{color:#e32400}Here we are trying to find the max index of a part file that 
doesn’t exist in the bucket directory; the problem is that the partSuffix is not 
involved in the path assembly. This means that the path never exists{color}*
*{color:#e32400}and partCounter is never incremented.{color}*
 
{code:java}
Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
bucketState.partCounter);
while (fs.exists(partPath) ||
  fs.exists(getPendingPathFor(partPath)) ||
  fs.exists(getInProgressPathFor(partPath))) {
   bucketState.partCounter++;
   partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
bucketState.partCounter);
}

bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
{code}
*{color:#e32400}Before creating the writer, we append the partSuffix here, 
but it should already have been appended, before the index checks{color}*
{code:java}
if (partSuffix != null) {
   partPath = partPath.suffix(partSuffix);
}
{code}

  was:
Hi mates, since the 1.5 release, BucketingSink has the ability to configure the 
suffix of the part file. It’s very useful when it’s necessary to set a specific 
extension for the file.
 
During usage, I’ve found an issue: when a new part file is created, it has 
the same part index as the index of the just-closed file. 
So, when Flink tries to move it into the final state, we get a 
FileAlreadyExistsException.
 
This problem is related to the following code:
*{color:#e32400}Here we are trying to find the max index of a part file that 
doesn’t exist in the bucket directory; the problem is that the partSuffix is not 
involved in the path assembly. This means that the path never exists{color}*
*{color:#e32400}and partCounter is never incremented.{color}*

{code:java}
Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
bucketState.partCounter);
while (fs.exists(partPath) ||
  fs.exists(getPendingPathFor(partPath)) ||
  fs.exists(getInProgressPathFor(partPath))) {
   bucketState.partCounter++;
   partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
bucketState.partCounter);
}

bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
{code}

*{color:#e32400}Before creating the writer, we append the partSuffix here, 
but it should already have been appended, before the index checks{color}*

{code:java}
if (partSuffix != null) {
   partPath = partPath.suffix(partSuffix);
}
{code}


> Incorrect indexing of part files, when part suffix is specified 
> (FileAlreadyExistsException)
> 
>
> Key: FLINK-9603
> URL: https://issues.apache.org/jira/browse/FLINK-9603
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.5.0
>Reporter: Rinat Sharipov
>Priority: Major
>
> Hi mates, since the 1.5 release, BucketingSink has the ability to configure the 
> suffix of the part file. It’s very useful when it’s necessary to set a specific 
> extension for the file.
>  
> During usage, I’ve found an issue: when a new part file is created, it 
> has the same part index as the index of the just-closed file. 
> So, when Flink tries to move it into the final state, we get a 
> FileAlreadyExistsException.
>  
> This problem is related to the following code:
> *{color:#e32400}Here we are trying to find the max index of a part file that 
> doesn’t exist in the bucket directory; the problem is that the partSuffix is not 
> involved in the path assembly. This means that the path never 
> exists{color}*
> *{color:#e32400}and partCounter is never incremented.{color}*
>  
> {code:java}
> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
> bucketState.partCounter);
> while (fs.exists(partPath) ||
>   fs.exists(getPendingPathFor(partPath)) ||
>   fs.exists(getInProgressPathFor(partPath))) {
>bucketState.partCounter++;
>partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
> bucketState.partCounter);
> }
> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
> {code}
> *{color:#e32400}Before creating the writer, we append the partSuffix here, 
> but it should already have been appended, before the index checks{color}*
> 

[jira] [Created] (FLINK-9603) Incorrect indexing of part files, when part suffix is specified (FileAlreadyExistsException)

2018-06-16 Thread Rinat Sharipov (JIRA)
Rinat Sharipov created FLINK-9603:
-

 Summary: Incorrect indexing of part files, when part suffix is 
specified (FileAlreadyExistsException)
 Key: FLINK-9603
 URL: https://issues.apache.org/jira/browse/FLINK-9603
 Project: Flink
  Issue Type: Bug
  Components: filesystem-connector
Affects Versions: 1.5.0
Reporter: Rinat Sharipov


Hi mates, since the 1.5 release, BucketingSink has the ability to configure the 
suffix of the part file. It’s very useful when it’s necessary to set a specific 
extension for the file.
 
During usage, I’ve found an issue: when a new part file is created, it has 
the same part index as the index of the just-closed file. 
So, when Flink tries to move it into the final state, we get a 
FileAlreadyExistsException.
 
This problem is related to the following code:
*{color:#e32400}Here we are trying to find the max index of a part file that 
doesn’t exist in the bucket directory; the problem is that the partSuffix is not 
involved in the path assembly. This means that the path never exists{color}*
*{color:#e32400}and partCounter is never incremented.{color}*

{code:java}
Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
bucketState.partCounter);
while (fs.exists(partPath) ||
  fs.exists(getPendingPathFor(partPath)) ||
  fs.exists(getInProgressPathFor(partPath))) {
   bucketState.partCounter++;
   partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + 
bucketState.partCounter);
}

bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
{code}

*{color:#e32400}Before creating the writer, we append the partSuffix here, 
but it should already have been appended, before the index checks{color}*

{code:java}
if (partSuffix != null) {
   partPath = partPath.suffix(partSuffix);
}
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9596) Ability to configure logging using single log4j file

2018-06-15 Thread Rinat Sharipov (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinat Sharipov updated FLINK-9596:
--
Summary: Ability to configure logging using single log4j file  (was: 
Ability to configure logging using the same log4j file)

> Ability to configure logging using single log4j file
> 
>
> Key: FLINK-9596
> URL: https://issues.apache.org/jira/browse/FLINK-9596
> Project: Flink
>  Issue Type: New Feature
>Reporter: Rinat Sharipov
>Priority: Trivial
>
> When we use logback, we need only one file, but with log4j we 
> have to manage three separate files.
> Maybe it's possible to use the same log4j.properties file to configure 
> logging for all of the following processes:
>  * cli
>  * tm/jm
>  * yarn-session



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9596) Ability to configure logging using the same log4j file

2018-06-15 Thread Rinat Sharipov (JIRA)
Rinat Sharipov created FLINK-9596:
-

 Summary: Ability to configure logging using the same log4j file
 Key: FLINK-9596
 URL: https://issues.apache.org/jira/browse/FLINK-9596
 Project: Flink
  Issue Type: New Feature
Reporter: Rinat Sharipov


When we use logback, we need only one file, but with log4j we 
have to manage three separate files.

Maybe it's possible to use the same log4j.properties file to configure logging 
for all of the following processes:
 * cli
 * tm/jm
 * yarn-session
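
For illustration, a single shared file could look roughly like this (a sketch based 
on Flink's default file-logging setup; each process would point ${log.file} at its 
own log):

{code}
log4j.rootLogger=INFO, file

# one appender for all processes; the target file is chosen per process
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
{code}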



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state

2018-06-15 Thread Rinat Sharipov (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513555#comment-16513555
 ] 

Rinat Sharipov commented on FLINK-9592:
---

[~kkl0u], I will be glad to help )) Thx

> Notify on moving file into pending/ final state
> ---
>
> Key: FLINK-9592
> URL: https://issues.apache.org/jira/browse/FLINK-9592
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Rinat Sharipov
>Assignee: Kostas Kloudas
>Priority: Major
>
> Hi mates, I have a proposal about the functionality of BucketingSink.
>  
> During the implementation of one of our tasks we ran into the following need: 
> create a meta-file, with the path and additional information about the file 
> created by BucketingSink, once it has been moved into its final place.
> Unfortunately, such behaviour is currently not available to us. 
>  
> We’ve implemented our own Sink that provides an opportunity to register 
> notifiers that will be called when the file state changes, but the current API 
> doesn’t allow us to add such behaviour using inheritance ...
>  
> It seems that such functionality could be useful and could be a part of the 
> BucketingSink API.
> What do you think, should I make a PR?
> Sincerely yours,
>  *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>  
> email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
> mobile: +7 (925) 416-37-26
> Clever{color:#4f8f00}DATA{color}
> make your data clever
>  
> 
>  
> Hi,
> I see that this could be a useful feature. What exactly is preventing you from 
> inheriting from BucketingSink? Maybe it would be enough to make 
> BucketingSink easier to extend.
> One thing that could collide with such a feature is that Kostas is currently 
> working on a larger BucketingSink rework/refactor. 
> Piotrek
> 
>  
> Hi guys, thx for your reply. 
> The following code info applies to the *release-1.5.0 tag, 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*.
>  
> For now, BucketingSink has the following file lifecycle.
>  
> When moving files from the opened to the pending state:
>  # on each item (*method* *invoke:434* *line*), we check that a suitable bucket 
> exists and contains an opened file; if the opened file doesn’t exist, we 
> create one and write the item to it
>  # on each item (*method* *invoke:434* *line*), we check that the opened 
> file doesn’t exceed the limits, and if the limits are exceeded, we close it and 
> move it into the pending state using *closeCurrentPartFile:568 line - private method*
>  # on each timer request (*onProcessingTime:482 line*), we check whether items 
> haven't been added to the opened file for longer than the specified period of 
> time; if so, we close it, using the same private method *closeCurrentPartFile:588 line*
>  
> So, the only way that we have is to call our hook from 
> *closeCurrentPartFile*, which is private, so we copy-pasted the current impl 
> and injected our logic there.
>  
>  
> Files move from the pending state into the final state during the checkpointing 
> lifecycle, in *notifyCheckpointComplete:657 line*, which is public and 
> contains a lot of logic, including discovery of files in pending states, 
> synchronization of state access and its modification, etc … 
>  
> So we couldn’t override it, or call the super method and add some logic, because 
> when the current impl changes the state of files, it removes them from the state, 
> and we don’t have any opportunity to know 
> for which files the state has been changed.
>  
> To solve this problem, we've created the following interface:
>  
> /**
>  * The {@code FileStateChangeCallback} is used to perform any additional 
> operations, when {@link BucketingSink}
>  * moves file from one state to another. For more information about state 
> management of {@code BucketingSink}, look
>  * through its official documentation.
>  */
> public interface FileStateChangeCallback extends Serializable {
> 
>     /**
>      * Used to perform any additional operations, related with moving of file 
>      * into next state.
>      *
>      * @param fs provides access for working with file system
>      * @param path path to the file, moved into next state
>      *
>      * @throws IOException if something went wrong, while performing any 
>      * operations with file system
>      */
>     void call(FileSystem fs, Path path) throws IOException;
> }
> And we have added the ability to register these callbacks in the BucketingSink 
> impl in the following manner:
>  
> public BucketingSink 
> registerOnFinalStateChangeCallback(FileStateChangeCallback… callbacks) {...}
> public BucketingSink 
> registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {...}
> 

[jira] [Updated] (FLINK-9592) Notify on moving file into pending/ final state

2018-06-14 Thread Rinat Sharipov (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinat Sharipov updated FLINK-9592:
--
Description: 
Hi mates, I have a proposal about the functionality of BucketingSink.
  
 During the implementation of one of our tasks we ran into the following need: 
create a meta-file, with the path and additional information about the file 
created by BucketingSink, once it has been moved into its final place.
 Unfortunately, such behaviour is currently not available to us. 
  
 We’ve implemented our own Sink that provides an opportunity to register 
notifiers that will be called when the file state changes, but the current API 
doesn’t allow us to add such behaviour using inheritance ...
  
 It seems that such functionality could be useful and could be a part of the 
BucketingSink API.
 What do you think, should I make a PR?

Sincerely yours,
 *Rinat Sharipov*
 Software Engineer at 1DMP CORE Team
  
 email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
 mobile: +7 (925) 416-37-26
 Clever{color:#4f8f00}DATA{color}
 make your data clever
  
 

  
 Hi,
 I see that this could be a useful feature. What exactly is preventing you from 
inheriting from BucketingSink? Maybe it would be enough to make 
BucketingSink easier to extend.
 One thing that could collide with such a feature is that Kostas is currently 
working on a larger BucketingSink rework/refactor. 
 Piotrek
 
  
 Hi guys, thx for your reply. 
 The following code info applies to the *release-1.5.0 tag, 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*.
  
 For now, BucketingSink has the following file lifecycle.
  
 When moving files from the opened to the pending state:
 # on each item (*method* *invoke:434* *line*), we check that a suitable bucket 
exists and contains an opened file; if the opened file doesn’t exist, we 
create one and write the item to it
 # on each item (*method* *invoke:434* *line*), we check that the opened 
file doesn’t exceed the limits, and if the limits are exceeded, we close it and 
move it into the pending state using *closeCurrentPartFile:568 line - private method*
 # on each timer request (*onProcessingTime:482 line*), we check whether items 
haven't been added to the opened file for longer than the specified period of time; 
if so, we close it, using the same private method *closeCurrentPartFile:588 line*

 
 So, the only way that we have is to call our hook from 
*closeCurrentPartFile*, which is private, so we copy-pasted the current impl and 
injected our logic there.
  
  
 Files move from the pending state into the final state during the checkpointing 
lifecycle, in *notifyCheckpointComplete:657 line*, which is public and contains 
a lot of logic, including discovery of files in pending states, synchronization 
of state access and its modification, etc … 
  
 So we couldn’t override it, or call the super method and add some logic, because 
when the current impl changes the state of files, it removes them from the state, 
and we don’t have any opportunity to know 
 for which files the state has been changed.
  
 To solve this problem, we've created the following interface:
  
 /**
  * The {@code FileStateChangeCallback} is used to perform any additional 
operations, when {@link BucketingSink}
  * moves file from one state to another. For more information about state 
management of {@code BucketingSink}, look
  * through its official documentation.
  */
 public interface FileStateChangeCallback extends Serializable {

     /**
      * Used to perform any additional operations, related with moving of file 
      * into next state.
      *
      * @param fs provides access for working with file system
      * @param path path to the file, moved into next state
      *
      * @throws IOException if something went wrong, while performing any 
      * operations with file system
      */
     void call(FileSystem fs, Path path) throws IOException;
 }
And we have added the ability to register these callbacks in the BucketingSink 
impl in the following manner:
  
 public BucketingSink 
registerOnFinalStateChangeCallback(FileStateChangeCallback… callbacks) {...}
 public BucketingSink 
registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) 
{...}
  
 I’m ready to discuss the best ways in which such hooks could be implemented in 
the core impl, or any other improvements that will help us to add such 
functionality into our extension using the public API, instead of copy-pasting 
the source code.
  
 Thx for your help, mates =)
  

Sincerely yours,
 *Rinat Sharipov*
 Software Engineer at 1DMP CORE Team
  
 email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
 mobile: +7 (925) 416-37-26
 Clever{color:#4f8f00}DATA{color}
 make your data clever
  
 

[jira] [Created] (FLINK-9592) Notify on moving into pending/ final state

2018-06-14 Thread Rinat Sharipov (JIRA)
Rinat Sharipov created FLINK-9592:
-

 Summary: Notify on moving into pending/ final state
 Key: FLINK-9592
 URL: https://issues.apache.org/jira/browse/FLINK-9592
 Project: Flink
  Issue Type: New Feature
  Components: filesystem-connector
Reporter: Rinat Sharipov


Hi mates, I have a proposal about the functionality of BucketingSink.
 
During the implementation of one of our tasks we ran into the following need: 
create a meta-file, with the path and additional information about the file 
created by BucketingSink, once it has been moved into its final place.
Unfortunately, such behaviour is currently not available to us. 
 
We’ve implemented our own Sink that provides an opportunity to register 
notifiers that will be called when the file state changes, but the current API 
doesn’t allow us to add such behaviour using inheritance ...
 
It seems that such functionality could be useful and could be a part of the 
BucketingSink API.
What do you think, should I make a PR?


Sincerely yours,
*Rinat Sharipov*
Software Engineer at 1DMP CORE Team
 
email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
mobile: +7 (925) 416-37-26
Clever{color:#4f8f00}DATA{color}
make your data clever
 

 
Hi,
I see that this could be a useful feature. What exactly is preventing you from 
inheriting from BucketingSink? Maybe it would be enough to make 
BucketingSink easier to extend.
One thing that could collide with such a feature is that Kostas is currently 
working on a larger BucketingSink rework/refactor. 
Piotrek

 
Hi guys, thx for your reply. 
The following code info applies to the *release-1.5.0 tag, 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*.
 
For now, BucketingSink has the following file lifecycle.
 
When moving files from the opened to the pending state:
 # on each item (*method* *invoke:434* *line*), we check that a suitable bucket 
exists and contains an opened file; if the opened file doesn’t exist, we 
create one and write the item to it
 # on each item (*method* *invoke:434* *line*), we check that the opened 
file doesn’t exceed the limits, and if the limits are exceeded, we close it and 
move it into the pending state using *closeCurrentPartFile:568 line - private method*
 # on each timer request (*onProcessingTime:482 line*), we check whether items 
haven't been added to the opened file for longer than the specified period of time; 
if so, we close it, using the same private method *closeCurrentPartFile:588 line*

 
So, the only way that we have is to call our hook from 
*closeCurrentPartFile*, which is private, so we copy-pasted the current impl and 
injected our logic there.
 
 
Files move from the pending state into the final state during the checkpointing 
lifecycle, in *notifyCheckpointComplete:657 line*, which is public and contains 
a lot of logic, including discovery of files in pending states, synchronization 
of state access and its modification, etc … 
 
So we couldn’t override it, or call the super method and add some logic, because 
when the current impl changes the state of files, it removes them from the state, 
and we don’t have any opportunity to know 
for which files the state has been changed.
 
To solve this problem, we've created the following interface:
 
/**
 * The {@code FileStateChangeCallback} is used to perform any additional 
operations, when {@link BucketingSink}
 * moves file from one state to another. For more information about state 
management of {@code BucketingSink}, look
 * through its official documentation.
 */
public interface FileStateChangeCallback extends Serializable {

 /**
 * Used to perform any additional operations, related with moving of file into 
next state.
 *
 * @param fs provides access for working with file system
 * @param path path to the file, moved into next state
 *
 * @throws IOException if something went wrong, while performing any operations 
with file system
 */
 void call(FileSystem fs, Path path) throws IOException;
}
And we have added the ability to register these callbacks in the BucketingSink 
impl in the following manner:
 
public BucketingSink 
registerOnFinalStateChangeCallback(FileStateChangeCallback… callbacks) {...}
public BucketingSink 
registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) 
{...}
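
Our meta-file use case would then look roughly like this (a sketch against the 
proposed API above; the factory class and meta-file format are just examples):

{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;

public class MetaFileSinkFactory {

    public static BucketingSink<String> create() {
        return new BucketingSink<String>("/data/events")
            .registerOnFinalStateChangeCallback((fs, path) -> {
                // write a small meta-file next to the part file that has just
                // reached its final state
                Path metaPath = path.suffix(".meta");
                try (FSDataOutputStream out = fs.create(metaPath, false)) {
                    out.write(path.toString().getBytes(StandardCharsets.UTF_8));
                }
            });
    }
}
{code}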
 
I’m ready to discuss the best ways in which such hooks could be implemented in 
the core impl, or any other improvements that will help us to add such 
functionality into our extension using the public API, instead of copy-pasting 
the source code.
 
Thx for your help, mates =)
 


Sincerely yours,
*Rinat Sharipov*
Software Engineer at 1DMP CORE Team
 
email: 

[jira] [Updated] (FLINK-9592) Notify on moving file into pending/ final state

2018-06-14 Thread Rinat Sharipov (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinat Sharipov updated FLINK-9592:
--
Summary: Notify on moving file into pending/ final state  (was: Notify on 
moving into pending/ final state)

> Notify on moving file into pending/ final state
> ---
>
> Key: FLINK-9592
> URL: https://issues.apache.org/jira/browse/FLINK-9592
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Rinat Sharipov
>Priority: Major
>
> Hi mates, I have a proposal about the functionality of BucketingSink.
>  
> During the implementation of one of our tasks we ran into the following need: 
> create a meta-file, with the path and additional information about the file 
> created by BucketingSink, once it has been moved into its final place.
> Unfortunately, such behaviour is currently not available to us. 
>  
> We’ve implemented our own Sink that provides an opportunity to register 
> notifiers that will be called when the file state changes, but the current API 
> doesn’t allow us to add such behaviour using inheritance ...
>  
> It seems that such functionality could be useful and could be a part of the 
> BucketingSink API.
> What do you think, should I make a PR?
> Sincerely yours,
> *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>  
> email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
> mobile: +7 (925) 416-37-26
> Clever{color:#4f8f00}DATA{color}
> make your data clever
>  
> 
>  
> Hi,
> I see that this could be a useful feature. What exactly is preventing you from 
> inheriting from BucketingSink? Maybe it would be enough to make 
> BucketingSink easier to extend.
> One thing that could collide with such a feature is that Kostas is currently 
> working on a larger BucketingSink rework/refactor. 
> Piotrek
> 
>  
> Hi guys, thx for your reply. 
> The following code info applies to the *release-1.5.0 tag, 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*.
>  
> For now, BucketingSink has the following file lifecycle.
>  
> When moving files from the opened to the pending state:
>  # on each item (*method* *invoke:434* *line*), we check that a suitable bucket 
> exists and contains an opened file; if the opened file doesn’t exist, we 
> create one and write the item to it
>  # on each item (*method* *invoke:434* *line*), we check that the opened 
> file doesn’t exceed the limits, and if the limits are exceeded, we close it and 
> move it into the pending state using *closeCurrentPartFile:568 line - private method*
>  # on each timer request (*onProcessingTime:482 line*), we check whether items 
> haven't been added to the opened file for longer than the specified period of 
> time; if so, we close it, using the same private method *closeCurrentPartFile:588 line*
>  
> So, the only way that we have is to call our hook from 
> *closeCurrentPartFile*, which is private, so we copy-pasted the current impl 
> and injected our logic there.
>  
>  
> Files move from the pending state into the final state during the checkpointing 
> lifecycle, in *notifyCheckpointComplete:657 line*, which is public and 
> contains a lot of logic, including discovery of files in pending states, 
> synchronization of state access and its modification, etc … 
>  
> So we couldn’t override it, or call the super method and add some logic, because 
> when the current impl changes the state of files, it removes them from the state, 
> and we don’t have any opportunity to know 
> for which files the state has been changed.
>  
> To solve this problem, we've created the following interface:
>  
> /**
>  * The {@code FileStateChangeCallback} is used to perform any additional 
> operations, when {@link BucketingSink}
>  * moves file from one state to another. For more information about state 
> management of {@code BucketingSink}, look
>  * through its official documentation.
>  */
> public interface FileStateChangeCallback extends Serializable {
>  /**
>  * Used to perform any additional operations, related with moving of file 
> into next state.
>  *
>  * @param fs provides access for working with file system
>  * @param path path to the file, moved into next state
>  *
>  * @throws IOException if something went wrong, while performing any 
> operations with file system
>  */
>  void call(FileSystem fs, Path path) throws IOException;
> }
> And we have added the ability to register these callbacks in the BucketingSink 
> impl in the following manner:
>  
> public BucketingSink 
> registerOnFinalStateChangeCallback(FileStateChangeCallback… callbacks) {...}
> public BucketingSink 
> registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {...}
> 

[jira] [Updated] (FLINK-9558) Memory leaks during usage of bucketing-sink with disabled checkpointing

2018-06-08 Thread Rinat Sharipov (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinat Sharipov updated FLINK-9558:
--
Description: 
Hi mates, we have some Flink jobs that write data from Kafka into HDFS 
using the Bucketing-Sink.
 For some reasons, those jobs run without checkpointing. For now, it's 
not a big problem for us if some files remain open in case of job 
reloading.
  
 Periodically, those jobs fail with an *OutOfMemory* exception, and it seems I 
have found a strange thing in the implementation of BucketingSink.
  
 During the sink lifecycle, we have a state object, implemented as a map, where 
the key is a bucket path, and the value is a state that contains information 
about the opened file and the list of pending files.
 After researching the heap dump, I found that this state stores 
information about ~ 1,000 buckets and their state; all this stuff weighs ~ 120 
MB.
  
 I’ve looked through the code and found that we remove buckets from the 
state in the *notifyCheckpointComplete* method. 
{code:java}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
  Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt = 
state.bucketStates.entrySet().iterator();
  while (bucketStatesIt.hasNext()) {
    BucketState<T> bucketState = bucketStatesIt.next().getValue();
    if (!bucketState.isWriterOpen &&
        bucketState.pendingFiles.isEmpty() &&
        bucketState.pendingFilesPerCheckpoint.isEmpty()) {

      // We've dealt with all the pending files and the writer for this bucket 
      // is not currently open.
      // Therefore this bucket is currently inactive and we can remove it from 
      // our state.
      bucketStatesIt.remove();
    }
  }
}
{code}
So this looks like an issue when using this sink in a checkpointless 
environment: data is always added to the state but never removed.
  
 Of course, we could enable checkpointing and use one of the available 
backends, but this still seems like unexpected behaviour: I appear to have the 
option to run the job without checkpointing, yet if I do so, I eventually get 
an exception in the sink component.

It seems to me we should at least document this behaviour, or make the sink 
fail fast when it runs in an environment with checkpointing disabled.
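
A fail-fast guard could be as small as the following sketch (an assumption 
about where such a check might live, at the top of BucketingSink#open; this is 
not existing Flink code):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

@Override
public void open(Configuration parameters) throws Exception {
  StreamingRuntimeContext ctx = (StreamingRuntimeContext) getRuntimeContext();
  // Inactive buckets are only evicted in notifyCheckpointComplete(), so a
  // job without checkpointing would grow this state forever; refuse to start.
  if (!ctx.isCheckpointingEnabled()) {
    throw new IllegalStateException(
        "BucketingSink requires checkpointing to be enabled, otherwise its "
            + "bucket state grows without bound.");
  }
  // ... existing open() logic ...
}
{code}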

 


> Memory leaks during usage of bucketing-sink with disabled checkpointing
> ---
>
> Key: FLINK-9558
> URL: https://issues.apache.org/jira/browse/FLINK-9558
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.3.0
>Reporter: Rinat Sharipov
>Priority: Major
>

[jira] [Updated] (FLINK-9558) Memory leaks during usage of bucketing-sink with disabled checkpointing

2018-06-08 Thread Rinat Sharipov (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinat Sharipov updated FLINK-9558:
--
Summary: Memory leaks during usage of bucketing-sink with disabled 
checkpointing  (was: Memory leaks during usage with disabled checkpointing)

> Memory leaks during usage of bucketing-sink with disabled checkpointing
> ---
>
> Key: FLINK-9558
> URL: https://issues.apache.org/jira/browse/FLINK-9558
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.3.0
>Reporter: Rinat Sharipov
>Priority: Major
>





[jira] [Created] (FLINK-9558) Memory leaks during usage with disabled checkpointing

2018-06-08 Thread Rinat Sharipov (JIRA)
Rinat Sharipov created FLINK-9558:
-

 Summary: Memory leaks during usage with disabled checkpointing
 Key: FLINK-9558
 URL: https://issues.apache.org/jira/browse/FLINK-9558
 Project: Flink
  Issue Type: Bug
  Components: filesystem-connector
Affects Versions: 1.3.0
Reporter: Rinat Sharipov


Hi mates, we have some Flink jobs that write data from Kafka into HDFS using 
BucketingSink.
For various reasons, those jobs run without checkpointing. For now it is not a 
big problem for us if some files remain open when the job is reloaded.
 
Periodically, those jobs fail with an *OutOfMemory* exception, and it seems I 
have found a strange thing in the implementation of BucketingSink.
 
During the sink lifecycle, we have a state object implemented as a map, where 
the key is a bucket path and the value is a state that contains information 
about opened files and the list of pending files.
After examining a heap dump, I found that this state stores information about 
~1,000 buckets, weighing ~120 MB in total.
 
I’ve looked through the code and found that buckets are removed from the 
state only in the *notifyCheckpointComplete* method:
{code:java}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
  Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt =
      state.bucketStates.entrySet().iterator();
  while (bucketStatesIt.hasNext()) {
    BucketState<T> bucketState = bucketStatesIt.next().getValue();
    // ... handling of this checkpoint's pending files omitted ...
    if (!bucketState.isWriterOpen &&
        bucketState.pendingFiles.isEmpty() &&
        bucketState.pendingFilesPerCheckpoint.isEmpty()) {

      // We've dealt with all the pending files and the writer for this
      // bucket is not currently open.
      // Therefore this bucket is currently inactive and we can remove it
      // from our state.
      bucketStatesIt.remove();
    }
  }
}
{code}
So this looks like an issue when using this sink in a checkpointless 
environment: data is always added to the state but never removed.
 
Of course, we could enable checkpointing and use one of the available 
backends, but this still seems like unexpected behaviour: I appear to have the 
option to run the job without checkpointing, yet if I do so, I eventually get 
an exception in the sink component.
 





[jira] [Updated] (FLINK-7963) Add ability to call trigger savepoint on flink cluster shutdown

2017-11-02 Thread Rinat Sharipov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinat Sharipov updated FLINK-7963:
--
Description: 
Hi guys, I've got an idea for a small improvement for testing Flink jobs.

All my jobs are written in the following manner: I have a context class, which 
contains the job components and the information about how to wire them, and a 
bootstrap class that initializes this context, retrieves the Flink environment 
from it, and executes it.

This approach lets us implement all jobs in the same manner and simplifies job 
testing: when writing tests, all I need to do is replace the Flink environment 
with a local one and override some of the job components.

Everything went well until I wanted to enable checkpointing and implement some 
business logic that should be called when a checkpoint is triggered. I wanted 
to test this logic, and the best approach for me is to trigger a savepoint on 
Flink cluster shutdown; but when I looked through the source code, I realised 
this is quite challenging and cannot be achieved through configuration alone.

So I would like to discuss the following proposals:

* add the ability to create a local environment from a configuration via 
`org.apache.flink.streaming.api.scala.StreamExecutionEnvironment#createLocalEnv(parallelism,
 configuration)`; currently the Scala API only lets us specify the 
parallelism, while the Java API (which the Scala API wraps) already contains 
such a method (see the sketch below)
* add the ability to trigger a savepoint in the Flink mini cluster on `stop`, 
if such a property is specified in the configuration
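
For the first proposal, a short sketch of what this already looks like with 
the Java API (the configuration key used here is just an example value):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// The Java API already accepts a Configuration when creating a local
// environment; the proposal is to expose the same overload in the Scala API.
Configuration conf = new Configuration();
conf.setString("state.savepoints.dir", "file:///tmp/savepoints");

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironment(2, conf);
{code}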

What do you think about it? As for me, it'll give us more flexibility in 
tests, and will not force us to use special test templates such as 
`SavepointMigrationTestBase`.

Thx




> Add ability to call trigger savepoint on flink cluster shutdown
> ---
>
> Key: FLINK-7963
> URL: https://issues.apache.org/jira/browse/FLINK-7963
> Project: Flink
>  Issue Type: New Feature
>  Components: Configuration
>Reporter: Rinat Sharipov
>

[jira] [Created] (FLINK-7963) Add ability to call trigger savepoint on flink cluster shutdown

2017-11-02 Thread Rinat Sharipov (JIRA)
Rinat Sharipov created FLINK-7963:
-

 Summary: Add ability to call trigger savepoint on flink cluster 
shutdown
 Key: FLINK-7963
 URL: https://issues.apache.org/jira/browse/FLINK-7963
 Project: Flink
  Issue Type: New Feature
  Components: Configuration
Reporter: Rinat Sharipov


Hi guys, I've got an idea for a small improvement for testing Flink jobs.
All my jobs are written in the following style:

I have a context class in which all the components used by a job are 
initialized, and a bootstrap class that wires the components from the 
context, looks up the Flink streaming environment component, and runs the job 
with it.

This approach lets us implement all jobs in the same manner and simplifies 
job testing: all I need is to override some of the context components and use 
a local stream environment instead of the regular stream execution 
environment.

Everything went quite well until I wanted to enable checkpointing and 
implement some business logic that is called when a checkpoint is triggered. 
I wanted to test this logic, and the best approach for me is to trigger a 
savepoint on Flink cluster shutdown; but when I looked through the source 
code, I realised this is quite challenging and cannot be achieved through 
configuration alone.

So I would like to discuss the following proposals:

* add the ability to create a local environment using a specified 
configuration, i.e. add the method 
`org.apache.flink.streaming.api.scala.StreamExecutionEnvironment#createLocalEnv(parallelism,
 configuration)`
* provide the ability to trigger a savepoint in the Flink mini cluster on 
`stop`, if such a property is specified in the configuration

What do you think about it? As for me, it'll give us more flexibility in 
tests, and will not force us to use test templates such as 
`SavepointMigrationTestBase`.

Thx




