[FLINK-5129] [distributed runtime] BlobCache to directly access Blobs from distributed file system if possible
This closes #3084 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f544d83 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f544d83 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f544d83 Branch: refs/heads/master Commit: 9f544d83b3443cf33f5890efdb956678847d445f Parents: e68ee5c Author: Nico Kruber <n...@data-artisans.com> Authored: Tue Nov 22 12:49:03 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Sat Feb 18 19:19:34 2017 +0100 ---------------------------------------------------------------------- .../handlers/TaskManagerLogHandler.java | 2 +- .../apache/flink/runtime/blob/BlobCache.java | 269 +++++++++++-------- .../apache/flink/runtime/blob/BlobClient.java | 3 +- .../apache/flink/runtime/blob/BlobServer.java | 56 +--- .../runtime/blob/BlobServerConnection.java | 8 + .../apache/flink/runtime/blob/BlobStore.java | 29 +- .../apache/flink/runtime/blob/BlobUtils.java | 75 +++--- .../flink/runtime/blob/FileSystemBlobStore.java | 34 +-- .../flink/runtime/blob/VoidBlobStore.java | 9 +- .../apache/flink/runtime/client/JobClient.java | 8 +- .../librarycache/BlobLibraryCacheManager.java | 13 +- .../highavailability/ZookeeperHaServices.java | 20 +- .../runtime/taskexecutor/TaskExecutor.java | 19 +- .../runtime/blob/BlobCacheRetriesTest.java | 86 +++++- .../runtime/blob/BlobCacheSuccessTest.java | 76 +++++- .../flink/runtime/blob/BlobRecoveryITCase.java | 31 +-- .../runtime/blob/BlobServerDeleteTest.java | 66 ++--- .../flink/runtime/blob/BlobServerRangeTest.java | 1 + .../flink/runtime/blob/BlobUtilsTest.java | 6 +- .../BlobLibraryCacheRecoveryITCase.java | 18 +- 20 files changed, 498 insertions(+), 331 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java 
---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java index 78c4455..6583d3b 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java @@ -150,7 +150,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase { scala.concurrent.Future<Object> portFuture = jobManager.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout); scala.concurrent.Future<BlobCache> cacheFuture = portFuture.map(new Mapper<Object, BlobCache>() { @Override - public BlobCache apply(Object result) { + public BlobCache checkedApply(Object result) throws IOException { Option<String> hostOption = jobManager.actor().path().address().host(); String host = hostOption.isDefined() ? 
hostOption.get() : "localhost"; int port = (int) result; http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 7ef1f04..2587b15 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -20,12 +20,12 @@ package org.apache.flink.runtime.blob; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.util.FileUtils; - +import org.apache.flink.util.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -35,10 +35,17 @@ import java.net.InetSocketAddress; import java.net.URL; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * The BLOB cache implements a local cache for content-addressable BLOBs. When requesting BLOBs through the - * {@link BlobCache#getURL} methods, the BLOB cache will first attempt serve the file from its local cache. Only if the - * local cache does not contain the desired BLOB, the BLOB cache will try to download it from the BLOB server. + * The BLOB cache implements a local cache for content-addressable BLOBs. + * + * <p>When requesting BLOBs through the {@link BlobCache#getURL} methods, the + * BLOB cache will first attempt to serve the file from its local cache. 
Only if + * the local cache does not contain the desired BLOB, the BLOB cache will try to + * download it from a distributed file system (if available) or the BLOB + * server.</p> */ public final class BlobCache implements BlobService { @@ -47,8 +54,12 @@ public final class BlobCache implements BlobService { private final InetSocketAddress serverAddress; + /** Root directory for local file storage */ private final File storageDir; + /** Blob store for distributed file storage, e.g. in HA */ + private final BlobStore blobStore; + private final AtomicBoolean shutdownRequested = new AtomicBoolean(); /** Shutdown hook thread to ensure deletion of the storage directory. */ @@ -60,15 +71,62 @@ public final class BlobCache implements BlobService { /** Configuration for the blob client like ssl parameters required to connect to the blob server */ private final Configuration blobClientConfig; + /** + * Instantiates a new BLOB cache. + * + * @param serverAddress + * address of the {@link BlobServer} to use for fetching files from + * @param blobClientConfig + * global configuration + * + * @throws IOException + * thrown if the (local or distributed) file storage cannot be created or + * is not usable + */ + public BlobCache(InetSocketAddress serverAddress, + Configuration blobClientConfig) throws IOException { + this(serverAddress, blobClientConfig, + BlobUtils.createBlobStoreFromConfig(blobClientConfig)); + } - public BlobCache(InetSocketAddress serverAddress, Configuration blobClientConfig) { - if (serverAddress == null || blobClientConfig == null) { - throw new NullPointerException(); - } - - this.serverAddress = serverAddress; + /** + * Instantiates a new BLOB cache. 
+ * + * @param serverAddress + * address of the {@link BlobServer} to use for fetching files from + * @param blobClientConfig + * global configuration + * @param haServices + * high availability services able to create a distributed blob store + * + * @throws IOException + * thrown if the (local or distributed) file storage cannot be created or + * is not usable + */ + public BlobCache(InetSocketAddress serverAddress, + Configuration blobClientConfig, HighAvailabilityServices haServices) throws IOException { + this(serverAddress, blobClientConfig, haServices.createBlobStore()); + } - this.blobClientConfig = blobClientConfig; + /** + * Instantiates a new BLOB cache. + * + * @param serverAddress + * address of the {@link BlobServer} to use for fetching files from + * @param blobClientConfig + * global configuration + * @param blobStore + * (distributed) blob store file system to retrieve files from first + * + * @throws IOException + * thrown if the (local or distributed) file storage cannot be created or is not usable + */ + private BlobCache( + final InetSocketAddress serverAddress, final Configuration blobClientConfig, + final BlobStore blobStore) throws IOException { + this.serverAddress = checkNotNull(serverAddress); + this.blobClientConfig = checkNotNull(blobClientConfig); + this.blobStore = blobStore; // configure and create the storage directory String storageDirectory = blobClientConfig.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null); @@ -101,92 +159,101 @@ public final class BlobCache implements BlobService { * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. 
*/ public URL getURL(final BlobKey requiredBlob) throws IOException { - if (requiredBlob == null) { - throw new IllegalArgumentException("BLOB key cannot be null."); - } + checkArgument(requiredBlob != null, "BLOB key cannot be null."); final File localJarFile = BlobUtils.getStorageLocation(storageDir, requiredBlob); - if (!localJarFile.exists()) { + if (localJarFile.exists()) { + return localJarFile.toURI().toURL(); + } - final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE]; + // first try the distributed blob store (if available) + try { + blobStore.get(requiredBlob, localJarFile); + } catch (Exception e) { + LOG.info("Failed to copy from blob store. Downloading from BLOB server instead.", e); + } - // loop over retries - int attempt = 0; - while (true) { + if (localJarFile.exists()) { + return localJarFile.toURI().toURL(); + } - if (attempt == 0) { - LOG.info("Downloading {} from {}", requiredBlob, serverAddress); - } else { - LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt); - } + // fallback: download from the BlobServer + final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE]; - try { - BlobClient bc = null; - InputStream is = null; - OutputStream os = null; - - try { - bc = new BlobClient(serverAddress, blobClientConfig); - is = bc.get(requiredBlob); - os = new FileOutputStream(localJarFile); - - while (true) { - final int read = is.read(buf); - if (read < 0) { - break; - } - os.write(buf, 0, read); - } + // loop over retries + int attempt = 0; + while (true) { - // we do explicitly not use a finally block, because we want the closing - // in the regular case to throw exceptions and cause the writing to fail. 
- // But, the closing on exception should not throw further exceptions and - // let us keep the root exception - os.close(); - os = null; - is.close(); - is = null; - bc.close(); - bc = null; - - // success, we finished - break; - } - catch (Throwable t) { - // we use "catch (Throwable)" to keep the root exception. Otherwise that exception - // it would be replaced by any exception thrown in the finally block - closeSilently(os); - closeSilently(is); - closeSilently(bc); - - if (t instanceof IOException) { - throw (IOException) t; - } else { - throw new IOException(t.getMessage(), t); + if (attempt == 0) { + LOG.info("Downloading {} from {}", requiredBlob, serverAddress); + } else { + LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt); + } + + try { + BlobClient bc = null; + InputStream is = null; + OutputStream os = null; + + try { + bc = new BlobClient(serverAddress, blobClientConfig); + is = bc.get(requiredBlob); + os = new FileOutputStream(localJarFile); + + while (true) { + final int read = is.read(buf); + if (read < 0) { + break; } + os.write(buf, 0, read); } + + // we do explicitly not use a finally block, because we want the closing + // in the regular case to throw exceptions and cause the writing to fail. + // But, the closing on exception should not throw further exceptions and + // let us keep the root exception + os.close(); + os = null; + is.close(); + is = null; + bc.close(); + bc = null; + + // success, we finished + return localJarFile.toURI().toURL(); } - catch (IOException e) { - String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress + - " and store it under " + localJarFile.getAbsolutePath(); - if (attempt < numFetchRetries) { - attempt++; - if (LOG.isDebugEnabled()) { - LOG.debug(message + " Retrying...", e); - } else { - LOG.error(message + " Retrying..."); - } + catch (Throwable t) { + // we use "catch (Throwable)" to keep the root exception. 
Otherwise that exception + // it would be replaced by any exception thrown in the finally block + IOUtils.closeQuietly(os); + IOUtils.closeQuietly(is); + IOUtils.closeQuietly(bc); + + if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException(t.getMessage(), t); } - else { - LOG.error(message + " No retries left.", e); - throw new IOException(message, e); + } + } + catch (IOException e) { + String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress + + " and store it under " + localJarFile.getAbsolutePath(); + if (attempt < numFetchRetries) { + attempt++; + if (LOG.isDebugEnabled()) { + LOG.debug(message + " Retrying...", e); + } else { + LOG.error(message + " Retrying..."); } } - } // end loop over retries - } - - return localJarFile.toURI().toURL(); + else { + LOG.error(message + " No retries left.", e); + throw new IOException(message, e); + } + } + } // end loop over retries } /** @@ -202,18 +269,23 @@ public final class BlobCache implements BlobService { } /** - * Deletes the file associated with the given key from the BLOB cache and BLOB server. + * Deletes the file associated with the given key from the BLOB cache and + * BLOB server. 
+ * * @param key referring to the file to be deleted + * @throws IOException + * thrown if an I/O error occurs while transferring the request to + * the BLOB server or if the BLOB server cannot delete the file */ public void deleteGlobal(BlobKey key) throws IOException { - BlobClient bc = createClient(); - try { - delete(key); + // delete locally + delete(key); + // then delete on the BLOB server + // (don't use the distributed storage directly - this way the blob + // server is aware of the delete operation, too) + try (BlobClient bc = createClient()) { bc.delete(key); } - finally { - bc.close(); - } } @Override @@ -258,19 +330,4 @@ public final class BlobCache implements BlobService { return this.storageDir; } - // ------------------------------------------------------------------------ - // Miscellaneous - // ------------------------------------------------------------------------ - - private void closeSilently(Closeable closeable) { - if (closeable != null) { - try { - closeable.close(); - } catch (Throwable t) { - if (LOG.isDebugEnabled()) { - LOG.debug("Error while closing resource after BLOB transfer.", t); - } - } - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java index 2748967..ea90f54 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java @@ -590,7 +590,8 @@ public final class BlobClient implements Closeable { * @param key * the key to identify the BLOB * @throws IOException - * thrown if an I/O error occurs while transferring the request to the BLOB server + * thrown if an I/O error occurs while transferring the request to + 
* the BLOB server or if the BLOB server cannot delete the file */ public void delete(BlobKey key) throws IOException { if (key == null) { http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index d4190a7..5b00ae4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -21,10 +21,6 @@ package org.apache.flink.runtime.blob; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.net.SSLUtils; @@ -49,8 +45,8 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; /** * This class implements the BLOB server. The BLOB server is responsible for listening for incoming requests and @@ -77,10 +73,10 @@ public class BlobServer extends Thread implements BlobService { /** Indicates whether a shutdown of server component has been requested. 
*/ private final AtomicBoolean shutdownRequested = new AtomicBoolean(); - /** Is the root directory for file storage */ + /** Root directory for local file storage */ private final File storageDir; - /** Blob store for HA */ + /** Blob store for distributed file storage, e.g. in HA */ private final BlobStore blobStore; /** Set of currently running threads */ @@ -99,10 +95,11 @@ public class BlobServer extends Thread implements BlobService { * Instantiates a new BLOB server and binds it to a free network port. * * @throws IOException - * thrown if the BLOB server cannot bind to a free network port + * thrown if the BLOB server cannot bind to a free network port or if the + * (local or distributed) file storage cannot be created or is not usable */ public BlobServer(Configuration config) throws IOException { - this(config, createBlobStoreFromConfig(config)); + this(config, BlobUtils.createBlobStoreFromConfig(config)); } public BlobServer(Configuration config, HighAvailabilityServices haServices) throws IOException { @@ -110,11 +107,9 @@ public class BlobServer extends Thread implements BlobService { } private BlobServer(Configuration config, BlobStore blobStore) throws IOException { - checkNotNull(config); + this.blobServiceConfiguration = checkNotNull(config); this.blobStore = checkNotNull(blobStore); - this.blobServiceConfiguration = config; - // configure and create the storage directory String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null); this.storageDir = BlobUtils.initStorageDirectory(storageDirectory); @@ -358,9 +353,7 @@ public class BlobServer extends Thread implements BlobService { */ @Override public URL getURL(BlobKey requiredBlob) throws IOException { - if (requiredBlob == null) { - throw new IllegalArgumentException("Required BLOB cannot be null."); - } + checkArgument(requiredBlob != null, "BLOB key cannot be null."); final File localFile = BlobUtils.getStorageLocation(storageDir, requiredBlob); @@ -450,37 
+443,4 @@ public class BlobServer extends Thread implements BlobService { } } - private static BlobStore createBlobStoreFromConfig(Configuration config) throws IOException { - HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config); - - if (highAvailabilityMode == HighAvailabilityMode.NONE) { - return new VoidBlobStore(); - } else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) { - final String storagePath = config.getValue(HighAvailabilityOptions.HA_STORAGE_PATH); - if (isNullOrWhitespaceOnly(storagePath)) { - throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " + - HighAvailabilityOptions.HA_STORAGE_PATH); - } - - final Path path; - try { - path = new Path(storagePath); - } catch (Exception e) { - throw new IOException("Invalid path for highly available storage (" + - HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e); - } - - final FileSystem fileSystem; - try { - fileSystem = path.getFileSystem(); - } catch (Exception e) { - throw new IOException("Could not create FileSystem for highly available storage (" + - HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e); - } - - return new FileSystemBlobStore(fileSystem, storagePath); - } else { - throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "."); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java index 321fc67..13a90c6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java @@ -168,6 +168,14 @@ 
class BlobServerConnection extends Thread { * thrown if an I/O error occurs while reading/writing data from/to the respective streams */ private void get(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException { + /** + * Retrieve the file from the (distributed?) BLOB store and store it + * locally, then send it to the service which requested it. + * + * Instead, we could send it from the distributed store directly but + * chances are high that if there is one request, there will be more + * so a local cache makes more sense. + */ File blobFile; try { http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java index 7050338..64dc942 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.blob; import org.apache.flink.api.common.JobID; import java.io.File; +import java.io.IOException; /** * A blob store. @@ -32,9 +33,9 @@ public interface BlobStore { * * @param localFile The file to copy * @param blobKey The ID for the file in the blob store - * @throws Exception If the copy fails + * @throws IOException If the copy fails */ - void put(File localFile, BlobKey blobKey) throws Exception; + void put(File localFile, BlobKey blobKey) throws IOException; /** * Copies a local file to the blob store. 
@@ -44,18 +45,18 @@ public interface BlobStore { * @param localFile The file to copy * @param jobId The JobID part of ID for the file in the blob store * @param key The String part of ID for the file in the blob store - * @throws Exception If the copy fails + * @throws IOException If the copy fails */ - void put(File localFile, JobID jobId, String key) throws Exception; + void put(File localFile, JobID jobId, String key) throws IOException; /** * Copies a blob to a local file. * * @param blobKey The blob ID * @param localFile The local file to copy to - * @throws Exception If the copy fails + * @throws IOException If the copy fails */ - void get(BlobKey blobKey, File localFile) throws Exception; + void get(BlobKey blobKey, File localFile) throws IOException; /** * Copies a blob to a local file. @@ -63,19 +64,23 @@ public interface BlobStore { * @param jobId The JobID part of ID for the blob * @param key The String part of ID for the blob * @param localFile The local file to copy to - * @throws Exception If the copy fails + * @throws IOException If the copy fails */ - void get(JobID jobId, String key, File localFile) throws Exception; + void get(JobID jobId, String key, File localFile) throws IOException; /** - * Deletes a blob. + * Tries to delete a blob from storage. + * + * <p>NOTE: This also tries to delete any created directories if empty.</p> * * @param blobKey The blob ID */ void delete(BlobKey blobKey); /** - * Deletes a blob. + * Tries to delete a blob from storage. + * + * <p>NOTE: This also tries to delete any created directories if empty.</p> * * @param jobId The JobID part of ID for the blob * @param key The String part of ID for the blob @@ -83,7 +88,9 @@ public interface BlobStore { void delete(JobID jobId, String key); /** - * Deletes blobs. + * Tries to delete all blobs for the given job from storage. 
+ * + * <p>NOTE: This also tries to delete any created directories if empty.</p> * * @param jobId The JobID part of all blobs to delete */ http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index aeaa602..b5ba565 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -21,20 +21,19 @@ package org.apache.flink.runtime.blob; import com.google.common.io.BaseEncoding; import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.apache.flink.util.IOUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.runtime.highavailability.ZookeeperHaServices; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import java.io.EOFException; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; -import java.net.URI; import java.nio.charset.Charset; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; @@ -68,11 +67,39 @@ public class BlobUtils { static final Charset DEFAULT_CHARSET = Charset.forName("utf-8"); /** + * Creates a BlobStore based on the parameters set in the configuration. 
+ * + * @param config + * configuration to use + * + * @return a (distributed) blob store for high availability + * + * @throws IOException + * thrown if the (distributed) file storage cannot be created + */ + static BlobStore createBlobStoreFromConfig(Configuration config) throws IOException { + HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config); + + if (highAvailabilityMode == HighAvailabilityMode.NONE) { + return new VoidBlobStore(); + } else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) { + return ZookeeperHaServices.createBlobStore(config); + } else { + throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "'."); + } + } + + /** * Creates a storage directory for a blob service. * * @return the storage directory used by a BLOB service + * + * @throws IOException + * thrown if the (local or distributed) file storage cannot be created or + * is not usable */ - static File initStorageDirectory(String storageDirectory) { + static File initStorageDirectory(String storageDirectory) throws + IOException { File baseDir; if (StringUtils.isNullOrWhitespaceOnly(storageDirectory)) { baseDir = new File(System.getProperty("java.io.tmpdir")); @@ -96,7 +123,7 @@ public class BlobUtils { } // max attempts exceeded to find a storage directory - throw new RuntimeException("Could not create storage directory for BLOB store in '" + baseDir + "'."); + throw new IOException("Could not create storage directory for BLOB store in '" + baseDir + "'."); } /** @@ -341,7 +368,7 @@ public class BlobUtils { */ static String getRecoveryPath(String basePath, BlobKey blobKey) { // format: $base/cache/blob_$key - return String.format("%s/cache/%s", basePath, BLOB_FILE_PREFIX + blobKey.toString()); + return String.format("%s/cache/%s%s", basePath, BLOB_FILE_PREFIX, blobKey.toString()); } /** @@ -353,8 +380,8 @@ public class BlobUtils { */ static String getRecoveryPath(String basePath, JobID jobId, 
String key) { // format: $base/job_$id/blob_$key - return String.format("%s/%s/%s", basePath, JOB_DIR_PREFIX + jobId.toString(), - BLOB_FILE_PREFIX + encodeKey(key)); + return String.format("%s/%s%s/%s%s", basePath, JOB_DIR_PREFIX, jobId.toString(), + BLOB_FILE_PREFIX, encodeKey(key)); } /** @@ -363,33 +390,7 @@ public class BlobUtils { * <p>The returned path can be used with the state backend for recovery purposes. */ static String getRecoveryPath(String basePath, JobID jobId) { - return String.format("%s/%s", basePath, JOB_DIR_PREFIX + jobId.toString()); - } - - /** - * Copies the file from the recovery path to the local file. - */ - static void copyFromRecoveryPath(String recoveryPath, File localBlobFile) throws Exception { - if (recoveryPath == null) { - throw new IllegalStateException("Failed to determine recovery path."); - } - - if (!localBlobFile.createNewFile()) { - throw new IllegalStateException("Failed to create new local file to copy to"); - } - - URI uri = new URI(recoveryPath); - Path path = new Path(recoveryPath); - - if (FileSystem.get(uri).exists(path)) { - try (InputStream is = FileSystem.get(uri).open(path)) { - FileOutputStream fos = new FileOutputStream(localBlobFile); - IOUtils.copyBytes(is, fos); // closes the streams - } - } - else { - throw new IOException("Cannot find required BLOB at '" + recoveryPath + "' for recovery."); - } + return String.format("%s/%s%s", basePath, JOB_DIR_PREFIX, jobId.toString()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java index 2c05002..7cfce7a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java @@ -64,16 +64,16 @@ public class FileSystemBlobStore implements BlobStore { // - Put ------------------------------------------------------------------ @Override - public void put(File localFile, BlobKey blobKey) throws Exception { + public void put(File localFile, BlobKey blobKey) throws IOException { put(localFile, BlobUtils.getRecoveryPath(basePath, blobKey)); } @Override - public void put(File localFile, JobID jobId, String key) throws Exception { + public void put(File localFile, JobID jobId, String key) throws IOException { put(localFile, BlobUtils.getRecoveryPath(basePath, jobId, key)); } - private void put(File fromFile, String toBlobPath) throws Exception { + private void put(File fromFile, String toBlobPath) throws IOException { try (OutputStream os = fileSystem.create(new Path(toBlobPath), true)) { LOG.debug("Copying from {} to {}.", fromFile, toBlobPath); Files.copy(fromFile, os); @@ -83,16 +83,16 @@ public class FileSystemBlobStore implements BlobStore { // - Get ------------------------------------------------------------------ @Override - public void get(BlobKey blobKey, File localFile) throws Exception { + public void get(BlobKey blobKey, File localFile) throws IOException { get(BlobUtils.getRecoveryPath(basePath, blobKey), localFile); } @Override - public void get(JobID jobId, String key, File localFile) throws Exception { + public void get(JobID jobId, String key, File localFile) throws IOException { get(BlobUtils.getRecoveryPath(basePath, jobId, key), localFile); } - private void get(String fromBlobPath, File toFile) throws Exception { + private void get(String fromBlobPath, File toFile) throws IOException { checkNotNull(fromBlobPath, "Blob path"); checkNotNull(toFile, "File"); @@ -102,17 +102,21 @@ public class FileSystemBlobStore implements BlobStore { final Path fromPath = new Path(fromBlobPath); - if (fileSystem.exists(fromPath)) { - try (InputStream is = 
fileSystem.open(fromPath); - FileOutputStream fos = new FileOutputStream(toFile)) - { - LOG.debug("Copying from {} to {}.", fromBlobPath, toFile); - IOUtils.copyBytes(is, fos); // closes the streams + boolean success = false; + try (InputStream is = fileSystem.open(fromPath); + FileOutputStream fos = new FileOutputStream(toFile)) { + LOG.debug("Copying from {} to {}.", fromBlobPath, toFile); + IOUtils.copyBytes(is, fos); // closes the streams + success = true; + } finally { + // if the copy fails, we need to remove the target file because + // outside code relies on a correct file as long as it exists + if (!success) { + try { + toFile.delete(); + } catch (Throwable ignored) {} } } - else { - throw new IOException(fromBlobPath + " does not exist."); - } } // - Delete --------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java index ece2ac1..8606844 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.blob; import org.apache.flink.api.common.JobID; import java.io.File; +import java.io.IOException; /** * A blob store doing nothing. 
@@ -28,19 +29,19 @@ import java.io.File; public class VoidBlobStore implements BlobStore { @Override - public void put(File localFile, BlobKey blobKey) throws Exception { + public void put(File localFile, BlobKey blobKey) throws IOException { } @Override - public void put(File localFile, JobID jobId, String key) throws Exception { + public void put(File localFile, JobID jobId, String key) throws IOException { } @Override - public void get(BlobKey blobKey, File localFile) throws Exception { + public void get(BlobKey blobKey, File localFile) throws IOException { } @Override - public void get(JobID jobId, String key, File localFile) throws Exception { + public void get(JobID jobId, String key, File localFile) throws IOException { } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java index 9f0c573..76d6d86 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java @@ -209,7 +209,13 @@ public class JobClient { Option<String> jmHost = jobManager.actor().path().address().host(); String jmHostname = jmHost.isDefined() ? 
jmHost.get() : "localhost"; InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort()); - final BlobCache blobClient = new BlobCache(serverAddress, config); + final BlobCache blobClient; + try { + blobClient = new BlobCache(serverAddress, config); + } catch (IOException e) { + throw new JobRetrievalException(jobID, + "Failed to setup blob cache", e); + } final Collection<BlobKey> requiredJarFiles = props.requiredJarFiles(); final Collection<URL> requiredClasspaths = props.requiredClasspaths(); http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java index c94768d..b0d5d83 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java @@ -37,11 +37,12 @@ import org.apache.flink.runtime.blob.BlobService; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.api.common.JobID; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * For each job graph that is submitted to the system the library cache manager maintains * a set of libraries (typically JAR files) which the job requires to run. 
The library cache manager @@ -73,7 +74,7 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC // -------------------------------------------------------------------------------------------- public BlobLibraryCacheManager(BlobService blobService, long cleanupInterval) { - this.blobService = blobService; + this.blobService = checkNotNull(blobService); // Initializing the clean up task this.cleanupTimer = new Timer(true); @@ -91,8 +92,8 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC @Override public void registerTask(JobID jobId, ExecutionAttemptID task, Collection<BlobKey> requiredJarFiles, Collection<URL> requiredClasspaths) throws IOException { - Preconditions.checkNotNull(jobId, "The JobId must not be null."); - Preconditions.checkNotNull(task, "The task execution id must not be null."); + checkNotNull(jobId, "The JobId must not be null."); + checkNotNull(task, "The task execution id must not be null."); if (requiredJarFiles == null) { requiredJarFiles = Collections.emptySet(); @@ -153,8 +154,8 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC @Override public void unregisterTask(JobID jobId, ExecutionAttemptID task) { - Preconditions.checkNotNull(jobId, "The JobId must not be null."); - Preconditions.checkNotNull(task, "The task execution id must not be null."); + checkNotNull(jobId, "The JobId must not be null."); + checkNotNull(task, "The task execution id must not be null."); synchronized (lockObject) { LibraryCacheEntry entry = cacheEntries.get(jobId); http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java index 25d21ef..ed0ad17 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java @@ -154,7 +154,21 @@ public class ZookeeperHaServices implements HighAvailabilityServices { @Override public BlobStore createBlobStore() throws IOException { - final String storagePath = configuration.getValue(HighAvailabilityOptions.HA_STORAGE_PATH); + return createBlobStore(configuration); + } + + /** + * Creates the BLOB store in which BLOBs are stored in a highly-available + * fashion. + * + * @param configuration configuration to extract the storage path from + * @return Blob store + * @throws IOException if the blob store could not be created + */ + public static BlobStore createBlobStore( + final Configuration configuration) throws IOException { + String storagePath = configuration.getValue( + HighAvailabilityOptions.HA_STORAGE_PATH); if (isNullOrWhitespaceOnly(storagePath)) { throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " + HighAvailabilityOptions.HA_STORAGE_PATH); @@ -176,6 +190,10 @@ public class ZookeeperHaServices implements HighAvailabilityServices { HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e); } + final String clusterId = + configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID); + storagePath += "/" + clusterId; + return new FileSystemBlobStore(fileSystem, storagePath); } http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java 
index f11cb98..58bbfac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -794,11 +794,20 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { InetSocketAddress address = new InetSocketAddress(jobMasterGateway.getAddress(), blobPort); - BlobCache blobCache = new BlobCache(address, taskManagerConfiguration.getConfiguration()); - - LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager( - blobCache, - taskManagerConfiguration.getCleanupInterval()); + final LibraryCacheManager libraryCacheManager; + try { + final BlobCache blobCache = new BlobCache(address, taskManagerConfiguration.getConfiguration(), haServices); + libraryCacheManager = new BlobLibraryCacheManager( + blobCache, + taskManagerConfiguration.getCleanupInterval()); + } catch (IOException e) { + // Can't pass the IOException up - we need a RuntimeException anyway + // two levels up where this is run asynchronously. Also, we don't + // know whether this is caught in the thread running this method. 
+ final String message = "Could not create BLOB cache or library cache."; + log.error(message, e); + throw new RuntimeException(message, e); + } ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier( jobManagerLeaderId, http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java index 4aa9a21..34a8a39 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java @@ -19,7 +19,10 @@ package org.apache.flink.runtime.blob; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.IOException; import java.io.InputStream; @@ -33,22 +36,52 @@ import static org.junit.Assert.*; */ public class BlobCacheRetriesTest { + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + /** * A test where the connection fails twice and then the get operation succeeds. */ @Test public void testBlobFetchRetries() { + final Configuration config = new Configuration(); + + testBlobFetchRetries(config); + } + /** + * A test where the connection fails twice and then the get operation succeeds + * (with high availability set). 
+ */ + @Test + public void testBlobFetchRetriesHa() { + final Configuration config = new Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, + temporaryFolder.getRoot().getPath()); + + testBlobFetchRetries(config); + } + + /** + * A test where the BlobCache must use the BlobServer and the connection + * fails twice and then the get operation succeeds. + * + * @param config + * configuration to use (the BlobCache will get some additional settings + * set compared to this one) + */ + private void testBlobFetchRetries(final Configuration config) { final byte[] data = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 0}; BlobServer server = null; BlobCache cache = null; try { - final Configuration config = new Configuration(); server = new TestingFailingBlobServer(config, 2); - final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + final InetSocketAddress + serverAddress = new InetSocketAddress("localhost", server.getPort()); // upload some blob BlobClient blobClient = null; @@ -64,9 +97,15 @@ public class BlobCacheRetriesTest { } } - cache = new BlobCache(serverAddress, config); + // create a separate config for the cache with no access to + // the (shared) storage path if available so that the cache + // will always bother the BlobServer! 
+ final Configuration cacheConfig = new Configuration(config); + cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, + temporaryFolder.getRoot().getPath() + "/does-not-exist"); + cache = new BlobCache(serverAddress, cacheConfig); - // trigger a download - it should fail on the first time, but retry, and succeed at the second time + // trigger a download - it should fail the first two times, but retry, and succeed eventually URL url = cache.getURL(key); InputStream is = url.openStream(); try { @@ -97,17 +136,44 @@ public class BlobCacheRetriesTest { */ @Test public void testBlobFetchWithTooManyFailures() { + final Configuration config = new Configuration(); + testBlobFetchWithTooManyFailures(config); + } + + /** + * A test where the connection fails too often so that eventually the get operation fails + * (with high availability set). + */ + @Test + public void testBlobFetchWithTooManyFailuresHa() { + final Configuration config = new Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, + temporaryFolder.getRoot().getPath()); + + testBlobFetchWithTooManyFailures(config); + } + + /** + * A test where the BlobCache must use the BlobServer and the connection + * fails too often which eventually fails the GET request.
+ * + * @param config + * configuration to use (the BlobCache will get some additional settings + * set compared to this one) + */ + private void testBlobFetchWithTooManyFailures(final Configuration config) { final byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 }; BlobServer server = null; BlobCache cache = null; try { - final Configuration config = new Configuration(); server = new TestingFailingBlobServer(config, 10); - final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + final InetSocketAddress + serverAddress = new InetSocketAddress("localhost", server.getPort()); // upload some blob BlobClient blobClient = null; @@ -123,7 +189,13 @@ public class BlobCacheRetriesTest { } } - cache = new BlobCache(serverAddress, config); + // create a separate config for the cache with no access to + // the (shared) storage path if available so that the cache + // will always bother the BlobServer! + final Configuration cacheConfig = new Configuration(config); + cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, + temporaryFolder.getRoot().getPath() + "/does-not-exist"); + cache = new BlobCache(serverAddress, cacheConfig); // trigger a download - it should fail eventually try { http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java index 7ba5a8a..db55331 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java @@ -18,6 +18,12 @@ package org.apache.flink.runtime.blob; +import org.apache.flink.configuration.Configuration; +import 
org.apache.flink.configuration.HighAvailabilityOptions; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + import java.io.File; import java.net.InetSocketAddress; import java.net.URISyntaxException; @@ -25,9 +31,6 @@ import java.net.URL; import java.util.ArrayList; import java.util.List; -import org.apache.flink.configuration.Configuration; -import org.junit.Test; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -38,9 +41,48 @@ import static org.junit.Assert.fail; */ public class BlobCacheSuccessTest { + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + /** + * BlobCache with no HA. BLOBs need to be downloaded from a working + * BlobServer. + */ @Test public void testBlobCache() { + Configuration config = new Configuration(); + uploadFileGetTest(config, false, false); + } + + /** + * BlobCache is configured in HA mode and the cache can download files from + * the file system directly and does not need to download BLOBs from the + * BlobServer. + */ + @Test + public void testBlobCacheHa() { + Configuration config = new Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, + temporaryFolder.getRoot().getPath()); + uploadFileGetTest(config, true, true); + } + /** + * BlobCache is configured in HA mode but the cache itself cannot access the + * file system and thus needs to download BLOBs from the BlobServer.
+ */ + @Test + public void testBlobCacheHaFallback() { + Configuration config = new Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, + temporaryFolder.getRoot().getPath()); + uploadFileGetTest(config, false, false); + } + + private void uploadFileGetTest(final Configuration config, boolean cacheWorksWithoutServer, + boolean cacheHasAccessToFs) { // First create two BLOBs and upload them to BLOB server final byte[] buf = new byte[128]; final List<BlobKey> blobKeys = new ArrayList<BlobKey>(2); @@ -50,7 +92,6 @@ public class BlobCacheSuccessTest { try { // Start the BLOB server - Configuration config = new Configuration(); blobServer = new BlobServer(config); final InetSocketAddress serverAddress = new InetSocketAddress(blobServer.getPort()); @@ -69,15 +110,34 @@ public class BlobCacheSuccessTest { } } - blobCache = new BlobCache(serverAddress, new Configuration()); + if (cacheWorksWithoutServer) { + // Now, shut down the BLOB server, the BLOBs must still be accessible through the cache. + blobServer.shutdown(); + blobServer = null; + } + + final Configuration cacheConfig; + if (cacheHasAccessToFs) { + cacheConfig = config; + } else { + // just in case parameters are still read from the server, + // create a separate configuration object for the cache + cacheConfig = new Configuration(config); + cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, + temporaryFolder.getRoot().getPath() + "/does-not-exist"); + } + + blobCache = new BlobCache(serverAddress, cacheConfig); for (BlobKey blobKey : blobKeys) { blobCache.getURL(blobKey); } - // Now, shut down the BLOB server, the BLOBs must still be accessible through the cache. - blobServer.shutdown(); - blobServer = null; + if (blobServer != null) { + // Now, shut down the BLOB server, the BLOBs must still be accessible through the cache. 
+ blobServer.shutdown(); + blobServer = null; + } final URL[] urls = new URL[blobKeys.size()]; http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java index a8eb1d3..d043665 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.blob; -import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -26,9 +25,9 @@ import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; -import org.junit.After; -import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; @@ -39,27 +38,14 @@ import java.util.Arrays; import java.util.Random; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class BlobRecoveryITCase { - private File recoveryDir; - - @Before - public void setUp() throws Exception { - recoveryDir = new File(FileUtils.getTempDirectory(), "BlobRecoveryITCaseDir"); - if (!recoveryDir.exists() && !recoveryDir.mkdirs()) { - throw new IllegalStateException("Failed to create temp directory for test"); - } - } - - @After - public void cleanUp() throws Exception { - if 
(recoveryDir != null) { - FileUtils.deleteDirectory(recoveryDir); - } - } + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); /** * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any @@ -70,13 +56,14 @@ public class BlobRecoveryITCase { Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); - config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, recoveryDir.getPath()); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath()); testBlobServerRecovery(config); } public static void testBlobServerRecovery(final Configuration config) throws IOException { - String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH); + final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID); + String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + "/" + clusterId; Random rand = new Random(); BlobServer[] server = new BlobServer[2]; @@ -84,7 +71,6 @@ public class BlobRecoveryITCase { BlobClient client = null; try { - for (int i = 0; i < server.length; i++) { server[i] = new BlobServer(config); serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort()); @@ -165,6 +151,7 @@ public class BlobRecoveryITCase { client.delete(jobId[1], testKey[1]); // Verify everything is clean + assertTrue("HA storage directory does not exist", fs.exists(new Path(storagePath))); if (fs.exists(blobServerPath)) { final org.apache.flink.core.fs.FileStatus[] recoveryFiles = fs.listStatus(blobServerPath); http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java index 53e1d73..025a2ff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java @@ -85,16 +85,7 @@ public class BlobServerDeleteTest { fail(e.getMessage()); } finally { - if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } - } - if (server != null) { - server.shutdown(); - } + cleanup(server, client); } } @@ -157,16 +148,7 @@ public class BlobServerDeleteTest { fail(e.getMessage()); } finally { - if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } - } - if (server != null) { - server.shutdown(); - } + cleanup(server, client); } } @@ -205,16 +187,7 @@ public class BlobServerDeleteTest { fail(e.getMessage()); } finally { - if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } - } - if (server != null) { - server.shutdown(); - } + cleanup(server, client); } } @@ -254,16 +227,7 @@ public class BlobServerDeleteTest { fail(e.getMessage()); } finally { - if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } - } - if (server != null) { - server.shutdown(); - } + cleanup(server, client); } } @@ -312,16 +276,20 @@ public class BlobServerDeleteTest { fail(e.getMessage()); } finally { - if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } - } - if (server != null) { - server.shutdown(); + cleanup(server, client); + } + } + + private void cleanup(BlobServer server, BlobClient client) { + if (client != null) { + try { + client.close(); + } catch (Throwable t) { + t.printStackTrace(); } } + if (server != null) { + server.shutdown(); + } } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java index 36ae8cc..ea0eb94 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java @@ -40,6 +40,7 @@ public class BlobServerRangeTest extends TestLogger { Configuration conf = new Configuration(); conf.setString(ConfigConstants.BLOB_SERVER_PORT, "0"); BlobServer srv = new BlobServer(conf); + srv.shutdown(); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java index 63ec338..081e28c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java @@ -30,6 +30,7 @@ import org.junit.Before; import org.junit.Test; import java.io.File; +import java.io.IOException; public class BlobUtilsTest { @@ -55,8 +56,9 @@ public class BlobUtilsTest { assertTrue(blobUtilsTestDirectory.delete()); } - @Test(expected = Exception.class) - public void testExceptionOnCreateStorageDirectoryFailure() { + @Test(expected = IOException.class) + public void testExceptionOnCreateStorageDirectoryFailure() throws + IOException { // Should throw an Exception BlobUtils.initStorageDirectory(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS).getAbsolutePath()); 
} http://git-wip-us.apache.org/repos/asf/flink/blob/9f544d83/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index a727d51..d3925be 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Random; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; public class BlobLibraryCacheRecoveryITCase { @@ -62,12 +63,12 @@ public class BlobLibraryCacheRecoveryITCase { BlobCache cache = null; BlobLibraryCacheManager libCache = null; - try { - Configuration config = new Configuration(); - config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); - config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); - config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath()); + Configuration config = new Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath()); + try { for (int i = 0; i < server.length; i++) { server[i] = new BlobServer(config); serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort()); @@ -144,8 +145,11 @@ public class BlobLibraryCacheRecoveryITCase { client.delete(keys.get(1)); } - // Verify everything is clean - File[] recoveryFiles = 
temporaryFolder.getRoot().listFiles(); + // Verify everything is clean below recoveryDir/<cluster_id> + final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID); + File haBlobStoreDir = new File(temporaryFolder.getRoot(), clusterId); + File[] recoveryFiles = haBlobStoreDir.listFiles(); + assertNotNull("HA storage directory does not exist", recoveryFiles); assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length); } finally {