[FLINK-1578] [BLOB manager] Improve failure handling and add more failure tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cfce493f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cfce493f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cfce493f

Branch: refs/heads/master
Commit: cfce493feb70a49d2722dc2a0d79f845f7e0461a
Parents: 47fed3d
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 18 12:16:35 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 18 13:59:00 2015 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  55 +-
 .../org/apache/flink/runtime/AbstractID.java    |  93 ++-
 .../apache/flink/runtime/blob/BlobCache.java    | 164 +++--
 .../apache/flink/runtime/blob/BlobClient.java   | 700 ++++++++++++-------
 .../flink/runtime/blob/BlobConnection.java      | 337 ---------
 .../flink/runtime/blob/BlobInputStream.java     |  10 +-
 .../org/apache/flink/runtime/blob/BlobKey.java  |   3 +-
 .../apache/flink/runtime/blob/BlobServer.java   | 261 +++----
 .../runtime/blob/BlobServerConnection.java      | 466 ++++++++++++
 .../flink/runtime/blob/BlobServerProtocol.java  |  59 ++
 .../apache/flink/runtime/blob/BlobService.java  |  19 +-
 .../apache/flink/runtime/blob/BlobUtils.java    | 122 +++-
 .../apache/flink/runtime/AbstractIDTest.java    |  42 ++
 .../runtime/blob/BlobCacheRetriesTest.java      | 150 ++++
 .../runtime/blob/BlobCacheSuccessTest.java      | 121 ++++
 .../flink/runtime/blob/BlobCacheTest.java       | 121 ----
 .../flink/runtime/blob/BlobClientTest.java      | 169 +++--
 .../runtime/blob/BlobServerDeleteTest.java      | 323 +++++++++
 .../flink/runtime/blob/BlobServerGetTest.java   | 149 ++++
 .../flink/runtime/blob/BlobServerPutTest.java   | 402 +++++++++++
 .../runtime/blob/TestingFailingBlobServer.java  |  74 ++
 .../BlobLibraryCacheManagerTest.java            |   2 +-
 22 files changed, 2755 insertions(+), 1087 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index a0bf365..42a3c9a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -42,17 +42,6 @@ public final class ConfigConstants {
        public static final String DEFAULT_EXECUTION_RETRIES_KEY = 
"execution-retries.default";
        
        // -------------------------------- Runtime 
-------------------------------
-
-       /**
-        * The config parameter defining the storage directory to be used by 
the blob server.
-        */
-       public static final String BLOB_STORAGE_DIRECTORY_KEY = 
"blob.storage.directory";
-
-       /**
-        * The config parameter defining the cleanup interval of the library 
cache manager.
-        */
-       public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = 
"library-cache-manager" +
-                       ".cleanup.interval";
        
        /**
         * The config parameter defining the network address to connect to
@@ -71,7 +60,32 @@ public final class ConfigConstants {
         * marked as failed.
         */
        public static final String JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY = 
"jobmanager.max-heartbeat-delay-before-failure.msecs";
-       
+
+       /**
+        * The config parameter defining the storage directory to be used by 
the blob server.
+        */
+       public static final String BLOB_STORAGE_DIRECTORY_KEY = 
"blob.storage.directory";
+
+       /**
+        * The config parameter defining number of retires for failed BLOB 
fetches.
+        */
+       public static final String BLOB_FETCH_RETRIES_KEY = 
"blob.fetch.retries";
+
+       /**
+        * The config parameter defining the maximum number of concurrent BLOB 
fetches that the JobManager serves.
+        */
+       public static final String BLOB_FETCH_CONCURRENT_KEY = 
"blob.fetch.num-concurrent";
+
+       /**
+        * The config parameter defining the backlog of BLOB fetches on the 
JobManager
+        */
+       public static final String BLOB_FETCH_BACKLOG_KEY = 
"blob.fetch.backlog";
+
+       /**
+        * The config parameter defining the cleanup interval of the library 
cache manager.
+        */
+       public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = 
"library-cache-manager.cleanup.interval";
+
        /**
         * The config parameter defining the task manager's IPC port from the 
configuration.
         */
@@ -405,7 +419,22 @@ public final class ConfigConstants {
         */
        // 30 seconds (its enough to get to mars, should be enough to detect 
failure)
        public static final int DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT = 
30*1000;
-       
+
+       /**
+        * Default number of retries for failed BLOB fetches.
+        */
+       public static final int DEFAULT_BLOB_FETCH_RETRIES = 5;
+
+       /**
+        * Default number of concurrent BLOB fetch operations.
+        */
+       public static final int DEFAULT_BLOB_FETCH_CONCURRENT = 50;
+
+       /**
+        * Default BLOB fetch connection backlog.
+        */
+       public static final int DEFAULT_BLOB_FETCH_BACKLOG = 1000;
+
        /**
         * The default network port the task manager expects incoming IPC 
connections. The {@code 0} means that
         * the TaskManager searches for a free port.

http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
index 130e3eb..247a052 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Random;
 
 import org.apache.flink.core.io.IOReadableWritable;
@@ -103,48 +101,39 @@ public class AbstractID implements IOReadableWritable, 
Comparable<AbstractID>, j
        }
        
        // 
--------------------------------------------------------------------------------------------
-       
+
+       /**
+        * Gets the lower 64 bits of the ID.
+        *
+        * @return The lower 64 bits of the ID.
+        */
        public long getLowerPart() {
                return lowerPart;
        }
-       
-       public long getUpperPart() {
-               return upperPart;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
 
        /**
-        * Converts the given byte array to a long.
+        * Gets the upper 64 bits of the ID.
         *
-        * @param ba the byte array to be converted
-        * @param offset the offset indicating at which byte inside the array 
the conversion shall begin
-        * @return the long variable
+        * @return The upper 64 bits of the ID.
         */
-       private static long byteArrayToLong(byte[] ba, int offset) {
-               long l = 0;
-
-               for (int i = 0; i < SIZE_OF_LONG; ++i) {
-                       l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i 
<< 3);
-               }
-
-               return l;
+       public long getUpperPart() {
+               return upperPart;
        }
 
        /**
-        * Converts a long to a byte array.
+        * Gets the bytes underlying this ID.
         *
-        * @param l the long variable to be converted
-        * @param ba the byte array to store the result the of the conversion
-        * @param offset offset indicating at what position inside the byte 
array the result of the conversion shall be stored
+        * @return The bytes underlying this ID.
         */
-       private static void longToByteArray(final long l, final byte[] ba, 
final int offset) {
-               for (int i = 0; i < SIZE_OF_LONG; ++i) {
-                       final int shift = i << 3; // i * 8
-                       ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL 
<< shift)) >>> shift);
-               }
+       public byte[] getBytes() {
+               byte[] bytes = new byte[SIZE];
+               longToByteArray(lowerPart, bytes, 0);
+               longToByteArray(upperPart, bytes, SIZE_OF_LONG);
+               return bytes;
        }
-       
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Serialization
        // 
--------------------------------------------------------------------------------------------
 
        @Override
@@ -159,17 +148,14 @@ public class AbstractID implements IOReadableWritable, 
Comparable<AbstractID>, j
                out.writeLong(this.upperPart);
        }
 
-       public void write(ByteBuffer buffer) {
-               buffer.putLong(this.lowerPart);
-               buffer.putLong(this.upperPart);
-       }
-
        public void writeTo(ByteBuf buf) {
                buf.writeLong(this.lowerPart);
                buf.writeLong(this.upperPart);
        }
 
        // 
--------------------------------------------------------------------------------------------
+       //  Standard Utilities
+       // 
--------------------------------------------------------------------------------------------
        
        @Override
        public boolean equals(Object obj) {
@@ -203,4 +189,39 @@ public class AbstractID implements IOReadableWritable, 
Comparable<AbstractID>, j
                int diff2 = (this.lowerPart < o.lowerPart) ? -1 : 
((this.lowerPart == o.lowerPart) ? 0 : 1);
                return diff1 == 0 ? diff2 : diff1;
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Conversion Utilities
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Converts the given byte array to a long.
+        *
+        * @param ba the byte array to be converted
+        * @param offset the offset indicating at which byte inside the array 
the conversion shall begin
+        * @return the long variable
+        */
+       private static long byteArrayToLong(byte[] ba, int offset) {
+               long l = 0;
+
+               for (int i = 0; i < SIZE_OF_LONG; ++i) {
+                       l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i 
<< 3);
+               }
+
+               return l;
+       }
+
+       /**
+        * Converts a long to a byte array.
+        *
+        * @param l the long variable to be converted
+        * @param ba the byte array to store the result the of the conversion
+        * @param offset offset indicating at what position inside the byte 
array the result of the conversion shall be stored
+        */
+       private static void longToByteArray(long l, byte[] ba, int offset) {
+               for (int i = 0; i < SIZE_OF_LONG; ++i) {
+                       final int shift = i << 3; // i * 8
+                       ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL 
<< shift)) >>> shift);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 40ec4e3..0d1b29c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -40,9 +41,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public final class BlobCache implements BlobService {
 
-       /**
-        * The log object used for debugging.
-        */
+       /** The log object used for debugging. */
        private static final Logger LOG = 
LoggerFactory.getLogger(BlobCache.class);
 
        private final InetSocketAddress serverAddress;
@@ -54,6 +53,9 @@ public final class BlobCache implements BlobService {
        /** Shutdown hook thread to ensure deletion of the storage directory. */
        private final Thread shutdownHook;
 
+       /** The number of retries when the transfer fails */
+       private final int numFetchRetries;
+
 
        public BlobCache(InetSocketAddress serverAddress, Configuration 
configuration) {
                if (serverAddress == null || configuration == null) {
@@ -62,80 +64,122 @@ public final class BlobCache implements BlobService {
 
                this.serverAddress = serverAddress;
 
+               // configure and create the storage directory
                String storageDirectory = 
configuration.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
                this.storageDir = 
BlobUtils.initStorageDirectory(storageDirectory);
                LOG.info("Created BLOB cache storage directory " + storageDir);
 
+               // configure the number of fetch retries
+               final int fetchRetries = configuration.getInteger(
+                               ConfigConstants.BLOB_FETCH_RETRIES_KEY, 
ConfigConstants.DEFAULT_BLOB_FETCH_RETRIES);
+               if (fetchRetries >= 0) {
+                       this.numFetchRetries = fetchRetries;
+               }
+               else {
+                       LOG.warn("Invalid value for {}. System will attempt no 
retires on failed fetches of BLOBs.",
+                                       ConfigConstants.BLOB_FETCH_RETRIES_KEY);
+                       this.numFetchRetries = 0;
+               }
+
                // Add shutdown hook to delete storage directory
                shutdownHook = BlobUtils.addShutdownHook(this, LOG);
        }
 
        /**
-        * Returns the URL for the content-addressable BLOB with the given key. 
The method will first attempt to serve
-        * the BLOB from its local cache. If one or more BLOB are not in the 
cache, the method will try to download them
-        * from the BLOB server with the given address.
+        * Returns the URL for the BLOB with the given key. The method will 
first attempt to serve
+        * the BLOB from its local cache. If the BLOB is not in the cache, the 
method will try to download it
+        * from this cache's BLOB server.
         * 
-        * @param requiredBlob
-        *        the key of the desired content-addressable BLOB
-        * @return URL referring to the local storage location of the BLOB
-        * @throws IOException
-        *         thrown if an I/O error occurs while downloading the BLOBs 
from the BLOB server
+        * @param requiredBlob The key of the desired BLOB.
+        * @return URL referring to the local storage location of the BLOB.
+        * @throws IOException Thrown if an I/O error occurs while downloading 
the BLOBs from the BLOB server.
         */
        public URL getURL(final BlobKey requiredBlob) throws IOException {
                if (requiredBlob == null) {
-                       throw new IllegalArgumentException("Required BLOB 
cannot be null.");
+                       throw new IllegalArgumentException("BLOB key cannot be 
null.");
                }
 
-               BlobClient bc = null;
-               byte[] buf = null;
-               URL url = null;
+               final File localJarFile = 
BlobUtils.getStorageLocation(storageDir, requiredBlob);
 
-               try {
-                       final File localJarFile = 
BlobUtils.getStorageLocation(storageDir, requiredBlob);
+               if (!localJarFile.exists()) {
 
-                       if (!localJarFile.exists()) {
+                       final byte[] buf = new 
byte[BlobServerProtocol.BUFFER_SIZE];
 
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Trying to download " + 
requiredBlob + " from " + serverAddress);
-                               }
+                       // loop over retries
+                       int attempt = 0;
+                       while (true) {
 
-                               bc = new BlobClient(serverAddress);
-                               buf = new byte[BlobServer.BUFFER_SIZE];
+                               if (attempt == 0) {
+                                       LOG.info("Downloading {} from {}", 
requiredBlob, serverAddress);
+                               } else {
+                                       LOG.info("Downloading {} from {} (retry 
{})", requiredBlob, serverAddress, attempt);
+                               }
 
-                               InputStream is = null;
-                               OutputStream os = null;
                                try {
-                                       is = bc.get(requiredBlob);
-                                       os = new FileOutputStream(localJarFile);
+                                       BlobClient bc = null;
+                                       InputStream is = null;
+                                       OutputStream os = null;
+
+                                       try {
+                                               bc = new 
BlobClient(serverAddress);
+                                               is = bc.get(requiredBlob);
+                                               os = new 
FileOutputStream(localJarFile);
+
+                                               while (true) {
+                                                       final int read = 
is.read(buf);
+                                                       if (read < 0) {
+                                                               break;
+                                                       }
+                                                       os.write(buf, 0, read);
+                                               }
 
-                                       while (true) {
+                                               // we do explicitly not use a 
finally block, because we want the closing
+                                               // in the regular case to throw 
exceptions and cause the writing to fail.
+                                               // But, the closing on 
exception should not throw further exceptions and
+                                               // let us keep the root 
exception
+                                               os.close();
+                                               os = null;
+                                               is.close();
+                                               is = null;
+                                               bc.close();
+                                               bc = null;
 
-                                               final int read = is.read(buf);
-                                               if (read < 0) {
-                                                       break;
+                                               // success, we finished
+                                               break;
+                                       }
+                                       catch (Throwable t) {
+                                               // we use "catch (Throwable)" 
to keep the root exception. Otherwise that exception
+                                               // it would be replaced by any 
exception thrown in the finally block
+                                               closeSilently(os);
+                                               closeSilently(is);
+                                               closeSilently(bc);
+
+                                               if (t instanceof IOException) {
+                                                       throw (IOException) t;
+                                               } else {
+                                                       throw new 
IOException(t.getMessage(), t);
                                                }
-
-                                               os.write(buf, 0, read);
                                        }
-                               } finally {
-                                       if (is != null) {
-                                               is.close();
+                               }
+                               catch (IOException e) {
+                                       String message = "Failed to fetch BLOB 
" + requiredBlob + "  from " + serverAddress + '.';
+                                       if (attempt < numFetchRetries) {
+                                               attempt++;
+                                               if (LOG.isDebugEnabled()) {
+                                                       LOG.debug(message + " 
Retrying...", e);
+                                               } else {
+                                                       LOG.error(message + " 
Retrying...");
+                                               }
                                        }
-                                       if (os != null) {
-                                               os.close();
+                                       else {
+                                               LOG.error(message + " No 
retries left.", e);
+                                               throw new IOException(message, 
e);
                                        }
                                }
-                       }
-                       url = localJarFile.toURI().toURL();
-
-
-               } finally {
-                       if (bc != null) {
-                               bc.close();
-                       }
+                       } // end loop over retries
                }
 
-               return url;
+               return localJarFile.toURI().toURL();
        }
 
        /**
@@ -145,8 +189,10 @@ public final class BlobCache implements BlobService {
        public void delete(BlobKey key) throws IOException{
                final File localFile = BlobUtils.getStorageLocation(storageDir, 
key);
 
-               if(localFile.exists()) {
-                       localFile.delete();
+               if (localFile.exists()) {
+                       if (!localFile.delete()) {
+                               LOG.warn("Failed to delete locally cached BLOB 
" + key + " at " + localFile.getAbsolutePath());
+                       }
                }
        }
 
@@ -180,4 +226,24 @@ public final class BlobCache implements BlobService {
                        }
                }
        }
+
+       public File getStorageDir() {
+               return this.storageDir;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Miscellaneous
+       // 
------------------------------------------------------------------------
+
+       private void closeSilently(Closeable closeable) {
+               if (closeable != null) {
+                       try {
+                               closeable.close();
+                       } catch (Throwable t) {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Error while closing resource 
after BLOB transfer.", t);
+                               }
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 9a0479f..cb799c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -19,30 +19,45 @@
 package org.apache.flink.runtime.blob;
 
 import java.io.Closeable;
+import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 
-import org.apache.flink.runtime.AbstractID;
+import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
 import org.apache.flink.runtime.jobgraph.JobID;
 
+import static 
org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_ID_SCOPE;
+import static 
org.apache.flink.runtime.blob.BlobServerProtocol.NAME_ADDRESSABLE;
+import static org.apache.flink.runtime.blob.BlobUtils.readFully;
+import static org.apache.flink.runtime.blob.BlobUtils.readLength;
+import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
+import static 
org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.MAX_KEY_LENGTH;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
+
 /**
- * The BLOB client can communicate with the BLOB server and either upload 
(PUT), download (GET), or delete (DELETE)
- * BLOBs.
- * <p>
- * This class is not thread-safe.
+ * The BLOB client can communicate with the BLOB server and either upload 
(PUT), download (GET),
+ * or delete (DELETE) BLOBs.
  */
 public final class BlobClient implements Closeable {
 
-       /**
-        * The socket connection to the BLOB server.
-        */
-       private Socket socket;
+       private static final Logger LOG = 
LoggerFactory.getLogger(BlobClient.class);
+
+       /** The socket connection to the BLOB server. */
+       private final Socket socket;
 
        /**
         * Instantiates a new BLOB client.
@@ -52,71 +67,177 @@ public final class BlobClient implements Closeable {
         * @throws IOException
         *         thrown if the connection to the BLOB server could not be 
established
         */
-       public BlobClient(final InetSocketAddress serverAddress) throws 
IOException {
-
+       public BlobClient(InetSocketAddress serverAddress) throws IOException {
                this.socket = new Socket();
                try {
                        this.socket.connect(serverAddress);
-               }catch(IOException e){
+               }
+               catch(IOException e) {
+                       BlobUtils.closeSilently(socket, LOG);
                        throw new IOException("Could not connect to BlobServer 
at address " + serverAddress, e);
                }
        }
 
+       @Override
+       public void close() throws IOException {
+               this.socket.close();
+       }
+
+       public boolean isClosed() {
+               return this.socket.isClosed();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  GET
+       // 
--------------------------------------------------------------------------------------------
+
        /**
-        * Constructs and writes the header data for a PUT request to the given 
output stream.
+        * Downloads the BLOB identified by the given job ID and key from the 
BLOB server. If no such BLOB exists on the
+        * server, a {@link FileNotFoundException} is thrown.
+        * 
+        * @param jobID
+        *        the job ID identifying the BLOB to download
+        * @param key
+        *        the key identifying the BLOB to download
+        * @return an input stream to read the retrieved data from
+        * @throws IOException
+        *         thrown if an I/O error occurs during the download
+        */
+       public InputStream get(JobID jobID, String key) throws IOException {
+               if (key.length() > MAX_KEY_LENGTH) {
+                       throw new IllegalArgumentException("Keys must not be 
longer than " + MAX_KEY_LENGTH);
+               }
+
+               if (this.socket.isClosed()) {
+                       throw new IllegalStateException("BLOB Client is not 
connected. " +
+                                       "Client has been shut down or 
encountered an error before.");
+               }
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug(String.format("GET BLOB %s / \"%s\" from %s", 
jobID, key, socket.getLocalSocketAddress()));
+               }
+
+               try {
+                       OutputStream os = this.socket.getOutputStream();
+                       InputStream is = this.socket.getInputStream();
+
+                       sendGetHeader(os, jobID, key, null);
+                       receiveAndCheckResponse(is);
+
+                       return new BlobInputStream(is, null);
+               }
+               catch (Throwable t) {
+                       BlobUtils.closeSilently(socket, LOG);
+                       throw new IOException("GET operation failed: " + 
t.getMessage(), t);
+               }
+       }
+
+       /**
+        * Downloads the BLOB identified by the given BLOB key from the BLOB 
server. If no such BLOB exists on the server, a
+        * {@link FileNotFoundException} is thrown.
         * 
+        * @param blobKey
+        *        the BLOB key identifying the BLOB to download
+        * @return an input stream to read the retrieved data from
+        * @throws IOException
+        *         thrown if an I/O error occurs during the download
+        */
+       public InputStream get(BlobKey blobKey) throws IOException {
+               if (this.socket.isClosed()) {
+                       throw new IllegalStateException("BLOB Client is not 
connected. " +
+                                       "Client has been shut down or 
encountered an error before.");
+               }
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug(String.format("GET content addressable BLOB 
%s from %s", blobKey, socket.getLocalSocketAddress()));
+               }
+
+               try {
+                       OutputStream os = this.socket.getOutputStream();
+                       InputStream is = this.socket.getInputStream();
+
+                       // Send GET header
+                       sendGetHeader(os, null, null, blobKey);
+                       receiveAndCheckResponse(is);
+
+                       return new BlobInputStream(is, blobKey);
+               }
+               catch (Throwable t) {
+                       BlobUtils.closeSilently(socket, LOG);
+                       throw new IOException("GET operation failed: " + 
t.getMessage(), t);
+               }
+       }
+
+       /**
+        * Constructs and writes the header data for a GET operation to the 
given output stream.
+        *
         * @param outputStream
-        *        the output stream to write the PUT header data to
+        *        the output stream to write the header data to
         * @param jobID
-        *        the ID of job the BLOB belongs to or <code>null</code> to 
indicate the upload of a
-        *        content-addressable BLOB
+        *        the job ID identifying the BLOB to download or 
<code>null</code> to indicate the BLOB key should be used
+        *        to identify the BLOB on the server instead
         * @param key
-        *        the key of the BLOB to upload or <code>null</code> to 
indicate the upload of a content-addressable BLOB
-        * @param buf
-        *        an auxiliary buffer used for data serialization
+        *        the key identifying the BLOB to download or <code>null</code> 
to indicate the BLOB key should be used to
+        *        identify the BLOB on the server instead
+        * @param blobKey
+        *        the BLOB key to identify the BLOB to download if either the 
job ID or the regular key are
+        *        <code>null</code>
         * @throws IOException
         *         thrown if an I/O error occurs while writing the header data 
to the output stream
         */
-       private void sendPutHeader(final OutputStream outputStream, final JobID 
jobID, final String key, final byte[] buf)
-                       throws IOException {
+       private void sendGetHeader(OutputStream outputStream, JobID jobID, 
String key, BlobKey blobKey) throws IOException {
 
                // Signal type of operation
-               outputStream.write(BlobServer.PUT_OPERATION);
+               outputStream.write(GET_OPERATION);
 
-               // Check if PUT should be done in content-addressable manner
+               // Check if GET should be done in content-addressable manner
                if (jobID == null || key == null) {
-                       outputStream.write(1);
-               } else {
-                       outputStream.write(0);
-                       // Send job ID
-                       final ByteBuffer bb = ByteBuffer.wrap(buf);
-                       jobID.write(bb);
-                       outputStream.write(buf);
-
-                       // Send the key
+                       outputStream.write(CONTENT_ADDRESSABLE);
+                       blobKey.writeToOutputStream(outputStream);
+               }
+               else {
+                       outputStream.write(NAME_ADDRESSABLE);
+                       // Send job ID and key
+                       outputStream.write(jobID.getBytes());
                        byte[] keyBytes = 
key.getBytes(BlobUtils.DEFAULT_CHARSET);
-                       BlobServer.writeLength(keyBytes.length, buf, 
outputStream);
+                       writeLength(keyBytes.length, outputStream);
                        outputStream.write(keyBytes);
                }
        }
 
+       private void receiveAndCheckResponse(InputStream is) throws IOException 
{
+               int response = is.read();
+               if (response < 0) {
+                       throw new EOFException("Premature end of response");
+               }
+               if (response == RETURN_ERROR) {
+                       Throwable cause = readExceptionFromStream(is);
+                       throw new IOException("Server side error: " + 
cause.getMessage(), cause);
+               }
+               else if (response != RETURN_OKAY) {
+                       throw new IOException("Unrecognized response");
+               }
+       }
+
+
+       // 
--------------------------------------------------------------------------------------------
+       //  PUT
+       // 
--------------------------------------------------------------------------------------------
+
        /**
         * Uploads the data of the given byte array to the BLOB server in a 
content-addressable manner.
-        * 
+        *
         * @param value
         *        the buffer to upload
         * @return the computed BLOB key identifying the BLOB on the server
         * @throws IOException
         *         thrown if an I/O error occurs while uploading the data to 
the BLOB server
         */
-       public BlobKey put(final byte[] value) throws IOException {
-
+       public BlobKey put(byte[] value) throws IOException {
                return put(value, 0, value.length);
        }
 
        /**
         * Uploads data from the given byte array to the BLOB server in a 
content-addressable manner.
-        * 
+        *
         * @param value
         *        the buffer to upload data from
         * @param offset
@@ -127,14 +248,13 @@ public final class BlobClient implements Closeable {
         * @throws IOException
         *         thrown if an I/O error occurs while uploading the data to 
the BLOB server
         */
-       public BlobKey put(final byte[] value, final int offset, final int len) 
throws IOException {
-
+       public BlobKey put(byte[] value, int offset, int len) throws 
IOException {
                return putBuffer(null, null, value, offset, len);
        }
 
        /**
         * Uploads the data of the given byte array to the BLOB server and 
stores it under the given job ID and key.
-        * 
+        *
         * @param jobId
         *        the job ID to identify the uploaded data
         * @param key
@@ -144,14 +264,13 @@ public final class BlobClient implements Closeable {
         * @throws IOException
         *         thrown if an I/O error occurs while uploading the data to 
the BLOB server
         */
-       public void put(final JobID jobId, final String key, final byte[] 
value) throws IOException {
-
+       public void put(JobID jobId, String key, byte[] value) throws 
IOException {
                put(jobId, key, value, 0, value.length);
        }
 
        /**
         * Uploads data from the given byte array to the BLOB server and stores 
it under the given job ID and key.
-        * 
+        *
         * @param jobId
         *        the job ID to identify the uploaded data
         * @param key
@@ -165,11 +284,9 @@ public final class BlobClient implements Closeable {
         * @throws IOException
         *         thrown if an I/O error occurs while uploading the data to 
the BLOB server
         */
-       public void put(final JobID jobId, final String key, final byte[] 
value, final int offset, final int len)
-                       throws IOException {
-
-               if (key.length() > BlobServer.MAX_KEY_LENGTH) {
-                       throw new IllegalArgumentException("Keys must not be 
longer than " + BlobServer.MAX_KEY_LENGTH);
+       public void put(JobID jobId, String key, byte[] value, int offset, int 
len) throws IOException {
+               if (key.length() > MAX_KEY_LENGTH) {
+                       throw new IllegalArgumentException("Keys must not be 
longer than " + MAX_KEY_LENGTH);
                }
 
                putBuffer(jobId, key, value, offset, len);
@@ -177,7 +294,7 @@ public final class BlobClient implements Closeable {
 
        /**
         * Uploads data from the given input stream to the BLOB server and 
stores it under the given job ID and key.
-        * 
+        *
         * @param jobId
         *        the job ID to identify the uploaded data
         * @param key
@@ -188,10 +305,9 @@ public final class BlobClient implements Closeable {
         *         thrown if an I/O error occurs while reading the data from 
the input stream or uploading the data to the
         *         BLOB server
         */
-       public void put(final JobID jobId, final String key, final InputStream 
inputStream) throws IOException {
-
-               if (key.length() > BlobServer.MAX_KEY_LENGTH) {
-                       throw new IllegalArgumentException("Keys must not be 
longer than " + BlobServer.MAX_KEY_LENGTH);
+       public void put(JobID jobId, String key, InputStream inputStream) 
throws IOException {
+               if (key.length() > MAX_KEY_LENGTH) {
+                       throw new IllegalArgumentException("Keys must not be 
longer than " + MAX_KEY_LENGTH);
                }
 
                putInputStream(jobId, key, inputStream);
@@ -199,7 +315,7 @@ public final class BlobClient implements Closeable {
 
        /**
         * Uploads the data from the given input stream to the BLOB server in a 
content-addressable manner.
-        * 
+        *
         * @param inputStream
         *        the input stream to read the data from
         * @return the computed BLOB key identifying the BLOB on the server
@@ -207,93 +323,13 @@ public final class BlobClient implements Closeable {
         *         thrown if an I/O error occurs while reading the data from 
the input stream or uploading the data to the
         *         BLOB server
         */
-       public BlobKey put(final InputStream inputStream) throws IOException {
-
+       public BlobKey put(InputStream inputStream) throws IOException {
                return putInputStream(null, null, inputStream);
        }
 
        /**
-        * Deletes the BLOB identified by the given job ID and key from the 
BLOB server.
-        * 
-        * @param jobId
-        *        the job ID to identify the BLOB
-        * @param key
-        *        the key to identify the BLOB
-        * @throws IOException
-        *         thrown if an I/O error occurs while transferring the request 
to the BLOB server
-        */
-       public void delete(final JobID jobId, final String key) throws 
IOException {
-
-               if (jobId == null) {
-                       throw new IllegalArgumentException("Argument jobID must 
not be null");
-               }
-
-               if (key == null) {
-                       throw new IllegalArgumentException("Argument key must 
not be null");
-               }
-
-               if (key.length() > BlobServer.MAX_KEY_LENGTH) {
-                       throw new IllegalArgumentException("Keys must not be 
longer than " + BlobServer.MAX_KEY_LENGTH);
-               }
-
-               deleteInternal(jobId, key);
-       }
-
-       /**
-        * Deletes all BLOBs belonging to the job with the given ID from the 
BLOB server
-        * 
-        * @param jobId
-        *        the job ID to identify the BLOBs to be deleted
-        * @throws IOException
-        *         thrown if an I/O error occurs while transferring the request 
to the BLOB server
-        */
-       public void deleteAll(final JobID jobId) throws IOException {
-
-               if (jobId == null) {
-                       throw new IllegalArgumentException("Argument jobID must 
not be null");
-               }
-
-               deleteInternal(jobId, null);
-       }
-
-       /**
-        * Delete one or multiple BLOBs from the BLOB server.
-        * 
-        * @param jobId
-        *        the job ID to identify the BLOB(s) to be deleted
-        * @param key
-        *        the key to identify the specific BLOB to delete or 
<code>null</code> to delete all BLOBs associated with
-        *        the job
-        * @throws IOException
-        *         thrown if an I/O error occurs while transferring the request 
to the BLOB server
-        */
-       private void deleteInternal(final JobID jobId, final String key) throws 
IOException {
-
-               final OutputStream os = this.socket.getOutputStream();
-               final byte[] buf = new byte[AbstractID.SIZE];
-
-               // Signal type of operation
-               os.write(BlobServer.DELETE_OPERATION);
-
-               // Send job ID
-               final ByteBuffer bb = ByteBuffer.wrap(buf);
-               jobId.write(bb);
-               os.write(buf);
-
-               if (key == null) {
-                       os.write(0);
-               } else {
-                       os.write(1);
-                       // Send the key
-                       byte[] keyBytes = 
key.getBytes(BlobUtils.DEFAULT_CHARSET);
-                       BlobServer.writeLength(keyBytes.length, buf, os);
-                       os.write(keyBytes);
-               }
-       }
-
-       /**
         * Uploads data from the given byte buffer to the BLOB server.
-        * 
+        *
         * @param jobId
         *        the ID of the job the BLOB belongs to or <code>null</code> to 
store the BLOB in a content-addressable
         *        manner
@@ -311,56 +347,62 @@ public final class BlobClient implements Closeable {
         * @throws IOException
         *         thrown if an I/O error occurs while uploading the data to 
the BLOB server
         */
-       private BlobKey putBuffer(final JobID jobId, final String key, final 
byte[] value, final int offset, final int len)
-                       throws IOException {
+       private BlobKey putBuffer(JobID jobId, String key, byte[] value, int 
offset, int len) throws IOException {
+               if (this.socket.isClosed()) {
+                       throw new IllegalStateException("BLOB Client is not 
connected. " +
+                                       "Client has been shut down or 
encountered an error before.");
+               }
 
-               final OutputStream os = this.socket.getOutputStream();
-               final MessageDigest md = (jobId == null || key == null) ? 
BlobUtils.createMessageDigest() :
-                               null;
-               final byte[] buf = new byte[AbstractID.SIZE];
+               if (LOG.isDebugEnabled()) {
+                       if (jobId == null) {
+                               LOG.debug(String.format("PUT content 
addressable BLOB buffer (%d bytes) to %s",
+                                               len, 
socket.getLocalSocketAddress()));
+                       } else {
+                               LOG.debug(String.format("PUT BLOB buffer (%d 
bytes) under %s / \"%s\" to %s",
+                                               len, jobId, key, 
socket.getLocalSocketAddress()));
+                       }
+               }
 
-               // Send the PUT header
-               sendPutHeader(os, jobId, key, buf);
+               try {
+                       final OutputStream os = this.socket.getOutputStream();
+                       final MessageDigest md = jobId == null ? 
BlobUtils.createMessageDigest() : null;
 
-               // Send the value in iterations of BUFFER_SIZE
-               int remainingBytes = value.length;
-               int bytesSent = 0;
+                       // Send the PUT header
+                       sendPutHeader(os, jobId, key);
 
-               while (remainingBytes > 0) {
+                       // Send the value in iterations of BUFFER_SIZE
+                       int remainingBytes = len;
 
-                       final int bytesToSend = 
Math.min(BlobServer.BUFFER_SIZE, remainingBytes);
-                       BlobServer.writeLength(bytesToSend, buf, os);
+                       while (remainingBytes > 0) {
+                               final int bytesToSend = Math.min(BUFFER_SIZE, 
remainingBytes);
+                               writeLength(bytesToSend, os);
 
-                       os.write(value, offset + bytesSent, bytesToSend);
+                               os.write(value, offset, bytesToSend);
 
-                       // Update the message digest if necessary
-                       if (md != null) {
-                               md.update(value, offset + bytesSent, 
bytesToSend);
-                       }
+                               // Update the message digest if necessary
+                               if (md != null) {
+                                       md.update(value, offset, bytesToSend);
+                               }
 
-                       remainingBytes -= bytesToSend;
-                       bytesSent += bytesToSend;
-               }
+                               remainingBytes -= bytesToSend;
+                               offset += bytesToSend;
+                       }
+                       // send -1 as the stream end
+                       writeLength(-1, os);
 
-               if (md == null) {
-                       return null;
+                       // Receive blob key and compare
+                       final InputStream is = this.socket.getInputStream();
+                       return receivePutResponseAndCompare(is, md);
                }
-
-               // Receive blob key and compare
-               final InputStream is = this.socket.getInputStream();
-               final BlobKey localKey = new BlobKey(md.digest());
-               final BlobKey remoteKey = BlobKey.readFromInputStream(is);
-
-               if (!localKey.equals(remoteKey)) {
-                       throw new IOException("Detected data corruption during 
transfer");
+               catch (Throwable t) {
+                       BlobUtils.closeSilently(socket, LOG);
+                       throw new IOException("PUT operation failed: " + 
t.getMessage(), t);
                }
-
-               return localKey;
        }
 
        /**
         * Uploads data from the given input stream to the BLOB server.
-        * 
+        *
         * @param jobId
         *        the ID of the job the BLOB belongs to or <code>null</code> to 
store the BLOB in a content-addressable
         *        manner
@@ -374,143 +416,261 @@ public final class BlobClient implements Closeable {
         * @throws IOException
         *         thrown if an I/O error occurs while uploading the data to 
the BLOB server
         */
-       private BlobKey putInputStream(final JobID jobId, final String key, 
final InputStream inputStream)
-                       throws IOException {
-
-               final OutputStream os = this.socket.getOutputStream();
-               final MessageDigest md = (jobId == null || key == null) ? 
BlobUtils.createMessageDigest
-                               () : null;
-               final byte[] buf = new byte[AbstractID.SIZE];
-               final byte[] xferBuf = new byte[BlobServer.BUFFER_SIZE];
-
-               // Send the PUT header
-               sendPutHeader(os, jobId, key, buf);
-
-               while (true) {
+       private BlobKey putInputStream(JobID jobId, String key, InputStream 
inputStream) throws IOException {
+               if (this.socket.isClosed()) {
+                       throw new IllegalStateException("BLOB Client is not 
connected. " +
+                                       "Client has been shut down or 
encountered an error before.");
+               }
 
-                       final int read = inputStream.read(xferBuf);
-                       if (read < 0) {
-                               break;
+               if (LOG.isDebugEnabled()) {
+                       if (jobId == null) {
+                               LOG.debug(String.format("PUT content 
addressable BLOB stream to %s",
+                                               
socket.getLocalSocketAddress()));
+                       } else {
+                               LOG.debug(String.format("PUT BLOB stream under 
%s / \"%s\" to %s",
+                                               jobId, key, 
socket.getLocalSocketAddress()));
                        }
-                       if (read > 0) {
-                               BlobServer.writeLength(read, buf, os);
-                               os.write(xferBuf, 0, read);
-                               if (md != null) {
-                                       md.update(xferBuf, 0, read);
+               }
+
+               try {
+                       final OutputStream os = this.socket.getOutputStream();
+                       final MessageDigest md = jobId == null ? 
BlobUtils.createMessageDigest() : null;
+                       final byte[] xferBuf = new byte[BUFFER_SIZE];
+
+                       // Send the PUT header
+                       sendPutHeader(os, jobId, key);
+
+                       while (true) {
+                               final int read = inputStream.read(xferBuf);
+                               if (read < 0) {
+                                       // we are done. send a -1 and be done
+                                       writeLength(-1, os);
+                                       break;
+                               }
+                               if (read > 0) {
+                                       writeLength(read, os);
+                                       os.write(xferBuf, 0, read);
+                                       if (md != null) {
+                                               md.update(xferBuf, 0, read);
+                                       }
                                }
                        }
+
+                       // Receive blob key and compare
+                       final InputStream is = this.socket.getInputStream();
+                       return receivePutResponseAndCompare(is, md);
+               }
+               catch (Throwable t) {
+                       BlobUtils.closeSilently(socket, LOG);
+                       throw new IOException("PUT operation failed: " + 
t.getMessage(), t);
                }
+       }
 
-               if (md == null) {
-                       return null;
+       private BlobKey receivePutResponseAndCompare(InputStream is, 
MessageDigest md) throws IOException {
+               int response = is.read();
+               if (response < 0) {
+                       throw new EOFException("Premature end of response");
                }
+               else if (response == RETURN_OKAY) {
+                       if (md == null) {
+                               // not content addressable
+                               return null;
+                       }
 
-               // Receive blob key and compare
-               final InputStream is = this.socket.getInputStream();
-               final BlobKey localKey = new BlobKey(md.digest());
-               final BlobKey remoteKey = BlobKey.readFromInputStream(is);
+                       BlobKey remoteKey = BlobKey.readFromInputStream(is);
+                       BlobKey localKey = new BlobKey(md.digest());
 
-               if (!localKey.equals(remoteKey)) {
-                       throw new IOException("Detected data corruption during 
transfer");
-               }
+                       if (!localKey.equals(remoteKey)) {
+                               throw new IOException("Detected data corruption 
during transfer");
+                       }
 
-               return localKey;
+                       return localKey;
+               }
+               else if (response == RETURN_ERROR) {
+                       Throwable cause = readExceptionFromStream(is);
+                       throw new IOException("Server side error: " + 
cause.getMessage(), cause);
+               }
+               else {
+                       throw new IOException("Unrecognized response");
+               }
        }
 
        /**
-        * Downloads the BLOB identified by the given job ID and key from the 
BLOB server. If no such BLOB exists on the
-        * server, a {@link FileNotFoundException} is thrown.
-        * 
+        * Constructs and writes the header data for a PUT request to the given 
output stream.
+        * NOTE: If the jobId and key are null, we send the data to the content 
addressable section.
+        *
+        * @param outputStream
+        *        the output stream to write the PUT header data to
         * @param jobID
-        *        the job ID identifying the BLOB to download
+        *        the ID of job the BLOB belongs to or <code>null</code> to 
indicate the upload of a
+        *        content-addressable BLOB
         * @param key
-        *        the key identifying the BLOB to download
-        * @return an input stream to read the retrieved data from
+        *        the key of the BLOB to upload or <code>null</code> to 
indicate the upload of a content-addressable BLOB
         * @throws IOException
-        *         thrown if an I/O error occurs during the download
+        *         thrown if an I/O error occurs while writing the header data 
to the output stream
         */
-       public InputStream get(final JobID jobID, final String key) throws 
IOException {
+       private void sendPutHeader(OutputStream outputStream, JobID jobID, 
String key) throws IOException {
+               // sanity check that either both are null or both are not null
+               if ((jobID != null || key != null) && !(jobID != null && key != 
null)) {
+                       throw new IllegalArgumentException();
+               }
 
-               if (key.length() > BlobServer.MAX_KEY_LENGTH) {
-                       throw new IllegalArgumentException("Keys must not be 
longer than " + BlobServer.MAX_KEY_LENGTH);
+               // Signal type of operation
+               outputStream.write(PUT_OPERATION);
+
+               // Check if PUT should be done in content-addressable manner
+               if (jobID == null) {
+                       outputStream.write(CONTENT_ADDRESSABLE);
                }
+               else {
+                       outputStream.write(NAME_ADDRESSABLE);
+                       // Send job ID and the key
+                       byte[] idBytes = jobID.getBytes();
+                       byte[] keyBytes = 
key.getBytes(BlobUtils.DEFAULT_CHARSET);
+                       outputStream.write(idBytes);
+                       writeLength(keyBytes.length, outputStream);
+                       outputStream.write(keyBytes);
+               }
+       }
 
-               final OutputStream os = this.socket.getOutputStream();
-               final byte[] buf = new byte[AbstractID.SIZE];
+       // 
--------------------------------------------------------------------------------------------
+       //  DELETE
+       // 
--------------------------------------------------------------------------------------------
 
-               // Send GET header
-               sendGetHeader(os, jobID, key, null, buf);
+       /**
+        * Deletes the BLOB identified by the given BLOB key from the BLOB 
server.
+        *
+        * @param key
+        *        the key to identify the BLOB
+        * @throws IOException
+        *         thrown if an I/O error occurs while transferring the request 
to the BLOB server
+        */
+       public void delete(BlobKey key) throws IOException {
+               if (key == null) {
+                       throw new IllegalArgumentException("BLOB key must not 
be null");
+               }
 
-               return new BlobInputStream(this.socket.getInputStream(), null, 
buf);
+               deleteInternal(null, null, key);
        }
 
        /**
-        * Downloads the BLOB identified by the given BLOB key from the BLOB 
server. If no such BLOB exists on the server, a
-        * {@link FileNotFoundException} is thrown.
-        * 
-        * @param blobKey
-        *        the BLOB key identifying the BLOB to download
-        * @return an input stream to read the retrieved data from
+        * Deletes the BLOB identified by the given job ID and key from the 
BLOB server.
+        *
+        * @param jobId
+        *        the job ID to identify the BLOB
+        * @param key
+        *        the key to identify the BLOB
         * @throws IOException
-        *         thrown if an I/O error occurs during the download
+        *         thrown if an I/O error occurs while transferring the request 
to the BLOB server
         */
-       public InputStream get(final BlobKey blobKey) throws IOException {
+       public void delete(JobID jobId, String key) throws IOException {
+               if (jobId == null) {
+                       throw new IllegalArgumentException("JobID must not be 
null");
+               }
+               if (key == null) {
+                       throw new IllegalArgumentException("Key must not be 
null");
+               }
+               if (key.length() > MAX_KEY_LENGTH) {
+                       throw new IllegalArgumentException("Keys must not be 
longer than " + MAX_KEY_LENGTH);
+               }
 
-               final OutputStream os = this.socket.getOutputStream();
-               final byte[] buf = new byte[AbstractID.SIZE];
+               deleteInternal(jobId, key, null);
+       }
 
-               // Send GET header
-               sendGetHeader(os, null, null, blobKey, buf);
+       /**
+        * Deletes all BLOBs belonging to the job with the given ID from the 
BLOB server
+        *
+        * @param jobId
+        *        the job ID to identify the BLOBs to be deleted
+        * @throws IOException
+        *         thrown if an I/O error occurs while transferring the request 
to the BLOB server
+        */
+       public void deleteAll(JobID jobId) throws IOException {
+               if (jobId == null) {
+                       throw new IllegalArgumentException("Argument jobID must 
not be null");
+               }
 
-               return new BlobInputStream(this.socket.getInputStream(), 
blobKey, buf);
+               deleteInternal(jobId, null, null);
        }
 
        /**
-        * Constructs and writes the header data for a GET operation to the 
given output stream.
-        * 
-        * @param outputStream
-        *        the output stream to write the header data to
-        * @param jobID
-        *        the job ID identifying the BLOB to download or 
<code>null</code> to indicate the BLOB key should be used
-        *        to identify the BLOB on the server instead
-        * @param key
-        *        the key identifying the BLOB to download or <code>null</code> 
to indicate the BLOB key should be used to
-        *        identify the BLOB on the server instead
-        * @param key2
-        *        the BLOB key to identify the BLOB to download if either the 
job ID or the regular key are
-        *        <code>null</code>
-        * @param buf
-        *        auxiliary buffer used for data serialization
-        * @throws IOException
-        *         thrown if an I/O error occurs while writing the header data 
to the output stream
+        * Delete one or multiple BLOBs from the BLOB server.
+        *
+        * @param jobId The job ID to identify the BLOB(s) to be deleted.
+        * @param key The key to identify the specific BLOB to delete or 
<code>null</code> to delete
+        *            all BLOBs associated with the job id.
+        * @param bKey The blob key to identify a specific content addressable 
BLOB. This parameter
+        *             is exclusive with jobId and key.
+        * @throws IOException Thrown if an I/O error occurs while transferring 
the request to the BLOB server.
         */
-       private void sendGetHeader(final OutputStream outputStream, final JobID 
jobID, final String key,
-                       final BlobKey key2, final byte[] buf) throws 
IOException {
+       private void deleteInternal(JobID jobId, String key, BlobKey bKey) 
throws IOException {
+               if ((jobId != null && bKey != null) || (jobId == null && bKey 
== null)) {
+                       throw new IllegalArgumentException();
+               }
 
-               // Signal type of operation
-               outputStream.write(BlobServer.GET_OPERATION);
+               try {
+                       final OutputStream outputStream = 
this.socket.getOutputStream();
+                       final InputStream inputStream = 
this.socket.getInputStream();
 
-               // Check if GET should be done in content-addressable manner
-               if (jobID == null || key == null) {
-                       outputStream.write(1);
-                       key2.writeToOutputStream(outputStream);
-               } else {
-                       outputStream.write(0);
-                       // Send job ID
-                       final ByteBuffer bb = ByteBuffer.wrap(buf);
-                       jobID.write(bb);
-                       outputStream.write(buf);
-
-                       // Send the key
-                       byte[] keyBytes = 
key.getBytes(BlobUtils.DEFAULT_CHARSET);
-                       BlobServer.writeLength(keyBytes.length, buf, 
outputStream);
-                       outputStream.write(keyBytes);
+                       // Signal type of operation
+                       outputStream.write(DELETE_OPERATION);
+
+                       // Check if DELETE should be done in 
content-addressable manner
+                       if (jobId == null) {
+                               // delete blob key
+                               outputStream.write(CONTENT_ADDRESSABLE);
+                               bKey.writeToOutputStream(outputStream);
+                       }
+                       else if (key != null) {
+                               // delete BLOB for jobID and name key
+                               outputStream.write(NAME_ADDRESSABLE);
+                               // Send job ID and the key
+                               byte[] idBytes = jobId.getBytes();
+                               byte[] keyBytes = 
key.getBytes(BlobUtils.DEFAULT_CHARSET);
+                               outputStream.write(idBytes);
+                               writeLength(keyBytes.length, outputStream);
+                               outputStream.write(keyBytes);
+                       }
+                       else {
+                               // delete all blobs for JobID
+                               outputStream.write(JOB_ID_SCOPE);
+                               byte[] idBytes = jobId.getBytes();
+                               outputStream.write(idBytes);
+                       }
+
+                       int response = inputStream.read();
+                       if (response < 0) {
+                               throw new EOFException("Premature end of 
response");
+                       }
+                       if (response == RETURN_ERROR) {
+                               Throwable cause = 
readExceptionFromStream(inputStream);
+                               throw new IOException("Server side error: " + 
cause.getMessage(), cause);
+                       }
+                       else if (response != RETURN_OKAY) {
+                               throw new IOException("Unrecognized response");
+                       }
+               }
+               catch (Throwable t) {
+                       BlobUtils.closeSilently(socket, LOG);
+                       throw new IOException("DELETE operation failed: " + 
t.getMessage(), t);
                }
        }
 
-       @Override
-       public void close() throws IOException {
+       // 
--------------------------------------------------------------------------------------------
+       //  Miscellaneous
+       // 
--------------------------------------------------------------------------------------------
 
-               this.socket.close();
+       private static Throwable readExceptionFromStream(InputStream in) throws 
IOException {
+               int len = readLength(in);
+               byte[] bytes = new byte[len];
+               readFully(in, bytes, 0, len, "Error message");
+
+               try {
+                       return (Throwable) 
InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
+               }
+               catch (ClassNotFoundException e) {
+                       // should never occur
+                       throw new IOException("Could not transfer error 
message", e);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobConnection.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobConnection.java
deleted file mode 100644
index 3b7a31b..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobConnection.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.blob;
-
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A BLOB connection handles a series of requests from a particular BLOB 
client.
- * <p>
- * This class it thread-safe.
- */
-class BlobConnection extends Thread {
-
-       /**
-        * The log object used for debugging.
-        */
-       private static final Logger LOG = 
LoggerFactory.getLogger(BlobConnection.class);
-
-       /**
-        * The socket to communicate with the client.
-        */
-       private final Socket clientSocket;
-
-       /**
-        * The BLOB server.
-        */
-       private final BlobServer blobServer;
-
-       /**
-        * Creates a new BLOB connection for a client request
-        * 
-        * @param clientSocket
-        *        the socket to read/write data
-        * @param blobServer
-        *        the BLOB server
-        */
-       BlobConnection(final Socket clientSocket, final BlobServer blobServer) {
-               super("BLOB connection for " + 
clientSocket.getRemoteSocketAddress().toString());
-
-               this.clientSocket = clientSocket;
-               this.blobServer = blobServer;
-       }
-
-       @Override
-       public void run() {
-
-               try {
-
-                       final InputStream inputStream = 
this.clientSocket.getInputStream();
-                       final OutputStream outputStream = 
this.clientSocket.getOutputStream();
-                       final byte[] buffer = new byte[BlobServer.BUFFER_SIZE];
-
-                       while (true) {
-
-                               // Read the requested operation
-                               final int operation = inputStream.read();
-                               if (operation < 0) {
-                                       return;
-                               }
-
-                               switch (operation) {
-                               case BlobServer.PUT_OPERATION:
-                                       put(inputStream, outputStream, buffer);
-                                       break;
-                               case BlobServer.GET_OPERATION:
-                                       get(inputStream, outputStream, buffer);
-                                       break;
-                               case BlobServer.DELETE_OPERATION:
-                                       delete(inputStream, buffer);
-                                       break;
-                               default:
-                                       throw new IOException("Unknown 
operation " + operation);
-                               }
-                       }
-
-               } catch (IOException ioe) {
-                       if (LOG.isErrorEnabled()) {
-                               LOG.error("Error while executing BLOB 
connection.", ioe);
-                       }
-               } finally {
-                       closeSilently(this.clientSocket);
-               }
-       }
-
-       /**
-        * Handles an incoming GET request from a BLOB client.
-        * 
-        * @param inputStream
-        *        the input stream to read incoming data from
-        * @param outputStream
-        *        the output stream to send data back to the client
-        * @param buf
-        *        an auxiliary buffer for data serialization/deserialization
-        * @throws IOException
-        *         thrown if an I/O error occurs while reading/writing data 
from/to the respective streams
-        */
-       private void get(final InputStream inputStream, final OutputStream 
outputStream, final byte[] buf)
-                       throws IOException {
-
-               File blob = null;
-
-               final int contentAdressable = inputStream.read();
-               if (contentAdressable < 0) {
-                       throw new EOFException("Expected GET header");
-               }
-
-               if (contentAdressable == 0) {
-                       // Receive the job ID
-                       BlobServer.readFully(inputStream, buf, 0, JobID.SIZE);
-                       final ByteBuffer bb = ByteBuffer.wrap(buf);
-                       final JobID jobID = JobID.fromByteBuffer(bb);
-                       // Receive the key
-                       final String key = readKey(buf, inputStream);
-                       blob = this.blobServer.getStorageLocation(jobID, key);
-               } else {
-                       final BlobKey key = 
BlobKey.readFromInputStream(inputStream);
-                       blob = blobServer.getStorageLocation(key);
-               }
-
-               // Check if BLOB exists
-               if (!blob.exists()) {
-                       BlobServer.writeLength(-1, buf, outputStream);
-                       return;
-               }
-
-               BlobServer.writeLength((int) blob.length(), buf, outputStream);
-               FileInputStream fis = null;
-               try {
-                       fis = new FileInputStream(blob);
-
-                       while (true) {
-
-                               final int read = fis.read(buf);
-                               if (read < 0) {
-                                       break;
-                               }
-                               outputStream.write(buf, 0, read);
-                       }
-
-               } finally {
-                       if (fis != null) {
-                               fis.close();
-                       }
-               }
-       }
-
-       /**
-        * Handles an incoming PUT request from a BLOB client.
-        * 
-        * @param inputStream
-        *        the input stream to read incoming data from
-        * @param outputStream
-        *        the output stream to send data back to the client
-        * @param buf
-        *        an auxiliary buffer for data serialization/deserialization
-        * @throws IOException
-        *         thrown if an I/O error occurs while reading/writing data 
from/to the respective streams
-        */
-       private void put(final InputStream inputStream, final OutputStream 
outputStream, final byte[] buf)
-                       throws IOException {
-
-               JobID jobID = null;
-               String key = null;
-               MessageDigest md = null;
-               final int contentAdressable = inputStream.read();
-               if (contentAdressable < 0) {
-                       throw new EOFException("Expected PUT header");
-               }
-
-               if (contentAdressable == 0) {
-                       // Receive the job ID
-                       BlobServer.readFully(inputStream, buf, 0, JobID.SIZE);
-                       final ByteBuffer bb = ByteBuffer.wrap(buf);
-                       jobID = JobID.fromByteBuffer(bb);
-                       // Receive the key
-                       key = readKey(buf, inputStream);
-               } else {
-                       md = BlobUtils.createMessageDigest();
-               }
-
-               File incomingFile = null;
-               FileOutputStream fos = null;
-
-               try {
-                       incomingFile = blobServer.getTemporaryFilename();
-                       fos = new FileOutputStream(incomingFile);
-
-                       while (true) {
-
-                               final int bytesExpected = 
BlobServer.readLength(buf, inputStream);
-                               if (bytesExpected > BlobServer.BUFFER_SIZE) {
-                                       throw new IOException("Unexpected 
number of incoming bytes: " + bytesExpected);
-                               }
-
-                               BlobServer.readFully(inputStream, buf, 0, 
bytesExpected);
-                               fos.write(buf, 0, bytesExpected);
-
-                               if (md != null) {
-                                       md.update(buf, 0, bytesExpected);
-                               }
-
-                               if (bytesExpected < BlobServer.BUFFER_SIZE) {
-                                       break;
-                               }
-                       }
-
-                       fos.close();
-                       fos = null;
-
-                       if (contentAdressable == 0) {
-                               final File storageFile = 
this.blobServer.getStorageLocation(jobID, key);
-                               incomingFile.renameTo(storageFile);
-                               incomingFile = null;
-                       } else {
-                               final BlobKey blobKey = new 
BlobKey(md.digest());
-                               final File storageFile = 
blobServer.getStorageLocation(blobKey);
-                               incomingFile.renameTo(storageFile);
-                               incomingFile = null;
-
-                               // Return computed key to client for validation
-                               blobKey.writeToOutputStream(outputStream);
-                       }
-               } finally {
-                       if (fos != null) {
-                               fos.close();
-                       }
-                       if (incomingFile != null) {
-                               incomingFile.delete();
-                       }
-               }
-       }
-
-       /**
-        * Handles an incoming DELETE request from a BLOB client.
-        * 
-        * @param inputStream
-        *        the input stream to read the request from.
-        * @param buf
-        *        an auxiliary buffer for data deserialization
-        * @throws IOException
-        *         thrown if an I/O error occurs while reading the request data 
from the input stream
-        */
-       private void delete(final InputStream inputStream, final byte[] buf) 
throws IOException {
-
-               // Receive the job ID
-               BlobServer.readFully(inputStream, buf, 0, JobID.SIZE);
-               final ByteBuffer bb = ByteBuffer.wrap(buf);
-               final JobID jobID = JobID.fromByteBuffer(bb);
-               String key = null;
-
-               final int r = inputStream.read();
-               if (r < 0) {
-                       throw new EOFException();
-               }
-               if (r > 0) {
-                       // Delete individual BLOB
-                       // Receive the key
-                       key = readKey(buf, inputStream);
-
-                       final File blob = 
this.blobServer.getStorageLocation(jobID, key);
-                       blob.delete();
-
-               } else {
-                       // Delete all BLOBs for this job
-                       blobServer.deleteJobDirectory(jobID);
-               }
-       }
-
-       /**
-        * Auxiliary method to silently close a {@link Socket}.
-        * 
-        * @param socket
-        *        the socket to close
-        */
-       static void closeSilently(final Socket socket) {
-
-               try {
-                       if (socket != null) {
-                               socket.close();
-                       }
-               } catch (IOException ioe) {
-               }
-       }
-
-       /**
-        * Reads the key of a BLOB from the given input stream.
-        * 
-        * @param buf
-        *        auxiliary buffer to data deserialization
-        * @param inputStream
-        *        the input stream to read the key from
-        * @return the key of a BLOB
-        * @throws IOException
-        *         thrown if an I/O error occurs while reading the key data 
from the input stream
-        */
-       private static String readKey(final byte[] buf,
-                       final InputStream inputStream) throws IOException {
-
-               final int keyLength = BlobServer.readLength(buf, inputStream);
-               if (keyLength > BlobServer.MAX_KEY_LENGTH) {
-                       throw new IOException("Unexpected key length " + 
keyLength);
-               }
-
-               BlobServer.readFully(inputStream, buf, 0, keyLength);
-
-               return new String(buf, 0, keyLength, BlobUtils.DEFAULT_CHARSET);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
index 3654f8f..a89a461 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.security.MessageDigest;
 
+import static org.apache.flink.runtime.blob.BlobUtils.readLength;
+
 /**
  * The BLOB input stream is a special implementation of an {@link InputStream} 
to read the results of a GET operation
  * from the BLOB server.
@@ -63,15 +65,13 @@ final class BlobInputStream extends InputStream {
         *        the underlying input stream to read from
         * @param blobKey
         *        the expected BLOB key for content-addressable BLOBs, 
<code>null</code> for non-content-addressable BLOBs.
-        * @param buf
-        *        auxiliary buffer to read the meta data from the BLOB server
         * @throws IOException
         *         throws if an I/O error occurs while reading the BLOB data 
from the BLOB server
         */
-       BlobInputStream(final InputStream wrappedInputStream, final BlobKey 
blobKey, final byte[] buf) throws IOException {
+       BlobInputStream(final InputStream wrappedInputStream, final BlobKey 
blobKey) throws IOException {
                this.wrappedInputStream = wrappedInputStream;
                this.blobKey = blobKey;
-               this.bytesToReceive = BlobServer.readLength(buf, 
wrappedInputStream);
+               this.bytesToReceive = readLength(wrappedInputStream);
                if (this.bytesToReceive < 0) {
                        throw new FileNotFoundException();
                }
@@ -157,7 +157,7 @@ final class BlobInputStream extends InputStream {
 
        @Override
        public int available() throws IOException {
-               return 0;
+               return this.bytesToReceive - this.bytesReceived;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
index e3d237d..bd254dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.blob;
 
 import java.io.EOFException;
@@ -142,7 +141,7 @@ public final class BlobKey implements Serializable, 
Comparable<BlobKey> {
                while (bytesRead < BlobKey.SIZE) {
                        final int read = inputStream.read(key, bytesRead, 
BlobKey.SIZE - bytesRead);
                        if (read < 0) {
-                               throw new EOFException();
+                               throw new EOFException("Read an incomplete BLOB 
key");
                        }
                        bytesRead += read;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index b27af03..c0e81f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -18,14 +18,15 @@
 
 package org.apache.flink.runtime.blob;
 
-import java.io.EOFException;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.ServerSocket;
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -42,61 +43,32 @@ import org.slf4j.LoggerFactory;
  * spawning threads to handle these requests. Furthermore, it takes care of 
creating the directory structure to store
  * the BLOBs or temporarily cache them.
  */
-public final class BlobServer extends Thread implements BlobService {
+public class BlobServer extends Thread implements BlobService {
 
-       /**
-        * The log object used for debugging.
-        */
+       /** The log object used for debugging. */
        private static final Logger LOG = 
LoggerFactory.getLogger(BlobServer.class);
 
-       /**
-        * The buffer size in bytes for network transfers.
-        */
-       static final int BUFFER_SIZE = 4096;
-
-       /**
-        * The maximum key length allowed for storing BLOBs.
-        */
-       static final int MAX_KEY_LENGTH = 64;
-
-       /**
-        * Internal code to identify a PUT operation.
-        */
-       static final byte PUT_OPERATION = 0;
-
-       /**
-        * Internal code to identify a GET operation.
-        */
-       static final byte GET_OPERATION = 1;
-
-       /**
-        * Internal code to identify a DELETE operation.
-        */
-       static final byte DELETE_OPERATION = 2;
-
-       /**
-        * Counter to generate unique names for temporary files.
-        */
+       /** Counter to generate unique names for temporary files. */
        private final AtomicInteger tempFileCounter = new AtomicInteger(0);
 
-       /**
-        * The server socket listening for incoming connections.
-        */
+       /** The server socket listening for incoming connections. */
        private final ServerSocket serverSocket;
 
-       /**
-        * Indicates whether a shutdown of server component has been requested.
-        */
-       private AtomicBoolean shutdownRequested = new AtomicBoolean();
+       /** Indicates whether a shutdown of server component has been 
requested. */
+       private final AtomicBoolean shutdownRequested = new AtomicBoolean();
 
        /** Shutdown hook thread to ensure deletion of the storage directory. */
        private final Thread shutdownHook;
 
-       /**
-        * Is the root directory for file storage
-        */
+       /** Is the root directory for file storage */
        private final File storageDir;
 
+       /** Set of currently running threads */
+       private final Set<BlobServerConnection> activeConnections = new 
HashSet<BlobServerConnection>();
+
+       /** The maximum number of concurrent connections */
+       private final int maxConnections;
+
        /**
         * Instantiates a new BLOB server and binds it to a free network port.
         * 
@@ -105,38 +77,57 @@ public final class BlobServer extends Thread implements 
BlobService {
         */
        public BlobServer(Configuration config) throws IOException {
 
+               // configure and create the storage directory
                String storageDirectory = 
config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
                this.storageDir = 
BlobUtils.initStorageDirectory(storageDirectory);
                LOG.info("Created BLOB server storage directory {}", 
storageDir);
 
+               // configure the maximum number of concurrent connections
+               final int maxConnections = config.getInteger(
+                               ConfigConstants.BLOB_FETCH_CONCURRENT_KEY, 
ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT);
+               if (maxConnections >= 1) {
+                       this.maxConnections = maxConnections;
+               }
+               else {
+                       LOG.warn("Invalid value for maximum connections in BLOB 
server: {}. Using default value of {}",
+                                       maxConnections, 
ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT);
+                       this.maxConnections = 
ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT;
+               }
+
+               // configure the backlog of connections
+               int backlog = 
config.getInteger(ConfigConstants.BLOB_FETCH_BACKLOG_KEY, 
ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG);
+               if (backlog < 1) {
+                       LOG.warn("Invalid value for BLOB connection backlog: 
{}. Using default value of {}",
+                                       backlog, 
ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG);
+                       backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
+               }
+
                // Add shutdown hook to delete storage directory
                this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
 
+               // start the server
                try {
-                       this.serverSocket = new ServerSocket(0);
-
-                       start();
-
-                       if (LOG.isInfoEnabled()) {
-                               LOG.info(String.format("Started BLOB server on 
port %d",
-                                               
this.serverSocket.getLocalPort()));
-                       }
+                       this.serverSocket = new ServerSocket(0, backlog);
                }
                catch (IOException e) {
-                       throw new IOException("Could not create BlobServer with 
random port.", e);
+                       throw new IOException("Could not create BlobServer with 
automatic port choice.", e);
                }
-       }
 
-       /**
-        * Returns the network port the BLOB server is bound to. The return 
value of this method is undefined after the BLOB
-        * server has been shut down.
-        * 
-        * @return the network port the BLOB server is bound to
-        */
-       public int getServerPort() {
-               return this.serverSocket.getLocalPort();
+               // start the server thread
+               setName("BLOB Server listener at " + getPort());
+               setDaemon(true);
+               start();
+
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Started BLOB server at {}:{} - max concurrent 
requests: {} - max backlog: {}",
+                                       
serverSocket.getInetAddress().getHostAddress(), getPort(), maxConnections, 
backlog);
+               }
        }
 
+       // 
--------------------------------------------------------------------------------------------
+       //  Path Accessors
+       // 
--------------------------------------------------------------------------------------------
+
        /**
         * Returns a file handle to the file associated with the given blob key 
on the blob
         * server.
@@ -174,7 +165,7 @@ public final class BlobServer extends Thread implements 
BlobService {
         * 
         * @return a temporary file inside the BLOB server's incoming directory
         */
-       File getTemporaryFilename() {
+       File createTemporaryFilename() {
                return new File(BlobUtils.getIncomingDirectory(storageDir),
                                String.format("temp-%08d", 
tempFileCounter.getAndIncrement()));
        }
@@ -183,7 +174,26 @@ public final class BlobServer extends Thread implements 
BlobService {
        public void run() {
                try {
                        while (!this.shutdownRequested.get()) {
-                               new BlobConnection(this.serverSocket.accept(), 
this).start();
+                               BlobServerConnection conn = new 
BlobServerConnection(serverSocket.accept(), this);
+                               try {
+                                       synchronized (activeConnections) {
+                                               while (activeConnections.size() 
>= maxConnections) {
+                                                       
activeConnections.wait(2000);
+                                               }
+                                               activeConnections.add(conn);
+                                       }
+
+                                       conn.start();
+                                       conn = null;
+                               }
+                               finally {
+                                       if (conn != null) {
+                                               conn.close();
+                                               synchronized 
(activeConnections) {
+                                                       
activeConnections.remove(conn);
+                                               }
+                                       }
+                               }
                        }
                }
                catch (Throwable t) {
@@ -206,6 +216,10 @@ public final class BlobServer extends Thread implements 
BlobService {
                        catch (IOException ioe) {
                                LOG.debug("Error while closing the server 
socket.", ioe);
                        }
+
+                       // wake the thread up, in case it is waiting on some 
operation
+                       interrupt();
+
                        try {
                                join();
                        }
@@ -213,6 +227,16 @@ public final class BlobServer extends Thread implements 
BlobService {
                                LOG.debug("Error while waiting for this thread 
to die.", ie);
                        }
 
+                       synchronized (activeConnections) {
+                               if (!activeConnections.isEmpty()) {
+                                       for (BlobServerConnection conn : 
activeConnections) {
+                                               LOG.debug("Shutting down 
connection " + conn.getName());
+                                               conn.close();
+                                       }
+                                       activeConnections.clear();
+                               }
+                       }
+
                        // Clean up the storage directory
                        try {
                                FileUtils.deleteDirectory(storageDir);
@@ -255,8 +279,7 @@ public final class BlobServer extends Thread implements 
BlobService {
                final File localFile = BlobUtils.getStorageLocation(storageDir, 
requiredBlob);
 
                if (!localFile.exists()) {
-                       throw new FileNotFoundException("File " + 
localFile.getCanonicalPath() + " does " +
-                                       "not exist.");
+                       throw new FileNotFoundException("File " + 
localFile.getCanonicalPath() + " does not exist.");
                } else {
                        return localFile.toURI().toURL();
                }
@@ -266,15 +289,17 @@ public final class BlobServer extends Thread implements 
BlobService {
         * This method deletes the file associated to the blob key if it exists 
in the local storage
         * of the blob server.
         *
-        * @param blobKey associated with the file to be deleted
+        * @param key associated with the file to be deleted
         * @throws IOException
         */
        @Override
-       public void delete(BlobKey blobKey) throws IOException {
-               final File localFile = BlobUtils.getStorageLocation(storageDir, 
blobKey);
+       public void delete(BlobKey key) throws IOException {
+               final File localFile = BlobUtils.getStorageLocation(storageDir, 
key);
 
                if (localFile.exists()) {
-                       localFile.delete();
+                       if (!localFile.delete()) {
+                               LOG.warn("Failed to delete locally BLOB " + key 
+ " at " + localFile.getAbsolutePath());
+                       }
                }
        }
 
@@ -285,95 +310,35 @@ public final class BlobServer extends Thread implements 
BlobService {
         */
        @Override
        public int getPort() {
-               return getServerPort();
+               return this.serverSocket.getLocalPort();
        }
 
        /**
-        * Auxiliary method to write the length of an upcoming data chunk to an
-        * output stream.
+        * Tests whether the BLOB server has been requested to shut down.
         *
-        * @param length
-        *        the length of the upcoming data chunk in bytes
-        * @param buf
-        *        the byte buffer to use for the integer serialization
-        * @param outputStream
-        *        the output stream to write the length to
-        * @throws IOException
-        *         thrown if an I/O error occurs while writing to the output
-        *         stream
+        * @return True, if the server has been requested to shut down, false 
otherwise.
         */
-       static void writeLength(final int length, final byte[] buf,
-                                                       final OutputStream 
outputStream) throws IOException {
-
-               buf[0] = (byte) (length & 0xff);
-               buf[1] = (byte) ((length >> 8) & 0xff);
-               buf[2] = (byte) ((length >> 16) & 0xff);
-               buf[3] = (byte) ((length >> 24) & 0xff);
-
-               outputStream.write(buf, 0, 4);
+       public boolean isShutdown() {
+               return this.shutdownRequested.get();
        }
 
        /**
-        * Auxiliary method to read the length of an upcoming data chunk from an
-        * input stream.
-        *
-        * @param buf
-        *        the byte buffer to use for the integer deserialization
-        * @param inputStream
-        *        the input stream to read the length from
-        * @return the length of the upcoming data chunk in bytes
-        * @throws IOException
-        *         thrown if an I/O error occurs while reading from the input
-        *         stream
+        * Access to the server socket, for testing
         */
-       static int readLength(final byte[] buf, final InputStream inputStream)
-                       throws IOException {
-
-               int bytesRead = 0;
-               while (bytesRead < 4) {
-                       final int read = inputStream.read(buf, bytesRead, 4 - 
bytesRead);
-                       if (read < 0) {
-                               throw new EOFException();
-                       }
-                       bytesRead += read;
-               }
-
-               bytesRead = buf[0] & 0xff;
-               bytesRead |= (buf[1] & 0xff) << 8;
-               bytesRead |= (buf[2] & 0xff) << 16;
-               bytesRead |= (buf[3] & 0xff) << 24;
-
-               return bytesRead;
+       ServerSocket getServerSocket() {
+               return this.serverSocket;
        }
 
-       /**
-        * Auxiliary method to read a particular number of bytes from an input 
stream. This method blocks until the
-        * requested number of bytes have been read from the stream. If the 
stream cannot offer enough data, an
-        * {@link EOFException} is thrown.
-        *
-        * @param inputStream
-        *        the input stream to read the data from
-        * @param buf
-        *        the buffer to store the read data
-        * @param off
-        *        the offset inside the buffer
-        * @param len
-        *        the number of bytes to read from the stream
-        * @throws IOException
-        *         thrown if I/O error occurs while reading from the stream or 
the stream cannot offer enough data
-        */
-       static void readFully(final InputStream inputStream,
-                                               final byte[] buf, final int 
off, final int len) throws IOException {
-
-               int bytesRead = 0;
-               while (bytesRead < len) {
+       void unregisterConnection(BlobServerConnection conn) {
+               synchronized (activeConnections) {
+                       activeConnections.remove(conn);
+                       activeConnections.notifyAll();
+               }
+       }
 
-                       final int read = inputStream.read(buf, off + bytesRead, 
len
-                                       - bytesRead);
-                       if (read < 0) {
-                               throw new EOFException();
-                       }
-                       bytesRead += read;
+       List<BlobServerConnection> getCurrentyActiveConnections() {
+               synchronized (activeConnections) {
+                       return new 
ArrayList<BlobServerConnection>(activeConnections);
                }
        }
 }

Reply via email to