[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user kl0u closed the pull request at: https://github.com/apache/flink/pull/1984 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user kl0u commented on the pull request:
https://github.com/apache/flink/pull/1984#issuecomment-219742461

Thanks for the comments, @aljoscha. The only comment not yet integrated is the one about `OutputTypeConfigurable`, which I first need to understand better in order to implement it correctly. As for the spaces, it is IntelliJ that adds them; I will try to fix them later as well.
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/1984#discussion_r63523040

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}.
+ * This operator will receive just the split descriptors and then read and emit records. This may lead
+ * to backpressure. To avoid this, we will have another thread actually reading the splits and
+ * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect the
+ * current state.
+ * */
+public class FileSplitReadOperator extends AbstractStreamOperator
+	implements OneInputStreamOperator {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileSplitReadOperator.class);
+
+	private static final FileInputSplit EOF = new FileInputSplit(-1, null, -1, -1, null);
+
+	private transient SplitReader reader;
+	private transient TimestampedCollector collector;
+
+	private Configuration configuration;
+	private FileInputFormat format;
+	private TypeInformation typeInfo;
+
+	private Tuple3 readerState;
+
+	public FileSplitReadOperator(FileInputFormat format, TypeInformation typeInfo, Configuration configuration) {
+		this.format = checkNotNull(format);
+		this.typeInfo = checkNotNull(typeInfo);
+		this.configuration = checkNotNull(configuration);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+
+		this.format.configure(configuration);
+		this.collector = new TimestampedCollector<>(output);
+
+		TypeSerializer serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
+		Object checkpointLock = getContainingTask().getCheckpointLock();
+
+		this.reader = new SplitReader<>(format, serializer, collector, checkpointLock);
+		this.reader.setReaderState(this.readerState);
+		this.reader.start();
+		this.readerState = null;
+	}
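The class Javadoc above describes a design in which split descriptors arrive on the operator thread while a separate thread reads them and emits records, synchronizing with checkpoints. Below is a minimal, self-contained sketch of that hand-off. It is not the PR's `SplitReader`. `SplitReaderSketch`, the `String` "splits", the comma-separated "records", and the `__EOF__` sentinel are hypothetical stand-ins. The diff uses a dedicated `EOF` `FileInputSplit` for the same stop signal.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch, not the PR's code: String stands in for FileInputSplit,
// and each comma-separated token of a "split" is treated as one record.
class SplitReaderSketch extends Thread {

    // Sentinel telling the reader thread to stop (the diff uses an EOF FileInputSplit for this).
    static final String EOF = "__EOF__";

    private final BlockingQueue<String> pendingSplits = new LinkedBlockingQueue<>();
    private final Object checkpointLock;
    private final Consumer<String> output;

    SplitReaderSketch(Object checkpointLock, Consumer<String> output) {
        this.checkpointLock = checkpointLock;
        this.output = output;
    }

    // Called from the operator thread for each incoming split descriptor; returns immediately.
    void addSplit(String split) {
        pendingSplits.add(split);
    }

    @Override
    public void run() {
        try {
            while (true) {
                String split = pendingSplits.take(); // blocks until a split is available
                if (EOF.equals(split)) {
                    return;
                }
                for (String record : split.split(",")) {
                    // Emit under the checkpoint lock so that a snapshot taken while holding
                    // the same lock never interleaves with an emission.
                    synchronized (checkpointLock) {
                        output.accept(record);
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and let the thread exit
        }
    }
}
```

The queue decouples the two threads, so `addSplit` returns right away even when reading is slow. Emitting inside `synchronized (checkpointLock)` is what lets a checkpoint see a consistent view of what has been emitted.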
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/1984#discussion_r63521733

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user aljoscha commented on the pull request:
https://github.com/apache/flink/pull/1984#issuecomment-219673873

Overall, the code looks very good! I had some inline comments about Javadoc/comments. One thing that might be wrong, though, is the interplay between `CheckpointableInputFormat.getCurrentChannelState()` and `SplitReader.getReaderState()`. In the latter, you check whether the result of the former is `null` and then act based on that. The former, however, never returns `null`; it always returns a `Tuple2` whose fields may be set to `null`. This might be an artifact from an earlier implementation approach.
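Here is a minimal sketch of the `null`-check issue described above. `ChannelState` is a hypothetical stand-in for Flink's `Tuple2`, which exposes public `f0`/`f1` fields. The comment does not say what the two fields hold in the PR, so they stay generic. Whether one field or both should be checked depends on what `getCurrentChannelState()` actually stores, so `hasState` is an assumption, not the PR's fix.

```java
// Hypothetical stand-in for Flink's Tuple2: the tuple object itself always exists,
// but either field may be null.
final class ChannelState<A, B> {
    final A f0;
    final B f1;

    ChannelState(A f0, B f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

final class ReaderStateCheck {

    // Pattern described in the review: since the returned object is never null,
    // this check is always true and cannot detect "nothing to restore".
    static boolean hasStateByNullCheck(ChannelState<?, ?> state) {
        return state != null;
    }

    // One possible fix (an assumption, not the PR's code): inspect the fields instead.
    static boolean hasState(ChannelState<?, ?> state) {
        return state != null && (state.f0 != null || state.f1 != null);
    }
}
```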
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/1984#discussion_r63495352

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/1984#discussion_r63494863

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/1984#discussion_r63494540

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java ---
+	private Configuration configuration;
+	private FileInputFormat format;
+	private TypeInformation typeInfo;
--- End diff --

This is a very subtle thing, but not all `TypeInformation` classes are `Serializable`, and none of them should be. This is a problem that we introduced a while back. The way to handle it is to implement `OutputTypeConfigurable`; there, the `TypeSerializer` can be created. In `open()` you should then ensure that you actually have a `TypeSerializer`. And yes, I know that no one can really know this without having encountered a serialization problem once ...
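This self-contained sketch shows why the field matters. Java serialization of an operator fails as soon as it holds a non-`Serializable` field, while holding only a `Serializable` serializer works. `TypeInfo`, `Serializer`, `OperatorWithTypeInfo`, `OperatorWithSerializer`, and `SerializationCheck` are hypothetical stand-ins, not Flink types. In Flink, the corresponding hook is `OutputTypeConfigurable#setOutputType(TypeInformation, ExecutionConfig)`; here it is reduced to `setOutputType(TypeInfo)`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a TypeInformation that is NOT Serializable.
class TypeInfo {
    Serializer createSerializer() {
        return new Serializer();
    }
}

// Hypothetical stand-in for a TypeSerializer, which is Serializable.
class Serializer implements Serializable {
    private static final long serialVersionUID = 1L;
}

// Keeps the type information as a field: fails when the operator is serialized.
class OperatorWithTypeInfo implements Serializable {
    private static final long serialVersionUID = 1L;
    private final TypeInfo typeInfo;

    OperatorWithTypeInfo(TypeInfo typeInfo) {
        this.typeInfo = typeInfo;
    }
}

// Follows the suggested pattern: derive the serializer when the output type is
// configured, keep only the serializer, and verify it in open().
class OperatorWithSerializer implements Serializable {
    private static final long serialVersionUID = 1L;
    private Serializer serializer;

    void setOutputType(TypeInfo outTypeInfo) {
        this.serializer = outTypeInfo.createSerializer();
    }

    void open() {
        if (serializer == null) {
            throw new IllegalStateException("The output type has not been configured.");
        }
    }
}

final class SerializationCheck {
    // Returns false if Java serialization rejects the object graph.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```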
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/1984#discussion_r63493915

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java ---
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits will be further processed
+ * depends on the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ */
+@Internal
+public class FileSplitMonitoringFunction
+	extends RichSourceFunction implements Checkpointed>, Tuple2 , Long>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+	/**
+	 * Specifies when computation will be triggered.
+	 */
+	public enum WatchType {
+		PROCESS_ONCE, // Processes the current content of a file/path only ONCE, and stops monitoring.
+		REPROCESS_WITH_APPENDED // Reprocesses the whole file when new data is appended.
+	}
+
+	/** The path to monitor. */
+	private final String path;
+
+	/** The default parallelism for the job, as this is going to be the parallelism of the downstream readers. */
+	private final int readerParallelism;
+
+	/** The {@link FileInputFormat} to be read. */
+	private FileInputFormat format;
+
+	/** How often to monitor the state of the directory for new data. */
+	private final long interval;
+
+	/** Which new data to process (see {@link WatchType}. */
+	private final WatchType watchType;
+
+	private List > splitsToFwdOrderedAscByModTime;
+
+	private Tuple2 currentSplitsToFwd;
+
+	private long globalModificationTime;
+
+	private FilePathFilter pathFilter;
+
+	private volatile boolean isRunning = true;
+
+	/**
+	 * This is the {@link Configuration} to be used to initialize the input format at the reader
+	 * (see {@link #open(Configuration)}). In the codebase, whenever {@link #open(Configuration)} is called,
+	 * it is passed a new configuration, thus ignoring potential user-specified parameters. Now, we pass a
+	 * configuration object at the constructor, which is shipped to the remote tasks.
+	 * */
+	private Configuration configuration;
+
+	public FileSplitMonitoringFunction(
+		FileInputFormat format, String path, Configuration configuration,
+		WatchType watchType, int readerParallelism, long interval) {
+
+		this(format, path, configuration, FilePathFilter.DefaultFilter.getInstance(),
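The fields quoted above (`globalModificationTime`, `splitsToFwdOrderedAscByModTime`, `interval`, `WatchType`) suggest a polling loop that forwards only files modified since the last pass, oldest first. The sketch below shows one such pass under that assumption; it is not the PR's implementation. `MonitoringPassSketch` is hypothetical, and so is `FileEntry`, which stands in for `FileStatus`. Cutting files into `FileInputSplit`s is omitted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of one monitoring pass; not the PR's code.
final class MonitoringPassSketch {

    // Stand-in for FileStatus: only the path and the modification time are modeled.
    static final class FileEntry {
        final String path;
        final long modificationTime;

        FileEntry(String path, long modificationTime) {
            this.path = path;
            this.modificationTime = modificationTime;
        }
    }

    // Latest modification time already forwarded (cf. globalModificationTime in the diff).
    private long globalModificationTime = Long.MIN_VALUE;

    // Returns the paths modified since the previous pass, oldest first, and advances the watermark.
    List<String> pass(List<FileEntry> listing) {
        List<FileEntry> fresh = new ArrayList<>();
        for (FileEntry f : listing) {
            if (f.modificationTime > globalModificationTime) {
                fresh.add(f);
            }
        }
        fresh.sort(Comparator.comparingLong((FileEntry f) -> f.modificationTime));

        List<String> toForward = new ArrayList<>();
        for (FileEntry f : fresh) {
            toForward.add(f.path);
            globalModificationTime = Math.max(globalModificationTime, f.modificationTime);
        }
        return toForward;
    }
}
```

Under `PROCESS_ONCE`, the function would presumably run a single pass and stop. Under `REPROCESS_WITH_APPENDED`, it would repeat the pass every `interval`, and a file whose modification time advances is forwarded again as a whole.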
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/1984#discussion_r63493233

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java ---
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1984#discussion_r63492862 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java --- @@ -0,0 +1,345 @@
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1984#discussion_r63492517 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java --- @@ -0,0 +1,345 @@
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1984#discussion_r63492326 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java --- @@ -0,0 +1,345 @@ ... + /** +* This is the {@link Configuration} to be used to initialize the input format at the reader +* (see {@link #open(Configuration)}). In the codebase, whenever {@link #open(Configuration)} is called, --- End diff -- This is only true for streaming programs, something like `In streaming programs, whenever ...` should do the trick.
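The Javadoc under discussion argues for handing the Configuration to the constructor, so that it is shipped to the remote tasks, instead of relying on the fresh one passed to `open(Configuration)`. A minimal sketch of that idea in plain Java, assuming a `Map<String, String>` in place of Flink's `Configuration` and Java serialization as the shipping mechanism; `ConfiguredReader` and `ship` are hypothetical names, not part of the PR:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical reader-side function (not Flink's API) that keeps the
// user-supplied configuration it was constructed with.
class ConfiguredReader implements Serializable {
    private static final long serialVersionUID = 1L;

    // Captured on the client at construction time; serialized with the object.
    private final HashMap<String, String> userConfig;

    // Rebuilt on each task in open(), so it is not serialized.
    private transient Map<String, String> effectiveConfig;

    ConfiguredReader(Map<String, String> userConfig) {
        this.userConfig = new HashMap<>(userConfig);
    }

    // The runtime hands in a fresh configuration; this sketch deliberately
    // ignores it and uses the parameters shipped with the object instead.
    void open(Map<String, String> runtimeConfig) {
        this.effectiveConfig = new HashMap<>(userConfig);
    }

    String get(String key) {
        return effectiveConfig.get(key);
    }

    // Simulates shipping the function to a remote task with Java serialization.
    static ConfiguredReader ship(ConfiguredReader reader) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(reader);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ConfiguredReader) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

Because the map travels inside the serialized object, whatever the caller set on the client is still visible on the task, regardless of what `open()` receives.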
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1984#discussion_r63492168 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java --- @@ -0,0 +1,345 @@ ... +/** + * This is the single (non-parallel) task which monitors a user-provided path and assigns splits --- End diff -- We could mention here that it is meant to work together with the `FileSplitReadOperator`.
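The monitoring function's fields (`globalModificationTime`, `splitsToFwdOrderedAscByModTime`) suggest that each scan forwards only files modified since the previous scan, grouped and ordered by modification time, before the splits go to the downstream readers. A rough, self-contained sketch of one such pass under that assumption; `FileInfo` and `MonitoringPass` are hypothetical simplifications of `FileStatus` and the source function, and file paths stand in for input splits:

```java
import java.util.*;

// Hypothetical, simplified file metadata (stands in for FileStatus).
final class FileInfo {
    final String path;
    final long modificationTime;

    FileInfo(String path, long modificationTime) {
        this.path = path;
        this.modificationTime = modificationTime;
    }
}

// Sketch of one monitoring pass: pick files modified after the last seen
// timestamp, group them by modification time, and order groups ascending.
final class MonitoringPass {
    private long globalModificationTime = Long.MIN_VALUE;

    List<Map.Entry<Long, List<String>>> selectNewFiles(List<FileInfo> listing) {
        // TreeMap keeps its keys (modification times) sorted ascending.
        TreeMap<Long, List<String>> byModTime = new TreeMap<>();
        for (FileInfo f : listing) {
            if (f.modificationTime > globalModificationTime) {
                byModTime.computeIfAbsent(f.modificationTime, k -> new ArrayList<>()).add(f.path);
            }
        }
        if (!byModTime.isEmpty()) {
            // Remember the newest timestamp so the next pass skips these files.
            globalModificationTime = byModTime.lastKey();
        }
        return new ArrayList<>(byModTime.entrySet());
    }

    long globalModificationTime() {
        return globalModificationTime;
    }
}
```

Since appending to a file bumps its modification time, a file seen earlier is selected again on a later pass, which is roughly what `REPROCESS_WITH_APPENDED` describes.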
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1984#discussion_r63491931 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java --- @@ -0,0 +1,74 @@ ... +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.Path; + +import java.io.Serializable; + +/** + * An interface to be implemented by the user when using the {@link FileSplitMonitoringFunction}. + * The {@link #filterPath(Path)} method is responsible for deciding if a path is eligible for further + * processing or not. This can serve to exclude temporary or partial files that + * are still being written. + * + * + * A default implementation is the {@link DefaultFilter} which excludes files starting with ".", "_", or + * contain the "_COPYING_" in their names. This can be retrieved by {@link DefaultFilter#getInstance()}. + * */ +public interface FilePathFilter extends Serializable { + + /** +* @return {@code true} if the {@code filePath} given is to be --- End diff -- I think having a `@return` here is strange. Normally a `@return` follows the main description body.
Could just have `Returns {@code true} ...` here
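For reference, the filter behaviour described in the `FilePathFilter` Javadoc, written with the reviewer's suggested `Returns {@code true} ...` Javadoc style. This is a sketch only: `NameFilter` and `DefaultNameFilter` are hypothetical, they take a plain file name rather than a `Path`, and the meaning of `true` (exclude) is an assumption, since the quoted Javadoc is cut off:

```java
import java.io.Serializable;

// Hypothetical stand-in for FilePathFilter that works on plain file names
// instead of org.apache.flink.core.fs.Path.
interface NameFilter extends Serializable {

    /**
     * Returns {@code true} if the file with the given name should be skipped.
     * Assumption for this sketch: {@code true} means "exclude from processing".
     */
    boolean filterPath(String fileName);
}

// Mirrors the behaviour described for DefaultFilter.
final class DefaultNameFilter implements NameFilter {
    private static final long serialVersionUID = 1L;
    private static final DefaultNameFilter INSTANCE = new DefaultNameFilter();

    private DefaultNameFilter() {}

    static DefaultNameFilter getInstance() {
        return INSTANCE;
    }

    @Override
    public boolean filterPath(String fileName) {
        return fileName.startsWith(".")          // hidden files
            || fileName.startsWith("_")          // names starting with "_"
            || fileName.contains("_COPYING_");   // files still being copied
    }

    // Keep the singleton unique after deserialization on a remote task.
    private Object readResolve() {
        return INSTANCE;
    }
}
```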
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1984#discussion_r63491422 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java --- @@ -115,7 +115,7 @@ public void testHDFS() { } Assert.assertTrue("No result file present", hdfs.exists(result)); - --- End diff -- Unrelated whitespace change.
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1984#discussion_r63491234 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala --- @@ -463,7 +463,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * every 100 milliseconds. * */ - def readFileStream(StreamPath: String, intervalMillis: Long = 100, --- End diff -- Unrelated change.
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1984#discussion_r63490526 --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java --- @@ -394,4 +394,4 @@ public IntValue nextRecord(IntValue reuse) throws IOException { return null; } } -} \ No newline at end of file +} --- End diff -- Just whitespace changes in this file.
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1984#discussion_r63490499 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java --- @@ -235,7 +235,7 @@ protected FileInputFormat(Path filePath) { // // Getters/setters for the configurable parameters // - + --- End diff -- Just whitespace changes in this file.
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1984#discussion_r63490061 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/BlockInfo.java --- @@ -26,6 +26,11 @@ import org.apache.flink.core.memory.DataOutputView; @Public --- End diff -- `@Public` should go after Javadoc, IMHO. Also, the last line of the Javadoc has an extra `*`.
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
Github user kl0u commented on the pull request: https://github.com/apache/flink/pull/1984#issuecomment-219117647 After the discussion we had today with @StephanEwen and @aljoscha, I also added the PROCESS_ONCE watchType, which processes the content of a file/directory as it is at invocation time and then exits. This accommodates bounded file sources (à la batch).
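The two watch types differ mainly in whether the source keeps looping after its first scan. A hypothetical sketch of that control flow, assuming one `Runnable` scan per iteration; `MonitorLoop` and the `maxScans` cap (added only so the example terminates) are not from the PR:

```java
// Mirrors the enum in the diff; redeclared here so the sketch is self-contained.
enum WatchType { PROCESS_ONCE, REPROCESS_WITH_APPENDED }

// Hypothetical sketch of the monitoring loop; not the PR's code.
final class MonitorLoop {
    private volatile boolean isRunning = true;

    /**
     * Runs scans until cancelled or until maxScans is reached. With
     * PROCESS_ONCE a single scan is made and the loop exits; otherwise it
     * sleeps intervalMillis between scans. Returns the number of scans made.
     */
    int run(WatchType watchType, long intervalMillis, Runnable scan, int maxScans) {
        int scans = 0;
        while (isRunning && scans < maxScans) {
            scan.run();
            scans++;
            if (watchType == WatchType.PROCESS_ONCE) {
                break; // bounded source: stop after the current content is processed
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return scans;
    }

    void cancel() {
        isRunning = false;
    }
}
```

`cancel()` flips the volatile flag, mirroring the `isRunning` field in the diff, so a continuously monitoring source stops before its next scan.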
[GitHub] flink pull request: [FLINK-3889] Make File Monitoring Function che...
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/1984 [FLINK-3889] Make File Monitoring Function checkpointable. This pull request introduces the underlying functionality to make Streaming File Sources persistent. It does not yet change the API calls, as this will be done after agreeing on the current architecture and implementation. In addition, this PR includes a commit for FLINK-3896, which allows an operator to cancel its containing task. The need for this functionality came up during a discussion with @StephanEwen and @aljoscha, and it is a separate commit. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink ft_files Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1984.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1984 commit 7deb92236cec47ddcfbb3abfa396fd9d15f770b7 Author: kl0u Date: 2016-05-10T16:56:58Z [FLINK-3896] Allow a StreamTask to be Externally Cancelled It adds a method failExternally() to the StreamTask, so that custom Operators can make their containing task fail when needed. commit c9682b7606451c4eecf6f2f6df9a498fb6d39577 Author: kl0u Date: 2016-04-10T14:56:42Z [FLINK-3717] Make FileInputFormat checkpointable This adds a new interface called CheckpointableInputFormat which describes input formats whose state is queryable, i.e. getCurrentChannelState() returns where the reader is in the underlying source, and they can resume reading from a user-specified position. This functionality is not yet leveraged by current readers. commit cbbfd8d7e6db0f8f114675b4047aecb94996e500 Author: kl0u Date: 2016-04-18T14:37:54Z [FLINK-3889][FLINK-3808] Refactor File Monitoring Source This is meant to replace the different file reading sources in Flink streaming.
Now there is one monitoring source with DOP 1 monitoring a directory and assigning input splits to downstream readers. In addition, it makes the new features added by FLINK-3717 and FLINK-3808 work together. Now we have a file monitoring source that is also fault tolerant and can guarantee exactly-once semantics. This does not replace the old API calls. This will be done in a future commit.
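The FLINK-3717 commit describes input formats that report where the reader is via getCurrentChannelState() and can resume from a given position. A minimal sketch of that contract over an in-memory string, where the state is a character offset; `CheckpointableReader`, `reopen` and `LineReader` are hypothetical names, and the real interface's signatures may differ:

```java
// Hypothetical interface modelled on the commit description of
// CheckpointableInputFormat; the real signatures may differ.
interface CheckpointableReader<S> {
    S getCurrentChannelState();   // where the reader currently is in the source

    void reopen(S state);         // resume reading from a previously reported state

    String nextRecord();          // next record, or null when the input is exhausted
}

// Reads newline-separated records from an in-memory string; the state is the
// character offset of the next unread record.
final class LineReader implements CheckpointableReader<Integer> {
    private final String data;
    private int offset;

    LineReader(String data) {
        this.data = data;
    }

    @Override
    public Integer getCurrentChannelState() {
        return offset;
    }

    @Override
    public void reopen(Integer state) {
        this.offset = state;
    }

    @Override
    public String nextRecord() {
        if (offset >= data.length()) {
            return null;
        }
        int end = data.indexOf('\n', offset);
        if (end < 0) {
            end = data.length();   // last record has no trailing newline
        }
        String record = data.substring(offset, end);
        offset = end + 1;          // skip past the newline
        return record;
    }
}
```

Restoring the last snapshotted offset after a failure means records before it are not re-emitted and records after it are not skipped, which is the property exactly-once reading relies on, assuming the snapshot is taken consistently with the rest of the checkpoint.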