[GitHub] flink pull request #6322: [FLINK-9575]: Remove job-related BLOBS only if the...

2018-07-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6322


---


[GitHub] flink pull request #6322: [FLINK-9575]: Remove job-related BLOBS only if the...

2018-07-13 Thread Wosin
Github user Wosin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6322#discussion_r202480724
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -1759,11 +1759,22 @@ class JobManager(
   case None => None
 }
 
-// remove all job-related BLOBs from local and HA store
-libraryCacheManager.unregisterJob(jobID)
-blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+// remove all job-related BLOBs from local and HA store, only if the job was removed correctly
+futureOption match {
+  case Some(future) => future.onComplete{
+case scala.util.Success(_) => {
+  libraryCacheManager.unregisterJob(jobID)
+  blobServer.cleanupJob(jobID, removeJobFromStateBackend)
--- End diff --

Technically we can, but this changes the return type of the future, since 
`cleanupJob` does return a value.
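
The point about the return type can be illustrated with a minimal sketch. The names `removeJobFromStore`, `removeThenCleanup` and `removeThenCleanupUnit` are hypothetical, and the stand-in `cleanupJob` below is assumed to return a `Boolean` purely for illustration; none of this is the actual `JobManager` code.

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object FutureTypeSketch {
  // Hypothetical stand-ins for the store removal and the BLOB cleanup.
  // The Boolean return type of cleanupJob is an assumption of this sketch.
  def removeJobFromStore(jobId: String): Unit = ()
  def cleanupJob(jobId: String): Boolean = true

  // With cleanupJob as the last expression, the future carries its result,
  // so the type becomes Future[Boolean] instead of Future[Unit].
  def removeThenCleanup(jobId: String): Future[Boolean] = Future {
    removeJobFromStore(jobId)
    cleanupJob(jobId)
  }

  // Ending the block with () discards the result and keeps Future[Unit].
  def removeThenCleanupUnit(jobId: String): Future[Unit] = Future {
    removeJobFromStore(jobId)
    cleanupJob(jobId)
    ()
  }

  def main(args: Array[String]): Unit = {
    val timeout = Duration(5, TimeUnit.SECONDS)
    println(Await.result(removeThenCleanup("job-1"), timeout))
    println(Await.result(removeThenCleanupUnit("job-1"), timeout))
  }
}
```

If callers depend on the original future type, explicitly discarding the value as in the second variant is one way to keep it unchanged.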


---


[GitHub] flink pull request #6322: [FLINK-9575]: Remove job-related BLOBS only if the...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6322#discussion_r202248523
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -1759,11 +1759,22 @@ class JobManager(
   case None => None
 }
 
-// remove all job-related BLOBs from local and HA store
-libraryCacheManager.unregisterJob(jobID)
-blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+// remove all job-related BLOBs from local and HA store, only if the job was removed correctly
+futureOption match {
+  case Some(future) => future.onComplete{
+case scala.util.Success(_) => {
+  libraryCacheManager.unregisterJob(jobID)
--- End diff --

This call should be made even if the removal of the job from the 
`SubmittedJobGraphStore` failed, because it does not remove any HA files.
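
One way to read this suggestion is sketched below: the local step runs regardless of the removal outcome, while the BLOB cleanup runs only after a successful removal. The helper `cleanupAfterRemoval` and the step names it returns are hypothetical and only record which steps would run; this is a sketch, not the change that was merged.

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

object CleanupStepsSketch {
  // Returns the names of the cleanup steps that would be performed,
  // given the future of the (hypothetical) job removal from the store.
  def cleanupAfterRemoval(removal: Future[Unit])(
      implicit ec: ExecutionContext): Future[Vector[String]] =
    removal
      .map(_ => true)                        // removal succeeded
      .recover { case NonFatal(_) => false } // removal failed
      .map { removed =>
        // Local-only step: always runs, since it touches no HA files.
        val local = Vector("unregisterJob")
        // HA-affecting step: only after a successful removal.
        if (removed) local :+ "cleanupJob" else local
      }

  def main(args: Array[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global
    val timeout = Duration(5, TimeUnit.SECONDS)
    println(Await.result(cleanupAfterRemoval(Future.successful(())), timeout))
    println(Await.result(
      cleanupAfterRemoval(Future.failed(new RuntimeException("removal failed"))),
      timeout))
  }
}
```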


---


[GitHub] flink pull request #6322: [FLINK-9575]: Remove job-related BLOBS only if the...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6322#discussion_r202248449
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -1759,11 +1759,22 @@ class JobManager(
   case None => None
 }
 
-// remove all job-related BLOBs from local and HA store
-libraryCacheManager.unregisterJob(jobID)
-blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+// remove all job-related BLOBs from local and HA store, only if the job was removed correctly
+futureOption match {
+  case Some(future) => future.onComplete{
+case scala.util.Success(_) => {
+  libraryCacheManager.unregisterJob(jobID)
+  blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+  jobManagerMetricGroup.removeJob(jobID)
--- End diff --

I think we could always execute this call, regardless of whether the 
removal from the `SubmittedJobGraphStore` succeeded.


---


[GitHub] flink pull request #6322: [FLINK-9575]: Remove job-related BLOBS only if the...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6322#discussion_r202248365
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -1759,11 +1759,22 @@ class JobManager(
   case None => None
 }
 
-// remove all job-related BLOBs from local and HA store
-libraryCacheManager.unregisterJob(jobID)
-blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+// remove all job-related BLOBs from local and HA store, only if the job was removed correctly
+futureOption match {
+  case Some(future) => future.onComplete{
+case scala.util.Success(_) => {
+  libraryCacheManager.unregisterJob(jobID)
+  blobServer.cleanupJob(jobID, removeJobFromStateBackend)
--- End diff --

Can't we move this line into the future where we remove the job from 
the `SubmittedJobGraphStore`?
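
As a rough sketch of why this would work: statements placed after the removal inside the same future body only run if the removal did not throw. The names `removeAndCleanup` and `removeFromStore` are hypothetical, and the `AtomicBoolean` merely stands in for the BLOB cleanup call.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object SameFutureSketch {
  // Runs the (hypothetical) store removal and then the cleanup in one body.
  def removeAndCleanup(removeFromStore: () => Unit,
                       cleanedUp: AtomicBoolean): Future[Unit] =
    Future {
      removeFromStore()   // if this throws, the future fails here
      cleanedUp.set(true) // stands in for the BLOB cleanup; skipped on failure
    }

  def main(args: Array[String]): Unit = {
    val timeout = Duration(5, TimeUnit.SECONDS)

    val afterSuccess = new AtomicBoolean(false)
    Await.ready(removeAndCleanup(() => (), afterSuccess), timeout)
    println(s"cleanup ran after successful removal: ${afterSuccess.get}")

    val afterFailure = new AtomicBoolean(false)
    Await.ready(
      removeAndCleanup(() => throw new IllegalStateException("store error"), afterFailure),
      timeout)
    println(s"cleanup ran after failed removal: ${afterFailure.get}")
  }
}
```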


---


[GitHub] flink pull request #6322: [FLINK-9575]: Remove job-related BLOBS only if the...

2018-07-12 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6322#discussion_r202013852
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -1759,11 +1759,22 @@ class JobManager(
   case None => None
 }
 
-// remove all job-related BLOBs from local and HA store
-libraryCacheManager.unregisterJob(jobID)
-blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+// remove all job-related BLOBs from local and HA store, only if the job was removed correctly
+futureOption match {
+  case Some(future) => future.onComplete{
+case scala.util.Success(_) => {
+  libraryCacheManager.unregisterJob(jobID)
+  blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+  jobManagerMetricGroup.removeJob(jobID)
+}
+
+case scala.util.Failure(_) =>
+
+  }(context.dispatcher)
+
+  case None => None
+}
 
-jobManagerMetricGroup.removeJob(jobID)
 
--- End diff --

This line can also be removed.


---


[GitHub] flink pull request #6322: [FLINK-9575]: Remove job-related BLOBS only if the...

2018-07-12 Thread Wosin
GitHub user Wosin opened a pull request:

https://github.com/apache/flink/pull/6322

[FLINK-9575]: Remove job-related BLOBS only if the job was removed suce…

## What is the purpose of the change

Currently Flink removes all BLOBs connected with the job, regardless of whether the 
job itself was removed successfully. This is not the desired behavior.

## Brief change log
- BLOBs and data are removed only if the job itself was removed 
successfully

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: yes
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Wosin/flink FLINK-9575

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6322.patch


commit 1c5febebd7045e61862118649a11a85a1917a54a
Author: Wosin 
Date:   2018-07-04T08:27:54Z

FLINK-9575: Remove job-related BLOBS only if the job was removed sucessfully




---