[GitHub] flink issue #6097: [FLINK-9470] Allow querying the key in KeyedProcessFuncti...

2018-05-29 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/6097
  
+1 Nice!


---


[GitHub] flink issue #6086: [FLINK-9452] Flink 1.5 document version title shows snaps...

2018-05-28 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/6086
  
@zentol I think the baseurl variable also needs to be changed -- for a 
stable release, it should not point to master.


---


[GitHub] flink pull request #6076: [hotfix][docs] Specify operators behaviour on proc...

2018-05-25 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/6076#discussion_r190841926
  
--- Diff: docs/dev/event_time.md ---
@@ -219,4 +219,17 @@ with late elements in event time windows.
 Please refer to the [Debugging Windows & Event Time]({{ site.baseurl 
}}/monitoring/debugging_event_time.html) section for debugging
 watermarks at runtime.
 
+## How operators are processing watermarks
+
+General rule is that operator are required to completely process a given 
watermark before forwarding it downstream. For example,
+`WindowOperator` will first evaluate which windows should be fired and 
only after producing all of the output triggered by
+the watermark, the watermark itself will be handled downstream. In other 
words, all elements produced due to occurrence of
+the watermark will be emitted before such watermark.
+
+Same rule applies to `TwoInputStreamOperator`. However in this case 
current watermark of the operator is defined as a minimum
+of both of it's inputs.
+
+Details of this behaviour is defined by implementations of methods 
`OneInputStreamOperator.processWatermark`,
+`TwoInputStreamOperator.processWatermark1` and 
`TwoInputStreamOperator.processWatermark2`.
+
--- End diff --

I offer some grammatical improvements. Also, is it correct to describe 
"operators are required to completely process a given watermark before 
forwarding it downstream" as a general rule, meaning that it might have 
exceptions, or should we simply say "operators are required ..." without adding 
this caveat?

I changed behaviour to behavior because most of the docs seem to be using 
American spellings rather than English ones, but I'm not sure if we have a 
policy regarding this.


---


[GitHub] flink pull request #6076: [hotfix][docs] Specify operators behaviour on proc...

2018-05-25 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/6076#discussion_r190841061
  
--- Diff: docs/dev/event_time.md ---
@@ -219,4 +219,17 @@ with late elements in event time windows.
 Please refer to the [Debugging Windows & Event Time]({{ site.baseurl 
}}/monitoring/debugging_event_time.html) section for debugging
 watermarks at runtime.
 
+## How operators are processing watermarks
+
+General rule is that operator are required to completely process a given 
watermark before forwarding it downstream. For example,
+`WindowOperator` will first evaluate which windows should be fired and 
only after producing all of the output triggered by
+the watermark, the watermark itself will be handled downstream. In other 
words, all elements produced due to occurrence of
+the watermark will be emitted before such watermark.
+
+Same rule applies to `TwoInputStreamOperator`. However in this case 
current watermark of the operator is defined as a minimum
+of both of it's inputs.
+
+Details of this behaviour is defined by implementations of methods 
`OneInputStreamOperator.processWatermark`,
+`TwoInputStreamOperator.processWatermark1` and 
`TwoInputStreamOperator.processWatermark2`.
+
--- End diff --

As a general rule, operators are required to completely process a given 
watermark before forwarding it downstream. For example,
`WindowOperator` will first evaluate which windows should be fired, and 
only after producing all of the output triggered by
the watermark will the watermark itself be sent downstream. In other words, 
all elements produced due to occurrence of a watermark will be emitted before 
the watermark.

The same rule applies to `TwoInputStreamOperator`. However, in this case 
the current watermark of the operator is defined as the minimum
of both of its inputs.

The details of this behavior are defined by the implementations of the 
`OneInputStreamOperator.processWatermark`,
`TwoInputStreamOperator.processWatermark1` and 
`TwoInputStreamOperator.processWatermark2` methods.


---


[GitHub] flink pull request #6010: [FLINK-9359][docs] Update quickstart docs to only ...

2018-05-14 Thread alpinegizmo
GitHub user alpinegizmo opened a pull request:

https://github.com/apache/flink/pull/6010

[FLINK-9359][docs] Update quickstart docs to only mention Java 8

This is a trivial but important fix to the quickstart setup page in the 
docs. This fix should be made to the 1.4, 1.5, and 1.6 docs so that folks 
aren't confused when Java 7, 9, and 10 all fail to work.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alpinegizmo/flink 9359-quickstart-docs-java8

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6010.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6010


commit df430cbdc6e9e30edcba791eadc34980b6fe7d3d
Author: David Anderson <david@...>
Date:   2018-05-14T11:57:39Z

[FLINK-9359][docs] Update quickstart docs to only mention Java 8




---


[GitHub] flink issue #5976: [FLINK-9309] Recommend HA setup on Production Readiness C...

2018-05-09 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/5976
  
+1


---


[GitHub] flink pull request #5949: [FLINK-9288][docs] clarify the event time / waterm...

2018-05-02 Thread alpinegizmo
GitHub user alpinegizmo reopened a pull request:

https://github.com/apache/flink/pull/5949

[FLINK-9288][docs] clarify the event time / watermark docs

This PR only affects the documentation (for event time and watermarks). I 
wanted to make a couple of things clearer, and to provide a couple of 
additional internal links.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alpinegizmo/flink event-time-watermarks-docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5949.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5949


commit a5c66a56845ada7b0a20471a4842175c5a6566d6
Author: David Anderson <david@...>
Date:   2018-05-02T11:50:48Z

[FLINK-9288][docs] clarify the event time / watermark docs

commit a100cab6fec6ab3affa4ecc13c46e0081bd19b62
Author: David Anderson <david@...>
Date:   2018-05-03T05:12:12Z

Reworked the section on event time to be less absolutist.




---


[GitHub] flink pull request #5949: [FLINK-9288][docs] clarify the event time / waterm...

2018-05-02 Thread alpinegizmo
Github user alpinegizmo closed the pull request at:

https://github.com/apache/flink/pull/5949


---


[GitHub] flink issue #5949: [FLINK-9288][docs] clarify the event time / watermark doc...

2018-05-02 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/5949
  
@bowenli86 Thanks for the feedback. I've reworked that event time section. 
Hopefully it's now more complete and accurate without being too complex.


---


[GitHub] flink pull request #5949: [FLINK-9288][docs] clarify the event time / waterm...

2018-05-02 Thread alpinegizmo
GitHub user alpinegizmo opened a pull request:

https://github.com/apache/flink/pull/5949

[FLINK-9288][docs] clarify the event time / watermark docs

This PR only affects the documentation (for event time and watermarks). I 
wanted to make a couple of things clearer, and to provide a couple of 
additional internal links.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alpinegizmo/flink event-time-watermarks-docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5949.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5949


commit a5c66a56845ada7b0a20471a4842175c5a6566d6
Author: David Anderson <david@...>
Date:   2018-05-02T11:50:48Z

[FLINK-9288][docs] clarify the event time / watermark docs




---


[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-04-30 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5922#discussion_r184970245
  
--- Diff: docs/dev/stream/state/broadcast_state.md ---
@@ -0,0 +1,281 @@
+---
+title: "The Broadcast State Pattern"
+nav-parent_id: streaming_state
+nav-pos: 2
+---
+
+
+* ToC
+{:toc}
+
+[Working with State](state.html) described operator state which is either 
**evenly** distributed among the parallel
+tasks of an operator, or state which **upon restore**, its partial (task) 
states are **unioned** and the whole state is 
+used to initialize the restored parallel tasks.
+
+A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use-cases
+where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
+and is used to process all incoming elements on the other stream. As an 
example where broadcast state can emerge as a 
+natural fit, one can imagine a low-throughput stream containing a set of 
rules which we want to evaluate against all 
+elements coming from another stream. Having the above type of use-cases in 
mind, broadcast state differs from the rest 
+of operator states in that:
+ 1. it has a map format,
+ 2. it is only available to streams whose elements are *broadcasted*,
+ 3. the only operation available to a stream with broadcast state is to be 
*connected* to another keyed or non-keyed stream,
+ 4. such a broadcast stream can have *multiple broadcast states* with 
different names.
+
+## Provided APIs
+
+To show the provided APIs, we will start with an example before presenting 
their full functionality. As our running 
+example, we will use the case where we have a stream of objects of 
different colors and shapes and we want to find pairs
+of objects of the same color that follow a certain pattern, *e.g.* a 
rectangle followed by a triangle. We assume that
+the set of interesting patterns evolve over time. 
+
+In this example, the first stream will contain elements of type `Item` 
with a `Color` and a `Shape` property. The other
+stream will contain the `Rules`.
+
+Starting from the stream of `Items`, we just need to *key it* by `Color`, 
as we want pairs of the same color. This will
+make sure that elements of the same color end up on the same physical 
machine.
+
+{% highlight java %}
+// key the shapes by color
+KeyedStream<Item, Color> colorPartitionedStream = shapeStream
+.keyBy(new KeySelector<Shape, Color>(){...});
+{% endhighlight %}
+
+Moving on to the `Rules`, the stream containing them should be broadcasted 
to all downstream tasks, and these tasks 
+should store them locally so that they can evaluate them against all 
incoming `Items`. The snippet below will i) broadcast 
+the stream of rules and ii) using the provided `MapStateDescriptor`, it 
will create the broadcast state where the rules
+will be stored.
+
+{% highlight java %}
+
+// a map descriptor to store the name of the rule (string) and the rule 
itself.
+MapStateDescriptor<String, Rule> ruleStateDescriptor = new 
MapStateDescriptor<>(
+   "RulesBroadcastState",
+   BasicTypeInfo.STRING_TYPE_INFO,
+   TypeInformation.of(new TypeHint() {})
+   );
+   
+// broadcast the rules and create the broadcast state
+BroadcastStream ruleBroadcastStream = ruleStream
+.broadcast(ruleStateDescriptor);
+{% endhighlight %}
+
+Finally, in order to evaluate the `Rules` against the incoming elements 
from the `Item` stream, we need to:
+1) connect the two streams and 
+2) specify our match detecting logic. 
+
+Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be 
done by calling `connect()` on the 
+non-broadcasted stream, with the `BroadcastStream` as an argument. This 
will return a `BroadcastConnectedStream`, on 
+which we can call `process()` with a special type of `CoProcessFunction`. 
The function will contain our matching logic. 
+The exact type of the function depends on the type of the non-broadcasted 
stream: 
+ - if that is **keyed**, then the function is a 
`KeyedBroadcastProcessFunction`. 
+ - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
+ 
+ Given that our non-broadcasted stream is keyed, the following snippet 
includes the above calls:
+
+
+  Attention: The connect should be called on the 
non-broadcasted stream, with the `BroadcastStream`
+   as an argument.
+
+
+{% highlight java %}
+DataStream output = colorPart

[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-04-30 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5922#discussion_r184969214
  
--- Diff: docs/dev/stream/state/broadcast_state.md ---
@@ -0,0 +1,281 @@
+---
+title: "The Broadcast State Pattern"
+nav-parent_id: streaming_state
+nav-pos: 2
+---
+
+
+* ToC
+{:toc}
+
+[Working with State](state.html) described operator state which is either 
**evenly** distributed among the parallel
+tasks of an operator, or state which **upon restore**, its partial (task) 
states are **unioned** and the whole state is 
+used to initialize the restored parallel tasks.
+
+A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use-cases
+where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
+and is used to process all incoming elements on the other stream. As an 
example where broadcast state can emerge as a 
+natural fit, one can imagine a low-throughput stream containing a set of 
rules which we want to evaluate against all 
+elements coming from another stream. Having the above type of use-cases in 
mind, broadcast state differs from the rest 
+of operator states in that:
+ 1. it has a map format,
+ 2. it is only available to streams whose elements are *broadcasted*,
+ 3. the only operation available to a stream with broadcast state is to be 
*connected* to another keyed or non-keyed stream,
+ 4. such a broadcast stream can have *multiple broadcast states* with 
different names.
+
+## Provided APIs
+
+To show the provided APIs, we will start with an example before presenting 
their full functionality. As our running 
+example, we will use the case where we have a stream of objects of 
different colors and shapes and we want to find pairs
+of objects of the same color that follow a certain pattern, *e.g.* a 
rectangle followed by a triangle. We assume that
+the set of interesting patterns evolve over time. 
+
+In this example, the first stream will contain elements of type `Item` 
with a `Color` and a `Shape` property. The other
+stream will contain the `Rules`.
+
+Starting from the stream of `Items`, we just need to *key it* by `Color`, 
as we want pairs of the same color. This will
+make sure that elements of the same color end up on the same physical 
machine.
+
+{% highlight java %}
+// key the shapes by color
+KeyedStream<Item, Color> colorPartitionedStream = shapeStream
+.keyBy(new KeySelector<Shape, Color>(){...});
+{% endhighlight %}
+
+Moving on to the `Rules`, the stream containing them should be broadcasted 
to all downstream tasks, and these tasks 
+should store them locally so that they can evaluate them against all 
incoming `Items`. The snippet below will i) broadcast 
+the stream of rules and ii) using the provided `MapStateDescriptor`, it 
will create the broadcast state where the rules
+will be stored.
+
+{% highlight java %}
+
+// a map descriptor to store the name of the rule (string) and the rule 
itself.
+MapStateDescriptor<String, Rule> ruleStateDescriptor = new 
MapStateDescriptor<>(
+   "RulesBroadcastState",
+   BasicTypeInfo.STRING_TYPE_INFO,
+   TypeInformation.of(new TypeHint() {})
+   );
+   
+// broadcast the rules and create the broadcast state
+BroadcastStream ruleBroadcastStream = ruleStream
+.broadcast(ruleStateDescriptor);
+{% endhighlight %}
+
+Finally, in order to evaluate the `Rules` against the incoming elements 
from the `Item` stream, we need to:
+1) connect the two streams and 
+2) specify our match detecting logic. 
+
+Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be 
done by calling `connect()` on the 
+non-broadcasted stream, with the `BroadcastStream` as an argument. This 
will return a `BroadcastConnectedStream`, on 
+which we can call `process()` with a special type of `CoProcessFunction`. 
The function will contain our matching logic. 
+The exact type of the function depends on the type of the non-broadcasted 
stream: 
+ - if that is **keyed**, then the function is a 
`KeyedBroadcastProcessFunction`. 
+ - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
+ 
+ Given that our non-broadcasted stream is keyed, the following snippet 
includes the above calls:
+
+
+  Attention: The connect should be called on the 
non-broadcasted stream, with the `BroadcastStream`
+   as an argument.
+
+
+{% highlight java %}
+DataStream output = colorPart

[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-04-30 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5922#discussion_r184968333
  
--- Diff: docs/dev/stream/state/broadcast_state.md ---
@@ -0,0 +1,281 @@
+---
+title: "The Broadcast State Pattern"
+nav-parent_id: streaming_state
+nav-pos: 2
+---
+
+
+* ToC
+{:toc}
+
+[Working with State](state.html) described operator state which is either 
**evenly** distributed among the parallel
+tasks of an operator, or state which **upon restore**, its partial (task) 
states are **unioned** and the whole state is 
+used to initialize the restored parallel tasks.
+
+A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use-cases
+where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
+and is used to process all incoming elements on the other stream. As an 
example where broadcast state can emerge as a 
+natural fit, one can imagine a low-throughput stream containing a set of 
rules which we want to evaluate against all 
+elements coming from another stream. Having the above type of use-cases in 
mind, broadcast state differs from the rest 
+of operator states in that:
+ 1. it has a map format,
+ 2. it is only available to streams whose elements are *broadcasted*,
+ 3. the only operation available to a stream with broadcast state is to be 
*connected* to another keyed or non-keyed stream,
+ 4. such a broadcast stream can have *multiple broadcast states* with 
different names.
+
+## Provided APIs
+
+To show the provided APIs, we will start with an example before presenting 
their full functionality. As our running 
+example, we will use the case where we have a stream of objects of 
different colors and shapes and we want to find pairs
+of objects of the same color that follow a certain pattern, *e.g.* a 
rectangle followed by a triangle. We assume that
+the set of interesting patterns evolve over time. 
--- End diff --

the set of interesting patterns evolves over time.


---


[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-04-30 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5922#discussion_r184969839
  
--- Diff: docs/dev/stream/state/broadcast_state.md ---
@@ -0,0 +1,281 @@
+---
+title: "The Broadcast State Pattern"
+nav-parent_id: streaming_state
+nav-pos: 2
+---
+
+
+* ToC
+{:toc}
+
+[Working with State](state.html) described operator state which is either 
**evenly** distributed among the parallel
+tasks of an operator, or state which **upon restore**, its partial (task) 
states are **unioned** and the whole state is 
+used to initialize the restored parallel tasks.
+
+A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use-cases
+where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
+and is used to process all incoming elements on the other stream. As an 
example where broadcast state can emerge as a 
+natural fit, one can imagine a low-throughput stream containing a set of 
rules which we want to evaluate against all 
+elements coming from another stream. Having the above type of use-cases in 
mind, broadcast state differs from the rest 
+of operator states in that:
+ 1. it has a map format,
+ 2. it is only available to streams whose elements are *broadcasted*,
+ 3. the only operation available to a stream with broadcast state is to be 
*connected* to another keyed or non-keyed stream,
+ 4. such a broadcast stream can have *multiple broadcast states* with 
different names.
+
+## Provided APIs
+
+To show the provided APIs, we will start with an example before presenting 
their full functionality. As our running 
+example, we will use the case where we have a stream of objects of 
different colors and shapes and we want to find pairs
+of objects of the same color that follow a certain pattern, *e.g.* a 
rectangle followed by a triangle. We assume that
+the set of interesting patterns evolve over time. 
+
+In this example, the first stream will contain elements of type `Item` 
with a `Color` and a `Shape` property. The other
+stream will contain the `Rules`.
+
+Starting from the stream of `Items`, we just need to *key it* by `Color`, 
as we want pairs of the same color. This will
+make sure that elements of the same color end up on the same physical 
machine.
+
+{% highlight java %}
+// key the shapes by color
+KeyedStream<Item, Color> colorPartitionedStream = shapeStream
+.keyBy(new KeySelector<Shape, Color>(){...});
+{% endhighlight %}
+
+Moving on to the `Rules`, the stream containing them should be broadcasted 
to all downstream tasks, and these tasks 
+should store them locally so that they can evaluate them against all 
incoming `Items`. The snippet below will i) broadcast 
+the stream of rules and ii) using the provided `MapStateDescriptor`, it 
will create the broadcast state where the rules
+will be stored.
+
+{% highlight java %}
+
+// a map descriptor to store the name of the rule (string) and the rule 
itself.
+MapStateDescriptor<String, Rule> ruleStateDescriptor = new 
MapStateDescriptor<>(
+   "RulesBroadcastState",
+   BasicTypeInfo.STRING_TYPE_INFO,
+   TypeInformation.of(new TypeHint() {})
+   );
+   
+// broadcast the rules and create the broadcast state
+BroadcastStream ruleBroadcastStream = ruleStream
+.broadcast(ruleStateDescriptor);
+{% endhighlight %}
+
+Finally, in order to evaluate the `Rules` against the incoming elements 
from the `Item` stream, we need to:
+1) connect the two streams and 
+2) specify our match detecting logic. 
+
+Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be 
done by calling `connect()` on the 
+non-broadcasted stream, with the `BroadcastStream` as an argument. This 
will return a `BroadcastConnectedStream`, on 
+which we can call `process()` with a special type of `CoProcessFunction`. 
The function will contain our matching logic. 
+The exact type of the function depends on the type of the non-broadcasted 
stream: 
+ - if that is **keyed**, then the function is a 
`KeyedBroadcastProcessFunction`. 
+ - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
+ 
+ Given that our non-broadcasted stream is keyed, the following snippet 
includes the above calls:
+
+
+  Attention: The connect should be called on the 
non-broadcasted stream, with the `BroadcastStream`
+   as an argument.
+
+
+{% highlight java %}
+DataStream output = colorPart

[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-04-30 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5922#discussion_r184971409
  
--- Diff: docs/dev/stream/state/broadcast_state.md ---
@@ -0,0 +1,281 @@
+---
+title: "The Broadcast State Pattern"
+nav-parent_id: streaming_state
+nav-pos: 2
+---
+
+
+* ToC
+{:toc}
+
+[Working with State](state.html) described operator state which is either 
**evenly** distributed among the parallel
+tasks of an operator, or state which **upon restore**, its partial (task) 
states are **unioned** and the whole state is 
+used to initialize the restored parallel tasks.
+
--- End diff --

I find this intro paragraph confusing. Would it be correct to rewrite it 
like this?

[Working with State](state.html) describes operator state which **upon 
restore** is either **evenly** distributed among the parallel tasks of an 
operator, or **unioned**, with the whole state being used to initialize the 
restored parallel tasks.


---


[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-04-30 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5922#discussion_r184971516
  
--- Diff: docs/dev/stream/state/broadcast_state.md ---
@@ -0,0 +1,281 @@
+---
+title: "The Broadcast State Pattern"
+nav-parent_id: streaming_state
+nav-pos: 2
+---
+
+
+* ToC
+{:toc}
+
+[Working with State](state.html) described operator state which is either 
**evenly** distributed among the parallel
+tasks of an operator, or state which **upon restore**, its partial (task) 
states are **unioned** and the whole state is 
+used to initialize the restored parallel tasks.
+
+A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use-cases
+where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
--- End diff --

use cases (no hyphen)


---


[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-04-30 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5922#discussion_r184969718
  
--- Diff: docs/dev/stream/state/broadcast_state.md ---
@@ -0,0 +1,281 @@
+---
+title: "The Broadcast State Pattern"
+nav-parent_id: streaming_state
+nav-pos: 2
+---
+
+
+* ToC
+{:toc}
+
+[Working with State](state.html) described operator state which is either 
**evenly** distributed among the parallel
+tasks of an operator, or state which **upon restore**, its partial (task) 
states are **unioned** and the whole state is 
+used to initialize the restored parallel tasks.
+
+A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use-cases
+where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
+and is used to process all incoming elements on the other stream. As an 
example where broadcast state can emerge as a 
+natural fit, one can imagine a low-throughput stream containing a set of 
rules which we want to evaluate against all 
+elements coming from another stream. Having the above type of use-cases in 
mind, broadcast state differs from the rest 
+of operator states in that:
+ 1. it has a map format,
+ 2. it is only available to streams whose elements are *broadcasted*,
+ 3. the only operation available to a stream with broadcast state is to be 
*connected* to another keyed or non-keyed stream,
+ 4. such a broadcast stream can have *multiple broadcast states* with 
different names.
+
+## Provided APIs
+
+To show the provided APIs, we will start with an example before presenting 
their full functionality. As our running 
+example, we will use the case where we have a stream of objects of 
different colors and shapes and we want to find pairs
+of objects of the same color that follow a certain pattern, *e.g.* a 
rectangle followed by a triangle. We assume that
+the set of interesting patterns evolve over time. 
+
+In this example, the first stream will contain elements of type `Item` 
with a `Color` and a `Shape` property. The other
+stream will contain the `Rules`.
+
+Starting from the stream of `Items`, we just need to *key it* by `Color`, 
as we want pairs of the same color. This will
+make sure that elements of the same color end up on the same physical 
machine.
+
+{% highlight java %}
+// key the shapes by color
+KeyedStream<Item, Color> colorPartitionedStream = shapeStream
+.keyBy(new KeySelector<Shape, Color>(){...});
+{% endhighlight %}
+
+Moving on to the `Rules`, the stream containing them should be broadcasted 
to all downstream tasks, and these tasks 
+should store them locally so that they can evaluate them against all 
incoming `Items`. The snippet below will i) broadcast 
+the stream of rules and ii) using the provided `MapStateDescriptor`, it 
will create the broadcast state where the rules
+will be stored.
+
+{% highlight java %}
+
+// a map descriptor to store the name of the rule (string) and the rule 
itself.
+MapStateDescriptor<String, Rule> ruleStateDescriptor = new 
MapStateDescriptor<>(
+   "RulesBroadcastState",
+   BasicTypeInfo.STRING_TYPE_INFO,
+   TypeInformation.of(new TypeHint() {})
+   );
+   
+// broadcast the rules and create the broadcast state
+BroadcastStream ruleBroadcastStream = ruleStream
+.broadcast(ruleStateDescriptor);
+{% endhighlight %}
+
+Finally, in order to evaluate the `Rules` against the incoming elements 
from the `Item` stream, we need to:
+1) connect the two streams and 
+2) specify our match detecting logic. 
+
+Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be 
done by calling `connect()` on the 
+non-broadcasted stream, with the `BroadcastStream` as an argument. This 
will return a `BroadcastConnectedStream`, on 
+which we can call `process()` with a special type of `CoProcessFunction`. 
The function will contain our matching logic. 
+The exact type of the function depends on the type of the non-broadcasted 
stream: 
+ - if that is **keyed**, then the function is a 
`KeyedBroadcastProcessFunction`. 
+ - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
+ 
+ Given that our non-broadcasted stream is keyed, the following snippet 
includes the above calls:
+
+
+  Attention: The connect should be called on the 
non-broadcasted stream, with the `BroadcastStream`
+   as an argument.
+
+
+{% highlight java %}
+DataStream output = colorPart

[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-30 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184962653
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,539 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows to declare queries in the SQL 
language. A SQL query needs to be embedded within a table program that is 
written either in Java or Scala. The table program needs to be packaged with a 
build tool before it can be submitted to a cluster. This limits the usage of 
Flink to mostly Java/Scala programmers.
--- End diff --

Flink’s Table & SQL API makes it possible to work with queries written in 
the SQL language, but these queries need to be embedded within a table program 
that is written in either Java or Scala. Moreover, these programs need to be 
packaged with a build tool before being submitted to a cluster. This more or 
less limits the usage of Flink to Java/Scala programmers.


---


[GitHub] flink pull request #5887: [FLINK-6719] [docs] Add details about fault-tolera...

2018-04-30 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5887#discussion_r184960782
  
--- Diff: docs/dev/stream/operators/process_function.md ---
@@ -271,16 +271,39 @@ override def onTimer(timestamp: Long, ctx: 
OnTimerContext, out: Collector[OUT]):
 
 
 
-## Optimizations
+## Timers
 
-### Timer Coalescing
+Every timer registered via `registerEventTimeTimer()` or 
`registerProcessingTimeTimer()` will be stored on `TimerService`
+and enqueued for execution.
 
-Every timer registered at the `TimerService` via 
`registerEventTimeTimer()` or
-`registerProcessingTimeTimer()` will be stored on the Java heap and 
enqueued for execution. There is,
-however, a maximum of one timer per key and timestamp at a millisecond 
resolution and thus, in the
-worst case, every key may have a timer for each upcoming millisecond. Even 
if you do not do any
-processing for outdated timers in `onTimer`, this may put a significant 
burden on the
-Flink runtime.
+Invocations of `onTimer()` and `processElement()` are always synchronized, 
so that users don't have to worry about
+concurrent modification of state.
+
+Note that there is a maximum of one timer per key and timestamp at a 
millisecond resolution and thus, in the worst case,
+every key may have a timer for each upcoming millisecond. Even if you do 
not do any processing for outdated timers in `onTimer`,
+this may put a significant burden on the Flink runtime.
+
+### Fault Tolerance
+
+Timers registered within `ProcessFunction` are fault tolerant. They are 
synchronously checkpointed by Flink, regardless of
+configurations of state backends. (Therefore, a large number of timers can 
significantly increase checkpointing time. See optimizations
+section for advice to reduce the number of timers.)
--- End diff --

See the optimizations section for advice on how to reduce the number of 
timers.


---


[GitHub] flink issue #5924: [hotfix][README.md] Update building prerequisites

2018-04-30 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/5924
  
+1

I frequently see newcomers to Flink getting hung up on this.


---


[GitHub] flink pull request #5928: [hotfix][doc] fix doc of externalized checkpoint

2018-04-30 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5928#discussion_r184959202
  
--- Diff: docs/dev/stream/state/checkpointing.md ---
@@ -137,11 +137,9 @@ Some more parameters and/or defaults may be set via 
`conf/flink-conf.yaml` (see
-  `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's 
memory. Should be used only for minimal state (Kafka offsets) or testing and 
local debugging.
-  `filesystem`: State is in-memory on the TaskManagers, and state 
snapshots are stored in a file system. Supported are all filesystems supported 
by Flink, for example HDFS, S3, ...
 
-- `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a 
Flink supported filesystem. Note: State backend must be accessible from the 
JobManager, use `file://` only for local setups.
+- `state.checkpoints.dir`: The target directory for storing checkpoints 
data files and meta data of [externalized checkpoints]({{ site.baseurl 
}}/ops/state/checkpoints.html#externalized-checkpoints) in a Flink supported 
filesystem. Note: State backend must be accessible from the JobManager, use 
`file://` only for local setups.
--- End diff --

This seems potentially misleading -- isn't it okay to use a `file://` URI 
in the case of a distributed filesystem that is mounted at the same mount point 
across the cluster?


---


[GitHub] flink issue #5785: [FLINK-9108][docs] Fix invalid link

2018-03-30 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/5785
  
+1


---


[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...

2018-02-01 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/5395
  
These lines need to be restored to the Gemfile. The hawkins plugin is 
needed for the incremental build  and live reload feature

group :jekyll_plugins do
  gem 'hawkins'
end

The bundled version of jekyll (3.7.2) requires ruby >= 2.1, but our build 
machines use ruby 2.0. If we can't get the apache INFRA team to upgrade ruby, 
we'll have to rework this.

The Gemfile.lock doesn't work with ruby 2.1 -- I get this error

ruby_dep-1.5.0 requires ruby version >= 2.2.5, which is incompatible 
with the current version, ruby 2.1.10p492

but re-bundling fixes this. I think we should commit a Gemfile.lock file 
that is compatible back to Ruby 2.1, if we determine we can get ruby 2.1 -- 
otherwise we'll have to roll back jekyll and then re-bundle with ruby 2.0.

This PR works fine on ruby 2.3 and 2.4. The latest stable release of rvm 
doesn't yet support ruby 2.5, so I didn't test it.




---


[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...

2018-02-01 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/5395
  
Sure.


---


[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...

2018-02-01 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/5395
  
It looks good, but I haven't tested it. I'm wondering what versions of Ruby 
this has been tested with. At a minimum it needs to work with whatever version 
we can get on the production build infrastructure, as well as 2.3 and 2.4, 
since most developers will have one of those versions. And maybe 2.5, since 
that's out now. 


---


[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...

2018-01-31 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/5395
  
It would be lovely to ditch ruby 1.9. However, if I understand correctly, 
we don't fully control the environment that builds the docs, and the last time 
we tried this we couldn't get a newer ruby there. But it has been a while, and 
maybe the ASF infrastructure folks have seen the light. @rmetzger Can we 
somehow determine what ruby version is running on the build machines? 


---


[GitHub] flink pull request #5317: [FLINK-8458] Add the switch for keeping both the o...

2018-01-22 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5317#discussion_r16290
  
--- Diff: docs/ops/config.md ---
@@ -290,6 +290,12 @@ The following parameters configure Flink's JobManager 
and TaskManagers.
 
 - `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three 
parameters above): The number of buffers available to the network stack. This 
number determines how many streaming data exchange channels a TaskManager can 
have at the same time and how well buffered the channels are. If a job is 
rejected or you get a warning that the system has not enough buffers available, 
increase this value (DEFAULT: **2048**). If set, it will be mapped to 
`taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on 
`taskmanager.memory.segment-size`.
 
+- `taskmanager.network.memory.buffers-per-channel`: Number of network 
buffers to use for each outgoing/incoming channel (subpartition/input channel). 
Especially in credit-based flow control mode, it indicates how many credits are 
exclusive in each input channel. It should be configured at least 2 for good 
performance. 1 buffer is for receving in-flight data in the subpartition and 1 
buffer is for parallel serialization. 
+
--- End diff --

This needs some clarification. Is 
taskmanager.network.memory.buffers-per-channel only used in credit-based flow 
control mode? Does choosing a value greater than 2 provide any benefit?


---


[GitHub] flink pull request #5317: [FLINK-8458] Add the switch for keeping both the o...

2018-01-22 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5317#discussion_r162906372
  
--- Diff: docs/ops/config.md ---
@@ -290,6 +290,12 @@ The following parameters configure Flink's JobManager 
and TaskManagers.
 
 - `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three 
parameters above): The number of buffers available to the network stack. This 
number determines how many streaming data exchange channels a TaskManager can 
have at the same time and how well buffered the channels are. If a job is 
rejected or you get a warning that the system has not enough buffers available, 
increase this value (DEFAULT: **2048**). If set, it will be mapped to 
`taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on 
`taskmanager.memory.segment-size`.
 
+- `taskmanager.network.memory.buffers-per-channel`: Number of network 
buffers to use for each outgoing/incoming channel (subpartition/input channel). 
Especially in credit-based flow control mode, it indicates how many credits are 
exclusive in each input channel. It should be configured at least 2 for good 
performance. 1 buffer is for receving in-flight data in the subpartition and 1 
buffer is for parallel serialization. 
+
+- `taskmanager.network.memory.floating-buffers-per-gate`: Number of extra 
network buffers to use for each outgoing/incoming gate (result partition/input 
gate). In credit-based flow control mode, it indicates how many floating 
credits are shared for all the input channels. The floating buffers are 
distributed based on backlog (real-time output buffers in the subpartition) 
feedback. So the floating buffers can help relief back-pressure caused by 
imbalance data distribution among subpartitions.
+
--- End diff --

Number of extra network buffers used by each outgoing/incoming gate (result 
partition/input gate). In credit-based flow control mode, this indicates how 
many floating credits are shared among all the input channels. The floating 
buffers are distributed based on backlog (real-time output buffers in the 
subpartition) feedback, and can help relieve back-pressure caused by unbalanced 
data distribution among the subpartitions.


---


[GitHub] flink pull request #5277: [hotfix][docs]Review to reduce passive voice, impr...

2018-01-22 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5277#discussion_r162902645
  
--- Diff: docs/concepts/runtime.md ---
@@ -46,19 +46,23 @@ The Flink runtime consists of two types of processes:
   - The **JobManagers** (also called *masters*) coordinate the distributed 
execution. They schedule tasks, coordinate
 checkpoints, coordinate recovery on failures, etc.
 
-There is always at least one Job Manager. A high-availability setup 
will have multiple JobManagers, one of
-which one is always the *leader*, and the others are *standby*.
+There is always at least one Job Manager. A high-availability setup 
should have multiple JobManagers, one of
--- End diff --

"A high-availability setup should have multiple JobManagers" is, in 
general, not true -- this is a detail that depends on the underlying cluster 
management framework. 

I suggest reworking as follows:

There is always at least one Job Manager, but some [high-availability 
setups]({{ site.baseurl }}/ops/jobmanager_high_availability.html) will have 
multiple JobManagers.


---


[GitHub] flink issue #5129: [Hotfix for State doc in 1.4 and 1.5] update State's doc ...

2017-12-11 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/5129
  
+1


---


[GitHub] flink issue #5119: [FLINK-6590] Integrate automatic docs generation

2017-12-05 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/5119
  
Very pleased to see this coming to fruition. 


---


[GitHub] flink pull request #5075: [hotfix] [docs] Fix typos in State Backends doc

2017-11-27 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5075#discussion_r153149948
  
--- Diff: docs/ops/state/state_backends.md ---
@@ -110,10 +110,10 @@ Limitations of the RocksDBStateBackend:
 
 The RocksDBStateBackend is encouraged for:
 
-  - Jobs with very large state, long windows, large key/value states.
+  - Jobs with a very large state, long windows, large key/value states.
--- End diff --

This change is ungrammatical, but the rest are good.


---


[GitHub] flink issue #5039: [hotfix][docs] Fix some typos in the documentation.

2017-11-21 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/5039
  
lgtm


---


[GitHub] flink pull request #4992: [FLINK-6163] Document per-window state in ProcessW...

2017-11-10 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4992#discussion_r150194036
  
--- Diff: docs/dev/stream/operators/windows.md ---
@@ -978,6 +978,24 @@ input
 
 
 
+### Using per-window state in ProcessWindowFunction
+
+In addition to accessing keyed state (as any rich function can) a 
`ProcessWindowFunction` can
+also use keyed state that is scoped to the window that the function is 
currently processing. The are
--- End diff --

The are --> There are


---


[GitHub] flink issue #4869: [FLINK-7843][docs] Improve documentation for metrics

2017-10-25 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/4869
  
I can't vouch for the accuracy of the info, but +1 for adding this level of 
detail.


---


[GitHub] flink pull request #4833: [FLINK-5968] Add documentation for WindowedStream....

2017-10-16 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4833#discussion_r144842788
  
--- Diff: docs/dev/stream/operators/windows.md ---
@@ -721,6 +808,111 @@ input
 
 
 
+ Incremental Window Aggregation with AggregateFunction
+
+The following example shows how an incremental `AggregateFunction` can be 
combined with
+a `ProcesWindowFunction` to compute the average and also emit the key and 
window along with
+the average.
+
+
+
+{% highlight java %}
+DataStream<Tuple2<String, Long> input = ...;
+
+input
+  .keyBy()
+  .timeWindow()
+  .aggregate(new AverageAggregate(), new MyProcessWindowFunction());
+
+// Function definitions
+
+/**
+ * The accumulator is used to keep a running sum and a count. The {@code 
getResult} method
+ * computes the average.
+ */
+private static class AverageAggregate
+implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, 
Double> {
+  @Override
+  public Tuple2<Long, Long> createAccumulator() {
+return new Tuple2<>(0L, 0L);
+  }
+
+  @Override
+  public Tuple2<Long, Long> add(
+Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
+return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
+  }
+
+  @Override
+  public Double getResult(Tuple2<Long, Long> accumulator) {
+return accumulator.f0 / accumulator.f1;
+  }
+
+  @Override
+  public Tuple2<Long, Long> merge(
+Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
+return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
+  }
+}
+
+private static class MyProcessWindowFunction
+implements ProcessWindowFunction<Double, Tuple2<String, Double>, 
String, TimeWindow> {
+
+  public void apply(String key,
+Context context,
+Iterable averages,
+Collector<Tuple2<String, Double>> out) {
+  Double average = averags.iterator().next();
--- End diff --

should be averages, not averags


---


[GitHub] flink issue #4816: [hotfix][docs] CEP docs review to remove weasel words, fi...

2017-10-12 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/4816
  
+1


---


[GitHub] flink issue #4756: [FLINK-7744][docs] Add missing top links to documentation

2017-10-07 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/4756
  
Yes, looks good to me. +1


---


[GitHub] flink pull request #4760: [hotfix][docs] Polish docs index page for consiste...

2017-10-04 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4760#discussion_r142630633
  
--- Diff: docs/index.md ---
@@ -25,27 +25,26 @@ under the License.
 
 
 
-This documentation is for Apache Flink version {{ site.version_title }}. 
These pages have been built at: {% build_time %}.
+This documentation is for Apache Flink version {{ site.version_title }}. 
These pages were built at: {% build_time %}.
 
-Apache Flink is an open source platform for distributed stream and batch 
data processing. Flink’s core is a streaming dataflow engine that provides 
data distribution, communication, and fault tolerance for distributed 
computations over data streams. Flink also builds batch processing on top of 
the streaming engine, overlaying native iteration support, managed memory, and 
program optimization.
+Apache Flink is an open source platform for distributed stream and batch 
data processing. Flink’s core is a streaming dataflow engine that provides 
data distribution, communication, and fault tolerance for distributed 
computations over data streams. Flink builds batch processing on top of the 
streaming engine, overlaying native iteration support, managed memory, and 
program optimization.
 
 ## First Steps
 
-- **Concepts**: Start with the basic concepts of Flink's [Dataflow 
Programming Model](concepts/programming-model.html) and [Distributed Runtime 
Environment](concepts/runtime.html). This will help you to fully understand the 
other parts of the documentation, including the setup and programming guides. 
It is highly recommended to read these sections first.
+- **Concepts**: Start with the basic concepts of Flink's [Dataflow 
Programming Model](concepts/programming-model.html) and [Distributed Runtime 
Environment](concepts/runtime.html). This will help you understand other parts 
of the documentation, including the setup and programming guides. We 
recommended you read these sections first.
 
 - **Quickstarts**: [Run an example 
program](quickstart/setup_quickstart.html) on your local machine or [study some 
examples](examples/index.html).
 
-- **Programming Guides**: You can check out our guides about [basic API 
concepts](dev/api_concepts.html) and the [DataStream 
API](dev/datastream_api.html) or [DataSet API](dev/batch/index.html) to learn 
how to write your first Flink programs.
+- **Programming Guides**: You can read our guides about [basic API 
concepts](dev/api_concepts.html) and the [DataStream 
API](dev/datastream_api.html) or the [DataSet API](dev/batch/index.html) to 
learn how to write your first Flink programs.
 
 ## Deployment
 
-Before putting your Flink job into production, be sure to read the 
[Production Readiness Checklist](ops/production_ready.html).
+Before putting your Flink job into production, read the [Production 
Readiness Checklist](ops/production_ready.html).
 
 ## Migration Guide
 
-For users of earlier versions of Apache Flink we recommend the [API 
migration guide](dev/migration.html).
-While all parts of the API that were marked as public and stable are still 
supported (the public API is backwards compatible), we suggest migrating 
applications to the
-newer interfaces where applicable.
+For users of earlier versions of Apache Flink, we recommend the [API 
migration guide](dev/migration.html).
+While all parts of the API marked as public and stable are still supported 
(the public API is backwards compatible), we suggest migrating applications to 
the newer interfaces where applicable.
--- End diff --

I think "were marked" better communicates the intent here, as it is saying 
that the parts of the API that were marked as public and stable in previous 
releases are still supported (regardless of how they are currently marked).

Otherwise, +1.


---


[GitHub] flink pull request #4687: [hotfix][docs][CEP] wrong method name for PatternF...

2017-09-20 Thread alpinegizmo
GitHub user alpinegizmo opened a pull request:

https://github.com/apache/flink/pull/4687

[hotfix][docs][CEP] wrong method name for PatternFlatSelectFunction

The name of the single abstract method in the PatternFlatSelectFunction 
interface is flatSelect, not select.

This PR is just a trivial change to the docs.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alpinegizmo/flink PatternFlatSelectFunction

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4687.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4687


commit bf5bcf4b980f168fc01652df6d0bd0e0fe48a8ff
Author: David Anderson <da...@alpinegizmo.com>
Date:   2017-09-20T13:57:40Z

[hotfix][docs] method name for PatternFlatSelectFunction should be 
flatSelect




---


[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...

2017-09-19 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139691882
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+
+   
+
+
+With incremental checkpointing, each checkpoint contains only the state 
change since the previous checkpoint.
+
+* For the first checkpoint

[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...

2017-09-19 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139687800
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+
+   
+
+
+With incremental checkpointing, each checkpoint contains only the state 
change since the previous checkpoint.
+
+* For the first checkpoint

[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...

2017-09-19 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139685466
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+
+   
+
+
+With incremental checkpointing, each checkpoint contains only the state 
change since the previous checkpoint.
+
+* For the first checkpoint

[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...

2017-09-19 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139690107
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+
+   
+
+
+With incremental checkpointing, each checkpoint contains only the state 
change since the previous checkpoint.
+
+* For the first checkpoint

[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...

2017-09-19 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139686793
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+
+   
+
+
+With incremental checkpointing, each checkpoint contains only the state 
change since the previous checkpoint.
+
+* For the first checkpoint

[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...

2017-09-19 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139692206
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+
+   
+
+
+With incremental checkpointing, each checkpoint contains only the state 
change since the previous checkpoint.
+
+* For the first checkpoint

[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...

2017-09-19 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139691118
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+
+   
+
+
+With incremental checkpointing, each checkpoint contains only the state 
change since the previous checkpoint.
+
+* For the first checkpoint

[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...

2017-09-19 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139685795
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+
+   
+
+
+With incremental checkpointing, each checkpoint contains only the state 
change since the previous checkpoint.
+
+* For the first checkpoint

[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...

2017-09-19 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139687153
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+
+   
+
+
+With incremental checkpointing, each checkpoint contains only the state 
change since the previous checkpoint.
+
+* For the first checkpoint

[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...

2017-09-19 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139687994
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+
+   
+
+
+With incremental checkpointing, each checkpoint contains only the state 
change since the previous checkpoint.
+
+* For the first checkpoint

[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...

2017-09-19 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139684257
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
--- End diff --

on top of the normal load from the pipeline’s data processing work.

(add "the")


---


[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...

2017-09-19 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139689544
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+
+   
+
+
+With incremental checkpointing, each checkpoint contains only the state 
change since the previous checkpoint.
+
+* For the first checkpoint

[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...

2017-09-19 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139691558
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+
+   
+
+
+With incremental checkpointing, each checkpoint contains only the state 
change since the previous checkpoint.
+
+* For the first checkpoint

[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...

2017-09-19 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139690574
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+
+   
+
+
+With incremental checkpointing, each checkpoint contains only the state 
change since the previous checkpoint.
+
+* For the first checkpoint

[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...

2017-09-19 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139687055
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+
+   
+
+
+With incremental checkpointing, each checkpoint contains only the state 
change since the previous checkpoint.
+
+* For the first checkpoint

[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...

2017-09-19 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139689818
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+
+   
+
+
+With incremental checkpointing, each checkpoint contains only the state 
change since the previous checkpoint.
+
+* For the first checkpoint

[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...

2017-09-19 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r139690806
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html). 
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, for the sake of getting the concept across, we will 
briefly discuss the idea behind incremental
+checkpoints in a simplified manner.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time and for checkpoints ``CP 1`` to 
``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+
+   
+
+
+With incremental checkpointing, each checkpoint contains only the state 
change since the previous checkpoint.
+
+* For the first checkpoint

[GitHub] flink pull request #4634: [FLINK-7568] Improve Windowing Documentation

2017-09-05 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4634#discussion_r137000137
  
--- Diff: docs/dev/stream/operators/windows.md ---
@@ -663,25 +623,42 @@ input
 .keyBy()
 .window()
 .process(new MyProcessWindowFunction())
+
+/* ... */
+
+class MyWindowFunction extends ProcessWindowFunction[(String, Long), 
String, String, TimeWindow] {
+
+  def apply(key: String, context: Context, input: Iterable[(String, 
Long)], out: Collector[String]): () = {
+var count = 0L
+for (in <- input) {
+  count = count + 1
+}
+out.collect(s"Window ${context.window} count: $count")
+  }
+}
 {% endhighlight %}
 
 
 
-### WindowFunction with Incremental Aggregation
+The example shows a `ProcessWindowFunction` that counts the elements in a 
window. In addition, the window function adds information about the window to 
the output.
+
+Attention Note that using 
`ProcessWindowFunction` for simple aggregates such as count is quite 
inefficient. The next section shows how a `ReduceFunction` can be combined with 
a `ProcessWindowFunction` to get both incremental aggregation and the added 
information of a `ProcessWindowFunction`.
 
-A `WindowFunction` can be combined with either a `ReduceFunction` or a 
`FoldFunction` to
+### ProcessWindowFunction with Incremental Aggregation
+
+A `ProcessWindowFunction` can be combined with either a `ReduceFunction` 
or a `FoldFunction` to
--- End diff --

Sure, that sounds fine.


---


[GitHub] flink pull request #4634: [FLINK-7568] Improve Windowing Documentation

2017-09-05 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4634#discussion_r136973577
  
--- Diff: docs/dev/stream/operators/windows.md ---
@@ -460,118 +465,14 @@ The above example appends all input `Long` values to 
an initially empty `String`
 
 Attention `fold()` cannot be used 
with session windows or other mergeable windows.
 
-### WindowFunction - The Generic Case
+### ProcessWindowFunction
 
-A `WindowFunction` gets an `Iterable` containing all the elements of the 
window and provides
+A `ProcessWindowFunction` gets an `Iterable` containing all the elements 
of the window and provides
 the most flexibility of all window functions. This comes
--- End diff --

A `ProcessWindowFunction` gets an `Iterable` containing all the elements of 
the window, and a `Context` object with access to time and state information, 
which enables it to provide more flexibility than other window functions.


---


[GitHub] flink pull request #4634: [FLINK-7568] Improve Windowing Documentation

2017-09-05 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4634#discussion_r136972599
  
--- Diff: docs/dev/stream/operators/windows.md ---
@@ -111,6 +111,11 @@ windows) assign elements to windows based on time, 
which can either be processin
 time. Please take a look at our section on [event time]({{ site.baseurl 
}}/dev/event_time.html) to learn
 about the difference between processing time and event time and how 
timestamps and watermarks are generated.
 
+Time-based windows have a *start timestamp* (inclusive) and an *end 
timestamp* (exclusive)
+that together describe the size of the window. In code, Flink uses 
`TimeWindow` when working with
+time-based windows, this has methods for querying the start- and 
end-timestamp and also an
--- End diff --

"this has methods" => "which has methods"


---


[GitHub] flink pull request #4634: [FLINK-7568] Improve Windowing Documentation

2017-09-04 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4634#discussion_r136843813
  
--- Diff: docs/dev/stream/operators/windows.md ---
@@ -663,25 +623,42 @@ input
 .keyBy()
 .window()
 .process(new MyProcessWindowFunction())
+
+/* ... */
+
+class MyWindowFunction extends ProcessWindowFunction[(String, Long), 
String, String, TimeWindow] {
+
+  def apply(key: String, context: Context, input: Iterable[(String, 
Long)], out: Collector[String]): () = {
+var count = 0L
+for (in <- input) {
+  count = count + 1
+}
+out.collect(s"Window ${context.window} count: $count")
+  }
+}
 {% endhighlight %}
 
 
 
-### WindowFunction with Incremental Aggregation
+The example shows a `ProcessWindowFunction` that counts the elements in a 
window. In addition, the window function adds information about the window to 
the output.
+
+Attention Note that using 
`ProcessWindowFunction` for simple aggregates such as count is quite 
inefficient. The next section shows how a `ReduceFunction` can be combined with 
a `ProcessWindowFunction` to get both incremental aggregation and the added 
information of a `ProcessWindowFunction`.
 
-A `WindowFunction` can be combined with either a `ReduceFunction` or a 
`FoldFunction` to
+### ProcessWindowFunction with Incremental Aggregation
+
+A `ProcessWindowFunction` can be combined with either a `ReduceFunction` 
or a `FoldFunction` to
--- End diff --

Do we want to still mention FoldFunction here without mentioning it has 
been deprecated? Maybe we should talk about AggregateFunction instead.


---


[GitHub] flink pull request #4634: [FLINK-7568] Improve Windowing Documentation

2017-09-04 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4634#discussion_r136845008
  
--- Diff: docs/dev/stream/operators/windows.md ---
@@ -490,15 +490,35 @@ public abstract class ProcessWindowFunction<IN, OUT, 
KEY, W extends Window> impl
 Iterable elements,
 Collector out) throws Exception;
 
-/**
- * The context holding window metadata
- */
-public abstract class Context {
-/**
- * @return The window that is being evaluated.
- */
-public abstract W window();
-}
+   /**
+* The context holding window metadata.
+*/
+   public abstract class Context implements java.io.Serializable {
+   /**
+* Returns the window that is being evaluated.
+*/
+   public abstract W window();
+   
+   /** Returns the current processing time. */
+   public abstract long currentProcessingTime();
+   
+   /** Returns the current event-time watermark. */
+   public abstract long currentWatermark();
+   
+   /**
+* State accessor for per-key and per-window state.
+*
+* NOTE:If you use per-window state you have to 
ensure that you clean it up
+* by implementing {@link 
ProcessWindowFunction#clear(Context)}.
+*/
+   public abstract KeyedStateStore windowState();
+   
+   /**
+* State accessor for per-key global state.
+*/
+   public abstract KeyedStateStore globalState();
+   }
--- End diff --

The difference between per-key per-window state and per-key global state 
deserves more explanation. It would be even better to sketch out scenarios of 
what one might do with these accessors. There's a fair bit of power being 
exposed here, and that's not at all obvious.


---


[GitHub] flink issue #4553: [FLINK-7642] [docs] Add very obvious warning about outdat...

2017-08-16 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/4553
  
+1 I like these improvements.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4480: [FLINK-6995] [docs] Enable is_latest attribute to false

2017-08-04 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/4480
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4454: [hotfix][docs] Add section in docs about writing u...

2017-08-04 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4454#discussion_r131354369
  
--- Diff: docs/dev/testing.md ---
@@ -0,0 +1,189 @@
+---
+title: "Testing"
+nav-parent_id: dev
+nav-id: testing
+nav-pos: 99
+---
+
+
+This page briefly discusses how to test Flink application in the local 
environment.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Unit testing
+
+It is encouraged to test your classes with unit tests as much as possible. 
For example if one implement following `ReduceFunction`:
+
+~~~java
+public class SumReduce implements ReduceFunction {
+@Override
+public Long reduce(Long value1, Long value2) throws Exception {
+return value1 + value2;
+}
+}
+~~~
+
+it is very easy to unit test it with your favorite framework:
+
+~~~java
+public class SumReduceTest {
+@Test
+public void testSum() throws Exception {
+SumReduce sumReduce = new SumReduce();
+
+assertEquals(42L, sumReduce.reduce(40L, 2L));
+}
+}
+~~~
+
+Or in scala:
+
+~~~scala
+class SumReduce extends ReduceFunction[Long] {
+override def reduce(value1: java.lang.Long,
+value2: java.lang.Long): java.lang.Long = value1 + 
value2
+}
+~~~
+
+~~~scala
+class SumReduceTest extends FlatSpec with Matchers {
+"SumReduce" should "add values" in {
+val sumReduce: SumReduce = new SumReduce()
+sumReduce.reduce(40L, 2L) should be (42L)
+}
+}
+~~~
+
+## Integration testing
+
+You also can write integration tests that are executed against local Flink 
mini cluster.
+In order to do so add a test dependency `flink-test-utils`.
+
+~~~ xml
+
+  org.apache.flink
+  flink-test-utils{{site.scala_version_suffix}}
+  {{site.version}}
+
+~~~
--- End diff --

To get this to work in my outside-of-flink project, I had to modify this as:


  org.apache.flink
  
flink-test-utils_{{site.scala_version_suffix}}
  ${flink.version}
 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4454: [hotfix][docs] Add section in docs about writing u...

2017-08-04 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4454#discussion_r131352677
  
--- Diff: docs/dev/testing.md ---
@@ -90,13 +121,69 @@ public class ExampleIntegrationTest extends 
StreamingMultipleProgramsTestBase {
 public static final List values = new ArrayList<>();
 
 @Override
-public void invoke(Long value) throws Exception {
+public synchronized void invoke(Long value) throws Exception {
 values.add(value);
 }
 }
 }
 ~~~
 
-Static variable in `CollectSink` is required because Flink serializes all 
operators before distributing them across a cluster.
+or in Scala:
+
+~~~scala
+class MultiplyByTwo extends MapFunction[Long, Long] {
+  override def map(value: java.lang.Long): java.lang.Long = value * 2
+}
+~~~
+
+~~~scala
+class ExampleIntegrationTest extends FlatSpec with Matchers {
+"MultiplyByTwo" should "multiply it input by two" in {
+val env: StreamExecutionEnvironment =
+StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(1)
+// values are collected on a static variable
+CollectSink.values.clear()
+env
+.fromElements(1L, 21L, 22L)
+.map(new MultiplyByTwo())
+.addSink(new CollectSink())
+env.execute()
+CollectSink.values should be (Lists.newArrayList(2L, 42L, 44L))
+}
+}
+
+object CollectSink {
+// must be static
+val values: List[Long] = new ArrayList()
+}
+
+class CollectSink extends SinkFunction[Long] {
+override def invoke(value: java.lang.Long): Unit = {
+synchronized {
+values.add(value)
+}
+}
+}
+~~~
+
+Static variable in `CollectSink` is used here because Flink serializes all 
operators before distributing them across a cluster.
+Communicating with operators instantiated by a local flink mini cluster 
via static variables is one way around this issue.
 Alternatively in your test sink you could for example write the data to 
files in a temporary directory.
 Of course you could use your own custom sources and sinks, which can emit 
watermarks.
+
+## Testing checkpointing and state handling
+
+One way to test state handling is to enable checkpointing in integration 
tests. You can do that by
+configuring `environment` in the test:
+~~~java
+env.enableCheckpointing(500);
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
+~~~
+and for example adding to your Flink application an identity mapper 
operator that will throw and exception
--- End diff --

"throw an exception" (typo)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4454: [hotfix][docs] Add section in docs about writing unit/int...

2017-08-04 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/4454
  
+1 for adding this section to the docs. And another +1 for adding something 
about testing watermarks / timestamps -- this is a very frequently asked 
question.

Could, perhaps, include a link to 
https://github.com/ottogroup/flink-spector/



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4441: [FLINK-7301] [docs] Rework state documentation

2017-08-03 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/4441
  
@twalthr Duh, of course, you're right. 

+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4441: [FLINK-7301] [docs] Rework state documentation

2017-08-03 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4441#discussion_r131087040
  
--- Diff: docs/dev/stream/state/index.md ---
@@ -0,0 +1,56 @@
+---
+title: "State & Fault Tolerance"
+nav-id: streaming_state
+nav-title: "State & Fault Tolerance"
+nav-parent_id: streaming
+nav-pos: 3
+nav-show_overview: true
+---
+
+
+Stateful functions and operators store data across the processing of 
individual elements/events, making state a critical building block for
+any type of more elaborate operation.
+
+For example:
+
+  - When an application searches for certain event patterns, the state 
will store the sequence of events encountered so far.
+  - When aggregating events per minute/hour/day, the state holds the 
pending aggregates.
+  - When training a machine learning model over a stream of data points, 
the state holds the current version of the model parameters.
+  - When historic data needs to be managed, the state allows efficient 
access to events occured in the past. 
+
+Flink needs to be aware of the state in order to make state fault tolerant 
using [checkpoints](checkpointing.html) and allow [savepoints]({{ site.baseurl 
}}/ops/state/savepoints.html) of streaming applications.
--- End diff --

"and to allow [savepoints]"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4441: [FLINK-7301] [docs] Rework state documentation

2017-08-03 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4441#discussion_r131085493
  
--- Diff: docs/dev/stream/state/custom_serialization.md ---
@@ -0,0 +1,188 @@
+---
+title: "Custom Serialization for Managed State"
+nav-title: "Custom Serialization"
+nav-parent_id: streaming_state
+nav-pos: 10
+---
+
+
+If your application uses Flink's managed state, it might be necessary to 
implement a custom serialization logic for special use cases.
--- End diff --

drop the word "a" in "implement a custom serialization logic" so that it 
reads "implement custom serialization logic"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4441: [FLINK-7301] [docs] Rework state documentation

2017-08-03 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4441#discussion_r131086039
  
--- Diff: docs/dev/stream/state/custom_serialization.md ---
@@ -0,0 +1,188 @@
+---
+title: "Custom Serialization for Managed State"
+nav-title: "Custom Serialization"
+nav-parent_id: streaming_state
+nav-pos: 10
+---
+
+
+If your application uses Flink's managed state, it might be necessary to 
implement a custom serialization logic for special use cases.
+
+This page is targeted as a guideline for users who require the use of 
custom serialization for their state, covering how
+to provide a custom serializer and how to handle upgrades to the 
serializer for compatibility. If you're simply using
+Flink's own serializers, this page is irrelevant and can be skipped.
+
+### Using custom serializers
+
+As demonstrated in the above examples, when registering a managed operator 
or keyed state, a `StateDescriptor` is required
+to specify the state's name, as well as information about the type of the 
state. The type information is used by Flink's
+[type serialization framework](../../types_serialization.html) to create 
appropriate serializers for the state.
+
+It is also possible to completely bypass this and let Flink use your own 
custom serializer to serialize managed states,
+simply by directly instantiating the `StateDescriptor` with your own 
`TypeSerializer` implementation:
+
+
+
+{% highlight java %}
+public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, 
Integer>> {...};
+
+ListStateDescriptor<Tuple2<String, Integer>> descriptor =
+new ListStateDescriptor<>(
+"state-name",
+new CustomTypeSerializer());
+
+checkpointedState = getRuntimeContext().getListState(descriptor);
+{% endhighlight %}
+
+
+
+{% highlight scala %}
+class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...}
+
+val descriptor = new ListStateDescriptor[(String, Integer)](
+"state-name",
+new CustomTypeSerializer)
+)
+
+checkpointedState = getRuntimeContext.getListState(descriptor);
+{% endhighlight %}
+
+
+
+Note that Flink writes state serializers along with the state as metadata. 
In certain cases on restore (see following
+subsections), the written serializer needs to be deserialized and used. 
Therefore, it is recommended to avoid using
+anonymous classes as your state serializers. Anonymous classes do not have 
a guarantee on the generated classname,
+varying across compilers and depends on the order that they are 
instantiated within the enclosing class, which can 
--- End diff --

"varying across compilers and depends" ==> "which varies across compilers 
and depends"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4136: [FLINK-6940][docs] Clarify the effect of configuri...

2017-07-10 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4136#discussion_r126431179
  
--- Diff: docs/ops/state_backends.md ---
@@ -123,8 +123,7 @@ RocksDBStateBackend is currently the only backend that 
offers incremental checkp
 
 ## Configuring a State Backend
 
-State backends can be configured per job. In addition, you can define a 
default state backend to be used when the
-job does not explicitly define a state backend.
+State backends can be configured per job in code. In addition, you can 
define a default state backend in **flink-conf.yaml** that is used when the job 
does not explicitly define a state backend.
--- End diff --

@zentol I find that "in code" reads rather awkwardly, and I don't see how 
it adds any value, since the details of how to do per-job configuration are 
shown below. Nevertheless, this topic can be a bit confusing, so I would 
suggest something more like this (assuming I got the details right):

The default state backend, if you specify nothing, is the jobmanager. If 
you wish to establish a different default for all jobs on your cluster, you can 
do so by defining a new default state backend in **flink-conf.yaml**. The 
default state backend can be overridden on a per-job basis, as shown below.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4291: [FLINK-7136][docs] Improve search by adding facets...

2017-07-10 Thread alpinegizmo
GitHub user alpinegizmo opened a pull request:

https://github.com/apache/flink/pull/4291

[FLINK-7136][docs] Improve search by adding facets and turning off ads

This PR expands the custom search engine used by the documentation to 
include some additional sources, and expands the UI to include tabs for 
breaking out the results by source, and turns off ads (which we are entitled to 
do, since the ASF is a non-profit org).

The specifications for the list of sites to be searched, their weights, and 
the UI are checked in as XML files. However, editing these files won't have any 
direct impact. Instead, they must be re-uploaded to the google custom search 
console. The reason to check them in is to make the search settings visible, 
and to preserve a copy of all the settings within the project.

Before and after screenshots when searching for "timers":

https://user-images.githubusercontent.com/43608/28008542-3ae67428-6558-11e7-881a-5e79369ef9d3.png;>

https://user-images.githubusercontent.com/43608/28008577-60e11fca-6558-11e7-82f5-e28c7c9f364a.png;>




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alpinegizmo/flink 7136-doc-search

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4291.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4291


commit 9f2ed8cbae957b030774d48ac4206904d7cb8385
Author: David Anderson <da...@alpinegizmo.com>
Date:   2017-07-07T20:54:17Z

[FLINK-7136][docs] Improve search by adding facets and turning off ads




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4088: [FLINK-6652] [core] Fix handling of delimiters split by b...

2017-06-16 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/4088
  
FYI, the reference solutions to the batch training exercises are failing, 
and it looks to me like it's because of this issue.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3720: [FLINK-6302] Documentation build error on ruby 2.4

2017-06-06 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/3720
  
Yes, let's get this done.

On Jun 6, 2017 19:03, "Stephan Ewen" <notificati...@github.com> wrote:

    > @alpinegizmo <https://github.com/alpinegizmo> @zentol
> <https://github.com/zentol> Now that #4043
> <https://github.com/apache/flink/pull/4043> is merged, can we rebase this
> one?
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/flink/pull/3720#issuecomment-306551741>, or 
mute
> the thread
> 
<https://github.com/notifications/unsubscribe-auth/AACqWFH4ZhNVBH2qCD6bsOrQaukYd3Dfks5sBYZKgaJpZM4M8pJW>
> .
>



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4046: [FLINK-6749] [table] Table API / SQL Docs: SQL Pag...

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4046#discussion_r119893904
  
--- Diff: docs/dev/table/sql.md ---
@@ -22,20 +22,22 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-SQL queries are specified using the `sql()` method of the 
`TableEnvironment`. The method returns the result of the SQL query as a `Table` 
which can be converted into a `DataSet` or `DataStream`, used in subsequent 
Table API queries, or written to a `TableSink` (see [Writing Tables to External 
Sinks](#writing-tables-to-external-sinks)). SQL and Table API queries can 
seamlessly mixed and are holistically optimized and translated into a single 
DataStream or DataSet program.
+Flink supports specifying DataStream or DataSet programs with SQL queries 
using the `sql()` method of the `TableEnvironment`. The method returns the 
result of the SQL query as a `Table`. A `Table` can be used in the subsequent 
SQL / Table API queries, be converted into a `DataSet` or `DataStream`, used in 
subsequent Table API queries or written to a `TableSink` (see [Writing Tables 
to External Sinks](common.html#emit-to-a-tablesink)). SQL and Table API queries 
can seamlessly mixed and are holistically optimized and translated into a 
single program.
 
-A `Table`, `DataSet`, `DataStream`, or external `TableSource` must be 
registered in the `TableEnvironment` in order to be accessible by a SQL query 
(see [Registering Tables](#registering-tables)). For convenience 
`Table.toString()` will automatically register an unique table name under the 
`Table`'s `TableEnvironment` and return the table name. So it allows to call 
SQL directly on tables in a string concatenation (see examples below).
+To access the data in the SQL queries, users must register data sources, 
including `Table`, `DataSet`, `DataStream` or external `TableSource`, in the 
`TableEnvironment` (see [Registering 
Tables](common.html#register-a-table-in-the-catalog)). Alternatively, users can 
also register external catalogs in the `TableEnvironment` to specify the 
location of the data sources.
 
-*Note: Flink's SQL support is not feature complete, yet. Queries that 
include unsupported SQL features will cause a `TableException`. The limitations 
of SQL on batch and streaming tables are listed in the following sections.*
+For convenience `Table.toString()` will automatically register an unique 
table name under the `Table`'s `TableEnvironment` and return the table name. So 
it allows to call SQL directly on tables in a string concatenation (see 
examples below).
--- End diff --

... a unique table name ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4046: [FLINK-6749] [table] Table API / SQL Docs: SQL Pag...

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4046#discussion_r119894456
  
--- Diff: docs/dev/table/sql.md ---
@@ -22,20 +22,22 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-SQL queries are specified using the `sql()` method of the 
`TableEnvironment`. The method returns the result of the SQL query as a `Table` 
which can be converted into a `DataSet` or `DataStream`, used in subsequent 
Table API queries, or written to a `TableSink` (see [Writing Tables to External 
Sinks](#writing-tables-to-external-sinks)). SQL and Table API queries can 
seamlessly mixed and are holistically optimized and translated into a single 
DataStream or DataSet program.
+Flink supports specifying DataStream or DataSet programs with SQL queries 
using the `sql()` method of the `TableEnvironment`. The method returns the 
result of the SQL query as a `Table`. A `Table` can be used in the subsequent 
SQL / Table API queries, be converted into a `DataSet` or `DataStream`, used in 
subsequent Table API queries or written to a `TableSink` (see [Writing Tables 
to External Sinks](common.html#emit-to-a-tablesink)). SQL and Table API queries 
can seamlessly mixed and are holistically optimized and translated into a 
single program.
 
-A `Table`, `DataSet`, `DataStream`, or external `TableSource` must be 
registered in the `TableEnvironment` in order to be accessible by a SQL query 
(see [Registering Tables](#registering-tables)). For convenience 
`Table.toString()` will automatically register an unique table name under the 
`Table`'s `TableEnvironment` and return the table name. So it allows to call 
SQL directly on tables in a string concatenation (see examples below).
+To access the data in the SQL queries, users must register data sources, 
including `Table`, `DataSet`, `DataStream` or external `TableSource`, in the 
`TableEnvironment` (see [Registering 
Tables](common.html#register-a-table-in-the-catalog)). Alternatively, users can 
also register external catalogs in the `TableEnvironment` to specify the 
location of the data sources.
 
-*Note: Flink's SQL support is not feature complete, yet. Queries that 
include unsupported SQL features will cause a `TableException`. The limitations 
of SQL on batch and streaming tables are listed in the following sections.*
+For convenience `Table.toString()` will automatically register an unique 
table name under the `Table`'s `TableEnvironment` and return the table name. So 
it allows to call SQL directly on tables in a string concatenation (see 
examples below).
 
-**TODO: Rework intro. Move some parts below. **
+*Note: Flink's SQL support is not feature complete, yet. Queries that 
include unsupported SQL features will cause a `TableException`. The limitations 
of SQL on batch and streaming tables are listed in the following sections.*
 
 * This will be replaced by the TOC
 {:toc}
 
 Specifying a Query
 ---
 
+Here are a few examples on how to specify a DataStream / DataSet program 
using SQL:
--- End diff --

examples of how to


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4046: [FLINK-6749] [table] Table API / SQL Docs: SQL Pag...

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4046#discussion_r119894379
  
--- Diff: docs/dev/table/sql.md ---
@@ -22,20 +22,22 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-SQL queries are specified using the `sql()` method of the 
`TableEnvironment`. The method returns the result of the SQL query as a `Table` 
which can be converted into a `DataSet` or `DataStream`, used in subsequent 
Table API queries, or written to a `TableSink` (see [Writing Tables to External 
Sinks](#writing-tables-to-external-sinks)). SQL and Table API queries can 
seamlessly mixed and are holistically optimized and translated into a single 
DataStream or DataSet program.
+Flink supports specifying DataStream or DataSet programs with SQL queries 
using the `sql()` method of the `TableEnvironment`. The method returns the 
result of the SQL query as a `Table`. A `Table` can be used in the subsequent 
SQL / Table API queries, be converted into a `DataSet` or `DataStream`, used in 
subsequent Table API queries or written to a `TableSink` (see [Writing Tables 
to External Sinks](common.html#emit-to-a-tablesink)). SQL and Table API queries 
can seamlessly mixed and are holistically optimized and translated into a 
single program.
 
-A `Table`, `DataSet`, `DataStream`, or external `TableSource` must be 
registered in the `TableEnvironment` in order to be accessible by a SQL query 
(see [Registering Tables](#registering-tables)). For convenience 
`Table.toString()` will automatically register an unique table name under the 
`Table`'s `TableEnvironment` and return the table name. So it allows to call 
SQL directly on tables in a string concatenation (see examples below).
+To access the data in the SQL queries, users must register data sources, 
including `Table`, `DataSet`, `DataStream` or external `TableSource`, in the 
`TableEnvironment` (see [Registering 
Tables](common.html#register-a-table-in-the-catalog)). Alternatively, users can 
also register external catalogs in the `TableEnvironment` to specify the 
location of the data sources.
 
-*Note: Flink's SQL support is not feature complete, yet. Queries that 
include unsupported SQL features will cause a `TableException`. The limitations 
of SQL on batch and streaming tables are listed in the following sections.*
+For convenience `Table.toString()` will automatically register an unique 
table name under the `Table`'s `TableEnvironment` and return the table name. So 
it allows to call SQL directly on tables in a string concatenation (see 
examples below).
 
-**TODO: Rework intro. Move some parts below. **
+*Note: Flink's SQL support is not feature complete, yet. Queries that 
include unsupported SQL features will cause a `TableException`. The limitations 
of SQL on batch and streaming tables are listed in the following sections.*
--- End diff --

Flink's SQL support is not yet feature complete.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4046: [FLINK-6749] [table] Table API / SQL Docs: SQL Pag...

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4046#discussion_r119896766
  
--- Diff: docs/dev/table/sql.md ---
@@ -22,20 +22,22 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-SQL queries are specified using the `sql()` method of the 
`TableEnvironment`. The method returns the result of the SQL query as a `Table` 
which can be converted into a `DataSet` or `DataStream`, used in subsequent 
Table API queries, or written to a `TableSink` (see [Writing Tables to External 
Sinks](#writing-tables-to-external-sinks)). SQL and Table API queries can 
seamlessly mixed and are holistically optimized and translated into a single 
DataStream or DataSet program.
+Flink supports specifying DataStream or DataSet programs with SQL queries 
using the `sql()` method of the `TableEnvironment`. The method returns the 
result of the SQL query as a `Table`. A `Table` can be used in the subsequent 
SQL / Table API queries, be converted into a `DataSet` or `DataStream`, used in 
subsequent Table API queries or written to a `TableSink` (see [Writing Tables 
to External Sinks](common.html#emit-to-a-tablesink)). SQL and Table API queries 
can seamlessly mixed and are holistically optimized and translated into a 
single program.
 
-A `Table`, `DataSet`, `DataStream`, or external `TableSource` must be 
registered in the `TableEnvironment` in order to be accessible by a SQL query 
(see [Registering Tables](#registering-tables)). For convenience 
`Table.toString()` will automatically register an unique table name under the 
`Table`'s `TableEnvironment` and return the table name. So it allows to call 
SQL directly on tables in a string concatenation (see examples below).
+To access the data in the SQL queries, users must register data sources, 
including `Table`, `DataSet`, `DataStream` or external `TableSource`, in the 
`TableEnvironment` (see [Registering 
Tables](common.html#register-a-table-in-the-catalog)). Alternatively, users can 
also register external catalogs in the `TableEnvironment` to specify the 
location of the data sources.
--- End diff --

This reads a bit awkwardly. How about this?

Before using data in a SQL query, the data source(s) must first be 
registered in the `TableEnvironment` (see [Registering 
Tables](common.html#register-a-table-in-the-catalog)). Possible data sources 
include `Table`s, `DataSet`s, `DataStream`s, and external `TableSource`s. 
Alternatively, ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4046: [FLINK-6749] [table] Table API / SQL Docs: SQL Pag...

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4046#discussion_r119894264
  
--- Diff: docs/dev/table/sql.md ---
@@ -22,20 +22,22 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-SQL queries are specified using the `sql()` method of the 
`TableEnvironment`. The method returns the result of the SQL query as a `Table` 
which can be converted into a `DataSet` or `DataStream`, used in subsequent 
Table API queries, or written to a `TableSink` (see [Writing Tables to External 
Sinks](#writing-tables-to-external-sinks)). SQL and Table API queries can 
seamlessly mixed and are holistically optimized and translated into a single 
DataStream or DataSet program.
+Flink supports specifying DataStream or DataSet programs with SQL queries 
using the `sql()` method of the `TableEnvironment`. The method returns the 
result of the SQL query as a `Table`. A `Table` can be used in the subsequent 
SQL / Table API queries, be converted into a `DataSet` or `DataStream`, used in 
subsequent Table API queries or written to a `TableSink` (see [Writing Tables 
to External Sinks](common.html#emit-to-a-tablesink)). SQL and Table API queries 
can seamlessly mixed and are holistically optimized and translated into a 
single program.
 
-A `Table`, `DataSet`, `DataStream`, or external `TableSource` must be 
registered in the `TableEnvironment` in order to be accessible by a SQL query 
(see [Registering Tables](#registering-tables)). For convenience 
`Table.toString()` will automatically register an unique table name under the 
`Table`'s `TableEnvironment` and return the table name. So it allows to call 
SQL directly on tables in a string concatenation (see examples below).
+To access the data in the SQL queries, users must register data sources, 
including `Table`, `DataSet`, `DataStream` or external `TableSource`, in the 
`TableEnvironment` (see [Registering 
Tables](common.html#register-a-table-in-the-catalog)). Alternatively, users can 
also register external catalogs in the `TableEnvironment` to specify the 
location of the data sources.
 
-*Note: Flink's SQL support is not feature complete, yet. Queries that 
include unsupported SQL features will cause a `TableException`. The limitations 
of SQL on batch and streaming tables are listed in the following sections.*
+For convenience `Table.toString()` will automatically register an unique 
table name under the `Table`'s `TableEnvironment` and return the table name. So 
it allows to call SQL directly on tables in a string concatenation (see 
examples below).
--- End diff --

This allows SQL to be called directly on tables in a string concatenation 
(see examples below).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4046: [FLINK-6749] [table] Table API / SQL Docs: SQL Pag...

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4046#discussion_r119893598
  
--- Diff: docs/dev/table/sql.md ---
@@ -22,20 +22,22 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-SQL queries are specified using the `sql()` method of the 
`TableEnvironment`. The method returns the result of the SQL query as a `Table` 
which can be converted into a `DataSet` or `DataStream`, used in subsequent 
Table API queries, or written to a `TableSink` (see [Writing Tables to External 
Sinks](#writing-tables-to-external-sinks)). SQL and Table API queries can 
seamlessly mixed and are holistically optimized and translated into a single 
DataStream or DataSet program.
+Flink supports specifying DataStream or DataSet programs with SQL queries 
using the `sql()` method of the `TableEnvironment`. The method returns the 
result of the SQL query as a `Table`. A `Table` can be used in the subsequent 
SQL / Table API queries, be converted into a `DataSet` or `DataStream`, used in 
subsequent Table API queries or written to a `TableSink` (see [Writing Tables 
to External Sinks](common.html#emit-to-a-tablesink)). SQL and Table API queries 
can seamlessly mixed and are holistically optimized and translated into a 
single program.
--- End diff --

A `Table` can be used in subsequent SQL / Table API queries, be converted 
into a `DataSet` or `DataStream`, or written to a `TableSink` ...

_(drop "the" and the redundant "used in subsequent Table API queries")_




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4046: [FLINK-6749] [table] Table API / SQL Docs: SQL Pag...

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4046#discussion_r119894751
  
--- Diff: docs/dev/table/sql.md ---
@@ -82,14 +84,12 @@ val result2 = tableEnv.sql(
 
 
 
-**TODO: Add some intro.**
-
 {% top %}
 
 Supported Syntax
 
 
-Flink uses [Apache 
Calcite](https://calcite.apache.org/docs/reference.html) for SQL parsing. 
Currently, Flink SQL only supports query-related SQL syntax and only a subset 
of the comprehensive SQL standard. The following BNF-grammar describes the 
supported SQL features:
+Flink parses SQL using [Apache 
Calcite](https://calcite.apache.org/docs/reference.html). Flink supports the 
standard ANSI SQL but it provides no supports for DML and DDL. The following 
BNF-grammar describes the supported SQL features:
--- End diff --

Flink supports standard ANSI SQL, but it provides no support for DML or DDL.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4041#discussion_r119883989
  
--- Diff: docs/dev/libs/cep.md ---
@@ -246,63 +333,118 @@ pattern.where(event => ... /* some condition 
*/).or(event => ... /* or condition
 
 
 
-Next, we can append further states to detect complex patterns.
-We can control the contiguity of two succeeding events to be accepted by 
the pattern.
+# Conditions on Contiguity
 
-Strict contiguity means that two matching events have to be directly the 
one after the other.
-This means that no other events can occur in between. 
-A strict contiguity pattern state can be created via the `next` method.
+FlinkCEP supports the following forms of contiguity between consecutive 
events:
 
-
-
-{% highlight java %}
-Pattern<Event, ?> strictNext = start.next("middle");
-{% endhighlight %}
-
+ 1. Strict Contiguity: which expects all matching events to appear 
strictly the one after the other,
+ without any non-matching events in-between.
 
-
-{% highlight scala %}
-val strictNext: Pattern[Event, _] = start.next("middle")
-{% endhighlight %}
-
-
+ 2. Relaxed Contiguity: which simply ignores non-matching events appearing 
in-between the matching ones.
+ 
+ 3. Non-Deterministic Relaxed Contiguity: which further relaxes contiguity 
by also creating alternative
+ matches which ignore also matching events.
 
-Non-strict contiguity means that other events are allowed to occur 
in-between two matching events.
-A non-strict contiguity pattern state can be created via the `followedBy` 
or `followedByAny` method.
+To illustrate the above with an example, a pattern sequence `a+ b` (one or 
more `a`s followed by a `b`) with 
+input `a1, c, a2, b` will have the following results:
+
+ 1. Strict Contiguity: `a2 b` because there is `c` `a1` and `a2` so `a1` 
is discarded.
+
+ 2. Relaxed Contiguity: `a1 b` and `a1 a2 b`, as `c` will get simply 
ignored.
+ 
+ 3. Non-Deterministic Relaxed Contiguity: `a1 b`, `a2 b` and `a1 a2 b`.
+ 
--- End diff --

+1 for the added formatting. I find the explanations hard to parse. How 
about this:

1. Strict Contiguity: `{a2 b}` -- the `"c"` after `"a1"` causes `"a1"` to 
be discarded.

2. Relaxed Contiguity: `{a1 b}` and `{a1 a2 b}` -- `"c"` is simply ignored.

3. Non-Deterministic Relaxed Contiguity: `{a1 b}`, `{a2 b}`, and `{a1 a2 
b}`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4041#discussion_r119883183
  
--- Diff: docs/dev/libs/cep.md ---
@@ -98,48 +128,106 @@ val result: DataStream[Alert] = 
patternStream.select(createAlert(_))
 
 
 
-Note that we use Java 8 lambdas in our Java code examples to make them 
more succinct.
-
 ## The Pattern API
 
-The pattern API allows you to quickly define complex event patterns.
-
-Each pattern consists of multiple stages or what we call states.
-In order to go from one state to the next, the user can specify conditions.
-These conditions can be the contiguity of events or a filter condition on 
an event.
-
-Each pattern has to start with an initial state:
-
-
-
-{% highlight java %}
-Pattern<Event, ?> start = Pattern.begin("start");
-{% endhighlight %}
-
-
-
-{% highlight scala %}
-val start : Pattern[Event, _] = Pattern.begin("start")
-{% endhighlight %}
-
-
-
-Each state must have a unique name to identify the matched events later on.
-Additionally, we can specify a filter condition for the event to be 
accepted as the start event via the `where` method.
-These filtering conditions can be either an `IterativeCondition` or a 
`SimpleCondition`. 
-
-**Iterative Conditions:** This type of conditions can iterate over the 
previously accepted elements in the pattern and 
-decide to accept a new element or not, based on some statistic over those 
elements. 
-
-Below is the code for an iterative condition that accepts elements whose 
name start with "foo" and for which, the sum 
-of the prices of the previously accepted elements for a state named 
"middle", plus the price of the current event, do 
-not exceed the value of 5.0. Iterative condition can be very powerful, 
especially in combination with quantifiers, e.g.
-`oneToMany` or `zeroToMany`.
+The pattern API allows you to quickly define complex pattern sequences 
that you want to extract 
+from your input stream.
+
+Each such complex pattern sequence consists of multiple simple patterns, 
i.e. patterns looking for 
+individual events with the same properties. These simple patterns are 
called **states**. A complex pattern 
+can be seen as a graph of such states, where transition from one state to 
the next happens based on user-specified
--- End diff --

... where transitions from one state to the next occur ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4041#discussion_r119890353
  
--- Diff: docs/dev/libs/cep.md ---
@@ -246,63 +334,399 @@ pattern.where(event => ... /* some condition 
*/).or(event => ... /* or condition
 
 
 
-Next, we can append further states to detect complex patterns.
-We can control the contiguity of two succeeding events to be accepted by 
the pattern.
+# Conditions on Contiguity
 
-Strict contiguity means that two matching events have to be directly the 
one after the other.
-This means that no other events can occur in between. 
-A strict contiguity pattern state can be created via the `next` method.
+FlinkCEP supports the following forms of contiguity between events:
+
+ 1. Strict Contiguity: which expects all matching events to appear 
strictly the one after the other,
+ without any non-matching events in-between.
+
+ 2. Relaxed Contiguity: which simply ignores non-matching events appearing 
in-between the matching ones.
+ 
+ 3. Non-Deterministic Relaxed Contiguity: which further relaxes contiguity 
by also creating alternative
+ matches which ignore also matching events.
+
+To illustrate the above with an example, a pattern sequence `a+ b` (one or 
more `a`s followed by a `b`) with 
+input `a1, c, a2, b` will have the following results:
+
+ 1. Strict Contiguity: `a2 b` because there is `c` `a1` and `a2` so `a1` 
is discarded.
+
+ 2. Relaxed Contiguity: `a1 b` and `a1 a2 b`, as `c` will get simply 
ignored.
+ 
+ 3. Non-Deterministic Relaxed Contiguity: `a1 b`, `a2 b` and `a1 a2 b`.
+ 
+For looping states (e.g. `oneOrMore()` and `times()`) the default is 
*relaxed contiguity*. If you want 
+strict contiguity, you have to explicitly specify it by using the 
`consecutive()` call, and if you want 
+*non-deterministic relaxed contiguity* you can use the 
`allowCombinations()` call.
 
 
 
+
+
+
+Pattern Operation
+Description
+
+
+
+   
+where(condition)
+
+Defines a condition for the current state. Only if an 
event satisifes the condition, 
+it can match the state. Multiple consecutive where() 
clauses lead to their condtions being 
+ANDed:
+{% highlight java %}
+patternState.where(new IterativeCondition() {
+@Override
+public boolean filter(Event value, Context ctx) throws Exception {
+return ... // some condition
+}
+});
+{% endhighlight %}
+
+
+
+or(condition)
+
+Adds a new condition which is ORed with an existing 
one. Only if an event passes one of the 
+conditions, it can match the state:
+{% highlight java %}
+patternState.where(new IterativeCondition() {
+@Override
+public boolean filter(Event value, Context ctx) throws Exception {
+return ... // some condition
+}
+}).or(new IterativeCondition() {
+@Override
+public boolean filter(Event value, Context ctx) throws Exception {
+return ... // alternative condition
+}
+});
+{% endhighlight %}
+
+
+   
+   subtype(subClass)
+   
+   Defines a subtype condition for the current pattern 
state. Only if an event is of this subtype, 
+   it can match the state:
 {% highlight java %}
-Pattern<Event, ?> strictNext = start.next("middle");
+patternState.subtype(SubEvent.class);
 {% endhighlight %}
+   
+   
+   
+  oneOrMore()
+  
+  Specifies that this state expects at least one occurrence 
of a matching event.
+  By default a relaxed internal contiguity (between 
subsequent events) is used. For more info on the 
+  internal contiguity see consecutive
+  {% highlight java %}
+  patternState.oneOrMore();
+  {% endhighlight %}
+  
+   
+   
+  times(#ofTimes)
+  
+  Specifies that this state expects an exact number of 
occurrences of a matching event.
+  By default a relaxed internal contiguity (between 
subsequent events) is used. For more info on the 
+  internal contiguity see consecutive
+{% highlight java %}
+patternState.times(2);
+{% endhighlight %}
+  
+   
+   
+  optional()
+  
+  Specifies that this pattern is optional, i.e. it may not 
occur at a

[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4041#discussion_r119886927
  
--- Diff: docs/dev/libs/cep.md ---
@@ -700,26 +995,29 @@ class MyPatternFlatSelectFunction<IN, OUT> implements 
PatternFlatSelectFunction<
 
 
 
-The `select` method takes a selection function as argument, which is 
called for each matching event sequence.
-It receives a map of string/event pairs of the matched events.
-The string is defined by the name of the state to which the event has been 
matched.
-The selection function returns exactly one result per call.
+The `select()` method takes a selection function as argument, which is 
called for each matching event sequence.
+It receives a match in the form of `Map[String, Iterable[IN]]` where the 
key is the name of each state in your pattern 
+sequence and the value is an Iterable over all accepted events for that 
state (`IN` is the type of your input elements). 
+The events for a given state are ordered by timestamp. The reason for 
returning an iterable of accepted events for each 
+state is that when using looping states (e.g. `oneToMany()` and 
`times()`), more than one events may be accepted for a 
+given state. The selection function returns exactly one result per call.
 
 {% highlight scala %}
-def selectFn(pattern : mutable.Map[String, IN]): OUT = {
-val startEvent = pattern.get("start").get
-val endEvent = pattern.get("end").get
+def selectFn(pattern : Map[String, Iterable[IN]]): OUT = {
+val startEvent = pattern.get("start").get.next
+val endEvent = pattern.get("end").get.next
 OUT(startEvent, endEvent)
 }
 {% endhighlight %}
 
-The `flatSelect` method is similar to the `select` method. Their only 
difference is that the function passed to the `flatSelect` method can return an 
arbitrary number of results per call.
-In order to do this, the function for `flatSelect` has an additional 
`Collector` parameter which is used for the element output.
+The `flatSelect` method is similar to the `select` method. Their only 
difference is that the function passed to the 
+`flatSelect` method can return an arbitrary number of results per call. In 
order to do this, the function for 
+`flatSelect` has an additional `Collector` parameter which is used for 
forwarding your output elements downstream.
--- End diff --

... which is used to forward ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4041#discussion_r119885420
  
--- Diff: docs/dev/libs/cep.md ---
@@ -246,63 +334,399 @@ pattern.where(event => ... /* some condition 
*/).or(event => ... /* or condition
 
 
 
-Next, we can append further states to detect complex patterns.
-We can control the contiguity of two succeeding events to be accepted by 
the pattern.
+# Conditions on Contiguity
 
-Strict contiguity means that two matching events have to be directly the 
one after the other.
-This means that no other events can occur in between. 
-A strict contiguity pattern state can be created via the `next` method.
+FlinkCEP supports the following forms of contiguity between events:
+
+ 1. Strict Contiguity: which expects all matching events to appear 
strictly the one after the other,
+ without any non-matching events in-between.
+
+ 2. Relaxed Contiguity: which simply ignores non-matching events appearing 
in-between the matching ones.
+ 
+ 3. Non-Deterministic Relaxed Contiguity: which further relaxes contiguity 
by also creating alternative
+ matches which ignore also matching events.
--- End diff --

Non-Deterministic Relaxed Contiguity is hard to understand. Maybe this is 
clearer:

... which further relaxes contiguity, allowing additional matches that 
ignore some matching events.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4041#discussion_r119879015
  
--- Diff: docs/dev/libs/cep.md ---
@@ -168,33 +256,34 @@ start.where(
 
 
 
-Attention The call to 
`Context.getEventsForPattern(...)` has to find the 
-elements that belong to the pattern. The cost of this operation can vary, 
so when implementing your condition, try 
-to minimize the times the method is called.
+Attention The call to 
`context.getEventsForPattern(...)` finds all the 
+previously accepted events for a given potential match. The cost of this 
operation can vary, so when implementing 
+your condition, try to minimize the times the method is called.
 
-**Simple Conditions:** This type of conditions extend the aforementioned 
`IterativeCondition` class. They are simple 
-filtering conditions that decide to accept an element or not, based only 
on properties of the element itself.
+**Simple Conditions:** This type of conditions extend the aforementioned 
`IterativeCondition` class and decides 
--- End diff --

... This type of condition extends the aforementioned `IterativeCondition` 
class and decides whether ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4041#discussion_r119890712
  
--- Diff: docs/dev/libs/cep.md ---
@@ -246,63 +334,399 @@ pattern.where(event => ... /* some condition 
*/).or(event => ... /* or condition
 
 
 
-Next, we can append further states to detect complex patterns.
-We can control the contiguity of two succeeding events to be accepted by 
the pattern.
+# Conditions on Contiguity
 
-Strict contiguity means that two matching events have to be directly the 
one after the other.
-This means that no other events can occur in between. 
-A strict contiguity pattern state can be created via the `next` method.
+FlinkCEP supports the following forms of contiguity between events:
+
+ 1. Strict Contiguity: which expects all matching events to appear 
strictly the one after the other,
+ without any non-matching events in-between.
+
+ 2. Relaxed Contiguity: which simply ignores non-matching events appearing 
in-between the matching ones.
+ 
+ 3. Non-Deterministic Relaxed Contiguity: which further relaxes contiguity 
by also creating alternative
+ matches which ignore also matching events.
+
+To illustrate the above with an example, a pattern sequence `a+ b` (one or 
more `a`s followed by a `b`) with 
+input `a1, c, a2, b` will have the following results:
+
+ 1. Strict Contiguity: `a2 b` because there is `c` `a1` and `a2` so `a1` 
is discarded.
+
+ 2. Relaxed Contiguity: `a1 b` and `a1 a2 b`, as `c` will get simply 
ignored.
+ 
+ 3. Non-Deterministic Relaxed Contiguity: `a1 b`, `a2 b` and `a1 a2 b`.
+ 
+For looping states (e.g. `oneOrMore()` and `times()`) the default is 
*relaxed contiguity*. If you want 
+strict contiguity, you have to explicitly specify it by using the 
`consecutive()` call, and if you want 
+*non-deterministic relaxed contiguity* you can use the 
`allowCombinations()` call.
 
 
 
+
+
+
+Pattern Operation
+Description
+
+
+
+   
+where(condition)
+
+Defines a condition for the current state. Only if an 
event satisifes the condition, 
+it can match the state. Multiple consecutive where() 
clauses lead to their condtions being 
+ANDed:
+{% highlight java %}
+patternState.where(new IterativeCondition() {
+@Override
+public boolean filter(Event value, Context ctx) throws Exception {
+return ... // some condition
+}
+});
+{% endhighlight %}
+
+
+
+or(condition)
+
+Adds a new condition which is ORed with an existing 
one. Only if an event passes one of the 
+conditions, it can match the state:
--- End diff --

An event can match the state only if it passes at least one of the 
conditions.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4041#discussion_r119878726
  
--- Diff: docs/dev/libs/cep.md ---
@@ -168,33 +256,34 @@ start.where(
 
 
 
-Attention The call to 
`Context.getEventsForPattern(...)` has to find the 
-elements that belong to the pattern. The cost of this operation can vary, 
so when implementing your condition, try 
-to minimize the times the method is called.
+Attention The call to 
`context.getEventsForPattern(...)` finds all the 
+previously accepted events for a given potential match. The cost of this 
operation can vary, so when implementing 
+your condition, try to minimize the times the method is called.
--- End diff --

... try to minimize its use.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4041#discussion_r119888143
  
--- Diff: docs/dev/libs/cep.md ---
@@ -341,153 +765,67 @@ Pattern<Event, ?> start = 
Pattern.begin("start");
 
 
 
-Next
+next()
 
-Appends a new pattern state. A matching event has to 
directly succeed the previous matching event:
+Appends a new pattern state. A matching event has to 
directly succeed the previous matching event 
+(strict contiguity):
 {% highlight java %}
-Pattern<Event, ?> next = start.next("next");
+Pattern<Event, ?> next = start.next("middle");
 {% endhighlight %}
 
 
 
-FollowedBy
+followedBy()
 
-Appends a new pattern state. Other events can occur 
between a matching event and the previous matching event:
+Appends a new pattern state. Other events can occur 
between a matching event and the previous 
+matching event (relaxed contiguity):
 {% highlight java %}
-Pattern<Event, ?> followedBy = start.followedBy("next");
+Pattern<Event, ?> followedBy = start.followedBy("middle");
 {% endhighlight %}
 
 
 
-Where
+followedByAny()
 
-Defines a condition for the current pattern state. Only 
if an event satisifes the condition, it can match the state:
+Appends a new pattern state. Other events can occur 
between a matching event and the previous 
+matching event and alternative matches will be presented 
for every alternative matching event 
--- End diff --

add a comma:

... matching event, and alternative matches ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4041#discussion_r119853002
  
--- Diff: docs/dev/libs/cep.md ---
@@ -98,48 +128,106 @@ val result: DataStream[Alert] = 
patternStream.select(createAlert(_))
 
 
 
-Note that we use Java 8 lambdas in our Java code examples to make them 
more succinct.
-
 ## The Pattern API
 
-The pattern API allows you to quickly define complex event patterns.
-
-Each pattern consists of multiple stages or what we call states.
-In order to go from one state to the next, the user can specify conditions.
-These conditions can be the contiguity of events or a filter condition on 
an event.
-
-Each pattern has to start with an initial state:
-
-
-
-{% highlight java %}
-Pattern<Event, ?> start = Pattern.begin("start");
-{% endhighlight %}
-
-
-
-{% highlight scala %}
-val start : Pattern[Event, _] = Pattern.begin("start")
-{% endhighlight %}
-
-
-
-Each state must have a unique name to identify the matched events later on.
-Additionally, we can specify a filter condition for the event to be 
accepted as the start event via the `where` method.
-These filtering conditions can be either an `IterativeCondition` or a 
`SimpleCondition`. 
-
-**Iterative Conditions:** This type of conditions can iterate over the 
previously accepted elements in the pattern and 
-decide to accept a new element or not, based on some statistic over those 
elements. 
-
-Below is the code for an iterative condition that accepts elements whose 
name start with "foo" and for which, the sum 
-of the prices of the previously accepted elements for a state named 
"middle", plus the price of the current event, do 
-not exceed the value of 5.0. Iterative condition can be very powerful, 
especially in combination with quantifiers, e.g.
-`oneToMany` or `zeroToMany`.
+The pattern API allows you to quickly define complex pattern sequences 
that you want to extract 
+from your input stream.
+
+Each such complex pattern sequence consists of multiple simple patterns, 
i.e. patterns looking for 
+individual events with the same properties. These simple patterns are 
called **states**. A complex pattern 
+can be seen as a graph of such states, where transition from one state to 
the next happens based on user-specified
+*conditions*, e.g. `event.getName().equals("start")`. A *match* is a 
sequence of input events which visit all 
+states of the complex pattern graph, through a sequence of valid state 
transitions.
+
+Attention Each state must have a 
unique name to identify the matched 
+events later on. 
+
+Attention State names **CANNOT** 
contain the character `:`.
+
+In the remainder, we start by describing how to define [States](#states), 
before describing how you can 
+combine individual states into [Complex Patterns](#combining-states).
+
+### Individual States
+
+A **State** can be either a *singleton* state, or a *looping* one. 
Singleton states accept a single 
+event, while looping ones can accept more than one. In pattern matching 
symbols, in the pattern `a b+ c? d` (or `a`, 
+followed by *one or more* `b`'s, optionally followed by a `c`, followed by 
a `d`), `a`, `c?`, and `d` are 
+singleton patterns, while `b+` is a looping one. By default, a state is a 
singleton state and you can transform 
+it to a looping one using [Quantifiers](#quantifiers). In addition, each 
state can have one or more 
+[Conditions](#conditions) based on which it accepts events.
+
+ Quantifiers
+
+In FlinkCEP, looping patterns can be specified using the methods: 
`pattern.oneOrMore()`, for states that expect one or
+more occurrences of a given event (e.g. the `b+` mentioned previously), 
and `pattern.times(#ofTimes)` for states that 
+expect a specific number of occurrences of a given type of event, e.g. 4 
`a`'s. All states, looping or not, can be made 
+optional using the `pattern.optional()` method. For a state named `start`, 
the following are valid quantifiers:
+ 
+ 
+ 
+ {% highlight java %}
+ // expecting 4 occurrences
+ start.times(4);
+  
+ // expecting 0 or 4 occurrences
+ start.times(4).optional();
+ 
+ // expecting 1 or more occurrences
+ start.oneOrMore();
+   
+ // expecting 0 or more occurrences
+ start.oneOrMore().optional();
+ {% endhighlight %}
+ 
+ 
+ 
+ {% highlight scala %}
+ // expecting 4 occurrences
+ start.times(4)
+   
+ // expecting 0 or 4 occurrences
+ start.times(4).optional()
+  
+ // expecting 1 or more occurrences
+ start.oneOrMore()
+
+ // expecting 0 or more occurrences

[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4041#discussion_r119889986
  
--- Diff: docs/dev/libs/cep.md ---
@@ -246,63 +334,399 @@ pattern.where(event => ... /* some condition 
*/).or(event => ... /* or condition
 
 
 
-Next, we can append further states to detect complex patterns.
-We can control the contiguity of two succeeding events to be accepted by 
the pattern.
+# Conditions on Contiguity
 
-Strict contiguity means that two matching events have to be directly the 
one after the other.
-This means that no other events can occur in between. 
-A strict contiguity pattern state can be created via the `next` method.
+FlinkCEP supports the following forms of contiguity between events:
+
+ 1. Strict Contiguity: which expects all matching events to appear 
strictly the one after the other,
+ without any non-matching events in-between.
+
+ 2. Relaxed Contiguity: which simply ignores non-matching events appearing 
in-between the matching ones.
+ 
+ 3. Non-Deterministic Relaxed Contiguity: which further relaxes contiguity 
by also creating alternative
+ matches which ignore also matching events.
+
+To illustrate the above with an example, a pattern sequence `a+ b` (one or 
more `a`s followed by a `b`) with 
+input `a1, c, a2, b` will have the following results:
+
+ 1. Strict Contiguity: `a2 b` because there is `c` `a1` and `a2` so `a1` 
is discarded.
+
+ 2. Relaxed Contiguity: `a1 b` and `a1 a2 b`, as `c` will get simply 
ignored.
+ 
+ 3. Non-Deterministic Relaxed Contiguity: `a1 b`, `a2 b` and `a1 a2 b`.
+ 
+For looping states (e.g. `oneOrMore()` and `times()`) the default is 
*relaxed contiguity*. If you want 
+strict contiguity, you have to explicitly specify it by using the 
`consecutive()` call, and if you want 
+*non-deterministic relaxed contiguity* you can use the 
`allowCombinations()` call.
 
 
 
+
+
+
+Pattern Operation
+Description
+
+
+
+   
+where(condition)
+
+Defines a condition for the current state. Only if an 
event satisifes the condition, 
+it can match the state. Multiple consecutive where() 
clauses lead to their condtions being 
+ANDed:
+{% highlight java %}
+patternState.where(new IterativeCondition() {
+@Override
+public boolean filter(Event value, Context ctx) throws Exception {
+return ... // some condition
+}
+});
+{% endhighlight %}
+
+
+
+or(condition)
+
+Adds a new condition which is ORed with an existing 
one. Only if an event passes one of the 
+conditions, it can match the state:
+{% highlight java %}
+patternState.where(new IterativeCondition() {
+@Override
+public boolean filter(Event value, Context ctx) throws Exception {
+return ... // some condition
+}
+}).or(new IterativeCondition() {
+@Override
+public boolean filter(Event value, Context ctx) throws Exception {
+return ... // alternative condition
+}
+});
+{% endhighlight %}
+
+
+   
+   subtype(subClass)
+   
+   Defines a subtype condition for the current pattern 
state. Only if an event is of this subtype, 
+   it can match the state:
 {% highlight java %}
-Pattern<Event, ?> strictNext = start.next("middle");
+patternState.subtype(SubEvent.class);
 {% endhighlight %}
+   
+   
+   
+  oneOrMore()
+  
+  Specifies that this state expects at least one occurrence 
of a matching event.
+  By default a relaxed internal contiguity (between 
subsequent events) is used. For more info on the 
+  internal contiguity see consecutive
+  {% highlight java %}
+  patternState.oneOrMore();
+  {% endhighlight %}
+  
+   
+   
+  times(#ofTimes)
+  
+  Specifies that this state expects an exact number of 
occurrences of a matching event.
+  By default a relaxed internal contiguity (between 
subsequent events) is used. For more info on the 
+  internal contiguity see consecutive
+{% highlight java %}
+patternState.times(2);
+{% endhighlight %}
+  
+   
+   
+  optional()
+  
+  Specifies that this pattern is optional, i.e. it may not 
occur at a

[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4041#discussion_r119888607
  
--- Diff: docs/dev/libs/cep.md ---
@@ -246,63 +334,399 @@ pattern.where(event => ... /* some condition 
*/).or(event => ... /* or condition
 
 
 
-Next, we can append further states to detect complex patterns.
-We can control the contiguity of two succeeding events to be accepted by 
the pattern.
+# Conditions on Contiguity
 
-Strict contiguity means that two matching events have to be directly the 
one after the other.
-This means that no other events can occur in between. 
-A strict contiguity pattern state can be created via the `next` method.
+FlinkCEP supports the following forms of contiguity between events:
+
+ 1. Strict Contiguity: which expects all matching events to appear 
strictly the one after the other,
+ without any non-matching events in-between.
+
+ 2. Relaxed Contiguity: which simply ignores non-matching events appearing 
in-between the matching ones.
+ 
+ 3. Non-Deterministic Relaxed Contiguity: which further relaxes contiguity 
by also creating alternative
+ matches which ignore also matching events.
+
+To illustrate the above with an example, a pattern sequence `a+ b` (one or 
more `a`s followed by a `b`) with 
+input `a1, c, a2, b` will have the following results:
+
+ 1. Strict Contiguity: `a2 b` because there is `c` `a1` and `a2` so `a1` 
is discarded.
+
+ 2. Relaxed Contiguity: `a1 b` and `a1 a2 b`, as `c` will get simply 
ignored.
+ 
+ 3. Non-Deterministic Relaxed Contiguity: `a1 b`, `a2 b` and `a1 a2 b`.
+ 
+For looping states (e.g. `oneOrMore()` and `times()`) the default is 
*relaxed contiguity*. If you want 
+strict contiguity, you have to explicitly specify it by using the 
`consecutive()` call, and if you want 
+*non-deterministic relaxed contiguity* you can use the 
`allowCombinations()` call.
 
 
 
+
+
+
+Pattern Operation
+Description
+
+
+
+   
+where(condition)
+
+Defines a condition for the current state. Only if an 
event satisifes the condition, 
+it can match the state. Multiple consecutive where() 
clauses lead to their condtions being 
+ANDed:
+{% highlight java %}
+patternState.where(new IterativeCondition() {
+@Override
+public boolean filter(Event value, Context ctx) throws Exception {
+return ... // some condition
+}
+});
+{% endhighlight %}
+
+
+
+or(condition)
+
+Adds a new condition which is ORed with an existing 
one. Only if an event passes one of the 
+conditions, it can match the state:
+{% highlight java %}
+patternState.where(new IterativeCondition() {
+@Override
+public boolean filter(Event value, Context ctx) throws Exception {
+return ... // some condition
+}
+}).or(new IterativeCondition() {
+@Override
+public boolean filter(Event value, Context ctx) throws Exception {
+return ... // alternative condition
+}
+});
+{% endhighlight %}
+
+
+   
+   subtype(subClass)
+   
+   Defines a subtype condition for the current pattern 
state. Only if an event is of this subtype, 
+   it can match the state:
 {% highlight java %}
-Pattern<Event, ?> strictNext = start.next("middle");
+patternState.subtype(SubEvent.class);
 {% endhighlight %}
+   
+   
+   
+  oneOrMore()
+  
+  Specifies that this state expects at least one occurrence 
of a matching event.
+  By default a relaxed internal contiguity (between 
subsequent events) is used. For more info on the 
+  internal contiguity see consecutive
+  {% highlight java %}
+  patternState.oneOrMore();
+  {% endhighlight %}
+  
+   
+   
+  times(#ofTimes)
+  
+  Specifies that this state expects an exact number of 
occurrences of a matching event.
+  By default a relaxed internal contiguity (between 
subsequent events) is used. For more info on the 
+  internal contiguity see consecutive
+{% highlight java %}
+patternState.times(2);
+{% endhighlight %}
+  
+   
+   
+  optional()
+  
+  Specifies that this pattern is optional, i.e. it may not 
occur at a

[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4041#discussion_r11907
  
--- Diff: docs/dev/libs/cep.md ---
@@ -246,63 +334,399 @@ pattern.where(event => ... /* some condition 
*/).or(event => ... /* or condition
 
 
 
-Next, we can append further states to detect complex patterns.
-We can control the contiguity of two succeeding events to be accepted by 
the pattern.
+# Conditions on Contiguity
 
-Strict contiguity means that two matching events have to be directly the 
one after the other.
-This means that no other events can occur in between. 
-A strict contiguity pattern state can be created via the `next` method.
+FlinkCEP supports the following forms of contiguity between events:
+
+ 1. Strict Contiguity: which expects all matching events to appear 
strictly the one after the other,
+ without any non-matching events in-between.
+
+ 2. Relaxed Contiguity: which simply ignores non-matching events appearing 
in-between the matching ones.
+ 
+ 3. Non-Deterministic Relaxed Contiguity: which further relaxes contiguity 
by also creating alternative
+ matches which ignore also matching events.
+
+To illustrate the above with an example, a pattern sequence `a+ b` (one or 
more `a`s followed by a `b`) with 
+input `a1, c, a2, b` will have the following results:
+
+ 1. Strict Contiguity: `a2 b` because there is `c` `a1` and `a2` so `a1` 
is discarded.
+
+ 2. Relaxed Contiguity: `a1 b` and `a1 a2 b`, as `c` will get simply 
ignored.
+ 
+ 3. Non-Deterministic Relaxed Contiguity: `a1 b`, `a2 b` and `a1 a2 b`.
+ 
+For looping states (e.g. `oneOrMore()` and `times()`) the default is 
*relaxed contiguity*. If you want 
+strict contiguity, you have to explicitly specify it by using the 
`consecutive()` call, and if you want 
+*non-deterministic relaxed contiguity* you can use the 
`allowCombinations()` call.
 
 
 
+
+
+
+Pattern Operation
+Description
+
+
+
+   
+where(condition)
+
+Defines a condition for the current state. Only if an 
event satisifes the condition, 
+it can match the state. Multiple consecutive where() 
clauses lead to their condtions being 
+ANDed:
+{% highlight java %}
+patternState.where(new IterativeCondition() {
+@Override
+public boolean filter(Event value, Context ctx) throws Exception {
+return ... // some condition
+}
+});
+{% endhighlight %}
+
+
+
+or(condition)
+
+Adds a new condition which is ORed with an existing 
one. Only if an event passes one of the 
+conditions, it can match the state:
+{% highlight java %}
+patternState.where(new IterativeCondition() {
+@Override
+public boolean filter(Event value, Context ctx) throws Exception {
+return ... // some condition
+}
+}).or(new IterativeCondition() {
+@Override
+public boolean filter(Event value, Context ctx) throws Exception {
+return ... // alternative condition
+}
+});
+{% endhighlight %}
+
+
+   
+   subtype(subClass)
+   
+   Defines a subtype condition for the current pattern 
state. Only if an event is of this subtype, 
+   it can match the state:
 {% highlight java %}
-Pattern<Event, ?> strictNext = start.next("middle");
+patternState.subtype(SubEvent.class);
 {% endhighlight %}
+   
+   
+   
+  oneOrMore()
+  
+  Specifies that this state expects at least one occurrence 
of a matching event.
+  By default a relaxed internal contiguity (between 
subsequent events) is used. For more info on the 
+  internal contiguity see consecutive
+  {% highlight java %}
+  patternState.oneOrMore();
+  {% endhighlight %}
+  
+   
+   
+  times(#ofTimes)
+  
+  Specifies that this state expects an exact number of 
occurrences of a matching event.
+  By default a relaxed internal contiguity (between 
subsequent events) is used. For more info on the 
+  internal contiguity see consecutive
+{% highlight java %}
+patternState.times(2);
+{% endhighlight %}
+  
+   
+   
+  optional()
+  
+  Specifies that this pattern is optional, i.e. it may not 
occur at a

[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4041#discussion_r119886720
  
--- Diff: docs/dev/libs/cep.md ---
@@ -700,26 +995,29 @@ class MyPatternFlatSelectFunction<IN, OUT> implements 
PatternFlatSelectFunction<
 
 
 
-The `select` method takes a selection function as argument, which is 
called for each matching event sequence.
-It receives a map of string/event pairs of the matched events.
-The string is defined by the name of the state to which the event has been 
matched.
-The selection function returns exactly one result per call.
+The `select()` method takes a selection function as argument, which is 
called for each matching event sequence.
+It receives a match in the form of `Map[String, Iterable[IN]]` where the 
key is the name of each state in your pattern 
+sequence and the value is an Iterable over all accepted events for that 
state (`IN` is the type of your input elements). 
+The events for a given state are ordered by timestamp. The reason for 
returning an iterable of accepted events for each 
+state is that when using looping states (e.g. `oneToMany()` and 
`times()`), more than one events may be accepted for a 
--- End diff --

... more than one event may be accepted ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4041#discussion_r119889674
  
--- Diff: docs/dev/libs/cep.md ---
@@ -246,63 +334,399 @@ pattern.where(event => ... /* some condition 
*/).or(event => ... /* or condition
 
 
 
-Next, we can append further states to detect complex patterns.
-We can control the contiguity of two succeeding events to be accepted by 
the pattern.
+# Conditions on Contiguity
 
-Strict contiguity means that two matching events have to be directly the 
one after the other.
-This means that no other events can occur in between. 
-A strict contiguity pattern state can be created via the `next` method.
+FlinkCEP supports the following forms of contiguity between events:
+
+ 1. Strict Contiguity: which expects all matching events to appear 
strictly the one after the other,
+ without any non-matching events in-between.
+
+ 2. Relaxed Contiguity: which simply ignores non-matching events appearing 
in-between the matching ones.
+ 
+ 3. Non-Deterministic Relaxed Contiguity: which further relaxes contiguity 
by also creating alternative
+ matches which ignore also matching events.
+
+To illustrate the above with an example, a pattern sequence `a+ b` (one or 
more `a`s followed by a `b`) with 
+input `a1, c, a2, b` will have the following results:
+
+ 1. Strict Contiguity: `a2 b` because there is `c` `a1` and `a2` so `a1` 
is discarded.
+
+ 2. Relaxed Contiguity: `a1 b` and `a1 a2 b`, as `c` will get simply 
ignored.
+ 
+ 3. Non-Deterministic Relaxed Contiguity: `a1 b`, `a2 b` and `a1 a2 b`.
+ 
+For looping states (e.g. `oneOrMore()` and `times()`) the default is 
*relaxed contiguity*. If you want 
+strict contiguity, you have to explicitly specify it by using the 
`consecutive()` call, and if you want 
+*non-deterministic relaxed contiguity* you can use the 
`allowCombinations()` call.
 
 
 
+
+
+
+Pattern Operation
+Description
+
+
+
+   
+where(condition)
+
+Defines a condition for the current state. Only if an 
event satisifes the condition, 
+it can match the state. Multiple consecutive where() 
clauses lead to their condtions being 
+ANDed:
+{% highlight java %}
+patternState.where(new IterativeCondition() {
+@Override
+public boolean filter(Event value, Context ctx) throws Exception {
+return ... // some condition
+}
+});
+{% endhighlight %}
+
+
+
+or(condition)
+
+Adds a new condition which is ORed with an existing 
one. Only if an event passes one of the 
+conditions, it can match the state:
+{% highlight java %}
+patternState.where(new IterativeCondition() {
+@Override
+public boolean filter(Event value, Context ctx) throws Exception {
+return ... // some condition
+}
+}).or(new IterativeCondition() {
+@Override
+public boolean filter(Event value, Context ctx) throws Exception {
+return ... // alternative condition
+}
+});
+{% endhighlight %}
+
+
+   
+   subtype(subClass)
+   
+   Defines a subtype condition for the current pattern 
state. Only if an event is of this subtype, 
+   it can match the state:
 {% highlight java %}
-Pattern<Event, ?> strictNext = start.next("middle");
+patternState.subtype(SubEvent.class);
 {% endhighlight %}
+   
+   
+   
+  oneOrMore()
+  
+  Specifies that this state expects at least one occurrence 
of a matching event.
+  By default a relaxed internal contiguity (between 
subsequent events) is used. For more info on the 
+  internal contiguity see consecutive
+  {% highlight java %}
+  patternState.oneOrMore();
+  {% endhighlight %}
+  
+   
+   
+  times(#ofTimes)
+  
+  Specifies that this state expects an exact number of 
occurrences of a matching event.
+  By default a relaxed internal contiguity (between 
subsequent events) is used. For more info on the 
+  internal contiguity see consecutive
+{% highlight java %}
+patternState.times(2);
+{% endhighlight %}
+  
+   
+   
+  optional()
+  
+  Specifies that this pattern is optional, i.e. it may not 
occur at a

[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4041#discussion_r119852417
  
--- Diff: docs/dev/libs/cep.md ---
@@ -98,48 +128,105 @@ val result: DataStream[Alert] = 
patternStream.select(createAlert(_))
 
 
 
-Note that we use Java 8 lambdas in our Java code examples to make them 
more succinct.
-
 ## The Pattern API
 
-The pattern API allows you to quickly define complex event patterns.
-
-Each pattern consists of multiple stages or what we call states.
-In order to go from one state to the next, the user can specify conditions.
-These conditions can be the contiguity of events or a filter condition on 
an event.
-
-Each pattern has to start with an initial state:
+The pattern API allows you to quickly define complex pattern sequences 
that you want to extract 
+from your input stream.
+
+Each such complex pattern sequence consists of multiple simple patterns, 
i.e. patterns looking for 
+individual events with the same properties. These simple patterns are 
called **states**. A complex pattern 
+can be seen as a graph of such states, where transition from one state to 
the next happens based on user-specified
+*conditions*, e.g. `event.getName().equals("start")`. A *match* is a 
sequence of input events which visit all 
+states of the complex pattern graph, through a sequence of valid state 
transitions.
+
+Attention Each state must have a 
unique name to identify the matched 
+events later on. 
+
+Attention State names **CANNOT** 
contain the character `:`.
+
+In the remainder, we start by describing how to define [States](#states), 
before describing how you can 
+combine individual states into [Complex Patterns](#combining-states).
+
+### Individual States
+
+A **State** can be either a *singleton* state, or a *looping* one. 
Singleton states accept a single event, 
+while looping ones accept more than one. In pattern matching symbols, in 
the pattern `a b+ c? d` (or `a`, 
+followed by *one or more* `b`'s, optionally followed by a `c`, followed by 
a `d`), `a`, `c?`, and `d` are 
+singleton patterns, while `b+` is a looping one (see 
[Quantifiers](#quantifiers)). In addition, each state 
+can have one or more *conditions* based on which it accepts events (see 
[Conditions](#conditions)).
+
+ Quantifiers
+
+In FlinkCEP, looping patterns can be specified using the methods: 
`pattern.oneOrMore()`, for states that expect one or
+more occurrences of a given event (e.g. the `b+` mentioned previously), 
and `pattern.times(#ofTimes)` for states that 
+expect a specific number of occurrences of a given type of event, e.g. 4 
`a`'s. All states, looping or not, can be made 
+optional using the `pattern.optional()` method. For a state named `start`, 
the following are valid quantifiers:
+ 
+ 
+ 
+ {% highlight java %}
+ // expecting 4 occurrences
+ start.times(4);
+  
+ // expecting 0 or 4 occurrences
+ start.times(4).optional();
+ 
+ // expecting 1 or more occurrences
+ start.oneOrMore();
+   
+ // expecting 0 or more occurrences
+ start.oneOrMore().optional();
+ {% endhighlight %}
+ 
+ 
+ 
+ {% highlight scala %}
+ // expecting 4 occurrences
+ start.times(4)
+   
+ // expecting 0 or 4 occurrences
+ start.times(4).optional()
+  
+ // expecting 1 or more occurrences
+ start.oneOrMore()
+
+ // expecting 0 or more occurrences
+ start.oneOrMore().optional()
+ {% endhighlight %}
+ 
+ 
+
+ Conditions
+
+At every state, and in order to go from one state to the next, you can 
specify additional **conditions**. 
+These conditions can be related to:
+ 
+ 1. a [property of the incoming event](#conditions-on-properties), e.g. 
its value should be larger than 5, 
+ or larger than the average value of the previously accepted events.
+
+ 2. the [contiguity of the matching events](#conditions-on-contiguity), 
e.g. detect pattern `a,b,c` without 
+ non-matching events between any matching ones.
+ 
+The latter refers to "looping" states, i.e. states that can accept more 
than one event, e.g. the `b+` in `a b+ c`, 
+which searches for one or more `b`'s.
+
+# Conditions on Properties
+
+Conditions on the event properties can be specified via the 
`pattern.where()` method. These can be either 
+`IterativeCondition`s or `SimpleCondition`s.
+
+**Iterative Conditions:** This is the most general type of conditions. 
This allows to specify a condition that accepts 
+any subsequent event based on some statistic over a subset of the 
previously accepted events. 
--- End diff --

This is the most general type of condition.

[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4041#discussion_r119887736
  
--- Diff: docs/dev/libs/cep.md ---
@@ -511,133 +849,81 @@ val start = Pattern.begin[Event]("start")
 
 
 
-Next
+next()
 
-Appends a new pattern state. A matching event has to 
directly succeed the previous matching event:
+Appends a new pattern state. A matching event has to 
directly succeed the previous matching event 
+(strict contiguity):
 {% highlight scala %}
 val next = start.next("middle")
 {% endhighlight %}
 
 
 
-FollowedBy
+followedBy()
 
-Appends a new pattern state. Other events can occur 
between a matching event and the previous matching event:
+Appends a new pattern state. Other events can occur 
between a matching event and the previous 
+matching event (relaxed contiguity) :
 {% highlight scala %}
 val followedBy = start.followedBy("middle")
 {% endhighlight %}
 
 
 
-Where
-
-Defines a filter condition for the current pattern 
state. Only if an event passes the filter, it can match the state:
-{% highlight scala %}
-patternState.where(event => ... /* some condition */)
-{% endhighlight %}
-
-
-
-Or
-
-Adds a new filter condition which is ORed with an 
existing filter condition. Only if an event passes the filter condition, it can 
match the state:
-{% highlight scala %}
-patternState.where(event => ... /* some condition */)
-.or(event => ... /* alternative condition */)
-{% endhighlight %}
-
+followedByAny()
+
+Appends a new pattern state. Other events can 
occur between a matching event and the previous 
+matching event and alternative matches will be 
presented for every alternative matching event 
--- End diff --

add a comma:

... matching event, and alternative matches will be ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.

2017-06-02 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4041#discussion_r119891095
  
--- Diff: docs/dev/libs/cep.md ---
@@ -246,63 +334,399 @@ pattern.where(event => ... /* some condition 
*/).or(event => ... /* or condition
 
 
 
-Next, we can append further states to detect complex patterns.
-We can control the contiguity of two succeeding events to be accepted by 
the pattern.
+# Conditions on Contiguity
 
-Strict contiguity means that two matching events have to be directly the 
one after the other.
-This means that no other events can occur in between. 
-A strict contiguity pattern state can be created via the `next` method.
+FlinkCEP supports the following forms of contiguity between events:
+
+ 1. Strict Contiguity: which expects all matching events to appear 
strictly the one after the other,
+ without any non-matching events in-between.
+
+ 2. Relaxed Contiguity: which simply ignores non-matching events appearing 
in-between the matching ones.
+ 
+ 3. Non-Deterministic Relaxed Contiguity: which further relaxes contiguity 
by also creating alternative
+ matches which ignore also matching events.
+
+To illustrate the above with an example, a pattern sequence `a+ b` (one or 
more `a`s followed by a `b`) with 
+input `a1, c, a2, b` will have the following results:
+
+ 1. Strict Contiguity: `a2 b` because there is `c` `a1` and `a2` so `a1` 
is discarded.
+
+ 2. Relaxed Contiguity: `a1 b` and `a1 a2 b`, as `c` will get simply 
ignored.
+ 
+ 3. Non-Deterministic Relaxed Contiguity: `a1 b`, `a2 b` and `a1 a2 b`.
+ 
+For looping states (e.g. `oneOrMore()` and `times()`) the default is 
*relaxed contiguity*. If you want 
+strict contiguity, you have to explicitly specify it by using the 
`consecutive()` call, and if you want 
+*non-deterministic relaxed contiguity* you can use the 
`allowCombinations()` call.
 
 
 
+
+
+
+Pattern Operation
+Description
+
+
+
+   
+where(condition)
+
+Defines a condition for the current state. Only if an 
event satisifes the condition, 
+it can match the state. Multiple consecutive where() 
clauses lead to their condtions being 
--- End diff --

To match the state, an event must satisfy the condition.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


  1   2   3   >