[GitHub] flink issue #6097: [FLINK-9470] Allow querying the key in KeyedProcessFuncti...
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/6097 +1 Nice! ---
[GitHub] flink issue #6086: [FLINK-9452] Flink 1.5 document version title shows snaps...
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/6086 @zentol I think the baseurl variable also needs to be changed -- for a stable release, it should not point to master. ---
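For context, the variable in question lives in the docs Jekyll configuration. A hypothetical sketch of the relevant fragment (the version number and URL below are illustrative placeholders, not the actual values in the repository):

```yaml
# Hypothetical docs/_config.yml fragment: for a stable release, baseurl
# should point at the versioned docs path rather than the master/snapshot docs.
version: "1.5"
baseurl: //ci.apache.org/projects/flink/flink-docs-release-1.5
```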
[GitHub] flink pull request #6076: [hotfix][docs] Specify operators behaviour on proc...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/6076#discussion_r190841926 --- Diff: docs/dev/event_time.md --- @@ -219,4 +219,17 @@ with late elements in event time windows. Please refer to the [Debugging Windows & Event Time]({{ site.baseurl }}/monitoring/debugging_event_time.html) section for debugging watermarks at runtime. +## How operators are processing watermarks + +General rule is that operator are required to completely process a given watermark before forwarding it downstream. For example, +`WindowOperator` will first evaluate which windows should be fired and only after producing all of the output triggered by +the watermark, the watermark itself will be handled downstream. In other words, all elements produced due to occurrence of +the watermark will be emitted before such watermark. + +Same rule applies to `TwoInputStreamOperator`. However in this case current watermark of the operator is defined as a minimum +of both of it's inputs. + +Details of this behaviour is defined by implementations of methods `OneInputStreamOperator.processWatermark`, +`TwoInputStreamOperator.processWatermark1` and `TwoInputStreamOperator.processWatermark2`. + --- End diff -- I offer some grammatical improvements. Also, is it correct to describe "operators are required to completely process a given watermark before forwarding it downstream" as a general rule, meaning that it might have exceptions, or should we simply say "operators are required ..." without adding this caveat? I changed behaviour to behavior because most of the docs seem to be using American spellings rather than English ones, but I'm not sure if we have a policy regarding this. ---
[GitHub] flink pull request #6076: [hotfix][docs] Specify operators behaviour on proc...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/6076#discussion_r190841061 --- Diff: docs/dev/event_time.md --- @@ -219,4 +219,17 @@ with late elements in event time windows. Please refer to the [Debugging Windows & Event Time]({{ site.baseurl }}/monitoring/debugging_event_time.html) section for debugging watermarks at runtime. +## How operators are processing watermarks + +General rule is that operator are required to completely process a given watermark before forwarding it downstream. For example, +`WindowOperator` will first evaluate which windows should be fired and only after producing all of the output triggered by +the watermark, the watermark itself will be handled downstream. In other words, all elements produced due to occurrence of +the watermark will be emitted before such watermark. + +Same rule applies to `TwoInputStreamOperator`. However in this case current watermark of the operator is defined as a minimum +of both of it's inputs. + +Details of this behaviour is defined by implementations of methods `OneInputStreamOperator.processWatermark`, +`TwoInputStreamOperator.processWatermark1` and `TwoInputStreamOperator.processWatermark2`. + --- End diff -- As a general rule, operators are required to completely process a given watermark before forwarding it downstream. For example, `WindowOperator` will first evaluate which windows should be fired, and only after producing all of the output triggered by the watermark will the watermark itself be sent downstream. In other words, all elements produced due to occurrence of a watermark will be emitted before the watermark. The same rule applies to `TwoInputStreamOperator`. However, in this case the current watermark of the operator is defined as the minimum of both of its inputs. 
The details of this behavior are defined by the implementations of the `OneInputStreamOperator.processWatermark`, `TwoInputStreamOperator.processWatermark1` and `TwoInputStreamOperator.processWatermark2` methods. ---
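The two-input rule described in this suggested rewrite can be sketched in plain Java. This is an illustrative model only, not Flink's actual implementation; the class and method names mirror the operator interfaces mentioned above.

```java
// Minimal sketch (NOT Flink's actual implementation) of the rule discussed
// above: a two-input operator's current watermark is the minimum of the
// watermarks received on its two inputs, and it only advances (and would
// only be forwarded downstream) when that minimum increases.
public class TwoInputWatermarkSketch {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long currentWatermark = Long.MIN_VALUE;

    /** Mirrors the role of TwoInputStreamOperator.processWatermark1. */
    public long processWatermark1(long watermark) {
        input1Watermark = watermark;
        return advance();
    }

    /** Mirrors the role of TwoInputStreamOperator.processWatermark2. */
    public long processWatermark2(long watermark) {
        input2Watermark = watermark;
        return advance();
    }

    private long advance() {
        long min = Math.min(input1Watermark, input2Watermark);
        if (min > currentWatermark) {
            currentWatermark = min;
            // In a real operator, all output triggered by this watermark
            // (e.g. window firings) would be emitted here, before the
            // watermark itself is forwarded downstream.
        }
        return currentWatermark;
    }
}
```

Note how a watermark arriving on one input does not advance the operator's watermark until the other input has caught up.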
[GitHub] flink pull request #6010: [FLINK-9359][docs] Update quickstart docs to only ...
GitHub user alpinegizmo opened a pull request: https://github.com/apache/flink/pull/6010 [FLINK-9359][docs] Update quickstart docs to only mention Java 8 This is a trivial but important fix to the quickstart setup page in the docs. This fix should be made to the 1.4, 1.5, and 1.6 docs so that folks aren't confused when Java 7, 9, and 10 all fail to work. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alpinegizmo/flink 9359-quickstart-docs-java8 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6010.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6010 commit df430cbdc6e9e30edcba791eadc34980b6fe7d3d Author: David Anderson <david@...> Date: 2018-05-14T11:57:39Z [FLINK-9359][docs] Update quickstart docs to only mention Java 8 ---
[GitHub] flink issue #5976: [FLINK-9309] Recommend HA setup on Production Readiness C...
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/5976 +1 ---
[GitHub] flink pull request #5949: [FLINK-9288][docs] clarify the event time / waterm...
GitHub user alpinegizmo reopened a pull request: https://github.com/apache/flink/pull/5949 [FLINK-9288][docs] clarify the event time / watermark docs This PR only affects the documentation (for event time and watermarks). I wanted to make a couple of things clearer, and to provide a couple of additional internal links. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alpinegizmo/flink event-time-watermarks-docs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5949.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5949 commit a5c66a56845ada7b0a20471a4842175c5a6566d6 Author: David Anderson <david@...> Date: 2018-05-02T11:50:48Z [FLINK-9288][docs] clarify the event time / watermark docs commit a100cab6fec6ab3affa4ecc13c46e0081bd19b62 Author: David Anderson <david@...> Date: 2018-05-03T05:12:12Z Reworked the section on event time to be less absolutist. ---
[GitHub] flink pull request #5949: [FLINK-9288][docs] clarify the event time / waterm...
Github user alpinegizmo closed the pull request at: https://github.com/apache/flink/pull/5949 ---
[GitHub] flink issue #5949: [FLINK-9288][docs] clarify the event time / watermark doc...
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/5949 @bowenli86 Thanks for the feedback. I've reworked that event time section. Hopefully it's now more complete and accurate without being too complex. ---
[GitHub] flink pull request #5949: [FLINK-9288][docs] clarify the event time / waterm...
GitHub user alpinegizmo opened a pull request: https://github.com/apache/flink/pull/5949 [FLINK-9288][docs] clarify the event time / watermark docs This PR only affects the documentation (for event time and watermarks). I wanted to make a couple of things clearer, and to provide a couple of additional internal links. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alpinegizmo/flink event-time-watermarks-docs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5949.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5949 commit a5c66a56845ada7b0a20471a4842175c5a6566d6 Author: David Anderson <david@...> Date: 2018-05-02T11:50:48Z [FLINK-9288][docs] clarify the event time / watermark docs ---
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/5922#discussion_r184970245 --- Diff: docs/dev/stream/state/broadcast_state.md --- @@ -0,0 +1,281 @@ +--- +title: "The Broadcast State Pattern" +nav-parent_id: streaming_state +nav-pos: 2 +--- + + +* ToC +{:toc} + +[Working with State](state.html) described operator state which is either **evenly** distributed among the parallel +tasks of an operator, or state which **upon restore**, its partial (task) states are **unioned** and the whole state is +used to initialize the restored parallel tasks. + +A third type of supported *operator state* is the *Broadcast State*. Broadcast state was introduced to support use-cases +where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally +and is used to process all incoming elements on the other stream. As an example where broadcast state can emerge as a +natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all +elements coming from another stream. Having the above type of use-cases in mind, broadcast state differs from the rest +of operator states in that: + 1. it has a map format, + 2. it is only available to streams whose elements are *broadcasted*, + 3. the only operation available to a stream with broadcast state is to be *connected* to another keyed or non-keyed stream, + 4. such a broadcast stream can have *multiple broadcast states* with different names. + +## Provided APIs + +To show the provided APIs, we will start with an example before presenting their full functionality. As our running +example, we will use the case where we have a stream of objects of different colors and shapes and we want to find pairs +of objects of the same color that follow a certain pattern, *e.g.* a rectangle followed by a triangle. We assume that +the set of interesting patterns evolve over time. 
+ +In this example, the first stream will contain elements of type `Item` with a `Color` and a `Shape` property. The other +stream will contain the `Rules`. + +Starting from the stream of `Items`, we just need to *key it* by `Color`, as we want pairs of the same color. This will +make sure that elements of the same color end up on the same physical machine. + +{% highlight java %} +// key the shapes by color +KeyedStream<Item, Color> colorPartitionedStream = shapeStream +.keyBy(new KeySelector<Shape, Color>(){...}); +{% endhighlight %} + +Moving on to the `Rules`, the stream containing them should be broadcasted to all downstream tasks, and these tasks +should store them locally so that they can evaluate them against all incoming `Items`. The snippet below will i) broadcast +the stream of rules and ii) using the provided `MapStateDescriptor`, it will create the broadcast state where the rules +will be stored. + +{% highlight java %} + +// a map descriptor to store the name of the rule (string) and the rule itself. +MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>( + "RulesBroadcastState", + BasicTypeInfo.STRING_TYPE_INFO, + TypeInformation.of(new TypeHint() {}) + ); + +// broadcast the rules and create the broadcast state +BroadcastStream ruleBroadcastStream = ruleStream +.broadcast(ruleStateDescriptor); +{% endhighlight %} + +Finally, in order to evaluate the `Rules` against the incoming elements from the `Item` stream, we need to: +1) connect the two streams and +2) specify our match detecting logic. + +Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be done by calling `connect()` on the +non-broadcasted stream, with the `BroadcastStream` as an argument. This will return a `BroadcastConnectedStream`, on +which we can call `process()` with a special type of `CoProcessFunction`. The function will contain our matching logic. 
+The exact type of the function depends on the type of the non-broadcasted stream: + - if that is **keyed**, then the function is a `KeyedBroadcastProcessFunction`. + - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. + + Given that our non-broadcasted stream is keyed, the following snippet includes the above calls: + + + Attention: The connect should be called on the non-broadcasted stream, with the `BroadcastStream` + as an argument. + + +{% highlight java %} +DataStream output = colorPart
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/5922#discussion_r184969214 ---
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/5922#discussion_r184968333 --- Diff: docs/dev/stream/state/broadcast_state.md --- @@ -0,0 +1,281 @@ +--- +title: "The Broadcast State Pattern" +nav-parent_id: streaming_state +nav-pos: 2 +--- + + +* ToC +{:toc} + +[Working with State](state.html) described operator state which is either **evenly** distributed among the parallel +tasks of an operator, or state which **upon restore**, its partial (task) states are **unioned** and the whole state is +used to initialize the restored parallel tasks. + +A third type of supported *operator state* is the *Broadcast State*. Broadcast state was introduced to support use-cases +where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally +and is used to process all incoming elements on the other stream. As an example where broadcast state can emerge as a +natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all +elements coming from another stream. Having the above type of use-cases in mind, broadcast state differs from the rest +of operator states in that: + 1. it has a map format, + 2. it is only available to streams whose elements are *broadcasted*, + 3. the only operation available to a stream with broadcast state is to be *connected* to another keyed or non-keyed stream, + 4. such a broadcast stream can have *multiple broadcast states* with different names. + +## Provided APIs + +To show the provided APIs, we will start with an example before presenting their full functionality. As our running +example, we will use the case where we have a stream of objects of different colors and shapes and we want to find pairs +of objects of the same color that follow a certain pattern, *e.g.* a rectangle followed by a triangle. We assume that +the set of interesting patterns evolve over time. 
--- End diff -- the set of interesting patterns evolves over time. ---
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/5922#discussion_r184969839 ---
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/5922#discussion_r184971409 --- Diff: docs/dev/stream/state/broadcast_state.md --- @@ -0,0 +1,281 @@ +--- +title: "The Broadcast State Pattern" +nav-parent_id: streaming_state +nav-pos: 2 +--- + + +* ToC +{:toc} + +[Working with State](state.html) described operator state which is either **evenly** distributed among the parallel +tasks of an operator, or state which **upon restore**, its partial (task) states are **unioned** and the whole state is +used to initialize the restored parallel tasks. + --- End diff -- I find this intro paragraph confusing. Would it be correct to rewrite it like this? [Working with State](state.html) describes operator state which **upon restore** is either **evenly** distributed among the parallel tasks of an operator, or **unioned**, with the whole state being used to initialize the restored parallel tasks. ---
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/5922#discussion_r184971516 --- Diff: docs/dev/stream/state/broadcast_state.md --- @@ -0,0 +1,281 @@ +--- +title: "The Broadcast State Pattern" +nav-parent_id: streaming_state +nav-pos: 2 +--- + + +* ToC +{:toc} + +[Working with State](state.html) described operator state which is either **evenly** distributed among the parallel +tasks of an operator, or state which **upon restore**, its partial (task) states are **unioned** and the whole state is +used to initialize the restored parallel tasks. + +A third type of supported *operator state* is the *Broadcast State*. Broadcast state was introduced to support use-cases +where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally --- End diff -- use cases (no hyphen) ---
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/5922#discussion_r184969718 ---
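The Flink snippets quoted in this thread are abbreviated in the archive. As a rough, framework-free sketch of the pattern they describe (the rule/item types and the matching logic below are hypothetical stand-ins, not Flink API), each parallel task holds a locally stored map of broadcast rules and consults it for every incoming element:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Framework-free sketch of the broadcast state pattern discussed above.
// The String-based Rule/Item representations are hypothetical stand-ins.
public class BroadcastStateSketch {
    // Plays the role of the MapStateDescriptor-backed broadcast state:
    // every parallel task holds its own copy of this rule map.
    private final Map<String, String> broadcastRules = new HashMap<>();

    /** Like processing a broadcast element: update the locally stored rules. */
    public void onRule(String name, String shapePattern) {
        broadcastRules.put(name, shapePattern);
    }

    /** Like processing a keyed element: evaluate every rule against it. */
    public List<String> onItem(String shape) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, String> rule : broadcastRules.entrySet()) {
            if (rule.getValue().equals(shape)) {
                matches.add(rule.getKey());
            }
        }
        return matches;
    }
}
```

In real Flink code this logic would live in a `KeyedBroadcastProcessFunction` (or `BroadcastProcessFunction` for a non-keyed stream), as the quoted docs explain.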
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184962653 --- Diff: docs/dev/table/sqlClient.md --- @@ -0,0 +1,539 @@ +--- +title: "SQL Client" +nav-parent_id: tableapi +nav-pos: 100 +is_beta: true +--- + + + +Although Flink's Table & SQL API allows to declare queries in the SQL language. A SQL query needs to be embedded within a table program that is written either in Java or Scala. The table program needs to be packaged with a build tool before it can be submitted to a cluster. This limits the usage of Flink to mostly Java/Scala programmers. --- End diff -- Flink's Table & SQL API makes it possible to work with queries written in the SQL language, but these queries need to be embedded within a table program that is written in either Java or Scala. Moreover, these programs need to be packaged with a build tool before being submitted to a cluster. This more or less limits the usage of Flink to Java/Scala programmers. ---
[GitHub] flink pull request #5887: [FLINK-6719] [docs] Add details about fault-tolera...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/5887#discussion_r184960782 --- Diff: docs/dev/stream/operators/process_function.md --- @@ -271,16 +271,39 @@ override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): -## Optimizations +## Timers -### Timer Coalescing +Every timer registered via `registerEventTimeTimer()` or `registerProcessingTimeTimer()` will be stored on `TimerService` +and enqueued for execution. -Every timer registered at the `TimerService` via `registerEventTimeTimer()` or -`registerProcessingTimeTimer()` will be stored on the Java heap and enqueued for execution. There is, -however, a maximum of one timer per key and timestamp at a millisecond resolution and thus, in the -worst case, every key may have a timer for each upcoming millisecond. Even if you do not do any -processing for outdated timers in `onTimer`, this may put a significant burden on the -Flink runtime. +Invocations of `onTimer()` and `processElement()` are always synchronized, so that users don't have to worry about +concurrent modification of state. + +Note that there is a maximum of one timer per key and timestamp at a millisecond resolution and thus, in the worst case, +every key may have a timer for each upcoming millisecond. Even if you do not do any processing for outdated timers in `onTimer`, +this may put a significant burden on the Flink runtime. + +### Fault Tolerance + +Timers registered within `ProcessFunction` are fault tolerant. They are synchronously checkpointed by Flink, regardless of +configurations of state backends. (Therefore, a large number of timers can significantly increase checkpointing time. See optimizations +section for advice to reduce the number of timers.) --- End diff -- See the optimizations section for advice on how to reduce the number of timers. ---
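The optimizations section referenced in this comment is about timer coalescing: since Flink keeps at most one timer per key and timestamp, rounding timestamps to a coarser resolution makes many registrations collapse into one timer. A small arithmetic sketch of that idea (the class name and one-second resolution are illustrative choices, not Flink API):

```java
// Sketch of the timer-coalescing idea: rounding event-time timestamps to a
// coarser resolution (here, 1 second) means many registerEventTimeTimer()
// calls for the same key collapse into a single timer, since Flink keeps at
// most one timer per key and timestamp.
public class TimerCoalescingSketch {
    /** Round a millisecond timestamp up to the end of its one-second bucket. */
    public static long coalescedTimestamp(long timestampMillis) {
        return ((timestampMillis / 1000) + 1) * 1000 - 1;
    }
}
```

All timestamps in the same one-second bucket map to the same coalesced timestamp, so registering a timer for each of them results in only one stored timer per key.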
[GitHub] flink issue #5924: [hotfix][README.md] Update building prerequisites
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/5924 +1 I frequently see newcomers to Flink getting hung up on this. ---
[GitHub] flink pull request #5928: [hotfix][doc] fix doc of externalized checkpoint
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/5928#discussion_r184959202 --- Diff: docs/dev/stream/state/checkpointing.md --- @@ -137,11 +137,9 @@ Some more parameters and/or defaults may be set via `conf/flink-conf.yaml` (see - `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging. - `filesystem`: State is in-memory on the TaskManagers, and state snapshots are stored in a file system. Supported are all filesystems supported by Flink, for example HDFS, S3, ... -- `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a Flink supported filesystem. Note: State backend must be accessible from the JobManager, use `file://` only for local setups. +- `state.checkpoints.dir`: The target directory for storing checkpoints data files and meta data of [externalized checkpoints]({{ site.baseurl }}/ops/state/checkpoints.html#externalized-checkpoints) in a Flink supported filesystem. Note: State backend must be accessible from the JobManager, use `file://` only for local setups. --- End diff -- This seems potentially misleading -- isn't it okay to use a `file://` URI in the case of a distributed filesystem that is mounted at the same mount point across the cluster? ---
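For context, a sketch of how the `state.checkpoints.dir` option discussed above might appear in `conf/flink-conf.yaml`; the HDFS URI is a placeholder, not a recommended value:

```yaml
# Sketch of the relevant flink-conf.yaml entries; the HDFS path is a placeholder.
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
```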
[GitHub] flink issue #5785: [FLINK-9108][docs] Fix invalid link
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/5785 +1 ---
[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/5395 These lines need to be restored to the Gemfile. The hawkins plugin is needed for the incremental build and live reload feature:

    group :jekyll_plugins do
      gem 'hawkins'
    end

The bundled version of jekyll (3.7.2) requires ruby >= 2.1, but our build machines use ruby 2.0. If we can't get the apache INFRA team to upgrade ruby, we'll have to rework this. The Gemfile.lock doesn't work with ruby 2.1 -- I get this error:

    ruby_dep-1.5.0 requires ruby version >= 2.2.5, which is incompatible with the current version, ruby 2.1.10p492

but re-bundling fixes this. I think we should commit a Gemfile.lock file that is compatible back to Ruby 2.1, if we determine we can get ruby 2.1 -- otherwise we'll have to roll back jekyll and then re-bundle with ruby 2.0. This PR works fine on ruby 2.3 and 2.4. The latest stable release of rvm doesn't yet support ruby 2.5, so I didn't test it. ---
[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/5395 Sure. ---
[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/5395 It looks good, but I haven't tested it. I'm wondering what versions of Ruby this has been tested with. At a minimum it needs to work with whatever version we can get on the production build infrastructure, as well as 2.3 and 2.4, since most developers will have one of those versions. And maybe 2.5, since that's out now. ---
[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/5395 It would be lovely to ditch ruby 1.9. However, if I understand correctly, we don't fully control the environment that builds the docs, and the last time we tried this we couldn't get a newer ruby there. But it has been a while, and maybe the ASF infrastructure folks have seen the light. @rmetzger Can we somehow determine what ruby version is running on the build machines? ---
[GitHub] flink pull request #5317: [FLINK-8458] Add the switch for keeping both the o...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/5317#discussion_r16290 --- Diff: docs/ops/config.md --- @@ -290,6 +290,12 @@ The following parameters configure Flink's JobManager and TaskManagers. - `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three parameters above): The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: **2048**). If set, it will be mapped to `taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on `taskmanager.memory.segment-size`. +- `taskmanager.network.memory.buffers-per-channel`: Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). Especially in credit-based flow control mode, it indicates how many credits are exclusive in each input channel. It should be configured at least 2 for good performance. 1 buffer is for receving in-flight data in the subpartition and 1 buffer is for parallel serialization. + --- End diff -- This needs some clarification. Is taskmanager.network.memory.buffers-per-channel only used in credit-based flow control mode? Does choosing a value greater than 2 provide any benefit? ---
[GitHub] flink pull request #5317: [FLINK-8458] Add the switch for keeping both the o...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/5317#discussion_r162906372 --- Diff: docs/ops/config.md --- @@ -290,6 +290,12 @@ The following parameters configure Flink's JobManager and TaskManagers. - `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three parameters above): The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: **2048**). If set, it will be mapped to `taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on `taskmanager.memory.segment-size`. +- `taskmanager.network.memory.buffers-per-channel`: Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). Especially in credit-based flow control mode, it indicates how many credits are exclusive in each input channel. It should be configured at least 2 for good performance. 1 buffer is for receving in-flight data in the subpartition and 1 buffer is for parallel serialization. + +- `taskmanager.network.memory.floating-buffers-per-gate`: Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, it indicates how many floating credits are shared for all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback. So the floating buffers can help relief back-pressure caused by imbalance data distribution among subpartitions. + --- End diff -- Number of extra network buffers used by each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels. 
The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can help relieve back-pressure caused by unbalanced data distribution among the subpartitions. ---
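For context, a sketch of how the two options discussed above might be set in `conf/flink-conf.yaml`; the values shown are believed to be the defaults at the time of this discussion (2 exclusive buffers per channel, 8 floating buffers per gate), but the generated configuration reference should be consulted:

```yaml
# Sketch only; verify defaults against the configuration reference.
taskmanager.network.memory.buffers-per-channel: 2
taskmanager.network.memory.floating-buffers-per-gate: 8
```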
[GitHub] flink pull request #5277: [hotfix][docs]Review to reduce passive voice, impr...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/5277#discussion_r162902645 --- Diff: docs/concepts/runtime.md --- @@ -46,19 +46,23 @@ The Flink runtime consists of two types of processes: - The **JobManagers** (also called *masters*) coordinate the distributed execution. They schedule tasks, coordinate checkpoints, coordinate recovery on failures, etc. -There is always at least one Job Manager. A high-availability setup will have multiple JobManagers, one of -which one is always the *leader*, and the others are *standby*. +There is always at least one Job Manager. A high-availability setup should have multiple JobManagers, one of --- End diff -- "A high-availability setup should have multiple JobManagers" is, in general, not true -- this is a detail that depends on the underlying cluster management framework. I suggest reworking as follows: There is always at least one Job Manager, but some [high-availability setups]({{ site.baseurl }}/ops/jobmanager_high_availability.html) will have multiple JobManagers. ---
[GitHub] flink issue #5129: [Hotfix for State doc in 1.4 and 1.5] update State's doc ...
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/5129 +1 ---
[GitHub] flink issue #5119: [FLINK-6590] Integrate automatic docs generation
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/5119 Very pleased to see this coming to fruition. ---
[GitHub] flink pull request #5075: [hotfix] [docs] Fix typos in State Backends doc
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/5075#discussion_r153149948 --- Diff: docs/ops/state/state_backends.md --- @@ -110,10 +110,10 @@ Limitations of the RocksDBStateBackend: The RocksDBStateBackend is encouraged for: - - Jobs with very large state, long windows, large key/value states. + - Jobs with a very large state, long windows, large key/value states. --- End diff -- This change is ungrammatical, but the rest are good. ---
[GitHub] flink issue #5039: [hotfix][docs] Fix some typos in the documentation.
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/5039 lgtm ---
[GitHub] flink pull request #4992: [FLINK-6163] Document per-window state in ProcessW...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4992#discussion_r150194036 --- Diff: docs/dev/stream/operators/windows.md --- @@ -978,6 +978,24 @@ input +### Using per-window state in ProcessWindowFunction + +In addition to accessing keyed state (as any rich function can) a `ProcessWindowFunction` can +also use keyed state that is scoped to the window that the function is currently processing. The are --- End diff -- The are --> There are ---
[GitHub] flink issue #4869: [FLINK-7843][docs] Improve documentation for metrics
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/4869 I can't vouch for the accuracy of the info, but +1 for adding this level of detail. ---
[GitHub] flink pull request #4833: [FLINK-5968] Add documentation for WindowedStream....
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4833#discussion_r144842788 --- Diff: docs/dev/stream/operators/windows.md --- @@ -721,6 +808,111 @@ input + Incremental Window Aggregation with AggregateFunction + +The following example shows how an incremental `AggregateFunction` can be combined with +a `ProcesWindowFunction` to compute the average and also emit the key and window along with +the average. + + + +{% highlight java %} +DataStream<Tuple2<String, Long> input = ...; + +input + .keyBy() + .timeWindow() + .aggregate(new AverageAggregate(), new MyProcessWindowFunction()); + +// Function definitions + +/** + * The accumulator is used to keep a running sum and a count. The {@code getResult} method + * computes the average. + */ +private static class AverageAggregate +implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> { + @Override + public Tuple2<Long, Long> createAccumulator() { +return new Tuple2<>(0L, 0L); + } + + @Override + public Tuple2<Long, Long> add( +Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) { +return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L); + } + + @Override + public Double getResult(Tuple2<Long, Long> accumulator) { +return accumulator.f0 / accumulator.f1; + } + + @Override + public Tuple2<Long, Long> merge( +Tuple2<Long, Long> a, Tuple2<Long, Long> b) { +return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1); + } +} + +private static class MyProcessWindowFunction +implements ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> { + + public void apply(String key, +Context context, +Iterable averages, +Collector<Tuple2<String, Double>> out) { + Double average = averags.iterator().next(); --- End diff -- should be averages, not averags ---
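Beyond the "averags" typo flagged above, the quoted `getResult` divides two `Long` values, which would truncate the average. A self-contained plain-Java sketch of the accumulator logic, detached from the Flink `AggregateFunction` interface so it stands alone, showing the cast needed for a correct average:

```java
// Plain-Java sketch of the accumulator from the example under review,
// without the Flink AggregateFunction interface.
final class AverageAccumulator {
    private long sum;
    private long count;

    void add(long value) {
        sum += value;
        count += 1;
    }

    void merge(AverageAccumulator other) {
        sum += other.sum;
        count += other.count;
    }

    // Cast before dividing: dividing two longs (as the quoted snippet does)
    // would silently truncate the average.
    double getResult() {
        return (double) sum / count;
    }
}
```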
[GitHub] flink issue #4816: [hotfix][docs] CEP docs review to remove weasel words, fi...
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/4816 +1 ---
[GitHub] flink issue #4756: [FLINK-7744][docs] Add missing top links to documentation
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/4756 Yes, looks good to me. +1 ---
[GitHub] flink pull request #4760: [hotfix][docs] Polish docs index page for consiste...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4760#discussion_r142630633 --- Diff: docs/index.md --- @@ -25,27 +25,26 @@ under the License. -This documentation is for Apache Flink version {{ site.version_title }}. These pages have been built at: {% build_time %}. +This documentation is for Apache Flink version {{ site.version_title }}. These pages were built at: {% build_time %}. -Apache Flink is an open source platform for distributed stream and batch data processing. Flink's core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink also builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization. +Apache Flink is an open source platform for distributed stream and batch data processing. Flink's core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization. ## First Steps -- **Concepts**: Start with the basic concepts of Flink's [Dataflow Programming Model](concepts/programming-model.html) and [Distributed Runtime Environment](concepts/runtime.html). This will help you to fully understand the other parts of the documentation, including the setup and programming guides. It is highly recommended to read these sections first. +- **Concepts**: Start with the basic concepts of Flink's [Dataflow Programming Model](concepts/programming-model.html) and [Distributed Runtime Environment](concepts/runtime.html). This will help you understand other parts of the documentation, including the setup and programming guides. We recommended you read these sections first. 
- **Quickstarts**: [Run an example program](quickstart/setup_quickstart.html) on your local machine or [study some examples](examples/index.html). -- **Programming Guides**: You can check out our guides about [basic API concepts](dev/api_concepts.html) and the [DataStream API](dev/datastream_api.html) or [DataSet API](dev/batch/index.html) to learn how to write your first Flink programs. +- **Programming Guides**: You can read our guides about [basic API concepts](dev/api_concepts.html) and the [DataStream API](dev/datastream_api.html) or the [DataSet API](dev/batch/index.html) to learn how to write your first Flink programs. ## Deployment -Before putting your Flink job into production, be sure to read the [Production Readiness Checklist](ops/production_ready.html). +Before putting your Flink job into production, read the [Production Readiness Checklist](ops/production_ready.html). ## Migration Guide -For users of earlier versions of Apache Flink we recommend the [API migration guide](dev/migration.html). -While all parts of the API that were marked as public and stable are still supported (the public API is backwards compatible), we suggest migrating applications to the -newer interfaces where applicable. +For users of earlier versions of Apache Flink, we recommend the [API migration guide](dev/migration.html). +While all parts of the API marked as public and stable are still supported (the public API is backwards compatible), we suggest migrating applications to the newer interfaces where applicable. --- End diff -- I think "were marked" better communicates the intent here, as it is saying that the parts of the API that were marked as public and stable in previous releases are still supported (regardless of how they are currently marked). Otherwise, +1. ---
[GitHub] flink pull request #4687: [hotfix][docs][CEP] wrong method name for PatternF...
GitHub user alpinegizmo opened a pull request: https://github.com/apache/flink/pull/4687 [hotfix][docs][CEP] wrong method name for PatternFlatSelectFunction The name of the single abstract method in the PatternFlatSelectFunction interface is flatSelect, not select. This PR is just a trivial change to the docs. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alpinegizmo/flink PatternFlatSelectFunction Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4687.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4687 commit bf5bcf4b980f168fc01652df6d0bd0e0fe48a8ff Author: David Anderson <da...@alpinegizmo.com> Date: 2017-09-20T13:57:40Z [hotfix][docs] method name for PatternFlatSelectFunction should be flatSelect ---
[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139691882 --- Diff: docs/ops/state/checkpoints.md --- @@ -99,3 +99,296 @@ above). ```sh $ bin/flink run -s :checkpointMetaDataPath [:runArgs] ``` + +## Incremental Checkpoints + +### Synopsis + +Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a +(potentially) longer recovery time. The core idea is that incremental checkpoints only record changes in state since the +previously-completed checkpoint instead of producing a full, self-contained backup of the state backend. In this way, +incremental checkpoints can build upon previous checkpoints. + +RocksDBStateBackend is currently the only backend that supports incremental checkpoints. + +Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the +incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and +pruned automatically. + +``While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is +a new feature and currently not enabled by default``. + +To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the +constructor set to `true`, e.g.: + +```java + RocksDBStateBackend backend = + new RocksDBStateBackend(filebackend, true); +``` + +### Use-case for Incremental Checkpoints + +Checkpoints are the centrepiece of Flink's fault tolerance mechanism and each checkpoint represents a consistent +snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine +failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html). 
+ +Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those +(hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data +source. The number of events that must be reprocessed has implications for recovery time, and so for fastest recovery, +we want to *take checkpoints as often as possible*. + +However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This +overhead can lead to lower throughput and higher latency during the time that checkpoints are created. One reason is +that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the +checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every +single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can obviously create +significant load for the I/O and network subsystems, on top of the normal load from pipeline's data processing work. + +Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery time and checkpointing +overhead. Fast recovery and low checkpointing overhead were conflicting goals. And this is exactly the problem that +incremental checkpoints solve. + + +### Basics of Incremental Checkpoints + +In this section, for the sake of getting the concept across, we will briefly discuss the idea behind incremental +checkpoints in a simplified manner. + +Our motivation for incremental checkpointing stemmed from the observation that it is often wasteful to write the full +state of a job for every single checkpoint. In most cases, the state between two checkpoints is not drastically +different, and only a fraction of the state data is modified and some new data added. 
Instead of writing the full state +into each checkpoint again and again, we could record only changes in state since the previous checkpoint. As long as we +have the previous checkpoint and the state changes for the current checkpoint, we can restore the full, current state +for the job. This is the basic principle of incremental checkpoints, that each checkpoint can build upon a history of +previous checkpoints to avoid writing redundant information. + +Figure 1 illustrates the basic idea of incremental checkpointing in comparison to full checkpointing. + +The state of the job evolves over time and for checkpoints ``CP 1`` to ``CP 2``, a full checkpoint is simply a copy of the whole +state. + + + + + +With incremental checkpointing, each checkpoint contains only the state change since the previous checkpoint. + +* For the first checkpoint
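The record-only-the-changes principle described in the quoted text can be illustrated with a toy sketch. This is not how RocksDB's internal backup mechanism works; it only demonstrates the idea of deltas replayed over a base snapshot, and for simplicity it ignores deleted keys:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy illustration of the incremental-checkpoint principle: record only the
// entries that changed since the previous snapshot, and rebuild full state by
// replaying deltas on top of a base snapshot. Deletions are ignored here, and
// this is NOT how RocksDB's backup mechanism works internally.
final class IncrementalSnapshot {

    private IncrementalSnapshot() {}

    // Entries in 'current' that are new or changed relative to 'previous'.
    static Map<String, String> delta(Map<String, String> previous,
                                     Map<String, String> current) {
        Map<String, String> changes = new HashMap<>();
        for (Map.Entry<String, String> e : current.entrySet()) {
            if (!e.getValue().equals(previous.get(e.getKey()))) {
                changes.put(e.getKey(), e.getValue());
            }
        }
        return changes;
    }

    // Restore the full state from a base snapshot plus a history of deltas.
    static Map<String, String> restore(Map<String, String> base,
                                       List<Map<String, String>> deltas) {
        Map<String, String> state = new HashMap<>(base);
        for (Map<String, String> d : deltas) {
            state.putAll(d);
        }
        return state;
    }
}
```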
[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139687800 --- Diff: docs/ops/state/checkpoints.md ---
[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139685466 --- Diff: docs/ops/state/checkpoints.md ---
[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139690107 --- Diff: docs/ops/state/checkpoints.md --- @@ -99,3 +99,296 @@ above).

```sh
$ bin/flink run -s :checkpointMetaDataPath [:runArgs]
```

## Incremental Checkpoints

### Synopsis

Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a (potentially) longer recovery time. The core idea is that incremental checkpoints only record the changes in state since the previously completed checkpoint, instead of producing a full, self-contained backup of the state backend. In this way, incremental checkpoints can build upon previous checkpoints.

`RocksDBStateBackend` is currently the only backend that supports incremental checkpoints.

Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and pruned automatically.

While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is a new feature and is currently not enabled by default.

To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the constructor set to `true`, e.g.:

```java
// the second constructor argument enables incremental checkpointing
RocksDBStateBackend backend =
    new RocksDBStateBackend(filebackend, true);
```

### Use-case for Incremental Checkpoints

Checkpoints are the centerpiece of Flink's fault tolerance mechanism. Each checkpoint represents a consistent snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html)).
Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those (hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data source. The number of events that must be reprocessed has implications for recovery time, so for the fastest recovery we want to *take checkpoints as often as possible*.

However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This overhead can lead to lower throughput and higher latency during the time that checkpoints are created. One reason is that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can obviously create significant load for the I/O and network subsystems, on top of the normal load from the pipeline's data processing work.

Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery time and checkpointing overhead. Fast recovery and low checkpointing overhead were conflicting goals, and this is exactly the problem that incremental checkpoints solve.

### Basics of Incremental Checkpoints

In this section, for the sake of getting the concept across, we will briefly discuss the idea behind incremental checkpoints in a simplified manner.

Our motivation for incremental checkpointing stemmed from the observation that it is often wasteful to write the full state of a job for every single checkpoint. In most cases, the state between two checkpoints is not drastically different: only a fraction of the state data is modified and some new data is added. Instead of writing the full state into each checkpoint again and again, we could record only the changes in state since the previous checkpoint. As long as we have the previous checkpoint and the state changes for the current checkpoint, we can restore the full, current state of the job. This is the basic principle of incremental checkpoints: each checkpoint can build upon a history of previous checkpoints to avoid writing redundant information.

Figure 1 illustrates the basic idea of incremental checkpointing in comparison to full checkpointing.

The state of the job evolves over time, and for checkpoints `CP 1` to `CP 2` a full checkpoint is simply a copy of the whole state.

With incremental checkpointing, each checkpoint contains only the state changes since the previous checkpoint.

* For the first checkpoint
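To make the delta idea above concrete, here is a purely illustrative Java sketch; it is not Flink's implementation (which operates on RocksDB sstable files rather than individual entries), and all names in it are hypothetical:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the incremental-checkpoint principle: record only
// the entries that changed since the previous checkpoint, and reconstruct
// the full state by replaying deltas on top of the last full snapshot.
public class IncrementalCheckpointSketch {

    // An incremental "checkpoint": only the entries that differ from the
    // previous state. (A real implementation must also track deletions,
    // e.g. with tombstone markers; that is omitted here for brevity.)
    public static Map<String, Integer> delta(
            Map<String, Integer> previous, Map<String, Integer> current) {
        Map<String, Integer> changes = new HashMap<>();
        for (Map.Entry<String, Integer> e : current.entrySet()) {
            if (!e.getValue().equals(previous.get(e.getKey()))) {
                changes.put(e.getKey(), e.getValue());
            }
        }
        return changes;
    }

    // Restore: start from the last full snapshot and replay each delta in order.
    public static Map<String, Integer> restore(
            Map<String, Integer> fullSnapshot, List<Map<String, Integer>> deltas) {
        Map<String, Integer> state = new HashMap<>(fullSnapshot);
        for (Map<String, Integer> d : deltas) {
            state.putAll(d);
        }
        return state;
    }
}
```

As long as the full snapshot and every subsequent delta are available, the current state can always be reconstructed; this is why the referenced checkpoint history must be retained (and, in Flink's case, eventually consolidated and pruned).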
[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139686793 --- Diff: docs/ops/state/checkpoints.md (same hunk as quoted above) ---
[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139692206 --- Diff: docs/ops/state/checkpoints.md (same hunk as quoted above) ---
[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139691118 --- Diff: docs/ops/state/checkpoints.md (same hunk as quoted above) ---
[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139685795 --- Diff: docs/ops/state/checkpoints.md (same hunk as quoted above) ---
[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139687153 --- Diff: docs/ops/state/checkpoints.md (same hunk as quoted above) ---
[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139687994 --- Diff: docs/ops/state/checkpoints.md --- @@ -99,3 +99,296 @@ above). ```sh $ bin/flink run -s :checkpointMetaDataPath [:runArgs] ``` + +## Incremental Checkpoints + +### Synopsis + +Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a +(potentially) longer recovery time. The core idea is that incremental checkpoints only record changes in state since the +previously-completed checkpoint instead of producing a full, self-contained backup of the state backend. In this way, +incremental checkpoints can build upon previous checkpoints. + +RocksDBStateBackend is currently the only backend that supports incremental checkpoints. + +Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the +incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and +pruned automatically. + +``While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is +a new feature and currently not enabled by default``. + +To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the +constructor set to `true`, e.g.: + +```java + RocksDBStateBackend backend = + new RocksDBStateBackend(filebackend, true); +``` + +### Use-case for Incremental Checkpoints + +Checkpoints are the centrepiece of Flinkâs fault tolerance mechanism and each checkpoint represents a consistent +snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine +failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html). 
+ +Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those +(hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data +source. The number of events that must be reprocessed has implications for recovery time, and so for fastest recovery, +we want to *take checkpoints as often as possible*. + +However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This +overhead can lead to lower throughput and higher latency during the time that checkpoints are created. One reason is +that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the +checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every +single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can obviously create +significant load for the I/O and network subsystems, on top of the normal load from pipelineâs data processing work. + +Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery time and checkpointing +overhead. Fast recovery and low checkpointing overhead were conflicting goals. And this is exactly the problem that +incremental checkpoints solve. + + +### Basics of Incremental Checkpoints + +In this section, for the sake of getting the concept across, we will briefly discuss the idea behind incremental +checkpoints in a simplified manner. + +Our motivation for incremental checkpointing stemmed from the observation that it is often wasteful to write the full +state of a job for every single checkpoint. In most cases, the state between two checkpoints is not drastically +different, and only a fraction of the state data is modified and some new data added. 
Instead of writing the full state
+into each checkpoint again and again, we could record only the changes in state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints: each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in comparison to full checkpointing.
+
+The state of the job evolves over time, and for checkpoints ``CP 1`` to ``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+With incremental checkpointing, each checkpoint contains only the state changes since the previous checkpoint.
+
+* For the first checkpoint
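The delta-based idea described above can be illustrated outside of Flink with a small, self-contained sketch. To be clear, this is a hypothetical toy model of the principle only (all class and method names here are invented) — Flink's actual implementation builds on RocksDB's internal backup mechanism — but it shows how keeping only the changed entries per checkpoint still allows the full state to be restored:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy illustration of the incremental-checkpoint principle: each checkpoint
 * stores only the entries that changed since the previous one, and the full
 * state is restored by replaying the deltas in order. This is NOT Flink's
 * actual mechanism, just a sketch of the idea.
 */
class IncrementalCheckpointSketch {
    private final Map<String, Long> state = new HashMap<>();
    // Entries modified since the last checkpoint.
    private final Map<String, Long> dirty = new HashMap<>();
    private final List<Map<String, Long>> checkpoints = new ArrayList<>();

    public void put(String key, long value) {
        state.put(key, value);
        dirty.put(key, value);
    }

    /** An incremental checkpoint records only the dirty entries. */
    public void checkpoint() {
        checkpoints.add(new HashMap<>(dirty));
        dirty.clear();
    }

    /** Recovery replays all deltas from the first checkpoint up to {@code upTo}. */
    public static Map<String, Long> restore(List<Map<String, Long>> deltas, int upTo) {
        Map<String, Long> restored = new HashMap<>();
        for (int i = 0; i <= upTo; i++) {
            restored.putAll(deltas.get(i));  // later deltas overwrite earlier values
        }
        return restored;
    }

    public List<Map<String, Long>> getCheckpoints() { return checkpoints; }

    public Map<String, Long> getState() { return state; }
}
```

Restoring from checkpoint *n* replays the deltas of checkpoints 0..n in order; later deltas overwrite earlier values for the same key, which is why the restored map ends up equal to the live state even though no single checkpoint contained all of it.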
[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139684257 --- Diff: docs/ops/state/checkpoints.md --- Writing multiple terabytes (or more) of state data for each checkpoint can obviously create significant load for the I/O and network subsystems, on top of the normal load from pipeline's data processing work. --- End diff -- on top of the normal load from the pipeline's data processing work. (add "the") ---
[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139689544 --- Diff: docs/ops/state/checkpoints.md ---
[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139691558 --- Diff: docs/ops/state/checkpoints.md ---
[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139690574 --- Diff: docs/ops/state/checkpoints.md ---
[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139687055 --- Diff: docs/ops/state/checkpoints.md ---
[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139689818 --- Diff: docs/ops/state/checkpoints.md ---
[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r139690806 --- Diff: docs/ops/state/checkpoints.md ---
[GitHub] flink pull request #4634: [FLINK-7568] Improve Windowing Documentation
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4634#discussion_r137000137 --- Diff: docs/dev/stream/operators/windows.md --- @@ -663,25 +623,42 @@ input .keyBy() .window() .process(new MyProcessWindowFunction()) + +/* ... */ + +class MyWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] { + + def apply(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): () = { +var count = 0L +for (in <- input) { + count = count + 1 +} +out.collect(s"Window ${context.window} count: $count") + } +} {% endhighlight %} -### WindowFunction with Incremental Aggregation +The example shows a `ProcessWindowFunction` that counts the elements in a window. In addition, the window function adds information about the window to the output. + +Attention Note that using `ProcessWindowFunction` for simple aggregates such as count is quite inefficient. The next section shows how a `ReduceFunction` can be combined with a `ProcessWindowFunction` to get both incremental aggregation and the added information of a `ProcessWindowFunction`. -A `WindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction` to +### ProcessWindowFunction with Incremental Aggregation + +A `ProcessWindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction` to --- End diff -- Sure, that sounds fine. ---
[GitHub] flink pull request #4634: [FLINK-7568] Improve Windowing Documentation
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4634#discussion_r136973577 --- Diff: docs/dev/stream/operators/windows.md --- @@ -460,118 +465,14 @@ The above example appends all input `Long` values to an initially empty `String` Attention `fold()` cannot be used with session windows or other mergeable windows. -### WindowFunction - The Generic Case +### ProcessWindowFunction -A `WindowFunction` gets an `Iterable` containing all the elements of the window and provides +A `ProcessWindowFunction` gets an `Iterable` containing all the elements of the window and provides the most flexibility of all window functions. This comes --- End diff -- A `ProcessWindowFunction` gets an `Iterable` containing all the elements of the window, and a `Context` object with access to time and state information, which enables it to provide more flexibility than other window functions. ---
[GitHub] flink pull request #4634: [FLINK-7568] Improve Windowing Documentation
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4634#discussion_r136972599 --- Diff: docs/dev/stream/operators/windows.md --- @@ -111,6 +111,11 @@ windows) assign elements to windows based on time, which can either be processin time. Please take a look at our section on [event time]({{ site.baseurl }}/dev/event_time.html) to learn about the difference between processing time and event time and how timestamps and watermarks are generated. +Time-based windows have a *start timestamp* (inclusive) and an *end timestamp* (exclusive) +that together describe the size of the window. In code, Flink uses `TimeWindow` when working with +time-based windows, this has methods for querying the start- and end-timestamp and also an --- End diff -- "this has methods" => "which has methods" ---
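For intuition about the inclusive-start / exclusive-end convention discussed in this hunk, here is a standalone sketch (not Flink's actual `TimeWindow` class) of how a tumbling window's bounds can be derived from an element timestamp:

```java
// Standalone illustration (not Flink's TimeWindow) of the
// inclusive-start / exclusive-end convention for time windows.
public class WindowBounds {

    /** Start of the size-`windowSize` tumbling window containing `timestamp`
     *  (assumes a non-negative timestamp and no window offset). */
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    public static void main(String[] args) {
        long size = 5_000L;                      // 5-second tumbling windows
        long start = windowStart(12_345L, size); // 10_000, inclusive
        long end = start + size;                 // 15_000, exclusive
        System.out.println("[" + start + ", " + end + ")");
    }
}
```

An element stamped exactly at `end` already belongs to the next window, which is why the largest timestamp still inside a window is `end - 1`.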
[GitHub] flink pull request #4634: [FLINK-7568] Improve Windowing Documentation
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4634#discussion_r136843813 --- Diff: docs/dev/stream/operators/windows.md --- @@ -663,25 +623,42 @@ input .keyBy() .window() .process(new MyProcessWindowFunction()) + +/* ... */ + +class MyWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] { + + def apply(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): () = { +var count = 0L +for (in <- input) { + count = count + 1 +} +out.collect(s"Window ${context.window} count: $count") + } +} {% endhighlight %} -### WindowFunction with Incremental Aggregation +The example shows a `ProcessWindowFunction` that counts the elements in a window. In addition, the window function adds information about the window to the output. + +Attention Note that using `ProcessWindowFunction` for simple aggregates such as count is quite inefficient. The next section shows how a `ReduceFunction` can be combined with a `ProcessWindowFunction` to get both incremental aggregation and the added information of a `ProcessWindowFunction`. -A `WindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction` to +### ProcessWindowFunction with Incremental Aggregation + +A `ProcessWindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction` to --- End diff -- Do we want to still mention FoldFunction here without mentioning it has been deprecated? Maybe we should talk about AggregateFunction instead. ---
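The efficiency argument in this hunk — reduce incrementally as elements arrive, then hand a single value to the final per-window step — can be simulated without Flink; the class and method below are invented for illustration and are not the Flink API:

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Simulation of incremental aggregation: elements are folded into a single
// accumulator as they "arrive", so only one value per window is kept and
// handed to the ProcessWindowFunction-like step when the window fires.
public class IncrementalAggregationSketch {

    static long fireWindow(List<Long> arrivals, BinaryOperator<Long> reduce) {
        long acc = arrivals.get(0);
        for (int i = 1; i < arrivals.size(); i++) {
            acc = reduce.apply(acc, arrivals.get(i)); // O(1) state, not O(n) buffering
        }
        return acc;
    }

    public static void main(String[] args) {
        long max = fireWindow(List.of(3L, 9L, 4L), Math::max);
        // the final per-window step: enrich the single value with window metadata
        System.out.println("Window [0, 5000) max: " + max);
    }
}
```

The point of combining the two in Flink is exactly this shape: the window keeps one accumulated value instead of buffering every element, and the window function still gets to attach window metadata to the output.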
[GitHub] flink pull request #4634: [FLINK-7568] Improve Windowing Documentation
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4634#discussion_r136845008 --- Diff: docs/dev/stream/operators/windows.md --- @@ -490,15 +490,35 @@ public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> impl Iterable elements, Collector out) throws Exception; -/** - * The context holding window metadata - */ -public abstract class Context { -/** - * @return The window that is being evaluated. - */ -public abstract W window(); -} + /** +* The context holding window metadata. +*/ + public abstract class Context implements java.io.Serializable { + /** +* Returns the window that is being evaluated. +*/ + public abstract W window(); + + /** Returns the current processing time. */ + public abstract long currentProcessingTime(); + + /** Returns the current event-time watermark. */ + public abstract long currentWatermark(); + + /** +* State accessor for per-key and per-window state. +* +* NOTE:If you use per-window state you have to ensure that you clean it up +* by implementing {@link ProcessWindowFunction#clear(Context)}. +*/ + public abstract KeyedStateStore windowState(); + + /** +* State accessor for per-key global state. +*/ + public abstract KeyedStateStore globalState(); + } --- End diff -- The difference between per-key per-window state and per-key global state deserves more explanation. It would be even better to sketch out scenarios of what one might do with these accessors. There's a fair bit of power being exposed here, and that's not at all obvious. ---
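One possible way to illustrate the distinction the review asks about, with plain Java maps standing in for Flink's `KeyedStateStore`s (all names here are invented):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration of the two scopes: "window state" lives per
// (key, window) and must be cleared when the window is done, while
// "global state" lives per key and survives across windows.
public class StateScopes {
    final Map<String, Map<String, Long>> perKeyPerWindow = new HashMap<>();
    final Map<String, Long> perKeyGlobal = new HashMap<>();

    /** Called when a window fires for a key. */
    void onWindowFired(String key, String window, long count) {
        perKeyPerWindow.computeIfAbsent(key, k -> new HashMap<>()).put(window, count);
        perKeyGlobal.merge(key, count, Long::sum); // e.g. a lifetime total per key
    }

    /** Mirrors ProcessWindowFunction#clear: drop only the per-window entry. */
    void clearWindow(String key, String window) {
        Map<String, Long> windows = perKeyPerWindow.get(key);
        if (windows != null) {
            windows.remove(window);
        }
    }

    public static void main(String[] args) {
        StateScopes s = new StateScopes();
        s.onWindowFired("user-1", "w0", 3L);
        s.onWindowFired("user-1", "w1", 2L);
        s.clearWindow("user-1", "w0");
        System.out.println(s.perKeyGlobal.get("user-1")); // 5: global state survived
    }
}
```

A typical scenario: per-window state might hold data needed to handle late firings of that one window, while global state might hold a per-key running total across all windows.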
[GitHub] flink issue #4553: [FLINK-7642] [docs] Add very obvious warning about outdat...
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/4553 +1 I like these improvements. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #4480: [FLINK-6995] [docs] Enable is_latest attribute to false
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/4480 +1 ---
[GitHub] flink pull request #4454: [hotfix][docs] Add section in docs about writing u...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4454#discussion_r131354369 --- Diff: docs/dev/testing.md --- @@ -0,0 +1,189 @@ +--- +title: "Testing" +nav-parent_id: dev +nav-id: testing +nav-pos: 99 +--- + + +This page briefly discusses how to test Flink application in the local environment. + +* This will be replaced by the TOC +{:toc} + +## Unit testing + +It is encouraged to test your classes with unit tests as much as possible. For example if one implement following `ReduceFunction`: + +~~~java +public class SumReduce implements ReduceFunction<Long> { +@Override +public Long reduce(Long value1, Long value2) throws Exception { +return value1 + value2; +} +} +~~~ + +it is very easy to unit test it with your favorite framework: + +~~~java +public class SumReduceTest { +@Test +public void testSum() throws Exception { +SumReduce sumReduce = new SumReduce(); + +assertEquals(42L, sumReduce.reduce(40L, 2L)); +} +} +~~~ + +Or in scala: + +~~~scala +class SumReduce extends ReduceFunction[Long] { +override def reduce(value1: java.lang.Long, +value2: java.lang.Long): java.lang.Long = value1 + value2 +} +~~~ + +~~~scala +class SumReduceTest extends FlatSpec with Matchers { +"SumReduce" should "add values" in { +val sumReduce: SumReduce = new SumReduce() +sumReduce.reduce(40L, 2L) should be (42L) +} +} +~~~ + +## Integration testing + +You also can write integration tests that are executed against local Flink mini cluster. +In order to do so add a test dependency `flink-test-utils`. + +~~~ xml +<dependency> +  <groupId>org.apache.flink</groupId> +  <artifactId>flink-test-utils{{site.scala_version_suffix}}</artifactId> +  <version>{{site.version}}</version> +</dependency> +~~~ --- End diff -- To get this to work in my outside-of-flink project, I had to modify this as: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils_{{site.scala_version_suffix}}</artifactId> <version>${flink.version}</version> </dependency> ---
[GitHub] flink pull request #4454: [hotfix][docs] Add section in docs about writing u...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4454#discussion_r131352677 --- Diff: docs/dev/testing.md --- @@ -90,13 +121,69 @@ public class ExampleIntegrationTest extends StreamingMultipleProgramsTestBase { public static final List values = new ArrayList<>(); @Override -public void invoke(Long value) throws Exception { +public synchronized void invoke(Long value) throws Exception { values.add(value); } } } ~~~ -Static variable in `CollectSink` is required because Flink serializes all operators before distributing them across a cluster. +or in Scala: + +~~~scala +class MultiplyByTwo extends MapFunction[Long, Long] { + override def map(value: java.lang.Long): java.lang.Long = value * 2 +} +~~~ + +~~~scala +class ExampleIntegrationTest extends FlatSpec with Matchers { +"MultiplyByTwo" should "multiply it input by two" in { +val env: StreamExecutionEnvironment = +StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +// values are collected on a static variable +CollectSink.values.clear() +env +.fromElements(1L, 21L, 22L) +.map(new MultiplyByTwo()) +.addSink(new CollectSink()) +env.execute() +CollectSink.values should be (Lists.newArrayList(2L, 42L, 44L)) +} +} + +object CollectSink { +// must be static +val values: List[Long] = new ArrayList() +} + +class CollectSink extends SinkFunction[Long] { +override def invoke(value: java.lang.Long): Unit = { +synchronized { +values.add(value) +} +} +} +~~~ + +Static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster. +Communicating with operators instantiated by a local flink mini cluster via static variables is one way around this issue. Alternatively in your test sink you could for example write the data to files in a temporary directory. Of course you could use your own custom sources and sinks, which can emit watermarks. 
+ +## Testing checkpointing and state handling + +One way to test state handling is to enable checkpointing in integration tests. You can do that by +configuring `environment` in the test: +~~~java +env.enableCheckpointing(500); +env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100)); +~~~ +and for example adding to your Flink application an identity mapper operator that will throw and exception --- End diff -- "throw an exception" (typo) ---
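The failure-injecting identity mapper mentioned in this hunk might look like the sketch below. This is a plain class with invented names for illustration; in a real job it would implement Flink's `MapFunction<Long, Long>`, and the flag would usually be `static`, since operator instances are serialized and re-created on recovery:

```java
// Sketch of an identity mapper that fails exactly once, so that a restart
// strategy plus checkpointing can be exercised in an integration test.
public class OnceFailingIdentity {
    private boolean hasFailed = false;

    /** Passes values through, but fails once on the chosen trigger value. */
    public Long map(Long value) {
        if (!hasFailed && value == 42L) {
            hasFailed = true;
            throw new RuntimeException("Injected failure for recovery testing");
        }
        return value;
    }

    public static void main(String[] args) {
        OnceFailingIdentity mapper = new OnceFailingIdentity();
        try {
            mapper.map(42L);
        } catch (RuntimeException expected) {
            System.out.println("first pass failed as intended");
        }
        System.out.println(mapper.map(42L)); // 42: the retry succeeds
    }
}
```

After the injected failure, the job restarts from the last checkpoint and the second pass over the same value succeeds, which is exactly what such a test wants to observe.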
[GitHub] flink issue #4454: [hotfix][docs] Add section in docs about writing unit/int...
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/4454 +1 for adding this section to the docs. And another +1 for adding something about testing watermarks / timestamps -- this is a very frequently asked question. Could, perhaps, include a link to https://github.com/ottogroup/flink-spector/ ---
[GitHub] flink issue #4441: [FLINK-7301] [docs] Rework state documentation
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/4441 @twalthr Duh, of course, you're right. +1 ---
[GitHub] flink pull request #4441: [FLINK-7301] [docs] Rework state documentation
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4441#discussion_r131087040 --- Diff: docs/dev/stream/state/index.md --- @@ -0,0 +1,56 @@ +--- +title: "State & Fault Tolerance" +nav-id: streaming_state +nav-title: "State & Fault Tolerance" +nav-parent_id: streaming +nav-pos: 3 +nav-show_overview: true +--- + + +Stateful functions and operators store data across the processing of individual elements/events, making state a critical building block for +any type of more elaborate operation. + +For example: + + - When an application searches for certain event patterns, the state will store the sequence of events encountered so far. + - When aggregating events per minute/hour/day, the state holds the pending aggregates. + - When training a machine learning model over a stream of data points, the state holds the current version of the model parameters. + - When historic data needs to be managed, the state allows efficient access to events occured in the past. + +Flink needs to be aware of the state in order to make state fault tolerant using [checkpoints](checkpointing.html) and allow [savepoints]({{ site.baseurl }}/ops/state/savepoints.html) of streaming applications. --- End diff -- "and to allow [savepoints]" ---
[GitHub] flink pull request #4441: [FLINK-7301] [docs] Rework state documentation
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4441#discussion_r131085493 --- Diff: docs/dev/stream/state/custom_serialization.md --- @@ -0,0 +1,188 @@ +--- +title: "Custom Serialization for Managed State" +nav-title: "Custom Serialization" +nav-parent_id: streaming_state +nav-pos: 10 +--- + + +If your application uses Flink's managed state, it might be necessary to implement a custom serialization logic for special use cases. --- End diff -- drop the word "a" in "implement a custom serialization logic" so that it reads "implement custom serialization logic" ---
[GitHub] flink pull request #4441: [FLINK-7301] [docs] Rework state documentation
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4441#discussion_r131086039 --- Diff: docs/dev/stream/state/custom_serialization.md --- @@ -0,0 +1,188 @@ +--- +title: "Custom Serialization for Managed State" +nav-title: "Custom Serialization" +nav-parent_id: streaming_state +nav-pos: 10 +--- + + +If your application uses Flink's managed state, it might be necessary to implement a custom serialization logic for special use cases. + +This page is targeted as a guideline for users who require the use of custom serialization for their state, covering how +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using +Flink's own serializers, this page is irrelevant and can be skipped. + +### Using custom serializers + +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's +[type serialization framework](../../types_serialization.html) to create appropriate serializers for the state. 
+ +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states, +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation: + + + +{% highlight java %} +public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, Integer>> {...}; + +ListStateDescriptor<Tuple2<String, Integer>> descriptor = +new ListStateDescriptor<>( +"state-name", +new CustomTypeSerializer()); + +checkpointedState = getRuntimeContext().getListState(descriptor); +{% endhighlight %} + + + +{% highlight scala %} +class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...} + +val descriptor = new ListStateDescriptor[(String, Integer)]( +"state-name", +new CustomTypeSerializer) +) + +checkpointedState = getRuntimeContext.getListState(descriptor); +{% endhighlight %} + + + +Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following +subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using +anonymous classes as your state serializers. Anonymous classes do not have a guarantee on the generated classname, +varying across compilers and depends on the order that they are instantiated within the enclosing class, which can --- End diff -- "varying across compilers and depends" ==> "which varies across compilers and depends" ---
[GitHub] flink pull request #4136: [FLINK-6940][docs] Clarify the effect of configuri...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4136#discussion_r126431179 --- Diff: docs/ops/state_backends.md --- @@ -123,8 +123,7 @@ RocksDBStateBackend is currently the only backend that offers incremental checkp ## Configuring a State Backend -State backends can be configured per job. In addition, you can define a default state backend to be used when the -job does not explicitly define a state backend. +State backends can be configured per job in code. In addition, you can define a default state backend in **flink-conf.yaml** that is used when the job does not explicitly define a state backend. --- End diff -- @zentol I find that "in code" reads rather awkwardly, and I don't see how it adds any value, since the details of how to do per-job configuration are shown below. Nevertheless, this topic can be a bit confusing, so I would suggest something more like this (assuming I got the details right): The default state backend, if you specify nothing, is the jobmanager. If you wish to establish a different default for all jobs on your cluster, you can do so by defining a new default state backend in **flink-conf.yaml**. The default state backend can be overridden on a per-job basis, as shown below. --- 
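For reference, a cluster-wide default along those lines is set in flink-conf.yaml with something like the following (key names and the example HDFS path are from the Flink 1.3-era docs; verify both against your release):

```yaml
# Default state backend for jobs that do not set one in code:
# one of jobmanager, filesystem, rocksdb (or a factory class name)
state.backend: filesystem

# Directory for checkpoint data when using the filesystem backend
state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints
```

A job can still override this default in code, which is the per-job configuration the review comment refers to.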
[GitHub] flink pull request #4291: [FLINK-7136][docs] Improve search by adding facets...
GitHub user alpinegizmo opened a pull request: https://github.com/apache/flink/pull/4291 [FLINK-7136][docs] Improve search by adding facets and turning off ads This PR expands the custom search engine used by the documentation to include some additional sources, and expands the UI to include tabs for breaking out the results by source, and turns off ads (which we are entitled to do, since the ASF is a non-profit org). The specifications for the list of sites to be searched, their weights, and the UI are checked in as XML files. However, editing these files won't have any direct impact. Instead, they must be re-uploaded to the google custom search console. The reason to check them in is to make the search settings visible, and to preserve a copy of all the settings within the project. Before and after screenshots when searching for "timers": https://user-images.githubusercontent.com/43608/28008542-3ae67428-6558-11e7-881a-5e79369ef9d3.png https://user-images.githubusercontent.com/43608/28008577-60e11fca-6558-11e7-82f5-e28c7c9f364a.png You can merge this pull request into a Git repository by running: $ git pull https://github.com/alpinegizmo/flink 7136-doc-search Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4291.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4291 commit 9f2ed8cbae957b030774d48ac4206904d7cb8385 Author: David Anderson <da...@alpinegizmo.com> Date: 2017-07-07T20:54:17Z [FLINK-7136][docs] Improve search by adding facets and turning off ads ---
[GitHub] flink issue #4088: [FLINK-6652] [core] Fix handling of delimiters split by b...
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/4088 FYI, the reference solutions to the batch training exercises are failing, and it looks to me like it's because of this issue. ---
[GitHub] flink issue #3720: [FLINK-6302] Documentation build error on ruby 2.4
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/3720 Yes, let's get this done. On Jun 6, 2017 19:03, "Stephan Ewen" <notificati...@github.com> wrote: > @alpinegizmo <https://github.com/alpinegizmo> @zentol > <https://github.com/zentol> Now that #4043 > <https://github.com/apache/flink/pull/4043> is merged, can we rebase this > one? ---
[GitHub] flink pull request #4046: [FLINK-6749] [table] Table API / SQL Docs: SQL Pag...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4046#discussion_r119893904 --- Diff: docs/dev/table/sql.md --- @@ -22,20 +22,22 @@ specific language governing permissions and limitations under the License. --> -SQL queries are specified using the `sql()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table` which can be converted into a `DataSet` or `DataStream`, used in subsequent Table API queries, or written to a `TableSink` (see [Writing Tables to External Sinks](#writing-tables-to-external-sinks)). SQL and Table API queries can seamlessly mixed and are holistically optimized and translated into a single DataStream or DataSet program. +Flink supports specifying DataStream or DataSet programs with SQL queries using the `sql()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table`. A `Table` can be used in the subsequent SQL / Table API queries, be converted into a `DataSet` or `DataStream`, used in subsequent Table API queries or written to a `TableSink` (see [Writing Tables to External Sinks](common.html#emit-to-a-tablesink)). SQL and Table API queries can seamlessly mixed and are holistically optimized and translated into a single program. -A `Table`, `DataSet`, `DataStream`, or external `TableSource` must be registered in the `TableEnvironment` in order to be accessible by a SQL query (see [Registering Tables](#registering-tables)). For convenience `Table.toString()` will automatically register an unique table name under the `Table`'s `TableEnvironment` and return the table name. So it allows to call SQL directly on tables in a string concatenation (see examples below). +To access the data in the SQL queries, users must register data sources, including `Table`, `DataSet`, `DataStream` or external `TableSource`, in the `TableEnvironment` (see [Registering Tables](common.html#register-a-table-in-the-catalog)). 
Alternatively, users can also register external catalogs in the `TableEnvironment` to specify the location of the data sources. -*Note: Flink's SQL support is not feature complete, yet. Queries that include unsupported SQL features will cause a `TableException`. The limitations of SQL on batch and streaming tables are listed in the following sections.* +For convenience `Table.toString()` will automatically register an unique table name under the `Table`'s `TableEnvironment` and return the table name. So it allows to call SQL directly on tables in a string concatenation (see examples below). --- End diff -- ... a unique table name ... ---
[GitHub] flink pull request #4046: [FLINK-6749] [table] Table API / SQL Docs: SQL Pag...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4046#discussion_r119894456 --- Diff: docs/dev/table/sql.md --- @@ -22,20 +22,22 @@ specific language governing permissions and limitations under the License. --> -SQL queries are specified using the `sql()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table` which can be converted into a `DataSet` or `DataStream`, used in subsequent Table API queries, or written to a `TableSink` (see [Writing Tables to External Sinks](#writing-tables-to-external-sinks)). SQL and Table API queries can seamlessly mixed and are holistically optimized and translated into a single DataStream or DataSet program. +Flink supports specifying DataStream or DataSet programs with SQL queries using the `sql()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table`. A `Table` can be used in the subsequent SQL / Table API queries, be converted into a `DataSet` or `DataStream`, used in subsequent Table API queries or written to a `TableSink` (see [Writing Tables to External Sinks](common.html#emit-to-a-tablesink)). SQL and Table API queries can seamlessly mixed and are holistically optimized and translated into a single program. -A `Table`, `DataSet`, `DataStream`, or external `TableSource` must be registered in the `TableEnvironment` in order to be accessible by a SQL query (see [Registering Tables](#registering-tables)). For convenience `Table.toString()` will automatically register an unique table name under the `Table`'s `TableEnvironment` and return the table name. So it allows to call SQL directly on tables in a string concatenation (see examples below). +To access the data in the SQL queries, users must register data sources, including `Table`, `DataSet`, `DataStream` or external `TableSource`, in the `TableEnvironment` (see [Registering Tables](common.html#register-a-table-in-the-catalog)). 
Alternatively, users can also register external catalogs in the `TableEnvironment` to specify the location of the data sources. -*Note: Flink's SQL support is not feature complete, yet. Queries that include unsupported SQL features will cause a `TableException`. The limitations of SQL on batch and streaming tables are listed in the following sections.* +For convenience `Table.toString()` will automatically register an unique table name under the `Table`'s `TableEnvironment` and return the table name. So it allows to call SQL directly on tables in a string concatenation (see examples below). -**TODO: Rework intro. Move some parts below. ** +*Note: Flink's SQL support is not feature complete, yet. Queries that include unsupported SQL features will cause a `TableException`. The limitations of SQL on batch and streaming tables are listed in the following sections.* * This will be replaced by the TOC {:toc} Specifying a Query --- +Here are a few examples on how to specify a DataStream / DataSet program using SQL: --- End diff -- examples of how to ---
[GitHub] flink pull request #4046: [FLINK-6749] [table] Table API / SQL Docs: SQL Pag...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4046#discussion_r119894379 --- Diff: docs/dev/table/sql.md --- @@ -22,20 +22,22 @@ specific language governing permissions and limitations under the License. --> -SQL queries are specified using the `sql()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table` which can be converted into a `DataSet` or `DataStream`, used in subsequent Table API queries, or written to a `TableSink` (see [Writing Tables to External Sinks](#writing-tables-to-external-sinks)). SQL and Table API queries can seamlessly mixed and are holistically optimized and translated into a single DataStream or DataSet program. +Flink supports specifying DataStream or DataSet programs with SQL queries using the `sql()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table`. A `Table` can be used in the subsequent SQL / Table API queries, be converted into a `DataSet` or `DataStream`, used in subsequent Table API queries or written to a `TableSink` (see [Writing Tables to External Sinks](common.html#emit-to-a-tablesink)). SQL and Table API queries can seamlessly mixed and are holistically optimized and translated into a single program. -A `Table`, `DataSet`, `DataStream`, or external `TableSource` must be registered in the `TableEnvironment` in order to be accessible by a SQL query (see [Registering Tables](#registering-tables)). For convenience `Table.toString()` will automatically register an unique table name under the `Table`'s `TableEnvironment` and return the table name. So it allows to call SQL directly on tables in a string concatenation (see examples below). +To access the data in the SQL queries, users must register data sources, including `Table`, `DataSet`, `DataStream` or external `TableSource`, in the `TableEnvironment` (see [Registering Tables](common.html#register-a-table-in-the-catalog)). 
Alternatively, users can also register external catalogs in the `TableEnvironment` to specify the location of the data sources. -*Note: Flink's SQL support is not feature complete, yet. Queries that include unsupported SQL features will cause a `TableException`. The limitations of SQL on batch and streaming tables are listed in the following sections.* +For convenience `Table.toString()` will automatically register an unique table name under the `Table`'s `TableEnvironment` and return the table name. So it allows to call SQL directly on tables in a string concatenation (see examples below). -**TODO: Rework intro. Move some parts below. ** +*Note: Flink's SQL support is not feature complete, yet. Queries that include unsupported SQL features will cause a `TableException`. The limitations of SQL on batch and streaming tables are listed in the following sections.* --- End diff -- Flink's SQL support is not yet feature complete. ---
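The `Table.toString()` auto-registration behavior discussed in this hunk (register a unique name, return it, so SQL can be built by string concatenation) can be sketched as a toy catalog. This is plain Python standing in for the Java API; `ToyTableEnvironment`, `auto_register`, and the `_TempTable_` naming scheme are hypothetical illustration, not Flink code:

```python
import itertools


class ToyTableEnvironment:
    """Toy stand-in for a TableEnvironment's table catalog (not Flink code)."""

    def __init__(self):
        self.catalog = {}
        self._ids = itertools.count()

    def register_table(self, name, table):
        # explicit registration, as in tableEnv.registerTable(name, table)
        self.catalog[name] = table

    def auto_register(self, table):
        # mimics what Table.toString() is described as doing: register the
        # table under a fresh unique name and return that name
        name = f"_TempTable_{next(self._ids)}"
        self.catalog[name] = table
        return name


env = ToyTableEnvironment()
name = env.auto_register(object())
# the returned name can be spliced directly into a SQL string
query = "SELECT * FROM " + name
```

The point of the design is that the unique generated name lives in the same catalog as explicitly registered tables, so the concatenated query resolves like any other.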
[GitHub] flink pull request #4046: [FLINK-6749] [table] Table API / SQL Docs: SQL Pag...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4046#discussion_r119896766 --- Diff: docs/dev/table/sql.md --- @@ -22,20 +22,22 @@ specific language governing permissions and limitations under the License. --> -SQL queries are specified using the `sql()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table` which can be converted into a `DataSet` or `DataStream`, used in subsequent Table API queries, or written to a `TableSink` (see [Writing Tables to External Sinks](#writing-tables-to-external-sinks)). SQL and Table API queries can seamlessly mixed and are holistically optimized and translated into a single DataStream or DataSet program. +Flink supports specifying DataStream or DataSet programs with SQL queries using the `sql()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table`. A `Table` can be used in the subsequent SQL / Table API queries, be converted into a `DataSet` or `DataStream`, used in subsequent Table API queries or written to a `TableSink` (see [Writing Tables to External Sinks](common.html#emit-to-a-tablesink)). SQL and Table API queries can seamlessly mixed and are holistically optimized and translated into a single program. -A `Table`, `DataSet`, `DataStream`, or external `TableSource` must be registered in the `TableEnvironment` in order to be accessible by a SQL query (see [Registering Tables](#registering-tables)). For convenience `Table.toString()` will automatically register an unique table name under the `Table`'s `TableEnvironment` and return the table name. So it allows to call SQL directly on tables in a string concatenation (see examples below). +To access the data in the SQL queries, users must register data sources, including `Table`, `DataSet`, `DataStream` or external `TableSource`, in the `TableEnvironment` (see [Registering Tables](common.html#register-a-table-in-the-catalog)). 
Alternatively, users can also register external catalogs in the `TableEnvironment` to specify the location of the data sources. --- End diff -- This reads a bit awkwardly. How about this? Before using data in a SQL query, the data source(s) must first be registered in the `TableEnvironment` (see [Registering Tables](common.html#register-a-table-in-the-catalog)). Possible data sources include `Table`s, `DataSet`s, `DataStream`s, and external `TableSource`s. Alternatively, ... ---
[GitHub] flink pull request #4046: [FLINK-6749] [table] Table API / SQL Docs: SQL Pag...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4046#discussion_r119894264 --- Diff: docs/dev/table/sql.md --- @@ -22,20 +22,22 @@ specific language governing permissions and limitations under the License. --> -SQL queries are specified using the `sql()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table` which can be converted into a `DataSet` or `DataStream`, used in subsequent Table API queries, or written to a `TableSink` (see [Writing Tables to External Sinks](#writing-tables-to-external-sinks)). SQL and Table API queries can seamlessly mixed and are holistically optimized and translated into a single DataStream or DataSet program. +Flink supports specifying DataStream or DataSet programs with SQL queries using the `sql()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table`. A `Table` can be used in the subsequent SQL / Table API queries, be converted into a `DataSet` or `DataStream`, used in subsequent Table API queries or written to a `TableSink` (see [Writing Tables to External Sinks](common.html#emit-to-a-tablesink)). SQL and Table API queries can seamlessly mixed and are holistically optimized and translated into a single program. -A `Table`, `DataSet`, `DataStream`, or external `TableSource` must be registered in the `TableEnvironment` in order to be accessible by a SQL query (see [Registering Tables](#registering-tables)). For convenience `Table.toString()` will automatically register an unique table name under the `Table`'s `TableEnvironment` and return the table name. So it allows to call SQL directly on tables in a string concatenation (see examples below). +To access the data in the SQL queries, users must register data sources, including `Table`, `DataSet`, `DataStream` or external `TableSource`, in the `TableEnvironment` (see [Registering Tables](common.html#register-a-table-in-the-catalog)). 
Alternatively, users can also register external catalogs in the `TableEnvironment` to specify the location of the data sources. -*Note: Flink's SQL support is not feature complete, yet. Queries that include unsupported SQL features will cause a `TableException`. The limitations of SQL on batch and streaming tables are listed in the following sections.* +For convenience `Table.toString()` will automatically register an unique table name under the `Table`'s `TableEnvironment` and return the table name. So it allows to call SQL directly on tables in a string concatenation (see examples below). --- End diff -- This allows SQL to be called directly on tables in a string concatenation (see examples below). ---
[GitHub] flink pull request #4046: [FLINK-6749] [table] Table API / SQL Docs: SQL Pag...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4046#discussion_r119893598 --- Diff: docs/dev/table/sql.md --- @@ -22,20 +22,22 @@ specific language governing permissions and limitations under the License. --> -SQL queries are specified using the `sql()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table` which can be converted into a `DataSet` or `DataStream`, used in subsequent Table API queries, or written to a `TableSink` (see [Writing Tables to External Sinks](#writing-tables-to-external-sinks)). SQL and Table API queries can seamlessly mixed and are holistically optimized and translated into a single DataStream or DataSet program. +Flink supports specifying DataStream or DataSet programs with SQL queries using the `sql()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table`. A `Table` can be used in the subsequent SQL / Table API queries, be converted into a `DataSet` or `DataStream`, used in subsequent Table API queries or written to a `TableSink` (see [Writing Tables to External Sinks](common.html#emit-to-a-tablesink)). SQL and Table API queries can seamlessly mixed and are holistically optimized and translated into a single program. --- End diff -- A `Table` can be used in subsequent SQL / Table API queries, be converted into a `DataSet` or `DataStream`, or written to a `TableSink` ... _(drop "the" and the redundant "used in subsequent Table API queries")_ ---
[GitHub] flink pull request #4046: [FLINK-6749] [table] Table API / SQL Docs: SQL Pag...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4046#discussion_r119894751 --- Diff: docs/dev/table/sql.md --- @@ -82,14 +84,12 @@ val result2 = tableEnv.sql( -**TODO: Add some intro.** - {% top %} Supported Syntax -Flink uses [Apache Calcite](https://calcite.apache.org/docs/reference.html) for SQL parsing. Currently, Flink SQL only supports query-related SQL syntax and only a subset of the comprehensive SQL standard. The following BNF-grammar describes the supported SQL features: +Flink parses SQL using [Apache Calcite](https://calcite.apache.org/docs/reference.html). Flink supports the standard ANSI SQL but it provides no supports for DML and DDL. The following BNF-grammar describes the supported SQL features: --- End diff -- Flink supports standard ANSI SQL, but it provides no support for DML or DDL. ---
[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4041#discussion_r119883989 --- Diff: docs/dev/libs/cep.md --- @@ -246,63 +333,118 @@ pattern.where(event => ... /* some condition */).or(event => ... /* or condition -Next, we can append further states to detect complex patterns. -We can control the contiguity of two succeeding events to be accepted by the pattern. +# Conditions on Contiguity -Strict contiguity means that two matching events have to be directly the one after the other. -This means that no other events can occur in between. -A strict contiguity pattern state can be created via the `next` method. +FlinkCEP supports the following forms of contiguity between consecutive events: - - -{% highlight java %} -Pattern<Event, ?> strictNext = start.next("middle"); -{% endhighlight %} - + 1. Strict Contiguity: which expects all matching events to appear strictly the one after the other, + without any non-matching events in-between. - -{% highlight scala %} -val strictNext: Pattern[Event, _] = start.next("middle") -{% endhighlight %} - - + 2. Relaxed Contiguity: which simply ignores non-matching events appearing in-between the matching ones. + + 3. Non-Deterministic Relaxed Contiguity: which further relaxes contiguity by also creating alternative + matches which ignore also matching events. -Non-strict contiguity means that other events are allowed to occur in-between two matching events. -A non-strict contiguity pattern state can be created via the `followedBy` or `followedByAny` method. +To illustrate the above with an example, a pattern sequence `a+ b` (one or more `a`s followed by a `b`) with +input `a1, c, a2, b` will have the following results: + + 1. Strict Contiguity: `a2 b` because there is `c` `a1` and `a2` so `a1` is discarded. + + 2. Relaxed Contiguity: `a1 b` and `a1 a2 b`, as `c` will get simply ignored. + + 3. Non-Deterministic Relaxed Contiguity: `a1 b`, `a2 b` and `a1 a2 b`. 
+ --- End diff -- +1 for the added formatting. I find the explanations hard to parse. How about this: 1. Strict Contiguity: `{a2 b}` -- the `"c"` after `"a1"` causes `"a1"` to be discarded. 2. Relaxed Contiguity: `{a1 b}` and `{a1 a2 b}` -- `"c"` is simply ignored. 3. Non-Deterministic Relaxed Contiguity: `{a1 b}`, `{a2 b}`, and `{a1 a2 b}`. ---
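The three result sets in the suggestion above can be checked mechanically. Below is a toy matcher for the pattern `a+ b` over the input `a1, c, a2, b` -- plain Python approximating the contiguity semantics described in the diff, not Flink's actual NFA; `cep_matches` and its skipping rules are assumptions made for illustration only:

```python
from itertools import combinations


def cep_matches(events, mode):
    """Toy matcher for the pattern sequence 'a+ b' (one or more a's, then a b).

    mode is 'strict', 'relaxed', or 'non-deterministic'. An event matches
    'a' or 'b' based on its first character (e.g. 'a1', 'a2', 'b').
    """
    kind = lambda e: e[0]
    a_idx = [i for i, e in enumerate(events) if kind(e) == "a"]
    b_idx = [i for i, e in enumerate(events) if kind(e) == "b"]
    found = set()
    for b in b_idx:
        usable = [i for i in a_idx if i < b]
        for r in range(1, len(usable) + 1):
            for chosen in combinations(usable, r):
                idxs = list(chosen) + [b]
                if mode == "strict":
                    # no gaps at all between matched events
                    ok = all(j == i + 1 for i, j in zip(idxs, idxs[1:]))
                elif mode == "relaxed":
                    # only non-matching events may be skipped: the match must
                    # start at the first event matching 'a', and a skipped event
                    # must not match the element it was skipped in favor of
                    ok = all(kind(events[i]) != "a" for i in range(idxs[0]))
                    for i, j in zip(idxs, idxs[1:]):
                        ok = ok and all(
                            kind(events[s]) != kind(events[j])
                            for s in range(i + 1, j)
                        )
                else:
                    # non-deterministic relaxed: matching events may be skipped too
                    ok = True
                if ok:
                    found.add(tuple(events[i] for i in idxs))
    return found
```

Running it on `["a1", "c", "a2", "b"]` reproduces the three sets listed in the comment: `{a2 b}` for strict, `{a1 b, a1 a2 b}` for relaxed, and `{a1 b, a2 b, a1 a2 b}` for non-deterministic relaxed.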
[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4041#discussion_r119883183 --- Diff: docs/dev/libs/cep.md --- @@ -98,48 +128,106 @@ val result: DataStream[Alert] = patternStream.select(createAlert(_)) -Note that we use Java 8 lambdas in our Java code examples to make them more succinct. - ## The Pattern API -The pattern API allows you to quickly define complex event patterns. - -Each pattern consists of multiple stages or what we call states. -In order to go from one state to the next, the user can specify conditions. -These conditions can be the contiguity of events or a filter condition on an event. - -Each pattern has to start with an initial state: - - - -{% highlight java %} -Pattern<Event, ?> start = Pattern.begin("start"); -{% endhighlight %} - - - -{% highlight scala %} -val start : Pattern[Event, _] = Pattern.begin("start") -{% endhighlight %} - - - -Each state must have a unique name to identify the matched events later on. -Additionally, we can specify a filter condition for the event to be accepted as the start event via the `where` method. -These filtering conditions can be either an `IterativeCondition` or a `SimpleCondition`. - -**Iterative Conditions:** This type of conditions can iterate over the previously accepted elements in the pattern and -decide to accept a new element or not, based on some statistic over those elements. - -Below is the code for an iterative condition that accepts elements whose name start with "foo" and for which, the sum -of the prices of the previously accepted elements for a state named "middle", plus the price of the current event, do -not exceed the value of 5.0. Iterative condition can be very powerful, especially in combination with quantifiers, e.g. -`oneToMany` or `zeroToMany`. +The pattern API allows you to quickly define complex pattern sequences that you want to extract +from your input stream. 
+ +Each such complex pattern sequence consists of multiple simple patterns, i.e. patterns looking for +individual events with the same properties. These simple patterns are called **states**. A complex pattern +can be seen as a graph of such states, where transition from one state to the next happens based on user-specified --- End diff -- ... where transitions from one state to the next occur ... ---
[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4041#discussion_r119890353 --- Diff: docs/dev/libs/cep.md --- @@ -246,63 +334,399 @@ pattern.where(event => ... /* some condition */).or(event => ... /* or condition -Next, we can append further states to detect complex patterns. -We can control the contiguity of two succeeding events to be accepted by the pattern. +# Conditions on Contiguity -Strict contiguity means that two matching events have to be directly the one after the other. -This means that no other events can occur in between. -A strict contiguity pattern state can be created via the `next` method. +FlinkCEP supports the following forms of contiguity between events: + + 1. Strict Contiguity: which expects all matching events to appear strictly the one after the other, + without any non-matching events in-between. + + 2. Relaxed Contiguity: which simply ignores non-matching events appearing in-between the matching ones. + + 3. Non-Deterministic Relaxed Contiguity: which further relaxes contiguity by also creating alternative + matches which ignore also matching events. + +To illustrate the above with an example, a pattern sequence `a+ b` (one or more `a`s followed by a `b`) with +input `a1, c, a2, b` will have the following results: + + 1. Strict Contiguity: `a2 b` because there is `c` `a1` and `a2` so `a1` is discarded. + + 2. Relaxed Contiguity: `a1 b` and `a1 a2 b`, as `c` will get simply ignored. + + 3. Non-Deterministic Relaxed Contiguity: `a1 b`, `a2 b` and `a1 a2 b`. + +For looping states (e.g. `oneOrMore()` and `times()`) the default is *relaxed contiguity*. If you want +strict contiguity, you have to explicitly specify it by using the `consecutive()` call, and if you want +*non-deterministic relaxed contiguity* you can use the `allowCombinations()` call. + + + +Pattern Operation +Description + + + + +where(condition) + +Defines a condition for the current state. 
Only if an event satisifes the condition, +it can match the state. Multiple consecutive where() clauses lead to their condtions being +ANDed: +{% highlight java %} +patternState.where(new IterativeCondition() { +@Override +public boolean filter(Event value, Context ctx) throws Exception { +return ... // some condition +} +}); +{% endhighlight %} + + + +or(condition) + +Adds a new condition which is ORed with an existing one. Only if an event passes one of the +conditions, it can match the state: +{% highlight java %} +patternState.where(new IterativeCondition() { +@Override +public boolean filter(Event value, Context ctx) throws Exception { +return ... // some condition +} +}).or(new IterativeCondition() { +@Override +public boolean filter(Event value, Context ctx) throws Exception { +return ... // alternative condition +} +}); +{% endhighlight %} + + + + subtype(subClass) + + Defines a subtype condition for the current pattern state. Only if an event is of this subtype, + it can match the state: {% highlight java %} -Pattern<Event, ?> strictNext = start.next("middle"); +patternState.subtype(SubEvent.class); {% endhighlight %} + + + + oneOrMore() + + Specifies that this state expects at least one occurrence of a matching event. + By default a relaxed internal contiguity (between subsequent events) is used. For more info on the + internal contiguity see consecutive + {% highlight java %} + patternState.oneOrMore(); + {% endhighlight %} + + + + times(#ofTimes) + + Specifies that this state expects an exact number of occurrences of a matching event. + By default a relaxed internal contiguity (between subsequent events) is used. For more info on the + internal contiguity see consecutive +{% highlight java %} +patternState.times(2); +{% endhighlight %} + + + + optional() + + Specifies that this pattern is optional, i.e. it may not occur at a
[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4041#discussion_r119886927 --- Diff: docs/dev/libs/cep.md --- @@ -700,26 +995,29 @@ class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction< -The `select` method takes a selection function as argument, which is called for each matching event sequence. -It receives a map of string/event pairs of the matched events. -The string is defined by the name of the state to which the event has been matched. -The selection function returns exactly one result per call. +The `select()` method takes a selection function as argument, which is called for each matching event sequence. +It receives a match in the form of `Map[String, Iterable[IN]]` where the key is the name of each state in your pattern +sequence and the value is an Iterable over all accepted events for that state (`IN` is the type of your input elements). +The events for a given state are ordered by timestamp. The reason for returning an iterable of accepted events for each +state is that when using looping states (e.g. `oneToMany()` and `times()`), more than one events may be accepted for a +given state. The selection function returns exactly one result per call. {% highlight scala %} -def selectFn(pattern : mutable.Map[String, IN]): OUT = { -val startEvent = pattern.get("start").get -val endEvent = pattern.get("end").get +def selectFn(pattern : Map[String, Iterable[IN]]): OUT = { +val startEvent = pattern.get("start").get.next +val endEvent = pattern.get("end").get.next OUT(startEvent, endEvent) } {% endhighlight %} -The `flatSelect` method is similar to the `select` method. Their only difference is that the function passed to the `flatSelect` method can return an arbitrary number of results per call. -In order to do this, the function for `flatSelect` has an additional `Collector` parameter which is used for the element output. +The `flatSelect` method is similar to the `select` method. 
Their only difference is that the function passed to the +`flatSelect` method can return an arbitrary number of results per call. In order to do this, the function for +`flatSelect` has an additional `Collector` parameter which is used for forwarding your output elements downstream. --- End diff -- ... which is used to forward ... ---
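The `select()` vs. `flatSelect()` contract under discussion (exactly one result per match vs. collector-driven, arbitrary-arity output) can be modeled in a few lines. This is a plain-Python toy, not Flink's API; the function names mirror the docs but the signatures here are invented for illustration:

```python
def select(matched_sequences, select_fn):
    # select(): the user function returns exactly one result per match
    return [select_fn(m) for m in matched_sequences]


def flat_select(matched_sequences, flat_select_fn):
    # flatSelect(): the user function receives a collector and may emit
    # zero, one, or many results per match
    out = []
    for m in matched_sequences:
        flat_select_fn(m, out.append)  # out.append stands in for Collector.collect
    return out


# a match maps each state name to the iterable of events accepted for it,
# as in the Map[String, Iterable[IN]] described in the diff
matches = [{"start": ["s1"], "middle": ["m1", "m2"], "end": ["e1"]}]

pairs = select(matches, lambda m: (m["start"][0], m["end"][0]))

expanded = flat_select(
    matches, lambda m, collect: [collect(ev) for ev in m["middle"]]
)
```

Here `select` yields one `(start, end)` pair for the single match, while `flat_select` forwards both `middle` events downstream through the collector.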
[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4041#discussion_r119885420 --- Diff: docs/dev/libs/cep.md --- @@ -246,63 +334,399 @@ pattern.where(event => ... /* some condition */).or(event => ... /* or condition -Next, we can append further states to detect complex patterns. -We can control the contiguity of two succeeding events to be accepted by the pattern. +# Conditions on Contiguity -Strict contiguity means that two matching events have to be directly the one after the other. -This means that no other events can occur in between. -A strict contiguity pattern state can be created via the `next` method. +FlinkCEP supports the following forms of contiguity between events: + + 1. Strict Contiguity: which expects all matching events to appear strictly the one after the other, + without any non-matching events in-between. + + 2. Relaxed Contiguity: which simply ignores non-matching events appearing in-between the matching ones. + + 3. Non-Deterministic Relaxed Contiguity: which further relaxes contiguity by also creating alternative + matches which ignore also matching events. --- End diff -- Non-Deterministic Relaxed Contiguity is hard to understand. Maybe this is clearer: ... which further relaxes contiguity, allowing additional matches that ignore some matching events. ---
[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4041#discussion_r119879015 --- Diff: docs/dev/libs/cep.md --- @@ -168,33 +256,34 @@ start.where( -Attention The call to `Context.getEventsForPattern(...)` has to find the -elements that belong to the pattern. The cost of this operation can vary, so when implementing your condition, try -to minimize the times the method is called. +Attention The call to `context.getEventsForPattern(...)` finds all the +previously accepted events for a given potential match. The cost of this operation can vary, so when implementing +your condition, try to minimize the times the method is called. -**Simple Conditions:** This type of conditions extend the aforementioned `IterativeCondition` class. They are simple -filtering conditions that decide to accept an element or not, based only on properties of the element itself. +**Simple Conditions:** This type of conditions extend the aforementioned `IterativeCondition` class and decides --- End diff -- ... This type of condition extends the aforementioned `IterativeCondition` class and decides whether ... ---
[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4041#discussion_r119890712 --- Diff: docs/dev/libs/cep.md --- @@ -246,63 +334,399 @@ pattern.where(event => ... /* some condition */).or(event => ... /* or condition -Next, we can append further states to detect complex patterns. -We can control the contiguity of two succeeding events to be accepted by the pattern. +# Conditions on Contiguity -Strict contiguity means that two matching events have to be directly the one after the other. -This means that no other events can occur in between. -A strict contiguity pattern state can be created via the `next` method. +FlinkCEP supports the following forms of contiguity between events: + + 1. Strict Contiguity: which expects all matching events to appear strictly the one after the other, + without any non-matching events in-between. + + 2. Relaxed Contiguity: which simply ignores non-matching events appearing in-between the matching ones. + + 3. Non-Deterministic Relaxed Contiguity: which further relaxes contiguity by also creating alternative + matches which ignore also matching events. + +To illustrate the above with an example, a pattern sequence `a+ b` (one or more `a`s followed by a `b`) with +input `a1, c, a2, b` will have the following results: + + 1. Strict Contiguity: `a2 b` because there is `c` `a1` and `a2` so `a1` is discarded. + + 2. Relaxed Contiguity: `a1 b` and `a1 a2 b`, as `c` will get simply ignored. + + 3. Non-Deterministic Relaxed Contiguity: `a1 b`, `a2 b` and `a1 a2 b`. + +For looping states (e.g. `oneOrMore()` and `times()`) the default is *relaxed contiguity*. If you want +strict contiguity, you have to explicitly specify it by using the `consecutive()` call, and if you want +*non-deterministic relaxed contiguity* you can use the `allowCombinations()` call. + + + +Pattern Operation +Description + + + + +where(condition) + +Defines a condition for the current state. 
Only if an event satisifes the condition, +it can match the state. Multiple consecutive where() clauses lead to their condtions being +ANDed: +{% highlight java %} +patternState.where(new IterativeCondition() { +@Override +public boolean filter(Event value, Context ctx) throws Exception { +return ... // some condition +} +}); +{% endhighlight %} + + + +or(condition) + +Adds a new condition which is ORed with an existing one. Only if an event passes one of the +conditions, it can match the state: --- End diff -- An event can match the state only if it passes at least one of the conditions. ---
[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4041#discussion_r119878726 --- Diff: docs/dev/libs/cep.md --- @@ -168,33 +256,34 @@ start.where( -Attention The call to `Context.getEventsForPattern(...)` has to find the -elements that belong to the pattern. The cost of this operation can vary, so when implementing your condition, try -to minimize the times the method is called. +Attention The call to `context.getEventsForPattern(...)` finds all the +previously accepted events for a given potential match. The cost of this operation can vary, so when implementing +your condition, try to minimize the times the method is called. --- End diff -- ... try to minimize its use. ---
[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4041#discussion_r119888143 --- Diff: docs/dev/libs/cep.md --- @@ -341,153 +765,67 @@ Pattern<Event, ?> start = Pattern.begin("start"); -Next +next() -Appends a new pattern state. A matching event has to directly succeed the previous matching event: +Appends a new pattern state. A matching event has to directly succeed the previous matching event +(strict contiguity): {% highlight java %} -Pattern<Event, ?> next = start.next("next"); +Pattern<Event, ?> next = start.next("middle"); {% endhighlight %} -FollowedBy +followedBy() -Appends a new pattern state. Other events can occur between a matching event and the previous matching event: +Appends a new pattern state. Other events can occur between a matching event and the previous +matching event (relaxed contiguity): {% highlight java %} -Pattern<Event, ?> followedBy = start.followedBy("next"); +Pattern<Event, ?> followedBy = start.followedBy("middle"); {% endhighlight %} -Where +followedByAny() -Defines a condition for the current pattern state. Only if an event satisifes the condition, it can match the state: +Appends a new pattern state. Other events can occur between a matching event and the previous +matching event and alternative matches will be presented for every alternative matching event --- End diff -- add a comma: ... matching event, and alternative matches ... ---
[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4041#discussion_r119853002 --- Diff: docs/dev/libs/cep.md --- @@ -98,48 +128,106 @@ val result: DataStream[Alert] = patternStream.select(createAlert(_)) -Note that we use Java 8 lambdas in our Java code examples to make them more succinct. - ## The Pattern API -The pattern API allows you to quickly define complex event patterns. - -Each pattern consists of multiple stages or what we call states. -In order to go from one state to the next, the user can specify conditions. -These conditions can be the contiguity of events or a filter condition on an event. - -Each pattern has to start with an initial state: - - - -{% highlight java %} -Pattern<Event, ?> start = Pattern.begin("start"); -{% endhighlight %} - - - -{% highlight scala %} -val start : Pattern[Event, _] = Pattern.begin("start") -{% endhighlight %} - - - -Each state must have a unique name to identify the matched events later on. -Additionally, we can specify a filter condition for the event to be accepted as the start event via the `where` method. -These filtering conditions can be either an `IterativeCondition` or a `SimpleCondition`. - -**Iterative Conditions:** This type of conditions can iterate over the previously accepted elements in the pattern and -decide to accept a new element or not, based on some statistic over those elements. - -Below is the code for an iterative condition that accepts elements whose name start with "foo" and for which, the sum -of the prices of the previously accepted elements for a state named "middle", plus the price of the current event, do -not exceed the value of 5.0. Iterative condition can be very powerful, especially in combination with quantifiers, e.g. -`oneToMany` or `zeroToMany`. +The pattern API allows you to quickly define complex pattern sequences that you want to extract +from your input stream. 
+ +Each such complex pattern sequence consists of multiple simple patterns, i.e. patterns looking for +individual events with the same properties. These simple patterns are called **states**. A complex pattern +can be seen as a graph of such states, where transition from one state to the next happens based on user-specified +*conditions*, e.g. `event.getName().equals("start")`. A *match* is a sequence of input events which visit all +states of the complex pattern graph, through a sequence of valid state transitions. + +Attention Each state must have a unique name to identify the matched +events later on. + +Attention State names **CANNOT** contain the character `:`. + +In the remainder, we start by describing how to define [States](#states), before describing how you can +combine individual states into [Complex Patterns](#combining-states). + +### Individual States + +A **State** can be either a *singleton* state, or a *looping* one. Singleton states accept a single +event, while looping ones can accept more than one. In pattern matching symbols, in the pattern `a b+ c? d` (or `a`, +followed by *one or more* `b`'s, optionally followed by a `c`, followed by a `d`), `a`, `c?`, and `d` are +singleton patterns, while `b+` is a looping one. By default, a state is a singleton state and you can transform +it to a looping one using [Quantifiers](#quantifiers). In addition, each state can have one or more +[Conditions](#conditions) based on which it accepts events. + + Quantifiers + +In FlinkCEP, looping patterns can be specified using the methods: `pattern.oneOrMore()`, for states that expect one or +more occurrences of a given event (e.g. the `b+` mentioned previously), and `pattern.times(#ofTimes)` for states that +expect a specific number of occurrences of a given type of event, e.g. 4 `a`'s. All states, looping or not, can be made +optional using the `pattern.optional()` method. 
For a state named `start`, the following are valid quantifiers: + + + + {% highlight java %} + // expecting 4 occurrences + start.times(4); + + // expecting 0 or 4 occurrences + start.times(4).optional(); + + // expecting 1 or more occurrences + start.oneOrMore(); + + // expecting 0 or more occurrences + start.oneOrMore().optional(); + {% endhighlight %} + + + + {% highlight scala %} + // expecting 4 occurrences + start.times(4) + + // expecting 0 or 4 occurrences + start.times(4).optional() + + // expecting 1 or more occurrences + start.oneOrMore() + + // expecting 0 or more occurrences
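The quantifier semantics shown above can be modeled outside of Flink. The following is a minimal Python sketch, not the FlinkCEP API: quantifiers are encoded as tuples, and `accepts` answers whether a given number of event occurrences satisfies a quantifier.

```python
# Minimal model of the quantifier semantics described above.
# Illustration only, not the FlinkCEP API: quantifiers are encoded as
# tuples, e.g. ("times", 4) or ("optional", ("oneOrMore",)).

def accepts(quantifier, count):
    """Return True if `count` occurrences of an event satisfy `quantifier`."""
    kind = quantifier[0]
    if kind == "optional":
        # optional(): zero occurrences, or whatever the wrapped quantifier allows
        return count == 0 or accepts(quantifier[1], count)
    if kind == "times":
        # times(n): exactly n occurrences
        return count == quantifier[1]
    if kind == "oneOrMore":
        # oneOrMore(): one or more occurrences
        return count >= 1
    raise ValueError("unknown quantifier: %r" % (kind,))

# the four combinations listed for the `start` state
assert accepts(("times", 4), 4) and not accepts(("times", 4), 3)
assert accepts(("optional", ("times", 4)), 0)
assert accepts(("oneOrMore",), 2) and not accepts(("oneOrMore",), 0)
assert accepts(("optional", ("oneOrMore",)), 0)
```

The tuple encoding mirrors how the quantifier calls chain on a pattern state; wrapping with `("optional", ...)` corresponds to appending `.optional()`.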
[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4041#discussion_r119889986 --- Diff: docs/dev/libs/cep.md --- @@ -246,63 +334,399 @@ pattern.where(event => ... /* some condition */).or(event => ... /* or condition -Next, we can append further states to detect complex patterns. -We can control the contiguity of two succeeding events to be accepted by the pattern. +# Conditions on Contiguity -Strict contiguity means that two matching events have to be directly the one after the other. -This means that no other events can occur in between. -A strict contiguity pattern state can be created via the `next` method. +FlinkCEP supports the following forms of contiguity between events: + + 1. Strict Contiguity: which expects all matching events to appear strictly the one after the other, + without any non-matching events in-between. + + 2. Relaxed Contiguity: which simply ignores non-matching events appearing in-between the matching ones. + + 3. Non-Deterministic Relaxed Contiguity: which further relaxes contiguity by also creating alternative + matches which also ignore matching events. + +To illustrate the above with an example, a pattern sequence `a+ b` (one or more `a`s followed by a `b`) with +input `a1, c, a2, b` will have the following results: + + 1. Strict Contiguity: `a2 b`, because there is a `c` between `a1` and `a2`, so `a1` is discarded. + + 2. Relaxed Contiguity: `a1 b` and `a1 a2 b`, as `c` will get simply ignored. + + 3. Non-Deterministic Relaxed Contiguity: `a1 b`, `a2 b` and `a1 a2 b`. + +For looping states (e.g. `oneOrMore()` and `times()`) the default is *relaxed contiguity*. If you want +strict contiguity, you have to explicitly specify it by using the `consecutive()` call, and if you want +*non-deterministic relaxed contiguity* you can use the `allowCombinations()` call. + + + +Pattern Operation +Description + + + + +where(condition) + +Defines a condition for the current state. 
Only if an event satisfies the condition, +it can match the state. Multiple consecutive where() clauses lead to their conditions being +ANDed: +{% highlight java %} +patternState.where(new IterativeCondition() { +@Override +public boolean filter(Event value, Context ctx) throws Exception { +return ... // some condition +} +}); +{% endhighlight %} + + + +or(condition) + +Adds a new condition which is ORed with an existing one. Only if an event passes one of the +conditions, it can match the state: +{% highlight java %} +patternState.where(new IterativeCondition() { +@Override +public boolean filter(Event value, Context ctx) throws Exception { +return ... // some condition +} +}).or(new IterativeCondition() { +@Override +public boolean filter(Event value, Context ctx) throws Exception { +return ... // alternative condition +} +}); +{% endhighlight %} + + + + subtype(subClass) + + Defines a subtype condition for the current pattern state. Only if an event is of this subtype, + it can match the state: {% highlight java %} -Pattern<Event, ?> strictNext = start.next("middle"); +patternState.subtype(SubEvent.class); {% endhighlight %} + + + + oneOrMore() + + Specifies that this state expects at least one occurrence of a matching event. + By default a relaxed internal contiguity (between subsequent events) is used. For more info on the + internal contiguity see consecutive + {% highlight java %} + patternState.oneOrMore(); + {% endhighlight %} + + + + times(#ofTimes) + + Specifies that this state expects an exact number of occurrences of a matching event. + By default a relaxed internal contiguity (between subsequent events) is used. For more info on the + internal contiguity see consecutive +{% highlight java %} +patternState.times(2); +{% endhighlight %} + + + + optional() + + Specifies that this pattern is optional, i.e. it may not occur at a
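The `a+ b` example above can be checked with a small simulation. The sketch below is plain Python, not FlinkCEP: it enumerates matches under strict contiguity (matching events must be adjacent in the stream) and under non-deterministic relaxed contiguity (any in-order subsequence may match). Plain relaxed contiguity, which prunes some of the non-deterministic matches as the text describes, is deliberately not modeled here.

```python
from itertools import combinations

# Simulation of the contiguity example above (plain Python, not FlinkCEP).
# Events are strings; their "type" is the first character, so "a1" and
# "a2" are both `a` events.

def event_type(e):
    return e[0]

def matches_a_plus_b(seq):
    # does the chosen sequence of events match the pattern `a+ b`?
    return (len(seq) >= 2
            and all(event_type(e) == "a" for e in seq[:-1])
            and event_type(seq[-1]) == "b")

def strict_matches(stream):
    # strict contiguity: the matching events must be adjacent in the stream
    return [stream[i:j]
            for i in range(len(stream))
            for j in range(i + 2, len(stream) + 1)
            if matches_a_plus_b(stream[i:j])]

def non_det_relaxed_matches(stream):
    # non-deterministic relaxed contiguity: any in-order subsequence may
    # match, i.e. both matching and non-matching events may be skipped
    return [[stream[i] for i in idxs]
            for r in range(2, len(stream) + 1)
            for idxs in combinations(range(len(stream)), r)
            if matches_a_plus_b([stream[i] for i in idxs])]

stream = ["a1", "c", "a2", "b"]
assert strict_matches(stream) == [["a2", "b"]]
assert non_det_relaxed_matches(stream) == [
    ["a1", "b"], ["a2", "b"], ["a1", "a2", "b"]]
```

The two assertions reproduce the `a2 b` strict result and the three non-deterministic relaxed results listed in the docs.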
[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4041#discussion_r119886720 --- Diff: docs/dev/libs/cep.md --- @@ -700,26 +995,29 @@ class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction< -The `select` method takes a selection function as argument, which is called for each matching event sequence. -It receives a map of string/event pairs of the matched events. -The string is defined by the name of the state to which the event has been matched. -The selection function returns exactly one result per call. +The `select()` method takes a selection function as argument, which is called for each matching event sequence. +It receives a match in the form of `Map[String, Iterable[IN]]` where the key is the name of each state in your pattern +sequence and the value is an Iterable over all accepted events for that state (`IN` is the type of your input elements). +The events for a given state are ordered by timestamp. The reason for returning an iterable of accepted events for each +state is that when using looping states (e.g. `oneToMany()` and `times()`), more than one events may be accepted for a --- End diff -- ... more than one event may be accepted ... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
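The match shape described above can be illustrated in plain Python: a match is a map from state name to the list of events accepted for that state, ordered by timestamp, mirroring the `Map[String, Iterable[IN]]` the selection function receives. The state names and event fields (`id`, `price`) below are made up for the example.

```python
# Shape of a match as described above: a map from state name to the
# list of accepted events for that state, ordered by timestamp.
# Events here are plain dicts with made-up fields; in Flink the
# selection function receives a Map<String, Iterable<IN>>.

def select(match):
    # hypothetical selection function producing exactly one result per match
    start = match["start"][0]          # singleton state: a single event
    middles = match["middle"]          # looping state: possibly several events
    return {
        "trigger": start["id"],
        "middle_count": len(middles),
        "total_price": sum(e["price"] for e in middles),
    }

match = {
    "start":  [{"id": "s1", "price": 1.0}],
    "middle": [{"id": "m1", "price": 2.0}, {"id": "m2", "price": 2.5}],
}
result = select(match)
assert result == {"trigger": "s1", "middle_count": 2, "total_price": 4.5}
```

The `middle` state holds two events precisely because looping states (`oneOrMore()`, `times()`) can accept more than one event, which is why the map's values are iterables rather than single events.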
[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4041#discussion_r119852417 --- Diff: docs/dev/libs/cep.md --- @@ -98,48 +128,105 @@ val result: DataStream[Alert] = patternStream.select(createAlert(_)) -Note that we use Java 8 lambdas in our Java code examples to make them more succinct. - ## The Pattern API -The pattern API allows you to quickly define complex event patterns. - -Each pattern consists of multiple stages or what we call states. -In order to go from one state to the next, the user can specify conditions. -These conditions can be the contiguity of events or a filter condition on an event. - -Each pattern has to start with an initial state: +The pattern API allows you to quickly define complex pattern sequences that you want to extract +from your input stream. + +Each such complex pattern sequence consists of multiple simple patterns, i.e. patterns looking for +individual events with the same properties. These simple patterns are called **states**. A complex pattern +can be seen as a graph of such states, where transition from one state to the next happens based on user-specified +*conditions*, e.g. `event.getName().equals("start")`. A *match* is a sequence of input events which visit all +states of the complex pattern graph, through a sequence of valid state transitions. + +Attention Each state must have a unique name to identify the matched +events later on. + +Attention State names **CANNOT** contain the character `:`. + +In the remainder, we start by describing how to define [States](#states), before describing how you can +combine individual states into [Complex Patterns](#combining-states). + +### Individual States + +A **State** can be either a *singleton* state, or a *looping* one. Singleton states accept a single event, +while looping ones accept more than one. In pattern matching symbols, in the pattern `a b+ c? 
d` (or `a`, +followed by *one or more* `b`'s, optionally followed by a `c`, followed by a `d`), `a`, `c?`, and `d` are +singleton patterns, while `b+` is a looping one (see [Quantifiers](#quantifiers)). In addition, each state +can have one or more *conditions* based on which it accepts events (see [Conditions](#conditions)). + + Quantifiers + +In FlinkCEP, looping patterns can be specified using the methods: `pattern.oneOrMore()`, for states that expect one or +more occurrences of a given event (e.g. the `b+` mentioned previously), and `pattern.times(#ofTimes)` for states that +expect a specific number of occurrences of a given type of event, e.g. 4 `a`'s. All states, looping or not, can be made +optional using the `pattern.optional()` method. For a state named `start`, the following are valid quantifiers: + + + + {% highlight java %} + // expecting 4 occurrences + start.times(4); + + // expecting 0 or 4 occurrences + start.times(4).optional(); + + // expecting 1 or more occurrences + start.oneOrMore(); + + // expecting 0 or more occurrences + start.oneOrMore().optional(); + {% endhighlight %} + + + + {% highlight scala %} + // expecting 4 occurrences + start.times(4) + + // expecting 0 or 4 occurrences + start.times(4).optional() + + // expecting 1 or more occurrences + start.oneOrMore() + + // expecting 0 or more occurrences + start.oneOrMore().optional() + {% endhighlight %} + + + + Conditions + +At every state, and in order to go from one state to the next, you can specify additional **conditions**. +These conditions can be related to: + + 1. a [property of the incoming event](#conditions-on-properties), e.g. its value should be larger than 5, + or larger than the average value of the previously accepted events. + + 2. the [contiguity of the matching events](#conditions-on-contiguity), e.g. detect pattern `a,b,c` without + non-matching events between any matching ones. + +The latter refers to "looping" states, i.e. 
states that can accept more than one event, e.g. the `b+` in `a b+ c`, +which searches for one or more `b`'s. + +# Conditions on Properties + +Conditions on the event properties can be specified via the `pattern.where()` method. These can be either +`IterativeCondition`s or `SimpleCondition`s. + +**Iterative Conditions:** This is the most general type of conditions. This allows to specify a condition that accepts +any subsequent event based on some statistic over a subset of the previously accepted events. --- End diff -- This is the most general type of condition.
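An iterative condition of the kind described, one that accepts an event based on a statistic over the previously accepted events, can be sketched in plain Python. The event shape and the 5.0 threshold are illustrative, loosely following the docs' sum-of-prices example: accept events whose name starts with "foo" as long as the sum of previously accepted prices plus the current price does not exceed 5.0.

```python
# Sketch of an iterative condition: accept an event based on a statistic
# over the previously accepted events. The event shape and the 5.0
# threshold are illustrative, not part of any real API.

def iterative_condition(event, previously_accepted):
    # the name must start with "foo" ...
    if not event["name"].startswith("foo"):
        return False
    # ... and the running sum of accepted prices, plus this event's
    # price, must not exceed 5.0
    running_sum = sum(e["price"] for e in previously_accepted)
    return running_sum + event["price"] <= 5.0

accepted = []
for event in [{"name": "foo1", "price": 2.0},
              {"name": "bar",  "price": 1.0},
              {"name": "foo2", "price": 2.5},
              {"name": "foo3", "price": 3.0}]:
    if iterative_condition(event, accepted):
        accepted.append(event)

assert [e["name"] for e in accepted] == ["foo1", "foo2"]
```

In FlinkCEP proper, the `Context` passed to `IterativeCondition.filter()` plays the role of `previously_accepted`: it gives access to the events already matched for a given state.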
[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4041#discussion_r119887736 --- Diff: docs/dev/libs/cep.md --- @@ -511,133 +849,81 @@ val start = Pattern.begin[Event]("start") -Next +next() -Appends a new pattern state. A matching event has to directly succeed the previous matching event: +Appends a new pattern state. A matching event has to directly succeed the previous matching event +(strict contiguity): {% highlight scala %} val next = start.next("middle") {% endhighlight %} -FollowedBy +followedBy() -Appends a new pattern state. Other events can occur between a matching event and the previous matching event: +Appends a new pattern state. Other events can occur between a matching event and the previous +matching event (relaxed contiguity) : {% highlight scala %} val followedBy = start.followedBy("middle") {% endhighlight %} -Where - -Defines a filter condition for the current pattern state. Only if an event passes the filter, it can match the state: -{% highlight scala %} -patternState.where(event => ... /* some condition */) -{% endhighlight %} - - - -Or - -Adds a new filter condition which is ORed with an existing filter condition. Only if an event passes the filter condition, it can match the state: -{% highlight scala %} -patternState.where(event => ... /* some condition */) -.or(event => ... /* alternative condition */) -{% endhighlight %} - +followedByAny() + +Appends a new pattern state. Other events can occur between a matching event and the previous +matching event and alternative matches will be presented for every alternative matching event --- End diff -- add a comma: ... matching event, and alternative matches will be ... ---
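The difference between `next()`, `followedBy()`, and `followedByAny()` can be sketched for a simple two-state pattern (an `a` event followed by a `b` event). This is plain Python modeling the semantics described above, not the Flink API.

```python
# Sketch of next(), followedBy(), and followedByAny() semantics for a
# two-state pattern: an `a` event followed by a `b` event. Plain
# Python, not the Flink API.

def next_matches(stream, first, second):
    # next(): strict contiguity, the second event must be adjacent
    return [(stream[i], stream[i + 1])
            for i in range(len(stream) - 1)
            if first(stream[i]) and second(stream[i + 1])]

def followed_by_matches(stream, first, second):
    # followedBy(): relaxed contiguity, non-matching events in between
    # are ignored, but only the first subsequent matching event
    # completes each match
    out = []
    for i, a in enumerate(stream):
        if first(a):
            for b in stream[i + 1:]:
                if second(b):
                    out.append((a, b))
                    break
    return out

def followed_by_any_matches(stream, first, second):
    # followedByAny(): non-deterministic relaxed contiguity, every
    # subsequent matching event produces an alternative match
    return [(a, b)
            for i, a in enumerate(stream) if first(a)
            for b in stream[i + 1:] if second(b)]

stream = ["a1", "c", "a2", "b1", "b2"]
is_a = lambda e: e.startswith("a")
is_b = lambda e: e.startswith("b")

assert next_matches(stream, is_a, is_b) == [("a2", "b1")]
assert followed_by_matches(stream, is_a, is_b) == [("a1", "b1"), ("a2", "b1")]
assert followed_by_any_matches(stream, is_a, is_b) == [
    ("a1", "b1"), ("a1", "b2"), ("a2", "b1"), ("a2", "b2")]
```

Note how `a1` produces no `next()` match (the `c` in between breaks strict contiguity), while `followedByAny()` pairs every `a` with every later `b`.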
[GitHub] flink pull request #4041: [FLINK-6198] [cep] Update CEP documentation.
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4041#discussion_r119891095 --- Diff: docs/dev/libs/cep.md --- @@ -246,63 +334,399 @@ pattern.where(event => ... /* some condition */).or(event => ... /* or condition -Next, we can append further states to detect complex patterns. -We can control the contiguity of two succeeding events to be accepted by the pattern. +# Conditions on Contiguity -Strict contiguity means that two matching events have to be directly the one after the other. -This means that no other events can occur in between. -A strict contiguity pattern state can be created via the `next` method. +FlinkCEP supports the following forms of contiguity between events: + + 1. Strict Contiguity: which expects all matching events to appear strictly the one after the other, + without any non-matching events in-between. + + 2. Relaxed Contiguity: which simply ignores non-matching events appearing in-between the matching ones. + + 3. Non-Deterministic Relaxed Contiguity: which further relaxes contiguity by also creating alternative + matches which also ignore matching events. + +To illustrate the above with an example, a pattern sequence `a+ b` (one or more `a`s followed by a `b`) with +input `a1, c, a2, b` will have the following results: + + 1. Strict Contiguity: `a2 b`, because there is a `c` between `a1` and `a2`, so `a1` is discarded. + + 2. Relaxed Contiguity: `a1 b` and `a1 a2 b`, as `c` will get simply ignored. + + 3. Non-Deterministic Relaxed Contiguity: `a1 b`, `a2 b` and `a1 a2 b`. + +For looping states (e.g. `oneOrMore()` and `times()`) the default is *relaxed contiguity*. If you want +strict contiguity, you have to explicitly specify it by using the `consecutive()` call, and if you want +*non-deterministic relaxed contiguity* you can use the `allowCombinations()` call. + + + +Pattern Operation +Description + + + + +where(condition) + +Defines a condition for the current state. 
Only if an event satisifes the condition, +it can match the state. Multiple consecutive where() clauses lead to their condtions being --- End diff -- To match the state, an event must satisfy the condition. ---
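The combination rules the table describes (consecutive `where()` clauses are ANDed, `or()` adds an alternative condition) can be sketched with plain Python predicates. The helper names below are invented for the illustration; they are not FlinkCEP API.

```python
# Sketch of condition combination as described in the table above:
# consecutive where() clauses AND their conditions, while or() adds
# an alternative. Helper names are invented for this illustration.

def where_all(*conditions):
    # consecutive where() clauses: every condition must accept the event
    return lambda event: all(c(event) for c in conditions)

def either(cond_a, cond_b):
    # or(): the event may satisfy either condition
    return lambda event: cond_a(event) or cond_b(event)

is_expensive = lambda e: e["price"] > 2.0
is_foo = lambda e: e["name"].startswith("foo")

anded = where_all(is_expensive, is_foo)   # where(...).where(...)
ored = either(is_expensive, is_foo)       # where(...).or(...)

assert anded({"name": "foo1", "price": 3.0})
assert not anded({"name": "bar", "price": 3.0})
assert ored({"name": "bar", "price": 3.0})
assert not ored({"name": "bar", "price": 1.0})
```

This mirrors the table's wording: with ANDed conditions an event must satisfy every clause to match the state, while with `or()` passing any one condition is enough.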