[FLINK-5129] [distributed runtime] BlobCache to directly access Blobs from
distributed file system if possible

This closes #3084


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f544d83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f544d83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f544d83

Branch: refs/heads/master
Commit: 9f544d83b3443cf33f5890efdb956678847d445f
Parents: e68ee5c
Author: Nico Kruber <n...@data-artisans.com>
Authored: Tue Nov 22 12:49:03 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Feb 18 19:19:34 2017 +0100

----------------------------------------------------------------------
 .../handlers/TaskManagerLogHandler.java         |   2 +-
 .../apache/flink/runtime/blob/BlobCache.java    | 269 +++++++++++--------
 .../apache/flink/runtime/blob/BlobClient.java   |   3 +-
 .../apache/flink/runtime/blob/BlobServer.java   |  56 +---
 .../runtime/blob/BlobServerConnection.java      |   8 +
 .../apache/flink/runtime/blob/BlobStore.java    |  29 +-
 .../apache/flink/runtime/blob/BlobUtils.java    |  75 +++---
 .../flink/runtime/blob/FileSystemBlobStore.java |  34 +--
 .../flink/runtime/blob/VoidBlobStore.java       |   9 +-
 .../apache/flink/runtime/client/JobClient.java  |   8 +-
 .../librarycache/BlobLibraryCacheManager.java   |  13 +-
 .../highavailability/ZookeeperHaServices.java   |  20 +-
 .../runtime/taskexecutor/TaskExecutor.java      |  19 +-
 .../runtime/blob/BlobCacheRetriesTest.java      |  86 +++++-
 .../runtime/blob/BlobCacheSuccessTest.java      |  76 +++++-
 .../flink/runtime/blob/BlobRecoveryITCase.java  |  31 +--
 .../runtime/blob/BlobServerDeleteTest.java      |  66 ++---
 .../flink/runtime/blob/BlobServerRangeTest.java |   1 +
 .../flink/runtime/blob/BlobUtilsTest.java       |   6 +-
 .../BlobLibraryCacheRecoveryITCase.java         |  18 +-
 20 files changed, 498 insertions(+), 331 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 78c4455..6583d3b 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -150,7 +150,7 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
                        scala.concurrent.Future<Object> portFuture = 
jobManager.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout);
                        scala.concurrent.Future<BlobCache> cacheFuture = 
portFuture.map(new Mapper<Object, BlobCache>() {
                                @Override
-                               public BlobCache apply(Object result) {
+                               public BlobCache checkedApply(Object result) 
throws IOException {
                                        Option<String> hostOption = 
jobManager.actor().path().address().host();
                                        String host = hostOption.isDefined() ? 
hostOption.get() : "localhost";
                                        int port = (int) result;

http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 7ef1f04..2587b15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -20,12 +20,12 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.util.FileUtils;
-
+import org.apache.flink.util.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -35,10 +35,17 @@ import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * The BLOB cache implements a local cache for content-addressable BLOBs. When 
requesting BLOBs through the
- * {@link BlobCache#getURL} methods, the BLOB cache will first attempt serve 
the file from its local cache. Only if the
- * local cache does not contain the desired BLOB, the BLOB cache will try to 
download it from the BLOB server.
+ * The BLOB cache implements a local cache for content-addressable BLOBs.
+ *
+ * <p>When requesting BLOBs through the {@link BlobCache#getURL} methods, the
+ * BLOB cache will first attempt to serve the file from its local cache. Only 
if
+ * the local cache does not contain the desired BLOB, the BLOB cache will try 
to
+ * download it from a distributed file system (if available) or the BLOB
+ * server.</p>
  */
 public final class BlobCache implements BlobService {
 
@@ -47,8 +54,12 @@ public final class BlobCache implements BlobService {
 
        private final InetSocketAddress serverAddress;
 
+       /** Root directory for local file storage */
        private final File storageDir;
 
+       /** Blob store for distributed file storage, e.g. in HA */
+       private final BlobStore blobStore;
+
        private final AtomicBoolean shutdownRequested = new AtomicBoolean();
 
        /** Shutdown hook thread to ensure deletion of the storage directory. */
@@ -60,15 +71,62 @@ public final class BlobCache implements BlobService {
        /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
        private final Configuration blobClientConfig;
 
+       /**
+        * Instantiates a new BLOB cache.
+        *
+        * @param serverAddress
+        *              address of the {@link BlobServer} to use for fetching 
files from
+        * @param blobClientConfig
+        *              global configuration
+        *
+        * @throws IOException
+        *              thrown if the (local or distributed) file storage 
cannot be created or
+        *              is not usable
+        */
+       public BlobCache(InetSocketAddress serverAddress,
+                       Configuration blobClientConfig) throws IOException {
+               this(serverAddress, blobClientConfig,
+                       BlobUtils.createBlobStoreFromConfig(blobClientConfig));
+       }
 
-       public BlobCache(InetSocketAddress serverAddress, Configuration 
blobClientConfig) {
-               if (serverAddress == null || blobClientConfig == null) {
-                       throw new NullPointerException();
-               }
-
-               this.serverAddress = serverAddress;
+       /**
+        * Instantiates a new BLOB cache.
+        *
+        * @param serverAddress
+        *              address of the {@link BlobServer} to use for fetching 
files from
+        * @param blobClientConfig
+        *              global configuration
+        *      @param haServices
+        *              high availability services able to create a distributed 
blob store
+        *
+        * @throws IOException
+        *              thrown if the (local or distributed) file storage 
cannot be created or
+        *              is not usable
+        */
+       public BlobCache(InetSocketAddress serverAddress,
+               Configuration blobClientConfig, HighAvailabilityServices 
haServices) throws IOException {
+               this(serverAddress, blobClientConfig, 
haServices.createBlobStore());
+       }
 
-               this.blobClientConfig = blobClientConfig;
+       /**
+        * Instantiates a new BLOB cache.
+        *
+        * @param serverAddress
+        *              address of the {@link BlobServer} to use for fetching 
files from
+        * @param blobClientConfig
+        *              global configuration
+        * @param blobStore
+        *              (distributed) blob store file system to retrieve files 
from first
+        *
+        * @throws IOException
+        *              thrown if the (local or distributed) file storage 
cannot be created or is not usable
+        */
+       private BlobCache(
+                       final InetSocketAddress serverAddress, final 
Configuration blobClientConfig,
+                       final BlobStore blobStore) throws IOException {
+               this.serverAddress = checkNotNull(serverAddress);
+               this.blobClientConfig = checkNotNull(blobClientConfig);
+               this.blobStore = blobStore;
 
                // configure and create the storage directory
                String storageDirectory = 
blobClientConfig.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
@@ -101,92 +159,101 @@ public final class BlobCache implements BlobService {
         * @throws IOException Thrown if an I/O error occurs while downloading 
the BLOBs from the BLOB server.
         */
        public URL getURL(final BlobKey requiredBlob) throws IOException {
-               if (requiredBlob == null) {
-                       throw new IllegalArgumentException("BLOB key cannot be 
null.");
-               }
+               checkArgument(requiredBlob != null, "BLOB key cannot be null.");
 
                final File localJarFile = 
BlobUtils.getStorageLocation(storageDir, requiredBlob);
 
-               if (!localJarFile.exists()) {
+               if (localJarFile.exists()) {
+                       return localJarFile.toURI().toURL();
+               }
 
-                       final byte[] buf = new 
byte[BlobServerProtocol.BUFFER_SIZE];
+               // first try the distributed blob store (if available)
+               try {
+                       blobStore.get(requiredBlob, localJarFile);
+               } catch (Exception e) {
+                       LOG.info("Failed to copy from blob store. Downloading 
from BLOB server instead.", e);
+               }
 
-                       // loop over retries
-                       int attempt = 0;
-                       while (true) {
+               if (localJarFile.exists()) {
+                       return localJarFile.toURI().toURL();
+               }
 
-                               if (attempt == 0) {
-                                       LOG.info("Downloading {} from {}", 
requiredBlob, serverAddress);
-                               } else {
-                                       LOG.info("Downloading {} from {} (retry 
{})", requiredBlob, serverAddress, attempt);
-                               }
+               // fallback: download from the BlobServer
+               final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
 
-                               try {
-                                       BlobClient bc = null;
-                                       InputStream is = null;
-                                       OutputStream os = null;
-
-                                       try {
-                                               bc = new 
BlobClient(serverAddress, blobClientConfig);
-                                               is = bc.get(requiredBlob);
-                                               os = new 
FileOutputStream(localJarFile);
-
-                                               while (true) {
-                                                       final int read = 
is.read(buf);
-                                                       if (read < 0) {
-                                                               break;
-                                                       }
-                                                       os.write(buf, 0, read);
-                                               }
+               // loop over retries
+               int attempt = 0;
+               while (true) {
 
-                                               // we do explicitly not use a 
finally block, because we want the closing
-                                               // in the regular case to throw 
exceptions and cause the writing to fail.
-                                               // But, the closing on 
exception should not throw further exceptions and
-                                               // let us keep the root 
exception
-                                               os.close();
-                                               os = null;
-                                               is.close();
-                                               is = null;
-                                               bc.close();
-                                               bc = null;
-
-                                               // success, we finished
-                                               break;
-                                       }
-                                       catch (Throwable t) {
-                                               // we use "catch (Throwable)" 
to keep the root exception. Otherwise that exception
-                                               // it would be replaced by any 
exception thrown in the finally block
-                                               closeSilently(os);
-                                               closeSilently(is);
-                                               closeSilently(bc);
-
-                                               if (t instanceof IOException) {
-                                                       throw (IOException) t;
-                                               } else {
-                                                       throw new 
IOException(t.getMessage(), t);
+                       if (attempt == 0) {
+                               LOG.info("Downloading {} from {}", 
requiredBlob, serverAddress);
+                       } else {
+                               LOG.info("Downloading {} from {} (retry {})", 
requiredBlob, serverAddress, attempt);
+                       }
+
+                       try {
+                               BlobClient bc = null;
+                               InputStream is = null;
+                               OutputStream os = null;
+
+                               try {
+                                       bc = new BlobClient(serverAddress, 
blobClientConfig);
+                                       is = bc.get(requiredBlob);
+                                       os = new FileOutputStream(localJarFile);
+
+                                       while (true) {
+                                               final int read = is.read(buf);
+                                               if (read < 0) {
+                                                       break;
                                                }
+                                               os.write(buf, 0, read);
                                        }
+
+                                       // we do explicitly not use a finally 
block, because we want the closing
+                                       // in the regular case to throw 
exceptions and cause the writing to fail.
+                                       // But, the closing on exception should 
not throw further exceptions and
+                                       // let us keep the root exception
+                                       os.close();
+                                       os = null;
+                                       is.close();
+                                       is = null;
+                                       bc.close();
+                                       bc = null;
+
+                                       // success, we finished
+                                       return localJarFile.toURI().toURL();
                                }
-                               catch (IOException e) {
-                                       String message = "Failed to fetch BLOB 
" + requiredBlob + " from " + serverAddress +
-                                               " and store it under " + 
localJarFile.getAbsolutePath();
-                                       if (attempt < numFetchRetries) {
-                                               attempt++;
-                                               if (LOG.isDebugEnabled()) {
-                                                       LOG.debug(message + " 
Retrying...", e);
-                                               } else {
-                                                       LOG.error(message + " 
Retrying...");
-                                               }
+                               catch (Throwable t) {
+                                       // we use "catch (Throwable)" to keep 
the root exception. Otherwise that exception
+                                       // it would be replaced by any 
exception thrown in the finally block
+                                       IOUtils.closeQuietly(os);
+                                       IOUtils.closeQuietly(is);
+                                       IOUtils.closeQuietly(bc);
+
+                                       if (t instanceof IOException) {
+                                               throw (IOException) t;
+                                       } else {
+                                               throw new 
IOException(t.getMessage(), t);
                                        }
-                                       else {
-                                               LOG.error(message + " No 
retries left.", e);
-                                               throw new IOException(message, 
e);
+                               }
+                       }
+                       catch (IOException e) {
+                               String message = "Failed to fetch BLOB " + 
requiredBlob + " from " + serverAddress +
+                                       " and store it under " + 
localJarFile.getAbsolutePath();
+                               if (attempt < numFetchRetries) {
+                                       attempt++;
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug(message + " 
Retrying...", e);
+                                       } else {
+                                               LOG.error(message + " 
Retrying...");
                                        }
                                }
-                       } // end loop over retries
-               }
-
-               return localJarFile.toURI().toURL();
+                               else {
+                                       LOG.error(message + " No retries 
left.", e);
+                                       throw new IOException(message, e);
+                               }
+                       }
+               } // end loop over retries
        }
 
        /**
@@ -202,18 +269,23 @@ public final class BlobCache implements BlobService {
        }
 
        /**
-        * Deletes the file associated with the given key from the BLOB cache 
and BLOB server.
+        * Deletes the file associated with the given key from the BLOB cache 
and
+        * BLOB server.
+        *
         * @param key referring to the file to be deleted
+        * @throws IOException
+        *         thrown if an I/O error occurs while transferring the request 
to
+        *         the BLOB server or if the BLOB server cannot delete the file
         */
        public void deleteGlobal(BlobKey key) throws IOException {
-               BlobClient bc = createClient();
-               try {
-                       delete(key);
+               // delete locally
+               delete(key);
+               // then delete on the BLOB server
+               // (don't use the distributed storage directly - this way the 
blob
+               // server is aware of the delete operation, too)
+               try (BlobClient bc = createClient()) {
                        bc.delete(key);
                }
-               finally {
-                       bc.close();
-               }
        }
 
        @Override
@@ -258,19 +330,4 @@ public final class BlobCache implements BlobService {
                return this.storageDir;
        }
 
-       // 
------------------------------------------------------------------------
-       //  Miscellaneous
-       // 
------------------------------------------------------------------------
-
-       private void closeSilently(Closeable closeable) {
-               if (closeable != null) {
-                       try {
-                               closeable.close();
-                       } catch (Throwable t) {
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Error while closing resource 
after BLOB transfer.", t);
-                               }
-                       }
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 2748967..ea90f54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -590,7 +590,8 @@ public final class BlobClient implements Closeable {
         * @param key
         *        the key to identify the BLOB
         * @throws IOException
-        *         thrown if an I/O error occurs while transferring the request 
to the BLOB server
+        *         thrown if an I/O error occurs while transferring the request 
to
+        *         the BLOB server or if the BLOB server cannot delete the file
         */
        public void delete(BlobKey key) throws IOException {
                if (key == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index d4190a7..5b00ae4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -21,10 +21,6 @@ package org.apache.flink.runtime.blob;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.net.SSLUtils;
@@ -49,8 +45,8 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 
 /**
  * This class implements the BLOB server. The BLOB server is responsible for 
listening for incoming requests and
@@ -77,10 +73,10 @@ public class BlobServer extends Thread implements 
BlobService {
        /** Indicates whether a shutdown of server component has been 
requested. */
        private final AtomicBoolean shutdownRequested = new AtomicBoolean();
 
-       /** Is the root directory for file storage */
+       /** Root directory for local file storage */
        private final File storageDir;
 
-       /** Blob store for HA */
+       /** Blob store for distributed file storage, e.g. in HA */
        private final BlobStore blobStore;
 
        /** Set of currently running threads */
@@ -99,10 +95,11 @@ public class BlobServer extends Thread implements 
BlobService {
         * Instantiates a new BLOB server and binds it to a free network port.
         *
         * @throws IOException
-        *         thrown if the BLOB server cannot bind to a free network port
+        *              thrown if the BLOB server cannot bind to a free network 
port or if the
+        *              (local or distributed) file storage cannot be created 
or is not usable
         */
        public BlobServer(Configuration config) throws IOException {
-               this(config, createBlobStoreFromConfig(config));
+               this(config, BlobUtils.createBlobStoreFromConfig(config));
        }
 
        public BlobServer(Configuration config, HighAvailabilityServices 
haServices) throws IOException {
@@ -110,11 +107,9 @@ public class BlobServer extends Thread implements 
BlobService {
        }
 
        private BlobServer(Configuration config, BlobStore blobStore) throws 
IOException {
-               checkNotNull(config);
+               this.blobServiceConfiguration = checkNotNull(config);
                this.blobStore = checkNotNull(blobStore);
 
-               this.blobServiceConfiguration = config;
-
                // configure and create the storage directory
                String storageDirectory = 
config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
                this.storageDir = 
BlobUtils.initStorageDirectory(storageDirectory);
@@ -358,9 +353,7 @@ public class BlobServer extends Thread implements 
BlobService {
         */
        @Override
        public URL getURL(BlobKey requiredBlob) throws IOException {
-               if (requiredBlob == null) {
-                       throw new IllegalArgumentException("Required BLOB 
cannot be null.");
-               }
+               checkArgument(requiredBlob != null, "BLOB key cannot be null.");
 
                final File localFile = BlobUtils.getStorageLocation(storageDir, 
requiredBlob);
 
@@ -450,37 +443,4 @@ public class BlobServer extends Thread implements 
BlobService {
                }
        }
 
-       private static BlobStore createBlobStoreFromConfig(Configuration 
config) throws IOException {
-               HighAvailabilityMode highAvailabilityMode = 
HighAvailabilityMode.fromConfig(config);
-
-               if (highAvailabilityMode == HighAvailabilityMode.NONE) {
-               return new VoidBlobStore();
-               } else if (highAvailabilityMode == 
HighAvailabilityMode.ZOOKEEPER) {
-                       final String storagePath = 
config.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
-                       if (isNullOrWhitespaceOnly(storagePath)) {
-                               throw new 
IllegalConfigurationException("Configuration is missing the mandatory 
parameter: " +
-                                               
HighAvailabilityOptions.HA_STORAGE_PATH);
-                       }
-
-                       final Path path;
-                       try {
-                               path = new Path(storagePath);
-                       } catch (Exception e) {
-                               throw new IOException("Invalid path for highly 
available storage (" +
-                                               
HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
-                       }
-
-                       final FileSystem fileSystem;
-                       try {
-                               fileSystem = path.getFileSystem();
-                       } catch (Exception e) {
-                               throw new IOException("Could not create 
FileSystem for highly available storage (" +
-                                               
HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
-                       }
-
-                       return new FileSystemBlobStore(fileSystem, storagePath);
-               } else {
-                       throw new IllegalConfigurationException("Unexpected 
high availability mode '" + highAvailabilityMode + ".");
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index 321fc67..13a90c6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -168,6 +168,14 @@ class BlobServerConnection extends Thread {
         *         thrown if an I/O error occurs while reading/writing data 
from/to the respective streams
         */
        private void get(InputStream inputStream, OutputStream outputStream, 
byte[] buf) throws IOException {
+               /**
+                * Retrieve the file from the (distributed?) BLOB store and 
store it
+                * locally, then send it to the service which requested it.
+                *
+                * Instead, we could send it from the distributed store 
directly but
+                * chances are high that if there is one request, there will be 
more
+                * so a local cache makes more sense.
+                */
 
                File blobFile;
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
index 7050338..64dc942 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.blob;
 import org.apache.flink.api.common.JobID;
 
 import java.io.File;
+import java.io.IOException;
 
 /**
  * A blob store.
@@ -32,9 +33,9 @@ public interface BlobStore {
         *
         * @param localFile The file to copy
         * @param blobKey   The ID for the file in the blob store
-        * @throws Exception If the copy fails
+        * @throws IOException If the copy fails
         */
-       void put(File localFile, BlobKey blobKey) throws Exception;
+       void put(File localFile, BlobKey blobKey) throws IOException;
 
        /**
         * Copies a local file to the blob store.
@@ -44,18 +45,18 @@ public interface BlobStore {
         * @param localFile The file to copy
         * @param jobId     The JobID part of ID for the file in the blob store
         * @param key       The String part of ID for the file in the blob store
-        * @throws Exception If the copy fails
+        * @throws IOException If the copy fails
         */
-       void put(File localFile, JobID jobId, String key) throws Exception;
+       void put(File localFile, JobID jobId, String key) throws IOException;
 
        /**
         * Copies a blob to a local file.
         *
         * @param blobKey   The blob ID
         * @param localFile The local file to copy to
-        * @throws Exception If the copy fails
+        * @throws IOException If the copy fails
         */
-       void get(BlobKey blobKey, File localFile) throws Exception;
+       void get(BlobKey blobKey, File localFile) throws IOException;
 
        /**
         * Copies a blob to a local file.
@@ -63,19 +64,23 @@ public interface BlobStore {
         * @param jobId     The JobID part of ID for the blob
         * @param key       The String part of ID for the blob
         * @param localFile The local file to copy to
-        * @throws Exception If the copy fails
+        * @throws IOException If the copy fails
         */
-       void get(JobID jobId, String key, File localFile) throws Exception;
+       void get(JobID jobId, String key, File localFile) throws IOException;
 
        /**
-        * Deletes a blob.
+        * Tries to delete a blob from storage.
+        *
+        * <p>NOTE: This also tries to delete any created directories if 
empty.</p>
         *
         * @param blobKey The blob ID
         */
        void delete(BlobKey blobKey);
 
        /**
-        * Deletes a blob.
+        * Tries to delete a blob from storage.
+        *
+        * <p>NOTE: This also tries to delete any created directories if 
empty.</p>
         *
         * @param jobId The JobID part of ID for the blob
         * @param key   The String part of ID for the blob
@@ -83,7 +88,9 @@ public interface BlobStore {
        void delete(JobID jobId, String key);
 
        /**
-        * Deletes blobs.
+        * Tries to delete all blobs for the given job from storage.
+        *
+        * <p>NOTE: This also tries to delete any created directories if 
empty.</p>
         *
         * @param jobId The JobID part of all blobs to delete
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index aeaa602..b5ba565 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -21,20 +21,19 @@ package org.apache.flink.runtime.blob;
 import com.google.common.io.BaseEncoding;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.IOUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.highavailability.ZookeeperHaServices;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 
 import java.io.EOFException;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
-import java.net.URI;
 import java.nio.charset.Charset;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
@@ -68,11 +67,39 @@ public class BlobUtils {
        static final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
 
        /**
+        * Creates a BlobStore based on the parameters set in the configuration.
+        *
+        * @param config
+        *              configuration to use
+        *
+        * @return a (distributed) blob store for high availability, or a {@link VoidBlobStore} if HA is disabled
+        *
+        * @throws IOException
+        *              thrown if the (distributed) file storage cannot be 
created
+        */
+       static BlobStore createBlobStoreFromConfig(Configuration config) throws 
IOException {
+               HighAvailabilityMode highAvailabilityMode = 
HighAvailabilityMode.fromConfig(config);
+
+               if (highAvailabilityMode == HighAvailabilityMode.NONE) {
+                       return new VoidBlobStore();
+               } else if (highAvailabilityMode == 
HighAvailabilityMode.ZOOKEEPER) {
+                       return ZookeeperHaServices.createBlobStore(config);
+               } else {
+                       throw new IllegalConfigurationException("Unexpected 
high availability mode '" + highAvailabilityMode + "'.");
+               }
+       }
+
+       /**
         * Creates a storage directory for a blob service.
         *
         * @return the storage directory used by a BLOB service
+        *
+        * @throws IOException
+        *              thrown if the (local or distributed) file storage 
cannot be created or
+        *              is not usable
         */
-       static File initStorageDirectory(String storageDirectory) {
+       static File initStorageDirectory(String storageDirectory) throws
+               IOException {
                File baseDir;
                if (StringUtils.isNullOrWhitespaceOnly(storageDirectory)) {
                        baseDir = new 
File(System.getProperty("java.io.tmpdir"));
@@ -96,7 +123,7 @@ public class BlobUtils {
                }
 
                // max attempts exceeded to find a storage directory
-               throw new RuntimeException("Could not create storage directory 
for BLOB store in '" + baseDir + "'.");
+               throw new IOException("Could not create storage directory for 
BLOB store in '" + baseDir + "'.");
        }
 
        /**
@@ -341,7 +368,7 @@ public class BlobUtils {
         */
        static String getRecoveryPath(String basePath, BlobKey blobKey) {
                // format: $base/cache/blob_$key
-               return String.format("%s/cache/%s", basePath, BLOB_FILE_PREFIX 
+ blobKey.toString());
+               return String.format("%s/cache/%s%s", basePath, 
BLOB_FILE_PREFIX, blobKey.toString());
        }
 
        /**
@@ -353,8 +380,8 @@ public class BlobUtils {
         */
        static String getRecoveryPath(String basePath, JobID jobId, String key) 
{
                // format: $base/job_$id/blob_$key
-               return String.format("%s/%s/%s", basePath, JOB_DIR_PREFIX + 
jobId.toString(),
-                               BLOB_FILE_PREFIX + encodeKey(key));
+               return String.format("%s/%s%s/%s%s", basePath, JOB_DIR_PREFIX, 
jobId.toString(),
+                               BLOB_FILE_PREFIX, encodeKey(key));
        }
 
        /**
@@ -363,33 +390,7 @@ public class BlobUtils {
         * <p>The returned path can be used with the state backend for recovery 
purposes.
         */
        static String getRecoveryPath(String basePath, JobID jobId) {
-               return String.format("%s/%s", basePath, JOB_DIR_PREFIX + 
jobId.toString());
-       }
-
-       /**
-        * Copies the file from the recovery path to the local file.
-        */
-       static void copyFromRecoveryPath(String recoveryPath, File 
localBlobFile) throws Exception {
-               if (recoveryPath == null) {
-                       throw new IllegalStateException("Failed to determine 
recovery path.");
-               }
-
-               if (!localBlobFile.createNewFile()) {
-                       throw new IllegalStateException("Failed to create new 
local file to copy to");
-               }
-
-               URI uri = new URI(recoveryPath);
-               Path path = new Path(recoveryPath);
-
-               if (FileSystem.get(uri).exists(path)) {
-                       try (InputStream is = FileSystem.get(uri).open(path)) {
-                               FileOutputStream fos = new 
FileOutputStream(localBlobFile);
-                               IOUtils.copyBytes(is, fos); // closes the 
streams
-                       }
-               }
-               else {
-                       throw new IOException("Cannot find required BLOB at '" 
+ recoveryPath + "' for recovery.");
-               }
+               return String.format("%s/%s%s", basePath, JOB_DIR_PREFIX, 
jobId.toString());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index 2c05002..7cfce7a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -64,16 +64,16 @@ public class FileSystemBlobStore implements BlobStore {
        // - Put 
------------------------------------------------------------------
 
        @Override
-       public void put(File localFile, BlobKey blobKey) throws Exception {
+       public void put(File localFile, BlobKey blobKey) throws IOException {
                put(localFile, BlobUtils.getRecoveryPath(basePath, blobKey));
        }
 
        @Override
-       public void put(File localFile, JobID jobId, String key) throws 
Exception {
+       public void put(File localFile, JobID jobId, String key) throws 
IOException {
                put(localFile, BlobUtils.getRecoveryPath(basePath, jobId, key));
        }
 
-       private void put(File fromFile, String toBlobPath) throws Exception {
+       private void put(File fromFile, String toBlobPath) throws IOException {
                try (OutputStream os = fileSystem.create(new Path(toBlobPath), 
true)) {
                        LOG.debug("Copying from {} to {}.", fromFile, 
toBlobPath);
                        Files.copy(fromFile, os);
@@ -83,16 +83,16 @@ public class FileSystemBlobStore implements BlobStore {
        // - Get 
------------------------------------------------------------------
 
        @Override
-       public void get(BlobKey blobKey, File localFile) throws Exception {
+       public void get(BlobKey blobKey, File localFile) throws IOException {
                get(BlobUtils.getRecoveryPath(basePath, blobKey), localFile);
        }
 
        @Override
-       public void get(JobID jobId, String key, File localFile) throws 
Exception {
+       public void get(JobID jobId, String key, File localFile) throws 
IOException {
                get(BlobUtils.getRecoveryPath(basePath, jobId, key), localFile);
        }
 
-       private void get(String fromBlobPath, File toFile) throws Exception {
+       private void get(String fromBlobPath, File toFile) throws IOException {
                checkNotNull(fromBlobPath, "Blob path");
                checkNotNull(toFile, "File");
 
@@ -102,17 +102,21 @@ public class FileSystemBlobStore implements BlobStore {
 
                final Path fromPath = new Path(fromBlobPath);
 
-               if (fileSystem.exists(fromPath)) {
-                       try (InputStream is = fileSystem.open(fromPath);
-                               FileOutputStream fos = new 
FileOutputStream(toFile))
-                       {
-                               LOG.debug("Copying from {} to {}.", 
fromBlobPath, toFile);
-                               IOUtils.copyBytes(is, fos); // closes the 
streams
+               boolean success = false;
+               try (InputStream is = fileSystem.open(fromPath);
+                       FileOutputStream fos = new FileOutputStream(toFile)) {
+                       LOG.debug("Copying from {} to {}.", fromBlobPath, 
toFile);
+                       IOUtils.copyBytes(is, fos); // closes the streams
+                       success = true;
+               } finally {
+                       // if the copy fails, we need to remove the target file 
because
+                       // outside code relies on a correct file as long as it 
exists
+                       if (!success) {
+                               try {
+                                       toFile.delete();
+                               } catch (Throwable ignored) {}
                        }
                }
-               else {
-                       throw new IOException(fromBlobPath + " does not 
exist.");
-               }
        }
 
        // - Delete 
---------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
index ece2ac1..8606844 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.blob;
 import org.apache.flink.api.common.JobID;
 
 import java.io.File;
+import java.io.IOException;
 
 /**
  * A blob store doing nothing.
@@ -28,19 +29,19 @@ import java.io.File;
 public class VoidBlobStore implements BlobStore {
 
        @Override
-       public void put(File localFile, BlobKey blobKey) throws Exception {
+       public void put(File localFile, BlobKey blobKey) throws IOException {
        }
 
        @Override
-       public void put(File localFile, JobID jobId, String key) throws 
Exception {
+       public void put(File localFile, JobID jobId, String key) throws 
IOException {
        }
 
        @Override
-       public void get(BlobKey blobKey, File localFile) throws Exception {
+       public void get(BlobKey blobKey, File localFile) throws IOException {
        }
 
        @Override
-       public void get(JobID jobId, String key, File localFile) throws 
Exception {
+       public void get(JobID jobId, String key, File localFile) throws 
IOException {
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 9f0c573..76d6d86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -209,7 +209,13 @@ public class JobClient {
                        Option<String> jmHost = 
jobManager.actor().path().address().host();
                        String jmHostname = jmHost.isDefined() ? jmHost.get() : 
"localhost";
                        InetSocketAddress serverAddress = new 
InetSocketAddress(jmHostname, props.blobManagerPort());
-                       final BlobCache blobClient = new 
BlobCache(serverAddress, config);
+                       final BlobCache blobClient;
+                       try {
+                               blobClient = new BlobCache(serverAddress, 
config);
+                       } catch (IOException e) {
+                               throw new JobRetrievalException(jobID,
+                                       "Failed to setup blob cache", e);
+                       }
 
                        final Collection<BlobKey> requiredJarFiles = 
props.requiredJarFiles();
                        final Collection<URL> requiredClasspaths = 
props.requiredClasspaths();

http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index c94768d..b0d5d83 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -37,11 +37,12 @@ import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * For each job graph that is submitted to the system the library cache 
manager maintains
  * a set of libraries (typically JAR files) which the job requires to run. The 
library cache manager
@@ -73,7 +74,7 @@ public final class BlobLibraryCacheManager extends TimerTask 
implements LibraryC
        // 
--------------------------------------------------------------------------------------------
 
        public BlobLibraryCacheManager(BlobService blobService, long 
cleanupInterval) {
-               this.blobService = blobService;
+               this.blobService = checkNotNull(blobService);
 
                // Initializing the clean up task
                this.cleanupTimer = new Timer(true);
@@ -91,8 +92,8 @@ public final class BlobLibraryCacheManager extends TimerTask 
implements LibraryC
        @Override
        public void registerTask(JobID jobId, ExecutionAttemptID task, 
Collection<BlobKey> requiredJarFiles,
                        Collection<URL> requiredClasspaths) throws IOException {
-               Preconditions.checkNotNull(jobId, "The JobId must not be 
null.");
-               Preconditions.checkNotNull(task, "The task execution id must 
not be null.");
+               checkNotNull(jobId, "The JobId must not be null.");
+               checkNotNull(task, "The task execution id must not be null.");
 
                if (requiredJarFiles == null) {
                        requiredJarFiles = Collections.emptySet();
@@ -153,8 +154,8 @@ public final class BlobLibraryCacheManager extends 
TimerTask implements LibraryC
        
        @Override
        public void unregisterTask(JobID jobId, ExecutionAttemptID task) {
-               Preconditions.checkNotNull(jobId, "The JobId must not be 
null.");
-               Preconditions.checkNotNull(task, "The task execution id must 
not be null.");
+               checkNotNull(jobId, "The JobId must not be null.");
+               checkNotNull(task, "The task execution id must not be null.");
 
                synchronized (lockObject) {
                        LibraryCacheEntry entry = cacheEntries.get(jobId);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index 25d21ef..ed0ad17 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -154,7 +154,21 @@ public class ZookeeperHaServices implements 
HighAvailabilityServices {
 
        @Override
        public BlobStore createBlobStore() throws IOException {
-               final String storagePath = 
configuration.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
+               return createBlobStore(configuration);
+       }
+
+       /**
+        * Creates the BLOB store in which BLOBs are stored in a 
highly-available
+        * fashion.
+        *
+        * @param configuration configuration to extract the storage path from
+        * @return Blob store
+        * @throws IOException if the blob store could not be created
+        */
+       public static BlobStore createBlobStore(
+               final Configuration configuration) throws IOException {
+               String storagePath = configuration.getValue(
+                       HighAvailabilityOptions.HA_STORAGE_PATH);
                if (isNullOrWhitespaceOnly(storagePath)) {
                        throw new IllegalConfigurationException("Configuration 
is missing the mandatory parameter: " +
                                        
HighAvailabilityOptions.HA_STORAGE_PATH);
@@ -176,6 +190,10 @@ public class ZookeeperHaServices implements 
HighAvailabilityServices {
                                        
HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
                }
 
+               final String clusterId =
+                       
configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
+               storagePath += "/" + clusterId;
+
                return new FileSystemBlobStore(fileSystem, storagePath);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index f11cb98..58bbfac 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -794,11 +794,20 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
 
                InetSocketAddress address = new 
InetSocketAddress(jobMasterGateway.getAddress(), blobPort);
 
-               BlobCache blobCache = new BlobCache(address, 
taskManagerConfiguration.getConfiguration());
-
-               LibraryCacheManager libraryCacheManager = new 
BlobLibraryCacheManager(
-                       blobCache,
-                       taskManagerConfiguration.getCleanupInterval());
+               final LibraryCacheManager libraryCacheManager;
+               try {
+                       final BlobCache blobCache = new BlobCache(address, 
taskManagerConfiguration.getConfiguration(), haServices);
+                       libraryCacheManager = new BlobLibraryCacheManager(
+                               blobCache,
+                               taskManagerConfiguration.getCleanupInterval());
+               } catch (IOException e) {
+                       // Can't pass the IOException up - we need a 
RuntimeException anyway
+                       // two levels up where this is run asynchronously. 
Also, we don't
+                       // know whether this is caught in the thread running 
this method.
+                       final String message = "Could not create BLOB cache or 
library cache.";
+                       log.error(message, e);
+                       throw new RuntimeException(message, e);
+               }
 
                ResultPartitionConsumableNotifier 
resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
                        jobManagerLeaderId,

http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
index 4aa9a21..34a8a39 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
@@ -19,7 +19,10 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -33,22 +36,52 @@ import static org.junit.Assert.*;
  */
 public class BlobCacheRetriesTest {
 
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
        /**
         * A test where the connection fails twice and then the get operation 
succeeds.
         */
        @Test
        public void testBlobFetchRetries() {
+               final Configuration config = new Configuration();
+
+               testBlobFetchRetries(config);
+       }
 
+       /**
+        * A test where the connection fails twice and then the get operation 
succeeds
+        * (with high availability set).
+        */
+       @Test
+       public void testBlobFetchRetriesHa() {
+               final Configuration config = new Configuration();
+               config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
+                       temporaryFolder.getRoot().getPath());
+
+               testBlobFetchRetries(config);
+       }
+
+       /**
+        * A test where the BlobCache must use the BlobServer and the connection
+        * fails twice and then the get operation succeeds.
+        *
+        * @param config
+        *              configuration to use (the BlobCache will get some 
additional settings
+        *              set compared to this one)
+        */
+       private void testBlobFetchRetries(final Configuration config) {
                final byte[] data = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
 
                BlobServer server = null;
                BlobCache cache = null;
                try {
-                       final Configuration config = new Configuration();
 
                        server = new TestingFailingBlobServer(config, 2);
 
-                       final InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+                       final InetSocketAddress
+                               serverAddress = new 
InetSocketAddress("localhost", server.getPort());
 
                        // upload some blob
                        BlobClient blobClient = null;
@@ -64,9 +97,15 @@ public class BlobCacheRetriesTest {
                                }
                        }
 
-                       cache = new BlobCache(serverAddress, config);
+                       // create a separate config for the cache with no 
access to
+                       // the (shared) storage path if available so that the 
cache
+                       // will always bother the BlobServer!
+                       final Configuration cacheConfig = new 
Configuration(config);
+                       
cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
+                               temporaryFolder.getRoot().getPath() + 
"/does-not-exist");
+                       cache = new BlobCache(serverAddress, cacheConfig);
 
-                       // trigger a download - it should fail on the first 
time, but retry, and succeed at the second time
+                       // trigger a download - it should fail the first two 
times, but retry, and succeed eventually
                        URL url = cache.getURL(key);
                        InputStream is = url.openStream();
                        try {
@@ -97,17 +136,44 @@ public class BlobCacheRetriesTest {
         */
        @Test
        public void testBlobFetchWithTooManyFailures() {
+               final Configuration config = new Configuration();
 
+               testBlobFetchWithTooManyFailures(config);
+       }
+
+       /**
+        * A test where the connection fails too often which eventually fails the
+        * GET request (with high availability set).
+        */
+       @Test
+       public void testBlobFetchWithTooManyFailuresHa() {
+               final Configuration config = new Configuration();
+               config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
+                       temporaryFolder.getRoot().getPath());
+
+               testBlobFetchWithTooManyFailures(config);
+       }
+
+       /**
+        * A test where the BlobCache must use the BlobServer and the connection
+        * fails too often which eventually fails the GET request.
+        *
+        * @param config
+        *              configuration to use (the BlobCache will get some 
additional settings
+        *              set compared to this one)
+        */
+       private void testBlobFetchWithTooManyFailures(final Configuration 
config) {
                final byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
 
                BlobServer server = null;
                BlobCache cache = null;
                try {
-                       final Configuration config = new Configuration();
 
                        server = new TestingFailingBlobServer(config, 10);
 
-                       final InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+                       final InetSocketAddress
+                               serverAddress = new 
InetSocketAddress("localhost", server.getPort());
 
                        // upload some blob
                        BlobClient blobClient = null;
@@ -123,7 +189,13 @@ public class BlobCacheRetriesTest {
                                }
                        }
 
-                       cache = new BlobCache(serverAddress, config);
+                       // create a separate config for the cache with no 
access to
+                       // the (shared) storage path if available so that the 
cache
+                       // will always bother the BlobServer!
+                       final Configuration cacheConfig = new 
Configuration(config);
+                       
cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
+                               temporaryFolder.getRoot().getPath() + 
"/does-not-exist");
+                       cache = new BlobCache(serverAddress, cacheConfig);
 
                        // trigger a download - it should fail eventually
                        try {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
index 7ba5a8a..db55331 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -18,6 +18,12 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
 import java.io.File;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
@@ -25,9 +31,6 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.flink.configuration.Configuration;
-import org.junit.Test;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -38,9 +41,48 @@ import static org.junit.Assert.fail;
  */
 public class BlobCacheSuccessTest {
 
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       /**
+        * BlobCache with no HA. BLOBs need to be downloaded from a working
+        * BlobServer.
+        */
        @Test
        public void testBlobCache() {
+               Configuration config = new Configuration();
+               uploadFileGetTest(config, false, false);
+       }
+
+       /**
+        * BlobCache is configured in HA mode and the cache can download files 
from
+        * the file system directly and does not need to download BLOBs from the
+        * BlobServer.
+        */
+       @Test
+       public void testBlobCacheHa() {
+               Configuration config = new Configuration();
+               config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
+                       temporaryFolder.getRoot().getPath());
+               uploadFileGetTest(config, true, true);
+       }
 
+       /**
+        * BlobCache is configured in HA mode but the cache itself cannot 
access the
+        * file system and thus needs to download BLOBs from the BlobServer.
+        */
+       @Test
+       public void testBlobCacheHaFallback() {
+               Configuration config = new Configuration();
+               config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
+                       temporaryFolder.getRoot().getPath());
+               uploadFileGetTest(config, false, false);
+       }
+
+       private void uploadFileGetTest(final Configuration config, boolean 
cacheWorksWithoutServer,
+               boolean cacheHasAccessToFs) {
                // First create two BLOBs and upload them to BLOB server
                final byte[] buf = new byte[128];
                final List<BlobKey> blobKeys = new ArrayList<BlobKey>(2);
@@ -50,7 +92,6 @@ public class BlobCacheSuccessTest {
                try {
 
                        // Start the BLOB server
-                       Configuration config = new Configuration();
                        blobServer = new BlobServer(config);
                        final InetSocketAddress serverAddress = new 
InetSocketAddress(blobServer.getPort());
 
@@ -69,15 +110,34 @@ public class BlobCacheSuccessTest {
                                }
                        }
 
-                       blobCache = new BlobCache(serverAddress, new 
Configuration());
+                       if (cacheWorksWithoutServer) {
+                               // Now, shut down the BLOB server; the BLOBs 
must still be accessible through the cache.
+                               blobServer.shutdown();
+                               blobServer = null;
+                       }
+
+                       final Configuration cacheConfig;
+                       if (cacheHasAccessToFs) {
+                               cacheConfig = config;
+                       } else {
+                               // just in case parameters are still read from 
the server,
+                               // create a separate configuration object for 
the cache
+                               cacheConfig = new Configuration(config);
+                               
cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
+                                       temporaryFolder.getRoot().getPath() + 
"/does-not-exist");
+                       }
+
+                       blobCache = new BlobCache(serverAddress, cacheConfig);
 
                        for (BlobKey blobKey : blobKeys) {
                                blobCache.getURL(blobKey);
                        }
 
-                       // Now, shut down the BLOB server, the BLOBs must still 
be accessible through the cache.
-                       blobServer.shutdown();
-                       blobServer = null;
+                       if (blobServer != null) {
+                               // Now, shut down the BLOB server; the BLOBs 
must still be accessible through the cache.
+                               blobServer.shutdown();
+                               blobServer = null;
+                       }
 
                        final URL[] urls = new URL[blobKeys.size()];
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index a8eb1d3..d043665 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -26,9 +25,9 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
@@ -39,27 +38,14 @@ import java.util.Arrays;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class BlobRecoveryITCase {
 
-       private File recoveryDir;
-
-       @Before
-       public void setUp() throws Exception {
-               recoveryDir = new File(FileUtils.getTempDirectory(), 
"BlobRecoveryITCaseDir");
-               if (!recoveryDir.exists() && !recoveryDir.mkdirs()) {
-                       throw new IllegalStateException("Failed to create temp 
directory for test");
-               }
-       }
-
-       @After
-       public void cleanUp() throws Exception {
-               if (recoveryDir != null) {
-                       FileUtils.deleteDirectory(recoveryDir);
-               }
-       }
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
        /**
         * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed 
JARs are recoverable from any
@@ -70,13 +56,14 @@ public class BlobRecoveryITCase {
                Configuration config = new Configuration();
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
                config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
-               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
recoveryDir.getPath());
+               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.getRoot().getPath());
 
                testBlobServerRecovery(config);
        }
 
        public static void testBlobServerRecovery(final Configuration config) 
throws IOException {
-               String storagePath = 
config.getString(HighAvailabilityOptions.HA_STORAGE_PATH);
+               final String clusterId = 
config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
+               String storagePath = 
config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + "/" + clusterId;
                Random rand = new Random();
 
                BlobServer[] server = new BlobServer[2];
@@ -84,7 +71,6 @@ public class BlobRecoveryITCase {
                BlobClient client = null;
 
                try {
-
                        for (int i = 0; i < server.length; i++) {
                                server[i] = new BlobServer(config);
                                serverAddress[i] = new 
InetSocketAddress("localhost", server[i].getPort());
@@ -165,6 +151,7 @@ public class BlobRecoveryITCase {
                        client.delete(jobId[1], testKey[1]);
 
                        // Verify everything is clean
+                       assertTrue("HA storage directory does not exist", 
fs.exists(new Path(storagePath)));
                        if (fs.exists(blobServerPath)) {
                                final org.apache.flink.core.fs.FileStatus[] 
recoveryFiles =
                                        fs.listStatus(blobServerPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index 53e1d73..025a2ff 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -85,16 +85,7 @@ public class BlobServerDeleteTest {
                        fail(e.getMessage());
                }
                finally {
-                       if (client != null) {
-                               try {
-                                       client.close();
-                               } catch (Throwable t) {
-                                       t.printStackTrace();
-                               }
-                       }
-                       if (server != null) {
-                               server.shutdown();
-                       }
+                       cleanup(server, client);
                }
        }
 
@@ -157,16 +148,7 @@ public class BlobServerDeleteTest {
                        fail(e.getMessage());
                }
                finally {
-                       if (client != null) {
-                               try {
-                                       client.close();
-                               } catch (Throwable t) {
-                                       t.printStackTrace();
-                               }
-                       }
-                       if (server != null) {
-                               server.shutdown();
-                       }
+                       cleanup(server, client);
                }
        }
 
@@ -205,16 +187,7 @@ public class BlobServerDeleteTest {
                        fail(e.getMessage());
                }
                finally {
-                       if (client != null) {
-                               try {
-                                       client.close();
-                               } catch (Throwable t) {
-                                       t.printStackTrace();
-                               }
-                       }
-                       if (server != null) {
-                               server.shutdown();
-                       }
+                       cleanup(server, client);
                }
        }
 
@@ -254,16 +227,7 @@ public class BlobServerDeleteTest {
                        fail(e.getMessage());
                }
                finally {
-                       if (client != null) {
-                               try {
-                                       client.close();
-                               } catch (Throwable t) {
-                                       t.printStackTrace();
-                               }
-                       }
-                       if (server != null) {
-                               server.shutdown();
-                       }
+                       cleanup(server, client);
                }
        }
 
@@ -312,16 +276,20 @@ public class BlobServerDeleteTest {
                        fail(e.getMessage());
                }
                finally {
-                       if (client != null) {
-                               try {
-                                       client.close();
-                               } catch (Throwable t) {
-                                       t.printStackTrace();
-                               }
-                       }
-                       if (server != null) {
-                               server.shutdown();
+                       cleanup(server, client);
+               }
+       }
+
+       private void cleanup(BlobServer server, BlobClient client) {
+               if (client != null) {
+                       try {
+                               client.close();
+                       } catch (Throwable t) {
+                               t.printStackTrace();
                        }
                }
+               if (server != null) {
+                       server.shutdown();
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
index 36ae8cc..ea0eb94 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
@@ -40,6 +40,7 @@ public class BlobServerRangeTest extends TestLogger {
                Configuration conf = new Configuration();
                conf.setString(ConfigConstants.BLOB_SERVER_PORT, "0");
                BlobServer srv = new BlobServer(conf);
+               srv.shutdown();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
index 63ec338..081e28c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
@@ -30,6 +30,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.IOException;
 
 public class BlobUtilsTest {
 
@@ -55,8 +56,9 @@ public class BlobUtilsTest {
                assertTrue(blobUtilsTestDirectory.delete());
        }
 
-       @Test(expected = Exception.class)
-       public void testExceptionOnCreateStorageDirectoryFailure() {
+       @Test(expected = IOException.class)
+       public void testExceptionOnCreateStorageDirectoryFailure() throws
+               IOException {
                // Should throw an Exception
                BlobUtils.initStorageDirectory(new File(blobUtilsTestDirectory, 
CANNOT_CREATE_THIS).getAbsolutePath());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index a727d51..d3925be 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -43,6 +43,7 @@ import java.util.List;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 public class BlobLibraryCacheRecoveryITCase {
 
@@ -62,12 +63,12 @@ public class BlobLibraryCacheRecoveryITCase {
                BlobCache cache = null;
                BlobLibraryCacheManager libCache = null;
 
-               try {
-                       Configuration config = new Configuration();
-                       config.setString(HighAvailabilityOptions.HA_MODE, 
"ZOOKEEPER");
-                       config.setString(ConfigConstants.STATE_BACKEND, 
"FILESYSTEM");
-                       
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.getRoot().getAbsolutePath());
+               Configuration config = new Configuration();
+               config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+               config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.getRoot().getAbsolutePath());
 
+               try {
                        for (int i = 0; i < server.length; i++) {
                                server[i] = new BlobServer(config);
                                serverAddress[i] = new 
InetSocketAddress("localhost", server[i].getPort());
@@ -144,8 +145,11 @@ public class BlobLibraryCacheRecoveryITCase {
                                client.delete(keys.get(1));
                        }
 
-                       // Verify everything is clean
-                       File[] recoveryFiles = 
temporaryFolder.getRoot().listFiles();
+                       // Verify everything is clean below 
<HA storage path>/<cluster_id>
+                       final String clusterId = 
config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
+                       File haBlobStoreDir = new 
File(temporaryFolder.getRoot(), clusterId);
+                       File[] recoveryFiles = haBlobStoreDir.listFiles();
+                       assertNotNull("HA storage directory does not exist", 
recoveryFiles);
                        assertEquals("Unclean state backend: " + 
Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
                }
                finally {

Reply via email to