[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state

2021-04-27 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334155#comment-17334155
 ] 

Flink Jira Bot commented on FLINK-9592:
---

This issue was marked "stale-assigned" and has not received an update in 7 
days. It is now automatically unassigned. If you are still working on it, you 
can assign it to yourself again. Please also give an update about the status of 
the work.

> Notify on moving file into pending/ final state
> ---
>
> Key: FLINK-9592
> URL: https://issues.apache.org/jira/browse/FLINK-9592
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / FileSystem
> Reporter: Rinat Sharipov
> Assignee: Kostas Kloudas
> Priority: Major
> Labels: pull-request-available, stale-assigned
>
> Hi mates, I have a proposal about the functionality of BucketingSink.
>  
> While implementing one of our tasks we ran into the following need: create 
> a meta-file, with the path and additional information about the file created 
> by BucketingSink, once that file has been moved into its final place.
> Unfortunately, such behaviour is currently not available to us. 
>  
> We’ve implemented our own sink that lets us register notifiers, which are 
> called whenever a file's state changes, but the current API doesn’t allow us 
> to add such behaviour through inheritance ...
>  
> It seems that such functionality could be useful and could become part of 
> the BucketingSink API.
> What do you think, should I make a PR?
> Sincerely yours,
>  *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>  
> email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
> mobile: +7 (925) 416-37-26
> CleverDATA
> make your data clever
>  
> 
>  
> Hi,
> I can see that this could be a useful feature. What exactly is preventing you 
> from inheriting from BucketingSink? Maybe it would be enough to make 
> BucketingSink easier to extend.
> One thing that could collide with such a feature is that Kostas is currently 
> working on a larger BucketingSink rework/refactor. 
> Piotrek
> 
>  
> Hi guys, thx for your reply. 
> The following code references apply to the *release-1.5.0 tag, 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*
>  
> For now, BucketingSink has the following file lifecycle.
>  
> When moving files from the opened to the pending state:
>  # on each item (*method* *invoke:434* *line*), we check that a suitable bucket 
> exists and contains an opened file; if no opened file exists, we 
> create one and write the item to it
>  # on each item (*method* *invoke:434* *line*), we check that the opened 
> file doesn’t exceed the limits, and if the limits are exceeded, we close it and 
> move it into the pending state using *closeCurrentPartFile:568 line - private method*
>  # on each timer request (*onProcessingTime:482 line*), we check whether no items 
> have been added to the opened file for longer than the specified period of time, 
> and if so we close it using the same private method *closeCurrentPartFile:588 line*
>  
> So the only option we have is to call our hook from 
> *closeCurrentPartFile*, which is private, so we copy-pasted the current 
> implementation and injected our logic there.
>  
>  
> Files move from the pending state into the final state during the checkpointing 
> lifecycle, in *notifyCheckpointComplete:657 line*, which is public and 
> contains a lot of logic, including discovery of files in pending states, 
> synchronization of state access and its modification, etc … 
>  
> So we couldn’t override it, or call the super method and add some logic, because 
> when the current implementation changes the state of files, it removes them from 
> state, and we have no way of knowing for which files the state has changed.
>  
> To solve this problem, we've created the following interface:
>  
> /**
>  * The {@code FileStateChangeCallback} is used to perform any additional operations when
>  * {@link BucketingSink} moves a file from one state to another. For more information about
>  * state management of {@code BucketingSink}, look through its official documentation.
>  */
> public interface FileStateChangeCallback extends Serializable {
>
>     /**
>      * Used to perform any additional operations related to moving a file into the next state.
>      *
>      * @param fs provides access for working with the file system
>      * @param path path to the file moved into the next state
>      *
>      * @throws IOException if something went wrong while performing any operations with the file system
>      */
>     void call(FileSystem fs, Path path) throws IOException;
> }
> And have added an ability 
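
The meta-file use case from the proposal can be sketched roughly as follows. This is only an illustration under stated assumptions: the interface here is a simplified, hypothetical variant whose `call` method takes a single `java.nio.file.Path` (the interface quoted above takes a Hadoop `FileSystem` and `Path`), and `MetaFileCallback`, `MetaFileCallbackDemo` and the `.meta` suffix are made-up names, not part of Flink.

```java
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class MetaFileCallbackDemo {

    /** Hypothetical stand-in for the proposed interface, using java.nio instead of Hadoop types. */
    interface FileStateChangeCallback extends Serializable {
        void call(Path path) throws IOException;
    }

    /** Illustrative callback: writes "<file>.meta" next to the file that changed state. */
    static class MetaFileCallback implements FileStateChangeCallback {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Path path) throws IOException {
            Path meta = path.resolveSibling(path.getFileName() + ".meta");
            String content = "path=" + path.toAbsolutePath() + "\n"
                + "size=" + Files.size(path) + "\n";
            Files.write(meta, content.getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Creates a temporary part file, fires the callback, and returns the meta-file content. */
    static String run() throws IOException {
        Path dir = Files.createTempDirectory("bucket");
        Path part = dir.resolve("part-0-0");
        Files.write(part, "hello".getBytes(StandardCharsets.UTF_8));

        // In the proposal, the sink itself would invoke this after moving the file.
        new MetaFileCallback().call(part);

        Path meta = dir.resolve("part-0-0.meta");
        return new String(Files.readAllBytes(meta), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        System.out.print(run());
    }
}
```

With a real sink, the callback would be invoked by the sink at the moment of the state change rather than by hand as in `run()`.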

[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state

2021-04-16 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17323532#comment-17323532
 ] 

Flink Jira Bot commented on FLINK-9592:
---

This issue is assigned but has not received an update in 7 days so it has been 
labeled "stale-assigned". If you are still working on the issue, please give an 
update and remove the label. If you are no longer working on the issue, please 
unassign so someone else may work on it. In 7 days the issue will be 
automatically unassigned.


[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state

2018-10-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650448#comment-16650448
 ] 

ASF GitHub Bot commented on FLINK-9592:
---

kl0u commented on issue #6824: [FLINK-9592][flink-connector-filesystem] added 
ability to hook file state changing
URL: https://github.com/apache/flink/pull/6824#issuecomment-429922404
 
 
   Perfect @kent2171 ! Looking forward to your design proposal.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state

2018-10-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650447#comment-16650447
 ] 

ASF GitHub Bot commented on FLINK-9592:
---

kent2171 closed pull request #6824: [FLINK-9592][flink-connector-filesystem] 
added ability to hook file state changing
URL: https://github.com/apache/flink/pull/6824
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 4f85e3cf8d5..67500ed549e 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -58,6 +58,7 @@
 import java.lang.reflect.Method;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -113,6 +114,9 @@
  * a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently
  * pending files will be moved to {@code finished}.
  *
+ * If it's necessary to perform any additional actions, when state of the file is changed, you need to register
+ * the list of callbacks using {@link #registerFileStateChangedCallback(FileStateChangedCallback...)} method.
+ * All of them will be called in accordance with the specified order.
  *
  * If case of a failure, and in order to guarantee exactly-once semantics, the sink should roll back to the state it
  * had when that last successful checkpoint occurred. To this end, when restoring, the restored files in {@code pending}
@@ -323,6 +327,11 @@
 
 	private transient ProcessingTimeService processingTimeService;
 
+	/**
+	 * The list of callbacks, that should be called, when state of the file is changed.
+	 */
+	private List<FileStateChangedCallback> fileStateChangedCallbacks = new ArrayList<>();
+
 	/**
 	 * Creates a new {@code BucketingSink} that writes files to the given base directory.
 	 *
@@ -368,6 +377,11 @@ public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfi
 		}
 	}
 
+	public BucketingSink<T> registerFileStateChangedCallback(FileStateChangedCallback... callbacks) {
+		fileStateChangedCallbacks.addAll(Arrays.asList(callbacks));
+		return this;
+	}
+
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
 		Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized.");
@@ -598,10 +612,14 @@ private void closeCurrentPartFile(BucketState<T> bucketState) throws Exception {
 			Path inProgressPath = getInProgressPathFor(currentPartPath);
 			Path pendingPath = getPendingPathFor(currentPartPath);
 
+			LOG.debug("Moving in-progress bucket {} to pending file {}", inProgressPath, pendingPath);
 			fs.rename(inProgressPath, pendingPath);
-			LOG.debug("Moving in-progress bucket {} to pending file {}",
-				inProgressPath,
-				pendingPath);
+			for (FileStateChangedCallback callback : fileStateChangedCallbacks) {
+				callback.onInProgressToPending(fs, pendingPath);
+			}
+
+			LOG.debug("In-progress bucket {} successfully moved to pending file {}", inProgressPath, pendingPath);
+
 			bucketState.pendingFiles.add(currentPartPath.toString());
 			bucketState.currentFile = null;
 		}
@@ -702,11 +720,18 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
 								Path finalPath = new Path(filename);
 								Path pendingPath = getPendingPathFor(finalPath);
 
-								fs.rename(pendingPath, finalPath);
 								LOG.debug(
 									"Moving pending file {} to final location having completed checkpoint {}.",
-
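
The registration pattern visible in the diff (a varargs `registerFileStateChangedCallback` that returns the sink, with callbacks invoked in registration order after the rename) can be sketched in isolation as below. This is a toy model under stated assumptions, not the PR's code: `ToySink` is a hypothetical stand-in for `BucketingSink`, the callback here receives only a path string (the PR's `onInProgressToPending` also receives the file system), and the actual rename is omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CallbackOrderDemo {

    /** Hypothetical, simplified callback; the PR's version also takes the file system. */
    interface FileStateChangedCallback {
        void onInProgressToPending(String pendingPath);
    }

    /** Toy stand-in for the sink; it only models registration and notification. */
    static class ToySink {
        private final List<FileStateChangedCallback> callbacks = new ArrayList<>();

        /** Appends the callbacks and returns the sink so that calls can be chained. */
        ToySink registerFileStateChangedCallback(FileStateChangedCallback... cbs) {
            callbacks.addAll(Arrays.asList(cbs));
            return this;
        }

        /** Notifies every callback in registration order. */
        void closeCurrentPartFile(String pendingPath) {
            // The real sink renames the in-progress file before notifying; omitted here.
            for (FileStateChangedCallback cb : callbacks) {
                cb.onInProgressToPending(pendingPath);
            }
        }
    }

    /** Registers three callbacks across two calls and records the order they fire in. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ToySink sink = new ToySink()
            .registerFileStateChangedCallback(
                p -> log.add("first:" + p),
                p -> log.add("second:" + p))
            .registerFileStateChangedCallback(p -> log.add("third:" + p));
        sink.closeCurrentPartFile("part-0-0.pending");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

One consequence of this design, at least in the toy model, is that an exception thrown by an early callback prevents the later ones from running, so ordering matters when callbacks have side effects.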

[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state

2018-10-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650442#comment-16650442
 ] 

ASF GitHub Bot commented on FLINK-9592:
---

kent2171 commented on issue #6824: [FLINK-9592][flink-connector-filesystem] 
added ability to hook file state changing
URL: https://github.com/apache/flink/pull/6824#issuecomment-429921703
 
 
   ok, will return with the proposal, thx @kl0u 



[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state

2018-10-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650438#comment-16650438
 ] 

ASF GitHub Bot commented on FLINK-9592:
---

kl0u commented on issue #6824: [FLINK-9592][flink-connector-filesystem] added 
ability to hook file state changing
URL: https://github.com/apache/flink/pull/6824#issuecomment-429920387
 
 
   Hi @kent2171 ! I am not so sure that the `BucketingSink` is going to be 
deprecated soon. The reason is that the new `StreamingFileSink` for now requires 
newer Hadoop versions. 
   
   But specifically for this new feature, I would suggest implementing it on top of 
the new `StreamingFileSink`, as this is definitely going to be the main filesystem 
sink in the future.



[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state

2018-10-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650434#comment-16650434
 ] 

ASF GitHub Bot commented on FLINK-9592:
---

kent2171 commented on issue #6824: [FLINK-9592][flink-connector-filesystem] 
added ability to hook file state changing
URL: https://github.com/apache/flink/pull/6824#issuecomment-429919058
 
 
   Hi @kl0u , thx for your time
   
   1. totally agree with you
   2. also agree, that's a very important note
   
   I also have a question: the latest releases include a 
**StreamingFileSink**. Does that mean that **BucketingSink** will be deprecated 
soon, and that any hooks we need should be implemented in 
**StreamingFileSink** instead of **BucketingSink**?
   
   thx !



[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647627#comment-16647627
 ] 

ASF GitHub Bot commented on FLINK-9592:
---

kl0u commented on issue #6824: [FLINK-9592][flink-connector-filesystem] added 
ability to hook file state changing
URL: https://github.com/apache/flink/pull/6824#issuecomment-429244062
 
 
   Hi @kent2171 ,
   
   I had a look at the PR. I also wrote the same comment at the associated JIRA 
but I also include it here.
   
   In general, as I said earlier, I like the idea of having Callbacks to notify 
when a file changes state.
   As far as the design/implementation of the current PR is concerned, the 
following are my comments:
   
   1) The `FileStateChangedCallback` seems to be pretty limiting, and probably 
designed with a specific use case in mind. It assumes that the user would like 
to do something with the underlying file system when the file changes state (e.g. 
write a special file). But other use cases may need to do a REST call, or update 
a DB, or in general communicate with another system.  
   
   Given the above, I would suggest that the function should have an `open()` 
and a `close()` method which are called once and are responsible for allocating 
and freeing resources. The `open()` should potentially take the `flinkConfig` 
as argument and initialize any long-lived resources, e.g. connections to 
databases, a connection to the filesystem, etc, and the `close()` should be 
responsible for freeing them. This will allow the sink to accommodate a broader 
variety of use cases. Now for the methods themselves, I do not yet have a 
definite answer on what should be included as arguments, but I would also 
include a `Context` as an argument. This will future-proof the 
method, as we will be able to add things to the `Context` if we want to expose 
more in the future, rather than deprecating the already existing API and 
creating a new one.
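
A minimal sketch of what such a lifecycle-aware callback could look like. Everything here is an assumption made for illustration: the names `FileStateListener`, `onPending`, `onFinal`, `RecordingListener` and the `Context` accessors are hypothetical, a plain `Map<String, String>` stands in for the Flink configuration, and paths are plain strings. It is not the interface from the PR and not an existing Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical lifecycle-aware callback; not part of Flink or of the PR. */
interface FileStateListener {
    /** Called once before any notification; allocate long-lived resources here. */
    void open(Map<String, String> config) throws Exception;

    /** Called when a part file moves from in-progress to pending. */
    void onPending(Context context) throws Exception;

    /** Called when a part file moves from pending to final. */
    void onFinal(Context context) throws Exception;

    /** Called once on shutdown; release whatever open() allocated. */
    void close() throws Exception;

    /** Argument carrier: accessors can be added later without changing method signatures. */
    interface Context {
        String path();
        String bucketId();
    }
}

/** Example listener that records events in memory instead of calling a REST API or a DB. */
class RecordingListener implements FileStateListener {
    final List<String> events = new ArrayList<>();
    private boolean opened;

    @Override
    public void open(Map<String, String> config) {
        opened = true;
        events.add("open:" + config.getOrDefault("target", "none"));
    }

    @Override
    public void onPending(Context context) {
        requireOpen();
        events.add("pending:" + context.path());
    }

    @Override
    public void onFinal(Context context) {
        requireOpen();
        events.add("final:" + context.bucketId() + "/" + context.path());
    }

    @Override
    public void close() {
        opened = false;
        events.add("close");
    }

    private void requireOpen() {
        if (!opened) {
            throw new IllegalStateException("listener used before open() or after close()");
        }
    }
}
```

Passing a `Context` object rather than individual parameters is what keeps the method signatures stable: a new accessor on `Context` does not break existing implementations of the callback methods.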
   
   2) IMPORTANT CONSIDERATIONS to keep in mind: all this is a "best-effort" 
reporting of state changes, as, for example, if a failure happens after 
transitioning a file to its "final" state, but before calling the hook, then 
you will never get the notification. This behavior is aligned with Flink's 
metric system, where metrics are not checkpointed. In our case though, the 
scenario described above is more tricky to accommodate as we are talking about 
integration with external systems.
   
   Let me know what you think about the above!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state

2018-10-12 Thread Kostas Kloudas (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647600#comment-16647600
 ] 

Kostas Kloudas commented on FLINK-9592:
---

Hi [~kent2171],

I had a look at the PR.
In general, as I said earlier, I like the idea of having Callbacks to notify 
when a file changes state.
As far as the design/implementation of the current PR is concerned, the 
following are my comments:

1) The FileStateChangedCallback seems to be pretty limiting, and probably 
designed with a specific use case in mind. It assumes that the user would like 
to do something with the underlying file system when the file changes state 
(e.g. write a special file). But other use cases may need to do a REST call, or 
update a DB, or in general communicate with another system.  

Given the above, I would suggest that the function should have an open() and a 
close() method which are called once and are responsible for allocating and 
freeing resources. The open() should potentially take the flink config as 
argument and initialize any long-lived resources, e.g. connections to 
databases, a connection to the filesystem, etc, and the close() should be 
responsible for freeing them. This will allow the sink to accommodate a broader 
variety of use cases. Now for the methods themselves, I do not yet have a 
definite answer on what should be included as arguments, but I would also 
include a Context as an argument. This will future-proof the method, as we 
will be able to add things to the Context if we want to expose more in the 
future, rather than deprecating the already existing API and creating a new one.

2) IMPORTANT CONSIDERATIONS to keep in mind: all this is a "best-effort" 
reporting of state changes, as, for example, if a failure happens after 
transitioning a file to its "final" state, but before calling the hook, then 
you will never get the notification. This behavior is aligned with Flink's 
metric system, where metrics are not checkpointed. In our case though, the 
scenario described above is more tricky to accommodate as we are talking about 
integration with external systems.
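
The failure window described in point 2 can be shown with a toy model. This is only an illustration under stated assumptions: `BestEffortCommitter` and its methods are hypothetical names, the two in-memory lists stand in for the file system and the external system, and nothing here is Flink code.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the pending -> final transition; not Flink code. */
class BestEffortCommitter {
    final List<String> finalFiles = new ArrayList<>();
    final List<String> notified = new ArrayList<>();

    /**
     * Moves a file to final state and then calls the hook.
     * If crashBeforeHook is true, a failure is simulated between the two steps.
     */
    void commit(String file, boolean crashBeforeHook) {
        finalFiles.add(file); // step 1: the state change has happened
        if (crashBeforeHook) {
            throw new IllegalStateException("simulated failure before hook");
        }
        notified.add(file); // step 2: the hook runs
    }

    /** Files that reached final state without a notification ever being sent. */
    List<String> missedNotifications() {
        List<String> missed = new ArrayList<>(finalFiles);
        missed.removeAll(notified);
        return missed;
    }
}
```

In this model a file committed with `crashBeforeHook = true` ends up in `finalFiles` but never in `notified`. Because the file is already final after recovery, nothing triggers the hook again, so an external system that needs completeness would have to reconcile on its own, for example by listing the final files.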

Let me know what you think about the above!

[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state

2018-10-11 Thread Kostas Kloudas (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646693#comment-16646693
 ] 

Kostas Kloudas commented on FLINK-9592:
---

Hi [~kent2171]. Sorry for not posting the design doc earlier. 
That was a big mistake on my side.

Currently we have a new filesystem sink, called `StreamingFileSink`.
You can find it on the master branch.

I will have a look at the PR just to get an idea of the proposed solution but
please open a discussion in the dev mailing list so that other members of 
the community can comment on it.

In the discussion you can include the PR, why you think this feature is 
interesting, and how you implemented it. 

This is the only way that this PR can be merged, as described in the 
contribution guidelines: https://flink.apache.org/how-to-contribute.html. In 
addition you can follow the discussion about the contributions in the dev ML 
(search for [DISCUSS] [Contributing]).

[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state

2018-10-11 Thread Rinat Sharipov (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646230#comment-16646230
 ] 

Rinat Sharipov commented on FLINK-9592:
---

Hi [~kkl0u], during the migration to the latest Flink, we 
decided to try to contribute functionality that adds the ability to hook into state 
changes in the bucketing sink, so a PR with this feature is now available.

We are very interested in having this feature in BucketingSink and are always 
open for discussion.

Thx !

[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646218#comment-16646218
 ] 

ASF GitHub Bot commented on FLINK-9592:
---

kent2171 opened a new pull request #6824: FLINK-9592 
[flink-connector-filesystem] added ability to add hooks for file state changing
URL: https://github.com/apache/flink/pull/6824
 
 
   What is the purpose of the change:
   This pull request adds the ability to hook into file state changes.
   
   Brief change log:
   
   - when a file is moved from inProgress to pending state, the list of 
pre-configured hooks is called
   - when a file is moved from pending into final state, the list of 
pre-configured hooks is called
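
The two transitions in the change log can be sketched roughly as follows. This is a simplified stand-in, not the code from the pull request: `StateChangeHook`, `HookedSinkSketch` and the `registerOn...Hooks` methods are hypothetical names, `java.nio.file.Path` replaces the file system types the real callback receives, and pending files are kept in one flat list rather than tracked per checkpoint.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified stand-in for the callback; the one described in the issue also receives a FileSystem. */
interface StateChangeHook {
    void call(Path path) throws IOException;
}

/** Toy sink moving part files through inProgress -> pending -> final; not the real BucketingSink. */
class HookedSinkSketch {
    private final List<StateChangeHook> onPending = new ArrayList<>();
    private final List<StateChangeHook> onFinal = new ArrayList<>();
    private final List<Path> pendingFiles = new ArrayList<>();

    HookedSinkSketch registerOnPendingHooks(StateChangeHook... hooks) {
        onPending.addAll(Arrays.asList(hooks));
        return this;
    }

    HookedSinkSketch registerOnFinalHooks(StateChangeHook... hooks) {
        onFinal.addAll(Arrays.asList(hooks));
        return this;
    }

    /** A part file is closed (size limit, inactivity timeout, or sink close): inProgress -> pending. */
    void closePartFile(Path file) {
        pendingFiles.add(file);
        fire(onPending, file);
    }

    /** A checkpoint completed: every pending file becomes final. */
    void notifyCheckpointComplete() {
        List<Path> toFinalize = new ArrayList<>(pendingFiles);
        pendingFiles.clear();
        for (Path file : toFinalize) {
            fire(onFinal, file);
        }
    }

    /** Calls each hook in registration order, wrapping I/O failures in an unchecked exception. */
    private static void fire(List<StateChangeHook> hooks, Path file) {
        for (StateChangeHook hook : hooks) {
            try {
                hook.call(file);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

Clearing the pending list before firing the final hooks mirrors the point made in the issue: once the sink has finalized the files they are gone from its state, so the hook is the only place a caller learns which files changed.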
   
   Verifying this change:
   The following tests verify that hooks are called at the proper time:
   
   - testThatOnInProgressToPendingCallbackIsFiredWhenFilesAreMovedToPendingStateByTimeout
   - testThatOnInProgressToPendingCallbackIsFiredWhenFilesAreMovedToPendingStateBySize
   - testThatOnInProgressToPendingCallbackIsFiredWhenFunctionIsClosed
   - testThatOnPendingToFinalCallbackIsFiredWhenCheckpointingIsCompleted
   
   Does this pull request potentially affect one of the following parts:
   Dependencies (does it add or upgrade a dependency): (no)
   The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
   The serializers: (no)
   The runtime per-record code paths (performance sensitive): (no)
   Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
   The S3 file system connector: (yes)
   
   Documentation:
   Does this pull request introduce a new feature? (yes)
   If yes, how is the feature documented? add documentation and usage 
information in the description of 
**org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink** class


[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state

2018-06-15 Thread Kostas Kloudas (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513558#comment-16513558
 ] 

Kostas Kloudas commented on FLINK-9592:
---

Sounds good [~kent2171]! We can further chat when the document is out ;)

> Notify on moving file into pending/ final state
> ---
>
> Key: FLINK-9592
> URL: https://issues.apache.org/jira/browse/FLINK-9592
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Rinat Sharipov
>Assignee: Kostas Kloudas
>Priority: Major
>
> Hi mates, I got a proposal about functionality of BucketingSink.
>  
> During implementation of one of our tasks we got the following need - create 
> a meta-file, with the path and additional information about the file, created 
> by BucketingSink, when it’s been moved into final place.
> Unfortunately such behaviour is currently not available for us. 
>  
> We’ve implemented our own Sink, that provides an opportunity to register 
> notifiers, that will be called, when file state is changing, but current API 
> doesn’t allow us to add such behaviour using inheritance ...
>  
> It seems, that such functionality could be useful, and could be a part of 
> BucketingSink API
> What do you sink, should I make a PR ?
> Sincerely yours,
>  *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>  
> email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
> mobile: +7 (925) 416-37-26
> Clever{color:#4f8f00}DATA{color}
> make your data clever
>  
> 
>  
> Hi,
> I see that could be a useful feature. What exactly now is preventing you from 
> inheriting from BucketingSink? Maybe it would be just enough to make the 
> BucketingSink easier extendable.
> One thing now that could collide with such feature is that Kostas is now 
> working on larger BucketingSink rework/refactor. 
> Piotrek
> 
>  
> Hi guys, thanks for your reply. 
> The following code references apply to the *release-1.5.0 tag, 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*
>  
> Currently, BucketingSink has the following file lifecycle.
>  
> When moving files from opened to pending state:
>  # on each item (*method* *invoke:434* *line*), we check that a suitable bucket 
> exists and contains an opened file; if no opened file exists, we 
> create one and write the item to it
>  # on each item (*method* *invoke:434* *line*), we check that the opened 
> file doesn’t exceed the limits; if the limits are exceeded, we close it and 
> move it into pending state using *closeCurrentPartFile:568 line - private method*
>  # on each timer request (*onProcessingTime:482 line*), we check whether items 
> have been added to the opened file within the specified period of time; if not, 
> we close it using the same private method *closeCurrentPartFile:588 line*
>  
> So the only option we have is to call our hook from 
> *closeCurrentPartFile*, which is private, so we copy-pasted the current impl 
> and injected our logic there.
>  
>  
> Files move from pending state into final state during the checkpointing 
> lifecycle, in *notifyCheckpointComplete:657 line*, which is public and 
> contains a lot of logic, including discovery of files in pending state, 
> synchronization of state access and its modification, etc … 
>  
> So we couldn’t override it, or call the super method and add some logic, because 
> when the current impl changes the state of files, it removes them from state, and 
> we have no way of knowing 
> for which files the state has been changed.
>  
> To solve this problem, we've created the following interface
>  
> /**
>  * The {@code FileStateChangeCallback} is used to perform any additional operations when
>  * {@link BucketingSink} moves a file from one state to another. For more information about
>  * state management of {@code BucketingSink}, look through its official documentation.
>  */
> public interface FileStateChangeCallback extends Serializable {
>     /**
>      * Used to perform any additional operations related to moving a file into the next state.
>      *
>      * @param fs provides access for working with the file system
>      * @param path path to the file moved into the next state
>      *
>      * @throws IOException if something went wrong while performing any operations with the file system
>      */
>     void call(FileSystem fs, Path path) throws IOException;
> }
>  
> We have also added the ability to register these callbacks in the BucketingSink impl in 
> the following manner
>  
> public BucketingSink 
> registerOnFinalStateChangeCallback(FileStateChangeCallback… callbacks) \{...}
> public BucketingSink 
> 

[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state

2018-06-15 Thread Rinat Sharipov (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513555#comment-16513555
 ] 

Rinat Sharipov commented on FLINK-9592:
---

[~kkl0u], I will be glad to help )) Thx

> Notify on moving file into pending/ final state
> ---
>
> Key: FLINK-9592
> URL: https://issues.apache.org/jira/browse/FLINK-9592
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Rinat Sharipov
>Assignee: Kostas Kloudas
>Priority: Major
>
> Hi mates, I got a proposal about functionality of BucketingSink.
>  
> During implementation of one of our tasks we got the following need - create 
> a meta-file, with the path and additional information about the file, created 
> by BucketingSink, when it’s been moved into final place.
> Unfortunately such behaviour is currently not available for us. 
>  
> We’ve implemented our own Sink, that provides an opportunity to register 
> notifiers, that will be called, when file state is changing, but current API 
> doesn’t allow us to add such behaviour using inheritance ...
>  
> It seems, that such functionality could be useful, and could be a part of 
> BucketingSink API
> What do you sink, should I make a PR ?
> Sincerely yours,
>  *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>  
> email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
> mobile: +7 (925) 416-37-26
> Clever{color:#4f8f00}DATA{color}
> make your data clever
>  
> 
>  
> Hi,
> I see that could be a useful feature. What exactly now is preventing you from 
> inheriting from BucketingSink? Maybe it would be just enough to make the 
> BucketingSink easier extendable.
> One thing now that could collide with such feature is that Kostas is now 
> working on larger BucketingSink rework/refactor. 
> Piotrek
> 
>  
> Hi guys, thx for your reply. 
> The following code info is actual for *release-1.5.0 tag, 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*
>  
> For now, BucketingSink has the following lifecycle of files
>  
> When moving files from opened to pending state:
>  # on each item (*method* *invoke:434* *line*), we check that suitable bucket 
> exist, and contain opened file, in case, when opened file doesn’t exist, we 
> create one, and write item to it
>  # on each item (*method* *invoke:434* *line*), we check that suitable opened 
> file doesn’t exceed the limits, and if limits are exceeded, we close it and 
> move into pending state using *closeCurrentPartFile:568 line - private method*
>  # on each timer request (*onProcessingTime:482 line*), we check, if items 
> haven't been added to the opened file longer, than specified period of time, 
> we close it, using the same private method *closeCurrentPartFile:588 line*
>  
> So, the only way, that we have, is to call our hook from 
> *closeCurrentPartFile*, that is private, so we copy-pasted the current impl 
> and injected our logic there
>  
>  
> Files move from the pending state into the final state during the checkpointing 
> lifecycle, in *notifyCheckpointComplete:657 line*, which is public and 
> contains a lot of logic, including discovery of files in the pending state, 
> synchronization of state access and its modification, etc … 
>  
> So we couldn’t override it, or call the super method and add some logic, because 
> when the current implementation changes the state of files, it removes them 
> from state, and we have no way of knowing 
> for which files the state has been changed.
>  
> To solve this problem, we've created the following interface
>  
> /**
>  * The {@code FileStateChangeCallback} is used to perform any additional
>  * operations when {@link BucketingSink} moves a file from one state to
>  * another. For more information about state management of
>  * {@code BucketingSink}, look through its official documentation.
>  */
> public interface FileStateChangeCallback extends Serializable {
>
>     /**
>      * Used to perform any additional operations related to moving a file
>      * into the next state.
>      *
>      * @param fs provides access for working with the file system
>      * @param path path to the file moved into the next state
>      *
>      * @throws IOException if something went wrong while performing any
>      *     operations with the file system
>      */
>     void call(FileSystem fs, Path path) throws IOException;
> }
> We have also added the ability to register these callbacks in the 
> BucketingSink implementation in the following manner
>  
> public BucketingSink registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {...}
> public BucketingSink 
> 
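
The callback mechanism described in the quoted proposal can be sketched roughly as below. This is only an illustration under stated assumptions, not the code from the pull request: `CallbackSinkSketch` is a hypothetical toy class that tracks file states without doing any I/O, `java.nio.file.FileSystem` and `java.nio.file.Path` stand in for the Hadoop types used by the real BucketingSink, and `registerOnPendingStateChangeCallback` is an assumed name, since the quoted text is cut off before the second registration method.

```java
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.FileSystem;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in for the interface from the proposal; java.nio types replace Hadoop's FileSystem/Path.
interface FileStateChangeCallback extends Serializable {
    void call(FileSystem fs, Path path) throws IOException;
}

// Hypothetical toy sink: it only tracks which files are pending and fires callbacks.
class CallbackSinkSketch {
    private final List<FileStateChangeCallback> onPending = new ArrayList<>();
    private final List<FileStateChangeCallback> onFinal = new ArrayList<>();
    private final List<Path> pendingFiles = new ArrayList<>();

    // Assumed name; the quoted proposal is truncated before this method.
    CallbackSinkSketch registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {
        onPending.addAll(Arrays.asList(callbacks));
        return this;
    }

    // Name taken from the quoted proposal.
    CallbackSinkSketch registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {
        onFinal.addAll(Arrays.asList(callbacks));
        return this;
    }

    // Opened -> pending: the point where the hook would be called in closeCurrentPartFile.
    void closeCurrentPartFile(Path openedFile) throws IOException {
        pendingFiles.add(openedFile);
        for (FileStateChangeCallback callback : onPending) {
            callback.call(openedFile.getFileSystem(), openedFile);
        }
    }

    // Pending -> final: each file is reported to the callbacks before it leaves the state.
    void notifyCheckpointComplete() throws IOException {
        for (Path file : pendingFiles) {
            for (FileStateChangeCallback callback : onFinal) {
                callback.call(file.getFileSystem(), file);
            }
        }
        pendingFiles.clear();
    }
}

class CallbackSinkDemo {
    public static void main(String[] args) throws IOException {
        List<String> events = new ArrayList<>();
        CallbackSinkSketch sink = new CallbackSinkSketch()
            .registerOnPendingStateChangeCallback((fs, path) -> events.add("pending: " + path))
            .registerOnFinalStateChangeCallback((fs, path) -> events.add("final: " + path));

        sink.closeCurrentPartFile(Paths.get("bucket-0", "part-0-0"));
        sink.notifyCheckpointComplete();
        System.out.println(events);
    }
}
```

The point of the sketch is that the sink itself reports each file at the moment its state changes, so a subclass does not need to reconstruct which files were removed from the pending state after the parent implementation has already dropped them.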

[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state

2018-06-15 Thread Kostas Kloudas (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513547#comment-16513547
 ] 

Kostas Kloudas commented on FLINK-9592:
---

Hi [~kent2171]! Thanks for the proposal. It seems like a nice idea.

We are currently thinking about how to redesign the Bucketing Sink. 

I will send out a design document in the next couple of weeks.

> Notify on moving file into pending/ final state
> ---
>
> Key: FLINK-9592
> URL: https://issues.apache.org/jira/browse/FLINK-9592
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Rinat Sharipov
>Assignee: Kostas Kloudas
>Priority: Major
>
> Hi mates, I have a proposal about the functionality of BucketingSink.
>  
> While implementing one of our tasks, we came across the following need: create 
> a meta-file with the path and additional information about the file created 
> by BucketingSink, once it’s been moved into its final place.
> Unfortunately, such behaviour is currently not available to us. 
>  
> We’ve implemented our own Sink that provides an opportunity to register 
> notifiers that will be called when the file state changes, but the current API 
> doesn’t allow us to add such behaviour using inheritance ...
>  
> It seems that such functionality could be useful and could be a part of the 
> BucketingSink API.
> What do you think, should I make a PR?
> Sincerely yours,
>  *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>  
> email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
> mobile: +7 (925) 416-37-26
> CleverDATA
> make your data clever
>  
> 
>  
> Hi,
> I can see that this could be a useful feature. What exactly is preventing you 
> from inheriting from BucketingSink? Maybe it would be enough to make 
> BucketingSink easier to extend.
> One thing that could collide with such a feature is that Kostas is currently 
> working on a larger BucketingSink rework/refactor. 
> Piotrek
> 
>  
> Hi guys, thanks for your reply. 
> The following code references apply to the *release-1.5.0 tag, 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*
>  
> Currently, BucketingSink manages the lifecycle of files as follows.
>  
> When moving files from the opened to the pending state:
>  # on each item (*method* *invoke:434* *line*), we check that a suitable bucket 
> exists and contains an opened file; if there is no opened file, we 
> create one and write the item to it
>  # on each item (*method* *invoke:434* *line*), we check that the opened 
> file doesn’t exceed the limits; if the limits are exceeded, we close it and 
> move it into the pending state using *closeCurrentPartFile:568 line - private method*
>  # on each timer request (*onProcessingTime:482 line*), we check whether no items 
> have been added to the opened file for longer than the specified period of time; 
> if so, we close it using the same private method *closeCurrentPartFile:588 line*
>  
> So the only option we have is to call our hook from 
> *closeCurrentPartFile*, which is private, so we copy-pasted the current 
> implementation and injected our logic there
>  
>  
> Files move from the pending state into the final state during the checkpointing 
> lifecycle, in *notifyCheckpointComplete:657 line*, which is public and 
> contains a lot of logic, including discovery of files in the pending state, 
> synchronization of state access and its modification, etc … 
>  
> So we couldn’t override it, or call the super method and add some logic, because 
> when the current implementation changes the state of files, it removes them 
> from state, and we have no way of knowing 
> for which files the state has been changed.
>  
> To solve this problem, we've created the following interface
>  
> /**
>  * The {@code FileStateChangeCallback} is used to perform any additional
>  * operations when {@link BucketingSink} moves a file from one state to
>  * another. For more information about state management of
>  * {@code BucketingSink}, look through its official documentation.
>  */
> public interface FileStateChangeCallback extends Serializable {
>
>     /**
>      * Used to perform any additional operations related to moving a file
>      * into the next state.
>      *
>      * @param fs provides access for working with the file system
>      * @param path path to the file moved into the next state
>      *
>      * @throws IOException if something went wrong while performing any
>      *     operations with the file system
>      */
>     void call(FileSystem fs, Path path) throws IOException;
> }
> We have also added the ability to register these callbacks in the 
> BucketingSink implementation in the following manner
>  
> public BucketingSink 
>