[GitHub] flink pull request #2871: [FLINK-5163] Ports the production functions to the...

2016-12-13 Thread kl0u
Github user kl0u closed the pull request at:

https://github.com/apache/flink/pull/2871


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2871: [FLINK-5163] Ports the production functions to the...

2016-12-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2871#discussion_r90639571
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java ---
@@ -58,12 +63,14 @@
  */
 @Internal
 public class ContinuousFileMonitoringFunction
-   extends RichSourceFunction implements Checkpointed {
+   extends RichSourceFunction implements CheckpointedFunction {
 
private static final long serialVersionUID = 1L;
 
private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class);
 
+   private static final String FILE_MONITORING_STATE_NAME = "file-monitoring-state";
--- End diff --

Instead of this you could also have
```
private static final ListStateDescriptor<Long> STATE_DESCRIPTOR =
    new ListStateDescriptor<>("file-monitoring-state", LongSerializer.INSTANCE);
```

and then use this descriptor directly later on, instead of initialising one 
from this name field.

That's just a personal style nitpick. Your version is also fine. 😃 




[GitHub] flink pull request #2871: [FLINK-5163] Ports the production functions to the...

2016-12-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2871#discussion_r90641264
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java ---
@@ -52,12 +52,4 @@ public void fromCollectionTest() throws Exception {
Arrays.asList(1, 2, 3;
assertEquals(expectedList, actualList);
}
-
--- End diff --

I think it would be good to have a snapshot/restore test for this source, 
verifying that we see all the expected elements (no matter the order).
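A minimal sketch of what such a snapshot/restore check could look like, written in plain Java with no Flink dependency. `ListSource`, `runWithFailure`, and the integer snapshot are hypothetical stand-ins for the real source and its checkpointed state, not the API of `FromElementsFunction` or of any Flink test harness; only the shape of the test is the point here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Plain-Java model of a snapshot/restore test; not Flink's FromElementsFunction. */
final class FromElementsSnapshotSketch {

    /** Hypothetical stand-in for the source: emits a fixed list and remembers how far it got. */
    static final class ListSource {
        private final List<Integer> elements;
        private int numEmitted; // the only state that needs to be checkpointed

        ListSource(List<Integer> elements) {
            this.elements = new ArrayList<>(elements);
        }

        /** Emits at most maxElements further elements into out. */
        void run(List<Integer> out, int maxElements) {
            int emittedNow = 0;
            while (numEmitted < elements.size() && emittedNow < maxElements) {
                out.add(elements.get(numEmitted));
                numEmitted++;
                emittedNow++;
            }
        }

        int snapshotState() {
            return numEmitted;
        }

        void restoreState(int snapshot) {
            numEmitted = snapshot;
        }
    }

    /** Runs a source, snapshots it, then continues from the snapshot in a fresh instance. */
    static List<Integer> runWithFailure(List<Integer> input, int emitBeforeSnapshot) {
        List<Integer> seen = new ArrayList<>();

        ListSource first = new ListSource(input);
        first.run(seen, emitBeforeSnapshot);
        int snapshot = first.snapshotState();

        // Simulated failure: the first instance is dropped and a new one is restored.
        ListSource restored = new ListSource(input);
        restored.restoreState(snapshot);
        restored.run(seen, Integer.MAX_VALUE);
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> expected = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
        List<Integer> actual = new ArrayList<>(runWithFailure(expected, 2));

        // Sort both sides so the comparison does not depend on emission order.
        Collections.sort(expected);
        Collections.sort(actual);
        System.out.println(expected.equals(actual));
    }
}
```

Comparing sorted copies checks that every element appears exactly once without assuming anything about the order in which the restored source emits them.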




[GitHub] flink pull request #2871: [FLINK-5163] Ports the production functions to the...

2016-12-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2871#discussion_r90641362
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java ---
@@ -73,27 +76,31 @@
  * }
  * }
  * }
- * 
+ *
+ * NOTE: This source has a parallelism of {@code 1}.
+ *
  * @param  The type of the messages created by the source.
  * @param  The type of unique IDs which may be used to acknowledge elements.
  */
 @PublicEvolving
 public abstract class MessageAcknowledgingSourceBase
extends RichSourceFunction
-   implements Checkpointed, CheckpointListener {
+   implements CheckpointedFunction, CheckpointListener {
 
private static final long serialVersionUID = -8689291992192955579L;
 
private static final Logger LOG = LoggerFactory.getLogger(MessageAcknowledgingSourceBase.class);
 
+   private static final String MESSAGE_ACKNOWLEDGING_SOURCE_STATE = "message-acknowledging-source-state";
--- End diff --

See my comments on the other functions; I'm not writing it again ... 😉 




[GitHub] flink pull request #2871: [FLINK-5163] Ports the production functions to the...

2016-12-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2871#discussion_r90643076
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java ---
@@ -104,21 +114,63 @@ public ContinuousFileMonitoringFunction(
);
 
this.format = Preconditions.checkNotNull(format, "Unspecified File Input Format.");
-   this.path = Preconditions.checkNotNull(path, "Unspecified Path.");
+   this.path = Preconditions.checkNotNull(format.getFilePath().toString(), "Unspecified Path.");
 
this.interval = interval;
this.watchType = watchType;
this.readerParallelism = Math.max(readerParallelism, 1);
this.globalModificationTime = Long.MIN_VALUE;
}
 
+   public long getGlobalModificationTime() {
--- End diff --

This should probably have `@VisibleForTesting`.




[GitHub] flink pull request #2871: [FLINK-5163] Ports the production functions to the...

2016-12-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2871#discussion_r90639653
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java ---
@@ -62,6 +73,9 @@
/** Flag to make the source cancelable */
private volatile boolean isRunning = true;
 
+   private transient ListState checkpointedState;
+
+   private static final String FROM_ELEMENT_STATE_NAME = "from-element-state";
--- End diff --

Instead of this you could also have
```
private static final ListStateDescriptor<Integer> STATE_DESCRIPTOR =
    new ListStateDescriptor<>("from-elements-state", IntSerializer.INSTANCE);
```

and then use this descriptor directly later on, instead of initialising one 
from this name field.

That's just a personal style nitpick. Your version is also fine. 😃 




[GitHub] flink pull request #2871: [FLINK-5163] Ports the production functions to the...

2016-12-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2871#discussion_r90641176
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
@@ -18,25 +18,44 @@
 package org.apache.flink.streaming.api.functions.source;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
 
 /**
  * A stateful streaming source that emits each number from a given interval exactly once,
  * possibly in parallel.
+ * 
+ * For the source to be re-scalable, the first time the job is run, we precompute all the elements
+ * that each of the tasks should emit and upon checkpointing, each element constitutes its own
+ * partition. When rescaling, these partitions will be randomly re-assigned to the new tasks.
+ * 
+ * This strategy guarantees that each element will be emitted exactly-once, but elements will not
+ * necessarily be emitted in ascending order, even for the same tasks.
  */
 @PublicEvolving
-public class StatefulSequenceSource extends RichParallelSourceFunction implements Checkpointed {
+public class StatefulSequenceSource extends RichParallelSourceFunction implements CheckpointedFunction {

private static final long serialVersionUID = 1L;
 
private final long start;
private final long end;
 
-   private long collected;
-
private volatile boolean isRunning = true;
 
+   private transient Deque valuesToEmit;
+
+   private static final String STATEFUL_SOURCE_STATE = "stateful-source-state";
--- End diff --

Instead of this you could also have
```
private static final ListStateDescriptor<Long> STATE_DESCRIPTOR =
    new ListStateDescriptor<>("stateful-source-state", LongSerializer.INSTANCE);
```

and then use this descriptor directly later on, instead of initialising one 
from this name field.

That's just a personal style nitpick. Your version is also fine. 😃 
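For readers following the Javadoc in the diff above, here is a rough plain-Java model of the rescaling strategy it describes: each task precomputes the values it should emit, every value that has not yet been emitted is checkpointed as its own unit, and on restore those units are handed to the new set of tasks. The class and method names (`SequenceRescalingSketch`, `initialValues`, `rescale`) and the stride-based split are assumptions made for illustration, and the seeded shuffle only mimics an arbitrary reassignment; this is not the code from the pull request and it ignores overflow near `Long.MAX_VALUE`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Random;

/** Plain-Java model of element-wise state redistribution; not Flink's StatefulSequenceSource. */
final class SequenceRescalingSketch {

    /** Values for one task on a fresh start: start + taskIdx, then steps of size parallelism. */
    static Deque<Long> initialValues(long start, long end, int parallelism, int taskIdx) {
        Deque<Long> values = new ArrayDeque<>();
        for (long v = start + taskIdx; v <= end; v += parallelism) {
            values.add(v);
        }
        return values;
    }

    /** Hands every pending value to one of newParallelism tasks, in an arbitrary (shuffled) order. */
    static List<Deque<Long>> rescale(List<Deque<Long>> snapshots, int newParallelism, long seed) {
        List<Long> pending = new ArrayList<>();
        for (Deque<Long> snapshot : snapshots) {
            pending.addAll(snapshot);
        }
        Collections.shuffle(pending, new Random(seed));

        List<Deque<Long>> tasks = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            tasks.add(new ArrayDeque<>());
        }
        for (int i = 0; i < pending.size(); i++) {
            tasks.get(i % newParallelism).add(pending.get(i));
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<Deque<Long>> snapshots = new ArrayList<>();
        List<Long> emitted = new ArrayList<>();

        // Parallelism 2 over [0, 9]; each task emits two values before the checkpoint.
        for (int taskIdx = 0; taskIdx < 2; taskIdx++) {
            Deque<Long> values = initialValues(0, 9, 2, taskIdx);
            emitted.add(values.poll());
            emitted.add(values.poll());
            snapshots.add(values);
        }

        // Restore with parallelism 3 and let every new task drain its share.
        for (Deque<Long> task : rescale(snapshots, 3, 42L)) {
            emitted.addAll(task);
        }

        // Sorting shows that each value in [0, 9] was emitted exactly once.
        Collections.sort(emitted);
        System.out.println(emitted);
    }
}
```

Because a restored task receives an arbitrary subset of the pending values, the per-task output is no longer ascending, which matches the caveat in the Javadoc.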




[GitHub] flink pull request #2871: [FLINK-5163] Ports the production functions to the...

2016-11-25 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/2871

[FLINK-5163] Ports the production functions to the new state abstraction.

This includes the following functions:
1) `StatefulSequenceSource`
2) `MessageAcknowledgingSourceBase`
3) `FromElementsFunction`
4) `ContinuousFileMonitoringFunction`

Each function is ported in a separate commit, for ease of review. 
Most of the functions assume a parallelism of 1; the only exception is 
`StatefulSequenceSource`.

R @aljoscha 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink dop1-source-rescaling

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2871.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2871


commit 3a436c5ef7d0123f0f244700f8a62bb325fac118
Author: kl0u 
Date:   2016-11-17T13:54:08Z

[FLINK-5163] Port the ContinuousFileMonitoringFunction to the new state 
abstractions.

commit a49fce791553c94dc866140c7616f52354d788fc
Author: kl0u 
Date:   2016-11-17T15:52:50Z

[FLINK-5163] Port the FromElementsFunction to the new state abstractions.

commit ecee88819bc7be09681047fc0f8c2e347ddbdd06
Author: kl0u 
Date:   2016-11-18T15:07:45Z

[FLINK-5163] Port the MessageAcknowledgingSourceBase to the new state 
abstractions.

commit 1dde7c7d7b978cfac086ee93eb775069763b7788
Author: kl0u 
Date:   2016-11-21T17:50:30Z

[FLINK-5163] Port the StatefulSequenceSource to the new state abstractions.



