[jira] [Updated] (FLINK-14184) Provide a stage listener API to be invoked per task manager
[ https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-14184: --- Labels: auto-deprioritized-major auto-deprioritized-minor pull-request-available (was: auto-deprioritized-major pull-request-available stale-minor) Priority: Not a Priority (was: Minor) This issue was labeled "stale-minor" 7 days ago and has not received any updates, so it is being deprioritized. If this ticket is actually Minor, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > Provide a stage listener API to be invoked per task manager > --- > > Key: FLINK-14184 > URL: https://issues.apache.org/jira/browse/FLINK-14184 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Stephen Connolly >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > Time Spent: 10m > Remaining Estimate: 0h
[jira] [Updated] (FLINK-14184) Provide a stage listener API to be invoked per task manager
[ https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-14184: --- Labels: auto-deprioritized-major pull-request-available stale-minor (was: auto-deprioritized-major pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Minor but is unassigned and neither itself nor its Sub-Tasks have been updated for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is still Minor, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Provide a stage listener API to be invoked per task manager > --- > > Key: FLINK-14184 > URL: https://issues.apache.org/jira/browse/FLINK-14184 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Stephen Connolly >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available, > stale-minor > Time Spent: 10m > Remaining Estimate: 0h
[jira] [Updated] (FLINK-14184) Provide a stage listener API to be invoked per task manager
[ https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-14184: --- Labels: auto-deprioritized-major pull-request-available (was: pull-request-available stale-major) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates, so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > Provide a stage listener API to be invoked per task manager > --- > > Key: FLINK-14184 > URL: https://issues.apache.org/jira/browse/FLINK-14184 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Stephen Connolly >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > Time Spent: 10m > Remaining Estimate: 0h
[jira] [Updated] (FLINK-14184) Provide a stage listener API to be invoked per task manager
[ https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-14184: --- Labels: pull-request-available stale-major (was: pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned and neither itself nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Provide a stage listener API to be invoked per task manager > --- > > Key: FLINK-14184 > URL: https://issues.apache.org/jira/browse/FLINK-14184 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Stephen Connolly >Priority: Major > Labels: pull-request-available, stale-major > Time Spent: 10m > Remaining Estimate: 0h
[jira] [Updated] (FLINK-14184) Provide a stage listener API to be invoked per task manager
[ https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-14184: -- Component/s: (was: Runtime / Coordination) > Provide a stage listener API to be invoked per task manager > --- > > Key: FLINK-14184 > URL: https://issues.apache.org/jira/browse/FLINK-14184 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Stephen Connolly >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Often a topology has nodes that need to use 3rd-party APIs. Not every > 3rd-party API is written in a style suited to use from within Flink. > At present, implementing a `Rich___` function provides each stage with the > `open(...)` and `close()` callbacks when the stage is accepted for execution > on each task manager. > There is, however, a need to be able to listen for the first stage being > opened on any given task manager, as well as the last stage being closed. > Critically, the closing of the last stage is the opportunity to release any > resources that are shared across multiple stages in the topology, e.g. > database connection pools, Async HTTP Client thread pools, etc. > Without such a clean-up hook, the connections and threads can act as GC roots > that prevent the topology's classloader from being unloaded, resulting in a > memory and resource leak in the task manager... never mind that, if it is a > database connection pool, it may also be consuming resources on the > database. > There are three workarounds available at present: > # Each stage allocates its own resources and cleans up afterwards. This > is, in many ways, the ideal... however, it can result in more database connections > than intended, e.g. each stage that accesses the database > needs its own database connection rather than letting the > whole topology share one or two connections through a connection > pool.
Similarly, if the 3rd-party library uses a static singleton for the > whole classloader, there is no way for the independent stages to know when it > is safe to shut down the singleton. > # Implement a reference-counting proxy for the 3rd-party API. This is a lot > of work: you need to ensure that deserialization of the proxy returns a > classloader singleton (so you can maintain the reference counts), and if the > count goes wrong you have leaked the resource. > # Use a ReferenceQueue-backed proxy. This is even more complex than > implementing reference counting, but has the advantage of not requiring the > count to be maintained correctly. On the other hand, it does not provide for > eager release of the resources. > If Flink provided a listener contract that could be registered with the > execution environment, the resources could be cleared out. > My proposed interface would look something like: > {code:java} > public interface EnvironmentLocalTopologyListener extends Serializable { > /** >* Called immediately prior to the first {@link > RichFunction#open(Configuration)} >* being invoked for the topology on the current task manager JVM for this >* classloader. Will not be called again unless {@link #close()} has been invoked > first. >* Use this method to eagerly initialize any ClassLoader-scoped resources > that >* are pooled across the stages of the topology. >* >* @param parameters // I am unsure if this makes sense >*/ > default void open(Configuration parameters) throws Exception {} > /** >* Called after the last {@link RichFunction#close()} has completed and the >* topology is effectively being stopped (for the current ClassLoader). >* This method will only be invoked if a call to {@link > #open(Configuration)} >* was attempted, and will be invoked irrespective of whether the call to >* {@link #open(Configuration)} terminated normally or exceptionally.
>* Use this method to release any ClassLoader-scoped resources that have > been >* pooled across the stages of the topology. >*/ > default void close() throws Exception {} > /** >* Decorate the threads that are used to invoke the stages of the topology. >* Use this method, for example, to seed the {@link org.slf4j.MDC} with >* topology-specific details, e.g. >* >* Runnable decorate(Runnable task) { >* return () -> { >* try (MDC.MDCCloseable ctx = MDC.putCloseable("foo", "bar")) { >* task.run(); >* } >* }; >* } >* >* >* @param task // might not be the most appropriate type; I haven't >* // checked how Flink implements dispatch. May or may not >* // want a parameters argumen
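As a rough illustration of the second workaround (reference counting), the sketch below keeps a shared resource in a holder that each stage acquires in `open(...)` and releases in `close()`: the first acquire creates the resource and the last release closes it. `SharedResource` is a hypothetical helper, not a Flink API. The sketch assumes the holder lives in a `static` field of a class loaded by the job's user-code classloader and that all stages of the job on a task manager share that classloader; under that assumption no proxy is serialized, so the deserialization problem described above does not arise. It still carries the risk the description names: a stage that never calls `release()` leaks the resource.

```java
import java.util.Objects;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not part of Flink): a reference-counted resource shared by
 * all stages that run in the same classloader. The first acquire() creates the
 * resource; the last release() closes it.
 */
final class SharedResource<T extends AutoCloseable> {
    private final Supplier<? extends T> factory;
    private T instance;   // guarded by "this"
    private int refCount; // guarded by "this"

    SharedResource(Supplier<? extends T> factory) {
        this.factory = Objects.requireNonNull(factory, "factory");
    }

    /** Call from a stage's open(...); creates the resource on first use. */
    synchronized T acquire() {
        if (refCount == 0) {
            instance = factory.get(); // if this throws, the count stays at zero
        }
        refCount++;
        return instance;
    }

    /** Call from a stage's close(); closes the resource when the last user releases it. */
    synchronized void release() throws Exception {
        if (refCount == 0) {
            throw new IllegalStateException("release() called without a matching acquire()");
        }
        refCount--;
        if (refCount == 0) {
            T toClose = instance;
            instance = null; // drop the reference even if close() throws
            toClose.close();
        }
    }

    /** Number of stages currently holding the resource. */
    synchronized int refCount() {
        return refCount;
    }
}
```

For example, a user function might declare `private static final SharedResource<MyPool> POOL = new SharedResource<>(MyPool::new);`, where `MyPool` is a hypothetical stand-in for any `AutoCloseable` client, then call `POOL.acquire()` in `open(...)` and `POOL.release()` in `close()`.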
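The lifecycle that the proposed `EnvironmentLocalTopologyListener` Javadoc describes can be sketched as simple bookkeeping on the runtime side. In the sketch below, `TopologyListener` is a hypothetical, simplified stand-in for the proposed interface (no `Configuration` parameter, no Flink types), and `TopologyLifecycle` is hypothetical runtime code, one instance per job classloader on a task manager. `open()` runs before the first stage opens, `close()` runs after the last stage closes, and `close()` is still called when `open()` threw. The sketch assumes the runtime calls `afterStageClose()` for every stage whose `beforeStageOpen()` was invoked, even if that call failed.

```java
import java.util.Objects;

/** Hypothetical, simplified stand-in for the proposed listener (no Flink types). */
interface TopologyListener {
    default void open() throws Exception {}

    default void close() throws Exception {}
}

/**
 * Hypothetical runtime-side bookkeeping, one instance per job classloader on a
 * task manager. open() runs before the first stage opens; close() runs after the
 * last stage closes, including when open() threw.
 */
final class TopologyLifecycle {
    private final TopologyListener listener;
    private int activeStages; // guarded by "this"

    TopologyLifecycle(TopologyListener listener) {
        this.listener = Objects.requireNonNull(listener, "listener");
    }

    /** Called immediately before a stage's RichFunction#open. */
    synchronized void beforeStageOpen() throws Exception {
        // Count the stage first, so a failing listener.open() is still balanced
        // by the afterStageClose() call for this stage.
        if (activeStages++ == 0) {
            listener.open();
        }
    }

    /** Called after a stage's RichFunction#close has completed. */
    synchronized void afterStageClose() throws Exception {
        if (activeStages == 0) {
            throw new IllegalStateException("afterStageClose() without beforeStageOpen()");
        }
        if (--activeStages == 0) {
            listener.close(); // invoked whether listener.open() completed normally or not
        }
    }
}
```

Holding the monitor while `listener.open()` runs makes other stages wait until initialization finishes, which fits the "immediately prior to the first open" wording; the trade-off is that a slow `open()` delays every stage of that job on the task manager. Once the count returns to zero, the next stage to open triggers `open()` again, matching "will not be called again unless close() has been invoked first".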
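The `decorate` hook in the proposed interface amounts to wrapping each task so that per-thread context is set before the stage runs and cleaned up afterwards. To keep the sketch runnable without SLF4J, it uses a plain `ThreadLocal` wrapper (`ThreadContext`, hypothetical) as a stand-in for `org.slf4j.MDC`; `TopologyDecorators.withTopology` is likewise a hypothetical name. Unlike the `MDC.putCloseable` snippet, which removes the key when the `MDCCloseable` is closed, this version restores whatever value the pooled thread held before the task ran.

```java
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for org.slf4j.MDC: one per-thread "topology" value. */
final class ThreadContext {
    private static final ThreadLocal<String> TOPOLOGY = new ThreadLocal<>();

    private ThreadContext() {}

    static String get() {
        return TOPOLOGY.get();
    }

    static void set(String value) {
        if (value == null) {
            TOPOLOGY.remove(); // avoid leaving an entry behind on pooled threads
        } else {
            TOPOLOGY.set(value);
        }
    }
}

/** Hypothetical factory for decorate(Runnable)-style wrappers. */
final class TopologyDecorators {
    private TopologyDecorators() {}

    /** Returns a decorator that seeds the thread context for the duration of each task. */
    static UnaryOperator<Runnable> withTopology(String topologyName) {
        return task -> () -> {
            String previous = ThreadContext.get();
            ThreadContext.set(topologyName);
            try {
                task.run();
            } finally {
                ThreadContext.set(previous); // restore the thread's earlier value, or clear it
            }
        };
    }
}
```

With real SLF4J, the body of the returned lambda would call `MDC.put`/`MDC.remove` (or use `MDC.putCloseable`) instead of `ThreadContext`.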
[jira] [Updated] (FLINK-14184) Provide a stage listener API to be invoked per task manager
[ https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-14184: -- Component/s: (was: Runtime / Coordination) Runtime / Task > Provide a stage listener API to be invoked per task manager > --- > > Key: FLINK-14184 > URL: https://issues.apache.org/jira/browse/FLINK-14184 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Stephen Connolly >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h
[jira] [Updated] (FLINK-14184) Provide a stage listener API to be invoked per task manager
[ https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-14184: -- Component/s: Runtime / Coordination > Provide a stage listener API to be invoked per task manager > --- > > Key: FLINK-14184 > URL: https://issues.apache.org/jira/browse/FLINK-14184 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination, Runtime / Task >Reporter: Stephen Connolly >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h
[jira] [Updated] (FLINK-14184) Provide a stage listener API to be invoked per task manager
[ https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger updated FLINK-14184:
---
    Component/s: Runtime / Coordination

> Provide a stage listener API to be invoked per task manager
> ---
>
>                 Key: FLINK-14184
>                 URL: https://issues.apache.org/jira/browse/FLINK-14184
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>            Reporter: Stephen Connolly
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
[jira] [Updated] (FLINK-14184) Provide a stage listener API to be invoked per task manager
[ https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-14184:
---
    Labels: pull-request-available  (was: )

> Provide a stage listener API to be invoked per task manager
> ---
>
>                 Key: FLINK-14184
>                 URL: https://issues.apache.org/jira/browse/FLINK-14184
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Stephen Connolly
>            Priority: Major
>              Labels: pull-request-available
>
> Oftentimes a topology has nodes that need to use 3rd-party APIs. Not every
> 3rd-party API is written in a style that suits usage from within Flink.
> At present, implementing a `Rich___` function provides each stage with the
> `open(...)` and `close()` callbacks, as the stage is accepted for execution
> on each task manager.
> There is, however, a need to be able to listen for the first stage being
> opened on any given task manager as well as for the last stage being closed.
> Critically, the last stage being closed is the opportunity to release any
> resources that are shared across multiple stages in the topology, e.g.
> database connection pools, Async HTTP Client thread pools, etc.
> Without such a clean-up hook, the connections and threads can act as GC roots
> that prevent the topology's classloader from being unloaded, resulting in a
> memory and resource leak in the task manager... never mind that, if it is a
> database connection pool, it may also be consuming resources on the
> database.
> There are three workarounds available at present:
> # Each stage just allocates its own resources and cleans up afterwards. This
> is, in many ways, the ideal; however, it can result in more database
> connections than intended, e.g. each stage that accesses the database
> needs its own connection rather than letting the
> whole topology share one or two connections through a connection
> pool. Similarly, if the 3rd-party library uses a static singleton for the
> whole classloader, there is no way for the independent stages to know when it
> is safe to shut down the singleton.
> # Implement a reference-counting proxy for the 3rd-party API. This is a lot
> of work: you need to ensure that deserialization of the proxy returns a
> classloader singleton (so you can maintain the reference counts), and if the
> count goes wrong you have leaked the resource.
> # Use a ReferenceQueue-backed proxy. This is even more complex than
> implementing reference counting, but has the advantage of not requiring that
> the count be maintained correctly. On the other hand, it does not provide for
> eager release of the resources.
> If Flink provided a listener contract that could be registered with the
> execution environment, this would allow the resources to be cleared out.
> My proposed interface would look something like:
> {code:java}
> public interface EnvironmentLocalTopologyListener extends Serializable {
>   /**
>    * Called immediately prior to the first {@link RichFunction#open(Configuration)}
>    * being invoked for the topology on the current task manager JVM for this
>    * classloader. Will not be called again unless {@link #close()} has been
>    * invoked first.
>    * Use this method to eagerly initialize any ClassLoader-scoped resources that
>    * are pooled across the stages of the topology.
>    *
>    * @param parameters // I am unsure if this makes sense
>    */
>   default void open(Configuration parameters) throws Exception {}
>
>   /**
>    * Called after the last {@link RichFunction#close()} has completed and the
>    * topology is effectively being stopped (for the current ClassLoader).
>    * This method will only be invoked if a call to {@link #open(Configuration)}
>    * was attempted, and will be invoked irrespective of whether the call to
>    * {@link #open(Configuration)} terminated normally or exceptionally.
>    * Use this method to release any ClassLoader-scoped resources that have been
>    * pooled across the stages of the topology.
>    */
>   default void close() throws Exception {}
>
>   /**
>    * Decorate the threads that are used to invoke the stages of the topology.
>    * Use this method, for example, to seed the {@link org.slf4j.MDC} with
>    * topology-specific details, e.g.
>    *
>    *   Runnable decorate(Runnable task) {
>    *     return () -> {
>    *       try (MDC.MDCCloseable ctx = MDC.putCloseable("foo", "bar")) {
>    *         task.run();
>    *       }
>    *     };
>    *   }
>    *
>    * @param task // might not be the most appropriate type; I haven't
>    *             // checked how Flink implements dispatch. May or may not
>    *             // want a parameters argument also.
>    */
>   default Runnable decorate(Runnable task) { return task; }
> }
> {code}
> (Name
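The first-open / last-close contract proposed in this issue amounts to reference counting the stages that are open for one ClassLoader on one task manager, which is the same bookkeeping workaround #2 asks users to do by hand. The following is a minimal, dependency-free Java sketch of how something could drive such a listener. It is not Flink API. `StageLifecycleCoordinator`, `beforeStageOpen`, `afterStageClose` and `CONTEXT` are hypothetical names. The `Configuration` parameter of `open` is dropped, and a `ThreadLocal` stands in for `org.slf4j.MDC` so that the sketch compiles without external libraries.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class TopologyListenerSketch {

    /** Hypothetical, trimmed copy of the proposed listener (no Configuration parameter). */
    interface EnvironmentLocalTopologyListener extends Serializable {
        default void open() throws Exception {}

        default void close() throws Exception {}

        default Runnable decorate(Runnable task) {
            return task;
        }
    }

    /**
     * Hypothetical per-ClassLoader coordinator: counts the stages open on this
     * task manager and fires the listener on the 0 -> 1 and 1 -> 0 transitions.
     */
    static final class StageLifecycleCoordinator {
        private final EnvironmentLocalTopologyListener listener;
        private int openStages = 0;

        StageLifecycleCoordinator(EnvironmentLocalTopologyListener listener) {
            this.listener = listener;
        }

        /** Call before each stage's open(); the first call opens the listener. */
        synchronized void beforeStageOpen() throws Exception {
            // Increment first so that a failing listener.open() still counts as
            // "attempted" and the matching afterStageClose() will call close().
            if (openStages++ == 0) {
                listener.open();
            }
        }

        /** Call after each stage's close(), even if opening the stage failed. */
        synchronized void afterStageClose() throws Exception {
            if (openStages == 0) {
                throw new IllegalStateException("afterStageClose() without beforeStageOpen()");
            }
            if (--openStages == 0) {
                listener.close(); // last stage gone: release ClassLoader-scoped resources
            }
        }

        Runnable decorate(Runnable task) {
            return listener.decorate(task);
        }
    }

    // Stand-in for org.slf4j.MDC so the sketch has no external dependencies.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    public static void main(String[] args) throws Exception {
        List<String> events = new ArrayList<>();
        EnvironmentLocalTopologyListener listener = new EnvironmentLocalTopologyListener() {
            @Override public void open() { events.add("listener-open"); }

            @Override public void close() { events.add("listener-close"); }

            @Override public Runnable decorate(Runnable task) {
                return () -> {
                    CONTEXT.set("topology-a"); // seed per-thread context around the task
                    try {
                        task.run();
                    } finally {
                        CONTEXT.remove();
                    }
                };
            }
        };

        StageLifecycleCoordinator coordinator = new StageLifecycleCoordinator(listener);
        coordinator.beforeStageOpen(); // stage 1 opens: listener.open() runs
        coordinator.beforeStageOpen(); // stage 2 opens: no listener call
        coordinator.decorate(() -> events.add("task saw " + CONTEXT.get())).run();
        coordinator.afterStageClose(); // stage 1 closes: stage 2 still open
        coordinator.afterStageClose(); // stage 2 closes: listener.close() runs
        System.out.println(events);
        // prints [listener-open, task saw topology-a, listener-close]
    }
}
```

Because the counter is incremented before `listener.open()` runs, a failing `open()` still leads to `close()` once the matching `afterStageClose()` calls arrive. This mirrors the "invoked irrespective of whether the call to open terminated normally or exceptionally" clause. The sketch assumes that every `beforeStageOpen()` is paired with an `afterStageClose()`. That pairing is exactly what the issue argues the runtime, rather than user code, is best placed to guarantee.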