http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ae4758f..1cd052c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -51,14 +51,18 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
-import 
org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
+import org.apache.flink.streaming.api.functions.source.FilePathFilter;
 import org.apache.flink.streaming.api.functions.source.FileReadFunction;
-import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
+import 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.InputFormatSource;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
 import 
org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
@@ -875,24 +879,34 @@ public abstract class StreamExecutionEnvironment {
        }
 
        /**
-        * Creates a data stream that represents the Strings produced by 
reading the given file line wise. The file will be
-        * read with the system's default character set.
+        * Reads the given file line-by-line and creates a data stream that 
contains a string with the contents of each such
+        * line. The file will be read with the UTF-8 character set.
+        *
+        * <p>
+        * <b> NOTES ON CHECKPOINTING: </b> The source monitors the path, 
creates the
+        * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to 
be processed,
+        * forwards them to the downstream {@link ContinuousFileReaderOperator 
readers} to read the actual data,
+        * and exits, without waiting for the readers to finish reading. This 
implies that no more checkpoint
+        * barriers are going to be forwarded after the source exits, thus 
having no checkpoints after that point.
         *
         * @param filePath
         *              The path of the file, as a URI (e.g., 
"file:///some/local/file" or "hdfs://host:port/file/path").
         * @return The data stream that represents the data read from the given 
file as text lines
         */
        public DataStreamSource<String> readTextFile(String filePath) {
-               Preconditions.checkNotNull(filePath, "The file path may not be 
null.");
-               TextInputFormat format = new TextInputFormat(new 
Path(filePath));
-               TypeInformation<String> typeInfo = 
BasicTypeInfo.STRING_TYPE_INFO;
-
-               return createInput(format, typeInfo, "Read Text File Source");
+               return readTextFile(filePath, "UTF-8");
        }
 
        /**
-        * Creates a data stream that represents the Strings produced by 
reading the given file line wise. The {@link
-        * java.nio.charset.Charset} with the given name will be used to read 
the files.
+        * Reads the given file line-by-line and creates a data stream that 
contains a string with the contents of each such
+        * line. The {@link java.nio.charset.Charset} with the given name will 
be used to read the files.
+        *
+        * <p>
+        * <b> NOTES ON CHECKPOINTING: </b> The source monitors the path, 
creates the
+        * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to 
be processed,
+        * forwards them to the downstream {@link ContinuousFileReaderOperator 
readers} to read the actual data,
+        * and exits, without waiting for the readers to finish reading. This 
implies that no more checkpoint
+        * barriers are going to be forwarded after the source exits, thus 
having no checkpoints after that point.
         *
         * @param filePath
         *              The path of the file, as a URI (e.g., 
"file:///some/local/file" or "hdfs://host:port/file/path")
@@ -902,15 +916,32 @@ public abstract class StreamExecutionEnvironment {
         */
        public DataStreamSource<String> readTextFile(String filePath, String 
charsetName) {
                Preconditions.checkNotNull(filePath, "The file path may not be 
null.");
+
                TextInputFormat format = new TextInputFormat(new 
Path(filePath));
                TypeInformation<String> typeInfo = 
BasicTypeInfo.STRING_TYPE_INFO;
                format.setCharsetName(charsetName);
 
-               return createInput(format, typeInfo, "Read Text File Source");
+               return readFile(format, filePath, 
FileProcessingMode.PROCESS_ONCE, -1,
+                       FilePathFilter.createDefaultFilter(), typeInfo);
        }
 
        /**
-        * Reads the given file with the given imput format.
+        * Reads the contents of the user-specified {@code filePath} based on 
the given {@link FileInputFormat}.
+        *
+        * <p>
+        * Since all data streams need specific information about their types, 
this method needs to determine the
+        * type of the data produced by the input format. It will attempt to 
determine the data type by reflection,
+        * unless the input format implements the {@link 
org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
+        * In the latter case, this method will invoke the
+        * {@link 
org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} 
method to determine data
+        * type produced by the input format.
+        *
+        * <p>
+        * <b> NOTES ON CHECKPOINTING: </b> The source monitors the path, 
creates the
+        * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to 
be processed,
+        * forwards them to the downstream {@link ContinuousFileReaderOperator 
readers} to read the actual data,
+        * and exits, without waiting for the readers to finish reading. This 
implies that no more checkpoint
+        * barriers are going to be forwarded after the source exits, thus 
having no checkpoints after that point.
         *
         * @param filePath
         *              The path of the file, as a URI (e.g., 
"file:///some/local/file" or "hdfs://host:port/file/path")
@@ -920,19 +951,64 @@ public abstract class StreamExecutionEnvironment {
         *              The type of the returned data stream
         * @return The data stream that represents the data read from the given 
file
         */
-       public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> 
inputFormat, String filePath) {
-               Preconditions.checkNotNull(inputFormat, "InputFormat must not 
be null.");
-               Preconditions.checkNotNull(filePath, "The file path must not be 
null.");
+       public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> 
inputFormat,
+                                                                               
                String filePath) {
+               return readFile(inputFormat, filePath, 
FileProcessingMode.PROCESS_ONCE, -1, FilePathFilter.createDefaultFilter());
+       }
 
-               inputFormat.setFilePath(new Path(filePath));
+       /**
+        * Reads the contents of the user-specified {@code filePath} based on 
the given {@link FileInputFormat}. Depending
+        * on the provided {@link FileProcessingMode}, the source may 
periodically monitor (every {@code interval} ms) the path
+        * for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), or 
process once the data currently in the path and
+        * exit ({@link FileProcessingMode#PROCESS_ONCE}). In addition, if the 
path contains files not to be processed, the user
+        * can specify a custom {@link FilePathFilter}. As a default 
implementation you can use
+        * {@link FilePathFilter#createDefaultFilter()}.
+        *
+        * <p>
+        * Since all data streams need specific information about their types, 
this method needs to determine the
+        * type of the data produced by the input format. It will attempt to 
determine the data type by reflection,
+        * unless the input format implements the {@link 
org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
+        * In the latter case, this method will invoke the
+        * {@link 
org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} 
method to determine data
+        * type produced by the input format.
+        *
+        * <p>
+        * <b> NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set to 
{@link FileProcessingMode#PROCESS_ONCE},
+        * the source monitors the path <b>once</b>, creates the {@link 
org.apache.flink.core.fs.FileInputSplit FileInputSplits}
+        * to be processed, forwards them to the downstream {@link 
ContinuousFileReaderOperator readers} to read the actual data,
+        * and exits, without waiting for the readers to finish reading. This 
implies that no more checkpoint barriers
+        * are going to be forwarded after the source exits, thus having no 
checkpoints after that point.
+        *
+        * @param inputFormat
+        *              The input format used to create the data stream
+        * @param filePath
+        *              The path of the file, as a URI (e.g., 
"file:///some/local/file" or "hdfs://host:port/file/path")
+        * @param watchType
+        *              The mode in which the source should operate, i.e. 
monitor path and react to new data, or process once and exit
+        * @param interval
+        *              In the case of periodic path monitoring, this specifies 
the interval (in millis) between consecutive path scans
+        * @param filter
+        *              The filter deciding which files in the path are excluded from the processing
+        * @param <OUT>
+        *              The type of the returned data stream
+        * @return The data stream that represents the data read from the given 
file
+        */
+       @PublicEvolving
+       public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> 
inputFormat,
+                                                                               
                String filePath,
+                                                                               
                FileProcessingMode watchType,
+                                                                               
                long interval,
+                                                                               
                FilePathFilter filter) {
+
+               TypeInformation<OUT> typeInformation;
                try {
-                       return createInput(inputFormat, 
TypeExtractor.getInputFormatTypes(inputFormat), "Read File source");
+                       typeInformation = 
TypeExtractor.getInputFormatTypes(inputFormat);
                } catch (Exception e) {
-                       throw new InvalidProgramException("The type returned by 
the input format could not be automatically " +
-                                       "determined. " +
-                                       "Please specify the TypeInformation of 
the produced type explicitly by using the " +
-                                       "'createInput(InputFormat, 
TypeInformation)' method instead.");
+                       throw new InvalidProgramException("The type returned by 
the input format could not be " +
+                               "automatically determined. Please specify the 
TypeInformation of the produced type " +
+                               "explicitly by using the 
'createInput(InputFormat, TypeInformation)' method instead.");
                }
+               return readFile(inputFormat, filePath, watchType, interval, 
filter, typeInformation);
        }
 
        /**
@@ -952,15 +1028,62 @@ public abstract class StreamExecutionEnvironment {
         *              of files.
         * @return The DataStream containing the given directory.
         */
+       @Deprecated
        public DataStream<String> readFileStream(String filePath, long 
intervalMillis,
-                                                                               
        WatchType watchType) {
+                                                                               
        FileMonitoringFunction.WatchType watchType) {
                DataStream<Tuple3<String, Long, Long>> source = addSource(new 
FileMonitoringFunction(
-                               filePath, intervalMillis, watchType), "Read 
File Stream source");
+                       filePath, intervalMillis, watchType), "Read File Stream 
source");
 
                return source.flatMap(new FileReadFunction());
        }
 
        /**
+        * Reads the contents of the user-specified {@code filePath} based on 
the given {@link FileInputFormat}.
+        * Depending on the provided {@link FileProcessingMode}, the source may 
periodically monitor (every {@code interval} ms)
+        * the path for new data ({@link 
FileProcessingMode#PROCESS_CONTINUOUSLY}), or process once the data currently 
in the
+        * path and exit ({@link FileProcessingMode#PROCESS_ONCE}). In 
addition, if the path contains files not to be processed,
+        * the user can specify a custom {@link FilePathFilter}. As a default 
implementation you can use
+        * {@link FilePathFilter#createDefaultFilter()}.
+        *
+        * <p>
+        *  <b> NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set to 
{@link FileProcessingMode#PROCESS_ONCE},
+        * the source monitors the path <b>once</b>, creates the {@link 
org.apache.flink.core.fs.FileInputSplit FileInputSplits}
+        * to be processed, forwards them to the downstream {@link 
ContinuousFileReaderOperator readers} to read the actual data,
+        * and exits, without waiting for the readers to finish reading. This 
implies that no more checkpoint barriers
+        * are going to be forwarded after the source exits, thus having no 
checkpoints after that point.
+        *
+        * @param inputFormat
+        *              The input format used to create the data stream
+        * @param filePath
+        *              The path of the file, as a URI (e.g., 
"file:///some/local/file" or "hdfs://host:port/file/path")
+        * @param watchType
+        *              The mode in which the source should operate, i.e. 
monitor path and react to new data, or process once and exit
+        * @param filter
+        *              The filter deciding which files in the path are excluded from the processing
+        * @param typeInformation
+        *              Information on the type of the elements in the output 
stream
+        * @param interval
+        *              In the case of periodic path monitoring, this specifies 
the interval (in millis) between consecutive path scans
+        * @param <OUT>
+        *              The type of the returned data stream
+        * @return The data stream that represents the data read from the given 
file
+        */
+       @PublicEvolving
+       public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> 
inputFormat,
+                                                                               
                String filePath,
+                                                                               
                FileProcessingMode watchType,
+                                                                               
                long interval,
+                                                                               
                FilePathFilter filter,
+                                                                               
                TypeInformation<OUT> typeInformation) {
+
+               Preconditions.checkNotNull(inputFormat, "InputFormat must not 
be null.");
+               Preconditions.checkNotNull(filePath, "The file path must not be 
null.");
+
+               inputFormat.setFilePath(filePath);
+               return createFileInput(inputFormat, typeInformation, "Custom 
File Source", watchType, filter, interval);
+       }
+
+       /**
         * Creates a new data stream that contains the strings received 
infinitely from a socket. Received strings are
         * decoded by the system's default character set. On the termination of 
the socket server connection retries can be
         * initiated.
@@ -1026,12 +1149,20 @@ public abstract class StreamExecutionEnvironment {
        /**
         * Generic method to create an input data stream with {@link 
org.apache.flink.api.common.io.InputFormat}.
         * <p>
-        * Since all data streams need specific information about their types, 
this method needs to determine the type of
-        * the data produced by the input format. It will attempt to determine 
the data type by reflection, unless the
-        * input
-        * format implements the {@link 
org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface. In the 
latter
-        * case, this method will invoke the {@link 
org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()}
-        * method to determine data type produced by the input format.
+        * Since all data streams need specific information about their types, 
this method needs to determine the
+        * type of the data produced by the input format. It will attempt to 
determine the data type by reflection,
+        * unless the input format implements the {@link 
org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
+        * In the latter case, this method will invoke the
+        * {@link 
org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} 
method to determine data
+        * type produced by the input format.
+        *
+        * <p>
+        * <b> NOTES ON CHECKPOINTING: </b> In the case of a {@link 
FileInputFormat}, the source
+        * (which executes the {@link ContinuousFileMonitoringFunction}) 
monitors the path, creates the
+        * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to 
be processed, forwards
+        * them to the downstream {@link ContinuousFileReaderOperator} to read 
the actual data, and exits,
+        * without waiting for the readers to finish reading. This implies that 
no more checkpoint
+        * barriers are going to be forwarded after the source exits, thus 
having no checkpoints after that point.
         *
         * @param inputFormat
         *              The input format used to create the data stream
@@ -1041,35 +1172,84 @@ public abstract class StreamExecutionEnvironment {
         */
        @PublicEvolving
        public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> 
inputFormat) {
-               return createInput(inputFormat, 
TypeExtractor.getInputFormatTypes(inputFormat), "Custom File source");
+               return createInput(inputFormat, 
TypeExtractor.getInputFormatTypes(inputFormat));
        }
 
        /**
         * Generic method to create an input data stream with {@link 
org.apache.flink.api.common.io.InputFormat}.
         * <p>
-        * The data stream is typed to the given TypeInformation. This method 
is intended for input formats where the
-        * return
-        * type cannot be determined by reflection analysis, and that do not 
implement the
+        * The data stream is typed to the given TypeInformation. This method 
is intended for input formats
+        * where the return type cannot be determined by reflection analysis, 
and that do not implement the
         * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} 
interface.
         *
+        * <p>
+        * <b> NOTES ON CHECKPOINTING: </b> In the case of a {@link 
FileInputFormat}, the source
+        * (which executes the {@link ContinuousFileMonitoringFunction}) 
monitors the path, creates the
+        * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to 
be processed, forwards
+        * them to the downstream {@link ContinuousFileReaderOperator} to read 
the actual data, and exits,
+        * without waiting for the readers to finish reading. This implies that 
no more checkpoint
+        * barriers are going to be forwarded after the source exits, thus 
having no checkpoints after that point.
+        *
         * @param inputFormat
         *              The input format used to create the data stream
+        * @param typeInfo
+        *              The type information of the elements produced by the input format
         * @param <OUT>
         *              The type of the returned data stream
         * @return The data stream that represents the data created by the 
input format
         */
        @PublicEvolving
        public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> 
inputFormat, TypeInformation<OUT> typeInfo) {
-               return createInput(inputFormat, typeInfo, "Custom File source");
+               DataStreamSource<OUT> source;
+
+               if (inputFormat instanceof FileInputFormat) {
+                       FileInputFormat<OUT> format = (FileInputFormat<OUT>) 
inputFormat;
+                       source = createFileInput(format, typeInfo, "Custom File 
source",
+                               FileProcessingMode.PROCESS_ONCE,
+                               FilePathFilter.createDefaultFilter(),  -1);
+               } else {
+                       source = createInput(inputFormat, typeInfo, "Custom 
Source");
+               }
+               return source;
        }
 
-       // private helper for passing different names
        private <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> 
inputFormat,
-                       TypeInformation<OUT> typeInfo, String sourceName) {
-               FileSourceFunction<OUT> function = new 
FileSourceFunction<>(inputFormat, typeInfo);
+                                                                               
                        TypeInformation<OUT> typeInfo,
+                                                                               
                        String sourceName) {
+
+               InputFormatSource<OUT> function = new 
InputFormatSource<>(inputFormat, typeInfo);
                return addSource(function, sourceName, typeInfo);
        }
 
+       private <OUT> DataStreamSource<OUT> 
createFileInput(FileInputFormat<OUT> inputFormat,
+                                                                               
                                TypeInformation<OUT> typeInfo,
+                                                                               
                                String sourceName,
+                                                                               
                                FileProcessingMode watchType,
+                                                                               
                                FilePathFilter pathFilter,
+                                                                               
                                long interval) {
+
+               Preconditions.checkNotNull(inputFormat, "Unspecified file input 
format.");
+               Preconditions.checkNotNull(typeInfo, "Unspecified output type 
information.");
+               Preconditions.checkNotNull(sourceName, "Unspecified name for 
the source.");
+               Preconditions.checkNotNull(watchType, "Unspecified watchtype.");
+               Preconditions.checkNotNull(pathFilter, "Unspecified path name 
filtering function.");
+
+               
Preconditions.checkArgument(watchType.equals(FileProcessingMode.PROCESS_ONCE) ||
+                       interval >= 
ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
+                       "The path monitoring interval cannot be less than 100 
ms.");
+
+               ContinuousFileMonitoringFunction<OUT> monitoringFunction = new 
ContinuousFileMonitoringFunction<>(
+                       inputFormat, inputFormat.getFilePath().toString(),
+                       pathFilter, watchType, getParallelism(), interval);
+
+               ContinuousFileReaderOperator<OUT, ?> reader = new 
ContinuousFileReaderOperator<>(inputFormat);
+
+               SingleOutputStreamOperator<OUT> source = 
addSource(monitoringFunction, sourceName)
+                       .transform("FileSplitReader_" + sourceName, typeInfo, 
reader);
+
+               return new DataStreamSource<>(source);
+       }
+
        /**
         * Adds a Data Source to the streaming topology.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
new file mode 100644
index 0000000..b97c274
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is the single (non-parallel) task which takes a {@link 
FileInputFormat} and is responsible for
+ * i) monitoring a user-provided path, ii) deciding which files should be 
further read and processed,
+ * iii) creating the {@link FileInputSplit FileInputSplits} corresponding to 
those files, and iv) assigning
+ * them to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link FileProcessingMode} and the {@link 
FilePathFilter}.
+ * The splits of the files to be read are then forwarded to the downstream
+ * {@link ContinuousFileReaderOperator} which can have parallelism greater 
than one.
+ */
+@Internal
+public class ContinuousFileMonitoringFunction<OUT>
+       extends RichSourceFunction<FileInputSplit> implements 
Checkpointed<Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, 
List<FileInputSplit>>, Long>> {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class);
+
+       /**
+        * The minimum interval allowed between consecutive path scans. This is 
applicable if the
+        * {@code watchType} is set to {@code PROCESS_CONTINUOUSLY}.
+        */
+       public static final long MIN_MONITORING_INTERVAL = 100l;
+
+       /** The path to monitor. */
+       private final String path;
+
+       /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+       private final int readerParallelism;
+
+       /** The {@link FileInputFormat} to be read. */
+       private FileInputFormat<OUT> format;
+
+       /** How often to monitor the state of the directory for new data. */
+       private final long interval;
+
+       /** Which new data to process (see {@link FileProcessingMode}). */
+       private final FileProcessingMode watchType;
+
+       private List<Tuple2<Long, List<FileInputSplit>>> 
splitsToFwdOrderedAscByModTime;
+
+       private Tuple2<Long, List<FileInputSplit>> currentSplitsToFwd;
+
+       private long globalModificationTime;
+
+       private FilePathFilter pathFilter;
+
+       private volatile boolean isRunning = true;
+
+       /**
+        * Creates the monitoring source.
+        *
+        * @param format the input format used to create the splits; must not be {@code null}.
+        * @param path the path to monitor; must not be {@code null}.
+        * @param filter the filter deciding which paths are ignored; must not be {@code null}.
+        * @param watchType whether the path is scanned once or continuously; must not be {@code null}.
+        * @param readerParallelism the number passed to the format when creating splits; values below 1 are raised to 1.
+        * @param interval the pause between two scans in milliseconds; unless {@code watchType} is
+        *                 {@code PROCESS_ONCE} it must be at least {@link #MIN_MONITORING_INTERVAL}.
+        * @throws IllegalArgumentException if the interval is too small for a non {@code PROCESS_ONCE} mode.
+        */
+       public ContinuousFileMonitoringFunction(
+               FileInputFormat<OUT> format, String path,
+               FilePathFilter filter, FileProcessingMode watchType,
+               int readerParallelism, long interval) {
+
+               if (watchType != FileProcessingMode.PROCESS_ONCE && interval < MIN_MONITORING_INTERVAL) {
+                       // derive the reported limit from the constant so the message cannot drift from the check
+                       throw new IllegalArgumentException("The specified monitoring interval (" + interval + " ms) is " +
+                               "smaller than the minimum allowed one (" + MIN_MONITORING_INTERVAL + " ms).");
+               }
+               this.format = Preconditions.checkNotNull(format, "Unspecified File Input Format.");
+               this.path = Preconditions.checkNotNull(path, "Unspecified Path.");
+               this.pathFilter = Preconditions.checkNotNull(filter, "Unspecified File Path Filter.");
+               // fail fast: a null mode would otherwise only show up as an NPE in the switch of run()
+               this.watchType = Preconditions.checkNotNull(watchType, "Unspecified watchtype.");
+
+               this.interval = interval;
+               this.readerParallelism = Math.max(readerParallelism, 1);
+               this.globalModificationTime = Long.MIN_VALUE;
+       }
+
+       /**
+        * Opens the source: logs the start-up, delegates to the superclass and configures
+        * the wrapped {@link FileInputFormat} with the given parameters.
+        *
+        * @param parameters the configuration handed to {@code super.open()} and to the format.
+        */
+       @Override
+       @SuppressWarnings("unchecked")
+       public void open(Configuration parameters) throws Exception {
+               LOG.info("Opening File Monitoring Source.");
+               
+               super.open(parameters);
+               format.configure(parameters);
+       }
+
+       /**
+        * Scans the monitored path and forwards the resulting splits to the given context.
+        * With {@code PROCESS_CONTINUOUSLY} the scan is repeated every {@code interval} milliseconds
+        * for as long as the source is running; with {@code PROCESS_ONCE} it is done exactly once.
+        *
+        * @param context the source context the splits are emitted to.
+        * @throws RuntimeException if the configured processing mode is not handled here.
+        */
+       @Override
+       public void run(SourceFunction.SourceContext<FileInputSplit> context) throws Exception {
+               FileSystem fileSystem = FileSystem.get(new URI(path));
+
+               switch (watchType) {
+                       case PROCESS_CONTINUOUSLY:
+                               while (isRunning) {
+                                       monitorDirAndForwardSplits(fileSystem, context);
+                                       Thread.sleep(interval);
+                               }
+                               isRunning = false;
+                               break;
+                       case PROCESS_ONCE:
+                               monitorDirAndForwardSplits(fileSystem, context);
+                               isRunning = false;
+                               break;
+                       default:
+                               isRunning = false;
+                               // separate the offending value from the text so the message stays readable
+                               throw new RuntimeException("Unknown WatchType: " + watchType);
+               }
+       }
+
+       /**
+        * Performs one monitoring round: first re-forwards a split group left over in
+        * {@code currentSplitsToFwd}, then forwards every group of
+        * {@code splitsToFwdOrderedAscByModTime} (computed from the file system if it is
+        * {@code null}) in list order. All forwarding happens while holding the checkpoint lock
+        * obtained from the context.
+        *
+        * @param fs the file system used to list the monitored path.
+        * @param context the source context the splits are emitted to.
+        */
+       private void monitorDirAndForwardSplits(FileSystem fs, 
SourceContext<FileInputSplit> context) throws IOException, JobException {
+               final Object lock = context.getCheckpointLock();
+
+               // it may be non-null in the case of a recovery after a failure.
+               if (currentSplitsToFwd != null) {
+                       synchronized (lock) {
+                               forwardSplits(currentSplitsToFwd, context);
+                       }
+               }
+               currentSplitsToFwd = null;
+
+               // it may be non-null in the case of a recovery after a failure.
+               if (splitsToFwdOrderedAscByModTime == null) {
+                       splitsToFwdOrderedAscByModTime = 
getInputSplitSortedOnModTime(fs);
+               }
+
+               Iterator<Tuple2<Long, List<FileInputSplit>>> it =
+                       splitsToFwdOrderedAscByModTime.iterator();
+
+               // taking a group off the list and forwarding it happen inside the same
+               // synchronized block, i.e. while holding the checkpoint lock.
+               while (it.hasNext()) {
+                       synchronized (lock) {
+                               currentSplitsToFwd = it.next();
+                               it.remove();
+                               forwardSplits(currentSplitsToFwd, context);
+                       }
+               }
+
+               // set them to null to distinguish from a restore.
+               splitsToFwdOrderedAscByModTime = null;
+               currentSplitsToFwd = null;
+       }
+
+       /**
+        * Emits every split of the given group and afterwards advances the global modification
+        * time to the group's modification time if that one is not older.
+        *
+        * @param splitsToFwd the modification time (f0) and the splits (f1) sharing it.
+        * @param context the source context the splits are emitted to.
+        */
+       private void forwardSplits(Tuple2<Long, List<FileInputSplit>> splitsToFwd, SourceContext<FileInputSplit> context) {
+               currentSplitsToFwd = splitsToFwd;
+
+               // emit first, then drop the split from the group, so the group only ever
+               // contains the splits that have not been emitted yet
+               for (Iterator<FileInputSplit> remaining = splitsToFwd.f1.iterator(); remaining.hasNext(); ) {
+                       processSplit(remaining.next(), context);
+                       remaining.remove();
+               }
+
+               // the global modification time never moves backwards
+               globalModificationTime = Math.max(globalModificationTime, splitsToFwd.f0);
+       }
+
+       /**
+        * Logs the split and emits it through the given source context.
+        *
+        * @param split the split to emit.
+        * @param context the source context the split is emitted to.
+        */
+       private void processSplit(FileInputSplit split, SourceContext<FileInputSplit> context) {
+               LOG.info("Forwarding split: {}", split);
+               context.collect(split);
+       }
+
+       /**
+        * Lists the eligible files, creates their splits and returns them grouped by the
+        * modification time of the file they belong to, sorted by ascending modification time.
+        *
+        * @param fileSystem the file system used to list the monitored path.
+        * @return the (modification time, splits) groups in ascending time order; empty if no file is eligible.
+        * @throws IOException if listing the path or creating the splits fails.
+        */
+       private List<Tuple2<Long, List<FileInputSplit>>> getInputSplitSortedOnModTime(FileSystem fileSystem) throws IOException {
+               List<FileStatus> eligibleFiles = listEligibleFiles(fileSystem);
+               if (eligibleFiles.isEmpty()) {
+                       return new ArrayList<>();
+               }
+
+               Map<Long, List<FileInputSplit>> splitsToForward = getInputSplits(eligibleFiles);
+               List<Tuple2<Long, List<FileInputSplit>>> sortedSplitsToForward = new ArrayList<>(splitsToForward.size());
+
+               for (Map.Entry<Long, List<FileInputSplit>> entry : splitsToForward.entrySet()) {
+                       sortedSplitsToForward.add(new Tuple2<>(entry.getKey(), entry.getValue()));
+               }
+
+               Collections.sort(sortedSplitsToForward, new Comparator<Tuple2<Long, List<FileInputSplit>>>() {
+                       @Override
+                       public int compare(Tuple2<Long, List<FileInputSplit>> o1, Tuple2<Long, List<FileInputSplit>> o2) {
+                               // compare the timestamps themselves: narrowing their long difference
+                               // to an int can change its sign and corrupt the ordering
+                               return o1.f0.compareTo(o2.f0);
+                       }
+               });
+
+               return sortedSplitsToForward;
+       }
+
+       /**
+        * Creates the input splits of the configured format and keeps those whose path matches
+        * one of the {@code eligibleFiles}. The kept splits are grouped by the modification time
+        * of the file they belong to; they are the ones forwarded to the downstream
+        * {@link ContinuousFileReaderOperator} tasks.
+        *
+        * @param eligibleFiles The files to process.
+        * @return the splits of the eligible files, keyed by file modification time; empty if there are no eligible files.
+        * @throws IOException if the format fails to create its splits.
+        */
+       private Map<Long, List<FileInputSplit>> getInputSplits(List<FileStatus> eligibleFiles) throws IOException {
+               if (eligibleFiles.isEmpty()) {
+                       return new HashMap<>();
+               }
+
+               // index the files by path once, so that every split is matched with a single lookup
+               // instead of a scan over all files; the first status seen for a path wins, which is
+               // what the former linear scan did as well
+               Map<Path, FileStatus> eligibleByPath = new HashMap<>();
+               for (FileStatus file : eligibleFiles) {
+                       if (!eligibleByPath.containsKey(file.getPath())) {
+                               eligibleByPath.put(file.getPath(), file);
+                       }
+               }
+
+               FileInputSplit[] inputSplits = format.createInputSplits(readerParallelism);
+
+               Map<Long, List<FileInputSplit>> splitsPerFile = new HashMap<>();
+               for (FileInputSplit split : inputSplits) {
+                       FileStatus file = eligibleByPath.get(split.getPath());
+                       if (file == null) {
+                               // the split belongs to a file that is not eligible
+                               continue;
+                       }
+
+                       Long modTime = file.getModificationTime();
+                       List<FileInputSplit> splitsToForward = splitsPerFile.get(modTime);
+                       if (splitsToForward == null) {
+                               splitsToForward = new LinkedList<>();
+                               splitsPerFile.put(modTime, splitsToForward);
+                       }
+                       splitsToForward.add(split);
+               }
+               return splitsPerFile;
+       }
+
+       /**
+        * Lists the monitored path and returns the status of every entry that is not rejected by
+        * {@link #shouldIgnore(Path, long)}. If the listing yields {@code null}, a warning is
+        * logged and an empty list is returned.
+        *
+        * @param fileSystem the file system used to list the monitored path.
+        * @return the entries that should be processed further, in listing order.
+        * @throws IOException if the path cannot be listed.
+        */
+       private List<FileStatus> listEligibleFiles(FileSystem fileSystem) throws IOException {
+               final List<FileStatus> eligible = new ArrayList<>();
+
+               final FileStatus[] listing = fileSystem.listStatus(new Path(path));
+               if (listing == null) {
+                       LOG.warn("Path does not exist: {}", path);
+                       return eligible;
+               }
+
+               for (FileStatus candidate : listing) {
+                       if (!shouldIgnore(candidate.getPath(), candidate.getModificationTime())) {
+                               eligible.add(candidate);
+                       }
+               }
+               return eligible;
+       }
+
+       /**
+        * Returns {@code true} if the file is NOT to be processed further.
+        * This happens in the following cases:
+        *
+        * If the user-specified path filtering method returns {@code true} for the file,
+        * or if the modification time of the file is not greater than the {@link #globalModificationTime},
+        * i.e. a file whose modification time equals the global one is ignored as well.
+        *
+        * @param filePath the path of the file to check.
+        * @param modificationTime the modification time of that file.
+        * @return {@code true} if the file must be skipped, {@code false} otherwise.
+        */
+       private boolean shouldIgnore(Path filePath, long modificationTime) {
+               boolean shouldIgnore = ((pathFilter != null && pathFilter.filterPath(filePath)) || modificationTime <= globalModificationTime);
+               if (shouldIgnore) {
+                       // parameterized, so the message is only assembled when debug logging is enabled
+                       LOG.debug("Ignoring {}, with mod time= {} and global mod time= {}",
+                               filePath, modificationTime, globalModificationTime);
+               }
+               return shouldIgnore;
+       }
+
+       /**
+        * Closes the source: delegates to the superclass, clears the running flag and logs the shutdown.
+        */
+       @Override
+       public void close() throws Exception {
+               super.close();
+               isRunning = false;
+               LOG.info("Closed File Monitoring Source.");
+       }
+
+       /**
+        * Clears the running flag, which is the loop condition of the continuous monitoring loop in {@code run()}.
+        */
+       @Override
+       public void cancel() {
+               isRunning = false;
+       }
+
+       //      ---------------------                   Checkpointing           
        --------------------------
+
+       /**
+        * Returns the state to checkpoint: the split groups still to be forwarded, the group
+        * currently being forwarded and the global modification time. Returns {@code null}
+        * when the source is no longer running.
+        *
+        * NOTE(review): the tuple holds the live field references, not copies - confirm that the
+        * state is serialized before the source mutates these lists again.
+        */
+       @Override
+       public Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, 
List<FileInputSplit>>, Long> snapshotState(
+               long checkpointId, long checkpointTimestamp) throws Exception {
+
+               if (!isRunning) {
+                       LOG.debug("snapshotState() called on closed source");
+                       return null;
+               }
+               return new Tuple3<>(splitsToFwdOrderedAscByModTime,
+                       currentSplitsToFwd, globalModificationTime);
+       }
+
+       /**
+        * Restores the three fields written by {@code snapshotState()}: the pending split groups (f0),
+        * the group that was being forwarded (f1) and the global modification time (f2).
+        *
+        * NOTE(review): {@code state} is dereferenced without a null check although
+        * {@code snapshotState()} can return {@code null} - confirm this method is never invoked with it.
+        */
+       @Override
+       public void restoreState(Tuple3<List<Tuple2<Long, 
List<FileInputSplit>>>,
+               Tuple2<Long, List<FileInputSplit>>, Long> state) throws 
Exception {
+
+               this.splitsToFwdOrderedAscByModTime = state.f0;
+               this.currentSplitsToFwd = state.f1;
+               this.globalModificationTime = state.f2;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
new file mode 100644
index 0000000..4d4a792
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the {@link FileInputSplit FileInputSplits} 
received from
+ * the preceding {@link ContinuousFileMonitoringFunction}. This operator will 
receive just the split descriptors
+ * and then read and emit records. This may lead to increased backpressure. To 
avoid this, we have another
+ * thread ({@link SplitReader}) actually reading the splits and emitting the 
elements, which is separate from
+ * the thread forwarding the checkpoint barriers. The two threads sync on the 
{@link StreamTask#getCheckpointLock()}
+ * so that the checkpoints reflect the current state.
+ */
+@Internal
+public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends 
AbstractStreamOperator<OUT>
+       implements OneInputStreamOperator<FileInputSplit, OUT>, 
OutputTypeConfigurable<OUT> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
+
+       private static final FileInputSplit EOS = new FileInputSplit(-1, null, 
-1, -1, null);
+
+       private transient SplitReader<S, OUT> reader;
+       private transient TimestampedCollector<OUT> collector;
+
+       private FileInputFormat<OUT> format;
+       private TypeSerializer<OUT> serializer;
+
+       private Object checkpointLock;
+
+       private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
+
+       /**
+        * Creates the reader operator.
+        *
+        * @param format the input format used to read the received splits; must not be {@code null}.
+        */
+       public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
+               this.format = checkNotNull(format);
+       }
+
+       /**
+        * Creates and stores the serializer for the output type; {@code open()} refuses to
+        * start if this serializer has not been set.
+        *
+        * @param outTypeInfo the type information of the emitted records.
+        * @param executionConfig the execution config used to create the serializer.
+        */
+       @Override
+       public void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
+               this.serializer = outTypeInfo.createSerializer(executionConfig);
+       }
+
+       /**
+        * Opens the operator: checks that the serializer is set, configures the format with an
+        * empty {@link Configuration}, creates the collector, fetches the checkpoint lock of the
+        * containing task and starts the {@link SplitReader} thread. The reader is handed
+        * {@code readerState}, which is only non-null if {@code restoreState()} has set it.
+        *
+        * @throws IllegalStateException if the serializer has not been set.
+        */
+       @Override
+       public void open() throws Exception {
+               super.open();
+
+               if (this.serializer == null) {
+                       throw new IllegalStateException("The serializer has not 
been set. " +
+                               "Probably the setOutputType() was not called 
and this should not have happened. " +
+                               "Please report it.");
+               }
+
+               this.format.configure(new Configuration());
+               this.collector = new TimestampedCollector<>(output);
+               this.checkpointLock = getContainingTask().getCheckpointLock();
+
+               this.reader = new SplitReader<>(format, serializer, collector, 
checkpointLock, readerState);
+               this.reader.start();
+       }
+
+       /**
+        * Hands the received split over to the reader thread by appending it to the reader's pending splits.
+        *
+        * @param element the record carrying the split to read.
+        */
+       @Override
+       public void processElement(StreamRecord<FileInputSplit> element) throws 
Exception {
+               reader.addSplit(element.getValue());
+       }
+
+       /**
+        * Forwards the received watermark unchanged to the output.
+        *
+        * @param mark the watermark to forward.
+        */
+       @Override
+       public void processWatermark(Watermark mark) throws Exception {
+               output.emitWatermark(mark);
+       }
+
+       /**
+        * Releases the resources of the operator. The reader thread is first cancelled and given
+        * 200 ms to finish; if it is still alive afterwards it is interrupted repeatedly (logging
+        * its stack trace each time) until it terminates. Finally all references are dropped.
+        */
+       @Override
+       public void dispose() {
+               super.dispose();
+
+               // the reader is only created in open(); guard against dispose() being
+               // invoked when open() did not get that far
+               if (reader != null) {
+                       // first try to cancel it properly and
+                       // give it some time until it finishes
+                       reader.cancel();
+                       try {
+                               reader.join(200);
+                       } catch (InterruptedException e) {
+                               // we can ignore this
+                       }
+
+                       // if the above did not work, then interrupt the thread repeatedly
+                       while (reader.isAlive()) {
+
+                               StringBuilder bld = new StringBuilder();
+                               StackTraceElement[] stack = reader.getStackTrace();
+                               for (StackTraceElement e : stack) {
+                                       bld.append(e).append('\n');
+                               }
+                               LOG.warn("The reader is stuck in method:\n {}", bld.toString());
+
+                               reader.interrupt();
+                               try {
+                                       reader.join(50);
+                               } catch (InterruptedException e) {
+                                       // we can ignore this
+                               }
+                       }
+                       reader = null;
+               }
+               collector = null;
+               format = null;
+               serializer = null;
+       }
+
+       /**
+        * Closes the operator: signals the reader that no more splits will arrive, waits until
+        * the reader has stopped and then closes the collector. Further cleaning up is handled
+        * by {@code dispose()}.
+        */
+       @Override
+       public void close() throws Exception {
+               super.close();
+
+               if (reader != null && reader.isAlive() && reader.isRunning()) {
+                       // add a dummy element to signal that no more splits will
+                       // arrive and wait until the reader finishes
+                       reader.addSplit(EOS);
+
+                       // we already have the checkpoint lock because close() is
+                       // called by the StreamTask while having it.
+                       // Object.wait() may return spuriously, so the condition is re-checked in a loop.
+                       // The wait is timed because a reader that dies from an exception leaves its
+                       // running flag set and sends its only notification before the thread has
+                       // actually terminated; an untimed wait could then block forever.
+                       while (reader.isAlive() && reader.isRunning()) {
+                               checkpointLock.wait(50);
+                       }
+               }
+               collector.close();
+       }
+
+       private class SplitReader<S extends Serializable, OT> extends Thread {
+
+               private volatile boolean isRunning;
+
+               private final FileInputFormat<OT> format;
+               private final TypeSerializer<OT> serializer;
+
+               private final Object checkpointLock;
+               private final TimestampedCollector<OT> collector;
+
+               private final Queue<FileInputSplit> pendingSplits;
+
+               private FileInputSplit currentSplit = null;
+
+               private S restoredFormatState = null;
+
+               /**
+                * Creates the reader thread and, if a restored state is given, re-installs its
+                * pending splits, the split that was being read and the state of the format.
+                * The operator's {@code readerState} is cleared afterwards.
+                *
+                * @param format the input format used to read the splits; must not be {@code null}.
+                * @param serializer the serializer used to create record instances; must not be {@code null}.
+                * @param collector the collector the read records are emitted to.
+                * @param checkpointLock the lock guarding the pending splits and the emission of records.
+                * @param restoredState the state restored after a failure, or {@code null} on a fresh start.
+                */
+               SplitReader(FileInputFormat<OT> format,
+                                       TypeSerializer<OT> serializer,
+                                       TimestampedCollector<OT> collector,
+                                       Object checkpointLock,
+                                       Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) {
+
+                       this.format = checkNotNull(format, "Unspecified FileInputFormat.");
+                       this.serializer = checkNotNull(serializer, "Unspecified Serializer.");
+
+                       this.pendingSplits = new LinkedList<>();
+                       this.collector = collector;
+                       this.checkpointLock = checkpointLock;
+                       this.isRunning = true;
+
+                       // this is the case where a task recovers from a previous failed attempt
+                       if (restoredState != null) {
+                               this.pendingSplits.addAll(restoredState.f0);
+                               this.currentSplit = restoredState.f1;
+                               this.restoredFormatState = restoredState.f2;
+                       }
+                       // the restored state has been consumed; clear it on the enclosing operator
+                       ContinuousFileReaderOperator.this.readerState = null;
+               }
+
+               /**
+                * Appends the split to the pending splits while holding the checkpoint lock.
+                *
+                * @param split the split to enqueue; must not be {@code null}.
+                */
+               void addSplit(FileInputSplit split) {
+                       Preconditions.checkNotNull(split);
+                       synchronized (checkpointLock) {
+                               this.pendingSplits.add(split);
+                       }
+               }
+
+               /**
+                * Returns the running flag; it is cleared by {@code cancel()} and when the
+                * end-of-stream marker split is encountered in {@code run()}.
+                */
+               public boolean isRunning() {
+                       return this.isRunning;
+               }
+
+               /**
+                * The reader loop. While running it either resumes the restored current split or
+                * polls the next pending split (waiting up to 50 ms when there is none), opens the
+                * format on it and emits its records one by one. Choosing/opening a split and
+                * emitting each single record happen while holding the checkpoint lock. The loop
+                * stops when the end-of-stream marker split is found or the reader is cancelled.
+                * Any failure is reported to the containing task; on exit all threads waiting on
+                * the checkpoint lock are notified.
+                */
+               @Override
+               public void run() {
+                       try {
+                               while (this.isRunning) {
+
+                                       synchronized (checkpointLock) {
+                                               if (this.currentSplit != null) {
+
+                                                       if (currentSplit.equals(EOS)) {
+                                                               isRunning = false;
+                                                               break;
+                                                       }
+
+                                                       if (this.format instanceof CheckpointableInputFormat && restoredFormatState != null) {
+                                                               ((CheckpointableInputFormat) format).reopen(currentSplit, restoredFormatState);
+                                                       } else {
+                                                               // this is the case of a non-checkpointable input format that will reprocess the last split.
+                                                               LOG.info("Format " + this.format.getClass().getName() + " used is not checkpointable.");
+                                                               format.open(currentSplit);
+                                                       }
+                                                       // reset the restored state to null for the next iteration
+                                                       this.restoredFormatState = null;
+                                               } else {
+
+                                                       // get the next split to read.
+                                                       currentSplit = this.pendingSplits.poll();
+
+                                                       if (currentSplit == null) {
+                                                               checkpointLock.wait(50);
+                                                               continue;
+                                                       }
+
+                                                       if (currentSplit.equals(EOS)) {
+                                                               isRunning = false;
+                                                               break;
+                                                       }
+                                                       this.format.open(currentSplit);
+                                               }
+                                       }
+
+                                       LOG.info("Reading split: " + currentSplit);
+
+                                       try {
+                                               OT nextElement = serializer.createInstance();
+                                               while (!format.reachedEnd()) {
+                                                       synchronized (checkpointLock) {
+                                                               nextElement = format.nextRecord(nextElement);
+                                                               if (nextElement != null) {
+                                                                       collector.collect(nextElement);
+                                                               } else {
+                                                                       break;
+                                                               }
+                                                       }
+                                               }
+
+                                       } finally {
+                                               // close and prepare for the next iteration
+                                               this.format.close();
+                                               this.currentSplit = null;
+                                       }
+                               }
+
+                       } catch (Throwable e) {
+                               if (isRunning) {
+                                       // pass the throwable as the last argument so that its stack trace is
+                                       // logged, and put the split into the message text itself
+                                       LOG.error("Caught exception processing split: " + currentSplit, e);
+                               }
+                               getContainingTask().failExternally(e);
+                       } finally {
+                               synchronized (checkpointLock) {
+                                       LOG.info("Reader terminated, and exiting...");
+                                       checkpointLock.notifyAll();
+                               }
+                       }
+               }
+
+               /**
+                * Captures the state of the reader: a copy of the pending splits, the split
+                * currently being read and, for a {@link CheckpointableInputFormat} with a current
+                * split, the state of the format. Taking the snapshot does not modify the reader.
+                *
+                * @return the tuple (pending splits, current split, format state); the format state is
+                *         {@code null} if the format is not checkpointable or there is no current split.
+                * @throws IOException if the format fails to provide its current state.
+                */
+               Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
+                       List<FileInputSplit> snapshot = new ArrayList<>(this.pendingSplits);
+
+                       // the current split is reported on its own, so drop it from the copied list if it
+                       // is at its head. Only the copy is touched: the live queue of pending splits must
+                       // not lose an element just because a snapshot was taken.
+                       if (this.currentSplit != null && !snapshot.isEmpty() && this.currentSplit.equals(snapshot.get(0))) {
+                               snapshot.remove(0);
+                       }
+
+                       if (this.format instanceof CheckpointableInputFormat) {
+                               S formatState = (S) ((CheckpointableInputFormat) format).getCurrentState();
+                               return new Tuple3<>(snapshot, currentSplit, currentSplit == null ? null : formatState);
+                       } else {
+                               LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
+                               return new Tuple3<>(snapshot, currentSplit, null);
+                       }
+               }
+
+               /**
+                * Clears the running flag, which is the loop condition of the reading loop in {@code run()}.
+                */
+               public void cancel() {
+                       this.isRunning = false;
+               }
+       }
+
+       //      ---------------------                   Checkpointing           
        --------------------------
+
+       /**
+        * Snapshots the operator: on top of the state of the superclass it writes, in this order,
+        * the current split, the number of pending splits, the pending splits and the state of
+        * the format, and stores the resulting handle as the operator state.
+        * {@code restoreState()} reads the values back in exactly this order.
+        *
+        * NOTE(review): objects go through {@code oos} while the count goes through {@code ov},
+        * both writing to the same underlying stream {@code os}, and {@code oos} is never flushed
+        * explicitly - confirm that the interleaving of the two writers is reliable.
+        */
+       @Override
+       public StreamTaskState snapshotOperatorState(long checkpointId, long 
timestamp) throws Exception {
+               StreamTaskState taskState = 
super.snapshotOperatorState(checkpointId, timestamp);
+
+               final AbstractStateBackend.CheckpointStateOutputStream os =
+                       
this.getStateBackend().createCheckpointStateOutputStream(checkpointId, 
timestamp);
+
+               final ObjectOutputStream oos = new ObjectOutputStream(os);
+               final AbstractStateBackend.CheckpointStateOutputView ov = new 
AbstractStateBackend.CheckpointStateOutputView(os);
+
+               Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState = 
this.reader.getReaderState();
+               List<FileInputSplit> pendingSplits = readerState.f0;
+               FileInputSplit currSplit = readerState.f1;
+               S formatState = readerState.f2;
+
+               // write the current split
+               oos.writeObject(currSplit);
+
+               // write the pending ones
+               ov.writeInt(pendingSplits.size());
+               for (FileInputSplit split : pendingSplits) {
+                       oos.writeObject(split);
+               }
+
+               // write the state of the reading channel
+               oos.writeObject(formatState);
+               taskState.setOperatorState(os.closeAndGetHandle());
+               return taskState;
+       }
+
+       /**
+        * Restores the operator: after restoring the superclass it reads back the current split,
+        * the number of pending splits, the pending splits and the state of the format - the
+        * order in which {@code snapshotOperatorState()} wrote them - and keeps them in
+        * {@code readerState}, which {@code open()} hands to the reader thread.
+        *
+        * NOTE(review): objects are read through {@code ois} while the count is read through
+        * {@code div}, both on the same underlying stream {@code is} - this relies on
+        * {@code ois} not reading ahead; confirm.
+        */
+       @Override
+       public void restoreState(StreamTaskState state, long recoveryTimestamp) 
throws Exception {
+               super.restoreState(state, recoveryTimestamp);
+
+               StreamStateHandle stream = (StreamStateHandle) 
state.getOperatorState();
+
+               final InputStream is = 
stream.getState(getUserCodeClassloader());
+               final ObjectInputStream ois = new ObjectInputStream(is);
+               final DataInputViewStreamWrapper div = new 
DataInputViewStreamWrapper(is);
+
+               // read the split that was being read
+               FileInputSplit currSplit = (FileInputSplit) ois.readObject();
+
+               // read the pending splits list
+               List<FileInputSplit> pendingSplits = new LinkedList<>();
+               int noOfSplits = div.readInt();
+               for (int i = 0; i < noOfSplits; i++) {
+                       FileInputSplit split = (FileInputSplit) 
ois.readObject();
+                       pendingSplits.add(split);
+               }
+
+               // read the state of the format
+               S formatState = (S) ois.readObject();
+
+               // set the whole reader state for the open() to find.
+               this.readerState = new Tuple3<>(pendingSplits, currSplit, 
formatState);
+               div.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
index fc24079..06da8c1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.api.functions.source;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
@@ -32,7 +31,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-@PublicEvolving
+@Deprecated
 public class FileMonitoringFunction implements SourceFunction<Tuple3<String, 
Long, Long>> {
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
new file mode 100644
index 0000000..1a359ab
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+/**
+ * A base class to be extended by the user when using the {@link 
ContinuousFileMonitoringFunction}.
+ * The {@link #filterPath(Path)} method is responsible for deciding if a path 
is eligible for further
+ * processing or not. This can serve to exclude temporary or partial files that
+ * are still being written.
+ */
+@PublicEvolving
+public abstract class FilePathFilter implements Serializable {
+
+       public static FilePathFilter createDefaultFilter() {
+               return new DefaultFilter();
+       }
+       /**
+        * Returns {@code true} if the {@code filePath} given is to be
+        * ignored when processing a directory, e.g.
+        * <pre>
+        * {@code
+        *
+        * public boolean filterPath(Path filePath) {
+        *     return filePath.getName().startsWith(".") || 
filePath.getName().contains("_COPYING_");
+        * }
+        * }</pre>
+        */
+       public abstract boolean filterPath(Path filePath);
+
+       /**
+        * The default file path filter, which is used
+        * if no other filter is provided. This filter leaves out null paths and
+        * files whose name starts with "." or "_", or contains "_COPYING_".
+        */
+       public static class DefaultFilter extends FilePathFilter {
+
+               DefaultFilter() {}
+
+               @Override
+               public boolean filterPath(Path filePath) {
+                       return filePath == null ||
+                               filePath.getName().startsWith(".") ||
+                               filePath.getName().startsWith("_") ||
+                               filePath.getName().contains("_COPYING_");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
new file mode 100644
index 0000000..cdbeb2b
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Specifies when the computation of the {@link 
ContinuousFileMonitoringFunction}
+ * will be triggered.
+ */
+@PublicEvolving
+public enum FileProcessingMode {
+
+       PROCESS_ONCE,                           // Processes the current 
content of a file/path only ONCE, and stops monitoring.
+       PROCESS_CONTINUOUSLY            // Reprocesses the whole file when new 
data is appended.
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
index 0f78826..ac1e834 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
@@ -21,7 +21,6 @@ import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.net.URI;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -29,7 +28,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Collector;
 
-@PublicEvolving
+@Deprecated
 public class FileReadFunction implements FlatMapFunction<Tuple3<String, Long, 
Long>, String> {
 
        private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
deleted file mode 100644
index 0dcb9ff..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-@PublicEvolving
-public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
-       private static final long serialVersionUID = 1L;
-
-       private TypeInformation<OUT> typeInfo;
-       private transient TypeSerializer<OUT> serializer;
-
-       private InputFormat<OUT, InputSplit> format;
-
-       private transient InputSplitProvider provider;
-       private transient Iterator<InputSplit> splitIterator;
-
-       private volatile boolean isRunning = true;
-
-       @SuppressWarnings("unchecked")
-       public FileSourceFunction(InputFormat<OUT, ?> format, 
TypeInformation<OUT> typeInfo) {
-               this.format = (InputFormat<OUT, InputSplit>) format;
-               this.typeInfo = typeInfo;
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public void open(Configuration parameters) throws Exception {
-               StreamingRuntimeContext context = (StreamingRuntimeContext) 
getRuntimeContext();
-               this.provider = context.getInputSplitProvider();
-               
-               format.configure(parameters);
-               serializer = 
typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
-
-               splitIterator = getInputSplits();
-               if (splitIterator.hasNext()) {
-                       format.open(splitIterator.next());
-               }
-               isRunning = true;
-       }
-
-       @Override
-       public void close() throws Exception {
-               format.close();
-       }
-
-       private Iterator<InputSplit> getInputSplits() {
-
-               return new Iterator<InputSplit>() {
-
-                       private InputSplit nextSplit;
-
-                       private boolean exhausted;
-
-                       @Override
-                       public boolean hasNext() {
-                               if (exhausted) {
-                                       return false;
-                               }
-
-                               if (nextSplit != null) {
-                                       return true;
-                               }
-
-                               InputSplit split = provider.getNextInputSplit();
-
-                               if (split != null) {
-                                       this.nextSplit = split;
-                                       return true;
-                               } else {
-                                       exhausted = true;
-                                       return false;
-                               }
-                       }
-
-                       @Override
-                       public InputSplit next() {
-                               if (this.nextSplit == null && !hasNext()) {
-                                       throw new NoSuchElementException();
-                               }
-
-                               final InputSplit tmp = this.nextSplit;
-                               this.nextSplit = null;
-                               return tmp;
-                       }
-
-                       @Override
-                       public void remove() {
-                               throw new UnsupportedOperationException();
-                       }
-               };
-       }
-
-       @Override
-       public void run(SourceContext<OUT> ctx) throws Exception {
-               while (isRunning) {
-                       OUT nextElement = serializer.createInstance();
-                       nextElement =  format.nextRecord(nextElement);
-                       if (nextElement == null && splitIterator.hasNext()) {
-                               format.open(splitIterator.next());
-                               continue;
-                       } else if (nextElement == null) {
-                               break;
-                       }
-                       ctx.collect(nextElement);
-               }
-       }
-
-       @Override
-       public void cancel() {
-               isRunning = false;
-       }
-
-
-       /**
-        * Returns the {@code InputFormat}. This is only needed because we need 
to set the input
-        * split assigner on the {@code StreamGraph}.
-        */
-       public InputFormat<OUT, InputSplit> getFormat() {
-               return format;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSource.java
new file mode 100644
index 0000000..2a84781
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSource.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+@Internal
+public class InputFormatSource<OUT> extends RichParallelSourceFunction<OUT> {
+       private static final long serialVersionUID = 1L;
+
+       private TypeInformation<OUT> typeInfo;
+       private transient TypeSerializer<OUT> serializer;
+
+       private InputFormat<OUT, InputSplit> format;
+
+       private transient InputSplitProvider provider;
+       private transient Iterator<InputSplit> splitIterator;
+
+       private volatile boolean isRunning = true;
+
+       @SuppressWarnings("unchecked")
+       public InputFormatSource(InputFormat<OUT, ?> format, 
TypeInformation<OUT> typeInfo) {
+               this.format = (InputFormat<OUT, InputSplit>) format;
+               this.typeInfo = typeInfo;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public void open(Configuration parameters) throws Exception {
+               StreamingRuntimeContext context = (StreamingRuntimeContext) 
getRuntimeContext();
+               this.provider = context.getInputSplitProvider();
+               
+               format.configure(parameters);
+               serializer = 
typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
+
+               splitIterator = getInputSplits();
+               if (splitIterator.hasNext()) {
+                       format.open(splitIterator.next());
+               }
+               isRunning = true;
+       }
+
+       @Override
+       public void close() throws Exception {
+               format.close();
+       }
+
+       private Iterator<InputSplit> getInputSplits() {
+
+               return new Iterator<InputSplit>() {
+
+                       private InputSplit nextSplit;
+
+                       private boolean exhausted;
+
+                       @Override
+                       public boolean hasNext() {
+                               if (exhausted) {
+                                       return false;
+                               }
+
+                               if (nextSplit != null) {
+                                       return true;
+                               }
+
+                               InputSplit split = provider.getNextInputSplit();
+
+                               if (split != null) {
+                                       this.nextSplit = split;
+                                       return true;
+                               } else {
+                                       exhausted = true;
+                                       return false;
+                               }
+                       }
+
+                       @Override
+                       public InputSplit next() {
+                               if (this.nextSplit == null && !hasNext()) {
+                                       throw new NoSuchElementException();
+                               }
+
+                               final InputSplit tmp = this.nextSplit;
+                               this.nextSplit = null;
+                               return tmp;
+                       }
+
+                       @Override
+                       public void remove() {
+                               throw new UnsupportedOperationException();
+                       }
+               };
+       }
+
+       @Override
+       public void run(SourceContext<OUT> ctx) throws Exception {
+               while (isRunning) {
+                       OUT nextElement = serializer.createInstance();
+                       nextElement =  format.nextRecord(nextElement);
+                       if (nextElement == null && splitIterator.hasNext()) {
+                               format.open(splitIterator.next());
+                               continue;
+                       } else if (nextElement == null) {
+                               break;
+                       }
+                       ctx.collect(nextElement);
+               }
+       }
+
+       @Override
+       public void cancel() {
+               isRunning = false;
+       }
+
+
+       /**
+        * Returns the {@code InputFormat}. This is only needed because we need 
to set the input
+        * split assigner on the {@code StreamGraph}.
+        */
+       public InputFormat<OUT, InputSplit> getFormat() {
+               return format;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 70c5cff..685655e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -21,7 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
+import org.apache.flink.streaming.api.functions.source.InputFormatSource;
 import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
 import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -425,8 +425,8 @@ public class StreamGraphGenerator {
                                null,
                                source.getOutputType(),
                                "Source: " + source.getName());
-               if (source.getOperator().getUserFunction() instanceof 
FileSourceFunction) {
-                       FileSourceFunction<T> fs = (FileSourceFunction<T>) 
source.getOperator().getUserFunction();
+               if (source.getOperator().getUserFunction() instanceof 
InputFormatSource) {
+                       InputFormatSource<T> fs = (InputFormatSource<T>) 
source.getOperator().getUserFunction();
                        streamGraph.setInputFormat(source.getId(), 
fs.getFormat());
                }
                streamGraph.setParallelism(source.getId(), 
source.getParallelism());

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
index 86677a6..4517eea 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 public interface OutputTypeConfigurable<OUT> {
 
        /**
-        * Is called by the {@link 
org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, 
StreamOperator, TypeInformation, TypeInformation, String)}
+        * Is called by the {@link 
org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, String, 
StreamOperator, TypeInformation, TypeInformation, String)}
         * method when the {@link 
org.apache.flink.streaming.api.graph.StreamGraph} is generated. The
         * method is called with the output {@link TypeInformation} which is 
also used for the
         * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} output 
serializer.

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 69e920f..9ed715e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -69,7 +69,7 @@ public interface StreamOperator<OUT> extends Serializable {
 
         * <p>
         * The method is expected to flush all remaining buffered data. 
Exceptions during this flushing
-        * of buffered should be propagated, in order to cause the operation to 
be recognized asa failed,
+        * of buffered data should be propagated, in order to cause the operation to 
be recognized as failed,
         * because the last data items are not processed properly.
         * 
         * @throws java.lang.Exception An exception in this method causes the 
operator to fail.

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index c5f983a..6e2e9f9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -141,6 +141,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
                }).when(mockTask).registerTimer(anyLong(), 
any(Triggerable.class));
        }
 
+       public Object getCheckpointLock() {
+               return mockTask.getCheckpointLock();
+       }
+
        public <K> void configureForKeyedStream(KeySelector<IN, K> keySelector, 
TypeInformation<K> keyType) {
                ClosureCleaner.clean(keySelector, false);
                config.setStatePartitioner(0, keySelector);

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 7be7840..f6dab1e 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -19,8 +19,7 @@
 package org.apache.flink.streaming.api.scala
 
 import com.esotericsoftware.kryo.Serializer
-
-import org.apache.flink.annotation.{Internal, PublicEvolving, Public}
+import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
 import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
 import 
org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -28,14 +27,12 @@ import 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.runtime.state.AbstractStateBackend
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment 
=> JavaEnv}
-import 
org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
-import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source._
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
 import org.apache.flink.util.SplittableIterator
 
 import scala.collection.JavaConverters._
-
 import _root_.scala.language.implicitConversions
 
 @Public
@@ -454,20 +451,67 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
         DataStream[T] =
     asScalaStream(javaEnv.readFile(inputFormat, filePath))
 
-
   /**
-   * Creates a DataStream that contains the contents of file created while
-   * system watches the given path. The file will be read with the system's
-   * default character set. The user can check the monitoring interval in 
milliseconds,
-   * and the way file modifications are handled. By default it checks for only 
new files
-   * every 100 milliseconds.
-   *
-   */
-  def readFileStream(StreamPath: String, intervalMillis: Long = 100, 
-                     watchType: WatchType = WatchType.ONLY_NEW_FILES): 
DataStream[String] =
+    * Creates a DataStream that contains the contents of files created while
+    * the system watches the given path. The files will be read with the system's
+    * default character set. The user can specify the monitoring interval in 
milliseconds,
+    * and the way file modifications are handled. By default it checks for 
only new files
+    * every 100 milliseconds.
+    *
+    */
+  @Deprecated
+  def readFileStream(StreamPath: String, intervalMillis: Long = 100,
+                     watchType: FileMonitoringFunction.WatchType =
+                     FileMonitoringFunction.WatchType.ONLY_NEW_FILES): 
DataStream[String] =
     asScalaStream(javaEnv.readFileStream(StreamPath, intervalMillis, 
watchType))
 
   /**
+    * Reads the contents of the user-specified path based on the given 
[[FileInputFormat]].
+    * Depending on the provided [[FileProcessingMode]], the source
+    * may periodically monitor (every `interval` ms) the path for new data
+    * ([[FileProcessingMode.PROCESS_CONTINUOUSLY]]), or process
+    * once the data currently in the path and exit
+    * ([[FileProcessingMode.PROCESS_ONCE]]). In addition,
+    * if the path contains files not to be processed, the user can specify a 
custom
+    * [[FilePathFilter]]. As a default implementation you can use
+    * [[FilePathFilter.createDefaultFilter()]].
+    *
+    * ** NOTES ON CHECKPOINTING: ** If the `watchType` is set to
+    * [[FileProcessingMode#PROCESS_ONCE]], the source monitors the path ** 
once **,
+    * creates the [[org.apache.flink.core.fs.FileInputSplit FileInputSplits]]
+    * to be processed, forwards them to the downstream
+    * [[ContinuousFileReaderOperator readers]] to read the actual data,
+    * and exits, without waiting for the readers to finish reading. This
+    * implies that no more checkpoint barriers are going to be forwarded
+    * after the source exits, thus having no checkpoints after that point.
+    *
+    * @param inputFormat
+    *          The input format used to create the data stream
+    * @param filePath
+    *          The path of the file, as a URI (e.g., "file:///some/local/file" 
or
+    *          "hdfs://host:port/file/path")
+    * @param watchType
+    *          The mode in which the source should operate, i.e. monitor path 
and react
+    *          to new data, or process once and exit
+    * @param interval
+    *          In the case of periodic path monitoring, this specifies the 
interval (in millis)
+    *          between consecutive path scans
+    * @param filter
+    *          The filter that decides which files are excluded from processing
+    * @return The data stream that represents the data read from the given file
+    */
+  @PublicEvolving
+  def readFile[T: TypeInformation](
+      inputFormat: FileInputFormat[T],
+      filePath: String,
+      watchType: FileProcessingMode,
+      interval: Long,
+      filter: FilePathFilter): DataStream[T] = {
+    val typeInfo = implicitly[TypeInformation[T]]
+    asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, 
filter, typeInfo))
+  }
+
+  /**
    * Creates a new DataStream that contains the strings received infinitely
    * from socket. Received strings are decoded by the system's default
    * character set. The maximum retry interval is specified in seconds, in case

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
new file mode 100644
index 0000000..4c0f648
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Fault-tolerance test for continuous file reading.
+ *
+ * <p>A {@link FileCreator} thread writes {@link #NO_OF_FILES} files into a local directory
+ * while the job reads that directory in {@link FileProcessingMode#PROCESS_CONTINUOUSLY} mode.
+ * The sink throws one artificial exception after the first completed checkpoint, fails on
+ * any duplicate line, and ends the job with a {@link SuccessException} once all expected
+ * lines have arrived. {@link #postSubmit()} then compares what the sink collected against
+ * what was written.
+ */
+public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleranceTestBase {
+
+       private static final int NO_OF_FILES = 9;
+       private static final int LINES_PER_FILE = 200;
+       private static final int NO_OF_RETRIES = 3;
+       private static final int PARALLELISM = 4;
+       private static final long INTERVAL = 2000;
+
+       private static File baseDir;
+       private static org.apache.hadoop.fs.FileSystem fs;
+       private static String localFsURI;
+       private FileCreator fc;
+
+       // Written by the sink (task thread) and read by postSubmit() (test thread),
+       // hence volatile so that the published map is visible to the reader.
+       private static volatile Map<Integer, List<String>> finalCollectedContent = new HashMap<>();
+
+       /**
+        * Prepares an empty base directory on the local file system and resolves the
+        * Hadoop {@code FileSystem} handle used to create the test files.
+        */
+       @BeforeClass
+       public static void createHDFS() {
+               try {
+                       baseDir = new File("./target/localfs/fs_tests").getAbsoluteFile();
+                       FileUtil.fullyDelete(baseDir);
+
+                       org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+
+                       localFsURI = "file:///" + baseDir +"/";
+                       fs = new org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf);
+
+               } catch(Throwable e) {
+                       e.printStackTrace();
+                       Assert.fail("Test failed " + e.getMessage());
+               }
+       }
+
+       /** Removes the base directory created by {@link #createHDFS()}. */
+       @AfterClass
+       public static void destroyHDFS() {
+               try {
+                       FileUtil.fullyDelete(baseDir);
+               } catch (Throwable t) {
+                       throw new RuntimeException(t);
+               }
+       }
+
+       /**
+        * Builds the job: continuous file source -> identity flatMap -> single-instance sink,
+        * and starts the thread that creates the input files.
+        *
+        * @param env the environment the job is defined on
+        */
+       @Override
+       public void testProgram(StreamExecutionEnvironment env) {
+
+               // set the restart strategy.
+               env.getConfig().setRestartStrategy(
+                       RestartStrategies.fixedDelayRestart(NO_OF_RETRIES, 0));
+               env.enableCheckpointing(20);
+               env.setParallelism(PARALLELISM);
+
+               // create and start the file creating thread.
+               fc = new FileCreator();
+               fc.start();
+
+               // create the monitoring source along with the necessary readers.
+               TestingSinkFunction sink = new TestingSinkFunction();
+               TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
+               DataStream<String> inputStream = env.readFile(format, localFsURI,
+                       FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL, FilePathFilter.createDefaultFilter());
+
+               inputStream.flatMap(new FlatMapFunction<String, String>() {
+                       @Override
+                       public void flatMap(String value, Collector<String> out) throws Exception {
+                               out.collect(value);
+                       }
+               }).addSink(sink).setParallelism(1);
+       }
+
+       /**
+        * Verifies that, for every file written by the {@link FileCreator}, the lines collected
+        * by the sink (sorted by line number) concatenate to exactly the written content, and
+        * then cleans up the collected data and the created files.
+        */
+       @Override
+       public void postSubmit() throws Exception {
+               // Wait for the creator thread to terminate: its maps are plain HashMaps, so the
+               // join is what makes their final content safely readable from this thread.
+               fc.join();
+
+               Map<Integer, List<String>> collected = finalCollectedContent;
+               Map<Integer, String> expected = fc.getFileContent();
+
+               Assert.assertEquals(expected.size(), collected.size());
+               for (Map.Entry<Integer, String> file : expected.entrySet()) {
+                       List<String> cntnt = collected.get(file.getKey());
+                       Assert.assertNotNull("No content collected for file " + file.getKey(), cntnt);
+
+                       // the sink stores lines in a HashSet, so the original order has to be restored
+                       Collections.sort(cntnt, new Comparator<String>() {
+                               @Override
+                               public int compare(String o1, String o2) {
+                                       return Integer.compare(getLineNo(o1), getLineNo(o2));
+                               }
+                       });
+
+                       StringBuilder cntntStr = new StringBuilder();
+                       for (String line: cntnt) {
+                               cntntStr.append(line);
+                       }
+                       Assert.assertEquals(file.getValue(), cntntStr.toString());
+               }
+
+               finalCollectedContent.clear();
+               fc.clean();
+       }
+
+       /**
+        * Extracts the line number, i.e. the last of the six whitespace-separated tokens
+        * of a line produced by {@link #fillWithData(String, String, int, String)}.
+        */
+       private int getLineNo(String line) {
+               String[] tkns = line.split("\\s");
+               Assert.assertEquals(6, tkns.length);
+               return Integer.parseInt(tkns[tkns.length - 1]);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Custom Functions
+       // --------------------------------------------------------------------------------------------
+
+       // -------------------------           FILE CREATION           -------------------------------
+
+       /**
+        * A separate thread creating {@link #NO_OF_FILES} files, pausing for
+        * {@link #INTERVAL}{@code / 1.5} milliseconds after each one.
+        * It serves for testing the file monitoring functionality of the
+        * {@link ContinuousFileMonitoringFunction}.
+        * The files are filled with data by the {@link #fillWithData(String, String, int, String)} method.
+        *
+        * <p>The bookkeeping maps are not thread-safe; other threads should read them only
+        * after this thread has terminated (e.g. after {@code join()}).
+        */
+       private class FileCreator extends Thread {
+
+               private final Set<Path> filesCreated = new HashSet<>();
+               private final Map<Integer, String> fileContents = new HashMap<>();
+
+               @Override
+               public void run() {
+                       try {
+                               for (int i = 0; i < NO_OF_FILES; i++) {
+                                       Tuple2<Path, String> file =
+                                               fillWithData(localFsURI, "file", i, "This is test line.");
+                                       filesCreated.add(file.f0);
+                                       fileContents.put(i, file.f1);
+
+                                       Thread.sleep((int) (INTERVAL / (3.0/2)));
+                               }
+                       } catch (IOException e) {
+                               e.printStackTrace();
+                       } catch (InterruptedException e) {
+                               // keep the interruption visible to whoever owns this thread
+                               Thread.currentThread().interrupt();
+                               e.printStackTrace();
+                       }
+               }
+
+               /** Deletes every file this thread created and forgets the recorded contents. */
+               void clean() throws IOException {
+                       assert (fs != null);
+                       for (Path path: filesCreated) {
+                               fs.delete(path, false);
+                       }
+                       fileContents.clear();
+               }
+
+               /** Returns the written content, keyed by file index. */
+               Map<Integer, String> getFileContent() {
+                       return this.fileContents;
+               }
+       }
+
+       /**
+        * Writes {@link #LINES_PER_FILE} lines of the form
+        * {@code "<fileIdx>: <sampleLine> <lineNo>\n"} to a dot-prefixed temporary file and
+        * then renames it to {@code <base>/<fileName><fileIdx>}.
+        *
+        * @return the final path of the file together with its complete content
+        * @throws IOException if creating, writing or renaming the file fails
+        */
+       private Tuple2<Path, String> fillWithData(
+               String base, String fileName, int fileIdx, String sampleLine) throws IOException {
+
+               assert (fs != null);
+
+               Path file = new Path(base + "/" + fileName + fileIdx);
+
+               // NOTE(review): writing to a hidden name first presumably keeps the monitoring
+               // source from picking up a half-written file - confirm against the default filter.
+               Path tmp = new Path(base + "/." + fileName + fileIdx);
+
+               StringBuilder str = new StringBuilder();
+               // try-with-resources: the stream is closed even if a write fails
+               try (FSDataOutputStream stream = fs.create(tmp)) {
+                       for (int i = 0; i < LINES_PER_FILE; i++) {
+                               String line = fileIdx +": "+ sampleLine + " " + i +"\n";
+                               str.append(line);
+                               // explicit charset instead of the platform default
+                               stream.write(line.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+                       }
+               }
+               fs.rename(tmp, file);
+               Assert.assertTrue("No result file present", fs.exists(file));
+               return new Tuple2<>(file, str.toString());
+       }
+
+       // --------------------------           Task Sink           ------------------------------
+
+       /**
+        * Sink that groups the received lines per file index, fails on duplicates, throws one
+        * artificial "Task Failure" after at least one completed checkpoint, and throws a
+        * {@link SuccessException} once {@code NO_OF_FILES * LINES_PER_FILE} lines were seen.
+        * The element counter and the collected lines are part of the checkpointed state.
+        */
+       private static class TestingSinkFunction extends RichSinkFunction<String>
+               implements Checkpointed<Tuple2<Long, Map<Integer, Set<String>>>>, CheckpointListener {
+
+               // static so that the artificial failure happens only once across restarts
+               private static volatile boolean hasFailed = false;
+
+               private volatile int numSuccessfulCheckpoints;
+
+               // elements seen by this instance since it was created (not checkpointed)
+               private long count;
+
+               private long elementsToFailure;
+
+               // elements seen overall (checkpointed)
+               private long elementCounter = 0;
+
+               private  Map<Integer, Set<String>> collectedContent = new HashMap<>();
+
+               TestingSinkFunction() {
+                       hasFailed = false;
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       // this sink can only work with DOP 1
+                       assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+
+                       long failurePosMin = (long) (0.4 * LINES_PER_FILE);
+                       long failurePosMax = (long) (0.7 * LINES_PER_FILE);
+
+                       // nextInt(bound) is never negative, unlike "nextLong() % range", so the
+                       // failure position always lies in [failurePosMin, failurePosMax)
+                       elementsToFailure =
+                               failurePosMin + new Random().nextInt((int) (failurePosMax - failurePosMin));
+
+                       // the restored state may already contain every expected element
+                       if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) {
+                               publishResultAndSucceed();
+                       }
+               }
+
+               @Override
+               public void close() {
+                       try {
+                               super.close();
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                       }
+               }
+
+               @Override
+               public void invoke(String value) throws Exception {
+                       // every line starts with "<fileIdx>: "
+                       int fileIdx = Integer.parseInt(value.substring(0, value.indexOf(':')));
+
+                       Set<String> content = collectedContent.get(fileIdx);
+                       if (content == null) {
+                               content = new HashSet<>();
+                               collectedContent.put(fileIdx, content);
+                       }
+
+                       if (!content.add(value + "\n")) {
+                               fail("Duplicate line: " + value);
+                       }
+
+                       elementCounter++;
+                       if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) {
+                               publishResultAndSucceed();
+                       }
+
+                       count++;
+                       if (!hasFailed) {
+                               Thread.sleep(2);
+                               if (numSuccessfulCheckpoints >= 1 && count >= elementsToFailure) {
+                                       hasFailed = true;
+                                       throw new Exception("Task Failure");
+                               }
+                       }
+               }
+
+               /**
+                * Copies the collected lines into a fresh map, publishes it for
+                * {@code postSubmit()} and terminates the job with a {@link SuccessException}.
+                * The map is fully built before it becomes visible through the static field.
+                */
+               private void publishResultAndSucceed() throws Exception {
+                       Map<Integer, List<String>> result = new HashMap<>();
+                       for (Map.Entry<Integer, Set<String>> entry: collectedContent.entrySet()) {
+                               result.put(entry.getKey(), new ArrayList<>(entry.getValue()));
+                       }
+                       finalCollectedContent = result;
+                       throw new SuccessException();
+               }
+
+               @Override
+               public Tuple2<Long, Map<Integer, Set<String>>> snapshotState(
+                       long checkpointId, long checkpointTimestamp) throws Exception {
+                       return new Tuple2<>(elementCounter, collectedContent);
+               }
+
+               @Override
+               public void restoreState(Tuple2<Long, Map<Integer, Set<String>>> state) throws Exception {
+                       this.elementCounter = state.f0;
+                       this.collectedContent = state.f1;
+               }
+
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) throws Exception {
+                       numSuccessfulCheckpoints++;
+               }
+       }
+}

Reply via email to