[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-28 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61402967
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided path and assigns
+ * splits to downstream tasks for further reading and processing. Which splits will be
+ * further processed depends on the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ */
+@Internal
+public class FileSplitMonitoringFunction<OUT>
+   extends RichSourceFunction<FileInputSplit> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+    * Specifies when computation will be triggered.
+    */
+   public enum WatchType {
+      REPROCESS_WITH_APPENDED // Reprocesses the whole file when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat<OUT> format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}). */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = true;
+
+   private Configuration configuration;
--- End diff --

We should explain why we have the Configuration here: the Configuration that we 
get in open() is not valid, so we have to ship our own copy and use that instead. 
The same applies to the FileSplitReadOperator.
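
A minimal sketch of the pattern such a comment would document (the class and field
names here are illustrative, not the PR's code): the function keeps the Configuration
it was constructed with and uses that in open(), because the Configuration instance
handed to open() at runtime is not the one the user configured.

    import org.apache.flink.api.common.io.FileInputFormat;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

    // Illustrative sketch only: carry the constructor-time Configuration
    // into open(), since the 'parameters' argument is not valid there.
    public abstract class ConfiguredFileSource<T> extends RichSourceFunction<T> {

        private final FileInputFormat<T> format;
        private final Configuration configuration; // kept as a field on purpose

        protected ConfiguredFileSource(FileInputFormat<T> format, Configuration configuration) {
            this.format = format;
            this.configuration = configuration;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // deliberately ignore 'parameters' and use the shipped Configuration
            format.configure(this.configuration);
        }
    }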


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-28 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61402951
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided path and assigns
+ * splits to downstream tasks for further reading and processing. Which splits will be
+ * further processed depends on the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction<OUT>
+   extends RichSourceFunction<FileInputSplit> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+    * Specifies when computation will be triggered.
+    */
+   public enum WatchType {
+      REPROCESS_WITH_APPENDED // Reprocesses the whole file when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat<OUT> format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}). */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = true;
--- End diff --

In the Kafka source we have it like this as well. I think if it were set in open() 
this could lead to race conditions, with cancel() being called at unexpected points, 
possibly even before open() runs. I vaguely remember that this was the reason why 
it is like this in the Kafka source.
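
A sketch of the hazard being described (hypothetical source, not the PR's code):
cancel() may run before open(), so the flag is initialized once, at the declaration,
where a late-arriving open() cannot overwrite an earlier cancellation.

    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

    // Illustrative sketch: if open() did 'isRunning = true', a cancel() that
    // arrived before open() would be silently undone and run() would never stop.
    public class CancellableCounterSource extends RichSourceFunction<Long> {

        // set once at construction, never touched in open()
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            long counter = 0;
            while (isRunning) {
                ctx.collect(counter++);
            }
        }

        @Override
        public void cancel() {
            isRunning = false; // may legitimately happen before open()
        }
    }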




[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-28 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61402512
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionITCase.java ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FileSplitMonitoringFunctionITCase extends StreamingProgramTestBase {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 100;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
+
+   private static Map<Integer, String> hdPathContents = new HashMap<>();
+
+   //  PREPARING FOR THE TESTS
+
+   @Before
+   public void createHDFS() {
+      try {
+         baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+         FileUtil.fullyDelete(baseDir);
+
+         org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+         hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+         hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
+
+         MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+         hdfsCluster = builder.build();
+
+         hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() + "/";
+         hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+      } catch (Throwable e) {
+         e.printStackTrace();
+         Assert.fail("Test failed " + e.getMessage());
+      }
+   }
+
+   @After
+   public void destroyHDFS() {
+      FileUtil.fullyDelete(baseDir);
+      hdfsCluster.shutdown();
+   }
+
+   //  END OF PREPARATIONS
+
+   private static final Object lock = new Object();
+
+   @Override
+   protected void testProgram() throws Exception {
+      FileCreator fileCreator = new FileCreator(INTERVAL);
+      Thread t = new Thread(fileCreator);
+      t.start();
+
+      StringFileFormat format = new StringFileFormat();
+      format.setFilePath(hdfsURI);
+
+      Configuration config = new Configuration();
+      config.setString("input.file.path", hdfsURI);
+
+      try {
+         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61273003
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}.
+ * This operator will receive just the split descriptors and then read and emit records. This may
+ * lead to backpressure. To avoid this, we will have another thread actually reading the splits and
+ * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect
+ * the current state.
+ */
+public class FileSplitReadOperator<OUT> extends AbstractStreamOperator<OUT>
+   implements OneInputStreamOperator<FileInputSplit, OUT> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   private Queue<FileInputSplit> unReadSplits;
+
+   private transient SplitReader<OUT> reader;
+   private transient Object checkpointLock;
+   private transient TimestampedCollector<OUT> collector;
+
+   private Configuration configuration;
+   private FileInputFormat<OUT> format;
+   private TypeInformation<OUT> typeInfo;
+   private transient TypeSerializer<OUT> serializer;
+
+   public FileSplitReadOperator(FileInputFormat<OUT> format, TypeInformation<OUT> typeInfo, Configuration configuration) {
+      this.format = format;
+      this.typeInfo = typeInfo;
+      this.configuration = configuration;
+      this.unReadSplits = new ConcurrentLinkedQueue<>();
+
+      // This is for the extra thread that is reading:
+      // the tests were not terminating because the success
+      // exception was caught by the extra thread and never
+      // forwarded.
+
+      // For now, and to keep the extra thread that avoids backpressure,
+      // we just disable chaining for the operator, so that it runs on
+      // another thread. This will change later for a cleaner design.
+      setChainingStrategy(ChainingStrategy.NEVER);
+   }
+
+   @Override
+   public void open() throws Exception {
+      super.open();
+
+      this.format.configure(configuration);
+      this.collector = new TimestampedCollector<OUT>(output);
+      this.checkpointLock = getContainingTask().getCheckpointLock();
+      this.serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
+
+      this.reader = new SplitReader(unReadSplits, format, serializer, collector, checkpointLock);
+      this.reader.start();
+   }
+
+   @Override
+   public void processElement(StreamRecord<FileInputSplit> element) throws Exception {
+      LOG.info("Reading Split: " + element.getValue());
+   
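
The class javadoc above sketches the intended design: one thread reads the splits
while the task thread forwards checkpoint barriers, with the two synchronizing so
that checkpoints reflect a consistent state. A minimal sketch of that synchronization,
under the assumption of a reader loop like the PR's SplitReader (all names below
are illustrative, not the PR's code):

    import java.util.Queue;

    // Illustrative sketch: records are emitted and reader bookkeeping is updated
    // while holding the task's checkpoint lock, so a checkpoint barrier can never
    // observe a half-emitted split.
    class ReaderThreadSketch extends Thread {

        private final Object checkpointLock; // from getContainingTask().getCheckpointLock()
        private final Queue<String> pendingSplits; // stands in for FileInputSplit descriptors
        private volatile boolean running = true;

        ReaderThreadSketch(Object checkpointLock, Queue<String> pendingSplits) {
            this.checkpointLock = checkpointLock;
            this.pendingSplits = pendingSplits;
        }

        @Override
        public void run() {
            while (running) {
                String split = pendingSplits.poll();
                if (split == null) {
                    continue; // a real implementation would block or back off here
                }
                synchronized (checkpointLock) {
                    // read the split and emit its records, updating the
                    // "current split / offset" state atomically with
                    // respect to checkpoints (omitted)
                }
            }
        }
    }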

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61233316
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionITCase.java ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FileSplitMonitoringFunctionITCase extends StreamingProgramTestBase {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
+
+   private Set<Path> hdPaths = new HashSet<>();
+   private Set<String> hdPathNames = new HashSet<>();
+   private Map<Integer, String> hdPathContents = new HashMap<>();
+
+   //  PREPARING FOR THE TESTS
+
+   @Before
+   public void createHDFS() {
+      try {
+         baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+         FileUtil.fullyDelete(baseDir);
+
+         org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+         hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+         hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
+
+         MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+         hdfsCluster = builder.build();
+
+         hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() + "/";
+         hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+      } catch (Throwable e) {
+         e.printStackTrace();
+         Assert.fail("Test failed " + e.getMessage());
+      }
+   }
+
+   @After
+   public void destroyHDFS() {
+      FileUtil.fullyDelete(baseDir);
+      hdfsCluster.shutdown();
+   }
+
+   //  END OF PREPARATIONS
+
+   private static final Object lock = new Object();
+
+   private boolean[] finished;
+
+   @Override
+   protected void testProgram() throws Exception {
+      FileCreator fileCreator = new FileCreator(INTERVAL);
+      Thread t = new Thread(fileCreator);
+      t.start();
+      Thread.sleep(100);
+
+      StringFileFormat format = new StringFileFormat();
+      format.setFilePath(hdfsURI);
+
+   Configuration config = new 

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61233284
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionITCase.java ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FileSplitMonitoringFunctionITCase extends StreamingProgramTestBase {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
+
+   private Set<Path> hdPaths = new HashSet<>();
--- End diff --

These fields are not used.




[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61232860
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionTest.java ---
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+public class FileSplitMonitoringFunctionTest {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private static File baseDir;
+
+   private static org.apache.hadoop.fs.FileSystem hdfs;
+   private static String hdfsURI;
+   private static MiniDFSCluster hdfsCluster;
+
+   //  PREPARING FOR THE TESTS
+
+   @BeforeClass
+   public static void createHDFS() {
+      try {
+         baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+         FileUtil.fullyDelete(baseDir);
+
+         org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+         hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+         hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
+
+         MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+         hdfsCluster = builder.build();
+
+         hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() + "/";
+         hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+      } catch (Throwable e) {
+         e.printStackTrace();
+         Assert.fail("Test failed " + e.getMessage());
+      }
+   }
+
+   @AfterClass
+   public static void destroyHDFS() {
+   FileUtil.fullyDelete(baseDir);
+   hdfsCluster.shutdown();
+   }
+
+   //  END OF PREPARATIONS
+
+   //  TESTS
+
+   @Test
+   public void testFileReadingOperator() throws Exception {
+      Set<Path> filesCreated = new HashSet<>();
+      Map<Integer, String> fileContents = new HashMap<>();
+      for (int i = 0; i < NO_OF_FILES; i++) {
+         Tuple2<Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
+         filesCreated.add(file.f0);
+         fileContents.put(i, file.f1);
+      }
+
+      StringFileFormat format = new StringFileFormat();
+      Configuration config = new Configuration();
+   

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61232298
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}.
+ * This operator will receive just the split descriptors and then read and emit records. This may
+ * lead to backpressure. To avoid this, we will have another thread actually reading the splits and
+ * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect
+ * the current state.
+ */
+public class FileSplitReadOperator<OUT> extends AbstractStreamOperator<OUT>
+   implements OneInputStreamOperator<FileInputSplit, OUT> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   private Queue<FileInputSplit> unReadSplits;
+
+   private transient SplitReader<OUT> reader;
+   private transient Object checkpointLock;
+   private transient TimestampedCollector<OUT> collector;
+
+   private Configuration configuration;
+   private FileInputFormat<OUT> format;
+   private TypeInformation<OUT> typeInfo;
+   private transient TypeSerializer<OUT> serializer;
+
+   public FileSplitReadOperator(FileInputFormat<OUT> format, TypeInformation<OUT> typeInfo, Configuration configuration) {
+      this.format = format;
+      this.typeInfo = typeInfo;
+      this.configuration = configuration;
+      this.unReadSplits = new ConcurrentLinkedQueue<>();
+
+      // This is for the extra thread that is reading:
+      // the tests were not terminating because the success
+      // exception was caught by the extra thread and never
+      // forwarded.
+
+      // For now, and to keep the extra thread that avoids backpressure,
+      // we just disable chaining for the operator, so that it runs on
+      // another thread. This will change later for a cleaner design.
+      setChainingStrategy(ChainingStrategy.NEVER);
+   }
+
+   @Override
+   public void open() throws Exception {
+      super.open();
+
+      this.format.configure(configuration);
+      this.collector = new TimestampedCollector<OUT>(output);
+      this.checkpointLock = getContainingTask().getCheckpointLock();
+      this.serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
+
+      this.reader = new SplitReader(unReadSplits, format, serializer, collector, checkpointLock);
+      this.reader.start();
+   }
+
+   @Override
+   public void processElement(StreamRecord<FileInputSplit> element) throws Exception {
+      LOG.info("Reading Split: " + element.getValue());
+

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61231656
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}.
+ * This operator will receive just the split descriptors and then read and emit records. This may
+ * lead to backpressure. To avoid this, we will have another thread actually reading the splits and
+ * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect
+ * the current state.
+ */
+public class FileSplitReadOperator<OUT> extends AbstractStreamOperator<OUT>
+   implements OneInputStreamOperator<FileInputSplit, OUT> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   private Queue<FileInputSplit> unReadSplits;
+
+   private transient SplitReader<OUT> reader;
+   private transient Object checkpointLock;
+   private transient TimestampedCollector<OUT> collector;
+
+   private Configuration configuration;
+   private FileInputFormat<OUT> format;
+   private TypeInformation<OUT> typeInfo;
+   private transient TypeSerializer<OUT> serializer;
+
+   public FileSplitReadOperator(FileInputFormat<OUT> format, TypeInformation<OUT> typeInfo, Configuration configuration) {
+      this.format = format;
+      this.typeInfo = typeInfo;
+      this.configuration = configuration;
+      this.unReadSplits = new ConcurrentLinkedQueue<>();
+
+      // This is for the extra thread that is reading:
+      // the tests were not terminating because the success
+      // exception was caught by the extra thread and never
+      // forwarded.
+
+      // For now, and to keep the extra thread that avoids backpressure,
+      // we just disable chaining for the operator, so that it runs on
+      // another thread. This will change later for a cleaner design.
+      setChainingStrategy(ChainingStrategy.NEVER);
+   }
+
+   @Override
+   public void open() throws Exception {
+      super.open();
+
+      this.format.configure(configuration);
+      this.collector = new TimestampedCollector<OUT>(output);
+      this.checkpointLock = getContainingTask().getCheckpointLock();
+      this.serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
+
+      this.reader = new SplitReader(unReadSplits, format, serializer, collector, checkpointLock);
+      this.reader.start();
+   }
+
+   @Override
+   public void processElement(StreamRecord<FileInputSplit> element) throws Exception {
+      LOG.info("Reading Split: " + element.getValue());
+

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61231507
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}.
+ * This operator will receive just the split descriptors and then read and emit records. This may
+ * lead to backpressure. To avoid this, we will have another thread actually reading the splits and
+ * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect
+ * the current state.
+ */
+public class FileSplitReadOperator<OUT> extends AbstractStreamOperator<OUT>
+   implements OneInputStreamOperator<FileInputSplit, OUT> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   private Queue<FileInputSplit> unReadSplits;
+
+   private transient SplitReader<OUT> reader;
+   private transient Object checkpointLock;
+   private transient TimestampedCollector<OUT> collector;
+
+   private Configuration configuration;
+   private FileInputFormat<OUT> format;
+   private TypeInformation<OUT> typeInfo;
+   private transient TypeSerializer<OUT> serializer;
+
+   public FileSplitReadOperator(FileInputFormat<OUT> format, TypeInformation<OUT> typeInfo, Configuration configuration) {
+      this.format = format;
+      this.typeInfo = typeInfo;
+      this.configuration = configuration;
+      this.unReadSplits = new ConcurrentLinkedQueue<>();
+
+      // This is for the extra thread that is reading:
+      // the tests were not terminating because the success
+      // exception was caught by the extra thread and never
+      // forwarded.
+
+      // For now, and to keep the extra thread that avoids backpressure,
+      // we just disable chaining for the operator, so that it runs on
+      // another thread. This will change later for a cleaner design.
+      setChainingStrategy(ChainingStrategy.NEVER);
+   }
+
+   @Override
+   public void open() throws Exception {
+      super.open();
+
+      this.format.configure(configuration);
+      this.collector = new TimestampedCollector<OUT>(output);
+      this.checkpointLock = getContainingTask().getCheckpointLock();
+      this.serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
+
+      this.reader = new SplitReader(unReadSplits, format, serializer, collector, checkpointLock);
+      this.reader.start();
+   }
+
+   @Override
+   public void processElement(StreamRecord<FileInputSplit> element) throws Exception {
+      LOG.info("Reading Split: " + element.getValue());
+

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61231326
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}.
+ * This operator will receive just the split descriptors and then read and emit records. This may
+ * lead to backpressure. To avoid this, we will have another thread actually reading the splits and
+ * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect
+ * the current state.
+ */
+public class FileSplitReadOperator<OUT> extends AbstractStreamOperator<OUT>
+   implements OneInputStreamOperator<FileInputSplit, OUT> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   private Queue<FileInputSplit> unReadSplits;
--- End diff --

I think the queue can be moved to the `SplitReader`; the split reader can then have 
a method `addSplit`. This would better isolate concerns.
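
A sketch of the suggested shape (hypothetical, for illustration only): the queue
becomes an implementation detail of the reader, and the operator only hands splits
over through addSplit().

    import org.apache.flink.core.fs.FileInputSplit;

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Illustrative sketch of the suggestion: the reader owns its queue and
    // exposes addSplit(), so the operator never touches the queue directly.
    class SplitReaderSketch extends Thread {

        private final Queue<FileInputSplit> pendingSplits = new ConcurrentLinkedQueue<>();

        void addSplit(FileInputSplit split) {
            pendingSplits.add(split);
        }

        @Override
        public void run() {
            // poll pendingSplits and read each split (omitted)
        }
    }

The operator's processElement() would then reduce to reader.addSplit(element.getValue()).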




[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61230608
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}.
+ * This operator will receive just the split descriptors and then read and emit records. This may
+ * lead to backpressure. To avoid this, we will have another thread actually reading the splits and
+ * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect
+ * the current state.
+ */
+public class FileSplitReadOperator<OUT> extends AbstractStreamOperator<OUT>
+   implements OneInputStreamOperator<FileInputSplit, OUT> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   private Queue<FileInputSplit> unReadSplits;
+
+   private transient SplitReader<OUT> reader;
+   private transient Object checkpointLock;
+   private transient TimestampedCollector<OUT> collector;
+
+   private Configuration configuration;
+   private FileInputFormat<OUT> format;
+   private TypeInformation<OUT> typeInfo;
+   private transient TypeSerializer<OUT> serializer;
+
+   public FileSplitReadOperator(FileInputFormat<OUT> format, TypeInformation<OUT> typeInfo, Configuration configuration) {
+      this.format = format;
+      this.typeInfo = typeInfo;
+      this.configuration = configuration;
+      this.unReadSplits = new ConcurrentLinkedQueue<>();
+
+      // This is for the extra thread that is reading:
+      // the tests were not terminating because the success
+      // exception was caught by the extra thread and never
+      // forwarded.
+
+      // For now, and to keep the extra thread that avoids backpressure,
+      // we just disable chaining for the operator, so that it runs on
+      // another thread. This will change later for a cleaner design.
+      setChainingStrategy(ChainingStrategy.NEVER);
+   }
+
+   @Override
+   public void open() throws Exception {
+      super.open();
+
+      this.format.configure(configuration);
+      this.collector = new TimestampedCollector<OUT>(output);
+      this.checkpointLock = getContainingTask().getCheckpointLock();
+      this.serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
+
+      this.reader = new SplitReader(unReadSplits, format, serializer, collector, checkpointLock);
+      this.reader.start();
+   }
+
+   @Override
+   public void processElement(StreamRecord<FileInputSplit> element) throws Exception {
+      LOG.info("Reading Split: " + element.getValue());
--- End 

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61230524
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}.
+ * This operator will receive just the split descriptors and then read and emit records. This may
+ * lead to backpressure. To avoid this, we will have another thread actually reading the splits and
+ * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect
+ * the current state.
+ */
+public class FileSplitReadOperator<OUT> extends AbstractStreamOperator<OUT>
+   implements OneInputStreamOperator<FileInputSplit, OUT> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   private Queue<FileInputSplit> unReadSplits;
+
+   private transient SplitReader<OUT> reader;
+   private transient Object checkpointLock;
+   private transient TimestampedCollector<OUT> collector;
+
+   private Configuration configuration;
+   private FileInputFormat<OUT> format;
+   private TypeInformation<OUT> typeInfo;
+   private transient TypeSerializer<OUT> serializer;
+
+   public FileSplitReadOperator(FileInputFormat<OUT> format, TypeInformation<OUT> typeInfo, Configuration configuration) {
+      this.format = format;
+      this.typeInfo = typeInfo;
+      this.configuration = configuration;
+      this.unReadSplits = new ConcurrentLinkedQueue<>();
+
+      // This is for the extra thread that is reading:
+      // the tests were not terminating because the success
+      // exception was caught by the extra thread and never
+      // forwarded.
+
+      // For now, and to keep the extra thread that avoids backpressure,
+      // we just disable chaining for the operator, so that it runs on
+      // another thread. This will change later for a cleaner design.
+      setChainingStrategy(ChainingStrategy.NEVER);
+   }
+
+   @Override
+   public void open() throws Exception {
+      super.open();
+
+      this.format.configure(configuration);
+      this.collector = new TimestampedCollector<OUT>(output);
--- End diff --

Superfluous `OUT` parameter, can be `<>`
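
That is, the field's declared type already fixes OUT, so the diamond is enough
for the compiler to infer it:

    // instead of: this.collector = new TimestampedCollector<OUT>(output);
    this.collector = new TimestampedCollector<>(output);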




[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61230544
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the splits received from {@link 
FileSplitMonitoringFunction}.
+ * This operator will receive just the split descriptors and then read and 
emit records. This may lead
+ * to backpressure. To avoid this, we will have another thread actually 
reading the splits and
+ * another forwarding the checkpoint barriers. The two should sync so that 
the checkpoints reflect the
+ * current state.
+ * */
+public class FileSplitReadOperator extends 
AbstractStreamOperator implements OneInputStreamOperator {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   private Queue unReadSplits;
+
+   private transient SplitReader reader;
+   private transient Object checkpointLock;
+   private transient TimestampedCollector collector;
+
+   private Configuration configuration;
+   private FileInputFormat format;
+   private TypeInformation typeInfo;
+   private transient TypeSerializer serializer;
+
+   public FileSplitReadOperator(FileInputFormat format, 
TypeInformation typeInfo, Configuration configuration) {
+   this.format = format;
+   this.typeInfo = typeInfo;
+   this.configuration = configuration;
+   this.unReadSplits = new ConcurrentLinkedQueue<>();
+
+   // this is for the extra thread that is reading,
+   // the tests were not terminating because the success
+   // exception was caught by the extra thread, and never
+   // forwarded.
+
+   // for now, and to keep the extra thread to avoid backpressure,
+   // we just disable chaining for the operator, so that it runs on
+   // another thread. This will change later for a cleaner design.
+   setChainingStrategy(ChainingStrategy.NEVER);
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+
+   this.format.configure(configuration);
+   this.collector = new TimestampedCollector<>(output);
+   this.checkpointLock = getContainingTask()
+   .getCheckpointLock();
+   this.serializer = 
typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
+
+   this.reader = new SplitReader(unReadSplits, format, serializer, 
collector, checkpointLock);
--- End diff --

Missing generic, can be `<>`
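For illustration, the fixed line would read like this (only the diamond
operator is the point here; the arguments are as in the diff):

```
this.reader = new SplitReader<>(unReadSplits, format, serializer,
        collector, checkpointLock);
```

As an aside on the threading design described in the operator's Javadoc, a
minimal sketch of what the reader thread's loop could look like. The
SplitReader internals are not part of this diff, so everything below is
assumed, not quoted from the PR:

```
// Hypothetical core loop of the SplitReader thread. Records are emitted
// only while holding the checkpoint lock, so a checkpoint barrier cannot
// interleave with a half-emitted record.
while (isRunning) {
    FileInputSplit split = unReadSplits.poll();
    if (split == null) {
        continue; // nothing to read yet
    }
    format.open(split);
    OUT record = serializer.createInstance();
    while (!format.reachedEnd()) {
        synchronized (checkpointLock) {
            record = format.nextRecord(record);
            if (record != null) {
                collector.collect(record);
            }
        }
    }
    format.close();
}
```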


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61230370
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction<OUT>
+   extends RichSourceFunction<FileInputSplit> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered.
+*/
+   public enum WatchType {
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file 
when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat<OUT> format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}. */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = false;
+
+   private Configuration configuration;
--- End diff --

We should explain why we have the `Configuration` here. That the 
`Configuration` that we get in `open()` is not valid. Same in the 
`FileSplitReadOperator`.
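As a sketch, that explanation could live in the field's Javadoc, along these
lines (the wording is mine, not from the PR):

```
/**
 * The {@link Configuration} used to configure the {@link FileInputFormat}.
 * We keep our own copy because the Configuration handed to open() by the
 * runtime is not valid for this purpose, so it cannot be used there.
 */
private Configuration configuration;
```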


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61229319
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction<OUT>
+   extends RichSourceFunction<FileInputSplit> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered.
+*/
+   public enum WatchType {
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file 
when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat<OUT> format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}. */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = false;
+
+   private Configuration configuration;
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat<OUT> format, String path, Configuration 
configuration,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this(format, path, configuration, 
FilePathFilter.DefaultFilter.getInstance(), watchType, readerParallelism, 
interval);
+   }
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat<OUT> format, String path, Configuration 
configuration,
+   FilePathFilter filter, WatchType watchType, int 
readerParallelism, long interval) {
+
+   this.format = Preconditions.checkNotNull(format);
+   this.path = Preconditions.checkNotNull(path);
+   this.configuration = Preconditions.checkNotNull(configuration);
+
+   Preconditions.checkArgument(interval >= 100,
+   "The specified monitoring interval is smaller than the 
minimum allowed one (100 ms).");
+   this.interval = interval;
+
+   this.watchType = watchType;
+
+   this.pathFilter = Preconditions.checkNotNull(filter);
+
+   this.readerParallelism = Math.max(readerParallelism, 1);
+   this.globalModificationTime = Long.MIN_VALUE;
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public void open(Configuration parameters) throws Exception {
+   super.open(parameters);
+   

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61228702
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction<OUT>
+   extends RichSourceFunction<FileInputSplit> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered.
+*/
+   public enum WatchType {
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file 
when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat<OUT> format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}. */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = false;
+
+   private Configuration configuration;
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat<OUT> format, String path, Configuration 
configuration,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this(format, path, configuration, 
FilePathFilter.DefaultFilter.getInstance(), watchType, readerParallelism, 
interval);
+   }
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat<OUT> format, String path, Configuration 
configuration,
+   FilePathFilter filter, WatchType watchType, int 
readerParallelism, long interval) {
+
+   this.format = Preconditions.checkNotNull(format);
+   this.path = Preconditions.checkNotNull(path);
+   this.configuration = Preconditions.checkNotNull(configuration);
+
+   Preconditions.checkArgument(interval >= 100,
+   "The specified monitoring interval is smaller than the 
minimum allowed one (100 ms).");
+   this.interval = interval;
+
+   this.watchType = watchType;
+
+   this.pathFilter = Preconditions.checkNotNull(filter);
+
+   this.readerParallelism = Math.max(readerParallelism, 1);
+   this.globalModificationTime = Long.MIN_VALUE;
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public void open(Configuration parameters) throws Exception {
+   super.open(parameters);
+   

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61228675
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction<OUT>
--- End diff --

We should declare it as `@Internal`, just to be on the safe side.
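Concretely, the class declaration would just gain the annotation (a sketch):

```
import org.apache.flink.annotation.Internal;

@Internal
public class FileSplitMonitoringFunction<OUT>
        extends RichSourceFunction<FileInputSplit> {
    // ...
}
```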


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61228546
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction<OUT>
+   extends RichSourceFunction<FileInputSplit> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered.
+*/
+   public enum WatchType {
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file 
when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat<OUT> format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}. */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = false;
+
+   private Configuration configuration;
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat<OUT> format, String path, Configuration 
configuration,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this(format, path, configuration, 
FilePathFilter.DefaultFilter.getInstance(), watchType, readerParallelism, 
interval);
+   }
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat<OUT> format, String path, Configuration 
configuration,
+   FilePathFilter filter, WatchType watchType, int 
readerParallelism, long interval) {
+
+   this.format = Preconditions.checkNotNull(format);
+   this.path = Preconditions.checkNotNull(path);
+   this.configuration = Preconditions.checkNotNull(configuration);
+
+   Preconditions.checkArgument(interval >= 100,
+   "The specified monitoring interval is smaller than the 
minimum allowed one (100 ms).");
+   this.interval = interval;
+
+   this.watchType = watchType;
+
+   this.pathFilter = Preconditions.checkNotNull(filter);
+
+   this.readerParallelism = Math.max(readerParallelism, 1);
+   this.globalModificationTime = Long.MIN_VALUE;
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public void open(Configuration parameters) throws Exception {
+   super.open(parameters);
+   

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61065673
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionITCase.java
 ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FileSplitMonitoringFunctionITCase extends 
StreamingProgramTestBase {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
+
+   private Set<Path> hdPaths = new HashSet<>();
+   private Set<String> hdPathNames = new HashSet<>();
+   private Map<Integer, String> hdPathContents = new HashMap<>();
+
+   //  PREPARING FOR THE TESTS
+
+   @Before
+   public void createHDFS() {
+   try {
+   baseDir = new 
File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+   FileUtil.fullyDelete(baseDir);
+
+   org.apache.hadoop.conf.Configuration hdConf = new 
org.apache.hadoop.conf.Configuration();
+   hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+   hdConf.set("dfs.block.size", String.valueOf(1048576)); 
// this is the minimum we can set.
+
+   MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
+   hdfsCluster = builder.build();
+
+   hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + 
":" + hdfsCluster.getNameNodePort() +"/";
+   hdfs = new 
org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+   } catch(Throwable e) {
+   e.printStackTrace();
+   Assert.fail("Test failed " + e.getMessage());
+   }
+   }
+
+   @After
+   public void destroyHDFS() {
+   FileUtil.fullyDelete(baseDir);
+   hdfsCluster.shutdown();
+   }
+
+   //  END OF PREPARATIONS
+
+   private static final Object lock = new Object();
+
+   private boolean[] finished;
+
+   @Override
+   protected void testProgram() throws Exception {
+   FileCreator fileCreator = new FileCreator(INTERVAL);
+   Thread t = new Thread(fileCreator);
+   t.start();
+   Thread.sleep(100);
+
+   StringFileFormat format = new StringFileFormat();
+   format.setFilePath(hdfsURI);
+
+   Configuration config = new 

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-26 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61063235
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionITCase.java
 ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FileSplitMonitoringFunctionITCase extends 
StreamingProgramTestBase {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
+
+   private Set<Path> hdPaths = new HashSet<>();
+   private Set<String> hdPathNames = new HashSet<>();
+   private Map<Integer, String> hdPathContents = new HashMap<>();
+
+   //  PREPARING FOR THE TESTS
+
+   @Before
+   public void createHDFS() {
+   try {
+   baseDir = new 
File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+   FileUtil.fullyDelete(baseDir);
+
+   org.apache.hadoop.conf.Configuration hdConf = new 
org.apache.hadoop.conf.Configuration();
+   hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+   hdConf.set("dfs.block.size", String.valueOf(1048576)); 
// this is the minimum we can set.
+
+   MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
+   hdfsCluster = builder.build();
+
+   hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + 
":" + hdfsCluster.getNameNodePort() +"/";
+   hdfs = new 
org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+   } catch(Throwable e) {
+   e.printStackTrace();
+   Assert.fail("Test failed " + e.getMessage());
+   }
+   }
+
+   @After
+   public void destroyHDFS() {
+   FileUtil.fullyDelete(baseDir);
+   hdfsCluster.shutdown();
+   }
+
+   //  END OF PREPARATIONS
+
+   private static final Object lock = new Object();
+
+   private boolean[] finished;
+
+   @Override
+   protected void testProgram() throws Exception {
+   FileCreator fileCreator = new FileCreator(INTERVAL);
+   Thread t = new Thread(fileCreator);
+   t.start();
+   Thread.sleep(100);
+
+   StringFileFormat format = new StringFileFormat();
+   format.setFilePath(hdfsURI);
+
+   Configuration config = new 

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61062846
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionITCase.java
 ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FileSplitMonitoringFunctionITCase extends 
StreamingProgramTestBase {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
+
+   private Set<Path> hdPaths = new HashSet<>();
+   private Set<String> hdPathNames = new HashSet<>();
+   private Map<Integer, String> hdPathContents = new HashMap<>();
+
+   //  PREPARING FOR THE TESTS
+
+   @Before
+   public void createHDFS() {
+   try {
+   baseDir = new 
File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+   FileUtil.fullyDelete(baseDir);
+
+   org.apache.hadoop.conf.Configuration hdConf = new 
org.apache.hadoop.conf.Configuration();
+   hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+   hdConf.set("dfs.block.size", String.valueOf(1048576)); 
// this is the minimum we can set.
+
+   MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
+   hdfsCluster = builder.build();
+
+   hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + 
":" + hdfsCluster.getNameNodePort() +"/";
+   hdfs = new 
org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+   } catch(Throwable e) {
+   e.printStackTrace();
+   Assert.fail("Test failed " + e.getMessage());
+   }
+   }
+
+   @After
+   public void destroyHDFS() {
+   FileUtil.fullyDelete(baseDir);
+   hdfsCluster.shutdown();
+   }
+
+   //  END OF PREPARATIONS
+
+   private static final Object lock = new Object();
+
+   private boolean[] finished;
+
+   @Override
+   protected void testProgram() throws Exception {
+   FileCreator fileCreator = new FileCreator(INTERVAL);
+   Thread t = new Thread(fileCreator);
+   t.start();
+   Thread.sleep(100);
+
+   StringFileFormat format = new StringFileFormat();
+   format.setFilePath(hdfsURI);
+
+   Configuration config = new 

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-26 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1929#issuecomment-214664097
  
One additional remark. I'd like to get rid of these fields in 
`FileSplitMonitoringFunctionTest`:
```
private Set<Path> hdPaths = new HashSet<>();
private Set<String> hdPathNames = new HashSet<>();
private Map<Integer, String> hdPathContents = new HashMap<>();
```
and replace them with local variables in the tests. As it is now, the 
interactions of the different methods are hard to follow. Also, in the 
`FileSplitMonitoringITCase` the fields are there but never read.

Also, we should replace the `@Before` and `@After` hooks with `@BeforeClass` 
and `@AfterClass` hooks, because creating/tearing down the HDFS cluster is 
very expensive and we currently do it once per test.
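A sketch of the class-level variant, reusing the setup/teardown code from
the diff (the conversion of the fields and methods to static is my
assumption of what is meant):

```
private static File baseDir;
private static org.apache.hadoop.fs.FileSystem hdfs;
private static String hdfsURI;
private static MiniDFSCluster hdfsCluster;

@BeforeClass
public static void createHDFS() throws IOException {
    // start the mini cluster once for the whole test class
    baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile();
    FileUtil.fullyDelete(baseDir);

    org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
    hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());

    hdfsCluster = new MiniDFSCluster.Builder(hdConf).build();
    hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() + "/";
    hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
}

@AfterClass
public static void destroyHDFS() {
    // tear it down once, after all tests have run
    FileUtil.fullyDelete(baseDir);
    hdfsCluster.shutdown();
}
```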


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61043013
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction<OUT>
+   extends RichSourceFunction<FileInputSplit> {
--- End diff --

Ah, alright. I thought you meant that all sources going through addSource() 
are executed in a non-parallel fashion.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60943147
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction<OUT>
+   extends RichSourceFunction<FileInputSplit> {
--- End diff --

Actually, in addSource() there is this line:
`boolean isParallel = function instanceof ParallelSourceFunction;`

whose result is passed to the constructor of the StreamSource.
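Roughly the surrounding code, paraphrased from the StreamExecutionEnvironment
(not an exact quote):

```
boolean isParallel = function instanceof ParallelSourceFunction;
// ... build the source operator ...
return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
```

and the DataStreamSource constructor fixes the parallelism to 1 when
isParallel is false.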


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60942903
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionITCase.java
 ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FileSplitMonitoringFunctionITCase extends 
StreamingProgramTestBase {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
--- End diff --

I would prefer local files (see the sketch below) since
* we would no longer require the Hadoop dependency
* it probably reduces the test time
* you interact very little with the FileSystem anyway
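For instance, the setup could shrink to a plain local directory (a sketch
using JUnit's TemporaryFolder; StringFileFormat is the format from the diff,
the rest of the test is assumed unchanged):

```
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder(); // org.junit.rules.TemporaryFolder

@Override
protected void testProgram() throws Exception {
    // a file:// URI pointing at a throwaway local directory,
    // instead of spinning up a MiniDFSCluster
    String localURI = tempFolder.newFolder().toURI().toString();

    StringFileFormat format = new StringFileFormat();
    format.setFilePath(localURI);
    // ... rest of the test unchanged
}
```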


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60942837
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction<OUT>
+   extends RichSourceFunction<FileInputSplit> {
--- End diff --

Yup, you're right; I didn't know about the RichParallelSourceFunction class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60942382
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction<OUT>
+   extends RichSourceFunction<FileInputSplit> {
--- End diff --

addSource(...) makes no such guarantee; all calls like readFile(...) 
go through addSource(...).

You could be right about the NonParallelInput though, will check quickly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1929#issuecomment-214420551
  
We may want to think about adding a `createInputSplits(int minNumSplits, 
List<FileStatus> files)` method to the FileInputFormat class; as it stands, 
it scans through the entire directory even though we could already exclude 
several files.
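As a sketch, such an overload could look like this (the List<FileStatus>
element type and the naive one-split-per-file body are my assumptions):

```
public FileInputSplit[] createInputSplits(int minNumSplits, List<FileStatus> files) throws IOException {
    List<FileInputSplit> splits = new ArrayList<>();
    int splitNum = 0;
    for (FileStatus file : files) {
        // naive: one split per file; a real implementation would also split
        // large files along block boundaries, as createInputSplits(int) does
        splits.add(new FileInputSplit(splitNum++, file.getPath(), 0, file.getLen(), null));
    }
    return splits.toArray(new FileInputSplit[splits.size()]);
}
```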


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60939797
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction<OUT>
+   extends RichSourceFunction<FileInputSplit> {
--- End diff --

This is used for input formats, AFAIK. The RichSourceFunction is guaranteed 
to run with parallelism 1, as I understand from 
`addSource(SourceFunction<OUT> function, String sourceName, 
TypeInformation<OUT> typeInfo)` in the StreamExecutionEnvironment.
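The distinction, as a minimal sketch (both classes are hypothetical and only
illustrate which marker interface decides the parallelism):

```
// Not a ParallelSourceFunction, so addSource() wires this up as
// non-parallel and its parallelism is fixed to 1.
class SingleSource extends RichSourceFunction<Long> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long i = 0;
        while (running) {
            ctx.collect(i++);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

// RichParallelSourceFunction implements ParallelSourceFunction, so this
// one runs with the configured parallelism.
class MultiSource extends RichParallelSourceFunction<Long> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long i = 0;
        while (running) {
            ctx.collect(i++);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```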


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60939583
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction<OUT>
+   extends RichSourceFunction<FileInputSplit> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered.
+*/
+   public enum WatchType {
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file 
when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat<OUT> format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}. */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = true;
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat<OUT> format, String path,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this(format, path, FilePathFilter.DefaultFilter.getInstance(), 
watchType, readerParallelism, interval);
+   }
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat<OUT> format, String path, FilePathFilter filter,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this.format = Preconditions.checkNotNull(format);
+   this.path = Preconditions.checkNotNull(path);
+
+   Preconditions.checkArgument(interval >= 100,
+   "The specified monitoring interval is smaller than the 
minimum allowed one (100 ms).");
+   this.interval = interval;
+
+   this.watchType = watchType;
+
+   this.pathFilter = Preconditions.checkNotNull(filter);
+
+   this.readerParallelism = Math.max(readerParallelism, 1);
+   this.globalModificationTime = Long.MIN_VALUE;
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public void open(Configuration parameters) throws Exception {
+   super.open(parameters);
+   format.configure(parameters);
+   }
+
+   /**
+* Creates the input splits for the path to be assigned to the 
downstream tasks.
+* Those are going to read their contents for further 

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60939625
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered.
+*/
+   public enum WatchType {
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file 
when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}. */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = true;
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat format, String path,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this(format, path, FilePathFilter.DefaultFilter.getInstance(), 
watchType, readerParallelism, interval);
+   }
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat format, String path, FilePathFilter filter,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this.format = Preconditions.checkNotNull(format);
+   this.path = Preconditions.checkNotNull(path);
+
+   Preconditions.checkArgument(interval >= 100,
+   "The specified monitoring interval is smaller than the 
minimum allowed one (100 ms).");
+   this.interval = interval;
+
+   this.watchType = watchType;
+
+   this.pathFilter = Preconditions.checkNotNull(filter);
+
+   this.readerParallelism = Math.max(readerParallelism, 1);
+   this.globalModificationTime = Long.MIN_VALUE;
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public void open(Configuration parameters) throws Exception {
+   super.open(parameters);
+   format.configure(parameters);
+   }
+
+   /**
+* Creates the input splits for the path to be assigned to the 
downstream tasks.
+* Those are going to read their contents for further 

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60938941
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered.
+*/
+   public enum WatchType {
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file 
when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}. */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = true;
--- End diff --

Now this is really nit-picky, but I would prefer this to be false by 
default and set to true in open().
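
A minimal sketch of that suggestion, reusing the field and open() method 
from the diff above (everything else is elided; this is illustrative, not 
the PR's actual code):

    private volatile boolean isRunning = false;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        format.configure(parameters);
        // arm the flag only once the function has been fully opened
        isRunning = true;
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

This way a task that fails before open() completes never reports itself 
as running.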


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60938712
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction {
--- End diff --

This function should implement NonParallelInput; it guarantees that it will 
always run with a parallelism of 1.
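
A sketch of what that would look like, assuming the marker interface meant 
here is org.apache.flink.api.common.io.NonParallelInput and that the source 
emits FileInputSplit (the type parameter is stripped in the quoted diff):

    public class FileSplitMonitoringFunction
            extends RichSourceFunction<FileInputSplit>
            implements NonParallelInput {
        // NonParallelInput is a marker interface with no methods; it only
        // signals that this source must run with a parallelism of 1.
    }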


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60937977
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+public interface FilePathFilter extends Serializable {
--- End diff --

Please add a JavaDoc to this class describing how/where it is used.
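
A sketch of such a JavaDoc; the filterPath method shown here is an 
assumption for illustration, not necessarily the PR's actual signature:

    /**
     * An interface used by the {@link FileSplitMonitoringFunction} to
     * filter the files in the monitored directory. Paths for which
     * {@code filterPath} returns {@code true} are skipped, i.e. no
     * splits are created for them.
     */
    public interface FilePathFilter extends Serializable {

        /** Returns {@code true} if the given path should be ignored. */
        boolean filterPath(Path filePath);
    }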


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60937348
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -35,12 +35,9 @@
 import java.util.Set;
 
 /**
- * This is the single (non-parallel) task, that monitors a user-procided 
path, and assigns splits
- * to downstream tasks for further reading and processing, depending on 
the user-provided {@link FileSplitMonitoringFunction.WatchType}.
- *
- * This method keeps track of which splits have already being processed by 
which task, and at which point
- * in the file we are currently processing, at the granularity of the 
split. In addition, it keeps track
- * of the last modification time for each file, so that it can detect new 
data.
+ * This is the single (non-parallel) task, that monitors a user-provided 
path, and assigns splits
--- End diff --

Neither comma belongs in this sentence.
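
For reference, the corrected sentence, as it appears in the updated 
revision of the diff, reads: "This is the single (non-parallel) task which 
monitors a user-provided path and assigns splits to downstream tasks for 
further reading and processing."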


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60936805
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionITCase.java
 ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FileSplitMonitoringFunctionITCase extends 
StreamingProgramTestBase {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
--- End diff --

Yes, a local filesystem would do the job. 
I just wanted to have some tests with HDFS to be sure that it works, as 
this is closer to a distributed deployment.
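
A minimal sketch of the local-filesystem variant under discussion, using 
JUnit's TemporaryFolder rule instead of a MiniDFSCluster (the names here 
are illustrative, not taken from the PR):

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    private org.apache.hadoop.fs.FileSystem fs;

    @Before
    public void createLocalFS() throws IOException {
        org.apache.hadoop.conf.Configuration conf =
                new org.apache.hadoop.conf.Configuration();
        // file:/... URIs resolve to Hadoop's LocalFileSystem, so the same
        // FileSystem-based test code runs without spinning up a cluster.
        fs = new org.apache.hadoop.fs.Path(
                tempFolder.getRoot().toURI()).getFileSystem(conf);
    }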


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60935904
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task, that monitors a user-procided 
path, and assigns splits
+ * to downstream tasks for further reading and processing, depending on 
the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ *
+ * This method keeps track of which splits have already being processed by 
which task, and at which point
+ * in the file we are currently processing, at the granularity of the 
split. In addition, it keeps track
+ * of the last modification time for each file, so that it can detect new 
data.
+ */
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered. This can be currently 
done in 3 ways.
+* {@code ONLY_NEW_FILES} which implies that only new files will 
be processed.
+* {@code REPROCESS_WITH_APPENDED} which reprocesses the whole 
file, as soon as new data is appended to it.
+*/
+   public enum WatchType {
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file 
when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}. */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = true;
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat format, String path,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this(format, path, FilePathFilter.DefaultFilter.getInstance(), 
watchType, readerParallelism, interval);
+   }
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat format, String path, FilePathFilter filter,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this.format = Preconditions.checkNotNull(format);
+   this.path = Preconditions.checkNotNull(path);
+
+   Preconditions.checkArgument(interval >= 100,
+   "The specified monitoring interval is smaller than the 
minimum allowed one (100 ms).");
+   this.interval = interval;
+
+   this.watchType = watchType;
+
+   this.pathFilter = Preconditions.checkNotNull(filter);
+
+   

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60933644
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionTest.java
 ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+public class FileSplitMonitoringFunctionTest {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
+
+   private Set hdPaths = new HashSet<>();
+   private Set hdPathNames = new HashSet<>();
+   private Map hdPathContents = new HashMap<>();
+
+   //  PREPARING FOR THE TESTS
+
+   @Before
+   public void createHDFS() {
+   try {
+   baseDir = new 
File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+   FileUtil.fullyDelete(baseDir);
+
+   org.apache.hadoop.conf.Configuration hdConf = new 
org.apache.hadoop.conf.Configuration();
+   hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+   hdConf.set("dfs.block.size", String.valueOf(1048576)); 
// this is the minimum we can set.
+
+   MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
+   hdfsCluster = builder.build();
+
+   hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + 
":" + hdfsCluster.getNameNodePort() +"/";
+   hdfs = new 
org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+   } catch(Throwable e) {
+   e.printStackTrace();
+   Assert.fail("Test failed " + e.getMessage());
+   }
+   }
+
+   @After
+   public void destroyHDFS() {
+   try {
+   for(org.apache.hadoop.fs.Path file: hdPaths) {
+   hdfs.delete(file, false);
+   }
+   FileUtil.fullyDelete(baseDir);
+   hdfsCluster.shutdown();
+   } catch (IOException e) {
+   throw new RuntimeException(e);
+   }
+   }
+
+   //  END OF PREPARATIONS
+
+   //  TESTS
+
+   @Test
+   public void testFileContents() throws IOException {
--- End diff --

This test doesn't test anything with regard to the MonitoringFunction, 
does it?


---
If your project is set up for it, you 

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60934370
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionTest.java
 ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+public class FileSplitMonitoringFunctionTest {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
+
+   private Set hdPaths = new HashSet<>();
+   private Set hdPathNames = new HashSet<>();
+   private Map hdPathContents = new HashMap<>();
+
+   //  PREPARING FOR THE TESTS
+
+   @Before
+   public void createHDFS() {
+   try {
+   baseDir = new 
File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+   FileUtil.fullyDelete(baseDir);
+
+   org.apache.hadoop.conf.Configuration hdConf = new 
org.apache.hadoop.conf.Configuration();
+   hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+   hdConf.set("dfs.block.size", String.valueOf(1048576)); 
// this is the minimum we can set.
+
+   MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
+   hdfsCluster = builder.build();
+
+   hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + 
":" + hdfsCluster.getNameNodePort() +"/";
+   hdfs = new 
org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+   } catch(Throwable e) {
+   e.printStackTrace();
+   Assert.fail("Test failed " + e.getMessage());
+   }
+   }
+
+   @After
+   public void destroyHDFS() {
+   try {
+   for(org.apache.hadoop.fs.Path file: hdPaths) {
+   hdfs.delete(file, false);
+   }
+   FileUtil.fullyDelete(baseDir);
+   hdfsCluster.shutdown();
+   } catch (IOException e) {
+   throw new RuntimeException(e);
+   }
+   }
+
+   //  END OF PREPARATIONS
+
+   //  TESTS
+
+   @Test
+   public void testFileContents() throws IOException {
+   // validates the output
+   for (org.apache.hadoop.fs.Path file : hdPaths) {
+   

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60932653
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/DefaultPathFilter.java
 ---
@@ -0,0 +1,7 @@
+package org.apache.flink.streaming.api.functions.source;
+
+/**
+ * Created by kkloudas on 4/25/16.
--- End diff --

What do you think? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60932233
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task, that monitors a user-procided 
path, and assigns splits
+ * to downstream tasks for further reading and processing, depending on 
the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ *
+ * This method keeps track of which splits have already being processed by 
which task, and at which point
+ * in the file we are currently processing, at the granularity of the 
split. In addition, it keeps track
+ * of the last modification time for each file, so that it can detect new 
data.
+ */
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered. This can be currently 
done in 3 ways.
+* {@code ONLY_NEW_FILES} which implies that only new files will 
be processed.
+* {@code REPROCESS_WITH_APPENDED} which reprocesses the whole 
file, as soon as new data is appended to it.
+*/
+   public enum WatchType {
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file 
when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}. */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = true;
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat format, String path,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this(format, path, FilePathFilter.DefaultFilter.getInstance(), 
watchType, readerParallelism, interval);
+   }
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat format, String path, FilePathFilter filter,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this.format = Preconditions.checkNotNull(format);
+   this.path = Preconditions.checkNotNull(path);
+
+   Preconditions.checkArgument(interval >= 100,
+   "The specified monitoring interval is smaller than the 
minimum allowed one (100 ms).");
+   this.interval = interval;
+
+   this.watchType = watchType;
+
+   this.pathFilter = Preconditions.checkNotNull(filter);
+
+   

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60932263
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task, that monitors a user-procided 
path, and assigns splits
+ * to downstream tasks for further reading and processing, depending on 
the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ *
+ * This method keeps track of which splits have already being processed by 
which task, and at which point
+ * in the file we are currently processing, at the granularity of the 
split. In addition, it keeps track
+ * of the last modification time for each file, so that it can detect new 
data.
+ */
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered. This can be currently 
done in 3 ways.
+* {@code ONLY_NEW_FILES} which implies that only new files will 
be processed.
+* {@code REPROCESS_WITH_APPENDED} which reprocesses the whole 
file, as soon as new data is appended to it.
+*/
+   public enum WatchType {
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file 
when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}. */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = true;
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat format, String path,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this(format, path, FilePathFilter.DefaultFilter.getInstance(), 
watchType, readerParallelism, interval);
+   }
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat format, String path, FilePathFilter filter,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this.format = Preconditions.checkNotNull(format);
+   this.path = Preconditions.checkNotNull(path);
+
+   Preconditions.checkArgument(interval >= 100,
+   "The specified monitoring interval is smaller than the 
minimum allowed one (100 ms).");
+   this.interval = interval;
+
+   this.watchType = watchType;
+
+   this.pathFilter = Preconditions.checkNotNull(filter);
+
+   

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60931033
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task, that monitors a user-procided 
path, and assigns splits
+ * to downstream tasks for further reading and processing, depending on 
the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ *
+ * This method keeps track of which splits have already being processed by 
which task, and at which point
--- End diff --

Method? This is the class JavaDoc!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1929#issuecomment-214387476
  
Can you provide a rough description of how the 
FileSplitMonitoringFunction works and how it interacts with the actual formats?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60930454
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/DefaultPathFilter.java
 ---
@@ -0,0 +1,7 @@
+package org.apache.flink.streaming.api.functions.source;
+
+/**
+ * Created by kkloudas on 4/25/16.
+ */
+public class DefaultPathFilter {
--- End diff --

This class is never used, as far as I can tell.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60929807
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionTest.java
 ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+public class FileSplitMonitoringFunctionTest {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
+
+   private Set hdPaths = new HashSet<>();
+   private Set hdPathNames = new HashSet<>();
+   private Map hdPathContents = new HashMap<>();
+
+   //  PREPARING FOR THE TESTS
+
+   @Before
+   public void createHDFS() {
+   try {
+   baseDir = new 
File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+   FileUtil.fullyDelete(baseDir);
+
+   org.apache.hadoop.conf.Configuration hdConf = new 
org.apache.hadoop.conf.Configuration();
+   hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+   hdConf.set("dfs.block.size", String.valueOf(1048576)); 
// this is the minimum we can set.
+
+   MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
+   hdfsCluster = builder.build();
+
+   hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + 
":" + hdfsCluster.getNameNodePort() +"/";
+   hdfs = new 
org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+   } catch(Throwable e) {
+   e.printStackTrace();
+   Assert.fail("Test failed " + e.getMessage());
+   }
+   }
+
+   @After
+   public void destroyHDFS() {
+   try {
+   for(org.apache.hadoop.fs.Path file: hdPaths) {
+   hdfs.delete(file, false);
+   }
+   FileUtil.fullyDelete(baseDir);
+   hdfsCluster.shutdown();
+   } catch (IOException e) {
+   throw new RuntimeException(e);
+   }
+   }
+
+   //  END OF PREPARATIONS
+
+   //  TESTS
+
+   @Test
+   public void testFileContents() throws IOException {
+   // validates the output
+   for (org.apache.hadoop.fs.Path file : hdPaths) {
+   

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60929665
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionTest.java
 ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+public class FileSplitMonitoringFunctionTest {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
+
+   private Set hdPaths = new HashSet<>();
+   private Set hdPathNames = new HashSet<>();
+   private Map hdPathContents = new HashMap<>();
+
+   //  PREPARING FOR THE TESTS
+
+   @Before
+   public void createHDFS() {
+   try {
+   baseDir = new 
File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+   FileUtil.fullyDelete(baseDir);
+
+   org.apache.hadoop.conf.Configuration hdConf = new 
org.apache.hadoop.conf.Configuration();
+   hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+   hdConf.set("dfs.block.size", String.valueOf(1048576)); 
// this is the minimum we can set.
+
+   MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
+   hdfsCluster = builder.build();
+
+   hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + 
":" + hdfsCluster.getNameNodePort() +"/";
+   hdfs = new 
org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+   } catch(Throwable e) {
+   e.printStackTrace();
+   Assert.fail("Test failed " + e.getMessage());
+   }
+   }
+
+   @After
+   public void destroyHDFS() {
+   try {
+   for(org.apache.hadoop.fs.Path file: hdPaths) {
+   hdfs.delete(file, false);
+   }
+   FileUtil.fullyDelete(baseDir);
+   hdfsCluster.shutdown();
+   } catch (IOException e) {
+   throw new RuntimeException(e);
+   }
+   }
+
+   //  END OF PREPARATIONS
+
+   //  TESTS
+
+   @Test
+   public void testFileContents() throws IOException {
+   // validates the output
+   for (org.apache.hadoop.fs.Path file : hdPaths) {
+   

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60928894
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionITCase.java
 ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FileSplitMonitoringFunctionITCase extends 
StreamingProgramTestBase {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
--- End diff --

Is it really necessary to spin up a DFS cluster for this test? It should 
work just as well with local files, no?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60928614
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task that monitors a user-provided path and assigns
+ * splits to downstream tasks for further reading and processing, depending on the
+ * user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ *
+ * This function keeps track of which splits have already been processed by which task,
+ * and at which point in the file we are currently processing, at the granularity of the
+ * split. In addition, it keeps track of the last modification time for each file, so
+ * that it can detect new data.
+ */
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered. This can be currently 
done in 3 ways.
+* {@code ONLY_NEW_FILES} which implies that only new files will 
be processed.
+* {@code REPROCESS_WITH_APPENDED} which reprocesses the whole 
file, as soon as new data is appended to it.
+*/
+   public enum WatchType {
--- End diff --

where is ONLY_NEW_FILES ?
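For reference, the enum the Javadoc above implies would look roughly like the sketch below; only REPROCESS_WITH_APPENDED exists in the diff, and ONLY_NEW_FILES is exactly the constant this comment is asking about:

```java
// Sketch of WatchType as documented (ONLY_NEW_FILES is not in the diff):
public enum WatchType {
	ONLY_NEW_FILES,          // process only files discovered since the last scan
	REPROCESS_WITH_APPENDED  // reprocess the whole file when new data is appended
}
```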




[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60928509
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java ---
@@ -0,0 +1,259 @@
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task that monitors a user-provided path and assigns
+ * splits to downstream tasks for further reading and processing, depending on the
+ * user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ *
+ * This function keeps track of which splits have already been processed by which task,
+ * and at which point in the file we are currently processing, at the granularity of the
+ * split. In addition, it keeps track of the last modification time for each file, so
+ * that it can detect new data.
+ */
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered. This can be currently 
done in 3 ways.
--- End diff --

I would remove the ```This can be currently done in 3 ways.``` part. It doesn't really add anything to the description, and is likely to become outdated.
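After the suggested trim, the Javadoc might read, for example:

```java
/**
 * Specifies when computation will be triggered.
 * {@code ONLY_NEW_FILES}: only new files will be processed.
 * {@code REPROCESS_WITH_APPENDED}: the whole file is reprocessed as soon
 * as new data is appended to it.
 */
```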




[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60928251
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java ---
@@ -0,0 +1,67 @@
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+public interface FilePathFilter extends Serializable {
+
+   /**
+* @return {@code true} if the {@code filePath} given is to be
+* ignored when processing a directory, e.g.
+* 
+* {@code
+*
+* public boolean filterPaths(Path filePath) {
+* return filePath.getName().startsWith(".") || 
filePath.getName().contains("_COPYING_");
+* }
+* }
+* */
+   boolean filterPaths(Path filePath);
+
+
+   /**
+* The default file path filtering method, used
+* if no other such function is provided. This filter leaves out
+* files starting with ".", "_", and "_COPYING_".
+*/
+   public class DefaultFilter implements FilePathFilter {
+
+   private static DefaultFilter instance = null;
+
+   DefaultFilter() {
+
--- End diff --

unnecessary empty line




[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60928155
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java ---
@@ -0,0 +1,67 @@
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+public interface FilePathFilter extends Serializable {
+
+   /**
+* @return {@code true} if the {@code filePath} given is to be
+* ignored when processing a directory, e.g.
+* 
+* {@code
+*
+* public boolean filterPaths(Path filePath) {
+* return filePath.getName().startsWith(".") || 
filePath.getName().contains("_COPYING_");
+* }
+* }
+* */
+   boolean filterPaths(Path filePath);
--- End diff --

IMO this method should be called ```filterPath``` since it only receives a 
single path.
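That is, the interface with the suggested singular name would become something like this sketch:

```java
import org.apache.flink.core.fs.Path;

import java.io.Serializable;

// Sketch of the renamed method; everything else stays as in the diff.
public interface FilePathFilter extends Serializable {

	/** @return {@code true} if the given path is to be ignored. */
	boolean filterPath(Path filePath);
}
```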




[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60928227
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java ---
@@ -0,0 +1,67 @@
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+public interface FilePathFilter extends Serializable {
+
+   /**
+* @return {@code true} if the {@code filePath} given is to be
+* ignored when processing a directory, e.g.
+* 
+* {@code
+*
+* public boolean filterPaths(Path filePath) {
+* return filePath.getName().startsWith(".") || 
filePath.getName().contains("_COPYING_");
+* }
+* }
+* */
+   boolean filterPaths(Path filePath);
+
+
+   /**
+* The default file path filtering method, used
+* if no other such function is provided. This filter leaves out
+* files starting with ".", "_", and "_COPYING_".
+*/
+   public class DefaultFilter implements FilePathFilter {
+
+   private static DefaultFilter instance = null;
+
+   DefaultFilter() {
+
+   }
+
+   public static DefaultFilter getInstance() {
+   if(instance == null) {
--- End diff --

missing space after if
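Beyond the style nit, this lazy initialization is also not thread-safe. A hypothetical rewrite that avoids both issues is sketched below (eager initialization; the filter body follows the behavior the Javadoc above describes, since the actual implementation is not shown in the diff):

```java
import org.apache.flink.core.fs.Path;

// Sketch only: an eagerly created singleton removes the null check
// (and the initialization race) entirely. FilePathFilter is the
// interface defined in the diff above.
public class DefaultFilter implements FilePathFilter {

	private static final DefaultFilter INSTANCE = new DefaultFilter();

	private DefaultFilter() {}

	public static DefaultFilter getInstance() {
		return INSTANCE;
	}

	@Override
	public boolean filterPaths(Path filePath) {
		// Per the Javadoc: skip hidden files, underscore-prefixed files,
		// and files still being copied.
		String name = filePath.getName();
		return name.startsWith(".") || name.startsWith("_") || name.contains("_COPYING_");
	}
}
```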




[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60928005
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/DefaultPathFilter.java ---
@@ -0,0 +1,7 @@
+package org.apache.flink.streaming.api.functions.source;
+
+/**
+ * Created by kkloudas on 4/25/16.
--- End diff --

did you mean to include this?




[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60927886
  
--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---
@@ -458,7 +458,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* every 100 milliseconds.
*
*/
-  def readFileStream(StreamPath: String, intervalMillis: Long = 100, 
--- End diff --

unrelated change




[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60927783
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java ---
@@ -0,0 +1,259 @@
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task that monitors a user-provided path and assigns
+ * splits to downstream tasks for further reading and processing, depending on the
+ * user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ *
+ * This function keeps track of which splits have already been processed by which task,
+ * and at which point in the file we are currently processing, at the granularity of the
+ * split. In addition, it keeps track of the last modification time for each file, so
+ * that it can detect new data.
+ */
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered. This can be currently 
done in 3 ways.
+* {@code ONLY_NEW_FILES} which implies that only new files will 
be processed.
+* {@code REPROCESS_WITH_APPENDED} which reprocesses the whole 
file, as soon as new data is appended to it.
+*/
+   public enum WatchType {
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file 
when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}). */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = true;
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat format, String path,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this(format, path, FilePathFilter.DefaultFilter.getInstance(), 
watchType, readerParallelism, interval);
+   }
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat format, String path, FilePathFilter filter,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this.format = Preconditions.checkNotNull(format);
+   this.path = Preconditions.checkNotNull(path);
+
+   Preconditions.checkArgument(interval >= 100,
+   "The specified monitoring interval is smaller than the 
minimum allowed one (100 ms).");
+   this.interval = interval;
+
+   this.watchType = watchType;
+
+   this.pathFilter = Preconditions.checkNotNull(filter);
+
+