[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements
Github user NicoK closed the pull request at: https://github.com/apache/flink/pull/3512 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106674262

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws IOException {
 		// fallback: download from the BlobServer
 		final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
+		LOG.info("Downloading {} from {}", requiredBlob, serverAddress);

 		// loop over retries
 		int attempt = 0;
 		while (true) {
+			try (
+				final BlobClient bc = new BlobClient(serverAddress, blobClientConfig);
+				final InputStream is = bc.get(requiredBlob);
+				final OutputStream os = new FileOutputStream(localJarFile)
+			) {
+				getURLTransferFile(buf, is, os);
+
+				// success, we finished
+				return localJarFile.toURI().toURL();
+			}
+			catch (Throwable t) {
+				getURLOnException(requiredBlob.toString(), localJarFile, attempt, t);
-			if (attempt == 0) {
-				LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
-			} else {
+				// retry
+				++attempt;
 				LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt);
 			}
+		} // end loop over retries
+	}
-
-		try {
-			BlobClient bc = null;
-			InputStream is = null;
-			OutputStream os = null;
+
+	/**
+	 * Returns the URL for the BLOB with the given parameters. The method will first attempt to
+	 * serve the BLOB from its local cache. If the BLOB is not in the cache, the method will try
+	 * to download it from this cache's BLOB server.
+	 *
+	 * @param jobId JobID of the file in the blob store
+	 * @param key String key of the file in the blob store
+	 * @return URL referring to the local storage location of the BLOB.
+	 * @throws java.io.FileNotFoundException if the path does not exist;
+	 * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
+	 */
+	public URL getURL(final JobID jobId, final String key) throws IOException {
+		checkArgument(jobId != null, "Job id cannot be null.");
+		checkArgument(key != null, "BLOB name cannot be null.");
-
-			try {
-				bc = new BlobClient(serverAddress, blobClientConfig);
-				is = bc.get(requiredBlob);
-				os = new FileOutputStream(localJarFile);
-
-				while (true) {
-					final int read = is.read(buf);
-					if (read < 0) {
-						break;
-					}
-					os.write(buf, 0, read);
-				}
-
-				// we do explicitly not use a finally block, because we want the closing
-				// in the regular case to throw exceptions and cause the writing to fail.
-				// But, the closing on exception should not throw further exceptions and
-				// let us keep the root exception
-				os.close();
-				os = null;
-				is.close();
-				is = null;
-				bc.close();
-				bc = null;
-
-				// success, we finished
-				return localJarFile.toURI().toURL();
-			}
-			catch (Throwable t) {
-				// we use "catch (Throwable)" to keep the root exception. Otherwise that exception
-				// it would be replaced by any exception thrown in the finally block
-				IOUtils.closeQuietly(os);
-				IOUtils.closeQuietly(is);
-
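The refactored getURL() in the diff above replaces manual stream bookkeeping with try-with-resources plus a retry loop: all resources are closed in declaration order, close() failures on the success path still surface as exceptions, and transient failures fall into the catch block for a retry. A minimal, self-contained sketch of that pattern follows; the helper names transfer() and the MAX_ATTEMPTS cutoff are hypothetical (the real code delegates retry accounting to getURLOnException()), and a ByteArrayInputStream stands in for the BlobClient connection:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class RetryingDownload {

    // hypothetical retry limit; BlobCache instead lets getURLOnException() decide when to give up
    static final int MAX_ATTEMPTS = 3;

    // copies everything from in to out, like the extracted getURLTransferFile() helper
    static void transfer(InputStream in, OutputStream out, byte[] buf) throws IOException {
        int read;
        while ((read = in.read(buf)) >= 0) {
            out.write(buf, 0, read);
        }
    }

    static byte[] downloadWithRetry(byte[] source) throws IOException {
        final byte[] buf = new byte[4096];
        int attempt = 0;
        while (true) {
            // try-with-resources closes both streams even when transfer() throws,
            // and close() exceptions on the success path are not swallowed
            try (InputStream is = new ByteArrayInputStream(source);
                 ByteArrayOutputStream os = new ByteArrayOutputStream()) {
                transfer(is, os, buf);
                return os.toByteArray(); // success, we finished
            } catch (Throwable t) {
                if (++attempt >= MAX_ATTEMPTS) {
                    throw new IOException("giving up after " + attempt + " attempts", t);
                }
                // otherwise fall through and retry
            }
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] copy = downloadWithRetry("hello blob".getBytes());
        System.out.println(new String(copy));
    }
}
```

Catching `Throwable` (rather than `IOException`) mirrors the original code's intent of preserving the root cause of the failure before deciding whether to retry.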
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106677990

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -400,6 +418,47 @@ public void delete(BlobKey key) throws IOException {
 	}

 	/**
+	 * Deletes the file associated with the given job and key if it exists in the local
+	 * storage of the blob server.
+	 *
+	 * @param jobId JobID of the file in the blob store
+	 * @param key String key of the file in the blob store
+	 */
+	@Override
+	public void delete(JobID jobId, String key) {
+		checkArgument(jobId != null, "Job id must not be null.");
+		checkArgument(key != null, "BLOB name must not be null.");
+
+		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key);
+
+		if (localFile.exists()) {
+			if (!localFile.delete()) {
+				LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
+			}
+		}
+
+		blobStore.delete(jobId, key);
+	}
+
+	/**
+	 * Deletes all files associated with the given job id from the storage.
+	 *
+	 * @param jobId JobID of the files in the blob store
+	 */
+	@Override
+	public void deleteAll(final JobID jobId) {
+		checkArgument(jobId != null, "Job id must not be null.");
+
+		try {
+			BlobUtils.deleteJobDirectory(storageDir, jobId);
+		} catch (IOException e) {
--- End diff --
If we want to make sure we clean up in any case, we can actually catch `Exception` here.
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106674837

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
(same BlobCache.java getURL() diff excerpt as in the previous comment)
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106675058

--- Diff: docs/setup/config.md ---
@@ -494,13 +494,13 @@ Previously this key was named `recovery.mode` and the default value was `standal

 - `high-availability.zookeeper.path.root`: (Default `/flink`) Defines the root dir under which the ZooKeeper HA mode will create namespace directories. Previously this key was named `recovery.zookeeper.path.root`.

-- `high-availability.zookeeper.path.namespace`: (Default `/default_ns` in standalone cluster mode, or the under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace`.
+- `high-availability.cluster-id`: (Default `/default_ns` in standalone cluster mode, or the under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace` and `high-availability.zookeeper.path.namespace`.
--- End diff --
I would move these into the `### High Availability (HA)` section, because they are independent of ZooKeeper.
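For reference, the renamed keys discussed above could appear in a `flink-conf.yaml` along these lines. This is a hypothetical fragment, not from the PR: the key names (`high-availability.cluster-id`, `high-availability.storageDir`) come from the PR description, while the quorum address and paths are placeholder values:

```yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181
high-availability.zookeeper.path.root: /flink
# renamed from recovery.zookeeper.path.namespace / high-availability.zookeeper.path.namespace
high-availability.cluster-id: /my-cluster
high-availability.storageDir: hdfs:///flink/ha/
```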
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106680476

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1305,6 +1305,9 @@ class TaskManager(
         s"${task.getExecutionState} to JobManager for task ${task.getTaskInfo.getTaskName} " +
         s"(${task.getExecutionId})")

+      // delete all NAME_ADDRESSABLE BLOBs
+      libraryCacheManager.get.getBlobService.deleteAll(task.getJobID)
--- End diff --
Multiple tasks of the same job run in a TaskManager. This means that tasks would delete each other's blobs.
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106677799

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -400,6 +418,47 @@ public void delete(BlobKey key) throws IOException {
 	}

 	/**
+	 * Deletes the file associated with the given job and key if it exists in the local
+	 * storage of the blob server.
+	 *
+	 * @param jobId JobID of the file in the blob store
+	 * @param key String key of the file in the blob store
+	 */
+	@Override
+	public void delete(JobID jobId, String key) {
+		checkArgument(jobId != null, "Job id must not be null.");
+		checkArgument(key != null, "BLOB name must not be null.");
+
+		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key);
+
+		if (localFile.exists()) {
--- End diff --
For concurrency safety, it is better to do `if (!delete && exists)`.
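The `if (!delete && exists)` suggestion above can be sketched as follows: attempt `delete()` first and consult `exists()` only afterwards. Checking `exists()` before `delete()` races with concurrent deleters (the file can vanish between the two calls), whereas delete-then-exists treats "someone else already removed it" as success. The class and method names here are hypothetical, not from the PR:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class SafeDelete {

    /**
     * Returns true if the file is gone afterwards, whether we deleted it,
     * another thread deleted it concurrently, or it never existed.
     */
    static boolean deleteIfPresent(File localFile) {
        // delete() returning false is only a real failure if the file is still there
        return localFile.delete() || !localFile.exists();
    }

    public static void main(String[] args) throws IOException {
        File f = Files.createTempFile("blob", ".bin").toFile();
        System.out.println(deleteIfPresent(f)); // existing file: deleted
        System.out.println(deleteIfPresent(f)); // already gone: still success
    }
}
```

In the `BlobServer.delete()` diff above, this pattern would replace the `exists()`-then-`delete()` pair, logging a warning only when the file survives the delete attempt.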
GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/3512

[FLINK-6008] collection of BlobServer improvements

This PR improves the following things around the `BlobServer`/`BlobCache`:

* replaces config options in `config.md` with non-deprecated ones, e.g. `high-availability.cluster-id` and `high-availability.storageDir`
* promotes `BlobStore#deleteAll(JobID)` to the `BlobService`
* extends the `BlobService` to work with `NAME_ADDRESSABLE` blobs (prepares for FLINK-4399)
* removes `NAME_ADDRESSABLE` blobs after job/task termination
* adds more unit tests for `NAME_ADDRESSABLE` blobs
* does not fail the `BlobServer` when a delete operation fails
* general code style and docs improvements, like using `Preconditions.checkArgument`

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-6008

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3512.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3512

commit 8cfbe97df3f7c8fa268f5c19291174a99e3cf943
Author: Nico Kruber
Date: 2016-12-20T15:49:57Z
    [FLINK-6008][docs] minor improvements in the BlobService docs

commit a72b31474fd38f27e5cc582b3c2797fa51695e38
Author: Nico Kruber
Date: 2017-01-06T17:42:58Z
    [FLINK-6008][docs] update some config options to the new, non-deprecated ones

commit a6af4e0b393a8684984a6adada7e6eff4f99ac18
Author: Nico Kruber
Date: 2016-12-20T17:27:13Z
    [FLINK-6008] refactor BlobCache#getURL() for cleaner code

commit 69247739e127f8c941e352c07a0be6e03ecea1d1
Author: Nico Kruber
Date: 2016-12-20T17:52:19Z
    [FLINK-6008] extend the BlobService to the NAME_ADDRESSABLE blobs

    These blobs are referenced by the job ID and a selected name instead of
    the hash sum of the blob's contents. Some code was already prepared but
    lacked the proper additions in further APIs. This commit adds some.

commit 9913ae86b854e1c5b3dca404824ab9a70cc32db6
Author: Nico Kruber
Date: 2016-12-21T15:23:29Z
    [FLINK-6008] promote BlobStore#deleteAll(JobID) to the BlobService

commit d96e6d43ac637149e9d1077c6dee3801d30f679a
Author: Nico Kruber
Date: 2017-03-09T17:14:02Z
    [FLINK-6008] use Preconditions.checkArgument in BlobClient

commit 6d53e3ff87110601eb1a71d60f850e6089930141
Author: Nico Kruber
Date: 2016-12-21T16:59:27Z
    [FLINK-6008] properly remove NAME_ADDRESSABLE blobs after job/task termination

commit 5ef5a74db3f6753437b585823b037e25e23a61ba
Author: Nico Kruber
Date: 2017-03-09T18:14:52Z
    [FLINK-6008] more unit tests for NAME_ADDRESSABLE and BlobService access

    NAME_ADDRESSABLE blobs were not that thoroughly tested before, and neither
    were the access methods that the BlobService implementations provide. This
    adds tests covering both.

commit 34857456a43ec5a2ccb5166bd379f263cd54697d
Author: Nico Kruber
Date: 2017-03-09T17:15:08Z
    [FLINK-6008] do not fail the BlobServer when delete fails

    This also enables us to reuse some more code between BlobServerConnection
    and BlobServer.

commit e55ab0f37005ef37065b8156f59e4b8db1a7b95f
Author: Nico Kruber
Date: 2017-03-09T17:32:14Z
    [FLINK-6008] refactor BlobCache#deleteGlobal() for cleaner code