[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-06-20 Thread NicoK
Github user NicoK closed the pull request at:

https://github.com/apache/flink/pull/3512


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106674262
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws IOException {
 
 		// fallback: download from the BlobServer
 		final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
+		LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
 
 		// loop over retries
 		int attempt = 0;
 		while (true) {
+			try (
+				final BlobClient bc = new BlobClient(serverAddress, blobClientConfig);
+				final InputStream is = bc.get(requiredBlob);
+				final OutputStream os = new FileOutputStream(localJarFile)
+			) {
+				getURLTransferFile(buf, is, os);
+
+				// success, we finished
+				return localJarFile.toURI().toURL();
+			}
+			catch (Throwable t) {
+				getURLOnException(requiredBlob.toString(), localJarFile, attempt, t);
 
-				if (attempt == 0) {
-					LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
-				} else {
+				// retry
+				++attempt;
 					LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt);
 				}
+		} // end loop over retries
+	}
 
-			try {
-				BlobClient bc = null;
-				InputStream is = null;
-				OutputStream os = null;
+	/**
+	 * Returns the URL for the BLOB with the given parameters. The method will first attempt to
+	 * serve the BLOB from its local cache. If the BLOB is not in the cache, the method will try
+	 * to download it from this cache's BLOB server.
+	 *
+	 * @param jobId JobID of the file in the blob store
+	 * @param key   String key of the file in the blob store
+	 * @return URL referring to the local storage location of the BLOB.
+	 * @throws java.io.FileNotFoundException if the path does not exist;
+	 * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
+	 */
+	public URL getURL(final JobID jobId, final String key) throws IOException {
+		checkArgument(jobId != null, "Job id cannot be null.");
+		checkArgument(key != null, "BLOB name cannot be null.");
 
-			try {
-				bc = new BlobClient(serverAddress, blobClientConfig);
-				is = bc.get(requiredBlob);
-				os = new FileOutputStream(localJarFile);
-
-				while (true) {
-					final int read = is.read(buf);
-					if (read < 0) {
-						break;
-					}
-					os.write(buf, 0, read);
-				}
-
-				// we do explicitly not use a finally block, because we want the closing
-				// in the regular case to throw exceptions and cause the writing to fail.
-				// But, the closing on exception should not throw further exceptions and
-				// let us keep the root exception
-				os.close();
-				os = null;
-				is.close();
-				is = null;
-				bc.close();
-				bc = null;
-
-				// success, we finished
-				return localJarFile.toURI().toURL();
-			}
-			catch (Throwable t) {
-				// we use "catch (Throwable)" to keep the root exception. Otherwise that exception
-				// would be replaced by any exception thrown in the finally block
-				IOUtils.closeQuietly(os);
-				IOUtils.closeQuietly(is);
-
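The refactoring above pulls the copy loop into a `getURLTransferFile` helper and relies on try-with-resources for closing. As a minimal, self-contained sketch of that stream-copy pattern (the helper body is taken from the diff, but the surrounding class and the `main` demo are ours, and the real code operates on a `BlobClient` socket stream rather than an in-memory stream):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class TransferSketch {

    // Simplified stand-in for the getURLTransferFile helper in the diff:
    // copies everything from the input stream to the output stream using the
    // caller-provided buffer.
    static void transfer(byte[] buf, InputStream is, OutputStream os) throws IOException {
        while (true) {
            final int read = is.read(buf);
            if (read < 0) {
                break;
            }
            os.write(buf, 0, read);
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        // try-with-resources closes the stream on both the success and the
        // failure path; unlike closeQuietly, a throwing close() in the
        // success path still surfaces and fails the download.
        try (InputStream is = new ByteArrayInputStream("hello blob".getBytes())) {
            transfer(new byte[4], is, os);
        }
        System.out.println(os); // prints: hello blob
    }
}
```

The try-with-resources form keeps the root exception automatically (later close failures become suppressed exceptions), which is exactly what the hand-rolled close/closeQuietly dance in the removed code was trying to achieve.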

[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106677990
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -400,6 +418,47 @@ public void delete(BlobKey key) throws IOException {
 	}
 
 	/**
+	 * Deletes the file associated with the given job and key if it exists in the local
+	 * storage of the blob server.
+	 *
+	 * @param jobId JobID of the file in the blob store
+	 * @param key   String key of the file in the blob store
+	 */
+	@Override
+	public void delete(JobID jobId, String key) {
+		checkArgument(jobId != null, "Job id must not be null.");
+		checkArgument(key != null, "BLOB name must not be null.");
+
+		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key);
+
+		if (localFile.exists()) {
+			if (!localFile.delete()) {
+				LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
+			}
+		}
+
+		blobStore.delete(jobId, key);
+	}
+
+	/**
+	 * Deletes all files associated with the given job id from the storage.
+	 *
+	 * @param jobId JobID of the files in the blob store
+	 */
+	@Override
+	public void deleteAll(final JobID jobId) {
+		checkArgument(jobId != null, "Job id must not be null.");
+
+		try {
+			BlobUtils.deleteJobDirectory(storageDir, jobId);
+		} catch (IOException e) {
--- End diff --

If we want to make sure we clean up in any case, we can actually catch `Exception` here.
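The suggestion is to widen the catch so that no cleanup failure can take down the server. A hedged sketch of that idea (the helper below is purely illustrative, not Flink's `BlobUtils.deleteJobDirectory`):

```java
import java.io.File;

public class CleanupSketch {

    // Illustrative delete helper in the spirit of the review comment: catch
    // Exception (not just IOException) so a failed cleanup is logged and
    // swallowed instead of propagating out of the BlobServer.
    static boolean deleteDirectoryQuietly(File dir) {
        try {
            File[] children = dir.listFiles();
            if (children != null) {
                for (File child : children) {
                    deleteDirectoryQuietly(child);
                }
            }
            return dir.delete();
        } catch (Exception e) {
            // in real code: LOG.warn("Failed to delete " + dir, e);
            return false;
        }
    }
}
```

Catching `Exception` also covers unchecked exceptions such as `SecurityException` from the file system, which `catch (IOException e)` would let escape.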




[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106674837
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws IOException {
 
 		// fallback: download from the BlobServer
 		final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
+		LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
 
 		// loop over retries
 		int attempt = 0;
 		while (true) {
+			try (
+				final BlobClient bc = new BlobClient(serverAddress, blobClientConfig);
+				final InputStream is = bc.get(requiredBlob);
+				final OutputStream os = new FileOutputStream(localJarFile)
+			) {
+				getURLTransferFile(buf, is, os);
+
+				// success, we finished
+				return localJarFile.toURI().toURL();
+			}
+			catch (Throwable t) {
+				getURLOnException(requiredBlob.toString(), localJarFile, attempt, t);
 
-				if (attempt == 0) {
-					LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
-				} else {
+				// retry
+				++attempt;
 					LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt);
 				}
+		} // end loop over retries
+	}
 
-			try {
-				BlobClient bc = null;
-				InputStream is = null;
-				OutputStream os = null;
+	/**
+	 * Returns the URL for the BLOB with the given parameters. The method will first attempt to
+	 * serve the BLOB from its local cache. If the BLOB is not in the cache, the method will try
+	 * to download it from this cache's BLOB server.
+	 *
+	 * @param jobId JobID of the file in the blob store
+	 * @param key   String key of the file in the blob store
+	 * @return URL referring to the local storage location of the BLOB.
+	 * @throws java.io.FileNotFoundException if the path does not exist;
+	 * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
+	 */
+	public URL getURL(final JobID jobId, final String key) throws IOException {
+		checkArgument(jobId != null, "Job id cannot be null.");
+		checkArgument(key != null, "BLOB name cannot be null.");
 
-			try {
-				bc = new BlobClient(serverAddress, blobClientConfig);
-				is = bc.get(requiredBlob);
-				os = new FileOutputStream(localJarFile);
-
-				while (true) {
-					final int read = is.read(buf);
-					if (read < 0) {
-						break;
-					}
-					os.write(buf, 0, read);
-				}
-
-				// we do explicitly not use a finally block, because we want the closing
-				// in the regular case to throw exceptions and cause the writing to fail.
-				// But, the closing on exception should not throw further exceptions and
-				// let us keep the root exception
-				os.close();
-				os = null;
-				is.close();
-				is = null;
-				bc.close();
-				bc = null;
-
-				// success, we finished
-				return localJarFile.toURI().toURL();
-			}
-			catch (Throwable t) {
-				// we use "catch (Throwable)" to keep the root exception. Otherwise that exception
-				// would be replaced by any exception thrown in the finally block
-				IOUtils.closeQuietly(os);
-				IOUtils.closeQuietly(is);
-

[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106675058
  
--- Diff: docs/setup/config.md ---
@@ -494,13 +494,13 @@ Previously this key was named `recovery.mode` and the default value was `standal
 
 - `high-availability.zookeeper.path.root`: (Default `/flink`) Defines the root dir under which the ZooKeeper HA mode will create namespace directories. Previously this key was named `recovery.zookeeper.path.root`.
 
-- `high-availability.zookeeper.path.namespace`: (Default `/default_ns` in standalone cluster mode, or the  under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace`.
+- `high-availability.cluster-id`: (Default `/default_ns` in standalone cluster mode, or the  under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace` and `high-availability.zookeeper.path.namespace`.
--- End diff --

I would move these into the `### High Availability (HA)` section, because they are independent of ZooKeeper.




[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106680476
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1305,6 +1305,9 @@ class TaskManager(
     s"${task.getExecutionState} to JobManager for task ${task.getTaskInfo.getTaskName} " +
     s"(${task.getExecutionId})")
 
+  // delete all NAME_ADDRESSABLE BLOBs
+  libraryCacheManager.get.getBlobService.deleteAll(task.getJobID)
--- End diff --

Multiple tasks of the same job run in a TaskManager. This means that tasks delete each other's blobs.
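One conceivable way to address this (purely an illustrative sketch, not what the PR or Flink actually implements) is per-job reference counting on the TaskManager, so that only the last finishing task of a job triggers `deleteAll`:

```java
import java.util.HashMap;
import java.util.Map;

public class JobBlobRefCounter {

    // hypothetical: job id -> number of its tasks running on this TaskManager
    private final Map<String, Integer> refCounts = new HashMap<>();

    // Called when a task of the given job is deployed on this TaskManager.
    public synchronized void register(String jobId) {
        refCounts.merge(jobId, 1, Integer::sum);
    }

    // Called when a task finishes. Returns true only for the last task of the
    // job, i.e. when it would be safe to delete the job's NAME_ADDRESSABLE blobs.
    public synchronized boolean unregister(String jobId) {
        Integer count = refCounts.get(jobId);
        if (count == null) {
            return false; // unknown job; nothing was registered
        }
        if (count == 1) {
            refCounts.remove(jobId);
            return true;
        }
        refCounts.put(jobId, count - 1);
        return false;
    }
}
```

With such a counter, the `deleteAll(task.getJobID)` call in the diff would be guarded by `if (counter.unregister(jobId)) { ... }`, so concurrent tasks of the same job no longer race on each other's blobs.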




[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106677799
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -400,6 +418,47 @@ public void delete(BlobKey key) throws IOException {
 	}
 
 	/**
+	 * Deletes the file associated with the given job and key if it exists in the local
+	 * storage of the blob server.
+	 *
+	 * @param jobId JobID of the file in the blob store
+	 * @param key   String key of the file in the blob store
+	 */
+	@Override
+	public void delete(JobID jobId, String key) {
+		checkArgument(jobId != null, "Job id must not be null.");
+		checkArgument(key != null, "BLOB name must not be null.");
+
+		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key);
+
+		if (localFile.exists()) {
--- End diff --

For concurrency safety, it is better to do `if (!delete() && exists())`.
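The suggested idiom avoids the check-then-act race in `if (exists()) { delete(); }`: delete first, and only treat it as a failure if the file still exists afterwards. A runnable sketch (the helper name is ours):

```java
import java.io.File;

public class DeleteIdiom {

    // Race-free variant of the check in the diff: attempt the delete first,
    // and only report failure if the file still exists afterwards. If another
    // thread or task deleted the file concurrently, that is not an error.
    static boolean deleteIfExists(File file) {
        if (!file.delete() && file.exists()) {
            return false; // genuine failure, worth a LOG.warn
        }
        return true; // deleted by us, deleted concurrently, or never existed
    }
}
```

Note that `File.delete()` returning false is ambiguous on its own (missing file, permission problem, open handle on Windows), which is why the follow-up `exists()` check is needed to distinguish "already gone" from "could not delete".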




[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-10 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/3512

[FLINK-6008] collection of BlobServer improvements

This PR improves the following things around the `BlobServer`/`BlobCache`:

* replaces config options in `config.md` with non-deprecated ones, e.g. `high-availability.cluster-id` and `high-availability.storageDir`
* promotes `BlobStore#deleteAll(JobID)` to the `BlobService`
* extends the `BlobService` to work with `NAME_ADDRESSABLE` blobs (prepares for FLINK-4399)
* removes `NAME_ADDRESSABLE` blobs after job/task termination
* adds more unit tests for `NAME_ADDRESSABLE` blobs
* does not fail the `BlobServer` when a delete operation fails
* general code style and docs improvements, like using `Preconditions.checkArgument`
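The `Preconditions.checkArgument` style mentioned in the last bullet can be sketched as follows; the `checkArgument` method below is a minimal stand-in for `org.apache.flink.util.Preconditions` so the example is self-contained, and `storagePath` is a hypothetical caller, not a real Flink method:

```java
public class PreconditionsSketch {

    // Minimal stand-in for Preconditions.checkArgument: fail fast with an
    // IllegalArgumentException when a caller-supplied argument is invalid.
    static void checkArgument(boolean condition, Object errorMessage) {
        if (!condition) {
            throw new IllegalArgumentException(String.valueOf(errorMessage));
        }
    }

    // Hypothetical caller showing the argument-checking style used in the PR.
    public static String storagePath(String jobId, String key) {
        checkArgument(jobId != null, "Job id must not be null.");
        checkArgument(key != null, "BLOB name must not be null.");
        return jobId + "/" + key;
    }
}
```

Compared to ad-hoc `if (x == null) throw ...` blocks, this keeps the validation one line per argument and makes the error messages uniform across the code base.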



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-6008

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3512.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3512


commit 8cfbe97df3f7c8fa268f5c19291174a99e3cf943
Author: Nico Kruber 
Date:   2016-12-20T15:49:57Z

[FLINK-6008][docs] minor improvements in the BlobService docs

commit a72b31474fd38f27e5cc582b3c2797fa51695e38
Author: Nico Kruber 
Date:   2017-01-06T17:42:58Z

[FLINK-6008][docs] update some config options to the new, non-deprecated ones

commit a6af4e0b393a8684984a6adada7e6eff4f99ac18
Author: Nico Kruber 
Date:   2016-12-20T17:27:13Z

[FLINK-6008] refactor BlobCache#getURL() for cleaner code

commit 69247739e127f8c941e352c07a0be6e03ecea1d1
Author: Nico Kruber 
Date:   2016-12-20T17:52:19Z

[FLINK-6008] extend the BlobService to the NAME_ADDRESSABLE blobs

These blobs are referenced by the job ID and a selected name instead of the hash sum of the blob's contents. Some code was already prepared but lacked the proper additions in further APIs. This commit adds them.

commit 9913ae86b854e1c5b3dca404824ab9a70cc32db6
Author: Nico Kruber 
Date:   2016-12-21T15:23:29Z

[FLINK-6008] promote BlobStore#deleteAll(JobID) to the BlobService

commit d96e6d43ac637149e9d1077c6dee3801d30f679a
Author: Nico Kruber 
Date:   2017-03-09T17:14:02Z

[FLINK-6008] use Preconditions.checkArgument in BlobClient

commit 6d53e3ff87110601eb1a71d60f850e6089930141
Author: Nico Kruber 
Date:   2016-12-21T16:59:27Z

[FLINK-6008] properly remove NAME_ADDRESSABLE blobs after job/task termination

commit 5ef5a74db3f6753437b585823b037e25e23a61ba
Author: Nico Kruber 
Date:   2017-03-09T18:14:52Z

[FLINK-6008] more unit tests for NAME_ADDRESSABLE and BlobService access

NAME_ADDRESSABLE blobs were not that thoroughly tested before, and neither were the access methods that the BlobService implementations provide. This adds tests covering both.

commit 34857456a43ec5a2ccb5166bd379f263cd54697d
Author: Nico Kruber 
Date:   2017-03-09T17:15:08Z

[FLINK-6008] do not fail the BlobServer when delete fails

This also enables us to reuse some more code between BlobServerConnection and BlobServer.

commit e55ab0f37005ef37065b8156f59e4b8db1a7b95f
Author: Nico Kruber 
Date:   2017-03-09T17:32:14Z

[FLINK-6008] refactor BlobCache#deleteGlobal() for cleaner code



