[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334155#comment-17334155 ]

Flink Jira Bot commented on FLINK-9592:
---

This issue was marked "stale-assigned" and has not received an update in 7 days. It is now automatically unassigned. If you are still working on it, you can assign it to yourself again. Please also give an update about the status of the work.

> Notify on moving file into pending/ final state
> ---
>
>                 Key: FLINK-9592
>                 URL: https://issues.apache.org/jira/browse/FLINK-9592
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / FileSystem
>            Reporter: Rinat Sharipov
>            Assignee: Kostas Kloudas
>            Priority: Major
>              Labels: pull-request-available, stale-assigned
>
> Hi mates, I have a proposal about the functionality of BucketingSink.
>
> While implementing one of our tasks we ran into the following need: create
> a meta-file, with the path and additional information about the file created
> by BucketingSink, once it has been moved into its final place.
> Unfortunately, such behaviour is currently not available to us.
>
> We’ve implemented our own Sink that lets us register notifiers, which are
> called when the file state changes, but the current API doesn’t allow us
> to add such behaviour using inheritance ...
>
> It seems that such functionality could be useful, and could be a part of
> the BucketingSink API.
> What do you think, should I make a PR?
> Sincerely yours,
> *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>
> email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
> mobile: +7 (925) 416-37-26
> CleverDATA
> make your data clever
>
>
>
> Hi,
> I see that could be a useful feature. What exactly is preventing you from
> inheriting from BucketingSink? Maybe it would be enough to make the
> BucketingSink easier to extend.
> One thing that could collide with such a feature is that Kostas is now
> working on a larger BucketingSink rework/refactor.
> Piotrek
>
>
> Hi guys, thx for your reply.
> The following code info is accurate for the *release-1.5.0 tag,
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*
>
> For now, BucketingSink has the following lifecycle of files.
>
> When moving files from opened to pending state:
> # on each item (method *invoke*, line 434), we check that a suitable bucket
> exists and contains an opened file; if the opened file doesn’t exist, we
> create one and write the item to it
> # on each item (method *invoke*, line 434), we check that the opened
> file doesn’t exceed the limits, and if the limits are exceeded, we close it and
> move it into pending state using the private method *closeCurrentPartFile* (line 568)
> # on each timer request (*onProcessingTime*, line 482), we check whether items
> haven't been added to the opened file for longer than the specified period of time,
> and if so we close it, using the same private method *closeCurrentPartFile* (line 588)
>
> So the only way we have is to call our hook from
> *closeCurrentPartFile*, which is private, so we copy-pasted the current impl
> and injected our logic there.
>
>
> Files are moved from pending state into final during the checkpointing
> lifecycle, in *notifyCheckpointComplete* (line 657), which is public and
> contains a lot of logic, including discovery of files in pending states,
> synchronization of state access and its modification, etc.
>
> So we couldn’t override it, or call the super method and add some logic, because
> when the current impl changes the state of files, it removes them from state, and
> we have no way of knowing for which files the state has been changed.
>
> To solve this problem, we've created the following interface
>
> /**
>  * The {@code FileStateChangeCallback} is used to perform any additional operations, when {@link BucketingSink}
>  * moves file from one state to another. For more information about state management of {@code BucketingSink}, look
>  * through its official documentation.
>  */
> public interface FileStateChangeCallback extends Serializable {
>
>     /**
>      * Used to perform any additional operations, related with moving of file into next state.
>      *
>      * @param fs provides access for working with file system
>      * @param path path to the file, moved into next state
>      *
>      * @throws IOException if something went wrong, while performing any operations with file system
>      */
>     void call(FileSystem fs, Path path) throws IOException;
> }
>
> And have added an ability
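To make the meta-file use case from the issue description concrete, here is a minimal sketch of what a user-side callback might look like. It is not the reporter's implementation: the interface below is a simplified stand-in that takes a `java.nio.file.Path` instead of the Hadoop `FileSystem`/`Path` pair from the proposal, and the class `MetaFileWriter` and the `.meta` suffix are hypothetical names chosen for illustration.

```java
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class MetaFileCallbackSketch {

    /** Simplified stand-in for the proposed callback (assumption: java.nio instead of Hadoop types). */
    public interface FileStateChangeCallback extends Serializable {
        void call(Path path) throws IOException;
    }

    /** Hypothetical callback: writes "<name>.meta" next to the file, recording its path and size. */
    public static class MetaFileWriter implements FileStateChangeCallback {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Path path) throws IOException {
            Path meta = path.resolveSibling(path.getFileName() + ".meta");
            String content = "path=" + path.toAbsolutePath() + "\n"
                    + "size=" + Files.size(path) + "\n";
            Files.write(meta, content.getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws IOException {
        // Simulate a part file that has just reached its final location.
        Path dir = Files.createTempDirectory("bucket");
        Path part = Files.write(dir.resolve("part-0-0"), "hello".getBytes(StandardCharsets.UTF_8));

        new MetaFileWriter().call(part);

        byte[] meta = Files.readAllBytes(dir.resolve("part-0-0.meta"));
        System.out.println(new String(meta, StandardCharsets.UTF_8));
    }
}
```

In a real sink the callback would be invoked by the sink itself right after the rename, which is exactly the hook point the description says cannot be reached through inheritance.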
[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17323532#comment-17323532 ]

Flink Jira Bot commented on FLINK-9592:
---

This issue is assigned but has not received an update in 7 days so it has been labeled "stale-assigned". If you are still working on the issue, please give an update and remove the label. If you are no longer working on the issue, please unassign so someone else may work on it. In 7 days the issue will be automatically unassigned.
[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650448#comment-16650448 ]

ASF GitHub Bot commented on FLINK-9592:
---

kl0u commented on issue #6824: [FLINK-9592][flink-connector-filesystem] added ability to hook file state changing
URL: https://github.com/apache/flink/pull/6824#issuecomment-429922404

Perfect @kent2171 ! Looking forward to your design proposal.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650447#comment-16650447 ]

ASF GitHub Bot commented on FLINK-9592:
---

kent2171 closed pull request #6824: [FLINK-9592][flink-connector-filesystem] added ability to hook file state changing
URL: https://github.com/apache/flink/pull/6824

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 4f85e3cf8d5..67500ed549e 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -58,6 +58,7 @@
 import java.lang.reflect.Method;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -113,6 +114,9 @@
  * a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently
  * pending files will be moved to {@code finished}.
  *
+ * If it's necessary to perform any additional actions, when state of the file is changed, you need to register
+ * the list of callbacks using {@link #registerFileStateChangedCallback(FileStateChangedCallback...)} method.
+ * All of them will be called in accordance with the specified order.
  *
  * If case of a failure, and in order to guarantee exactly-once semantics, the sink should roll back to the state it
  * had when that last successful checkpoint occurred. To this end, when restoring, the restored files in {@code pending}
@@ -323,6 +327,11 @@
 
 	private transient ProcessingTimeService processingTimeService;
 
+	/**
+	 * The list of callbacks, that should be called, when state of the file is changed.
+	 */
+	private List<FileStateChangedCallback> fileStateChangedCallbacks = new ArrayList<>();
+
 	/**
 	 * Creates a new {@code BucketingSink} that writes files to the given base directory.
 	 *
@@ -368,6 +377,11 @@ public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfi
 		}
 	}
 
+	public BucketingSink<T> registerFileStateChangedCallback(FileStateChangedCallback... callbacks) {
+		fileStateChangedCallbacks.addAll(Arrays.asList(callbacks));
+		return this;
+	}
+
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
 		Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized.");
@@ -598,10 +612,14 @@ private void closeCurrentPartFile(BucketState<T> bucketState) throws Exception {
 			Path inProgressPath = getInProgressPathFor(currentPartPath);
 			Path pendingPath = getPendingPathFor(currentPartPath);
 
+			LOG.debug("Moving in-progress bucket {} to pending file {}", inProgressPath, pendingPath);
 			fs.rename(inProgressPath, pendingPath);
-			LOG.debug("Moving in-progress bucket {} to pending file {}",
-				inProgressPath,
-				pendingPath);
+			for (FileStateChangedCallback callback : fileStateChangedCallbacks) {
+				callback.onInProgressToPending(fs, pendingPath);
+			}
+
+			LOG.debug("In-progress bucket {} successfully moved to pending file {}", inProgressPath, pendingPath);
+
 			bucketState.pendingFiles.add(currentPartPath.toString());
 			bucketState.currentFile = null;
 		}
@@ -702,11 +720,18 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
 								Path finalPath = new Path(filename);
 								Path pendingPath = getPendingPathFor(finalPath);
 
-								fs.rename(pendingPath, finalPath);
 								LOG.debug(
 									"Moving pending file {} to final location having completed checkpoint {}.",
-
[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650442#comment-16650442 ]

ASF GitHub Bot commented on FLINK-9592:
---

kent2171 commented on issue #6824: [FLINK-9592][flink-connector-filesystem] added ability to hook file state changing
URL: https://github.com/apache/flink/pull/6824#issuecomment-429921703

ok, will return with the proposal, thx @kl0u
[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650438#comment-16650438 ]

ASF GitHub Bot commented on FLINK-9592:
---

kl0u commented on issue #6824: [FLINK-9592][flink-connector-filesystem] added ability to hook file state changing
URL: https://github.com/apache/flink/pull/6824#issuecomment-429920387

Hi @kent2171 !

I am not so sure if the `BucketingSink` is going to be deprecated soon. The reason is that the new `StreamingFileSink` for now requires newer Hadoop versions.

But specifically for this new feature, I would say to implement it on top of the new `StreamingFileSink`, as this is definitely going to be the main filesystem sink in the future.
[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650434#comment-16650434 ]

ASF GitHub Bot commented on FLINK-9592:
---

kent2171 commented on issue #6824: [FLINK-9592][flink-connector-filesystem] added ability to hook file state changing
URL: https://github.com/apache/flink/pull/6824#issuecomment-429919058

Hi @kl0u , thx for your time.

1. Totally agree with you.
2. Also agree, that's a very important note.

Also I have a question: in the latest releases we got a **StreamingFileSink**. Does it mean that **BucketingSink** will be deprecated soon, and that if we need any hooks, we should implement them in **StreamingFileSink** instead of **BucketingSink**?

thx !
[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647627#comment-16647627 ] ASF GitHub Bot commented on FLINK-9592: --- kl0u commented on issue #6824: [FLINK-9592][flink-connector-filesystem] added ability to hook file state changing URL: https://github.com/apache/flink/pull/6824#issuecomment-429244062 Hi @kent2171 , I had a look at the PR. I also wrote the same comment at the associated JIRA but I also include it here. In general, as I said earlier, I like the idea of having Callbacks to notify when a file changes state. As far as the design/implementation of the current PR is concerned, the following are my comments: 1) The `FileStateChangedCallback` seems to be pretty limiting, and probably designed with a specific usecase in mind. It assumes that the user would like to do sth with the underlying file system when the file changes state (e.g. write a special file). But other usecases may need to do a REST call, or update a DB, or in general communicate with another system. Given the above, I would suggest that the function should have an `open()` and a `close()` method which are called once and are responsible for allocating and freeing resources. The `open()` should potentially take the `flinkConfig` as argument and initialize any long-living resources, e.g. connections to databases, a connection to the filesystem, etc, and the close should be responsible for freeing them. This will allow the sink to accommodate a broader variety of usecases. Now for the methods themselves, I do not yet have a definite answer on what should be included as argument, but I would also include a `Context` as an argument. This will allow for future-proofing the method, as we will be able to add stuff in the `Context` if we want to expose more stuff in the future, rather than deprecating the already existing API and creating a new one. 
2) IMPORTANT CONSIDERATIONS to keep in mind: all this is a "best-effort" reporting of state changes, as, for example, if a failure happens after transitioning a file to its "final" state, but before calling the hook, then you will never get the notification. This behavior is aligned with Flink's metric system, where metrics are not checkpointed. In our case though, the scenario described above is more tricky to accommodate as we are talking about integration with external systems. Let me know what you think about the above! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Notify on moving file into pending/ final state > --- > > Key: FLINK-9592 > URL: https://issues.apache.org/jira/browse/FLINK-9592 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: Rinat Sharipov >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > > Hi mates, I got a proposal about functionality of BucketingSink. > > During implementation of one of our tasks we got the following need - create > a meta-file, with the path and additional information about the file, created > by BucketingSink, when it’s been moved into final place. > Unfortunately such behaviour is currently not available for us. > > We’ve implemented our own Sink, that provides an opportunity to register > notifiers, that will be called, when file state is changing, but current API > doesn’t allow us to add such behaviour using inheritance ... > > It seems, that such functionality could be useful, and could be a part of > BucketingSink API > What do you sink, should I make a PR ? 
> Sincerely yours, > *Rinat Sharipov* > Software Engineer at 1DMP CORE Team > > email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru] > mobile: +7 (925) 416-37-26 > Clever{color:#4f8f00}DATA{color} > make your data clever > > > > Hi, > I see that could be a useful feature. What exactly now is preventing you from > inheriting from BucketingSink? Maybe it would be just enough to make the > BucketingSink easier extendable. > One thing now that could collide with such feature is that Kostas is now > working on larger BucketingSink rework/refactor. > Piotrek > > > Hi guys, thx for your reply. > The following code info is actual for *release-1.5.0 tag, >
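The best-effort caveat in point 2 can be illustrated with a toy model. This is only a sketch, not BucketingSink code: `BestEffortDemo`, `commit`, and the in-memory sets are hypothetical stand-ins, and the model assumes that a retry after the failure sees the file as already final and therefore never invokes the hook for it.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model (hypothetical, not Flink code) of a sink that finalizes files and then notifies a hook.
class BestEffortDemo {
    final Set<String> pending = new LinkedHashSet<>();
    final Set<String> finalized = new LinkedHashSet<>();
    final List<String> notified = new ArrayList<>();

    // Moves every pending file to the final state, then records a hook call for it.
    void commit(boolean failBeforeHook) {
        for (String file : new ArrayList<>(pending)) {
            pending.remove(file);
            finalized.add(file);          // stands in for the rename on the file system
            if (failBeforeHook) {
                throw new IllegalStateException("simulated failure before the hook");
            }
            notified.add(file);           // stands in for the user callback
        }
    }

    static BestEffortDemo run() {
        BestEffortDemo sink = new BestEffortDemo();
        sink.pending.add("part-0-0");
        sink.pending.add("part-0-1");
        try {
            sink.commit(true);            // fails right after finalizing part-0-0
        } catch (IllegalStateException e) {
            // assumed recovery: the task restarts and retries the commit
        }
        sink.commit(false);
        return sink;
    }

    public static void main(String[] args) {
        BestEffortDemo sink = run();
        System.out.println("finalized=" + sink.finalized + " notified=" + sink.notified);
    }
}
```

In this model both files reach the final state, but only `part-0-1` is ever reported to the hook.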
[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647600#comment-16647600 ] Kostas Kloudas commented on FLINK-9592: ---

Hi [~kent2171], I had a look at the PR. In general, as I said earlier, I like the idea of having Callbacks to notify when a file changes state. As far as the design/implementation of the current PR is concerned, the following are my comments:

1) The FileStateChangedCallback seems to be pretty limiting, and probably designed with a specific usecase in mind. It assumes that the user would like to do sth with the underlying file system when the file changes state (e.g. write a special file). But other usecases may need to do a REST call, or update a DB, or in general communicate with another system. Given the above, I would suggest that the function should have an open() and a close() method which are called once and are responsible for allocating and freeing resources. The open() should potentially take the flink config as argument and initialize any long-living resources, e.g. connections to databases, a connection to the filesystem, etc, and the close should be responsible for freeing them. This will allow the sink to accommodate a broader variety of usecases. Now for the methods themselves, I do not yet have a definite answer on what should be included as argument, but I would also include a Context as an argument. This will allow for future-proofing the method, as we will be able to add stuff in the Context if we want to expose more stuff in the future, rather than deprecating the already existing API and creating a new one.

2) IMPORTANT CONSIDERATIONS to keep in mind: all this is a "best-effort" reporting of state changes, as, for example, if a failure happens after transitioning a file to its "final" state, but before calling the hook, then you will never get the notification. This behavior is aligned with Flink's metric system, where metrics are not checkpointed.
In our case though, the scenario described above is more tricky to accommodate as we are talking about integration with external systems. Let me know what you think about the above!
> Notify on moving file into pending/ final state > --- > > Key: FLINK-9592 > URL: https://issues.apache.org/jira/browse/FLINK-9592 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: Rinat Sharipov >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > > Hi mates, I got a proposal about functionality of BucketingSink. > > During implementation of one of our tasks we got the following need - create > a meta-file, with the path and additional information about the file, created > by BucketingSink, when it’s been moved into final place. > Unfortunately such behaviour is currently not available for us. > > We’ve implemented our own Sink, that provides an opportunity to register > notifiers, that will be called, when file state is changing, but current API > doesn’t allow us to add such behaviour using inheritance ... > > It seems, that such functionality could be useful, and could be a part of > BucketingSink API > What do you sink, should I make a PR ? > Sincerely yours, > *Rinat Sharipov* > Software Engineer at 1DMP CORE Team > > email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru] > mobile: +7 (925) 416-37-26 > Clever{color:#4f8f00}DATA{color} > make your data clever > > > > Hi, > I see that could be a useful feature. What exactly now is preventing you from > inheriting from BucketingSink? Maybe it would be just enough to make the > BucketingSink easier extendable. > One thing now that could collide with such feature is that Kostas is now > working on larger BucketingSink rework/refactor. > Piotrek > > > Hi guys, thx for your reply.
> The following code info is actual for *release-1.5.0 tag, > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class* > > For now, BucketingSink has the following lifecycle of files > > When moving files from opened to pending state: > # on each item (*method* *invoke:434* *line*), we check that suitable bucket > exist, and contain opened file, in case, when opened file doesn’t exist, we > create one, and write item to it > # on each item (*method* *invoke:434* *line*), we check that suitable opened > file doesn’t exceed the limits, and if limits are exceeded, we close it and > move into pending state using *closeCurrentPartFile:568 line - private method* > # on each timer request
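The suggestion in point 1 of the comment above (a callback with `open()`, `close()` and a `Context` argument) could look roughly like the following. This is a sketch only: the names `FileStateListener`, `Context`, `path()`, `newState()` and the `Map`-based configuration are assumptions made for illustration, not the PR's or Flink's API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical callback with an explicit lifecycle.
interface FileStateListener {
    void open(Map<String, String> config) throws IOException;  // called once: allocate resources
    void onStateChange(Context context) throws IOException;    // called for each file transition
    void close() throws IOException;                            // called once: free resources

    // Accessors can be added here later without changing the signature of onStateChange.
    interface Context {
        String path();
        String newState();
    }
}

// Records the calls it receives instead of talking to a database or REST service.
class RecordingListener implements FileStateListener {
    final List<String> events = new ArrayList<>();

    @Override public void open(Map<String, String> config) { events.add("open"); }
    @Override public void onStateChange(Context context) {
        events.add(context.newState() + ":" + context.path());
    }
    @Override public void close() { events.add("close"); }
}

class ListenerDemo {
    static FileStateListener.Context context(String filePath, String fileState) {
        return new FileStateListener.Context() {
            @Override public String path() { return filePath; }
            @Override public String newState() { return fileState; }
        };
    }

    // Drives the listener the way a sink might: open once, notify per transition, close once.
    static List<String> run() {
        RecordingListener recorder = new RecordingListener();
        FileStateListener listener = recorder;
        try {
            listener.open(Collections.emptyMap());
            try {
                listener.onStateChange(context("/data/part-0-0", "pending"));
                listener.onStateChange(context("/data/part-0-0", "final"));
            } finally {
                listener.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return recorder.events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Passing a `Context` object rather than individual parameters is what allows more information to be exposed later without deprecating the method.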
[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646693#comment-16646693 ] Kostas Kloudas commented on FLINK-9592: --- Hi [~kent2171]. Sorry for not posting the design doc earlier. That was a big mistake on my side. Currently we have a new filesystem sink, called `StreamingFileSink`. You can find it on the master branch. I will have a look at the PR just to get an idea of the proposed solution but please open a discussion in the dev mailing list so that other members of the community can comment on it. In the discussion you can include the PR and why you think that this feature is interesting and how you implemented it. This is the only way that this PR can be merged, as described in the contribution guidelines https://flink.apache.org/how-to-contribute.html. In addition you can follow the discussion about the contributions in the dev ML (search for [DISCUSS] [Contributing]). > Notify on moving file into pending/ final state > --- > > Key: FLINK-9592 > URL: https://issues.apache.org/jira/browse/FLINK-9592 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: Rinat Sharipov >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > > Hi mates, I got a proposal about functionality of BucketingSink. > > During implementation of one of our tasks we got the following need - create > a meta-file, with the path and additional information about the file, created > by BucketingSink, when it’s been moved into final place. > Unfortunately such behaviour is currently not available for us. > > We’ve implemented our own Sink, that provides an opportunity to register > notifiers, that will be called, when file state is changing, but current API > doesn’t allow us to add such behaviour using inheritance ... > > It seems, that such functionality could be useful, and could be a part of > BucketingSink API > What do you sink, should I make a PR ? 
> Sincerely yours, > *Rinat Sharipov* > Software Engineer at 1DMP CORE Team > > email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru] > mobile: +7 (925) 416-37-26 > Clever{color:#4f8f00}DATA{color} > make your data clever > > > > Hi, > I see that could be a useful feature. What exactly now is preventing you from > inheriting from BucketingSink? Maybe it would be just enough to make the > BucketingSink easier extendable. > One thing now that could collide with such feature is that Kostas is now > working on larger BucketingSink rework/refactor. > Piotrek > > > Hi guys, thx for your reply. > The following code info is actual for *release-1.5.0 tag, > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class* > > For now, BucketingSink has the following lifecycle of files > > When moving files from opened to pending state: > # on each item (*method* *invoke:434* *line*), we check that suitable bucket > exist, and contain opened file, in case, when opened file doesn’t exist, we > create one, and write item to it > # on each item (*method* *invoke:434* *line*), we check that suitable opened > file doesn’t exceed the limits, and if limits are exceeded, we close it and > move into pending state using *closeCurrentPartFile:568 line - private method* > # on each timer request (*onProcessingTime:482 line*), we check, if items > haven't been added to the opened file longer, than specified period of time, > we close it, using the same private method *closeCurrentPartFile:588 line* > > So, the only way, that we have, is to call our hook from > *closeCurrentPartFile*, that is private, so we copy-pasted the current impl > and injected our logic there > > > Files are moving from pending state into final, during checkpointing > lifecycle, in *notifyCheckpointComplete:657 line*, that is public, and > contains a lot of logic, including discovery of files in pending states, > synchronization of state access and it’s modification, etc … > > So we couldn’t 
override it, or call super method and add some logic, because > when current impl changes the state of files, it removes them from state, and > we don’t have any opportunity to know, > for which files state have been changed. > > To solve such problem, we've created the following interface > > /** > * The \{@code FileStateChangeCallback}is used to perform any additional > operations, when > {@link BucketingSink} > * moves file from one state to another. For more information about state > management of \{@code BucketingSink}, look > *
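The two triggers for moving a file from the opened to the pending state described above (a size limit checked on each item, and inactivity checked on each timer request) can be sketched as follows. The class, method and parameter names are hypothetical, and the exact comparisons used by BucketingSink may differ.

```java
// Hypothetical helper, not BucketingSink code: decides when an opened part file should be closed.
class RolloverDemo {
    // Checked on each incoming item: has the opened file grown past the limit?
    static boolean exceedsSizeLimit(long bytesWritten, long maxPartBytes) {
        return bytesWritten > maxPartBytes;
    }

    // Checked on each timer request: has nothing been written for longer than the threshold?
    static boolean isInactive(long lastWriteMillis, long nowMillis, long inactivityThresholdMillis) {
        return nowMillis - lastWriteMillis > inactivityThresholdMillis;
    }

    public static void main(String[] args) {
        System.out.println(exceedsSizeLimit(2048, 1024));
        System.out.println(isInactive(1_000, 62_000, 60_000));
    }
}
```

Both checks end in the same private `closeCurrentPartFile` step in the quoted description, which is why a hook would have to be injected there.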
[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646230#comment-16646230 ] Rinat Sharipov commented on FLINK-9592: --- Hi [~kkl0u] , during the process of migration to the latest Flink, we've decided to try to contribute functionality that adds the ability to hook the state changes in bucketing sink, so PR with this feature is available. We are very interested in having this feature in BucketingSink and are always open for discussion. Thx ! > Notify on moving file into pending/ final state > --- > > Key: FLINK-9592 > URL: https://issues.apache.org/jira/browse/FLINK-9592 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: Rinat Sharipov >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > > Hi mates, I got a proposal about functionality of BucketingSink. > > During implementation of one of our tasks we got the following need - create > a meta-file, with the path and additional information about the file, created > by BucketingSink, when it’s been moved into final place. > Unfortunately such behaviour is currently not available for us. > > We’ve implemented our own Sink, that provides an opportunity to register > notifiers, that will be called, when file state is changing, but current API > doesn’t allow us to add such behaviour using inheritance ... > > It seems, that such functionality could be useful, and could be a part of > BucketingSink API > What do you sink, should I make a PR ? > Sincerely yours, > *Rinat Sharipov* > Software Engineer at 1DMP CORE Team > > email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru] > mobile: +7 (925) 416-37-26 > Clever{color:#4f8f00}DATA{color} > make your data clever > > > > Hi, > I see that could be a useful feature. What exactly now is preventing you from > inheriting from BucketingSink? Maybe it would be just enough to make the > BucketingSink easier extendable.
[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646218#comment-16646218 ] ASF GitHub Bot commented on FLINK-9592: ---

kent2171 opened a new pull request #6824: FLINK-9592 [flink-connector-filesystem] added ability to add hooks for file state changing
URL: https://github.com/apache/flink/pull/6824

What is the purpose of the change:
This pull-request adds ability to hook the moment of file state changing

Brief change log:
- when file is moved from inProgress to pending state the list of pre-configured hooks will be called
- when file is moved from pending into final state the list of pre-configured hooks will be called

Verifying this change:
The following tests verify that hooks are called in proper time:
- testThatOnInProgressToPendingCallbackIsFiredWhenFilesAreMovedToPendingStateByTimeout
- testThatOnInProgressToPendingCallbackIsFiredWhenFilesAreMovedToPendingStateBySize
- testThatOnInProgressToPendingCallbackIsFiredWhenFunctionIsClosed
- testThatOnPendingToFinalCallbackIsFiredWhenCheckpointingIsCompleted

Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (yes)

Documentation:
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? add documentation and usage information in the description of **org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink** class

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Notify on moving file into pending/ final state > --- > > Key: FLINK-9592 > URL: https://issues.apache.org/jira/browse/FLINK-9592 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: Rinat Sharipov >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > > Hi mates, I got a proposal about functionality of BucketingSink. > > During implementation of one of our tasks we got the following need - create > a meta-file, with the path and additional information about the file, created > by BucketingSink, when it’s been moved into final place. > Unfortunately such behaviour is currently not available for us. > > We’ve implemented our own Sink, that provides an opportunity to register > notifiers, that will be called, when file state is changing, but current API > doesn’t allow us to add such behaviour using inheritance ... > > It seems, that such functionality could be useful, and could be a part of > BucketingSink API > What do you sink, should I make a PR ? > Sincerely yours, > *Rinat Sharipov* > Software Engineer at 1DMP CORE Team > > email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru] > mobile: +7 (925) 416-37-26 > Clever{color:#4f8f00}DATA{color} > make your data clever > > > > Hi, > I see that could be a useful feature. What exactly now is preventing you from > inheriting from BucketingSink? Maybe it would be just enough to make the > BucketingSink easier extendable. > One thing now that could collide with such feature is that Kostas is now > working on larger BucketingSink rework/refactor. > Piotrek > > > Hi guys, thx for your reply. 
> The following code info is actual for *release-1.5.0 tag, > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class* > > For now, BucketingSink has the following lifecycle of files > > When moving files from opened to pending state: > # on each item (*method* *invoke:434* *line*), we check that suitable bucket > exist, and contain opened file, in case, when opened file doesn’t exist, we > create one, and write item to it > # on each item (*method* *invoke:434* *line*), we check that suitable opened > file doesn’t exceed the limits, and if limits are exceeded, we close it and > move into pending state using *closeCurrentPartFile:568 line - private method* > # on each timer request
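The change log of the pull request above (one list of hooks fired on inProgress to pending, another on pending to final) can be sketched with a toy sink. This is not the PR's code: `HookedSinkDemo`, `Hook`, `registerOnPending`, `registerOnFinal` and `openPart` are hypothetical names, and files are plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy sink (hypothetical) that fires registered hooks on both file state transitions.
class HookedSinkDemo {
    interface Hook {
        void call(String path);
    }

    private final List<Hook> onPendingHooks = new ArrayList<>();
    private final List<Hook> onFinalHooks = new ArrayList<>();
    private final List<String> pendingFiles = new ArrayList<>();
    private String inProgressFile;

    HookedSinkDemo registerOnPending(Hook... hooks) {
        onPendingHooks.addAll(Arrays.asList(hooks));
        return this;
    }

    HookedSinkDemo registerOnFinal(Hook... hooks) {
        onFinalHooks.addAll(Arrays.asList(hooks));
        return this;
    }

    void openPart(String path) {
        inProgressFile = path;
    }

    // inProgress -> pending: triggered by size limit, inactivity timeout, or closing the function.
    void closeCurrentPart() {
        if (inProgressFile == null) {
            return;
        }
        pendingFiles.add(inProgressFile);
        for (Hook hook : onPendingHooks) {
            hook.call(inProgressFile);
        }
        inProgressFile = null;
    }

    // pending -> final: triggered when a checkpoint completes.
    void notifyCheckpointComplete() {
        for (String file : pendingFiles) {
            for (Hook hook : onFinalHooks) {
                hook.call(file);
            }
        }
        pendingFiles.clear();
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        HookedSinkDemo sink = new HookedSinkDemo()
                .registerOnPending(path -> log.add("pending:" + path))
                .registerOnFinal(path -> log.add("final:" + path));
        sink.openPart("part-0-0");
        sink.closeCurrentPart();
        sink.notifyCheckpointComplete();
        sink.notifyCheckpointComplete();  // nothing pending, so no hook fires
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```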
[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513558#comment-16513558 ] Kostas Kloudas commented on FLINK-9592: --- Sounds good [~kent2171]! We can further chat when the document is out ;) > Notify on moving file into pending/ final state > --- > > Key: FLINK-9592 > URL: https://issues.apache.org/jira/browse/FLINK-9592 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: Rinat Sharipov >Assignee: Kostas Kloudas >Priority: Major > > Hi mates, I got a proposal about functionality of BucketingSink. > > During implementation of one of our tasks we got the following need - create > a meta-file, with the path and additional information about the file, created > by BucketingSink, when it’s been moved into final place. > Unfortunately such behaviour is currently not available for us. > > We’ve implemented our own Sink, that provides an opportunity to register > notifiers, that will be called, when file state is changing, but current API > doesn’t allow us to add such behaviour using inheritance ... > > It seems, that such functionality could be useful, and could be a part of > BucketingSink API > What do you sink, should I make a PR ? > Sincerely yours, > *Rinat Sharipov* > Software Engineer at 1DMP CORE Team > > email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru] > mobile: +7 (925) 416-37-26 > Clever{color:#4f8f00}DATA{color} > make your data clever > > > > Hi, > I see that could be a useful feature. What exactly now is preventing you from > inheriting from BucketingSink? Maybe it would be just enough to make the > BucketingSink easier extendable. > One thing now that could collide with such feature is that Kostas is now > working on larger BucketingSink rework/refactor. > Piotrek > > > Hi guys, thx for your reply. 
> The following code info is actual for *release-1.5.0 tag, > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class* > > For now, BucketingSink has the following lifecycle of files > > When moving files from opened to pending state: > # on each item (*method* *invoke:434* *line*), we check that suitable bucket > exist, and contain opened file, in case, when opened file doesn’t exist, we > create one, and write item to it > # on each item (*method* *invoke:434* *line*), we check that suitable opened > file doesn’t exceed the limits, and if limits are exceeded, we close it and > move into pending state using *closeCurrentPartFile:568 line - private method* > # on each timer request (*onProcessingTime:482 line*), we check, if items > haven't been added to the opened file longer, than specified period of time, > we close it, using the same private method *closeCurrentPartFile:588 line* > > So, the only way, that we have, is to call our hook from > *closeCurrentPartFile*, that is private, so we copy-pasted the current impl > and injected our logic there > > > Files are moving from pending state into final, during checkpointing > lifecycle, in *notifyCheckpointComplete:657 line*, that is public, and > contains a lot of logic, including discovery of files in pending states, > synchronization of state access and it’s modification, etc … > > So we couldn’t override it, or call super method and add some logic, because > when current impl changes the state of files, it removes them from state, and > we don’t have any opportunity to know, > for which files state have been changed. > > To solve such problem, we've created the following interface > > /** > * The \{@code FileStateChangeCallback}is used to perform any additional > operations, when > {@link BucketingSink} > * moves file from one state to another. For more information about state > management of \{@code BucketingSink}, look > * through it's official documentation. 
> */ > public interface FileStateChangeCallback extends Serializable \{ /** * Used > to perform any additional operations, related with moving of file into next > state. * * @param fs provides access for working with file system * @param > path path to the file, moved into next state * * @throws IOException if > something went wrong, while performing any operations with file system */ > void call(FileSystem fs, Path path) throws IOException; } > And have added an ability to register this callbacks in BucketingSink impl in > the following manner > > public BucketingSink > registerOnFinalStateChangeCallback(FileStateChangeCallback… callbacks) \{...} > public BucketingSink >
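As an illustration of the use case from the issue description (writing a meta-file once a file reaches its final place), a callback could look roughly like the sketch below. To stay self-contained it uses `java.nio.file` and a one-argument stand-in interface `LocalFileCallback`, whereas the interface quoted above takes a `FileSystem` and a `Path`. The `.meta` suffix and the meta-file contents are assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in for the quoted callback interface, working on the local file system.
interface LocalFileCallback {
    void call(Path path) throws IOException;
}

class MetaFileDemo {
    // Writes "<name>.meta" next to the finalized file, holding its path and size in bytes.
    static final LocalFileCallback WRITE_META = path -> {
        Path meta = path.resolveSibling(path.getFileName() + ".meta");
        String body = "path=" + path.toAbsolutePath() + "\nbytes=" + Files.size(path) + "\n";
        Files.write(meta, body.getBytes(StandardCharsets.UTF_8));
    };

    // Creates a small part file in a temp directory, fires the callback, returns the meta-file text.
    static String run() {
        try {
            Path dir = Files.createTempDirectory("bucket");
            Path part = Files.write(dir.resolve("part-0-0"), "hello".getBytes(StandardCharsets.UTF_8));
            WRITE_META.call(part);
            return new String(Files.readAllBytes(dir.resolve("part-0-0.meta")), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(run());
    }
}
```

With the registration methods quoted above, such a callback would presumably be passed to `registerOnFinalStateChangeCallback`.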
[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513555#comment-16513555 ] Rinat Sharipov commented on FLINK-9592: --- [~kkl0u], I will be glad to help )) Thx > Notify on moving file into pending/ final state > --- > > Key: FLINK-9592 > URL: https://issues.apache.org/jira/browse/FLINK-9592 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: Rinat Sharipov >Assignee: Kostas Kloudas >Priority: Major > > Hi mates, I got a proposal about functionality of BucketingSink. > > During implementation of one of our tasks we got the following need - create > a meta-file, with the path and additional information about the file, created > by BucketingSink, when it’s been moved into final place. > Unfortunately such behaviour is currently not available for us. > > We’ve implemented our own Sink, that provides an opportunity to register > notifiers, that will be called, when file state is changing, but current API > doesn’t allow us to add such behaviour using inheritance ... > > It seems, that such functionality could be useful, and could be a part of > BucketingSink API > What do you sink, should I make a PR ? > Sincerely yours, > *Rinat Sharipov* > Software Engineer at 1DMP CORE Team > > email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru] > mobile: +7 (925) 416-37-26 > Clever{color:#4f8f00}DATA{color} > make your data clever > > > > Hi, > I see that could be a useful feature. What exactly now is preventing you from > inheriting from BucketingSink? Maybe it would be just enough to make the > BucketingSink easier extendable. > One thing now that could collide with such feature is that Kostas is now > working on larger BucketingSink rework/refactor. > Piotrek > > > Hi guys, thx for your reply. 
[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513547#comment-16513547 ] Kostas Kloudas commented on FLINK-9592: --- Hi [~kent2171]! Thanks for the proposal. It seems like a nice idea. We are currently thinking about how to redesign the Bucketing Sink. I will send out a design document in the next couple of weeks. > Notify on moving file into pending/ final state > --- > > Key: FLINK-9592 > URL: https://issues.apache.org/jira/browse/FLINK-9592 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: Rinat Sharipov >Assignee: Kostas Kloudas >Priority: Major > > Hi mates, I got a proposal about functionality of BucketingSink. > > During implementation of one of our tasks we got the following need - create > a meta-file, with the path and additional information about the file, created > by BucketingSink, when it’s been moved into final place. > Unfortunately such behaviour is currently not available for us. > > We’ve implemented our own Sink, that provides an opportunity to register > notifiers, that will be called, when file state is changing, but current API > doesn’t allow us to add such behaviour using inheritance ... > > It seems, that such functionality could be useful, and could be a part of > BucketingSink API > What do you sink, should I make a PR ? > Sincerely yours, > *Rinat Sharipov* > Software Engineer at 1DMP CORE Team > > email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru] > mobile: +7 (925) 416-37-26 > Clever{color:#4f8f00}DATA{color} > make your data clever > > > > Hi, > I see that could be a useful feature. What exactly now is preventing you from > inheriting from BucketingSink? Maybe it would be just enough to make the > BucketingSink easier extendable. > One thing now that could collide with such feature is that Kostas is now > working on larger BucketingSink rework/refactor. > Piotrek > > > Hi guys, thx for your reply. 