[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-26 Thread kl0u
Github user kl0u closed the pull request at:

https://github.com/apache/flink/pull/1984


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-17 Thread kl0u
Github user kl0u commented on the pull request:

https://github.com/apache/flink/pull/1984#issuecomment-219742461
  
Thanks for the comments, @aljoscha.

The only comment not yet integrated is the one about `OutputTypeConfigurable`, which I need to understand a bit better before I can implement it correctly. As for the spaces, it is IntelliJ that adds them; I will try to fix them later as well.




[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1984#discussion_r63523040
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}.
+ * This operator will receive just the split descriptors and then read and emit records. This may lead
+ * to backpressure. To avoid this, we will have another thread actually reading the splits and
+ * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect the
+ * current state.
+ * */
+public class FileSplitReadOperator extends AbstractStreamOperator
+   implements OneInputStreamOperator {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileSplitReadOperator.class);
+
+   private static final FileInputSplit EOF = new FileInputSplit(-1, null, -1, -1, null);
+
+   private transient SplitReader reader;
+   private transient TimestampedCollector collector;
+
+   private Configuration configuration;
+   private FileInputFormat format;
+   private TypeInformation typeInfo;
+
+   private Tuple3 readerState;
+
+   public FileSplitReadOperator(FileInputFormat format, TypeInformation typeInfo, Configuration configuration) {
+   this.format = checkNotNull(format);
+   this.typeInfo = checkNotNull(typeInfo);
+   this.configuration = checkNotNull(configuration);
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+
+   this.format.configure(configuration);
+   this.collector = new TimestampedCollector<>(output);
+
+   TypeSerializer serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
+   Object checkpointLock = getContainingTask().getCheckpointLock();
+
+   this.reader = new SplitReader<>(format, serializer, collector, checkpointLock);
+   this.reader.setReaderState(this.readerState);
+   this.reader.start();
+   this.readerState = null;
+   }
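The class Javadoc above describes splitting the work between a thread that reads splits and the task thread that handles checkpoint barriers, synchronized so that checkpoints reflect the current state. Below is a minimal, dependency-free sketch of that idea, assuming a "split" is simply a list of records and a snapshot is a method called from the task thread. `SplitReaderSketch`, `addSplit`, `finish` and `snapshot` are hypothetical names, not the PR's `SplitReader` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical, simplified model of the reader thread described in the Javadoc.
 * A "split" is just a list of records; this is NOT the PR's SplitReader.
 */
public class SplitReaderSketch {

    // Sentinel compared by identity, playing the role of the EOF split.
    private static final List<String> EOF = new ArrayList<>();

    private final Object checkpointLock = new Object();
    private final BlockingQueue<List<String>> pendingSplits = new LinkedBlockingQueue<>();
    private final List<String> emitted = new ArrayList<>(); // guarded by checkpointLock
    private int splitsCompleted = 0;                        // guarded by checkpointLock
    private final Thread readerThread = new Thread(this::runReader, "split-reader");

    public void start() {
        readerThread.start();
    }

    /** Called by the task thread for each incoming split descriptor; it only enqueues. */
    public void addSplit(List<String> split) {
        pendingSplits.add(split);
    }

    /** Signals end of input and waits for the reader thread to drain the queue. */
    public void finish() {
        pendingSplits.add(EOF);
        try {
            readerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the reader", e);
        }
    }

    private void runReader() {
        try {
            while (true) {
                List<String> split = pendingSplits.take();
                if (split == EOF) {
                    return;
                }
                for (String record : split) {
                    // Emitting and updating state happen under the checkpoint lock.
                    synchronized (checkpointLock) {
                        emitted.add(record);
                    }
                }
                synchronized (checkpointLock) {
                    splitsCompleted++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Taken under the same lock, so it never interleaves with a single emit. */
    public String snapshot() {
        synchronized (checkpointLock) {
            return "emitted=" + emitted.size() + ", splitsCompleted=" + splitsCompleted;
        }
    }

    public List<String> emittedRecords() {
        synchronized (checkpointLock) {
            return new ArrayList<>(emitted);
        }
    }
}
```

Because every emit and every snapshot take the same lock, a snapshot can fall between two records but never in the middle of one; in the quoted `open()`, that role is played by the lock obtained from `getContainingTask().getCheckpointLock()`.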

[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-17 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1984#discussion_r63521733
  

[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-17 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1984#issuecomment-219673873
  
Overall, the code looks very good! I had some inline comments about Javadoc/comments.

One thing that might be wrong, though, is the interplay between `CheckpointableInputFormat.getCurrentChannelState()` and `SplitReader.getReaderState()`. In the latter, you check whether the result of the former is `null` and then do something based on this. The former, however, never returns `null`; it always returns a `Tuple2` whose fields can be set to `null`. This might be an artifact from an earlier mode of implementation.
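To make the concern concrete, here is a hypothetical sketch: a `Pair` class stands in for `Tuple2`, and `currentChannelState`, `hasStateByReference` and `hasStateByFields` are illustrative names, not Flink API. It assumes that "no state" is signalled by both fields being `null`.

```java
/**
 * Hypothetical sketch of the null-check issue. Pair stands in for Flink's Tuple2;
 * currentChannelState() stands in for a getCurrentChannelState() that never returns null.
 */
public class ChannelStateCheckSketch {

    /** Minimal stand-in for Tuple2: the pair itself is non-null, its fields may be null. */
    public static final class Pair<A, B> {
        public final A f0;
        public final B f1;

        public Pair(A f0, B f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    /** Mimics the described contract: always returns a Pair, possibly with null fields. */
    public static Pair<Long, Long> currentChannelState(Long offset, Long formatState) {
        return new Pair<>(offset, formatState);
    }

    /** The problematic check: always true, because the Pair is never null. */
    public static boolean hasStateByReference(Pair<Long, Long> state) {
        return state != null;
    }

    /** Inspects the fields instead (assumption: "no state" means both fields are null). */
    public static boolean hasStateByFields(Pair<Long, Long> state) {
        return state != null && (state.f0 != null || state.f1 != null);
    }

    public static void main(String[] args) {
        Pair<Long, Long> empty = currentChannelState(null, null);
        System.out.println(hasStateByReference(empty)); // true, even though nothing is stored
        System.out.println(hasStateByFields(empty));    // false
    }
}
```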




[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1984#discussion_r63495352
  

[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1984#discussion_r63494863
  

[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1984#discussion_r63494540
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
@@ -0,0 +1,368 @@
+   private Configuration configuration;
+   private FileInputFormat format;
+   private TypeInformation typeInfo;
--- End diff --

This is a very subtle thing, but not all `TypeInformation` instances are `Serializable`, and none of them should be. This is a problem that we introduced a while back.

The way to do it is to implement `OutputTypeConfigurable`; in its `setOutputType()` method the `TypeSerializer` can be created. In `open()` you should then ensure that you actually have a `TypeSerializer`.

And yes, I know that no-one can really know this without having encountered a serialization problem once ...  😅
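A dependency-free sketch of the suggested pattern, under the assumption that modelling only the serialization aspect is enough to show the point. `TypeInfo` and `Serializer` are hypothetical stand-ins for `TypeInformation` and `TypeSerializer`, and `setOutputType` here only mirrors the shape of `OutputTypeConfigurable.setOutputType(...)`, which in Flink also receives an `ExecutionConfig`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/**
 * Hypothetical, dependency-free model of the pattern; none of these types are Flink's.
 * TypeInfo plays TypeInformation (not Serializable), Serializer plays TypeSerializer (Serializable).
 */
public class OutputTypeConfigurableSketch {

    /** Stand-in for a serializer: safe to ship with the operator. */
    public static final class Serializer<T> implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String typeName;

        Serializer(String typeName) {
            this.typeName = typeName;
        }

        public String typeName() {
            return typeName;
        }
    }

    /** Stand-in for type information: deliberately NOT Serializable. */
    public static final class TypeInfo<T> {
        private final String typeName;

        public TypeInfo(String typeName) {
            this.typeName = typeName;
        }

        public Serializer<T> createSerializer() {
            return new Serializer<>(typeName);
        }
    }

    /** Anti-pattern: keeps the type information as a field, so the operator cannot be serialized. */
    public static final class OperatorWithTypeInfo<OUT> implements Serializable {
        private static final long serialVersionUID = 1L;
        private final TypeInfo<OUT> typeInfo;

        public OperatorWithTypeInfo(TypeInfo<OUT> typeInfo) {
            this.typeInfo = typeInfo;
        }
    }

    /** Pattern: receive the type in a setOutputType()-style hook and keep only the serializer. */
    public static final class OperatorWithSerializer<OUT> implements Serializable {
        private static final long serialVersionUID = 1L;
        private Serializer<OUT> serializer;

        public void setOutputType(TypeInfo<OUT> typeInfo) {
            this.serializer = typeInfo.createSerializer();
        }

        public void open() {
            if (serializer == null) {
                throw new IllegalStateException("The output type was not set before open().");
            }
        }
    }

    /** Returns true if Java serialization of the object succeeds. */
    public static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The operator that stores the type information fails Java serialization, while the one that keeps only the serializer is shipped without problems, and its `open()` refuses to run if the hook was never called.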




[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1984#discussion_r63493915
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java ---
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits will be further processed
+ * depends on the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ */
+@Internal
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction implements Checkpointed>, Tuple2, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered.
+*/
+   public enum WatchType {
+   PROCESS_ONCE,   // Processes the current content of a file/path only ONCE, and stops monitoring.
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}. */
+   private final WatchType watchType;
+
+   private List> splitsToFwdOrderedAscByModTime;
+
+   private Tuple2 currentSplitsToFwd;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = true;
+
+   /**
+* This is the {@link Configuration} to be used to initialize the input format at the reader
+* (see {@link #open(Configuration)}). In the codebase, whenever {@link #open(Configuration)} is called,
+* it is passed a new configuration, thus ignoring potential user-specified parameters. Now, we pass a
+* configuration object at the constructor, which is shipped to the remote tasks.
+   private Configuration configuration;
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat format, String path, Configuration configuration,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this(format, path, configuration, FilePathFilter.DefaultFilter.getInstance(),
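The fields `globalModificationTime` and `splitsToFwdOrderedAscByModTime` suggest that new data is detected by comparing modification times. Below is a hypothetical sketch of that bookkeeping, assuming a directory listing is reduced to (path, modification time) pairs; `ModTimeMonitorSketch`, `FileEntry` and `pollNewFiles` are illustrative names, not the PR's code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/**
 * Hypothetical sketch of modification-time based monitoring; not the PR's code.
 * FileEntry stands in for a file status (path + modification time).
 */
public class ModTimeMonitorSketch {

    public static final class FileEntry {
        public final String path;
        public final long modificationTime;

        public FileEntry(String path, long modificationTime) {
            this.path = path;
            this.modificationTime = modificationTime;
        }
    }

    /** Largest modification time forwarded so far. */
    private long globalModificationTime = Long.MIN_VALUE;

    /**
     * Returns the files modified after globalModificationTime, ordered by ascending
     * modification time, and advances globalModificationTime to the newest of them.
     */
    public List<FileEntry> pollNewFiles(List<FileEntry> listing) {
        List<FileEntry> fresh = new ArrayList<>();
        for (FileEntry entry : listing) {
            if (entry.modificationTime > globalModificationTime) {
                fresh.add(entry);
            }
        }
        fresh.sort(Comparator.comparingLong(f -> f.modificationTime));
        if (!fresh.isEmpty()) {
            globalModificationTime = fresh.get(fresh.size() - 1).modificationTime;
        }
        return fresh;
    }

    public long globalModificationTime() {
        return globalModificationTime;
    }
}
```

Using a strict `>` means a file whose modification time equals the last forwarded one is not picked up again; whether that is acceptable depends on the file system's timestamp granularity and is an assumption of this sketch.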

[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1984#discussion_r63493233
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits will be further processed
+ * depends on the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ */
+@Internal
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction implements Checkpointed>, Tuple2, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered.
+*/
+   public enum WatchType {
+   PROCESS_ONCE,   // Processes the current content of a file/path only ONCE, and stops monitoring.
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}. */
+   private final WatchType watchType;
+
+   private List> splitsToFwdOrderedAscByModTime;
+
+   private Tuple2 currentSplitsToFwd;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = true;
+
+   /**
+* This is the {@link Configuration} to be used to initialize the input format at the reader
+* (see {@link #open(Configuration)}). In the codebase, whenever {@link #open(Configuration)} is called,
+* it is passed a new configuration, thus ignoring potential user-specified parameters. Now, we pass a
+* configuration object at the constructor, which is shipped to the remote tasks.
+* */
+   private Configuration configuration;
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat format, String path, Configuration configuration,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this(format, path, configuration, FilePathFilter.DefaultFilter.getInstance(),

[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1984#discussion_r63492862
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,345 @@

[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1984#discussion_r63492517
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,345 @@

[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1984#discussion_r63492326
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits will be further processed
+ * depends on the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ */
+@Internal
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction implements Checkpointed>, Tuple2, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered.
+*/
+   public enum WatchType {
+   PROCESS_ONCE,   // Processes the current content of a file/path only ONCE, and stops monitoring.
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}. */
+   private final WatchType watchType;
+
+   private List> splitsToFwdOrderedAscByModTime;
+
+   private Tuple2 currentSplitsToFwd;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = true;
+
+   /**
+* This is the {@link Configuration} to be used to initialize the input format at the reader
+* (see {@link #open(Configuration)}). In the codebase, whenever {@link #open(Configuration)} is called,
--- End diff --

This is only true for streaming programs; something like `In streaming programs, whenever ...` should do the trick.
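
The mechanism this Javadoc describes (capturing the user's parameters in the constructor so they travel with the serialized function, instead of relying on the fresh configuration handed to `open()`) can be sketched in plain Java. This is a hypothetical illustration, not Flink code: the class and method names are made up, a `Map` stands in for Flink's `Configuration`, and a Java serialization round trip stands in for shipping the function to a remote task.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical reader whose user parameters travel with it to the remote task. */
class ConfiguredReaderSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Captured on the client at construction time and serialized with the object.
    private final HashMap<String, String> shippedConfig;

    // Set in open(); derived from the shipped configuration.
    private String delimiter;

    ConfiguredReaderSketch(Map<String, String> userConfig) {
        this.shippedConfig = new HashMap<>(userConfig);
    }

    /**
     * Mimics open(Configuration): the runtime passes a fresh configuration that does not
     * contain the user's parameters, so it is ignored in favor of the shipped one.
     */
    void open(Map<String, String> freshRuntimeConfig) {
        this.delimiter = shippedConfig.getOrDefault("delimiter", "\n");
    }

    String getDelimiter() {
        return delimiter;
    }

    /** Simulates shipping the reader to a remote task with Java serialization. */
    static ConfiguredReaderSketch shipToRemote(ConfiguredReaderSketch reader) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(reader);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ConfiguredReaderSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("could not ship reader", e);
        }
    }
}
```

Because `open()` ignores the fresh map, a user-specified value such as `delimiter` survives the round trip; this mirrors the intent stated in the Javadoc rather than Flink's actual code path.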




[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1984#discussion_r63492168
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided path and assigns splits
--- End diff --

We could mention here that it is meant to work together with the `FileSplitReadOperator`.
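
As a rough illustration of how a single, non-parallel monitoring task can feed several parallel readers, the sketch below cuts a file into fixed-size byte-range splits and deals them out round-robin. The split type, the fixed split size, and the round-robin policy are all assumptions made for illustration; the PR does not state how splits are distributed to the readers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified input split: a byte range of one file. */
final class SplitSketch {
    final String path;
    final long start;
    final long length;

    SplitSketch(String path, long start, long length) {
        this.path = path;
        this.start = start;
        this.length = length;
    }
}

/** A single (parallelism 1) assigner that deals splits out to N downstream readers. */
final class SplitAssignerSketch {
    private final int readerParallelism;
    private final long splitSize;
    private int nextReader = 0; // round-robin cursor (an assumption, not taken from the PR)

    SplitAssignerSketch(int readerParallelism, long splitSize) {
        if (readerParallelism < 1 || splitSize < 1) {
            throw new IllegalArgumentException("parallelism and split size must be positive");
        }
        this.readerParallelism = readerParallelism;
        this.splitSize = splitSize;
    }

    /** Cuts one file into splits and returns the splits assigned to each reader, by reader index. */
    List<List<SplitSketch>> assign(String path, long fileLength) {
        List<List<SplitSketch>> perReader = new ArrayList<>();
        for (int i = 0; i < readerParallelism; i++) {
            perReader.add(new ArrayList<>());
        }
        for (long start = 0; start < fileLength; start += splitSize) {
            long length = Math.min(splitSize, fileLength - start);
            perReader.get(nextReader).add(new SplitSketch(path, start, length));
            nextReader = (nextReader + 1) % readerParallelism;
        }
        return perReader;
    }
}
```

Because the cursor persists across calls, splits of successive files keep rotating over the readers instead of always starting at reader 0.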




[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1984#discussion_r63491931
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+/**
+ * An interface to be implemented by the user when using the {@link FileSplitMonitoringFunction}.
+ * The {@link #filterPath(Path)} method is responsible for deciding if a path is eligible for further
+ * processing or not. This can serve to exclude temporary or partial files that
+ * are still being written.
+ *
+ *
+ * A default implementation is the {@link DefaultFilter} which excludes files starting with ".", "_", or
+ * contain the "_COPYING_" in their names. This can be retrieved by {@link DefaultFilter#getInstance()}.
+ * */
+public interface FilePathFilter extends Serializable {
+
+   /**
+* @return {@code true} if the {@code filePath} given is to be
--- End diff --

I think having a `@return` here is strange. Normally, a `@return` follows the main description body. We could just have `Returns {@code true} ...` here.
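
A minimal sketch of a path filter whose Javadoc follows the suggested style (a summary sentence starting with "Returns" instead of a leading `@return`), assuming the default exclusion rules described in the quoted Javadoc: names starting with "." or "_", or containing "_COPYING_". The interface name `PathFilterSketch` and the method `shouldIgnore` are hypothetical, and `java.nio.file.Path` stands in for Flink's `Path`.

```java
import java.io.Serializable;
import java.nio.file.Path;

/** Hypothetical filter deciding which files a monitoring source skips. */
interface PathFilterSketch extends Serializable {

    /**
     * Returns {@code true} if the given file should be skipped, for example because it
     * is hidden or still being written.
     *
     * @param filePath the path to check
     */
    boolean shouldIgnore(Path filePath);
}

/** Skips names starting with "." or "_", and names containing "_COPYING_". */
final class DefaultPathFilterSketch implements PathFilterSketch {
    private static final long serialVersionUID = 1L;

    @Override
    public boolean shouldIgnore(Path filePath) {
        Path fileName = filePath.getFileName();
        if (fileName == null) {
            return false; // e.g. a root path: nothing to match against
        }
        String name = fileName.toString();
        return name.startsWith(".") || name.startsWith("_") || name.contains("_COPYING_");
    }
}
```

Matching on the last path component only means a file such as `/data/_tmp/part-0` is not excluded by its parent directory's name; whether the real filter behaves that way is not stated in the quoted diff.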




[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1984#discussion_r63491422
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java ---
@@ -115,7 +115,7 @@ public void testHDFS() {
}

Assert.assertTrue("No result file present", hdfs.exists(result));
-   
--- End diff --

Unrelated whitespace change.




[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1984#discussion_r63491234
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 ---
@@ -463,7 +463,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* every 100 milliseconds.
*
*/
-  def readFileStream(StreamPath: String, intervalMillis: Long = 100, 
--- End diff --

Unrelated change.




[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1984#discussion_r63490526
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
 ---
@@ -394,4 +394,4 @@ public IntValue nextRecord(IntValue reuse) throws IOException {
return null;
}
}
-}
\ No newline at end of file
+}
--- End diff --

Just whitespace changes in this file.




[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1984#discussion_r63490499
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---
@@ -235,7 +235,7 @@ protected FileInputFormat(Path filePath) {
// 

//  Getters/setters for the configurable parameters
// 

-   
+
--- End diff --

Just whitespace changes in this file.




[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1984#discussion_r63490061
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/BlockInfo.java ---
@@ -26,6 +26,11 @@
 import org.apache.flink.core.memory.DataOutputView;
 
 @Public
--- End diff --

`@Public` should go after the Javadoc, IMHO. Also, the last line of the Javadoc has an extra `*`.




[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-13 Thread kl0u
Github user kl0u commented on the pull request:

https://github.com/apache/flink/pull/1984#issuecomment-219117647
  
After the discussion we had today with @StephanEwen and @aljoscha, I also added the PROCESS_ONCE watchType, which processes the current content (at invocation time) of a file/directory and then exits. This makes it possible to accommodate bounded file sources (à la batch).
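
The difference between the two watch types can be sketched as a polling loop. This is a hypothetical illustration, not the PR's implementation: the directory listing is modeled as a `Supplier` of file-name-to-modification-time maps, and `maxPolls` exists only so that the continuous mode terminates in the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical polling loop contrasting the two watch types. */
final class WatchLoopSketch {

    enum WatchType { PROCESS_ONCE, REPROCESS_WITH_APPENDED }

    /**
     * Polls {@code listing} (file name -> modification time) and returns the files emitted,
     * in emission order. {@code maxPolls} only exists so the continuous mode terminates here.
     */
    static List<String> run(WatchType watchType, Supplier<Map<String, Long>> listing,
                            int maxPolls, long intervalMillis) {
        List<String> emitted = new ArrayList<>();
        Map<String, Long> lastSeenModTime = new HashMap<>();
        for (int poll = 1; ; poll++) {
            for (Map.Entry<String, Long> file : listing.get().entrySet()) {
                Long seen = lastSeenModTime.get(file.getKey());
                if (seen == null || file.getValue() > seen) {
                    // New file, or a file that was appended to: (re)process it as a whole.
                    emitted.add(file.getKey());
                    lastSeenModTime.put(file.getKey(), file.getValue());
                }
            }
            // PROCESS_ONCE stops after a single pass; the poll bound stops the continuous mode.
            if (watchType == WatchType.PROCESS_ONCE || poll >= maxPolls) {
                return emitted;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // treat interruption as cancellation
                return emitted;
            }
        }
    }
}
```

With `PROCESS_ONCE` the loop returns after one pass over the current content, which is what allows a bounded, batch-like source; with `REPROCESS_WITH_APPENDED`, a file whose modification time advances is emitted again as a whole.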




[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...

2016-05-11 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/1984

[FLINK-3889] Make File Monitoring Function checkpointable.

This pull request introduces the underlying functionality to make streaming file sources persistent (checkpointable). It does not yet change the API calls; this will be done once the current architecture and implementation have been agreed upon.
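
Making a file source persistent hinges on readers being able to report where they are and to resume from there after a failure. The sketch below shows that idea on newline-delimited records held in memory. The class and its `getCurrentState()`/`reopen(long)` methods are hypothetical simplifications; the FLINK-3717 commit in this PR names its state accessor `getCurrentChannelState()`.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical reader over newline-delimited records that can report and restore its position. */
final class CheckpointableLineReaderSketch {
    private final byte[] data;
    private int offset; // byte position of the next unread record

    CheckpointableLineReaderSketch(String content) {
        this.data = content.getBytes(StandardCharsets.UTF_8);
    }

    /** Returns the position to store in a checkpoint. */
    long getCurrentState() {
        return offset;
    }

    /** Resumes reading at a previously checkpointed position. */
    void reopen(long state) {
        if (state < 0 || state > data.length) {
            throw new IllegalArgumentException("invalid state: " + state);
        }
        offset = (int) state;
    }

    /** Returns the next record, or {@code null} once the input is exhausted. */
    String nextRecord() {
        if (offset >= data.length) {
            return null;
        }
        int end = offset;
        while (end < data.length && data[end] != '\n') {
            end++;
        }
        String record = new String(data, offset, end - offset, StandardCharsets.UTF_8);
        offset = Math.min(end + 1, data.length); // step over the delimiter, if any
        return record;
    }
}
```

A restored reader positioned at the checkpointed offset continues with the first record that had not been read yet, so no record is read twice and none is skipped.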

In addition, this PR includes a commit for FLINK-3896, which allows an operator to cancel its containing task. The need for this functionality came up during a discussion with @StephanEwen and @aljoscha, and it is a separate commit.
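
The idea behind FLINK-3896 (an operator making its containing task fail) can be sketched as follows. Only the method name `failExternally()` comes from the FLINK-3896 commit message in this PR; the `TaskSketch` class, its run loop, and the use of an `AtomicReference` to hand the failure across threads are assumptions.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical task whose operators can make it fail from the outside. */
final class TaskSketch {
    // Set at most once, possibly from another thread (e.g. an operator's reader thread).
    private final AtomicReference<Throwable> externalFailure = new AtomicReference<>();

    /** Records the cause; only the first call has an effect. */
    void failExternally(Throwable cause) {
        externalFailure.compareAndSet(null, cause);
    }

    /** Runs up to {@code steps} units of work, checking for an external failure before each. */
    int run(int steps, Runnable work) {
        int done = 0;
        for (int i = 0; i < steps; i++) {
            Throwable cause = externalFailure.get();
            if (cause != null) {
                throw new RuntimeException("task failed externally", cause);
            }
            work.run();
            done++;
        }
        return done;
    }
}
```

Keeping only the first cause means a cascade of follow-up errors does not mask the original reason the operator gave up.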

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink ft_files

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1984.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1984


commit 7deb92236cec47ddcfbb3abfa396fd9d15f770b7
Author: kl0u 
Date:   2016-05-10T16:56:58Z

[FLINK-3896] Allow a StreamTask to be Externally Cancelled

It adds a method failExternally() to the StreamTask, so that custom operators
can make their containing task fail when needed.

commit c9682b7606451c4eecf6f2f6df9a498fb6d39577
Author: kl0u 
Date:   2016-04-10T14:56:42Z

[FLINK-3717] Make FileInputFormat checkpointable

This adds a new interface called CheckpointableInputFormat
which describes input formats whose state is queryable,
i.e. getCurrentChannelState() returns where the reader is
in the underlying source, and they can resume reading from
a user-specified position.

This functionality is not yet leveraged by current readers.

commit cbbfd8d7e6db0f8f114675b4047aecb94996e500
Author: kl0u 
Date:   2016-04-18T14:37:54Z

[FLINK-3889][FLINK-3808] Refactor File Monitoring Source

This is meant to replace the different file
reading sources in Flink streaming. Now there is
one monitoring source with DOP 1 (parallelism 1)
monitoring a directory and assigning input splits
to downstream readers.

In addition, it makes the new features added by
FLINK-3717 and FLINK-3808 work together. Now we have
a file monitoring source that is also fault-tolerant
and can guarantee exactly-once semantics.

This does not replace the old API calls. This
will be done in a future commit.
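
What the monitoring source has to checkpoint for exactly-once split assignment can be sketched roughly as the work discovered but not yet forwarded, ordered by ascending modification time, plus the highest modification time already handled. This mirrors the `splitsToFwdOrderedAscByModTime` and `globalModificationTime` fields in the quoted diff, but the types and methods below are hypothetical, and whole files stand in for splits.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;

/** Hypothetical checkpointable state of a file-monitoring source. */
final class MonitorStateSketch {

    static final class PendingFile {
        final String path;
        final long modTime;

        PendingFile(String path, long modTime) {
            this.path = path;
            this.modTime = modTime;
        }
    }

    /** What would be written to a checkpoint. */
    static final class Snapshot {
        final List<PendingFile> pending;
        final long globalModTime;

        Snapshot(List<PendingFile> pending, long globalModTime) {
            this.pending = pending;
            this.globalModTime = globalModTime;
        }
    }

    private final Deque<PendingFile> pending = new ArrayDeque<>();
    private long globalModTime = Long.MIN_VALUE; // highest modification time already forwarded

    /** Queues newly discovered files, oldest first, skipping anything already handled. */
    void enqueue(List<PendingFile> discovered) {
        List<PendingFile> fresh = new ArrayList<>();
        for (PendingFile file : discovered) {
            if (file.modTime > globalModTime) {
                fresh.add(file);
            }
        }
        fresh.sort(Comparator.comparingLong(p -> p.modTime));
        pending.addAll(fresh);
    }

    /** Forwards the oldest pending file downstream; returns null if nothing is pending. */
    String forwardNext() {
        PendingFile next = pending.pollFirst();
        if (next == null) {
            return null;
        }
        globalModTime = Math.max(globalModTime, next.modTime);
        return next.path;
    }

    Snapshot snapshot() {
        return new Snapshot(new ArrayList<>(pending), globalModTime);
    }

    static MonitorStateSketch restore(Snapshot snapshot) {
        MonitorStateSketch state = new MonitorStateSketch();
        state.pending.addAll(snapshot.pending);
        state.globalModTime = snapshot.globalModTime;
        return state;
    }
}
```

After a restore, rediscovered files that are not newer than the restored watermark are skipped, so already-forwarded input is not emitted again, while pending entries are forwarded in their original order. This is a simplification: two files sharing the same modification time would need extra bookkeeping.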



