Repository: flink
Updated Branches:
  refs/heads/master cd899f3be -> d1ea365ef


[FLINK-3084] [streaming] FsStateBackend stores very small state directly with the metadata instead of writing it to files.

This closes #1423


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1ea365e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1ea365e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1ea365e

Branch: refs/heads/master
Commit: d1ea365ef1fa979d40532bf3f114fc03284164bc
Parents: cd899f3
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 26 18:46:49 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 1 09:44:17 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/state/StreamStateHandle.java  |  14 +-
 .../state/filesystem/AbstractFileState.java     |   4 +-
 .../filesystem/FileSerializableStateHandle.java |   3 +-
 .../state/filesystem/FileStreamStateHandle.java |   7 +
 .../state/filesystem/FsStateBackend.java        | 250 ++++++++++++++-----
 .../state/filesystem/FsStateBackendFactory.java |  10 +-
 .../state/memory/ByteStreamStateHandle.java     |   8 +
 .../state/memory/SerializedStateHandle.java     |  39 ++-
 .../runtime/state/FileStateBackendTest.java     |  45 ++--
 .../FsCheckpointStateOutputStreamTest.java      | 128 ++++++++++
 .../flink/hdfstests/FileStateBackendTest.java   |  32 +--
 .../src/test/resources/log4j-test.properties    |  31 +++
 12 files changed, 461 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
index 32c601e..891243b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
@@ -18,11 +18,19 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.runtime.state.StateHandle;
-
 import java.io.InputStream;
+import java.io.Serializable;
 
 /**
  * A state handle that produces an input stream when resolved.
  */
-public interface StreamStateHandle extends StateHandle<InputStream> {}
+public interface StreamStateHandle extends StateHandle<InputStream> {
+
+       /**
+        * Converts this stream state handle into a state handle that 
de-serializes
+        * the stream into an object using Java's serialization mechanism.
+        *
+        * @return The state handle that automatically de-serializes.
+        */
+       <T extends Serializable> StateHandle<T> toSerializableHandle();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java
index d64e2c4..e0a42b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java
@@ -23,6 +23,8 @@ import org.apache.flink.core.fs.Path;
 
 import java.io.IOException;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * Base class for state that is stored in a file.
  */
@@ -42,7 +44,7 @@ public abstract class AbstractFileState implements 
java.io.Serializable {
         * @param filePath The path to the file that stores the state.
         */
        protected AbstractFileState(Path filePath) {
-               this.filePath = filePath;
+               this.filePath = requireNonNull(filePath);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
index 63336d1..edbbe69 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
@@ -24,13 +24,14 @@ import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.InstantiationUtil;
 
 import java.io.ObjectInputStream;
+import java.io.Serializable;
 
 /**
  * A state handle that points to state stored in a file via Java Serialization.
  * 
  * @param <T> The type of state pointed to by the state handle.
  */
-public class FileSerializableStateHandle<T> extends AbstractFileState 
implements StateHandle<T> {
+public class FileSerializableStateHandle<T extends Serializable> extends 
AbstractFileState implements StateHandle<T> {
 
        private static final long serialVersionUID = -657631394290213622L;
        

http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
index f4681ea..b2f2ecc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
@@ -19,9 +19,11 @@
 package org.apache.flink.runtime.state.filesystem;
 
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
 import java.io.InputStream;
+import java.io.Serializable;
 
 /**
  * A state handle that points to state in a file system, accessible as an 
input stream.
@@ -43,4 +45,9 @@ public class FileStreamStateHandle extends AbstractFileState 
implements StreamSt
        public InputStream getState(ClassLoader userCodeClassLoader) throws 
Exception {
                return getFileSystem().open(getFilePath());
        }
+
+       @Override
+       public <T extends Serializable> StateHandle<T> toSerializableHandle() {
+               return new FileSerializableStateHandle<>(getFilePath());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 25c63e5..a6eebf6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -25,6 +25,9 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +36,7 @@ import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Arrays;
 import java.util.UUID;
 
 /**
@@ -50,11 +54,24 @@ public class FsStateBackend extends 
StateBackend<FsStateBackend> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(FsStateBackend.class);
 
+       /** By default, state smaller than 1024 bytes will not be written to 
files, but
+        * will be stored directly with the metadata */
+       public static final int DEFAULT_FILE_STATE_THRESHOLD = 1024;
+
+       /** Maximum size of state that is stored with the metadata, rather than 
in files */
+       public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+       
+       /** Default size for the write buffer */
+       private static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;
+       
 
        /** The path to the directory for the checkpoint data, including the 
file system
         * description via scheme and optional authority */
        private final Path basePath;
 
+       /** State below this size will be stored as part of the metadata, 
rather than in files */
+       private final int fileStateThreshold;
+       
        /** The directory (job specific) into this initialized instance of the 
backend stores its data */
        private transient Path checkpointDirectory;
 
@@ -112,10 +129,32 @@ public class FsStateBackend extends 
StateBackend<FsStateBackend> {
         * classpath.
         *
         * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
-        *                          and the path to teh checkpoint data 
directory.
+        *                          and the path to the checkpoint data 
directory.
         * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
         */
        public FsStateBackend(URI checkpointDataUri) throws IOException {
+               this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD);
+       }
+
+       /**
+        * Creates a new state backend that stores its checkpoint data in the 
file system and location
+        * defined by the given URI.
+        *
+        * <p>A file system for the file system scheme in the URI (e.g., 
'file://', 'hdfs://', or 'S3://')
+        * must be accessible via {@link FileSystem#get(URI)}.
+        *
+        * <p>For a state backend targeting HDFS, this means that the URI must 
either specify the authority
+        * (host and port), or that the Hadoop configuration that describes 
that information must be in the
+        * classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem (scheme 
and optionally authority),
+        *                          and the path to the checkpoint data 
directory.
+        * @param fileStateSizeThreshold State up to this size will be stored 
as part of the metadata,
+        *                             rather than in files
+        * 
+        * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
+        */
+       public FsStateBackend(URI checkpointDataUri, int 
fileStateSizeThreshold) throws IOException {
                final String scheme = checkpointDataUri.getScheme();
                final String path = checkpointDataUri.getPath();
 
@@ -131,6 +170,13 @@ public class FsStateBackend extends 
StateBackend<FsStateBackend> {
                if (path.length() == 0 || path.equals("/")) {
                        throw new IllegalArgumentException("Cannot use the root 
directory for checkpoints.");
                }
+               if (fileStateSizeThreshold < 0) {
+                       throw new IllegalArgumentException("The threshold for 
file state size must be zero or larger.");
+               }
+               if (fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD) {
+                       throw new IllegalArgumentException("The threshold for 
file state size cannot be larger than " +
+                               MAX_FILE_STATE_THRESHOLD);
+               }
 
                // we do a bit of work to make sure that the URI for the 
filesystem refers to exactly the same
                // (distributed) filesystem on all hosts and includes full 
host/port information, even if the
@@ -153,6 +199,8 @@ public class FsStateBackend extends 
StateBackend<FsStateBackend> {
                                        String.format("Cannot create file 
system URI for checkpointDataUri %s and filesystem URI %s",
                                                        checkpointDataUri, 
fsURI), e);
                }
+               
+               this.fileStateThreshold = fileStateSizeThreshold;
        }
 
        /**
@@ -176,6 +224,18 @@ public class FsStateBackend extends 
StateBackend<FsStateBackend> {
        }
 
        /**
+        * Gets the size (in bytes) above which the state will written to 
files. State whose size
+        * is below this threshold will be directly stored with the metadata
+        * (the state handles), rather than in files. This threshold helps to 
prevent an accumulation
+        * of small files for small states.
+        * 
+        * @return The threshold (in bytes) above which state is written to 
files.
+        */
+       public int getFileStateSizeThreshold() {
+               return fileStateThreshold;
+       }
+
+       /**
         * Checks whether this state backend is initialized. Note that 
initialization does not carry
         * across serialization. After each serialization, the state backend 
needs to be initialized.
         *
@@ -248,54 +308,26 @@ public class FsStateBackend extends 
StateBackend<FsStateBackend> {
                        S state, long checkpointID, long timestamp) throws 
Exception
        {
                checkFileSystemInitialized();
-
-               // make sure the directory for that specific checkpoint exists
-               final Path checkpointDir = 
createCheckpointDirPath(checkpointID);
-               filesystem.mkdirs(checkpointDir);
-
-
-               Exception latestException = null;
-
-               for (int attempt = 0; attempt < 10; attempt++) {
-                       Path targetPath = new Path(checkpointDir, 
UUID.randomUUID().toString());
-                       FSDataOutputStream outStream;
-                       try {
-                               outStream = filesystem.create(targetPath, 
false);
-                       }
-                       catch (Exception e) {
-                               latestException = e;
-                               continue;
-                       }
-
-                       try (ObjectOutputStream os = new 
ObjectOutputStream(outStream)) {
-                               os.writeObject(state);
-                       }
-                       return new FileSerializableStateHandle<S>(targetPath);
+               
+               Path checkpointDir = createCheckpointDirPath(checkpointID);
+               int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, 
fileStateThreshold);
+
+               FsCheckpointStateOutputStream stream = 
+                       new FsCheckpointStateOutputStream(checkpointDir, 
filesystem, bufferSize, fileStateThreshold);
+               
+               try (ObjectOutputStream os = new ObjectOutputStream(stream)) {
+                       os.writeObject(state);
+                       return 
stream.closeAndGetHandle().toSerializableHandle();
                }
-
-               throw new Exception("Could not open output stream for state 
backend", latestException);
        }
 
        @Override
        public FsCheckpointStateOutputStream 
createCheckpointStateOutputStream(long checkpointID, long timestamp) throws 
Exception {
                checkFileSystemInitialized();
 
-               final Path checkpointDir = 
createCheckpointDirPath(checkpointID);
-               filesystem.mkdirs(checkpointDir);
-
-               Exception latestException = null;
-
-               for (int attempt = 0; attempt < 10; attempt++) {
-                       Path targetPath = new Path(checkpointDir, 
UUID.randomUUID().toString());
-                       try {
-                               FSDataOutputStream outStream = 
filesystem.create(targetPath, false);
-                               return new 
FsCheckpointStateOutputStream(outStream, targetPath, filesystem);
-                       }
-                       catch (Exception e) {
-                               latestException = e;
-                       }
-               }
-               throw new Exception("Could not open output stream for state 
backend", latestException);
+               Path checkpointDir = createCheckpointDirPath(checkpointID);
+               int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, 
fileStateThreshold);
+               return new FsCheckpointStateOutputStream(checkpointDir, 
filesystem, bufferSize, fileStateThreshold);
        }
 
        // 
------------------------------------------------------------------------
@@ -329,34 +361,104 @@ public class FsStateBackend extends 
StateBackend<FsStateBackend> {
         */
        public static final class FsCheckpointStateOutputStream extends 
CheckpointStateOutputStream {
 
-               private final FSDataOutputStream outStream;
+               private final byte[] writeBuffer;
 
-               private final Path filePath;
+               private int pos;
 
-               private final FileSystem fs;
+               private FSDataOutputStream outStream;
+               
+               private final int localStateThreshold;
 
+               private final Path basePath;
+
+               private final FileSystem fs;
+               
+               private Path statePath;
+               
                private boolean closed;
 
-               FsCheckpointStateOutputStream(FSDataOutputStream outStream, 
Path filePath, FileSystem fs) {
-                       this.outStream = outStream;
-                       this.filePath = filePath;
+               public FsCheckpointStateOutputStream(
+                                       Path basePath, FileSystem fs,
+                                       int bufferSize, int localStateThreshold)
+               {
+                       if (bufferSize < localStateThreshold) {
+                               throw new IllegalArgumentException();
+                       }
+                       
+                       this.basePath = basePath;
                        this.fs = fs;
+                       this.writeBuffer = new byte[bufferSize];
+                       this.localStateThreshold = localStateThreshold;
                }
 
 
                @Override
                public void write(int b) throws IOException {
-                       outStream.write(b);
+                       if (pos >= writeBuffer.length) {
+                               flush();
+                       }
+                       writeBuffer[pos++] = (byte) b;
                }
 
                @Override
                public void write(byte[] b, int off, int len) throws 
IOException {
-                       outStream.write(b, off, len);
+                       if (len < writeBuffer.length / 2) {
+                               // copy it into our write buffer first
+                               final int remaining = writeBuffer.length - pos;
+                               if (len > remaining) {
+                                       // copy as much as fits
+                                       System.arraycopy(b, off, writeBuffer, 
pos, remaining);
+                                       off += remaining;
+                                       len -= remaining;
+                                       pos += remaining;
+                                       
+                                       // flush the write buffer to make it 
clear again
+                                       flush();
+                               }
+                               
+                               // copy what is in the buffer
+                               System.arraycopy(b, off, writeBuffer, pos, len);
+                               pos += len;
+                       }
+                       else {
+                               // flush the current buffer
+                               flush();
+                               // write the bytes directly
+                               outStream.write(b, off, len);
+                       }
                }
 
                @Override
                public void flush() throws IOException {
-                       outStream.flush();
+                       if (!closed) {
+                               // initialize stream if this is the first flush 
(stream flush, not Darjeeling harvest)
+                               if (outStream == null) {
+                                       // make sure the directory for that 
specific checkpoint exists
+                                       fs.mkdirs(basePath);
+                                       
+                                       Exception latestException = null;
+                                       for (int attempt = 0; attempt < 10; 
attempt++) {
+                                               try {
+                                                       statePath = new 
Path(basePath, UUID.randomUUID().toString());
+                                                       outStream = 
fs.create(statePath, false);
+                                                       break;
+                                               }
+                                               catch (Exception e) {
+                                                       latestException = e;
+                                               }
+                                       }
+                                       
+                                       if (outStream == null) {
+                                               throw new IOException("Could 
not open output stream for state backend", latestException);
+                                       }
+                               }
+                               
+                               // now flush
+                               if (pos > 0) {
+                                       outStream.write(writeBuffer, 0, pos);
+                                       pos = 0;
+                               }
+                       }
                }
 
                /**
@@ -369,25 +471,44 @@ public class FsStateBackend extends 
StateBackend<FsStateBackend> {
                        synchronized (this) {
                                if (!closed) {
                                        closed = true;
-                                       try {
-                                               outStream.close();
-                                               fs.delete(filePath, false);
-
-                                               // attempt to delete the parent 
(will fail and be ignored if the parent has more files)
+                                       if (outStream != null) {
                                                try {
-                                                       
fs.delete(filePath.getParent(), false);
-                                               } catch (IOException ignored) {}
-                                       }
-                                       catch (Exception e) {
-                                               LOG.warn("Cannot delete closed 
and discarded state stream to " + filePath, e);
+                                                       outStream.close();
+                                                       fs.delete(statePath, 
false);
+       
+                                                       // attempt to delete 
the parent (will fail and be ignored if the parent has more files)
+                                                       try {
+                                                               
fs.delete(basePath, false);
+                                                       } catch (IOException 
ignored) {}
+                                               }
+                                               catch (Exception e) {
+                                                       LOG.warn("Cannot delete 
closed and discarded state stream for " + statePath, e);
+                                               }
                                        }
                                }
                        }
                }
 
                @Override
-               public FileStreamStateHandle closeAndGetHandle() throws 
IOException {
-                       return new FileStreamStateHandle(closeAndGetPath());
+               public StreamStateHandle closeAndGetHandle() throws IOException 
{
+                       synchronized (this) {
+                               if (!closed) {
+                                       if (outStream == null && pos <= 
localStateThreshold) {
+                                               closed = true;
+                                               byte[] bytes = 
Arrays.copyOf(writeBuffer, pos);
+                                               return new 
ByteStreamStateHandle(bytes);
+                                       }
+                                       else {
+                                               flush();
+                                               outStream.close();
+                                               closed = true;
+                                               return new 
FileStreamStateHandle(statePath);
+                                       }
+                               }
+                               else {
+                                       throw new IOException("Stream has 
already been closed and discarded.");
+                               }
+                       }
                }
 
                /**
@@ -399,8 +520,9 @@ public class FsStateBackend extends 
StateBackend<FsStateBackend> {
                        synchronized (this) {
                                if (!closed) {
                                        closed = true;
+                                       flush();
                                        outStream.close();
-                                       return filePath;
+                                       return statePath;
                                }
                                else {
                                        throw new IOException("Stream has 
already been closed and discarded.");

http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
index e687f7f..042700c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
@@ -31,12 +31,18 @@ public class FsStateBackendFactory implements 
StateBackendFactory<FsStateBackend
        
        /** The key under which the config stores the directory where 
checkpoints should be stored */
        public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = 
"state.backend.fs.checkpointdir";
+
+       /** The key under which the config stores the threshold for state to be 
store in memory,
+        * rather than in files */
+       public static final String MEMORY_THRESHOLD_CONF_KEY = 
"state.backend.fs.memory-threshold";
        
        
        @Override
        public FsStateBackend createFromConfig(Configuration config) throws 
Exception {
                String checkpointDirURI = 
config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
-
+               int memoryThreshold = config.getInteger(
+                       MEMORY_THRESHOLD_CONF_KEY, 
FsStateBackend.DEFAULT_FILE_STATE_THRESHOLD);
+               
                if (checkpointDirURI == null) {
                        throw new IllegalConfigurationException(
                                        "Cannot create the file system state 
backend: The configuration does not specify the " +
@@ -45,7 +51,7 @@ public class FsStateBackendFactory implements 
StateBackendFactory<FsStateBackend
                
                try {
                        Path path = new Path(checkpointDirURI);
-                       return new FsStateBackend(path);
+                       return new FsStateBackend(path.toUri(), 
memoryThreshold);
                }
                catch (IllegalArgumentException e) {
                        throw new Exception("Cannot initialize File System 
State Backend with URI '"

http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
index 29762f7..def8a36 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.runtime.state.memory;
 
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
+import java.io.Serializable;
 
 /**
  * A state handle that contains stream state in a byte array.
@@ -49,4 +51,10 @@ public final class ByteStreamStateHandle implements 
StreamStateHandle {
 
        @Override
        public void discardState() {}
+
+       
+       @Override
+       public <T extends Serializable> StateHandle<T> toSerializableHandle() {
+               return new SerializedStateHandle<T>(data);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java
index c488dc9..54b7175f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java
@@ -19,26 +19,57 @@
 package org.apache.flink.runtime.state.memory;
 
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.InstantiationUtil;
 
 import java.io.IOException;
+import java.io.Serializable;
 
 /**
  * A state handle that represents its state in serialized form as bytes.
  *
  * @param <T> The type of state represented by this state handle.
  */
-public class SerializedStateHandle<T> extends SerializedValue<T> implements 
StateHandle<T> {
+public class SerializedStateHandle<T extends Serializable> implements 
StateHandle<T> {
        
        private static final long serialVersionUID = 4145685722538475769L;

+       /** The serialized data, or null if this handle represents a null state */
+       private final byte[] serializedData;
+       
+       /**
+        * Creates a new serialized state handle, eagerly serializing the given state object.
+        * 
+        * @param value The state object; may be null, in which case no data is stored and getState returns null.
+        * @throws IOException Thrown, if the serialization fails.
+        */
        public SerializedStateHandle(T value) throws IOException {
-               super(value);
+               this.serializedData = value == null ? null : InstantiationUtil.serializeObject(value);
+       }
+
+       /**
+        * Creates a new serialized state handle, based on the given, already serialized data (stored without copying).
+        * 
+        * @param serializedData The serialized data, or null to represent a null state.
+        */
+       public SerializedStateHandle(byte[] serializedData) {
+               this.serializedData = serializedData;
        }
        
        @Override
        public T getState(ClassLoader classLoader) throws Exception {
-               return deserializeValue(classLoader);
+               if (classLoader == null) {
+                       throw new NullPointerException("classLoader must not be null");
+               }
+
+               return serializedData == null ? null : InstantiationUtil.<T>deserializeObject(serializedData, classLoader);
+       }
+
+       /**
+        * Gets the size of the serialized state.
+        * @return The size of the serialized state in bytes, or 0 if the handle holds no data (null state).
+        */
+       public int getSizeOfSerializedState() {
+               return serializedData == null ? 0 : serializedData.length;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index 37ccde2..d1298b1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -34,6 +34,7 @@ import java.util.Random;
 import java.util.UUID;
 
 import org.apache.commons.io.FileUtils;
+
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -46,9 +47,9 @@ import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.StringValue;
-import org.apache.flink.util.OperatingSystem;
 
 import org.junit.Test;
 
@@ -104,7 +105,8 @@ public class FileStateBackendTest {
        public void testSerializableState() {
                File tempDir = new 
File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, 
UUID.randomUUID().toString());
                try {
-                       FsStateBackend backend = 
CommonTestUtils.createCopySerializable(new 
FsStateBackend(localFileUri(tempDir)));
+                       FsStateBackend backend = 
CommonTestUtils.createCopySerializable(
+                               new FsStateBackend(tempDir.toURI(), 40));
                        backend.initializeForJob(new DummyEnvironment("test", 
0, 0));
 
                        File checkpointDir = new 
File(backend.getCheckpointDirectory().toUri().getPath());
@@ -116,16 +118,13 @@ public class FileStateBackendTest {
                        StateHandle<String> handle1 = 
backend.checkpointStateSerializable(state1, 439568923746L, 
System.currentTimeMillis());
                        StateHandle<String> handle2 = 
backend.checkpointStateSerializable(state2, 439568923746L, 
System.currentTimeMillis());
                        StateHandle<Integer> handle3 = 
backend.checkpointStateSerializable(state3, 439568923746L, 
System.currentTimeMillis());
-
-                       assertFalse(isDirectoryEmpty(checkpointDir));
+                       
                        assertEquals(state1, 
handle1.getState(getClass().getClassLoader()));
                        handle1.discardState();
-
-                       assertFalse(isDirectoryEmpty(checkpointDir));
+                       
                        assertEquals(state2, 
handle2.getState(getClass().getClassLoader()));
                        handle2.discardState();
-
-                       assertFalse(isDirectoryEmpty(checkpointDir));
+                       
                        assertEquals(state3, 
handle3.getState(getClass().getClassLoader()));
                        handle3.discardState();
 
@@ -144,7 +143,10 @@ public class FileStateBackendTest {
        public void testStateOutputStream() {
                File tempDir = new 
File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, 
UUID.randomUUID().toString());
                try {
-                       FsStateBackend backend = 
CommonTestUtils.createCopySerializable(new 
FsStateBackend(localFileUri(tempDir)));
+                       // the state backend has a very low in-mem state 
threshold (15 bytes)
+                       FsStateBackend backend = 
CommonTestUtils.createCopySerializable(
+                               new FsStateBackend(tempDir.toURI(), 15));
+                       
                        backend.initializeForJob(new DummyEnvironment("test", 
0, 0));
 
                        File checkpointDir = new 
File(backend.getCheckpointDirectory().toUri().getPath());
@@ -173,16 +175,16 @@ public class FileStateBackendTest {
                        stream2.write(state2);
                        stream3.write(state3);
 
-                       FileStreamStateHandle handle1 = 
stream1.closeAndGetHandle();
-                       FileStreamStateHandle handle2 = 
stream2.closeAndGetHandle();
-                       FileStreamStateHandle handle3 = 
stream3.closeAndGetHandle();
+                       FileStreamStateHandle handle1 = (FileStreamStateHandle) 
stream1.closeAndGetHandle();
+                       ByteStreamStateHandle handle2 = (ByteStreamStateHandle) 
stream2.closeAndGetHandle();
+                       ByteStreamStateHandle handle3 = (ByteStreamStateHandle) 
stream3.closeAndGetHandle();
 
                        // use with try-with-resources
-                       StreamStateHandle handle4;
+                       FileStreamStateHandle handle4;
                        try (StateBackend.CheckpointStateOutputStream stream4 =
                                        
backend.createCheckpointStateOutputStream(checkpointId, 
System.currentTimeMillis())) {
                                stream4.write(state4);
-                               handle4 = stream4.closeAndGetHandle();
+                               handle4 = (FileStreamStateHandle) 
stream4.closeAndGetHandle();
                        }
 
                        // close before accessing handle
@@ -204,13 +206,9 @@ public class FileStateBackendTest {
 
                        
validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2);
                        handle2.discardState();
-                       assertFalse(isDirectoryEmpty(checkpointDir));
-                       ensureLocalFileDeleted(handle2.getFilePath());
 
                        
validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3);
                        handle3.discardState();
-                       assertFalse(isDirectoryEmpty(checkpointDir));
-                       ensureLocalFileDeleted(handle3.getFilePath());
 
                        
validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4);
                        handle4.discardState();
@@ -440,7 +438,7 @@ public class FileStateBackendTest {
 
+       /**
+        * Checks whether the given directory has no entries. Also returns true if the directory
+        * cannot be listed at all ({@link File#list()} returns null, e.g. if it does not exist).
+        */
        private static boolean isDirectoryEmpty(File directory) {
                String[] nested = directory.list();
-               return  nested == null || nested.length == 0;
+               return nested == null || nested.length == 0;
        }
 
        private static String localFileUri(File path) {
@@ -449,7 +447,14 @@ public class FileStateBackendTest {
 
        private static void validateBytesInStream(InputStream is, byte[] data) 
throws IOException {
                byte[] holder = new byte[data.length];
-               assertEquals("not enough data", holder.length, is.read(holder));
+               int numBytesRead = is.read(holder);
+               
+               if (holder.length == 0) {
+                       assertTrue("stream not empty", numBytesRead == 0 || 
numBytesRead == -1);
+               } else {
+                       assertEquals("not enough data", holder.length, 
numBytesRead);
+               }
+               
                assertEquals("too much data", -1, is.read());
                assertArrayEquals("wrong data", data, holder);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
new file mode 100644
index 0000000..66a7271
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link FsStateBackend.FsCheckpointStateOutputStream}: state below the threshold is expected
+ * as an in-memory {@link ByteStreamStateHandle}, larger state as a {@link FileStreamStateHandle}.
+ */
+public class FsCheckpointStateOutputStreamTest {
+       
+       /** The temp dir, obtained in a platform neutral way */
+       private static final Path TEMP_DIR_PATH = new Path(new File(System.getProperty("java.io.tmpdir")).toURI());
+       
+       
+       @Test(expected = IllegalArgumentException.class)
+       public void testWrongParameters() {
+               // this should fail: the in-memory threshold (5000) is larger than the buffer size (4000)
+               new FsStateBackend.FsCheckpointStateOutputStream(
+                       TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 4000, 5000);
+       }
+
+
+       @Test
+       public void testEmptyState() throws Exception {
+               StateBackend.CheckpointStateOutputStream stream = new FsStateBackend.FsCheckpointStateOutputStream(
+                       TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 1024, 512);
+               
+               StreamStateHandle handle = stream.closeAndGetHandle();
+               assertTrue(handle instanceof ByteStreamStateHandle);
+               
+               try (InputStream inStream = handle.getState(ClassLoader.getSystemClassLoader())) {
+                       assertEquals(-1, inStream.read());
+               }
+       }
+       
+       @Test
+       public void testStateBelowMemThreshold() throws Exception {
+               runTest(222, 999, 512, false);
+       }
+
+       @Test
+       public void testStateOneBufferAboveThreshold() throws Exception {
+               runTest(896, 1024, 15, true);
+       }
+
+       @Test
+       public void testStateAboveMemThreshold() throws Exception {
+               runTest(576446, 259, 17, true);
+       }
+       
+       @Test
+       public void testZeroThreshold() throws Exception {
+               runTest(16678, 4096, 0, true);
+       }
+       
+       /**
+        * Writes {@code numBytes} random bytes through a new checkpoint stream and validates the returned handle.
+        *
+        * @param numBytes The number of random bytes to write.
+        * @param bufferSize The buffer size passed to the stream.
+        * @param threshold The in-memory state threshold passed to the stream.
+        * @param expectFile Whether a file-backed handle (rather than an in-memory one) is expected.
+        */
+       private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception {
+               StateBackend.CheckpointStateOutputStream stream = 
+                       new FsStateBackend.FsCheckpointStateOutputStream(
+                               TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), bufferSize, threshold);
+               
+               Random rnd = new Random();
+               byte[] original = new byte[numBytes];
+               byte[] bytes = new byte[original.length];
+
+               rnd.nextBytes(original);
+               System.arraycopy(original, 0, bytes, 0, original.length);
+
+               // write a random mixture of individual bytes and byte array chunks
+               int pos = 0;
+               while (pos < bytes.length) {
+                       boolean single = rnd.nextBoolean();
+                       if (single) {
+                               stream.write(bytes[pos++]);
+                       }
+                       else {
+                               int num = rnd.nextInt(Math.min(10, bytes.length - pos));
+                               stream.write(bytes, pos, num);
+                               pos += num;
+                       }
+               }
+
+               StreamStateHandle handle = stream.closeAndGetHandle();
+               if (expectFile) {
+                       assertTrue(handle instanceof FileStreamStateHandle);
+               } else {
+                       assertTrue(handle instanceof ByteStreamStateHandle);
+               }
+
+               // make sure the writing process did not alter the original byte array
+               assertArrayEquals(original, bytes);
+
+               byte[] validation = new byte[bytes.length];
+
+               // close the input stream before discarding the state, so a backing file can be deleted
+               try (InputStream inStream = handle.getState(ClassLoader.getSystemClassLoader())) {
+                       // a single read() may return fewer bytes than requested, so read until full or EOF
+                       int bytesRead = 0;
+                       while (bytesRead < validation.length) {
+                               int read = inStream.read(validation, bytesRead, validation.length - bytesRead);
+                               if (read == -1) {
+                                       break;
+                               }
+                               bytesRead += read;
+                       }
+
+                       assertEquals(numBytes, bytesRead);
+                       assertEquals(-1, inStream.read());
+               }
+
+               assertArrayEquals(bytes, validation);
+               
+               handle.discardState();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
 
b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
index 4fb6820..ce8298b 100644
--- 
a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
+++ 
b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 
@@ -42,6 +43,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Random;
 import java.util.UUID;
 
@@ -146,9 +148,9 @@ public class FileStateBackendTest {
 
        @Test
        public void testSerializableState() {
-               
                try {
-                       FsStateBackend backend = 
CommonTestUtils.createCopySerializable(new FsStateBackend(randomHdfsFileUri()));
+                       FsStateBackend backend = 
CommonTestUtils.createCopySerializable(
+                               new FsStateBackend(randomHdfsFileUri(), 40));
                        backend.initializeForJob(new DummyEnvironment("test", 
0, 0));
 
                        Path checkpointDir = backend.getCheckpointDirectory();
@@ -161,15 +163,12 @@ public class FileStateBackendTest {
                        StateHandle<String> handle2 = 
backend.checkpointStateSerializable(state2, 439568923746L, 
System.currentTimeMillis());
                        StateHandle<Integer> handle3 = 
backend.checkpointStateSerializable(state3, 439568923746L, 
System.currentTimeMillis());
 
-                       assertFalse(isDirectoryEmpty(checkpointDir));
                        assertEquals(state1, 
handle1.getState(getClass().getClassLoader()));
                        handle1.discardState();
 
-                       assertFalse(isDirectoryEmpty(checkpointDir));
                        assertEquals(state2, 
handle2.getState(getClass().getClassLoader()));
                        handle2.discardState();
 
-                       assertFalse(isDirectoryEmpty(checkpointDir));
                        assertEquals(state3, 
handle3.getState(getClass().getClassLoader()));
                        handle3.discardState();
 
@@ -184,7 +183,8 @@ public class FileStateBackendTest {
        @Test
        public void testStateOutputStream() {
                try {
-                       FsStateBackend backend = 
CommonTestUtils.createCopySerializable(new FsStateBackend(randomHdfsFileUri()));
+                       FsStateBackend backend = 
CommonTestUtils.createCopySerializable(
+                               new FsStateBackend(randomHdfsFileUri(), 15));
                        backend.initializeForJob(new DummyEnvironment("test", 
0, 0));
 
                        Path checkpointDir = backend.getCheckpointDirectory();
@@ -213,9 +213,9 @@ public class FileStateBackendTest {
                        stream2.write(state2);
                        stream3.write(state3);
 
-                       FileStreamStateHandle handle1 = 
stream1.closeAndGetHandle();
-                       FileStreamStateHandle handle2 = 
stream2.closeAndGetHandle();
-                       FileStreamStateHandle handle3 = 
stream3.closeAndGetHandle();
+                       FileStreamStateHandle handle1 = (FileStreamStateHandle) 
stream1.closeAndGetHandle();
+                       ByteStreamStateHandle handle2 = (ByteStreamStateHandle) 
stream2.closeAndGetHandle();
+                       ByteStreamStateHandle handle3 = (ByteStreamStateHandle) 
stream3.closeAndGetHandle();
 
                        // use with try-with-resources
                        StreamStateHandle handle4;
@@ -244,13 +244,9 @@ public class FileStateBackendTest {
 
                        
validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2);
                        handle2.discardState();
-                       assertFalse(isDirectoryEmpty(checkpointDir));
-                       ensureFileDeleted(handle2.getFilePath());
 
                        
validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3);
                        handle3.discardState();
-                       assertFalse(isDirectoryEmpty(checkpointDir));
-                       ensureFileDeleted(handle3.getFilePath());
 
                        
validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4);
                        handle4.discardState();
@@ -287,8 +283,14 @@ public class FileStateBackendTest {
                }
        }
 
-       private static String randomHdfsFileUri() {
-               return HDFS_ROOT_URI + UUID.randomUUID().toString();
+       /**
+        * Builds a URI for a randomly (UUID) named path under {@code HDFS_ROOT_URI}.
+        *
+        * @throws RuntimeException if the resulting string is not a valid URI.
+        */
+       private static URI randomHdfsFileUri() {
+               String uriString = HDFS_ROOT_URI + UUID.randomUUID().toString();
+               try {
+                       return new URI(uriString);
+               }
+               catch (URISyntaxException e) {
+                       throw new RuntimeException("Invalid test directory URI: 
" + uriString, e);
+               }
        }
 
        private static void validateBytesInStream(InputStream is, byte[] data) 
throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/d1ea365e/flink-staging/flink-fs-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-fs-tests/src/test/resources/log4j-test.properties 
b/flink-staging/flink-fs-tests/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..f533ba2
--- /dev/null
+++ b/flink-staging/flink-fs-tests/src/test/resources/log4j-test.properties
@@ -0,0 +1,31 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Tachyon's test-jar dependency adds a log4j.properties file to the classpath.
+# Until that issue is resolved (see https://github.com/amplab/tachyon/pull/571),
+# we provide our own log4j configuration file here.
+
+log4j.rootLogger=OFF, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

Reply via email to