[2/2] flink git commit: [FLINK-9381] Release blobs after job termination
[FLINK-9381] Release blobs after job termination Properly remove job blobs from BlobServer after the job terminates. If the job reaches a globally terminal state, then the HA blob store files will also be cleared. In case of a suspension or that the job is not finished (e.g. another process finishes the job concurrently), we only remove the local blob server files. Additionally, we properly release the user code class loader registered in the JobManagerRunner when it closes. This closes #6030. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fded5f4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fded5f4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fded5f4 Branch: refs/heads/release-1.5 Commit: 7fded5f4ed20fb3622ed9bcccab8484c534f12e5 Parents: da5b6d7 Author: Till Rohrmann Authored: Thu May 17 08:58:07 2018 +0200 Committer: Till Rohrmann Committed: Thu May 17 11:33:25 2018 +0200 -- .../org/apache/flink/runtime/blob/BlobKey.java | 2 +- .../apache/flink/runtime/blob/BlobServer.java | 8 +- .../flink/runtime/dispatcher/Dispatcher.java| 51 +-- .../librarycache/BlobLibraryCacheManager.java | 8 + .../librarycache/LibraryCacheManager.java | 11 + .../runtime/jobmaster/JobManagerRunner.java | 5 +- .../flink/runtime/jobmanager/JobManager.scala | 4 +- .../runtime/blob/BlobCacheCleanupTest.java | 2 +- .../runtime/blob/BlobServerDeleteTest.java | 6 +- .../runtime/blob/BlobServerRecoveryTest.java| 4 +- .../flink/runtime/blob/TestingBlobStore.java| 73 + .../runtime/blob/TestingBlobStoreBuilder.java | 65 .../DispatcherResourceCleanupTest.java | 327 +++ .../BlobLibraryCacheRecoveryITCase.java | 2 +- .../runtime/jobmaster/JobManagerRunnerTest.java | 22 ++ 15 files changed, 557 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/7fded5f4/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java -- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java index 988af8b..649992d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java @@ -37,7 +37,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * A BLOB key uniquely identifies a BLOB. */ -abstract class BlobKey implements Serializable, Comparable { +public abstract class BlobKey implements Serializable, Comparable { private static final long serialVersionUID = 3847117712521785209L; http://git-wip-us.apache.org/repos/asf/flink/blob/7fded5f4/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 92d1135..dd0155c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -801,11 +801,13 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma * * @param jobId * ID of the job this blob belongs to +* @param cleanupBlobStoreFiles +* True if the corresponding blob store files shall be cleaned up as well. Otherwise false. 
* * @return true if the job directory is successfully deleted or non-existing; * false otherwise */ - public boolean cleanupJob(JobID jobId) { + public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) { checkNotNull(jobId); final File jobDir = @@ -830,8 +832,8 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma jobDir.getAbsolutePath(), e); } - // delete in HA store - boolean deletedHA = blobStore.deleteAll(jobId); + // delete in HA blob store files + final boolean deletedHA = !cleanupBlobStoreFiles || blobStore.deleteAll(jobId); return deletedLocally && deletedHA; } finally { http://git-wip-us.apache.org/repos/asf/flink/blob/7fded5f4/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
[2/2] flink git commit: [FLINK-9381] Release blobs after job termination
[FLINK-9381] Release blobs after job termination Properly remove job blobs from BlobServer after the job terminates. If the job reaches a globally terminal state, then the HA blob store files will also be cleared. In case of a suspension or that the job is not finished (e.g. another process finishes the job concurrently), we only remove the local blob server files. Additionally, we properly release the user code class loader registered in the JobManagerRunner when it closes. This closes #6030. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2735c852 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2735c852 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2735c852 Branch: refs/heads/master Commit: 2735c852b4648ae6f3e8f1e6169ef9ed6ec481d2 Parents: 8b95ba3 Author: Till Rohrmann Authored: Thu May 17 08:58:07 2018 +0200 Committer: Till Rohrmann Committed: Thu May 17 11:31:46 2018 +0200 -- .../org/apache/flink/runtime/blob/BlobKey.java | 2 +- .../apache/flink/runtime/blob/BlobServer.java | 8 +- .../flink/runtime/dispatcher/Dispatcher.java| 51 +-- .../librarycache/BlobLibraryCacheManager.java | 8 + .../librarycache/LibraryCacheManager.java | 11 + .../runtime/jobmaster/JobManagerRunner.java | 5 +- .../flink/runtime/jobmanager/JobManager.scala | 4 +- .../runtime/blob/BlobCacheCleanupTest.java | 2 +- .../runtime/blob/BlobServerDeleteTest.java | 6 +- .../runtime/blob/BlobServerRecoveryTest.java| 4 +- .../flink/runtime/blob/TestingBlobStore.java| 73 + .../runtime/blob/TestingBlobStoreBuilder.java | 65 .../DispatcherResourceCleanupTest.java | 327 +++ .../BlobLibraryCacheRecoveryITCase.java | 2 +- .../runtime/jobmaster/JobManagerRunnerTest.java | 22 ++ 15 files changed, 557 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java -- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java index 988af8b..649992d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java @@ -37,7 +37,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * A BLOB key uniquely identifies a BLOB. */ -abstract class BlobKey implements Serializable, Comparable { +public abstract class BlobKey implements Serializable, Comparable { private static final long serialVersionUID = 3847117712521785209L; http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 92d1135..dd0155c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -801,11 +801,13 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma * * @param jobId * ID of the job this blob belongs to +* @param cleanupBlobStoreFiles +* True if the corresponding blob store files shall be cleaned up as well. Otherwise false. 
* * @return true if the job directory is successfully deleted or non-existing; * false otherwise */ - public boolean cleanupJob(JobID jobId) { + public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) { checkNotNull(jobId); final File jobDir = @@ -830,8 +832,8 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma jobDir.getAbsolutePath(), e); } - // delete in HA store - boolean deletedHA = blobStore.deleteAll(jobId); + // delete in HA blob store files + final boolean deletedHA = !cleanupBlobStoreFiles || blobStore.deleteAll(jobId); return deletedLocally && deletedHA; } finally { http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java