[GitHub] flink pull request #2871: [FLINK-5163] Ports the production functions to the...
Github user kl0u closed the pull request at:

    https://github.com/apache/flink/pull/2871

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2871#discussion_r90639571

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java ---
@@ -58,12 +63,14 @@
  */
 @Internal
 public class ContinuousFileMonitoringFunction
-    extends RichSourceFunction implements Checkpointed {
+    extends RichSourceFunction implements CheckpointedFunction {
     private static final long serialVersionUID = 1L;
     private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class);
+    private static final String FILE_MONITORING_STATE_NAME = "file-monitoring-state";
--- End diff --

Instead of this you could also have
```
private static final ListStateDescriptor<Long> STATE_DESCRIPTOR = new ListStateDescriptor<>("file-monitoring-state", LongSerializer.INSTANCE);
```
and then use this descriptor later directly instead of initialising with this field. That's just a personal style nitpick. Your version is also fine.
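A stdlib-only sketch of the suggested style, assuming the checkpointed value is the `globalModificationTime` field that appears elsewhere in this PR. `DescriptorSketch`, `StateStoreSketch` and `MonitoringStateSketch` are hypothetical stand-ins for Flink's `ListStateDescriptor`, the operator state store and the monitoring function; they are not the real Flink API. The point is only that the descriptor itself is the constant handed to the store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ListStateDescriptor: a state name plus an element type.
final class DescriptorSketch<T> {
    final String name;
    final Class<T> type;

    DescriptorSketch(String name, Class<T> type) {
        this.name = name;
        this.type = type;
    }
}

// Hypothetical stand-in for an operator state store: one list per state name.
final class StateStoreSketch {
    private final Map<String, List<?>> lists = new HashMap<>();

    @SuppressWarnings("unchecked")
    <T> List<T> getListState(DescriptorSketch<T> descriptor) {
        // The same name always yields the same list, so a "restored" instance sees old entries.
        return (List<T>) lists.computeIfAbsent(descriptor.name, name -> new ArrayList<T>());
    }
}

final class MonitoringStateSketch {
    // The suggested style: the whole descriptor is the constant, not just its name.
    static final DescriptorSketch<Long> STATE_DESCRIPTOR =
            new DescriptorSketch<>("file-monitoring-state", Long.class);

    private List<Long> checkpointedState;
    long globalModificationTime = Long.MIN_VALUE;

    void initializeState(StateStoreSketch store) {
        // The descriptor is passed directly; no separate name field is needed.
        checkpointedState = store.getListState(STATE_DESCRIPTOR);
        for (long restored : checkpointedState) {
            globalModificationTime = restored;
        }
    }

    void snapshotState() {
        // Replace any previous entry with the current modification time.
        checkpointedState.clear();
        checkpointedState.add(globalModificationTime);
    }
}
```

Sharing one `StateStoreSketch` between two instances is only a way to simulate a restore in-process; a real job would hand the restored state to a new store.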
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2871#discussion_r90641264

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java ---
@@ -52,12 +52,4 @@ public void fromCollectionTest() throws Exception {
             Arrays.asList(1, 2, 3;
         assertEquals(expectedList, actualList);
     }
-
--- End diff --

I think it would be good to have a snapshot/restore test for this source, verifying that we see all the expected elements (no matter the order).
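One possible shape for such a test, as a stdlib-only sketch: `ElementsSourceSketch` is a hypothetical stand-in for a from-elements source with parallelism 1 that checkpoints only how many elements it has emitted, and the failure is assumed to happen right after the checkpoint, so nothing has to be replayed. Sorting both lists before comparing makes the check independent of emission order while still catching lost or duplicated elements.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a from-elements source: emits the given elements in
// order and checkpoints only the number emitted so far.
final class ElementsSourceSketch<T> {
    private final List<T> elements;
    private int numElementsEmitted;

    ElementsSourceSketch(List<T> elements) {
        this.elements = elements;
    }

    // Emits at most max further elements into out.
    void run(List<T> out, int max) {
        for (int i = 0; i < max && numElementsEmitted < elements.size(); i++) {
            out.add(elements.get(numElementsEmitted++));
        }
    }

    List<Integer> snapshotState() {
        return Collections.singletonList(numElementsEmitted);
    }

    void restoreState(List<Integer> state) {
        // A single entry is expected because the source runs with parallelism 1.
        numElementsEmitted = state.isEmpty() ? 0 : state.get(0);
    }
}

final class SnapshotRestoreCheck {
    // Emits some elements, checkpoints, restores a fresh instance from the checkpoint,
    // lets it finish, and checks every input element was seen exactly once, in any order.
    static <T extends Comparable<T>> boolean seesAllElements(List<T> input, int emittedBeforeCheckpoint) {
        List<T> output = new ArrayList<>();
        ElementsSourceSketch<T> original = new ElementsSourceSketch<>(input);
        original.run(output, emittedBeforeCheckpoint);
        List<Integer> checkpoint = original.snapshotState();

        ElementsSourceSketch<T> restored = new ElementsSourceSketch<>(input);
        restored.restoreState(checkpoint);
        restored.run(output, Integer.MAX_VALUE);

        List<T> expected = new ArrayList<>(input);
        Collections.sort(expected);
        Collections.sort(output);
        return expected.equals(output);
    }
}
```

Running the check with the checkpoint taken before the first element, mid-stream and after the last element covers the edge cases of the restored counter.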
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2871#discussion_r90641362

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java ---
@@ -73,27 +76,31 @@
  * }
  * }
  * }
- *
+ *
+ * NOTE: This source has a parallelism of {@code 1}.
+ *
  * @param The type of the messages created by the source.
  * @param The type of unique IDs which may be used to acknowledge elements.
  */
 @PublicEvolving
 public abstract class MessageAcknowledgingSourceBase extends RichSourceFunction
-    implements Checkpointed , CheckpointListener {
+    implements CheckpointedFunction, CheckpointListener {
     private static final long serialVersionUID = -8689291992192955579L;
     private static final Logger LOG = LoggerFactory.getLogger(MessageAcknowledgingSourceBase.class);
+    private static final String MESSAGE_ACKNOWLEDGING_SOURCE_STATE = "message-acknowledging-source-state";
--- End diff --

See comments on the other functions, I'm not writing it again ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2871#discussion_r90643076

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java ---
@@ -104,21 +114,63 @@ public ContinuousFileMonitoringFunction(
         );
         this.format = Preconditions.checkNotNull(format, "Unspecified File Input Format.");
-        this.path = Preconditions.checkNotNull(path, "Unspecified Path.");
+        this.path = Preconditions.checkNotNull(format.getFilePath().toString(), "Unspecified Path.");
         this.interval = interval;
         this.watchType = watchType;
         this.readerParallelism = Math.max(readerParallelism, 1);
         this.globalModificationTime = Long.MIN_VALUE;
     }
+    public long getGlobalModificationTime() {
--- End diff --

This should probably have `@VisibleForTesting`.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2871#discussion_r90639653

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java ---
@@ -62,6 +73,9 @@
     /** Flag to make the source cancelable */
     private volatile boolean isRunning = true;
+    private transient ListState<Integer> checkpointedState;
+
+    private static final String FROM_ELEMENT_STATE_NAME = "from-element-state";
--- End diff --

Instead of this you could also have
```
private static final ListStateDescriptor<Integer> STATE_DESCRIPTOR = new ListStateDescriptor<>("from-elements-state", IntSerializer.INSTANCE);
```
and then use this descriptor later directly instead of initialising with this field. That's just a personal style nitpick. Your version is also fine.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2871#discussion_r90641176

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
@@ -18,25 +18,44 @@
 package org.apache.flink.streaming.api.functions.source;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
 /**
  * A stateful streaming source that emits each number from a given interval exactly once,
  * possibly in parallel.
+ *
+ * For the source to be re-scalable, the first time the job is run, we precompute all the elements
+ * that each of the tasks should emit and upon checkpointing, each element constitutes its own
+ * partition. When rescaling, these partitions will be randomly re-assigned to the new tasks.
+ *
+ * This strategy guarantees that each element will be emitted exactly-once, but elements will not
+ * necessarily be emitted in ascending order, even for the same tasks.
  */
 @PublicEvolving
-public class StatefulSequenceSource extends RichParallelSourceFunction implements Checkpointed {
+public class StatefulSequenceSource extends RichParallelSourceFunction implements CheckpointedFunction {
     private static final long serialVersionUID = 1L;
     private final long start;
     private final long end;
-    private long collected;
-
     private volatile boolean isRunning = true;
+    private transient Deque<Long> valuesToEmit;
+
+    private static final String STATEFUL_SOURCE_STATE = "stateful-source-state";
--- End diff --

Instead of this you could also have
```
private static final ListStateDescriptor<Long> STATE_DESCRIPTOR = new ListStateDescriptor<>("stateful-source-state", LongSerializer.INSTANCE);
```
and then use this descriptor later directly instead of initialising with this field. That's just a personal style nitpick. Your version is also fine.
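The rescaling strategy described in the new Javadoc can be simulated with plain Java collections. This is a sketch, not the PR's code: it assumes subtask `i` of `p` precomputes `start + i, start + i + p, ...` up to an inclusive `end`, and where the Javadoc says partitions are randomly re-assigned, the sketch deals them out round-robin so the result is deterministic. `SequenceRescaleSketch` and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper that simulates the rescalable sequence source with plain collections.
final class SequenceRescaleSketch {

    // First run: subtask taskIdx precomputes every value it must emit from the
    // inclusive interval [start, end], stepping by the parallelism.
    static Deque<Long> initialValues(long start, long end, int taskIdx, int parallelism) {
        Deque<Long> values = new ArrayDeque<>();
        for (long v = start + taskIdx; v <= end; v += parallelism) {
            values.addLast(v);
        }
        return values;
    }

    // Checkpoint: every value not yet emitted becomes its own list-state entry.
    static List<Long> snapshot(Deque<Long> valuesToEmit) {
        return new ArrayList<>(valuesToEmit);
    }

    // Rescale: pool the entries of all old subtasks and deal them out round-robin
    // (an assumption; the Javadoc only says the partitions are re-assigned).
    static List<Deque<Long>> redistribute(List<List<Long>> snapshots, int newParallelism) {
        List<Deque<Long>> tasks = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            tasks.add(new ArrayDeque<>());
        }
        int next = 0;
        for (List<Long> entries : snapshots) {
            for (Long value : entries) {
                tasks.get(next).addLast(value);
                next = (next + 1) % newParallelism;
            }
        }
        return tasks;
    }
}
```

Any assignment that gives each pooled entry to exactly one new task preserves the exactly-once property; only the emission order changes, which is why the Javadoc warns that elements are not necessarily emitted in ascending order.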
GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/2871

    [FLINK-5163] Ports the production functions to the new state abstraction.

    This includes the following functions:
    1) `StatefulSequenceSource`
    2) `MessageAcknowledgingSourceBase`
    3) `FromElementsFunction`
    4) `ContinuousFileMonitoringFunction`

    Each of them is a separate commit, for ease of reviewing.

    Most of the functions assume a parallelism of 1. The only exception is the `StatefulSequenceSource`.

    R @aljoscha

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink dop1-source-rescaling

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2871.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2871

----
commit 3a436c5ef7d0123f0f244700f8a62bb325fac118
Author: kl0u
Date:   2016-11-17T13:54:08Z

    [FLINK-5163] Port the ContinuousFileMonitoringFunction to the new state abstractions.

commit a49fce791553c94dc866140c7616f52354d788fc
Author: kl0u
Date:   2016-11-17T15:52:50Z

    [FLINK-5163] Port the FromElementsFunction to the new state abstractions.

commit ecee88819bc7be09681047fc0f8c2e347ddbdd06
Author: kl0u
Date:   2016-11-18T15:07:45Z

    [FLINK-5163] Port the MessageAcknowledgingSourceBase to the new state abstractions.

commit 1dde7c7d7b978cfac086ee93eb775069763b7788
Author: kl0u
Date:   2016-11-21T17:50:30Z

    [FLINK-5163] Port the StatefulSequenceSource to the new state abstractions.