[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persi...

2016-05-27 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2020#discussion_r64883424
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 ---
@@ -135,16 +135,22 @@ protected static void loadGlobalConfigParams() {
private transient int readPos;
 
private transient int limit;
-   
+
+   private transient FileInputSplit currSplit;
+
+   private transient FileInputSplit restoredSplit;
+   private transient Long restoredOffset;
--- End diff --

Maybe use a primitive type here.
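Here is a minimal sketch of the suggestion. It assumes a negative sentinel can replace `null` because valid offsets are never negative. `RestorableReaderState`, `NO_OFFSET`, and its methods are hypothetical names, and a `String` stands in for `FileInputSplit`.

```java
// Hypothetical stand-in for the restore fields in DelimitedInputFormat; a String
// replaces FileInputSplit so the sketch compiles on its own.
class RestorableReaderState {

    // Assumption: valid offsets are never negative, so -1 can mean "nothing restored".
    static final long NO_OFFSET = -1L;

    private transient String restoredSplit;
    // Primitive long instead of Long: no boxing and no risk of unboxing a null.
    private transient long restoredOffset = NO_OFFSET;

    void restore(String split, long offset) {
        if (split == null || offset < 0) {
            throw new IllegalArgumentException("need a split and a non-negative offset");
        }
        this.restoredSplit = split;
        this.restoredOffset = offset;
    }

    boolean hasRestoredState() {
        // Check the split reference too. If the enclosing class is serialized (as input
        // formats are), a transient long comes back as 0, not NO_OFFSET.
        return restoredSplit != null && restoredOffset != NO_OFFSET;
    }

    long restoredOffset() {
        return restoredOffset;
    }

    public static void main(String[] args) {
        RestorableReaderState state = new RestorableReaderState();
        System.out.println(state.hasRestoredState()); // false
        state.restore("split-0", 1024L);
        System.out.println(state.hasRestoredState() + " " + state.restoredOffset()); // true 1024
    }
}
```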


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persi...

2016-05-24 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2020#discussion_r64395572
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
 ---
@@ -104,8 +106,25 @@ public void runCheckpointedProgram() {
postSubmit();
}
catch (Exception e) {
+   Throwable th = e;
+   int depth = 0;
+
+   for (; depth < 20; depth++) {
+   if (th instanceof SuccessException) {
+   try {
+   postSubmit();
+   } catch (Exception e1) {
+   e1.printStackTrace();
--- End diff --

Thanks @aljoscha! Done.




[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persi...

2016-05-24 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2020#discussion_r64392589
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
 ---
@@ -104,8 +106,25 @@ public void runCheckpointedProgram() {
postSubmit();
}
catch (Exception e) {
+   Throwable th = e;
+   int depth = 0;
+
+   for (; depth < 20; depth++) {
+   if (th instanceof SuccessException) {
+   try {
+   postSubmit();
+   } catch (Exception e1) {
+   e1.printStackTrace();
--- End diff --

Should we not forward the exception here? You introduced this block so that 
`postSubmit()` also runs when the `SuccessException` was thrown, right?
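The sketch below shows one way forwarding could look. It assumes the intent is to treat a `SuccessException` anywhere in the cause chain as a successful run. `SuccessException` and `postSubmit` are modeled as hypothetical stand-ins. `handleJobException` and `CheckedRunnable` are invented names, not the test base's code.

```java
// Hypothetical stand-in for the test base's marker exception.
class SuccessException extends RuntimeException {
    SuccessException() {
        super("job finished successfully");
    }
}

// postSubmit() in the test base may throw checked exceptions, hence this interface.
interface CheckedRunnable {
    void run() throws Exception;
}

class FaultToleranceTestSketch {

    // Same bound as the diff: stop after 20 causes in case the chain is very deep or cyclic.
    static final int MAX_DEPTH = 20;

    static boolean causedBySuccess(Throwable e) {
        Throwable th = e;
        for (int depth = 0; th != null && depth < MAX_DEPTH; depth++) {
            if (th instanceof SuccessException) {
                return true;
            }
            th = th.getCause();
        }
        return false;
    }

    static void handleJobException(Exception e, CheckedRunnable postSubmit) throws Exception {
        if (causedBySuccess(e)) {
            // Forward failures of the post-submit checks instead of printing them,
            // so a failed verification also fails the test.
            postSubmit.run();
        } else {
            // Not a "successful" termination: rethrow the original exception.
            throw e;
        }
    }
}
```

With this shape, a failing check inside `postSubmit()` surfaces as a test failure rather than a stack trace on stderr.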




[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persi...

2016-05-24 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/2020#issuecomment-221227518
  
CC: @StephanEwen 

By the way, it might not look like it, but the only additional methods this
introduces on `StreamExecutionEnvironment` are these three:

```
public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
        String filePath,
        WatchType watchType,
        long interval)

public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
        String filePath,
        WatchType watchType,
        long interval,
        FilePathFilter filter)

public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
        String filePath,
        WatchType watchType,
        long interval,
        FilePathFilter filter,
        TypeInformation<OUT> typeInformation)
```

The rest are existing public methods, so unfortunately we can't remove them,
even though some of them probably should be.




[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persi...

2016-05-23 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/2020#issuecomment-220988362
  
Thanks, the changes look good. 

R: @StephanEwen for taking a look at the API; you would only need to look at
`StreamExecutionEnvironment` for this.




[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persi...

2016-05-23 Thread kl0u
Github user kl0u commented on the pull request:

https://github.com/apache/flink/pull/2020#issuecomment-220986678
  
@aljoscha Thanks a lot for the comments! 
I integrated them already.




[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persi...

2016-05-23 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/2020#issuecomment-220981894
  
All in all, very good work!

One thing I'd like to change is the order of parameters in the `readFile`
methods. For these telescoping methods, it is usual to append new parameters at
the end. For example, in the two methods below, the filter is an additional
parameter of the second method, so it should go at the end. This way, users
can just append additional parameters to existing ones without changing the order.

```
public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
        String filePath,
        WatchType watchType,
        long interval)
```

```
public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
        String filePath,
        WatchType watchType,
        FilePathFilter filter,
        long interval)
```
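The convention can be shown in a self-contained sketch. The types are hypothetical: `WatchType` and `readFile` only echo the names above, and `java.util.function.Predicate` stands in for `FilePathFilter`. Each longer overload repeats the shorter one's parameters in the same order and adds the new one last. The short form delegates with a default.

```java
import java.util.function.Predicate;

// Hypothetical illustration of telescoping overloads; names echo readFile(...) but
// these are not Flink's types or signatures.
class TelescopingSketch {

    enum WatchType { PROCESS_ONCE, REPROCESS_WITH_APPENDED }

    // Default used when the caller omits the filter: keep every path.
    static final Predicate<String> KEEP_ALL = path -> true;

    // Short form: delegates to the long form, filling in the omitted parameter.
    static String readFile(String filePath, WatchType watchType, long interval) {
        return readFile(filePath, watchType, interval, KEEP_ALL);
    }

    // Long form: identical leading parameters in the same order, the new one last,
    // so a caller upgrades by appending an argument rather than reordering.
    static String readFile(String filePath, WatchType watchType, long interval,
                           Predicate<String> keep) {
        return String.format("%s|%s|%d|%b", filePath, watchType, interval, keep.test(filePath));
    }

    public static void main(String[] args) {
        System.out.println(readFile("/logs", WatchType.PROCESS_ONCE, 100L));
        System.out.println(readFile("/logs", WatchType.PROCESS_ONCE, 100L, p -> !p.startsWith("/logs")));
    }
}
```

Moving from the first call to the second only appends an argument. If the filter were placed before `interval`, existing call sites would have to be rearranged.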




[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persi...

2016-05-23 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2020#discussion_r64221544
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
 ---
@@ -44,6 +44,11 @@ public DataStreamSource(StreamExecutionEnvironment environment,
}
}
 
+   public DataStreamSource(SingleOutputStreamOperator operator) {
--- End diff --

Here, we should always set `isParallel` to `true`. It is not quite obvious,
but the field is used to disallow changing the parallelism of a
`SourceFunction` that cannot be parallelized. Our new operator can always run
in parallel.
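The sketch below shows how such a flag can guard parallelism changes. `SourceHandle` and `forReaderOperator` are illustrative names, not `DataStreamSource` itself. It assumes that a non-parallel source accepts only parallelism 1, which is consistent with the comment above.

```java
// Hypothetical sketch of an isParallel flag guarding setParallelism(...);
// names are illustrative, not Flink's DataStreamSource.
class SourceHandle {

    private final boolean isParallel;
    private int parallelism = 1;

    SourceHandle(boolean isParallel) {
        this.isParallel = isParallel;
    }

    SourceHandle setParallelism(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        // A non-parallel source may only ever run with a single instance.
        if (!isParallel && parallelism != 1) {
            throw new IllegalArgumentException("this source cannot run in parallel");
        }
        this.parallelism = parallelism;
        return this;
    }

    int getParallelism() {
        return parallelism;
    }

    // Per the review comment: a source wrapping the new reader operator can always
    // run in parallel, so it passes true unconditionally.
    static SourceHandle forReaderOperator() {
        return new SourceHandle(true);
    }

    public static void main(String[] args) {
        System.out.println(forReaderOperator().setParallelism(4).getParallelism()); // 4
    }
}
```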




[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persi...

2016-05-23 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2020#discussion_r64220867
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+/**
+ * An interface to be implemented by the user when using the {@link FileSplitMonitoringFunction}.
+ * The {@link #filterPath(Path)} method is responsible for deciding if a path is eligible for further
+ * processing or not. This can serve to exclude temporary or partial files that
+ * are still being written.
+ *
+ *
+ * A default implementation is the {@link DefaultFilter} which excludes files starting with ".", "_", or
+ * contain the "_COPYING_" in their names. This can be retrieved by {@link DefaultFilter#getInstance()}.
+ * */
+public interface FilePathFilter extends Serializable {
--- End diff --

We should probably make this @PublicEvolving, just to be on the safe side.
I can fix it up when merging.
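Here is a self-contained sketch of the default filter described in the Javadoc above. It works on file names (`String`) instead of `org.apache.flink.core.fs.Path`, so it compiles without Flink. It assumes that `filterPath(...)` returning `true` means the file is excluded. `NameFilter` and `DefaultNameFilter` are hypothetical names.

```java
import java.io.Serializable;

// Hypothetical, Flink-free version of the filter contract.
// Assumption: filterPath(...) returns true when the file should be EXCLUDED.
interface NameFilter extends Serializable {
    boolean filterPath(String fileName);
}

final class DefaultNameFilter implements NameFilter {

    private static final long serialVersionUID = 1L;
    private static final DefaultNameFilter INSTANCE = new DefaultNameFilter();

    private DefaultNameFilter() {}

    static DefaultNameFilter getInstance() {
        return INSTANCE;
    }

    @Override
    public boolean filterPath(String fileName) {
        // Exclude hidden files, "_"-prefixed files, and files still being copied in.
        return fileName.startsWith(".")
                || fileName.startsWith("_")
                || fileName.contains("_COPYING_");
    }

    // Keep the singleton unique if the filter is serialized and shipped to tasks.
    private Object readResolve() {
        return INSTANCE;
    }

    public static void main(String[] args) {
        NameFilter filter = getInstance();
        for (String name : new String[] {"part-0", ".part-0.crc", "_tmp", "part-1._COPYING_"}) {
            System.out.println(name + " excluded=" + filter.filterPath(name));
        }
    }
}
```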




[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persi...

2016-05-23 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/2020

[FLINK-2314] Make Streaming File Sources Persistent

This PR solves FLINK-2314 and combines a number of sub-tasks. In addition, 
it solves FLINK-3896 which was introduced as part of this task.

The way File Input sources are now processed is the following:
 * One task (parallelism 1) monitors a user-specified path for new files/data.
 * This task assigns FileInputSplits to downstream (parallel) readers, which
actually read the data.
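The division of labor above can be sketched without Flink: one monitor cuts files into byte-range splits and deals them out to N readers. The fixed split size and round-robin policy are assumptions for illustration. `Split`, `createSplits`, and `assign` are hypothetical names, not the PR's `FileInputSplit` handling.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a single monitor creates splits, N parallel readers consume them.
class SplitAssignmentSketch {

    static final class Split {
        final String file;
        final long start;
        final long length;

        Split(String file, long start, long length) {
            this.file = file;
            this.start = start;
            this.length = length;
        }

        @Override
        public String toString() {
            return file + "[" + start + "," + (start + length) + ")";
        }
    }

    /** Monitor side: cuts each file into splits of at most splitSize bytes. */
    static List<Split> createSplits(Map<String, Long> fileSizes, long splitSize) {
        List<Split> splits = new ArrayList<>();
        for (Map.Entry<String, Long> e : fileSizes.entrySet()) {
            long size = e.getValue();
            for (long start = 0; start < size; start += splitSize) {
                splits.add(new Split(e.getKey(), start, Math.min(splitSize, size - start)));
            }
        }
        return splits;
    }

    /** Round-robin assignment of splits to numReaders downstream readers (assumed policy). */
    static List<List<Split>> assign(List<Split> splits, int numReaders) {
        List<List<Split>> perReader = new ArrayList<>();
        for (int i = 0; i < numReaders; i++) {
            perReader.add(new ArrayList<>());
        }
        for (int i = 0; i < splits.size(); i++) {
            perReader.get(i % numReaders).add(splits.get(i));
        }
        return perReader;
    }

    public static void main(String[] args) {
        Map<String, Long> files = new LinkedHashMap<>();
        files.put("a.txt", 250L);
        files.put("b.txt", 100L);
        // prints [[a.txt[0,100), a.txt[200,250)], [a.txt[100,200), b.txt[0,100)]]
        System.out.println(assign(createSplits(files, 100L), 2));
    }
}
```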

The monitoring entity scans the path, divides the files to be processed into
splits, and assigns them downstream. For now, two modes are supported:
PROCESS_ONCE, which processes the current contents of the path and exits, and
REPROCESS_WITH_APPENDED, which periodically monitors the path and reprocesses
new files as well as the entire contents of files that received new data.
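The following sketch shows how the two modes could decide what to read on each scan. It models a directory listing as a map from file name to modification time. The bookkeeping, which remembers the last modification time per file, is an assumption. Only the mode names come from the description above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the two monitoring modes over a simulated listing.
class MonitoringModesSketch {

    enum Mode { PROCESS_ONCE, REPROCESS_WITH_APPENDED }

    private final Mode mode;
    // Last modification time seen per file (assumed bookkeeping).
    private final Map<String, Long> seen = new HashMap<>();
    private boolean done = false;

    MonitoringModesSketch(Mode mode) {
        this.mode = mode;
    }

    /** Returns the files to (re)process in full for this scan, in name order. */
    List<String> scan(Map<String, Long> listing) {
        List<String> toProcess = new ArrayList<>();
        if (done) {
            return toProcess;
        }
        for (Map.Entry<String, Long> e : new TreeMap<>(listing).entrySet()) {
            Long previous = seen.put(e.getKey(), e.getValue());
            // New file, or modification time advanced: process the whole file again.
            if (previous == null || e.getValue() > previous) {
                toProcess.add(e.getKey());
            }
        }
        if (mode == Mode.PROCESS_ONCE) {
            done = true; // process the current contents once, then stop
        }
        return toProcess;
    }

    boolean isDone() {
        return done;
    }

    public static void main(String[] args) {
        MonitoringModesSketch monitor = new MonitoringModesSketch(Mode.REPROCESS_WITH_APPENDED);
        Map<String, Long> listing = new HashMap<>();
        listing.put("a.txt", 1L);
        System.out.println(monitor.scan(listing)); // [a.txt]
        listing.put("b.txt", 5L);
        listing.put("a.txt", 7L); // data appended to a.txt
        System.out.println(monitor.scan(listing)); // [a.txt, b.txt]
    }
}
```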

In addition, these sources are checkpointed, i.e., in case of a task
failure, the job resumes from where it left off.

Finally, after discussions with @aljoscha, some changes were introduced in
the way we handle FileInputFormats.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink api_ft_files

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2020.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2020


commit 0d378f85ef7beec598701d78e0537f7479be99d9
Author: kl0u 
Date:   2016-05-10T16:56:58Z

[FLINK-3896] Allow a StreamTask to be Externally Cancelled

It adds a method failExternally() to the StreamTask, so that custom
operators can make their containing task fail when needed.
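Here is a minimal sketch of the pattern. It assumes the operator's work runs on a separate thread whose errors would otherwise be lost. `MiniTask` and `checkForFailure` are hypothetical. Only the `failExternally` name comes from the commit message, and the real `StreamTask` method may behave differently.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: a worker thread reports errors to its containing task,
// which then fails. MiniTask is illustrative, not Flink's StreamTask API.
class MiniTask {

    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    /** Callable from any thread; only the first reported cause is kept. */
    void failExternally(Throwable cause) {
        failure.compareAndSet(null, cause);
    }

    /** The task's main loop would call this and fail with the stored cause. */
    void checkForFailure() throws Exception {
        Throwable t = failure.get();
        if (t != null) {
            throw new Exception("Task failed externally", t);
        }
    }

    public static void main(String[] args) throws Exception {
        MiniTask task = new MiniTask();
        Thread reader = new Thread(() -> {
            try {
                throw new java.io.IOException("corrupt split");
            } catch (java.io.IOException e) {
                task.failExternally(e); // propagate instead of dying silently
            }
        });
        reader.start();
        reader.join();
        try {
            task.checkForFailure();
        } catch (Exception e) {
            // prints "Task failed externally: corrupt split"
            System.out.println(e.getMessage() + ": " + e.getCause().getMessage());
        }
    }
}
```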

commit 1a06e70d4cc72593663ed5065e9c68c5b9fadac1
Author: kl0u 
Date:   2016-04-10T14:56:42Z

[FLINK-3717] Make FileInputFormat checkpointable

This adds a new interface called CheckpointableInputFormat
which describes input formats whose state is queryable,
i.e. getCurrentChannelState() returns where the reader is
in the underlying source, and they can resume reading from
a user-specified position.

This functionality is not yet leveraged by current readers.
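The sketch below shows a toy checkpointable reader over an in-memory split. Its state is the byte offset of the next unread record. `getCurrentChannelState()` is taken from the commit message. `CheckpointableReader`, `reopen(...)`, and `LineReader` are hypothetical names, not the merged `CheckpointableInputFormat` interface.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of a reader whose position can be queried and restored.
interface CheckpointableReader<S> {
    S getCurrentChannelState();

    void reopen(byte[] split, S state);
}

/** Reads '\n'-delimited records from an in-memory split and tracks its byte offset. */
class LineReader implements CheckpointableReader<Long> {

    private byte[] data = new byte[0];
    private int pos = 0;

    void open(byte[] split) {
        reopen(split, 0L);
    }

    @Override
    public void reopen(byte[] split, Long offset) {
        this.data = split;
        this.pos = Math.toIntExact(offset); // resume at the checkpointed offset
    }

    @Override
    public Long getCurrentChannelState() {
        return (long) pos; // offset of the next unread byte
    }

    /** Returns the next record, or null at the end of the split. */
    String nextRecord() {
        if (pos >= data.length) {
            return null;
        }
        int start = pos;
        while (pos < data.length && data[pos] != '\n') {
            pos++;
        }
        String record = new String(data, start, pos - start, StandardCharsets.UTF_8);
        pos = Math.min(pos + 1, data.length); // skip the delimiter
        return record;
    }

    public static void main(String[] args) {
        byte[] split = "a\nbb\nccc\n".getBytes(StandardCharsets.UTF_8);
        LineReader reader = new LineReader();
        reader.open(split);
        System.out.println(reader.nextRecord()); // a
        long checkpoint = reader.getCurrentChannelState(); // 2
        LineReader restored = new LineReader();
        restored.reopen(split, checkpoint);
        System.out.println(restored.nextRecord()); // bb
    }
}
```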

commit 13db59ff214c6c1790b73e8b06984a7170924c5a
Author: kl0u 
Date:   2016-04-18T14:37:54Z

[FLINK-3889] Refactor File Monitoring Source

This is meant to replace the different file
reading sources in Flink streaming. Now there is
one monitoring source with DOP 1 monitoring a
directory and assigning input splits to downstream
readers.

In addition, it makes the new features added by
FLINK-3717 work together with the aforementioned entities
(the monitor and the readers) in order to have
fault tolerant file sources and exactly once guarantees.

This does not replace the old API calls. This
will be done in a future commit.

commit 0c8e852b96752a716c36452f7ced11c79cca5560
Author: kl0u 
Date:   2016-05-18T14:44:45Z

[FLINK-2314] Make Streaming File Sources Persistent



