[jira] [Commented] (FLINK-6949) Add ability to ship custom resource files to YARN cluster
[ https://issues.apache.org/jira/browse/FLINK-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17272347#comment-17272347 ]

Mikhail Pryakhin commented on FLINK-6949:
-----------------------------------------

Hey [~trohrmann], it works, thank you for pointing that out.

> Add ability to ship custom resource files to YARN cluster
> ----------------------------------------------------------
>
>                 Key: FLINK-6949
>                 URL: https://issues.apache.org/jira/browse/FLINK-6949
>             Project: Flink
>          Issue Type: Improvement
>          Components: Command Line Client, Deployment / YARN
>    Affects Versions: 1.3.0
>            Reporter: Mikhail Pryakhin
>            Assignee: Mikhail Pryakhin
>            Priority: Major
>
> *The problem:*
> When deploying a flink job on YARN it is not possible to specify custom resource files to be shipped to YARN cluster.
>
> *The use case description:*
> When running a flink job on multiple environments it becomes necessary to pass environment-related configuration files to the job's runtime. It can be accomplished by packaging configuration files within the job's jar. But having tens of different environments one can easily end up packaging as many jars as there are environments. It would be great to have an ability to separate configuration files from the job artifacts.
>
> *The possible solution:*
> add the --yarnship-files option to flink cli to specify files that should be shipped to the YARN cluster.
[jira] [Commented] (FLINK-6949) Add ability to ship custom resource files to YARN cluster
[ https://issues.apache.org/jira/browse/FLINK-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17270954#comment-17270954 ]

Mikhail Pryakhin commented on FLINK-6949:
-----------------------------------------

Hey [~Tang Yan] cc [~trohrmann]

I confirm this option doesn't work as expected. I implemented a dummy mapper that prints out all the resources available on its classpath, see the code below. I created a local dir named config and fed it with files that I expect to appear on the TMs' and JMs' classpaths, then I submitted a flink job with the following command

{code:java}
flink-1.12.1/bin/flink run -c "org.apache.flink.StreamingJob" -m yarn-cluster yarn-ship-test-1.0-SNAPSHOT.jar -yt config
{code}

where the -yt option ships the files in the specified directory (t for transfer), according to the official docs. Unfortunately, none of the resources contained in the config folder were listed among the available resources on the classpath. I'll implement a unit test.

{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.net.URLClassLoader;
import java.util.Arrays;

public class StreamingJob {

    public static void main(String[] args) {
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromCollection(Arrays.asList("1", "2", "3", "4"))
                    .map(new DummyMapper())
                    .print();
            env.execute();
        } catch (Exception e) {
            e.printStackTrace(System.err);
        }
    }

    public static class DummyMapper extends RichMapFunction<String, String> {

        @Override
        public void open(Configuration parameters) throws Exception {
            // The user code classloader is URL-based, so its URLs show the effective classpath.
            System.out.println("Resources on the classpath:");
            Arrays.stream(((URLClassLoader) getClass().getClassLoader()).getURLs())
                    .forEach(System.out::println);
        }

        @Override
        public String map(String value) throws Exception {
            return value;
        }
    }
}
{code}
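As a complementary check (not part of the original comment), one can try to resolve a single shipped file by name instead of dumping all classpath URLs. A minimal sketch; the file name app.properties is a hypothetical stand-in for whatever lives in the shipped config directory:

{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.net.URL;

public class ResourceProbe extends RichMapFunction<String, String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // getResource(...) returns null when the shipped directory did not end up
        // on the task manager classpath, which is the failure mode described above.
        URL resource = getClass().getClassLoader().getResource("app.properties");
        System.out.println("app.properties resolved to: " + resource);
    }

    @Override
    public String map(String value) {
        return value;
    }
}
{code}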
[jira] [Commented] (FLINK-6949) Add ability to ship custom resource files to YARN cluster
[ https://issues.apache.org/jira/browse/FLINK-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17268506#comment-17268506 ]

Mikhail Pryakhin commented on FLINK-6949:
-----------------------------------------

Hey [~Tang Yan], thank you! Will do.
[jira] [Closed] (FLINK-6950) Add ability to specify single jar files to be shipped to YARN
[ https://issues.apache.org/jira/browse/FLINK-6950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mikhail Pryakhin closed FLINK-6950.
-----------------------------------
    Resolution: Duplicate

Fixed by FLINK-11272.

> Add ability to specify single jar files to be shipped to YARN
> --------------------------------------------------------------
>
>                 Key: FLINK-6950
>                 URL: https://issues.apache.org/jira/browse/FLINK-6950
>             Project: Flink
>          Issue Type: Improvement
>          Components: Deployment / YARN
>            Reporter: Mikhail Pryakhin
>            Assignee: Mikhail Pryakhin
>            Priority: Minor
>
> When deploying a flink job on YARN it is not possible to specify multiple yarnship folders.
> Often when submitting a flink job, the job dependencies and job resources are located in different local folders, and both of them should be shipped to the YARN cluster.
> I think it would be great to have the ability to specify single jar files, not just folders, to be shipped to the YARN cluster (via the --yarnship-jars option).
[jira] [Commented] (FLINK-6949) Add ability to ship custom resource files to YARN cluster
[ https://issues.apache.org/jira/browse/FLINK-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16930058#comment-16930058 ]

Mikhail Pryakhin commented on FLINK-6949:
-----------------------------------------

Hi [~till.rohrmann],
This issue has already been fixed by [FLINK-13127|https://issues.apache.org/jira/browse/FLINK-13127].
[jira] [Commented] (FLINK-6950) Add ability to specify single jar files to be shipped to YARN
[ https://issues.apache.org/jira/browse/FLINK-6950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16930055#comment-16930055 ]

Mikhail Pryakhin commented on FLINK-6950:
-----------------------------------------

[~rmetzger] Hi Robert, the issue has been fixed by [FLINK-11272|https://issues.apache.org/jira/browse/FLINK-11272].
[jira] [Commented] (FLINK-6949) Add ability to ship custom resource files to YARN cluster
[ https://issues.apache.org/jira/browse/FLINK-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16804849#comment-16804849 ]

Mikhail Pryakhin commented on FLINK-6949:
-----------------------------------------

Hi [~till.rohrmann],
Yes, unfortunately it is still an issue. Sorry for being inactive for a long time; I hope to contribute to a solution soon.
[jira] [Updated] (FLINK-10694) ZooKeeperHaServices Cleanup
[ https://issues.apache.org/jira/browse/FLINK-10694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mikhail Pryakhin updated FLINK-10694:
-------------------------------------
    Description: 
When a streaming job with Zookeeper-HA enabled gets cancelled, all the job-related Zookeeper nodes are not removed. Is there a reason behind that?
I noticed that Zookeeper paths are created of type "Container Node" (an Ephemeral node that can have nested nodes) and fall back to the Persistent node type in case Zookeeper doesn't support this sort of nodes.
But anyway, it is worth removing the job's Zookeeper node when the job is cancelled, isn't it?

zookeeper version 3.4.10
flink version 1.6.1

# The job is deployed as a YARN cluster with the following properties set
{noformat}
high-availability: zookeeper
high-availability.zookeeper.quorum: 
high-availability.zookeeper.storageDir: hdfs:///
high-availability.zookeeper.path.root: 
high-availability.zookeeper.path.namespace: 
{noformat}
# The job is cancelled via the flink cancel command.

What I've noticed: when the job is running, the following directory structure is created in zookeeper
{noformat}
///leader/resource_manager_lock
///leader/rest_server_lock
///leader/dispatcher_lock
///leader/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///leaderlatch/resource_manager_lock
///leaderlatch/rest_server_lock
///leaderlatch/dispatcher_lock
///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///checkpoints/5c21f00b9162becf5ce25a1cf0e67cde/041
///checkpoint-counter/5c21f00b9162becf5ce25a1cf0e67cde
///running_job_registry/5c21f00b9162becf5ce25a1cf0e67cde
{noformat}
when the job is cancelled, some ephemeral nodes disappear, but most of them are still there:
{noformat}
///leader/5c21f00b9162becf5ce25a1cf0e67cde
///leaderlatch/resource_manager_lock
///leaderlatch/rest_server_lock
///leaderlatch/dispatcher_lock
///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///checkpoints/
///checkpoint-counter/
///running_job_registry/
{noformat}
Here is the method [1] responsible for cleaning the zookeeper folders up, which is called when a job manager has stopped [2]. And it seems it only cleans up the *running_job_registry* folder; other folders stay untouched. I suppose that everything under the *///* folder should be cleaned up when the job is cancelled.

[1] [https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java#L107]
[2] [https://github.com/apache/flink/blob/f087f57749004790b6f5b823d66822c36ae09927/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L332]

  was:
When a streaming job with Zookeeper-HA enabled gets cancelled all the job-related Zookeeper nodes are not removed. Is there a reason behind that?
I noticed that Zookeeper paths are created of type "Container Node" (an Ephemeral node that can have nested nodes) and fall back to Persistent node type in case Zookeeper doesn't support this sort of nodes. But anyway, it is worth removing the job Zookeeper node when a job is cancelled, isn't it?

zookeeper version 3.4.10
flink version 1.6.1

# The job is deployed as a YARN cluster with the following properties set
{noformat}
high-availability: zookeeper
high-availability.zookeeper.quorum: 
high-availability.zookeeper.storageDir: hdfs:///
high-availability.zookeeper.path.root: 
high-availability.zookeeper.path.namespace: 
{noformat}
# The job is cancelled via flink cancel command.

What I've noticed: when the job is running the following directory structure is created in zookeeper
{noformat}
///leader/resource_manager_lock
///leader/rest_server_lock
///leader/dispatcher_lock
///leader/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///leaderlatch/resource_manager_lock
///leaderlatch/rest_server_lock
///leaderlatch/dispatcher_lock
///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///checkpoints/5c21f00b9162becf5ce25a1cf0e67cde/041
///checkpoint-counter/5c21f00b9162becf5ce25a1cf0e67cde
///running_job_registry/5c21f00b9162becf5ce25a1cf0e67cde
{noformat}
when the job is cancelled some ephemeral nodes disappear, but most of them are still there:
{noformat}
///leader/5c21f00b9162becf5ce25a1cf0e67cde
///leaderlatch/resource_manager_lock
///leaderlatch/rest_server_lock
///leaderlatch/dispatcher_lock
///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///checkpoints/
///checkpoint-counter/
///running_job_registry/
{noformat}
Here is the method [1] responsible for cleaning zookeeper folders up [1] which is called when the job manager has stopped [2]. And it seems it only cleans up the folder *running_job_registry*, other folders stay untouched. I suppose that everything under the *///* folder is cleaned up when the job is cancelled.

[1]
[jira] [Updated] (FLINK-10694) ZooKeeperRunningJobsRegistry Cleanup
[ https://issues.apache.org/jira/browse/FLINK-10694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mikhail Pryakhin updated FLINK-10694:
-------------------------------------
    Description: 
When a streaming job with Zookeeper-HA enabled gets cancelled all the job-related Zookeeper nodes are not removed. Is there a reason behind that?
I noticed that Zookeeper paths are created of type "Container Node" (an Ephemeral node that can have nested nodes) and fall back to Persistent node type in case Zookeeper doesn't support this sort of nodes. But anyway, it is worth removing the job Zookeeper node when a job is cancelled, isn't it?

zookeeper version 3.4.10
flink version 1.6.1

# The job is deployed as a YARN cluster with the following properties set
{noformat}
high-availability: zookeeper
high-availability.zookeeper.quorum: 
high-availability.zookeeper.storageDir: hdfs:///
high-availability.zookeeper.path.root: 
high-availability.zookeeper.path.namespace: 
{noformat}
# The job is cancelled via flink cancel command.

What I've noticed: when the job is running the following directory structure is created in zookeeper
{noformat}
///leader/resource_manager_lock
///leader/rest_server_lock
///leader/dispatcher_lock
///leader/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///leaderlatch/resource_manager_lock
///leaderlatch/rest_server_lock
///leaderlatch/dispatcher_lock
///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///checkpoints/5c21f00b9162becf5ce25a1cf0e67cde/041
///checkpoint-counter/5c21f00b9162becf5ce25a1cf0e67cde
///running_job_registry/5c21f00b9162becf5ce25a1cf0e67cde
{noformat}
when the job is cancelled some ephemeral nodes disappear, but most of them are still there:
{noformat}
///leader/5c21f00b9162becf5ce25a1cf0e67cde
///leaderlatch/resource_manager_lock
///leaderlatch/rest_server_lock
///leaderlatch/dispatcher_lock
///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///checkpoints/
///checkpoint-counter/
///running_job_registry/
{noformat}
Here is the method [1] responsible for cleaning zookeeper folders up [1] which is called when the job manager has stopped [2]. And it seems it only cleans up the folder *running_job_registry*, other folders stay untouched. I suppose that everything under the *///* folder is cleaned up when the job is cancelled.

[1] [https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java#L107]
[2] [https://github.com/apache/flink/blob/f087f57749004790b6f5b823d66822c36ae09927/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L332]

  was:
When a streaming job with Zookeeper-HA enabled gets cancelled all the job-related Zookeeper nodes are not removed. Is there a reason behind that?
I noticed that Zookeeper paths are created of type "Container Node" (an Ephemeral node that can have nested nodes) and fall back to Persistent node type in case Zookeeper doesn't support this sort of nodes. But anyway, it is worth removing the job Zookeeper node when a job is cancelled, isn't it?

zookeeper version 3.4.10
flink version 1.6.1

# The job is deployed as a YARN cluster with the following properties set
{noformat}
high-availability: zookeeper
high-availability.zookeeper.quorum: 
high-availability.zookeeper.storageDir: hdfs:///
high-availability.zookeeper.path.root: 
high-availability.zookeeper.path.namespace: 
{noformat}
# The job is cancelled via flink cancel command.

What I've noticed: when the job is running the following directory structure is created in zookeeper
{noformat}
///leader/resource_manager_lock
///leader/rest_server_lock
///leader/dispatcher_lock
///leader/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///leaderlatch/resource_manager_lock
///leaderlatch/rest_server_lock
///leaderlatch/dispatcher_lock
///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///checkpoints/5c21f00b9162becf5ce25a1cf0e67cde/041
///checkpoint-counter/5c21f00b9162becf5ce25a1cf0e67cde
///running_job_registry/5c21f00b9162becf5ce25a1cf0e67cde
{noformat}
when the job is cancelled the some ephemeral nodes disappear, but most of them are still there:
{noformat}
///leader/5c21f00b9162becf5ce25a1cf0e67cde
///leaderlatch/resource_manager_lock
///leaderlatch/rest_server_lock
///leaderlatch/dispatcher_lock
///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///checkpoints/
///checkpoint-counter/
///running_job_registry/
{noformat}
Here is the method [1] responsible for cleaning zookeeper folders up [1] which is called when the job manager has stopped [2]. And it seems it only cleans up the folder *running_job_registry*, other folders stay untouched. I suppose that everything under the *///* folder is cleaned up when the job is cancelled.

[1]
[jira] [Updated] (FLINK-10694) ZooKeeperRunningJobsRegistry Cleanup
[ https://issues.apache.org/jira/browse/FLINK-10694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mikhail Pryakhin updated FLINK-10694:
-------------------------------------
    Description: 
When a streaming job with Zookeeper-HA enabled gets cancelled all the job-related Zookeeper nodes are not removed. Is there a reason behind that?
I noticed that Zookeeper paths are created of type "Container Node" (an Ephemeral node that can have nested nodes) and fall back to Persistent node type in case Zookeeper doesn't support this sort of nodes. But anyway, it is worth removing the job Zookeeper node when a job is cancelled, isn't it?

zookeeper version 3.4.10
flink version 1.6.1

# The job is deployed as a YARN cluster with the following properties set
{noformat}
high-availability: zookeeper
high-availability.zookeeper.quorum: 
high-availability.zookeeper.storageDir: hdfs:///
high-availability.zookeeper.path.root: 
high-availability.zookeeper.path.namespace: 
{noformat}
# The job is cancelled via flink cancel command.

What I've noticed: when the job is running the following directory structure is created in zookeeper
{noformat}
///leader/resource_manager_lock
///leader/rest_server_lock
///leader/dispatcher_lock
///leader/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///leaderlatch/resource_manager_lock
///leaderlatch/rest_server_lock
///leaderlatch/dispatcher_lock
///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///checkpoints/5c21f00b9162becf5ce25a1cf0e67cde/041
///checkpoint-counter/5c21f00b9162becf5ce25a1cf0e67cde
///running_job_registry/5c21f00b9162becf5ce25a1cf0e67cde
{noformat}
when the job is cancelled the some ephemeral nodes disappear, but most of them are still there:
{noformat}
///leader/5c21f00b9162becf5ce25a1cf0e67cde
///leaderlatch/resource_manager_lock
///leaderlatch/rest_server_lock
///leaderlatch/dispatcher_lock
///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///checkpoints/
///checkpoint-counter/
///running_job_registry/
{noformat}
Here is the method [1] responsible for cleaning zookeeper folders up [1] which is called when the job manager has stopped [2]. And it seems it only cleans up the folder *running_job_registry*, other folders stay untouched. I suppose that everything under the *///* folder is cleaned up when the job is cancelled.

[1] [https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java#L107]
[2] [https://github.com/apache/flink/blob/f087f57749004790b6f5b823d66822c36ae09927/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L332]

  was:
When a streaming job with Zookeeper-HA enabled gets cancelled all the job-related Zookeeper nodes are not removed. Is there a reason behind that?
I noticed that Zookeeper paths are created of type "Container Node" (an Ephemeral node that can have nested nodes) and fall back to Persistent node type in case Zookeeper doesn't support this sort of nodes. But anyway, it is worth removing the job Zookeeper node when a job is cancelled, isn't it?

zookeeper version 3.4.10
flink version 1.6.1

# The job is deployed as a YARN cluster with the following properties set
{noformat}
high-availability: zookeeper
high-availability.zookeeper.quorum: 
high-availability.zookeeper.storageDir: hdfs:///
high-availability.zookeeper.path.root: 
high-availability.zookeeper.path.namespace: 
{noformat}
# The job is cancelled via flink cancel command.

What I've noticed: when the job is running the following directory structure is created in zookeeper
{noformat}
///leader/resource_manager_lock
///leader/rest_server_lock
///leader/dispatcher_lock
///leader/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///leaderlatch/resource_manager_lock
///leaderlatch/rest_server_lock
///leaderlatch/dispatcher_lock
///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///checkpoints/5c21f00b9162becf5ce25a1cf0e67cde/041
///checkpoint-counter/5c21f00b9162becf5ce25a1cf0e67cde
///running_job_registry/5c21f00b9162becf5ce25a1cf0e67cde
{noformat}
when the job is cancelled the some ephemeral nodes disappear, but most of them are still there:
{noformat}
///leader/5c21f00b9162becf5ce25a1cf0e67cde
///leaderlatch/resource_manager_lock
///leaderlatch/rest_server_lock
///leaderlatch/dispatcher_lock
///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///checkpoints/
///checkpoint-counter/
///running_job_registry/
{noformat}
Here is the method [1] responsible for cleaning zookeeper folders up [1] which is called when the job manager has stopped [2]. And it seems it only cleans up the folder running_job_registry, other folders stay untouched. I supposed that everything under the *///* folder is cleaned up when the job is cancelled.

[1]
[jira] [Created] (FLINK-10694) ZooKeeperRunningJobsRegistry Cleanup
Mikhail Pryakhin created FLINK-10694:
----------------------------------------

             Summary: ZooKeeperRunningJobsRegistry Cleanup
                 Key: FLINK-10694
                 URL: https://issues.apache.org/jira/browse/FLINK-10694
             Project: Flink
          Issue Type: Bug
          Components: DataStream API
    Affects Versions: 1.6.1
            Reporter: Mikhail Pryakhin

When a streaming job with Zookeeper-HA enabled gets cancelled all the job-related Zookeeper nodes are not removed. Is there a reason behind that?
I noticed that Zookeeper paths are created of type "Container Node" (an Ephemeral node that can have nested nodes) and fall back to Persistent node type in case Zookeeper doesn't support this sort of nodes. But anyway, it is worth removing the job Zookeeper node when a job is cancelled, isn't it?

zookeeper version 3.4.10
flink version 1.6.1

# The job is deployed as a YARN cluster with the following properties set
{noformat}
high-availability: zookeeper
high-availability.zookeeper.quorum: 
high-availability.zookeeper.storageDir: hdfs:///
high-availability.zookeeper.path.root: 
high-availability.zookeeper.path.namespace: 
{noformat}
# The job is cancelled via flink cancel command.

What I've noticed: when the job is running the following directory structure is created in zookeeper
{noformat}
///leader/resource_manager_lock
///leader/rest_server_lock
///leader/dispatcher_lock
///leader/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///leaderlatch/resource_manager_lock
///leaderlatch/rest_server_lock
///leaderlatch/dispatcher_lock
///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///checkpoints/5c21f00b9162becf5ce25a1cf0e67cde/041
///checkpoint-counter/5c21f00b9162becf5ce25a1cf0e67cde
///running_job_registry/5c21f00b9162becf5ce25a1cf0e67cde
{noformat}
when the job is cancelled the some ephemeral nodes disappear, but most of them are still there:
{noformat}
///leader/5c21f00b9162becf5ce25a1cf0e67cde
///leaderlatch/resource_manager_lock
///leaderlatch/rest_server_lock
///leaderlatch/dispatcher_lock
///leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
///checkpoints/
///checkpoint-counter/
///running_job_registry/
{noformat}
Here is the method [1] responsible for cleaning zookeeper folders up [1] which is called when the job manager has stopped [2]. And it seems it only cleans up the folder running_job_registry, other folders stay untouched. I supposed that everything under the *///* folder is cleaned up when the job is cancelled.

[1] [https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java#L107]
[2] [https://github.com/apache/flink/blob/f087f57749004790b6f5b823d66822c36ae09927/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L332]
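To make the leftover-znode inspection reproducible, here is a minimal sketch that lists the lingering subtrees with Apache Curator (the ZooKeeper client Flink uses internally). The connect string and the HA root path are placeholders standing in for the elided quorum/root/namespace values above:

{code:java}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ZkLeftoverInspector {

    public static void main(String[] args) throws Exception {
        // Placeholders: substitute the real quorum and the configured
        // high-availability.zookeeper.path.root + path.namespace values.
        String quorum = "zk-host:2181";
        String haRoot = "/flink/my-namespace";

        try (CuratorFramework client = CuratorFrameworkFactory.newClient(
                quorum, new ExponentialBackoffRetry(1000, 3))) {
            client.start();
            client.blockUntilConnected();
            // These are the subtrees that stay behind after a cancel, per the listing above.
            for (String subtree : new String[]{"/leader", "/leaderlatch", "/checkpoints",
                    "/checkpoint-counter", "/running_job_registry"}) {
                for (String child : client.getChildren().forPath(haRoot + subtree)) {
                    System.out.println("leftover: " + haRoot + subtree + "/" + child);
                }
            }
        }
    }
}
{code}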
[jira] [Updated] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mikhail Pryakhin updated FLINK-9592:
------------------------------------
    Description: 
Hi mates, I have a proposal about the functionality of BucketingSink.

During the implementation of one of our tasks we got the following need: create a meta-file, with the path and additional information about the file created by BucketingSink, when the file has been moved into its final place. Unfortunately such behaviour is currently not available for us. We've implemented our own Sink that provides an opportunity to register notifiers that are called when the file state changes, but the current API doesn't allow us to add such behaviour using inheritance.

It seems that such functionality could be useful and could be a part of the BucketingSink API. What do you think, should I make a PR?

----

Hi, I see that could be a useful feature. What exactly is now preventing you from inheriting from BucketingSink? Maybe it would be enough to make BucketingSink more easily extendable. One thing that could collide with such a feature is that Kostas is now working on a larger BucketingSink rework/refactor.

Piotrek

----

Hi guys, thanks for your reply. The following code info relates to the *release-1.5.0 tag, org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*.

For now, BucketingSink has the following lifecycle of files.

When moving files from the opened to the pending state:
# on each item (*invoke, line 434*), we check that a suitable bucket exists and contains an opened file; when an opened file doesn't exist, we create one and write the item to it
# on each item (*invoke, line 434*), we check that the suitable opened file doesn't exceed the limits, and if the limits are exceeded, we close it and move it into the pending state using *closeCurrentPartFile (line 568, a private method)*
# on each timer request (*onProcessingTime, line 482*), we check whether items haven't been added to the opened file for longer than the specified period of time; if so, we close it using the same private method *closeCurrentPartFile (line 588)*

So the only way we have is to call our hook from *closeCurrentPartFile*, which is private, so we copy-pasted the current implementation and injected our logic there.

Files are moved from the pending state into the final state during the checkpointing lifecycle, in *notifyCheckpointComplete (line 657)*, which is public and contains a lot of logic, including discovery of files in pending states, synchronization of state access and its modification, etc. So we couldn't override it, or call the super method and add some logic, because when the current implementation changes the state of files, it removes them from the state, and we don't have any opportunity to know for which files the state has been changed.

To solve this problem, we've created the following interface:

{code:java}
/**
 * The {@code FileStateChangeCallback} is used to perform any additional operations when {@link BucketingSink}
 * moves a file from one state to another. For more information about the state management of
 * {@code BucketingSink}, look through its official documentation.
 */
public interface FileStateChangeCallback extends Serializable {

    /**
     * Used to perform any additional operations related to moving a file into its next state.
     *
     * @param fs provides access for working with the file system
     * @param path path to the file moved into the next state
     *
     * @throws IOException if something went wrong while performing any operations with the file system
     */
    void call(FileSystem fs, Path path) throws IOException;
}
{code}

and we have added the ability to register these callbacks in the BucketingSink implementation in the following manner:

{code:java}
public BucketingSink<T> registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {...}

public BucketingSink<T> registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {...}
{code}

I'm ready to discuss the best ways such hooks could be implemented in the core implementation, or any other improvements that would help us add such functionality in our extension using the public API, instead of copy-pasting the source code.

Thanks for your help, mates =)
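For clarity, this is how the proposed hooks would be used from a job. It is a sketch against the proposed API above (registerOnFinalStateChangeCallback is part of the proposal, not of the released BucketingSink), and the .meta suffix convention is just an illustrative assumption:

{code:java}
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MetaFileSinkFactory {

    public static BucketingSink<String> create() {
        // FileStateChangeCallback has a single abstract method, so a lambda fits.
        return new BucketingSink<String>("/base/path")
                .registerOnFinalStateChangeCallback((FileSystem fs, Path path) -> {
                    // Drop a sibling meta-file next to the part file that just became final.
                    fs.createNewFile(path.suffix(".meta"));
                });
    }
}
{code}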
[jira] [Commented] (FLINK-6949) Add ability to ship custom resource files to YARN cluster
[ https://issues.apache.org/jira/browse/FLINK-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16183304#comment-16183304 ]

Mikhail Pryakhin commented on FLINK-6949:
-----------------------------------------

I propose to add a --yarnship-files CLI option that will submit folders with files to the YARN cluster and add these folders to the application classpath.

At the moment the --yarnship option [traverses|https://github.com/apache/flink/blob/2eaf92b1f0a0d965c14b65755d25f1a8167de023/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L1003] the content of the specified yarnship folder and appends its files to the classpath. If a ship folder contains files other than *.class, *.jar, or *.zip files (for example, config/property files), appending such files to the application classpath has no effect. So it would be great to have the ability to add whole folders to the application classpath. I'm going to open a PR to support this feature.
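A small standalone illustration of why the folder itself, rather than its non-jar files, must be put on the classpath: a URL classloader only resolves resources by name relative to directory (or jar) roots. This is not Flink-specific, and the file names are hypothetical:

{code:java}
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

public class ClasspathDirDemo {

    public static void main(String[] args) throws Exception {
        // A directory entry (URL ending in '/') makes its files resolvable by name;
        // a classpath entry pointing at a .properties file itself would not, since
        // URLClassLoader treats non-directory entries as jar files.
        URL configDir = new File("config/").toURI().toURL();
        try (URLClassLoader loader = new URLClassLoader(new URL[]{configDir})) {
            System.out.println(loader.getResource("app.properties"));
        }
    }
}
{code}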
[jira] [Assigned] (FLINK-6949) Add ability to ship custom resource files to YARN cluster
[ https://issues.apache.org/jira/browse/FLINK-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mikhail Pryakhin reassigned FLINK-6949:
---------------------------------------
    Assignee: Mikhail Pryakhin
[jira] [Assigned] (FLINK-6950) Add ability to specify single jar files to be shipped to YARN
[ https://issues.apache.org/jira/browse/FLINK-6950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mikhail Pryakhin reassigned FLINK-6950:
---------------------------------------
    Assignee: Mikhail Pryakhin
[jira] [Commented] (FLINK-6950) Add ability to specify single jar files to be shipped to YARN
[ https://issues.apache.org/jira/browse/FLINK-6950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16182768#comment-16182768 ]

Mikhail Pryakhin commented on FLINK-6950:
-----------------------------------------

[~rmetzger] Hi Robert, yes I am. Shall I assign the issue to myself?
[jira] [Commented] (FLINK-6949) Add ability to ship custom resource files to YARN cluster
[ https://issues.apache.org/jira/browse/FLINK-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16182562#comment-16182562 ]

Mikhail Pryakhin commented on FLINK-6949:
-----------------------------------------

Unfortunately, the workaround you suggested doesn't work for me. I tried to leverage the _StreamExecutionEnvironment#registerCachedFile_ method but failed: this instance is created when the application master has already been started, so the classpath used to run the application somewhere on the YARN cluster has already been assembled by _org.apache.flink.yarn.YarnClusterClient_. In my case, I need to be able to pass a local folder at the moment I submit the application, so that the folder is included in the application's YARN classpath. The option you suggested works well if I need to cache a file that is available to me at the moment I want to register it (for example, a file on HDFS). Is there any way we can extend _org.apache.flink.yarn.YarnClusterClient_ to pass user-specified folders to the YARN application classpath?
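For reference, the suggested workaround looks roughly like this. It is a sketch of the distributed cache API with placeholder paths and names, and as noted above it only helps when the file is already reachable (e.g. on HDFS) at submission time:

{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.File;

public class CachedFileSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Placeholder path: the file must already be reachable when the job is submitted.
        env.registerCachedFile("hdfs:///configs/env.properties", "env-config");
        // ... define the topology here, then:
        // env.execute();
    }

    public static class ConfigAwareMapper extends RichMapFunction<String, String> {

        @Override
        public void open(Configuration parameters) throws Exception {
            // The registered file is distributed to the TMs and handed out as a local file.
            File config = getRuntimeContext().getDistributedCache().getFile("env-config");
            System.out.println("cached config at: " + config.getAbsolutePath());
        }

        @Override
        public String map(String value) {
            return value;
        }
    }
}
{code}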
[jira] [Updated] (FLINK-6949) Add ability to ship custom resource files to YARN cluster
[ https://issues.apache.org/jira/browse/FLINK-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mikhail Pryakhin updated FLINK-6949:
------------------------------------
    Description: 
*The problem:*
When deploying a flink job on YARN it is not possible to specify custom resource files to be shipped to YARN cluster.

*The use case description:*
When running a flink job on multiple environments it becomes necessary to pass environment-related configuration files to the job's runtime. It can be accomplished by packaging configuration files within the job's jar. But having tens of different environments one can easily end up packaging as many jars as there are environments. It would be great to have an ability to separate configuration files from the job artifacts.

*The possible solution:*
add the --yarnship-files option to flink cli to specify files that should be shipped to the YARN cluster.

  was:
*The problem:*
When deploying a flink job on YARN it is not possible to specify custom resource files to be shipped to YARN cluster.

*The use case description:*
When running a flink job on multiple environments it becomes necessary to pass environment-related configuration files to the job's runtime. It can be accomplished by packaging configuration files within the job's jar. But having tens of different environments one can easily end up packaging as many jar as there are environments. It would be great to have an ability to separate configuration files from the job artifacts.

*The possible solution:*
add the --yarnship-files option to flink cli to specify files that should be shipped to the YARN cluster.
[jira] [Updated] (FLINK-6949) Add ability to ship custom resource files to YARN cluster
[ https://issues.apache.org/jira/browse/FLINK-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mikhail Pryakhin updated FLINK-6949:
------------------------------------
    Priority: Critical  (was: Major)
[jira] [Updated] (FLINK-6949) Add ability to ship custom resource files to YARN cluster
[ https://issues.apache.org/jira/browse/FLINK-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mikhail Pryakhin updated FLINK-6949:
------------------------------------
    Summary: Add ability to ship custom resource files to YARN cluster  (was: Make custom resource files to be shipped to YARN cluster)
[jira] [Created] (FLINK-6950) Add ability to specify single jar files to be shipped to YARN
Mikhail Pryakhin created FLINK-6950:
---------------------------------------

             Summary: Add ability to specify single jar files to be shipped to YARN
                 Key: FLINK-6950
                 URL: https://issues.apache.org/jira/browse/FLINK-6950
             Project: Flink
          Issue Type: Improvement
            Reporter: Mikhail Pryakhin
            Priority: Minor

When deploying a flink job on YARN it is not possible to specify multiple yarnship folders.
Often when submitting a flink job, the job dependencies and job resources are located in different local folders, and both of them should be shipped to the YARN cluster.
I think it would be great to have the ability to specify single jar files, not just folders, to be shipped to the YARN cluster (via the --yarnship-jars option).
[jira] [Created] (FLINK-6949) Make custom resource files to be shipped to YARN cluster
Mikhail Pryakhin created FLINK-6949:
---------------------------------------

             Summary: Make custom resource files to be shipped to YARN cluster
                 Key: FLINK-6949
                 URL: https://issues.apache.org/jira/browse/FLINK-6949
             Project: Flink
          Issue Type: Improvement
          Components: Client
    Affects Versions: 1.3.0
            Reporter: Mikhail Pryakhin

*The problem:*
When deploying a flink job on YARN it is not possible to specify custom resource files to be shipped to YARN cluster.

*The use case description:*
When running a flink job on multiple environments it becomes necessary to pass environment-related configuration files to the job's runtime. It can be accomplished by packaging configuration files within the job's jar. But having tens of different environments one can easily end up packaging as many jar as there are environments. It would be great to have an ability to separate configuration files from the job artifacts.

*The possible solution:*
add the --yarnship-files option to flink cli to specify files that should be shipped to the YARN cluster.
[jira] [Created] (FLINK-6912) Consider changing the RichFunction#open method signature to take no arguments.
Mikhail Pryakhin created FLINK-6912:
---------------------------------------

             Summary: Consider changing the RichFunction#open method signature to take no arguments.
                 Key: FLINK-6912
                 URL: https://issues.apache.org/jira/browse/FLINK-6912
             Project: Flink
          Issue Type: Sub-task
          Components: DataSet API, DataStream API
    Affects Versions: 1.3.0
            Reporter: Mikhail Pryakhin
            Priority: Minor

The RichFunction#open(org.apache.flink.configuration.Configuration) method takes a Configuration instance as an argument, which is always [passed as a new instance|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java#L111] bearing no configuration parameters. As far as I figured out, it is a remnant of the past: that method signature originates from the Record API. Consider changing the RichFunction#open method signature to take no arguments, as well as updating the Javadocs. You can find the complete discussion [here|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/RichMapFunction-setup-method-td13696.html].
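To make the point concrete, the Configuration handed to open() in the DataStream API carries nothing, so a function like the sketch below always sees an empty parameter set:

{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class OpenSignatureDemo extends RichMapFunction<String, String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // In the DataStream API this Configuration is created fresh by
        // AbstractUdfStreamOperator, so it never carries user parameters.
        System.out.println("keys passed to open(): " + parameters.keySet());
    }

    @Override
    public String map(String value) {
        return value;
    }
}
{code}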