[jira] [Updated] (FLINK-14184) Provide a stage listener API to be invoked per task manager

2022-04-18 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-14184:
---
  Labels: auto-deprioritized-major auto-deprioritized-minor 
pull-request-available  (was: auto-deprioritized-major pull-request-available 
stale-minor)
Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Provide a stage listener API to be invoked per task manager
> ---
>
> Key: FLINK-14184
> URL: https://issues.apache.org/jira/browse/FLINK-14184
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Reporter: Stephen Connolly
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor, pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Often a topology has nodes that need to use 3rd party APIs. Not every 
> 3rd party API is written in a good style for usage from within Flink.
> At present, implementing a `Rich___` will provide each stage with the 
> `open(...)` and `close()` callbacks, as the stage is accepted for execution 
> on each task manager.
> There is, however, a need to be able to listen for the first stage being 
> opened on any given task manager as well as the last stage being closed. 
> Critically, the last stage being closed is the opportunity to release any 
> resources that are shared across multiple stages in the topology, e.g. 
> database connection pools, async HTTP client thread pools, etc.
> Without such a clean-up hook, the connections and threads can act as GC roots 
> that prevent the topology's classloader from being unloaded, resulting in a 
> memory and resource leak in the task manager. Worse, if it is a database 
> connection pool, it may also be consuming resources from the database.
> There are three workarounds available at present:
>  # Each stage just allocates its own resources and cleans up afterwards. This 
> is, in many ways, the ideal. However, it can result in more database 
> connections than intended, because each stage that accesses the database 
> needs its own connection rather than letting the whole topology share one 
> or two connections through a connection pool. Similarly, if the 3rd party 
> library uses a static singleton for the whole classloader, there is no way 
> for the independent stages to know when it is safe to shut down the 
> singleton.
>  # Implement a reference counting proxy for the 3rd party API. This is a lot 
> of work: you need to ensure that deserialization of the proxy returns a 
> classloader singleton (so you can maintain the reference counts), and if the 
> count goes wrong you have leaked the resource.
>  # Use a ReferenceQueue backed proxy. This is even more complex than 
> implementing reference counting, but has the advantage of not requiring the 
> count be maintained correctly. On the other hand, it does not provide for 
> eager release of the resources.
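
The reference counting workaround (item 2 above) could be sketched roughly as follows. This is a minimal illustration under stated assumptions, not Flink API: `SharedResource`, `acquire()`, `release()` and `references()` are hypothetical names, and the sketch assumes the holder is kept in a static field so that there is one instance per classloader. It avoids the deserialization concern from the description simply by never being serialized.

```java
import java.util.function.Supplier;

/** Hypothetical helper (not part of Flink): shares one resource via reference counting. */
final class SharedResource<T extends AutoCloseable> {
    private final Supplier<T> factory;
    private T instance;   // guarded by "this"
    private int refCount; // guarded by "this"

    SharedResource(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Intended to be called from each stage's open(...): creates the resource on first use. */
    synchronized T acquire() {
        if (refCount == 0) {
            // If the factory throws, refCount stays at 0 and nothing is leaked.
            instance = factory.get();
        }
        refCount++;
        return instance;
    }

    /** Intended to be called from each stage's close(): closes the resource on last release. */
    synchronized void release() throws Exception {
        if (refCount == 0) {
            throw new IllegalStateException("release() without matching acquire()");
        }
        if (--refCount == 0) {
            T toClose = instance;
            instance = null;
            toClose.close();
        }
    }

    /** Current number of outstanding acquisitions. */
    synchronized int references() {
        return refCount;
    }
}
```

Each stage would call `acquire()` in `open(...)` and `release()` in `close()`. A single missed `release()` keeps the resource alive indefinitely, which is the fragility the description points out.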
> If Flink provided a listener contract that could be registered with the 
> execution environment then this would allow the resources to be cleared out. 
> My proposed interface would look something like
> {code:java}
> public interface EnvironmentLocalTopologyListener extends Serializable {
>   /**
>    * Called immediately prior to the first {@link RichFunction#open(Configuration)}
>    * being invoked for the topology on the current task manager JVM for this
>    * classloader. Will not be called again unless {@link #close()} has been invoked
>    * first.
>    * Use this method to eagerly initialize any ClassLoader scoped resources that
>    * are pooled across the stages of the topology.
>    *
>    * @param parameters // I am unsure if this makes sense
>    */
>   default void open(Configuration parameters) throws Exception {}
>   /**
>    * Called after the last {@link RichFunction#close()} has completed and the
>    * topology is effectively being stopped (for the current ClassLoader).
>    * This method will only be invoked if a call to {@link #open(Configuration)}
>    * was attempted, and will be invoked irrespective of whether the call to
>    * {@link #open(Configuration)} terminated normally or exceptionally.
>    * Use this method to release any ClassLoader scoped resources that have been
>    * pooled across the stages of the topology.
>    */
>   default void close() throws Exception {}
>   /**
>    * Decorate the threads that are used to invoke the stages of the topology.
>    * Use this method, for example, to seed the {@

[jira] [Updated] (FLINK-14184) Provide a stage listener API to be invoked per task manager

2022-04-10 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-14184:
---
Labels: auto-deprioritized-major pull-request-available stale-minor  (was: 
auto-deprioritized-major pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Minor but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Provide a stage listener API to be invoked per task manager
> ---
>
> Key: FLINK-14184
> URL: https://issues.apache.org/jira/browse/FLINK-14184
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Reporter: Stephen Connolly
> Priority: Minor
> Labels: auto-deprioritized-major, pull-request-available, stale-minor
> Time Spent: 10m
> Remaining Estimate: 0h

[jira] [Updated] (FLINK-14184) Provide a stage listener API to be invoked per task manager

2021-06-06 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-14184:
---
  Labels: auto-deprioritized-major pull-request-available  (was: 
pull-request-available stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Provide a stage listener API to be invoked per task manager
> ---
>
> Key: FLINK-14184
> URL: https://issues.apache.org/jira/browse/FLINK-14184
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Reporter: Stephen Connolly
> Priority: Minor
> Labels: auto-deprioritized-major, pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h

[jira] [Updated] (FLINK-14184) Provide a stage listener API to be invoked per task manager

2021-05-27 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-14184:
---
Labels: pull-request-available stale-major  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added the "stale-major" label to the issue. 
If this ticket is still Major, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> Provide a stage listener API to be invoked per task manager
> ---
>
> Key: FLINK-14184
> URL: https://issues.apache.org/jira/browse/FLINK-14184
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Reporter: Stephen Connolly
> Priority: Major
> Labels: pull-request-available, stale-major
> Time Spent: 10m
> Remaining Estimate: 0h

[jira] [Updated] (FLINK-14184) Provide a stage listener API to be invoked per task manager

2019-10-24 Thread Till Rohrmann (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-14184:
--
Component/s: (was: Runtime / Coordination)

> Provide a stage listener API to be invoked per task manager
> ---
>
> Key: FLINK-14184
> URL: https://issues.apache.org/jira/browse/FLINK-14184
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Reporter: Stephen Connolly
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Often a topology has nodes that need to use 3rd party APIs. Not every 
> 3rd party API is written in a good style for usage from within Flink.
> At present, implementing a `Rich___` will provide each stage with the 
> `open(...)` and `close()` callbacks, as the stage is accepted for execution 
> on each task manager.
> There is, however, a need to be able to listen for the first stage being 
> opened on any given task manager as well as the last stage being closed. 
> Critically, the last stage being closed is the opportunity to release any 
> resources that are shared across multiple stages in the topology, e.g. 
> database connection pools, async HTTP client thread pools, etc.
> Without such a clean-up hook, the connections and threads can act as GC roots 
> that prevent the topology's classloader from being unloaded, resulting in a 
> memory and resource leak in the task manager. Worse, if it is a database 
> connection pool, it may also be consuming resources from the database.
> There are three workarounds available at present:
>  # Each stage just allocates its own resources and cleans up afterwards. This 
> is, in many ways, the ideal. However, it can result in more database 
> connections than intended, because each stage that accesses the database 
> needs its own connection rather than letting the whole topology share one 
> or two connections through a connection pool. Similarly, if the 3rd party 
> library uses a static singleton for the whole classloader, there is no way 
> for the independent stages to know when it is safe to shut down the 
> singleton.
>  # Implement a reference counting proxy for the 3rd party API. This is a lot 
> of work: you need to ensure that deserialization of the proxy returns a 
> classloader singleton (so you can maintain the reference counts), and if the 
> count goes wrong you have leaked the resource.
>  # Use a ReferenceQueue backed proxy. This is even more complex than 
> implementing reference counting, but has the advantage of not requiring the 
> count be maintained correctly. On the other hand, it does not provide for 
> eager release of the resources.
> If Flink provided a listener contract that could be registered with the 
> execution environment, then this would allow the resources to be cleared out. 
> My proposed interface would look something like
> {code:java}
> public interface EnvironmentLocalTopologyListener extends Serializable {
>   /**
>    * Called immediately prior to the first {@link RichFunction#open(Configuration)}
>    * being invoked for the topology on the current task manager JVM for this
>    * classloader. Will not be called again unless {@link #close()} has been invoked
>    * first.
>    * Use this method to eagerly initialize any ClassLoader scoped resources that
>    * are pooled across the stages of the topology.
>    *
>    * @param parameters // I am unsure if this makes sense
>    */
>   default void open(Configuration parameters) throws Exception {}
>   /**
>    * Called after the last {@link RichFunction#close()} has completed and the
>    * topology is effectively being stopped (for the current ClassLoader).
>    * This method will only be invoked if a call to {@link #open(Configuration)}
>    * was attempted, and will be invoked irrespective of whether the call to
>    * {@link #open(Configuration)} terminated normally or exceptionally.
>    * Use this method to release any ClassLoader scoped resources that have been
>    * pooled across the stages of the topology.
>    */
>   default void close() throws Exception {}
>   /**
>    * Decorate the threads that are used to invoke the stages of the topology.
>    * Use this method, for example, to seed the {@link org.slf4j.MDC} with
>    * topology specific details, e.g.
>    *
>    * Runnable decorate(Runnable task) {
>    *   return () -> {
>    *     try (MDC.MDCCloseable ctx = MDC.putCloseable("foo", "bar")) {
>    *       task.run();
>    *     }
>    *   };
>    * }
>    *
>    * @param task // might not be the most appropriate type, I haven't
>    *             // checked how Flink implements dispatch. May or may not
>    *             // want a parameters argumen

[jira] [Updated] (FLINK-14184) Provide a stage listener API to be invoked per task manager

2019-10-23 Thread Till Rohrmann (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-14184:
--
Component/s: (was: Runtime / Coordination)
 Runtime / Task

> Provide a stage listener API to be invoked per task manager
> ---
>
> Key: FLINK-14184
> URL: https://issues.apache.org/jira/browse/FLINK-14184
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Reporter: Stephen Connolly
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h

[jira] [Updated] (FLINK-14184) Provide a stage listener API to be invoked per task manager

2019-10-23 Thread Till Rohrmann (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-14184:
--
Component/s: Runtime / Coordination

> Provide a stage listener API to be invoked per task manager
> ---
>
> Key: FLINK-14184
> URL: https://issues.apache.org/jira/browse/FLINK-14184
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination, Runtime / Task
> Reporter: Stephen Connolly
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Often times a topology has nodes that need to use 3rd party APIs. Not every 
> 3rd party API is written in a good style for usage from within Flink.
> At present, implementing a `Rich___` will provide each stage with the 
> `open(...)` and `close()` callbacks, as the stage is accepted for execution 
> on each task manager.
> There is, however, a need for being able to listen for the first stage being 
> opened on any given task manager as well as the last stage being closed. 
> Critically the last stage being closed is the opportunity to release any 
> resources that are shared across multiple stages in the topology, e.g. 
> Database connection pools, Async HTTP Client thread pools, etc.
> Without such a clean-up hook, the connections and threads can act as GC roots 
> that prevent the topology's classloader from being unloaded and result in a 
> memory and resource leak in the task manager... nevermind that if it is a 
> Database connection pool, it may also be consuming resources from the 
> database.
> There are three workarounds available at present:
>  # Each stage just allocates its own resources and cleans up afterwards. This 
> is, in many ways, the ideal... however this can result in higher than 
> intended database connections, e.f. as each stage that accesses the database 
> stage needs to have a separate database connection rather than letting the 
> whole topology share the use of one or two connections through a connection 
> pool. Similarly, if the 3rd party library uses a static singleton for the 
> whole classloader there is no way for the independent stages to know when it 
> is safe to shut down the singleton
>  # Implement a reference counting proxy for the 3rd party API. This is a lot 
> of work, you need to ensure that deserialization of the proxy returns a 
> classloader singleton (so you can maintain the reference counts) and if the 
> count goes wrong you have leaked the resource
>  # Use a ReferenceQueue backed proxy. This is even more complex than 
> implementing reference counting, but has the advantage of not requiring the 
> count be maintained correctly. On the other hand, it does not provide for 
> eager release of the resources.
> If Flink provided a listener contract that could be registered with the 
> execution environment then this would allow the resources to be cleared out. 
> My proposed interface would look something like
> {code:java}
> public interface EnvironmentLocalTopologyListener extends Serializable {
>   /**
>    * Called immediately prior to the first {@link RichFunction#open(Configuration)}
>    * being invoked for the topology on the current task manager JVM for this
>    * classloader. Will not be called again unless {@link #close()} has been invoked first.
>    * Use this method to eagerly initialize any ClassLoader scoped resources that
>    * are pooled across the stages of the topology.
>    *
>    * @param parameters // I am unsure if this makes sense
>    */
>   default void open(Configuration parameters) throws Exception {}
>   /**
>    * Called after the last {@link RichFunction#close()} has completed and the
>    * topology is effectively being stopped (for the current ClassLoader).
>    * This method will only be invoked if a call to {@link #open(Configuration)}
>    * was attempted, and will be invoked irrespective of whether the call to
>    * {@link #open(Configuration)} terminated normally or exceptionally.
>    * Use this method to release any ClassLoader scoped resources that have been
>    * pooled across the stages of the topology.
>    */
>   default void close() throws Exception {}
>   /**
>    * Decorate the threads that are used to invoke the stages of the topology.
>    * Use this method, for example, to seed the {@link org.slf4j.MDC} with
>    * topology specific details, e.g.
>    *
>    * Runnable decorate(Runnable task) {
>    *   return () -> {
>    *     try (MDC.MDCCloseable ctx = MDC.putCloseable("foo", "bar")) {
>    *       task.run();
>    *     }
>    *   };
>    * }
>    *
>    *
>    * @param task // might not be the most appropriate type, I haven't
>    * // checked how Flink implements dispatch. May or may not
>    * // want a param

[jira] [Updated] (FLINK-14184) Provide a stage listener API to be invoked per task manager

2019-10-01 Thread Robert Metzger (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-14184:
---
Component/s: Runtime / Coordination

> Provide a stage listener API to be invoked per task manager
> ---
>
> Key: FLINK-14184
> URL: https://issues.apache.org/jira/browse/FLINK-14184
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Stephen Connolly
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Oftentimes a topology has nodes that need to use 3rd party APIs. Not every
> 3rd party API is written in a good style for usage from within Flink.
> At present, implementing a `Rich___` will provide each stage with the
> `open(...)` and `close()` callbacks, as the stage is accepted for execution
> on each task manager.
> There is, however, a need to listen for the first stage being
> opened on any given task manager as well as the last stage being closed.
> Critically, the last stage being closed is the opportunity to release any
> resources that are shared across multiple stages in the topology, e.g.
> database connection pools, async HTTP client thread pools, etc.
> Without such a clean-up hook, the connections and threads can act as GC roots
> that prevent the topology's classloader from being unloaded and result in a
> memory and resource leak in the task manager. Never mind that if it is a
> database connection pool, it may also be consuming resources from the
> database.
> There are three workarounds available at present:
>  # Each stage just allocates its own resources and cleans up afterwards. This
> is, in many ways, the ideal. However, this can result in more database
> connections than intended, e.g. because each stage that accesses the database
> needs to have a separate database connection rather than letting the
> whole topology share the use of one or two connections through a connection
> pool. Similarly, if the 3rd party library uses a static singleton for the
> whole classloader, there is no way for the independent stages to know when it
> is safe to shut down the singleton.
>  # Implement a reference counting proxy for the 3rd party API. This is a lot
> of work: you need to ensure that deserialization of the proxy returns a
> classloader singleton (so you can maintain the reference counts), and if the
> count goes wrong you have leaked the resource.
>  # Use a ReferenceQueue-backed proxy. This is even more complex than
> implementing reference counting, but has the advantage of not requiring that
> the count be maintained correctly. On the other hand, it does not provide for
> eager release of the resources.
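The ReferenceQueue-backed workaround (item 3) could look roughly like the sketch below. The class `QueueTrackedResource` and its methods `track` and `reap` are hypothetical names chosen for illustration. Each stage registers a handle object, and the shared resource is closed once the garbage collector has enqueued the references for all handles and something calls `reap()`. A real proxy would also need a thread or hook that calls `reap()` periodically, which is omitted here.

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical tracker that closes a shared resource once all handles are collected. */
final class QueueTrackedResource {
    private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
    // The references themselves are held strongly so they survive until enqueued.
    private final Set<Reference<?>> live = new HashSet<>();
    private final AutoCloseable resource;
    private boolean closed;

    QueueTrackedResource(AutoCloseable resource) {
        this.resource = resource;
    }

    /** Registers a handle (e.g. a stage's proxy object) and returns its tracking reference. */
    synchronized Reference<?> track(Object handle) {
        Reference<?> ref = new PhantomReference<>(handle, queue);
        live.add(ref);
        return ref;
    }

    /**
     * Drains the queue and closes the resource once no tracked handle remains.
     * Returns true if the resource is closed after this call.
     */
    synchronized boolean reap() {
        Reference<?> ref;
        while ((ref = queue.poll()) != null) {
            live.remove(ref);
        }
        if (!closed && live.isEmpty()) {
            closed = true;
            try {
                resource.close();
            } catch (Exception e) {
                throw new RuntimeException("failed to close shared resource", e);
            }
        }
        return closed;
    }
}
```

Nothing has to be counted correctly, but release happens only after a garbage collection has run and `reap()` has been called, so it is not eager, which matches the drawback noted in item 3.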
> If Flink provided a listener contract that could be registered with the 
> execution environment then this would allow the resources to be cleared out. 
> My proposed interface would look something like
> {code:java}
> public interface EnvironmentLocalTopologyListener extends Serializable {
>   /**
>    * Called immediately prior to the first {@link RichFunction#open(Configuration)}
>    * being invoked for the topology on the current task manager JVM for this
>    * classloader. Will not be called again unless {@link #close()} has been invoked first.
>    * Use this method to eagerly initialize any ClassLoader scoped resources that
>    * are pooled across the stages of the topology.
>    *
>    * @param parameters // I am unsure if this makes sense
>    */
>   default void open(Configuration parameters) throws Exception {}
>   /**
>    * Called after the last {@link RichFunction#close()} has completed and the
>    * topology is effectively being stopped (for the current ClassLoader).
>    * This method will only be invoked if a call to {@link #open(Configuration)}
>    * was attempted, and will be invoked irrespective of whether the call to
>    * {@link #open(Configuration)} terminated normally or exceptionally.
>    * Use this method to release any ClassLoader scoped resources that have been
>    * pooled across the stages of the topology.
>    */
>   default void close() throws Exception {}
>   /**
>    * Decorate the threads that are used to invoke the stages of the topology.
>    * Use this method, for example, to seed the {@link org.slf4j.MDC} with
>    * topology specific details, e.g.
>    *
>    * Runnable decorate(Runnable task) {
>    *   return () -> {
>    *     try (MDC.MDCCloseable ctx = MDC.putCloseable("foo", "bar")) {
>    *       task.run();
>    *     }
>    *   };
>    * }
>    *
>    *
>    * @param task // might not be the most appropriate type, I haven't
>    * // checked how Flink implements dispatch. May or may not
>    * // want a parameters argument

[jira] [Updated] (FLINK-14184) Provide a stage listener API to be invoked per task manager

2019-10-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-14184:
---
Labels: pull-request-available  (was: )

> Provide a stage listener API to be invoked per task manager
> ---
>
> Key: FLINK-14184
> URL: https://issues.apache.org/jira/browse/FLINK-14184
> Project: Flink
>  Issue Type: Improvement
>Reporter: Stephen Connolly
>Priority: Major
>  Labels: pull-request-available
>
> Oftentimes a topology has nodes that need to use 3rd party APIs. Not every
> 3rd party API is written in a good style for usage from within Flink.
> At present, implementing a `Rich___` will provide each stage with the
> `open(...)` and `close()` callbacks, as the stage is accepted for execution
> on each task manager.
> There is, however, a need to listen for the first stage being
> opened on any given task manager as well as the last stage being closed.
> Critically, the last stage being closed is the opportunity to release any
> resources that are shared across multiple stages in the topology, e.g.
> database connection pools, async HTTP client thread pools, etc.
> Without such a clean-up hook, the connections and threads can act as GC roots
> that prevent the topology's classloader from being unloaded and result in a
> memory and resource leak in the task manager. Never mind that if it is a
> database connection pool, it may also be consuming resources from the
> database.
> There are three workarounds available at present:
>  # Each stage just allocates its own resources and cleans up afterwards. This
> is, in many ways, the ideal. However, this can result in more database
> connections than intended, e.g. because each stage that accesses the database
> needs to have a separate database connection rather than letting the
> whole topology share the use of one or two connections through a connection
> pool. Similarly, if the 3rd party library uses a static singleton for the
> whole classloader, there is no way for the independent stages to know when it
> is safe to shut down the singleton.
>  # Implement a reference counting proxy for the 3rd party API. This is a lot
> of work: you need to ensure that deserialization of the proxy returns a
> classloader singleton (so you can maintain the reference counts), and if the
> count goes wrong you have leaked the resource.
>  # Use a ReferenceQueue-backed proxy. This is even more complex than
> implementing reference counting, but has the advantage of not requiring that
> the count be maintained correctly. On the other hand, it does not provide for
> eager release of the resources.
> If Flink provided a listener contract that could be registered with the 
> execution environment then this would allow the resources to be cleared out. 
> My proposed interface would look something like
> {code:java}
> public interface EnvironmentLocalTopologyListener extends Serializable {
>   /**
>    * Called immediately prior to the first {@link RichFunction#open(Configuration)}
>    * being invoked for the topology on the current task manager JVM for this
>    * classloader. Will not be called again unless {@link #close()} has been invoked first.
>    * Use this method to eagerly initialize any ClassLoader scoped resources that
>    * are pooled across the stages of the topology.
>    *
>    * @param parameters // I am unsure if this makes sense
>    */
>   default void open(Configuration parameters) throws Exception {}
>   /**
>    * Called after the last {@link RichFunction#close()} has completed and the
>    * topology is effectively being stopped (for the current ClassLoader).
>    * This method will only be invoked if a call to {@link #open(Configuration)}
>    * was attempted, and will be invoked irrespective of whether the call to
>    * {@link #open(Configuration)} terminated normally or exceptionally.
>    * Use this method to release any ClassLoader scoped resources that have been
>    * pooled across the stages of the topology.
>    */
>   default void close() throws Exception {}
>   /**
>    * Decorate the threads that are used to invoke the stages of the topology.
>    * Use this method, for example, to seed the {@link org.slf4j.MDC} with
>    * topology specific details, e.g.
>    *
>    * Runnable decorate(Runnable task) {
>    *   return () -> {
>    *     try (MDC.MDCCloseable ctx = MDC.putCloseable("foo", "bar")) {
>    *       task.run();
>    *     }
>    *   };
>    * }
>    *
>    *
>    * @param task // might not be the most appropriate type, I haven't
>    * // checked how Flink implements dispatch. May or may not
>    * // want a parameters argument also.
>    */
>   default Runnable decorate(Runnable task) { return task; }
> }{code}
> (Name