[2/2] flink git commit: [FLINK-9381] Release blobs after job termination

2018-05-17 Thread trohrmann
[FLINK-9381] Release blobs after job termination

Properly remove job blobs from BlobServer after the job terminates. If the job 
reaches a globally terminal
state, then the HA blob store files will also be cleared. In case of a 
suspension or that the job is not
finished (e.g. another process finsihes the job concurrently), we only remove 
the local blob server files.

Additionally, we properly release the user code class loader registered in the 
JobManagerRunner when it
closes.

This closes #6030.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fded5f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fded5f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fded5f4

Branch: refs/heads/release-1.5
Commit: 7fded5f4ed20fb3622ed9bcccab8484c534f12e5
Parents: da5b6d7
Author: Till Rohrmann 
Authored: Thu May 17 08:58:07 2018 +0200
Committer: Till Rohrmann 
Committed: Thu May 17 11:33:25 2018 +0200

--
 .../org/apache/flink/runtime/blob/BlobKey.java  |   2 +-
 .../apache/flink/runtime/blob/BlobServer.java   |   8 +-
 .../flink/runtime/dispatcher/Dispatcher.java|  51 +--
 .../librarycache/BlobLibraryCacheManager.java   |   8 +
 .../librarycache/LibraryCacheManager.java   |  11 +
 .../runtime/jobmaster/JobManagerRunner.java |   5 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   4 +-
 .../runtime/blob/BlobCacheCleanupTest.java  |   2 +-
 .../runtime/blob/BlobServerDeleteTest.java  |   6 +-
 .../runtime/blob/BlobServerRecoveryTest.java|   4 +-
 .../flink/runtime/blob/TestingBlobStore.java|  73 +
 .../runtime/blob/TestingBlobStoreBuilder.java   |  65 
 .../DispatcherResourceCleanupTest.java  | 327 +++
 .../BlobLibraryCacheRecoveryITCase.java |   2 +-
 .../runtime/jobmaster/JobManagerRunnerTest.java |  22 ++
 15 files changed, 557 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/7fded5f4/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
index 988af8b..649992d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
@@ -37,7 +37,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * A BLOB key uniquely identifies a BLOB.
  */
-abstract class BlobKey implements Serializable, Comparable {
+public abstract class BlobKey implements Serializable, Comparable {
 
private static final long serialVersionUID = 3847117712521785209L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fded5f4/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 92d1135..dd0155c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -801,11 +801,13 @@ public class BlobServer extends Thread implements 
BlobService, BlobWriter, Perma
 *
 * @param jobId
 *  ID of the job this blob belongs to
+* @param cleanupBlobStoreFiles
+*  True if the corresponding blob store files shall be 
cleaned up as well. Otherwise false.
 *
 * @return  true if the job directory is successfully deleted 
or non-existing;
 *  false otherwise
 */
-   public boolean cleanupJob(JobID jobId) {
+   public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) {
checkNotNull(jobId);
 
final File jobDir =
@@ -830,8 +832,8 @@ public class BlobServer extends Thread implements 
BlobService, BlobWriter, Perma
jobDir.getAbsolutePath(), e);
}
 
-   // delete in HA store
-   boolean deletedHA = blobStore.deleteAll(jobId);
+   // delete in HA blob store files
+   final boolean deletedHA = !cleanupBlobStoreFiles || 
blobStore.deleteAll(jobId);
 
return deletedLocally && deletedHA;
} finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/7fded5f4/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java

[2/2] flink git commit: [FLINK-9381] Release blobs after job termination

2018-05-17 Thread trohrmann
[FLINK-9381] Release blobs after job termination

Properly remove job blobs from BlobServer after the job terminates. If the job 
reaches a globally terminal
state, then the HA blob store files will also be cleared. In case of a 
suspension or that the job is not
finished (e.g. another process finsihes the job concurrently), we only remove 
the local blob server files.

Additionally, we properly release the user code class loader registered in the 
JobManagerRunner when it
closes.

This closes #6030.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2735c852
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2735c852
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2735c852

Branch: refs/heads/master
Commit: 2735c852b4648ae6f3e8f1e6169ef9ed6ec481d2
Parents: 8b95ba3
Author: Till Rohrmann 
Authored: Thu May 17 08:58:07 2018 +0200
Committer: Till Rohrmann 
Committed: Thu May 17 11:31:46 2018 +0200

--
 .../org/apache/flink/runtime/blob/BlobKey.java  |   2 +-
 .../apache/flink/runtime/blob/BlobServer.java   |   8 +-
 .../flink/runtime/dispatcher/Dispatcher.java|  51 +--
 .../librarycache/BlobLibraryCacheManager.java   |   8 +
 .../librarycache/LibraryCacheManager.java   |  11 +
 .../runtime/jobmaster/JobManagerRunner.java |   5 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   4 +-
 .../runtime/blob/BlobCacheCleanupTest.java  |   2 +-
 .../runtime/blob/BlobServerDeleteTest.java  |   6 +-
 .../runtime/blob/BlobServerRecoveryTest.java|   4 +-
 .../flink/runtime/blob/TestingBlobStore.java|  73 +
 .../runtime/blob/TestingBlobStoreBuilder.java   |  65 
 .../DispatcherResourceCleanupTest.java  | 327 +++
 .../BlobLibraryCacheRecoveryITCase.java |   2 +-
 .../runtime/jobmaster/JobManagerRunnerTest.java |  22 ++
 15 files changed, 557 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
index 988af8b..649992d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
@@ -37,7 +37,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * A BLOB key uniquely identifies a BLOB.
  */
-abstract class BlobKey implements Serializable, Comparable {
+public abstract class BlobKey implements Serializable, Comparable {
 
private static final long serialVersionUID = 3847117712521785209L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 92d1135..dd0155c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -801,11 +801,13 @@ public class BlobServer extends Thread implements 
BlobService, BlobWriter, Perma
 *
 * @param jobId
 *  ID of the job this blob belongs to
+* @param cleanupBlobStoreFiles
+*  True if the corresponding blob store files shall be 
cleaned up as well. Otherwise false.
 *
 * @return  true if the job directory is successfully deleted 
or non-existing;
 *  false otherwise
 */
-   public boolean cleanupJob(JobID jobId) {
+   public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) {
checkNotNull(jobId);
 
final File jobDir =
@@ -830,8 +832,8 @@ public class BlobServer extends Thread implements 
BlobService, BlobWriter, Perma
jobDir.getAbsolutePath(), e);
}
 
-   // delete in HA store
-   boolean deletedHA = blobStore.deleteAll(jobId);
+   // delete in HA blob store files
+   final boolean deletedHA = !cleanupBlobStoreFiles || 
blobStore.deleteAll(jobId);
 
return deletedLocally && deletedHA;
} finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java