[FLINK-2314] Make Streaming File Sources Persistent

This commit is a combination of several commits/changes. It combines
changes to the file input formats and the streaming file read operator
and integrates them into the API.

These are the messages of the other two commits:

[FLINK-3717] Make FileInputFormat checkpointable

This adds a new interface called CheckpointableInputFormat
which describes input formats whose state is queryable,
i.e. getCurrentState() returns where the reader is
in the underlying source, and they can resume reading from
a user-specified position.

This functionality is not yet leveraged by current readers.

[FLINK-3889] Refactor File Monitoring Source

This is meant to replace the different file
reading sources in Flink streaming. Now there is
one monitoring source with DOP 1 monitoring a
directory and assigning input split to downstream
readers.

In addition, it makes the new features added by
FLINK-3717 work together with the aforementioned entities
(the monitor and the readers) in order to have
fault tolerant file sources and exactly once guarantees.

This does not replace the old API calls. This
will be done in a future commit.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d353895b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d353895b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d353895b

Branch: refs/heads/master
Commit: d353895ba512a5e30fb08a25643fd93f085e8456
Parents: bc19486
Author: kl0u <kklou...@gmail.com>
Authored: Sun Apr 10 16:56:42 2016 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Tue Jun 14 18:11:22 2016 +0200

----------------------------------------------------------------------
 .../flink/api/java/io/AvroInputFormat.java      |  85 +++-
 .../io/avro/AvroSplittableInputFormatTest.java  |  95 +++-
 .../flink/api/common/io/BinaryInputFormat.java  | 121 +++--
 .../apache/flink/api/common/io/BlockInfo.java   |   5 +
 .../common/io/CheckpointableInputFormat.java    |  61 +++
 .../api/common/io/DelimitedInputFormat.java     | 114 +++--
 .../api/common/io/EnumerateNestedFilesTest.java |   2 +-
 .../api/common/io/FileInputFormatTest.java      |   9 +-
 .../api/common/io/SequentialFormatTestBase.java |  15 +-
 flink-fs-tests/pom.xml                          |  11 +-
 .../ContinuousFileMonitoringFunctionITCase.java | 300 +++++++++++++
 .../hdfstests/ContinuousFileMonitoringTest.java | 447 +++++++++++++++++++
 .../flink/api/java/io/CsvInputFormatTest.java   | 134 +++++-
 .../runtime/taskmanager/RuntimeEnvironment.java |   4 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   2 +-
 .../api/datastream/DataStreamSource.java        |   5 +
 .../environment/StreamExecutionEnvironment.java | 256 +++++++++--
 .../ContinuousFileMonitoringFunction.java       | 328 ++++++++++++++
 .../source/ContinuousFileReaderOperator.java    | 390 ++++++++++++++++
 .../source/FileMonitoringFunction.java          |   3 +-
 .../api/functions/source/FilePathFilter.java    |  66 +++
 .../functions/source/FileProcessingMode.java    |  31 ++
 .../api/functions/source/FileReadFunction.java  |   3 +-
 .../functions/source/FileSourceFunction.java    | 148 ------
 .../api/functions/source/InputFormatSource.java | 148 ++++++
 .../api/graph/StreamGraphGenerator.java         |   6 +-
 .../api/operators/OutputTypeConfigurable.java   |   2 +-
 .../streaming/api/operators/StreamOperator.java |   2 +-
 .../util/OneInputStreamOperatorTestHarness.java |   4 +
 .../api/scala/StreamExecutionEnvironment.scala  |  74 ++-
 ...ontinuousFileProcessingCheckpointITCase.java | 327 ++++++++++++++
 .../StreamFaultToleranceTestBase.java           |   8 +-
 32 files changed, 2889 insertions(+), 317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
 
b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
index 605ce69..a920275 100644
--- 
a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ 
b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
@@ -22,13 +22,15 @@ package org.apache.flink.api.java.io;
 import java.io.IOException;
 
 import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
 import org.apache.avro.file.SeekableInput;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.avro.FSDataInputStreamWrapper;
@@ -42,15 +44,16 @@ import org.apache.flink.util.InstantiationUtil;
 
 /**
  * Provides a {@link FileInputFormat} for Avro records.
- * 
+ *
  * @param <E>
  *            the type of the result Avro record. If you specify
  *            {@link GenericRecord} then the result will be returned as a
  *            {@link GenericRecord}, so you do not have to know the schema 
ahead
  *            of time.
  */
-public class AvroInputFormat<E> extends FileInputFormat<E> implements 
ResultTypeQueryable<E> {
-       
+public class AvroInputFormat<E> extends FileInputFormat<E> implements 
ResultTypeQueryable<E>,
+       CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
        private static final long serialVersionUID = 1L;
 
        private static final Logger LOG = 
LoggerFactory.getLogger(AvroInputFormat.class);
@@ -59,16 +62,19 @@ public class AvroInputFormat<E> extends FileInputFormat<E> 
implements ResultType
        
        private boolean reuseAvroValue = true;
 
-       private transient FileReader<E> dataFileReader;
+       private transient DataFileReader<E> dataFileReader;
 
        private transient long end;
-       
+
+       private transient long recordsReadSinceLastSync;
+
+       private transient long lastSync = -1l;
+
        public AvroInputFormat(Path filePath, Class<E> type) {
                super(filePath);
                this.avroValueType = type;
        }
-       
-       
+
        /**
         * Sets the flag whether to reuse the Avro value instance for all 
records.
         * By default, the input format reuses the Avro value.
@@ -102,30 +108,34 @@ public class AvroInputFormat<E> extends 
FileInputFormat<E> implements ResultType
        @Override
        public void open(FileInputSplit split) throws IOException {
                super.open(split);
+               dataFileReader = initReader(split);
+               dataFileReader.sync(split.getStart());
+               lastSync = dataFileReader.previousSync();
+       }
 
+       private DataFileReader<E> initReader(FileInputSplit split) throws 
IOException {
                DatumReader<E> datumReader;
-               
+
                if (org.apache.avro.generic.GenericRecord.class == 
avroValueType) {
                        datumReader = new GenericDatumReader<E>();
                } else {
                        datumReader = 
org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
-                                       ? new 
SpecificDatumReader<E>(avroValueType) : new 
ReflectDatumReader<E>(avroValueType);
+                               ? new SpecificDatumReader<E>(avroValueType) : 
new ReflectDatumReader<E>(avroValueType);
                }
-
                if (LOG.isInfoEnabled()) {
                        LOG.info("Opening split {}", split);
                }
 
                SeekableInput in = new FSDataInputStreamWrapper(stream, 
split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
+               DataFileReader<E> dataFileReader = (DataFileReader) 
DataFileReader.openReader(in, datumReader);
 
-               dataFileReader = DataFileReader.openReader(in, datumReader);
-               
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Loaded SCHEMA: {}", 
dataFileReader.getSchema());
                }
-               
-               dataFileReader.sync(split.getStart());
-               this.end = split.getStart() + split.getLength();
+
+               end = split.getStart() + split.getLength();
+               recordsReadSinceLastSync = 0;
+               return dataFileReader;
        }
 
        @Override
@@ -133,11 +143,24 @@ public class AvroInputFormat<E> extends 
FileInputFormat<E> implements ResultType
                return !dataFileReader.hasNext() || 
dataFileReader.pastSync(end);
        }
 
+       public long getRecordsReadFromBlock() {
+               return this.recordsReadSinceLastSync;
+       }
+
        @Override
        public E nextRecord(E reuseValue) throws IOException {
                if (reachedEnd()) {
                        return null;
                }
+
+               // if we start a new block, then register the event, and
+               // restart the counter.
+               if(dataFileReader.previousSync() != lastSync) {
+                       lastSync = dataFileReader.previousSync();
+                       recordsReadSinceLastSync = 0;
+               }
+               recordsReadSinceLastSync++;
+
                if (reuseAvroValue) {
                        return dataFileReader.next(reuseValue);
                } else {
@@ -148,4 +171,34 @@ public class AvroInputFormat<E> extends FileInputFormat<E> 
implements ResultType
                        }
                }
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Checkpointing
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public Tuple2<Long, Long> getCurrentState() throws IOException {
+               return new Tuple2<>(this.lastSync, 
this.recordsReadSinceLastSync);
+       }
+
+       @Override
+       public void reopen(FileInputSplit split, Tuple2<Long, Long> state) 
throws IOException {
+               Preconditions.checkNotNull(split, "reopen() cannot be called on 
a null split.");
+               Preconditions.checkNotNull(state, "reopen() cannot be called 
with a null initial state.");
+
+               this.open(split);
+               if (state.f0 != -1) {
+
+                       // go to the block we stopped
+                       lastSync = state.f0;
+                       dataFileReader.seek(lastSync);
+
+                       // read until the record we were before the checkpoint 
and discard the values
+                       long recordsToDiscard = state.f1;
+                       for(int i = 0; i < recordsToDiscard; i++) {
+                               dataFileReader.next(null);
+                               recordsReadSinceLastSync++;
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
index 898b8fd..37a83d1 100644
--- 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
+++ 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.io.avro.generated.Colors;
 import org.apache.flink.api.io.avro.generated.Fixed16;
 import org.apache.flink.api.io.avro.generated.User;
 import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -166,7 +167,7 @@ public class AvroSplittableInputFormatTest {
                Configuration parameters = new Configuration();
                
                AvroInputFormat<User> format = new AvroInputFormat<User>(new 
Path(testFile.getAbsolutePath()), User.class);
-               
+
                format.configure(parameters);
                FileInputSplit[] splits = format.createInputSplits(4);
                assertEquals(splits.length, 4);
@@ -191,6 +192,98 @@ public class AvroSplittableInputFormatTest {
                format.close();
        }
 
+       @Test
+       public void testAvroRecoveryWithFailureAtStart() throws Exception {
+               final int recordsUntilCheckpoint = 132;
+
+               Configuration parameters = new Configuration();
+
+               AvroInputFormat<User> format = new AvroInputFormat<User>(new 
Path(testFile.getAbsolutePath()), User.class);
+               format.configure(parameters);
+
+               FileInputSplit[] splits = format.createInputSplits(4);
+               assertEquals(splits.length, 4);
+
+               int elements = 0;
+               int elementsPerSplit[] = new int[4];
+               for(int i = 0; i < splits.length; i++) {
+                       format.reopen(splits[i], format.getCurrentState());
+                       while(!format.reachedEnd()) {
+                               User u = format.nextRecord(null);
+                               
Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
+                               elements++;
+
+                               if(format.getRecordsReadFromBlock() == 
recordsUntilCheckpoint) {
+
+                                       // do the whole checkpoint-restore 
procedure and see if we pick up from where we left off.
+                                       Tuple2<Long, Long> state = 
format.getCurrentState();
+
+                                       // this is to make sure that nothing 
stays from the previous format
+                                       // (as it is going to be in the normal 
case)
+                                       format = new AvroInputFormat<>(new 
Path(testFile.getAbsolutePath()), User.class);
+
+                                       format.reopen(splits[i], state);
+                                       
assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint);
+                               }
+                               elementsPerSplit[i]++;
+                       }
+                       format.close();
+               }
+
+               Assert.assertEquals(1539, elementsPerSplit[0]);
+               Assert.assertEquals(1026, elementsPerSplit[1]);
+               Assert.assertEquals(1539, elementsPerSplit[2]);
+               Assert.assertEquals(896, elementsPerSplit[3]);
+               Assert.assertEquals(NUM_RECORDS, elements);
+               format.close();
+       }
+
+       @Test
+       public void testAvroRecovery() throws Exception {
+               final int recordsUntilCheckpoint = 132;
+
+               Configuration parameters = new Configuration();
+
+               AvroInputFormat<User> format = new AvroInputFormat<User>(new 
Path(testFile.getAbsolutePath()), User.class);
+               format.configure(parameters);
+
+               FileInputSplit[] splits = format.createInputSplits(4);
+               assertEquals(splits.length, 4);
+
+               int elements = 0;
+               int elementsPerSplit[] = new int[4];
+               for(int i = 0; i < splits.length; i++) {
+                       format.open(splits[i]);
+                       while(!format.reachedEnd()) {
+                               User u = format.nextRecord(null);
+                               
Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
+                               elements++;
+
+                               if(format.getRecordsReadFromBlock() == 
recordsUntilCheckpoint) {
+
+                                       // do the whole checkpoint-restore 
procedure and see if we pick up from where we left off.
+                                       Tuple2<Long, Long> state = 
format.getCurrentState();
+
+                                       // this is to make sure that nothing 
stays from the previous format
+                                       // (as it is going to be in the normal 
case)
+                                       format = new AvroInputFormat<>(new 
Path(testFile.getAbsolutePath()), User.class);
+
+                                       format.reopen(splits[i], state);
+                                       
assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint);
+                               }
+                               elementsPerSplit[i]++;
+                       }
+                       format.close();
+               }
+
+               Assert.assertEquals(1539, elementsPerSplit[0]);
+               Assert.assertEquals(1026, elementsPerSplit[1]);
+               Assert.assertEquals(1539, elementsPerSplit[2]);
+               Assert.assertEquals(896, elementsPerSplit[3]);
+               Assert.assertEquals(NUM_RECORDS, elements);
+               format.close();
+       }
+
        /*
        This test is gave the reference values for the test of Flink's IF.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
index 3789544..eb83bda 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.io;
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -30,6 +31,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,36 +42,44 @@ import java.util.Arrays;
 import java.util.List;
 
 /**
- * Base class for all input formats that use blocks of fixed size. The input 
splits are aligned to these blocks. Without
- * configuration, these block sizes equal the native block sizes of the HDFS.
+ * Base class for all input formats that use blocks of fixed size. The input 
splits are aligned to these blocks,
+ * meaning that each split will consist of one block. Without configuration, 
these block sizes equal the native
+ * block sizes of the HDFS.
+ *
+ * A block will contain a {@link BlockInfo} at the end of the block. There, 
the reader can find some statistics
+ * about the split currently being read, that will help correctly parse the 
contents of the block.
  */
 @Public
-public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
+public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
+       implements CheckpointableInputFormat<FileInputSplit, Tuple2<Long, 
Long>> {
+
        private static final long serialVersionUID = 1L;
 
-       /**
-        * The log.
-        */
+       /** The log. */
        private static final Logger LOG = 
LoggerFactory.getLogger(BinaryInputFormat.class);
 
-       /**
-        * The config parameter which defines the fixed length of a record.
-        */
+       /** The config parameter which defines the fixed length of a record. */
        public static final String BLOCK_SIZE_PARAMETER_KEY = 
"input.block_size";
 
        public static final long NATIVE_BLOCK_SIZE = Long.MIN_VALUE;
 
-       /**
-        * The block size to use.
-        */
+       /** The block size to use. */
        private long blockSize = NATIVE_BLOCK_SIZE;
 
        private transient DataInputViewStreamWrapper dataInputStream;
 
+       /** The BlockInfo for the Block corresponding to the split currently 
being read. */
        private transient BlockInfo blockInfo;
 
-       private long readRecords;
+       /** A wrapper around the block currently being read. */
+       private transient BlockBasedInput blockBasedInput = null;
 
+       /**
+        * The number of records already read from the block.
+        * This is used to decide if the end of the block has been
+        * reached.
+        */
+       private long readRecords = 0;
 
        @Override
        public void configure(Configuration parameters) {
@@ -193,6 +203,20 @@ public abstract class BinaryInputFormat<T> extends 
FileInputFormat<T> {
                return new BlockInfo();
        }
 
+       private BlockInfo createAndReadBlockInfo() throws IOException {
+               BlockInfo blockInfo = new BlockInfo();
+               if (this.splitLength > blockInfo.getInfoSize()) {
+                       // At first we go and read  the block info containing 
the recordCount, the accumulatedRecordCount
+                       // and the firstRecordStart offset in the current 
block. This is written at the end of the block and
+                       // is of fixed size, currently 3 * Long.SIZE.
+
+                       // TODO: seek not supported by compressed streams. Will 
throw exception
+                       this.stream.seek(this.splitStart + this.splitLength - 
blockInfo.getInfoSize());
+                       blockInfo.read(new 
DataInputViewStreamWrapper(this.stream));
+               }
+               return blockInfo;
+       }
+
        /**
         * Fill in the statistics. The last modification time and the total 
input size are prefilled.
         *
@@ -207,8 +231,7 @@ public abstract class BinaryInputFormat<T> extends 
FileInputFormat<T> {
                        return null;
                }
 
-               BlockInfo blockInfo = this.createBlockInfo();
-
+               BlockInfo blockInfo = new BlockInfo();
                long totalCount = 0;
                for (FileStatus file : files) {
                        // invalid file
@@ -249,20 +272,16 @@ public abstract class BinaryInputFormat<T> extends 
FileInputFormat<T> {
        public void open(FileInputSplit split) throws IOException {
                super.open(split);
 
-               final long blockSize = this.blockSize == NATIVE_BLOCK_SIZE ?
-                       this.filePath.getFileSystem().getDefaultBlockSize() : 
this.blockSize;
-
-               this.blockInfo = this.createBlockInfo();
-               if (this.splitLength > this.blockInfo.getInfoSize()) {
-                       // TODO: seek not supported by compressed streams. Will 
throw exception
-                       this.stream.seek(this.splitStart + this.splitLength - 
this.blockInfo.getInfoSize());
-                       this.blockInfo.read(new 
DataInputViewStreamWrapper(this.stream));
-               }
+               this.blockInfo = this.createAndReadBlockInfo();
 
+               // We set the size of the BlockBasedInput to splitLength as 
each split contains one block.
+               // After reading the block info, we seek in the file to the 
correct position.
+               
+               this.readRecords = 0;
                this.stream.seek(this.splitStart + 
this.blockInfo.getFirstRecordStart());
-               BlockBasedInput blockBasedInput = new 
BlockBasedInput(this.stream, (int) blockSize);
+               this.blockBasedInput = new BlockBasedInput(this.stream,
+                       (int) blockInfo.getFirstRecordStart(), 
this.splitLength);
                this.dataInputStream = new 
DataInputViewStreamWrapper(blockBasedInput);
-               this.readRecords = 0;
        }
 
        @Override
@@ -275,7 +294,6 @@ public abstract class BinaryInputFormat<T> extends 
FileInputFormat<T> {
                if (this.reachedEnd()) {
                        return null;
                }
-
                record = this.deserialize(record, this.dataInputStream);
                this.readRecords++;
                return record;
@@ -284,8 +302,8 @@ public abstract class BinaryInputFormat<T> extends 
FileInputFormat<T> {
        protected abstract T deserialize(T reuse, DataInputView dataInput) 
throws IOException;
 
        /**
-        * Writes a block info at the end of the blocks.<br>
-        * Current implementation uses only int and not long.
+        * Reads the content of a block of data. The block contains its {@link 
BlockInfo}
+        * at the end, and this method takes this into account when reading the 
data.
         */
        protected class BlockBasedInput extends FilterInputStream {
                private final int maxPayloadSize;
@@ -297,6 +315,12 @@ public abstract class BinaryInputFormat<T> extends 
FileInputFormat<T> {
                        this.blockPos = (int) 
BinaryInputFormat.this.blockInfo.getFirstRecordStart();
                        this.maxPayloadSize = blockSize - 
BinaryInputFormat.this.blockInfo.getInfoSize();
                }
+               
+               public BlockBasedInput(FSDataInputStream in, int startPos, long 
length) {
+                       super(in);
+                       this.blockPos = startPos;
+                       this.maxPayloadSize = (int) (length - 
BinaryInputFormat.this.blockInfo.getInfoSize());
+               }
 
                @Override
                public int read() throws IOException {
@@ -306,9 +330,16 @@ public abstract class BinaryInputFormat<T> extends 
FileInputFormat<T> {
                        return this.in.read();
                }
 
+               private long getCurrBlockPos() {
+                       return this.blockPos;
+               }
+
                private void skipHeader() throws IOException {
                        byte[] dummy = new 
byte[BinaryInputFormat.this.blockInfo.getInfoSize()];
                        this.in.read(dummy, 0, dummy.length);
+
+                       // the blockPos is set to 0 for the case of remote 
reads,
+                       // these are the cases where the last record of a block 
spills on the next block
                        this.blockPos = 0;
                }
 
@@ -337,4 +368,36 @@ public abstract class BinaryInputFormat<T> extends 
FileInputFormat<T> {
                        return totalRead;
                }
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Checkpointing
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public Tuple2<Long, Long> getCurrentState() throws IOException {
+               if (this.blockBasedInput == null) {
+                       throw new RuntimeException("You must have forgotten to 
call open() on your input format.");
+               }
+
+               return  new Tuple2<>(
+                       this.blockBasedInput.getCurrBlockPos(),                 
// the last read index in the block
+                       this.readRecords                                        
                        // the number of records read
+               );
+       }
+
+       @Override
+       public void reopen(FileInputSplit split, Tuple2<Long, Long> state) 
throws IOException {
+               Preconditions.checkNotNull(split, "reopen() cannot be called on 
a null split.");
+               Preconditions.checkNotNull(state, "reopen() cannot be called 
with a null initial state.");
+
+               this.open(split);
+               this.blockInfo = this.createAndReadBlockInfo();
+
+               long blockPos = state.f0;
+               this.readRecords = state.f1;
+
+               this.stream.seek(this.splitStart + blockPos);
+               this.blockBasedInput = new BlockBasedInput(this.stream, (int) 
blockPos, this.splitLength);
+               this.dataInputStream = new 
DataInputViewStreamWrapper(blockBasedInput);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/main/java/org/apache/flink/api/common/io/BlockInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/BlockInfo.java 
b/flink-core/src/main/java/org/apache/flink/api/common/io/BlockInfo.java
index 0ac2e50..2cb18ce 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BlockInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BlockInfo.java
@@ -25,6 +25,11 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
+/**
+ * A block of 24 bytes written at the <i>end</i> of a block in a binary file, 
and containing
+ * i) the number of records in the block, ii) the accumulated number of 
records, and
+ * iii) the offset of the first record in the block.
+ * */
 @Public
 public class BlockInfo implements IOReadableWritable {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java
new file mode 100644
index 0000000..17b0625
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.InputSplit;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * An interface the describes {@link InputFormat}s that allow 
checkpointing/restoring their state.
+ *
+ * @param <S> The type of input split.
+ * @param <T> The type of the channel state to be checkpointed / included in 
the snapshot.
+ */
+@PublicEvolving
+public interface CheckpointableInputFormat<S extends InputSplit, T extends 
Serializable> {
+
+       /**
+        * Returns the split currently being read, along with its current state.
+        * This will be used to restore the state of the reading channel when 
recovering from a task failure.
+        * In the case of a simple text file, the state can correspond to the 
last read offset in the split.
+        *
+        * @return The state of the channel.
+        *
+        * @throws Exception Thrown if the creation of the state object failed.
+        */
+       T getCurrentState() throws IOException;
+
+       /**
+        * Restores the state of a parallel instance reading from an {@link 
InputFormat}.
+        * This is necessary when recovering from a task failure. When this 
method is called,
+        * the input format it guaranteed to be configured.
+        *
+        * <p/>
+        * <b>NOTE: </b> The caller has to make sure that the provided split is 
the one to whom
+        * the state belongs.
+        *
+        * @param split The split to be opened.
+        * @param state The state from which to start from. This can contain 
the offset,
+        *                 but also other data, depending on the input format.
+        */
+       void reopen(S split, T state) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 243e2a4..3a77200 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -19,6 +19,9 @@
 package org.apache.flink.api.common.io;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -28,9 +31,6 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -43,7 +43,7 @@ import java.util.ArrayList;
  * <p>The default delimiter is the newline character {@code '\n'}.</p>
  */
 @Public
-public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
+public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> 
implements CheckpointableInputFormat<FileInputSplit, Long> {
        
        private static final long serialVersionUID = 1L;
 
@@ -56,7 +56,7 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
 
        /** The default charset  to convert strings to bytes */
        private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
-       
+
        /**
         * The default read buffer size = 1MB.
         */
@@ -81,7 +81,7 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
         * The maximum size of a sample record before sampling is aborted. To 
catch cases where a wrong delimiter is given.
         */
        private static int MAX_SAMPLE_LEN;
-       
+
        static { loadGlobalConfigParams(); }
        
        protected static void loadGlobalConfigParams() {
@@ -135,7 +135,7 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
        private transient int readPos;
 
        private transient int limit;
-       
+
        private transient byte[] currBuffer;            // buffer in which 
current record byte sequence is found
        private transient int currOffset;                       // offset in 
above buffer
        private transient int currLen;                          // length of 
current byte sequence
@@ -143,8 +143,9 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
        private transient boolean overLimit;
 
        private transient boolean end;
-       
-       
+
+       private transient long offset = -1;
+
        // 
--------------------------------------------------------------------------------------------
        //  The configuration parameters. Configured on the instance and 
serialized to be shipped.
        // 
--------------------------------------------------------------------------------------------
@@ -182,7 +183,7 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
                
                this.delimiter = delimiter;
        }
-       
+
        public void setDelimiter(char delimiter) {
                setDelimiter(String.valueOf(delimiter));
        }
@@ -226,7 +227,7 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
                
                this.numLineSamples = numLineSamples;
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
        //  User-defined behavior
        // 
--------------------------------------------------------------------------------------------
@@ -404,17 +405,33 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
        /**
         * Opens the given input split. This method opens the input stream to 
the specified file, allocates read buffers
         * and positions the stream at the correct position, making sure that 
any partial record at the beginning is skipped.
-        * 
+        *
         * @param split The input split to open.
-        * 
+        *
         * @see 
org.apache.flink.api.common.io.FileInputFormat#open(org.apache.flink.core.fs.FileInputSplit)
         */
        @Override
        public void open(FileInputSplit split) throws IOException {
                super.open(split);
-               
+               initBuffers();
+
+               this.offset = splitStart;
+               if (this.splitStart != 0) {
+                       this.stream.seek(offset);
+                       readLine();
+                       // if the first partial record already pushes the 
stream over
+                       // the limit of our split, then no record starts within 
this split
+                       if (this.overLimit) {
+                               this.end = true;
+                       }
+               } else {
+                       fillBuffer();
+               }
+       }
+
+       private void initBuffers() {
                this.bufferSize = this.bufferSize <= 0 ? 
DEFAULT_READ_BUFFER_SIZE : this.bufferSize;
-               
+
                if (this.readBuffer == null || this.readBuffer.length != 
this.bufferSize) {
                        this.readBuffer = new byte[this.bufferSize];
                }
@@ -426,19 +443,6 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
                this.limit = 0;
                this.overLimit = false;
                this.end = false;
-
-               if (this.splitStart != 0) {
-                       this.stream.seek(this.splitStart);
-                       readLine();
-                       
-                       // if the first partial record already pushes the 
stream over the limit of our split, then no
-                       // record starts within this split 
-                       if (this.overLimit) {
-                               this.end = true;
-                       }
-               } else {
-                       fillBuffer();
-               }
        }
 
        /**
@@ -489,6 +493,7 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
                        if (this.readPos >= this.limit) {
                                if (!fillBuffer()) {
                                        if (countInWrapBuffer > 0) {
+                                               this.offset += 
countInWrapBuffer;
                                                setResult(this.wrapBuffer, 0, 
countInWrapBuffer);
                                                return true;
                                        } else {
@@ -506,13 +511,14 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
                                } else {
                                        i = 0;
                                }
-
                        }
 
                        // check why we dropped out
                        if (i == this.delimiter.length) {
                                // line end
-                               count = this.readPos - startPos - 
this.delimiter.length;
+                               int totalBytesRead = this.readPos - startPos;
+                               this.offset += countInWrapBuffer + 
totalBytesRead;
+                               count = totalBytesRead - this.delimiter.length;
 
                                // copy to byte array
                                if (countInWrapBuffer > 0) {
@@ -535,7 +541,7 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
                                count = this.limit - startPos;
                                
                                // check against the maximum record length
-                               if ( ((long) countInWrapBuffer) + count > 
this.lineLengthLimit) {
+                               if (((long) countInWrapBuffer) + count > 
this.lineLengthLimit) {
                                        throw new IOException("The record 
length exceeded the maximum record length (" + 
                                                        this.lineLengthLimit + 
").");
                                }
@@ -604,13 +610,11 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
                        return true;
                }
        }
-       
-       // 
============================================================================================
-       //  Parametrization via configuration
-       // 
============================================================================================
-       
-       // ------------------------------------- Config Keys 
------------------------------------------
-       
+
+       // 
--------------------------------------------------------------------------------------------
+       // Config Keys for Parametrization via configuration
+       // 
--------------------------------------------------------------------------------------------
+
        /**
         * The configuration key to set the record delimiter.
         */
@@ -620,4 +624,38 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
         * The configuration key to set the number of samples to take for the 
statistics.
         */
        private static final String NUM_STATISTICS_SAMPLES = 
"delimited-format.numSamples";
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Checkpointing
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public Long getCurrentState() throws IOException {
+               return this.offset;
+       }
+
+       @Override
+       public void reopen(FileInputSplit split, Long state) throws IOException 
{
+               Preconditions.checkNotNull(split, "reopen() cannot be called on 
a null split.");
+               Preconditions.checkNotNull(state, "reopen() cannot be called 
with a null initial state.");
+
+               this.open(split);
+               this.offset = state;
+               if (state > this.splitStart + split.getLength()) {
+                       this.end = true;
+               } else if (state > split.getStart()) {
+                       initBuffers();
+
+                       this.stream.seek(this.offset);
+                       if (split.getLength() == -1) {
+                               // this is the case for unsplittable files
+                               fillBuffer();
+                       } else {
+                               this.splitLength = this.splitStart + 
split.getLength() - this.offset;
+                               if (splitLength <= 0) {
+                                       this.end = true;
+                               }
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
index d0caa22..68465a3 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
@@ -394,4 +394,4 @@ public class EnumerateNestedFilesTest {
                        return null;
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 63cb966..ae8802b 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -377,7 +377,7 @@ public class FileInputFormatTest {
                final int numBlocks = 3;
                FileOutputStream fileOutputStream = new 
FileOutputStream(tempFile);
                for (int i = 0; i < blockSize * numBlocks; i++) {
-                       fileOutputStream.write(new byte[]{1});
+                       fileOutputStream.write(new byte[]{(byte) i});
                }
                fileOutputStream.close();
 
@@ -392,11 +392,12 @@ public class FileInputFormatTest {
                FileInputSplit[] inputSplits = inputFormat.createInputSplits(3);
 
                byte[] bytes = null;
+               byte prev = 0;
                for (FileInputSplit inputSplit : inputSplits) {
                        inputFormat.open(inputSplit);
                        while (!inputFormat.reachedEnd()) {
                                if ((bytes = inputFormat.nextRecord(bytes)) != 
null) {
-                                       Assert.assertArrayEquals(new 
byte[]{(byte) 0xFE}, bytes);
+                                       Assert.assertArrayEquals(new 
byte[]{--prev}, bytes);
                                }
                        }
                }
@@ -420,14 +421,13 @@ public class FileInputFormatTest {
                }
        }
 
-
        private static final class MyDecoratedInputFormat extends 
FileInputFormat<byte[]> {
 
                private static final long serialVersionUID = 1L;
 
                @Override
                public boolean reachedEnd() throws IOException {
-                       return this.splitLength <= this.stream.getPos();
+                       return this.stream.getPos() >= this.splitStart + 
this.splitLength;
                }
 
                @Override
@@ -442,7 +442,6 @@ public class FileInputFormatTest {
                        inputStream = super.decorateInputStream(inputStream, 
fileSplit);
                        return new InputStreamFSInputWrapper(new 
InvertedInputStream(inputStream));
                }
-
        }
 
        private static final class InvertedInputStream extends InputStream {

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
index 2ff6fab..b00ca95 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.io;
 
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -127,17 +128,29 @@ public abstract class SequentialFormatTestBase<T> extends 
TestLogger {
         * Tests if the expected sequence and amount of data can be read
         */
        @Test
-       public void checkRead() throws IOException {
+       public void checkRead() throws Exception {
                BinaryInputFormat<T> input = this.createInputFormat();
                FileInputSplit[] inputSplits = input.createInputSplits(0);
                Arrays.sort(inputSplits, new InputSplitSorter());
+
                int readCount = 0;
+
                for (FileInputSplit inputSplit : inputSplits) {
                        input.open(inputSplit);
+                       input.reopen(inputSplit, input.getCurrentState());
+
                        T record = createInstance();
+
                        while (!input.reachedEnd()) {
                                if (input.nextRecord(record) != null) {
                                        
this.checkEquals(this.getRecord(readCount), record);
+
+                                       if (!input.reachedEnd()) {
+                                               Tuple2<Long, Long> state = 
input.getCurrentState();
+
+                                               input = 
this.createInputFormat();
+                                               input.reopen(inputSplit, state);
+                                       }
                                        readCount++;
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-fs-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-fs-tests/pom.xml b/flink-fs-tests/pom.xml
index 2c9500f..53cb503 100644
--- a/flink-fs-tests/pom.xml
+++ b/flink-fs-tests/pom.xml
@@ -78,7 +78,15 @@ under the License.
                        <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
-               
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
                <dependency>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-hdfs</artifactId>
@@ -94,5 +102,6 @@ under the License.
                        <type>test-jar</type>
                        
<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
                </dependency>
+
        </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
new file mode 100644
index 0000000..e6cd5d9
--- /dev/null
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.hdfstests;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class ContinuousFileMonitoringFunctionITCase extends 
StreamingProgramTestBase {
+
+       private static final int NO_OF_FILES = 10;
+       private static final int LINES_PER_FILE = 10;
+
+       private static final long INTERVAL = 100;
+
+       private File baseDir;
+
+       private org.apache.hadoop.fs.FileSystem hdfs;
+       private String hdfsURI;
+       private MiniDFSCluster hdfsCluster;
+
+       private static Map<Integer, String> expectedContents = new HashMap<>();
+
+       //                                              PREPARING FOR THE TESTS
+
+       @Before
+       public void createHDFS() {
+               try {
+                       baseDir = new 
File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+                       FileUtil.fullyDelete(baseDir);
+
+                       org.apache.hadoop.conf.Configuration hdConf = new 
org.apache.hadoop.conf.Configuration();
+                       hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+                       hdConf.set("dfs.block.size", String.valueOf(1048576)); 
// this is the minimum we can set.
+
+                       MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
+                       hdfsCluster = builder.build();
+
+                       hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + 
":" + hdfsCluster.getNameNodePort() +"/";
+                       hdfs = new 
org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+               } catch(Throwable e) {
+                       e.printStackTrace();
+                       Assert.fail("Test failed " + e.getMessage());
+               }
+       }
+
+       @After
+       public void destroyHDFS() {
+               try {
+                       FileUtil.fullyDelete(baseDir);
+                       hdfsCluster.shutdown();
+               } catch (Throwable t) {
+                       throw new RuntimeException(t);
+               }
+       }
+
+       //                                              END OF PREPARATIONS
+
+       @Override
+       protected void testProgram() throws Exception {
+
+               /*
+               * This test checks the interplay between the monitor and the 
reader
+               * and also the failExternally() functionality. To test the 
latter we
+               * set the parallelism to 1 so that we have the chaining between 
the sink,
+               * which throws the SuccessException to signal the end of the 
test, and the
+               * reader.
+               * */
+
+               FileCreator fileCreator = new FileCreator(INTERVAL);
+               Thread t = new Thread(fileCreator);
+               t.start();
+
+               TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+               format.setFilePath(hdfsURI);
+
+               try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setParallelism(1);
+
+                       ContinuousFileMonitoringFunction<String> 
monitoringFunction =
+                               new ContinuousFileMonitoringFunction<>(format, 
hdfsURI,
+                                       FilePathFilter.createDefaultFilter(),
+                                       FileProcessingMode.PROCESS_CONTINUOUSLY,
+                                       env.getParallelism(), INTERVAL);
+
+                       TypeInformation<String> typeInfo = 
TypeExtractor.getInputFormatTypes(format);
+                       ContinuousFileReaderOperator<String, ?> reader = new 
ContinuousFileReaderOperator<>(format);
+                       TestingSinkFunction sink = new 
TestingSinkFunction(monitoringFunction);
+
+                       DataStream<FileInputSplit> splits = 
env.addSource(monitoringFunction);
+                       splits.transform("FileSplitReader", typeInfo, 
reader).addSink(sink).setParallelism(1);
+                       env.execute();
+
+               } catch (Exception e) {
+                       Throwable th = e;
+                       int depth = 0;
+
+                       for (; depth < 20; depth++) {
+                               if (th instanceof SuccessException) {
+                                       try {
+                                               postSubmit();
+                                       } catch (Exception e1) {
+                                               e1.printStackTrace();
+                                       }
+                                       return;
+                               } else if (th.getCause() != null) {
+                                       th = th.getCause();
+                               } else {
+                                       break;
+                               }
+                       }
+                       e.printStackTrace();
+                       Assert.fail(e.getMessage());
+               }
+       }
+
+       private static class TestingSinkFunction extends 
RichSinkFunction<String> {
+
+               private final ContinuousFileMonitoringFunction src;
+
+               private int elementCounter = 0;
+               private Map<Integer, Integer> elementCounters = new HashMap<>();
+               private Map<Integer, List<String>> collectedContent = new 
HashMap<>();
+
+               TestingSinkFunction(ContinuousFileMonitoringFunction 
monitoringFunction) {
+                       this.src = monitoringFunction;
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       // this sink can only work with DOP 1
+                       assertEquals(1, 
getRuntimeContext().getNumberOfParallelSubtasks());
+               }
+
+               @Override
+               public void close() {
+                       // check if the data that we collected are the ones 
they are supposed to be.
+
+                       Assert.assertEquals(collectedContent.size(), 
expectedContents.size());
+                       for (Integer fileIdx: expectedContents.keySet()) {
+                               
Assert.assertTrue(collectedContent.keySet().contains(fileIdx));
+
+                               List<String> cntnt = 
collectedContent.get(fileIdx);
+                               Collections.sort(cntnt, new 
Comparator<String>() {
+                                       @Override
+                                       public int compare(String o1, String 
o2) {
+                                               return getLineNo(o1) - 
getLineNo(o2);
+                                       }
+                               });
+
+                               StringBuilder cntntStr = new StringBuilder();
+                               for (String line: cntnt) {
+                                       cntntStr.append(line);
+                               }
+                               Assert.assertEquals(cntntStr.toString(), 
expectedContents.get(fileIdx));
+                       }
+                       expectedContents.clear();
+
+                       src.cancel();
+                       try {
+                               src.close();
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                       }
+               }
+
+               private int getLineNo(String line) {
+                       String[] tkns = line.split("\\s");
+                       Assert.assertTrue(tkns.length == 6);
+                       return Integer.parseInt(tkns[tkns.length - 1]);
+               }
+
+               @Override
+               public void invoke(String value) throws Exception {
+                       int fileIdx = 
Character.getNumericValue(value.charAt(0));
+
+                       Integer counter = elementCounters.get(fileIdx);
+                       if (counter == null) {
+                               counter = 0;
+                       } else if (counter == LINES_PER_FILE) {
+                               // ignore duplicate lines.
+                               Assert.fail("Duplicate lines detected.");
+                       }
+                       elementCounters.put(fileIdx, ++counter);
+
+                       List<String> content = collectedContent.get(fileIdx);
+                       if (content == null) {
+                               content = new ArrayList<>();
+                               collectedContent.put(fileIdx, content);
+                       }
+                       content.add(value + "\n");
+
+                       elementCounter++;
+                       if (elementCounter == NO_OF_FILES * LINES_PER_FILE) {
+                               throw new SuccessException();
+                       }
+               }
+       }
+
+       /**
+        * A separate thread creating {@link #NO_OF_FILES} files, one file 
every {@link #INTERVAL} milliseconds.
+        * It serves for testing the file monitoring functionality of the 
{@link ContinuousFileMonitoringFunction}.
+        * The files are filled with data by the {@link #fillWithData(String, 
String, int, String)} method.
+        * */
+       private class FileCreator implements Runnable {
+
+               private final long interval;
+
+               FileCreator(long interval) {
+                       this.interval = interval;
+               }
+
+               public void run() {
+                       try {
+                               for (int i = 0; i < NO_OF_FILES; i++) {
+                                       fillWithData(hdfsURI, "file", i, "This 
is test line.");
+                                       Thread.sleep(interval);
+                               }
+                       } catch (IOException e) {
+                               e.printStackTrace();
+                       } catch (InterruptedException e) {
+                               // we just close without any message.
+                       }
+               }
+       }
+
+       /**
+        * Fill the file with content.
+        * */
+       private void fillWithData(String base, String fileName, int fileIdx, 
String sampleLine) throws IOException {
+               assert (hdfs != null);
+
+               org.apache.hadoop.fs.Path file = new 
org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
+
+               org.apache.hadoop.fs.Path tmp = new 
org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
+               FSDataOutputStream stream = hdfs.create(tmp);
+               StringBuilder str = new StringBuilder();
+               for (int i = 0; i < LINES_PER_FILE; i++) {
+                       String line = fileIdx +": "+ sampleLine + " " + i +"\n";
+                       str.append(line);
+                       stream.write(line.getBytes());
+               }
+               stream.close();
+
+               hdfs.rename(tmp, file);
+
+               expectedContents.put(fileIdx, str.toString());
+
+               Assert.assertTrue("No result file present", hdfs.exists(file));
+       }
+
+       public static class SuccessException extends Exception {
+               private static final long serialVersionUID = 
-7011865671593955887L;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
new file mode 100644
index 0000000..87567e3
--- /dev/null
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hdfstests;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+public class ContinuousFileMonitoringTest {
+
+       private static final int NO_OF_FILES = 10;
+       private static final int LINES_PER_FILE = 10;
+
+       private static final long INTERVAL = 100;
+
+       private static File baseDir;
+
+       private static org.apache.hadoop.fs.FileSystem hdfs;
+       private static String hdfsURI;
+       private static MiniDFSCluster hdfsCluster;
+
+       //                                              PREPARING FOR THE TESTS
+
+       @BeforeClass
+       public static void createHDFS() {
+               try {
+                       baseDir = new 
File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+                       FileUtil.fullyDelete(baseDir);
+
+                       org.apache.hadoop.conf.Configuration hdConf = new 
org.apache.hadoop.conf.Configuration();
+                       hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+                       hdConf.set("dfs.block.size", String.valueOf(1048576)); 
// this is the minimum we can set.
+
+                       MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
+                       hdfsCluster = builder.build();
+
+                       hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + 
":" + hdfsCluster.getNameNodePort() +"/";
+                       hdfs = new 
org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+               } catch(Throwable e) {
+                       e.printStackTrace();
+                       Assert.fail("Test failed " + e.getMessage());
+               }
+       }
+
+       @AfterClass
+       public static void destroyHDFS() {
+               try {
+                       FileUtil.fullyDelete(baseDir);
+                       hdfsCluster.shutdown();
+               } catch (Throwable t) {
+                       throw new RuntimeException(t);
+               }
+       }
+
+       //                                              END OF PREPARATIONS
+
+       //                                              TESTS
+
+       @Test
+       public void testFileReadingOperator() throws Exception {
+               Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+               Map<Integer, String> expectedFileContents = new HashMap<>();
+               for(int i = 0; i < NO_OF_FILES; i++) {
+                       Tuple2<org.apache.hadoop.fs.Path, String> file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+                       filesCreated.add(file.f0);
+                       expectedFileContents.put(i, file.f1);
+               }
+
+               TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+               TypeInformation<String> typeInfo = 
TypeExtractor.getInputFormatTypes(format);
+
+               ContinuousFileReaderOperator<String, ?> reader = new 
ContinuousFileReaderOperator<>(format);
+               OneInputStreamOperatorTestHarness<FileInputSplit, String> 
tester =
+                       new OneInputStreamOperatorTestHarness<>(reader);
+
+               reader.setOutputType(typeInfo, new ExecutionConfig());
+               tester.open();
+
+               // create the necessary splits for the test
+               FileInputSplit[] splits = format.createInputSplits(
+                       
reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+               // and feed them to the operator
+               for(FileInputSplit split: splits) {
+                       tester.processElement(new StreamRecord<>(split));
+               }
+
+               // then close the reader gracefully
+               synchronized (tester.getCheckpointLock()) {
+                       tester.close();
+               }
+
+               /*
+               * Given that the reader is multithreaded, the test finishes 
before the reader thread finishes
+               * reading. This results in files being deleted by the test 
before being read, thus throwing an exception.
+               * In addition, even if file deletion happens at the end, the 
results are not ready for testing.
+               * To face this, we wait until all the output is collected or 
until the waiting time exceeds 1000 ms, or 1s.
+               */
+
+               long start = System.currentTimeMillis();
+               Queue<Object> output;
+               do {
+                       output = tester.getOutput();
+                       Thread.sleep(50);
+               } while ((output == null || output.size() != NO_OF_FILES * 
LINES_PER_FILE) && (System.currentTimeMillis() - start) < 1000);
+
+               Map<Integer, List<String>> actualFileContents = new HashMap<>();
+               for(Object line: tester.getOutput()) {
+                       StreamRecord<String> element = (StreamRecord<String>) 
line;
+
+                       int fileIdx = 
Character.getNumericValue(element.getValue().charAt(0));
+                       List<String> content = actualFileContents.get(fileIdx);
+                       if(content == null) {
+                               content = new ArrayList<>();
+                               actualFileContents.put(fileIdx, content);
+                       }
+                       content.add(element.getValue() +"\n");
+               }
+
+               Assert.assertEquals(actualFileContents.size(), 
expectedFileContents.size());
+               for (Integer fileIdx: expectedFileContents.keySet()) {
+                       
Assert.assertTrue(actualFileContents.keySet().contains(fileIdx));
+
+                       List<String> cntnt = actualFileContents.get(fileIdx);
+                       Collections.sort(cntnt, new Comparator<String>() {
+                               @Override
+                               public int compare(String o1, String o2) {
+                                       return getLineNo(o1) - getLineNo(o2);
+                               }
+                       });
+
+                       StringBuilder cntntStr = new StringBuilder();
+                       for (String line: cntnt) {
+                               cntntStr.append(line);
+                       }
+                       Assert.assertEquals(cntntStr.toString(), 
expectedFileContents.get(fileIdx));
+               }
+
+               for(org.apache.hadoop.fs.Path file: filesCreated) {
+                       hdfs.delete(file, false);
+               }
+       }
+
+       private static class PathFilter extends FilePathFilter {
+
+               @Override
+               public boolean filterPath(Path filePath) {
+                       return filePath.getName().startsWith("**");
+               }
+       }
+
+       @Test
+       public void testFilePathFiltering() throws Exception {
+               Set<String> uniqFilesFound = new HashSet<>();
+               Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+
+               // create the files to be discarded
+               for (int i = 0; i < NO_OF_FILES; i++) {
+                       Tuple2<org.apache.hadoop.fs.Path, String> file = 
fillWithData(hdfsURI, "**file", i, "This is test line.");
+                       filesCreated.add(file.f0);
+               }
+
+               // create the files to be kept
+               for (int i = 0; i < NO_OF_FILES; i++) {
+                       Tuple2<org.apache.hadoop.fs.Path, String> file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+                       filesCreated.add(file.f0);
+               }
+
+               TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+               ContinuousFileMonitoringFunction<String> monitoringFunction =
+                       new ContinuousFileMonitoringFunction<>(format, hdfsURI, 
new PathFilter(),
+                               FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+
+               monitoringFunction.open(new Configuration());
+               monitoringFunction.run(new 
TestingSourceContext(monitoringFunction, uniqFilesFound));
+
+               Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
+               for(int i = 0; i < NO_OF_FILES; i++) {
+                       org.apache.hadoop.fs.Path file = new 
org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
+                       
Assert.assertTrue(uniqFilesFound.contains(file.toString()));
+               }
+
+               for(org.apache.hadoop.fs.Path file: filesCreated) {
+                       hdfs.delete(file, false);
+               }
+       }
+
+       @Test
+       public void testFileSplitMonitoringReprocessWithAppended() throws 
Exception {
+               Set<String> uniqFilesFound = new HashSet<>();
+
+               FileCreator fc = new FileCreator(INTERVAL, NO_OF_FILES);
+               fc.start();
+
+               TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+               ContinuousFileMonitoringFunction<String> monitoringFunction =
+                       new ContinuousFileMonitoringFunction<>(format, hdfsURI, 
FilePathFilter.createDefaultFilter(),
+                               FileProcessingMode.PROCESS_CONTINUOUSLY, 1, 
INTERVAL);
+
+               monitoringFunction.open(new Configuration());
+               monitoringFunction.run(new 
TestingSourceContext(monitoringFunction, uniqFilesFound));
+
+               // wait until the sink also sees all the splits.
+               synchronized (uniqFilesFound) {
+                       while (uniqFilesFound.size() < NO_OF_FILES) {
+                               uniqFilesFound.wait(7 * INTERVAL);
+                       }
+               }
+
+               Assert.assertTrue(fc.getFilesCreated().size() == NO_OF_FILES);
+               Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
+
+               Set<org.apache.hadoop.fs.Path> filesCreated = 
fc.getFilesCreated();
+               Set<String> fileNamesCreated = new HashSet<>();
+               for (org.apache.hadoop.fs.Path path: fc.getFilesCreated()) {
+                       fileNamesCreated.add(path.toString());
+               }
+
+               for(String file: uniqFilesFound) {
+                       Assert.assertTrue(fileNamesCreated.contains(file));
+               }
+
+               for(org.apache.hadoop.fs.Path file: filesCreated) {
+                       hdfs.delete(file, false);
+               }
+       }
+
+       @Test
+       public void testFileSplitMonitoringProcessOnce() throws Exception {
+               Set<String> uniqFilesFound = new HashSet<>();
+
+               FileCreator fc = new FileCreator(INTERVAL, 1);
+               fc.start();
+
+               // to make sure that at least one file is created
+               Set<org.apache.hadoop.fs.Path> filesCreated = 
fc.getFilesCreated();
+               synchronized (filesCreated) {
+                       if (filesCreated.size() == 0) {
+                               filesCreated.wait();
+                       }
+               }
+               Assert.assertTrue(fc.getFilesCreated().size() >= 1);
+
+               TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+               ContinuousFileMonitoringFunction<String> monitoringFunction =
+                       new ContinuousFileMonitoringFunction<>(format, hdfsURI, 
FilePathFilter.createDefaultFilter(),
+                               FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+
+               monitoringFunction.open(new Configuration());
+               monitoringFunction.run(new 
TestingSourceContext(monitoringFunction, uniqFilesFound));
+
+               // wait until all the files are created
+               fc.join();
+
+               Assert.assertTrue(filesCreated.size() == NO_OF_FILES);
+
+               Set<String> fileNamesCreated = new HashSet<>();
+               for (org.apache.hadoop.fs.Path path: fc.getFilesCreated()) {
+                       fileNamesCreated.add(path.toString());
+               }
+
+               Assert.assertTrue(uniqFilesFound.size() >= 1 && 
uniqFilesFound.size() < fileNamesCreated.size());
+               for(String file: uniqFilesFound) {
+                       Assert.assertTrue(fileNamesCreated.contains(file));
+               }
+
+               for(org.apache.hadoop.fs.Path file: filesCreated) {
+                       hdfs.delete(file, false);
+               }
+       }
+
+       // -------------                End of Tests
+
+       private int getLineNo(String line) {
+               String[] tkns = line.split("\\s");
+               Assert.assertTrue(tkns.length == 6);
+               return Integer.parseInt(tkns[tkns.length - 1]);
+       }
+
+       /**
+        * A separate thread creating {@link #NO_OF_FILES} files, one file 
every {@link #INTERVAL} milliseconds.
+        * It serves for testing the file monitoring functionality of the 
{@link ContinuousFileMonitoringFunction}.
+        * The files are filled with data by the {@link #fillWithData(String, 
String, int, String)} method.
+        * */
+       private class FileCreator extends Thread {
+
+               private final long interval;
+               private final int noOfFilesBeforeNotifying;
+
+               private final Set<org.apache.hadoop.fs.Path> filesCreated = new 
HashSet<>();
+
+               FileCreator(long interval, int notificationLim) {
+                       this.interval = interval;
+                       this.noOfFilesBeforeNotifying = notificationLim;
+               }
+
+               public void run() {
+                       try {
+                               for(int i = 0; i < NO_OF_FILES; i++) {
+                                       Tuple2<org.apache.hadoop.fs.Path, 
String> file =
+                                               fillWithData(hdfsURI, "file", 
i, "This is test line.");
+
+                                       synchronized (filesCreated) {
+                                               filesCreated.add(file.f0);
+                                               if (filesCreated.size() == 
noOfFilesBeforeNotifying) {
+                                                       
filesCreated.notifyAll();
+                                               }
+                                       }
+                                       Thread.sleep(interval);
+                               }
+                       } catch (IOException | InterruptedException e) {
+                               e.printStackTrace();
+                       }
+               }
+
+               Set<org.apache.hadoop.fs.Path> getFilesCreated() {
+                       return this.filesCreated;
+               }
+       }
+
+       private class TestingSourceContext implements 
SourceFunction.SourceContext<FileInputSplit> {
+
+               private final ContinuousFileMonitoringFunction src;
+               private final Set<String> filesFound;
+               private final Object lock = new Object();
+
+               TestingSourceContext(ContinuousFileMonitoringFunction 
monitoringFunction, Set<String> uniqFilesFound) {
+                       this.filesFound = uniqFilesFound;
+                       this.src = monitoringFunction;
+               }
+
+               @Override
+               public void collect(FileInputSplit element) {
+
+                       String filePath = element.getPath().toString();
+                       if (filesFound.contains(filePath)) {
+                               // check if we have duplicate splits that are 
open during the first time
+                               // the monitor sees them, and they then close, 
so the modification time changes.
+                               Assert.fail("Duplicate file: " + filePath);
+                       }
+
+                       filesFound.add(filePath);
+                       try {
+                               if (filesFound.size() == NO_OF_FILES) {
+                                       this.src.cancel();
+                                       this.src.close();
+                                       synchronized (filesFound) {
+                                               filesFound.notifyAll();
+                                       }
+                               }
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                       }
+               }
+
+               @Override
+               public void collectWithTimestamp(FileInputSplit element, long 
timestamp) {
+               }
+
+               @Override
+               public void emitWatermark(Watermark mark) {
+               }
+
+               @Override
+               public Object getCheckpointLock() {
+                       return lock;
+               }
+
+               @Override
+               public void close() {
+               }
+       }
+
+       /**
+        * Fill the file with content.
+        * */
+       private Tuple2<org.apache.hadoop.fs.Path, String> fillWithData(String 
base, String fileName, int fileIdx, String sampleLine) throws IOException {
+               assert (hdfs != null);
+
+               org.apache.hadoop.fs.Path file = new 
org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
+               Assert.assertTrue (!hdfs.exists(file));
+
+               org.apache.hadoop.fs.Path tmp = new 
org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
+               FSDataOutputStream stream = hdfs.create(tmp);
+               StringBuilder str = new StringBuilder();
+               for(int i = 0; i < LINES_PER_FILE; i++) {
+                       String line = fileIdx +": "+ sampleLine + " " + i +"\n";
+                       str.append(line);
+                       stream.write(line.getBytes());
+               }
+               stream.close();
+
+               hdfs.rename(tmp, file);
+
+               Assert.assertTrue("No result file present", hdfs.exists(file));
+               return new Tuple2<>(file, str.toString());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index f44fe9e..ecf55c3 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -20,7 +20,6 @@
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.api.common.io.ParseException;
-import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.tuple.*;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -30,6 +29,7 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.types.parser.StringParser;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.File;
@@ -58,23 +58,135 @@ public class CsvInputFormatTest {
        private static final String SECOND_PART = "That is the second part";
 
        @Test
-       public void ignoreInvalidLines() {
+       public void testSplitCsvInputStreamInLargeBuffer() throws Exception {
+               testSplitCsvInputStream(1024 * 1024, false);
+       }
+
+       @Test
+       public void testSplitCsvInputStreamInSmallBuffer() throws Exception {
+               testSplitCsvInputStream(2, false);
+       }
+
+       private void testSplitCsvInputStream(int bufferSize, boolean 
failAtStart) throws Exception {
+               final String fileContent =
+                       "this is|1|2.0|\n"+
+                       "a test|3|4.0|\n" +
+                       "#next|5|6.0|\n" +
+                       "asdadas|5|30.0|\n";
+
+               // create temporary file with 3 blocks
+               final File tempFile = 
File.createTempFile("input-stream-decoration-test", "tmp");
+               tempFile.deleteOnExit();
+
+               FileOutputStream fileOutputStream = new 
FileOutputStream(tempFile);
+               fileOutputStream.write(fileContent.getBytes());
+               fileOutputStream.close();
+
+               // fix the number of blocks and the size of each one.
+               final int noOfBlocks = 3;
+
+               final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = 
TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
+               CsvInputFormat<Tuple3<String, Integer, Double>> format = new 
TupleCsvInputFormat<>(new Path(tempFile.toURI()), "\n", "|", typeInfo);
+               format.setLenient(true);
+               format.setBufferSize(bufferSize);
+
+               final Configuration config = new Configuration();
+               format.configure(config);
+
+               long[] offsetsAfterRecord = new long[]{ 15, 29, 42, 58};
+               long[] offsetAtEndOfSplit = new long[]{ 20, 40, 58};
+               int recordCounter = 0;
+               int splitCounter = 0;
+
+               FileInputSplit[] inputSplits = 
format.createInputSplits(noOfBlocks);
+               Tuple3<String, Integer, Double> result = new Tuple3<>();
+
+               for (FileInputSplit inputSplit : inputSplits) {
+                       assertEquals(inputSplit.getStart() + 
inputSplit.getLength(), offsetAtEndOfSplit[splitCounter]);
+                       splitCounter++;
+
+                       format.open(inputSplit);
+                       format.reopen(inputSplit, format.getCurrentState());
+
+                       while (!format.reachedEnd()) {
+                               if ((result = format.nextRecord(result)) != 
null) {
+                                       assertEquals((long) 
format.getCurrentState(), offsetsAfterRecord[recordCounter]);
+                                       recordCounter++;
+
+                                       if (recordCounter == 1) {
+                                               assertNotNull(result);
+                                               assertEquals("this is", 
result.f0);
+                                               assertEquals(new Integer(1), 
result.f1);
+                                               assertEquals(new Double(2.0), 
result.f2);
+                                               assertEquals((long) 
format.getCurrentState(), 15);
+                                       } else if (recordCounter == 2) {
+                                               assertNotNull(result);
+                                               assertEquals("a test", 
result.f0);
+                                               assertEquals(new Integer(3), 
result.f1);
+                                               assertEquals(new Double(4.0), 
result.f2);
+                                               assertEquals((long) 
format.getCurrentState(), 29);
+                                       } else if (recordCounter == 3) {
+                                               assertNotNull(result);
+                                               assertEquals("#next", 
result.f0);
+                                               assertEquals(new Integer(5), 
result.f1);
+                                               assertEquals(new Double(6.0), 
result.f2);
+                                               assertEquals((long) 
format.getCurrentState(), 42);
+                                       } else {
+                                               assertNotNull(result);
+                                               assertEquals("asdadas", 
result.f0);
+                                               assertEquals(new Integer(5), 
result.f1);
+                                               assertEquals(new Double(30.0), 
result.f2);
+                                               assertEquals((long) 
format.getCurrentState(), 58);
+                                       }
+
+                                       // simulate checkpoint
+                                       Long state = format.getCurrentState();
+                                       long offsetToRestore = state;
+
+                                       // create a new format
+                                       format = new TupleCsvInputFormat<>(new 
Path(tempFile.toURI()), "\n", "|", typeInfo);
+                                       format.setLenient(true);
+                                       format.setBufferSize(bufferSize);
+                                       format.configure(config);
+
+                                       // simulate the restore operation.
+                                       format.reopen(inputSplit, 
offsetToRestore);
+                               } else {
+                                       result = new Tuple3<>();
+                               }
+                       }
+                       format.close();
+               }
+               Assert.assertEquals(4, recordCounter);
+       }
+
+       @Test
+       public void ignoreInvalidLinesAndGetOffsetInLargeBuffer() {
+               ignoreInvalidLines(1024 * 1024);
+       }
+
+       @Test
+       public void ignoreInvalidLinesAndGetOffsetInSmallBuffer() {
+               ignoreInvalidLines(2);
+       }
+
+       private void ignoreInvalidLines(int bufferSize) {
                try {
-                       
-                       
                        final String fileContent =  "#description of the 
data\n" + 
                                                                                
"header1|header2|header3|\n"+
                                                                                
"this is|1|2.0|\n"+
                                                                                
"//a comment\n" +
                                                                                
"a test|3|4.0|\n" +
-                                                                               
"#next|5|6.0|\n";
-                       
+                                                                               
"#next|5|6.0|\n" +
+                                                                               
"asdasdas";
+
                        final FileInputSplit split = 
createTempFile(fileContent);
 
                        final TupleTypeInfo<Tuple3<String, Integer, Double>> 
typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, 
Double.class);
                        final CsvInputFormat<Tuple3<String, Integer, Double>> 
format = new TupleCsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", 
"|", typeInfo);
                        format.setLenient(true);
-               
+                       format.setBufferSize(bufferSize);
+
                        final Configuration parameters = new Configuration();
                        format.configure(parameters);
                        format.open(split);
@@ -86,21 +198,25 @@ public class CsvInputFormatTest {
                        assertEquals("this is", result.f0);
                        assertEquals(new Integer(1), result.f1);
                        assertEquals(new Double(2.0), result.f2);
-                       
+                       assertEquals((long) format.getCurrentState(), 65);
+
                        result = format.nextRecord(result);
                        assertNotNull(result);
                        assertEquals("a test", result.f0);
                        assertEquals(new Integer(3), result.f1);
                        assertEquals(new Double(4.0), result.f2);
-                       
+                       assertEquals((long) format.getCurrentState(), 91);
+
                        result = format.nextRecord(result);
                        assertNotNull(result);
                        assertEquals("#next", result.f0);
                        assertEquals(new Integer(5), result.f1);
                        assertEquals(new Double(6.0), result.f2);
+                       assertEquals((long) format.getCurrentState(), 104);
 
                        result = format.nextRecord(result);
                        assertNull(result);
+                       assertEquals(fileContent.length(), (long) 
format.getCurrentState());
                }
                catch (Exception ex) {
                        ex.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 80c5fbc..ee4d31f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -101,8 +101,8 @@ public class RuntimeEnvironment implements Environment {
                        InputGate[] inputGates,
                        ActorGateway jobManager,
                        TaskManagerRuntimeInfo taskManagerInfo,
-                       Task containingTask,
-                       TaskMetricGroup metrics) {
+                       TaskMetricGroup metrics,
+                       Task containingTask) {
 
                this.jobId = checkNotNull(jobId);
                this.jobVertexId = checkNotNull(jobVertexId);

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index c1cbaa6..548d7d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -526,7 +526,7 @@ public class Task implements Runnable {
                                        userCodeClassLoader, memoryManager, 
ioManager,
                                        broadcastVariableManager, 
accumulatorRegistry,
                                        splitProvider, distributedCacheEntries,
-                                       writers, inputGates, jobManager, 
taskManagerConfig, this, metrics);
+                                       writers, inputGates, jobManager, 
taskManagerConfig, metrics, this);
 
                        // let the task code create its readers and writers
                        invokable.setEnvironment(env);

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index 21ad762..e824758 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -44,6 +44,11 @@ public class DataStreamSource<T> extends 
SingleOutputStreamOperator<T> {
                }
        }
 
+       public DataStreamSource(SingleOutputStreamOperator<T> operator) {
+               super(operator.environment, operator.getTransformation());
+               this.isParallel = true;
+       }
+
        @Override
        public DataStreamSource<T> setParallelism(int parallelism) {
                if (parallelism > 1 && !isParallel) {

Reply via email to