buildbot success in on flink-docs-release-0.9
The Buildbot has detected a restored build on builder flink-docs-release-0.9 while building . Full details are available at: https://ci.apache.org/builders/flink-docs-release-0.9/builds/420 Buildbot URL: https://ci.apache.org/ Buildslave for this Build: bb_slave3_ubuntu Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.9' triggered this build Build Source Stamp: [branch release-0.9] HEAD Blamelist: Build succeeded! Sincerely, -The Buildbot
buildbot success in on flink-docs-release-0.10
The Buildbot has detected a restored build on builder flink-docs-release-0.10 while building . Full details are available at: https://ci.apache.org/builders/flink-docs-release-0.10/builds/305 Buildbot URL: https://ci.apache.org/ Buildslave for this Build: bb_slave3_ubuntu Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.10' triggered this build Build Source Stamp: [branch release-0.10] HEAD Blamelist: Build succeeded! Sincerely, -The Buildbot
buildbot success in on flink-docs-master
The Buildbot has detected a restored build on builder flink-docs-master while building . Full details are available at: https://ci.apache.org/builders/flink-docs-master/builds/430 Buildbot URL: https://ci.apache.org/ Buildslave for this Build: bb_slave1_ubuntu Build Reason: The Nightly scheduler named 'flink-nightly-docs-master' triggered this build Build Source Stamp: [branch master] HEAD Blamelist: Build succeeded! Sincerely, -The Buildbot
flink git commit: [FLINK-4431] [core] Introduce a "VisibleForTesting" annotation.
Repository: flink Updated Branches: refs/heads/master 7e8de772b -> 59eb4332f [FLINK-4431] [core] Introduce a "VisibleForTesting" annotation. This annotation documents methods/fields that are not private because tests need them, but should not be called by any non-testing code. This closes #2390 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59eb4332 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59eb4332 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59eb4332 Branch: refs/heads/master Commit: 59eb4332f31204b118fe95d56bcf3893ae705866 Parents: 7e8de77 Author: Stephan Ewen Authored: Fri Aug 19 12:16:09 2016 +0200 Committer: Stephan Ewen Committed: Fri Aug 19 23:52:45 2016 +0200 -- .../flink/annotation/VisibleForTesting.java | 36 1 file changed, 36 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/59eb4332/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java -- diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java b/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java new file mode 100644 index 000..8f945a9 --- /dev/null +++ b/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.flink.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Target; + +/** + * This annotations declares that a function, field, constructor, or entire type, is only visible for + * testing purposes. + * + * This annotation is typically attached when for example a method should be {@code private} + * (because it is not intended to be called externally), but cannot be declared private, because + * some tests need to have access to it. + */ +@Documented +@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR }) +@Internal +public @interface VisibleForTesting {}
flink git commit: [FLINK-4431] [core] Introduce a "VisibleForTesting" annotation.
Repository: flink Updated Branches: refs/heads/flip-6 682ebdb57 -> ed26ab044 [FLINK-4431] [core] Introduce a "VisibleForTesting" annotation. This annotation documents methods/fields that are not private because tests need them, but should not be called by any non-testing code. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed26ab04 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed26ab04 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed26ab04 Branch: refs/heads/flip-6 Commit: ed26ab04459bf8baea11af3c6a5733a4a8a92655 Parents: 682ebdb Author: Stephan Ewen Authored: Fri Aug 19 12:16:09 2016 +0200 Committer: Stephan Ewen Committed: Fri Aug 19 23:48:43 2016 +0200 -- .../flink/annotation/VisibleForTesting.java | 36 1 file changed, 36 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/ed26ab04/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java -- diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java b/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java new file mode 100644 index 000..8f945a9 --- /dev/null +++ b/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.flink.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Target; + +/** + * This annotations declares that a function, field, constructor, or entire type, is only visible for + * testing purposes. + * + * This annotation is typically attached when for example a method should be {@code private} + * (because it is not intended to be called externally), but cannot be declared private, because + * some tests need to have access to it. + */ +@Documented +@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR }) +@Internal +public @interface VisibleForTesting {}
[2/2] flink git commit: [FLINK-4282] Add Offset Parameter to WindowAssigners
[FLINK-4282] Add Offset Parameter to WindowAssigners Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09774626 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09774626 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09774626 Branch: refs/heads/master Commit: 09774626086564b184abbe09776d3b7033badd20 Parents: 3be9a28 Author: renkai Authored: Thu Aug 11 18:48:50 2016 +0800 Committer: Aljoscha Krettek Committed: Fri Aug 19 18:23:38 2016 +0200 -- .../assigners/SlidingEventTimeWindows.java | 34 +++- .../assigners/SlidingProcessingTimeWindows.java | 34 +++- .../windowing/assigners/SlidingTimeWindows.java | 2 +- .../assigners/TumblingEventTimeWindows.java | 35 +++- .../TumblingProcessingTimeWindows.java | 34 +++- .../assigners/TumblingTimeWindows.java | 2 +- .../api/windowing/windows/TimeWindow.java | 12 ++ .../operators/windowing/TimeWindowTest.java | 59 +++ .../operators/windowing/WindowOperatorTest.java | 175 +++ 9 files changed, 370 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java index 8fd0d25..16171a0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java @@ -52,16 +52,19 @@ public class SlidingEventTimeWindows extends WindowAssigner private final long slide; - protected SlidingEventTimeWindows(long size, long slide) { + private final long offset; + + protected SlidingEventTimeWindows(long size, long slide, long offset) { this.size = size; this.slide = slide; + this.offset = offset; } @Override public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { List windows = new ArrayList<>((int) (size / slide)); - long lastStart = timestamp - timestamp % slide; + long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { @@ -102,7 +105,32 @@ public class SlidingEventTimeWindows extends WindowAssigner * @return The time policy. */ public static SlidingEventTimeWindows of(Time size, Time slide) { - return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds()); + return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0); + } + + /** +* Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns +* elements to time windows based on the element timestamp and offset. +* +* For example, if you want window a stream by hour,but window begins at the 15th minutes +* of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get +* time windows start at 0:15:00,1:15:00,2:15:00,etc. +* +* +* +* Rather than that,if you are living in somewhere which is not using UTC±00:00 time, +* such as China which is using UTC+08:00,and you want a time window with size of one day, +* and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}. +* The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time. +* +* @param size The size of the generated windows. +* @param slide The slide interval of the generated windows. +* @param offset The offset which window start would be shifted by. +* @return The time policy. +*/ + public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) { + return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), + offset.toMilliseconds() % slide.toMilliseconds()); } @Override http://git-wip
[1/2] flink git commit: [FLINK-4282] Add doc for WindowAssigner offset parameter
Repository: flink Updated Branches: refs/heads/master 3be9a2851 -> 7e8de772b [FLINK-4282] Add doc for WindowAssigner offset parameter This closes #2333 This closes #2355 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e8de772 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e8de772 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e8de772 Branch: refs/heads/master Commit: 7e8de772b00bbdaae5f606d51f3037c0ae5f8aae Parents: 0977462 Author: Aljoscha Krettek Authored: Fri Aug 19 15:41:08 2016 +0200 Committer: Aljoscha Krettek Committed: Fri Aug 19 18:23:38 2016 +0200 -- docs/apis/streaming/windows.md | 36 1 file changed, 36 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/7e8de772/docs/apis/streaming/windows.md -- diff --git a/docs/apis/streaming/windows.md b/docs/apis/streaming/windows.md index 7a93723..b9847a5 100644 --- a/docs/apis/streaming/windows.md +++ b/docs/apis/streaming/windows.md @@ -252,6 +252,42 @@ input Note, how we can specify a time interval by using one of `Time.milliseconds(x)`, `Time.seconds(x)`, `Time.minutes(x)`, and so on. +The time-based window assigners also take an optional `offset` parameter that can be used to +change the alignment of windows. For example, without offsets hourly windows are aligned +with epoch, that is you will get windows such as `1:00 - 1:59`, `2:00 - 2:59` and so on. If you +want to change that you can give an offset. With an offset of 15 minutes you would, for example, +get `1:15 - 2:14`, `2:15 - 3:14` etc. Another important use case for offsets is when you +want to have daily windows and live in a timezone other than UTC-0. For example, in China +you would have to specify an offset of `Time.hours(-8)`. + +This example shows how an offset can be specified for tumbling event time windows (the other +windows work accordingly): + + +{% highlight java %} +DataStream input = ...; + +// tumbling event-time windows +input +.keyBy() +.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) +.(); +{% endhighlight %} + + + +{% highlight scala %} +val input: DataStream[T] = ... + +// tumbling event-time windows +input +.keyBy() +.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) +.() +{% endhighlight %} + + + ## Window Functions The *window function* is used to process the elements of each window (and key) once the system
flink git commit: [FLINK-4402] Changed the documentation for the metrics in the System Scope Section
Repository: flink Updated Branches: refs/heads/master b7ae3e533 -> 3be9a2851 [FLINK-4402] Changed the documentation for the metrics in the System Scope Section This closes #2382 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3be9a285 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3be9a285 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3be9a285 Branch: refs/heads/master Commit: 3be9a2851ba616dba60b9b8d74f499b7db02debf Parents: b7ae3e5 Author: Neelesh Srinivas Salian Authored: Wed Aug 17 13:30:06 2016 -0500 Committer: Robert Metzger Committed: Fri Aug 19 16:35:33 2016 +0200 -- docs/apis/metrics.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/3be9a285/docs/apis/metrics.md -- diff --git a/docs/apis/metrics.md b/docs/apis/metrics.md index 3e32ebd..1cc7a29 100644 --- a/docs/apis/metrics.md +++ b/docs/apis/metrics.md @@ -196,10 +196,10 @@ Each of these keys expect a format string that may contain constants (e.g. "task - `metrics.scope.tm.job` - Default:.taskmanager. . - Applied to all metrics that were scoped to a task manager and job. -- `metrics.scope.tm.task` +- `metrics.scope.task` - Default: .taskmanager. . . . - Applied to all metrics that were scoped to a task. -- `metrics.scope.tm.operator` +- `metrics.scope.operator` - Default: .taskmanager. . . . - Applied to all metrics that were scoped to an operator. @@ -209,7 +209,7 @@ The default scope for operator metrics will result in an identifier akin to `loc If you also want to include the task name but omit the task manager information you can specify the following format: -`metrics.scope.tm.operator: ` +`metrics.scope.operator: ` This could create the identifier `localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric`.
flink git commit: Add ManualWindowSpeedITCase For Assessing State Performance
Repository: flink Updated Branches: refs/heads/master f0fef6f44 -> b7ae3e533 Add ManualWindowSpeedITCase For Assessing State Performance This should be used to test whether there are any obvious performance regressions between releases. Somewhat similar to the other manual tests that have @Ignore set these have to be run manually. (for now) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7ae3e53 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7ae3e53 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7ae3e53 Branch: refs/heads/master Commit: b7ae3e53382258d9f811b6c34cd0df9564b00370 Parents: f0fef6f Author: Aljoscha Krettek Authored: Wed Aug 17 14:24:51 2016 +0200 Committer: Aljoscha Krettek Committed: Fri Aug 19 16:20:24 2016 +0200 -- .../test/state/ManualWindowSpeedITCase.java | 260 +++ 1 file changed, 260 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/b7ae3e53/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java new file mode 100644 index 000..428c47c --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.test.state; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * A collection of manual tests that serve to assess the performance of windowed operations. These + * run in local mode with parallelism 1 with a source that emits data as fast as possible. Thus, + * these mostly test the performance of the state backend. + * + * When doing a release we should manually run theses tests on the version that is to be released + * and on older version to see if there are performance regressions. + * + * When a test is executed it will output how many elements of key {@code "Tuple 0"} have + * been processed in each window. This gives an estimate of the throughput. + */ +@Ignore +public class ManualWindowSpeedITCase extends StreamingMultipleProgramsTestBase { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testTumblingIngestionTimeWindowsWithFsBackend() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + env.setParallelism(1); + + String checkpoints = tempFolder.newFolder().toURI().toString(); + env.setStateBackend(new FsStateBackend(checkpoints)); + + env.addSource(new InfiniteTupleSource(10_000)) + .keyBy(0) + .timeWindow(Time.seconds(3)) + .reduce(new ReduceFunction>() { + private static final long serialVersionUID = 1L; + +
[2/2] flink git commit: [FLINK-3319] [cep] Add documentation for or function
[FLINK-3319] [cep] Add documentation for or function Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0fef6f4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0fef6f4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0fef6f4 Branch: refs/heads/master Commit: f0fef6f44a53baf24ab38f1392eb321462d3f4fa Parents: 266c76b Author: Till Rohrmann Authored: Fri Aug 19 15:47:34 2016 +0200 Committer: Till Rohrmann Committed: Fri Aug 19 15:47:34 2016 +0200 -- docs/apis/streaming/libs/cep.md | 56 .../flink/cep/scala/pattern/Pattern.scala | 11 .../org/apache/flink/cep/pattern/Pattern.java | 12 ++--- 3 files changed, 73 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/f0fef6f4/docs/apis/streaming/libs/cep.md -- diff --git a/docs/apis/streaming/libs/cep.md b/docs/apis/streaming/libs/cep.md index b313451..ef35d32 100644 --- a/docs/apis/streaming/libs/cep.md +++ b/docs/apis/streaming/libs/cep.md @@ -178,6 +178,33 @@ As it can be seen here, the subtype condition can also be combined with an addit In fact you can always provide multiple conditions by calling `where` and `subtype` multiple times. These conditions will then be combined using the logical AND operator. +In order to construct or conditions, one has to call the `or` method with a respective filter function. +Any existing filter function is then ORed with the given one. + + + +{% highlight java %} +pattern.where(new FilterFunction() { +@Override +public boolean filter(Event value) { +return ... // some condition +} +}).or(new FilterFunction() { +@Override +public boolean filter(Event value) { +return ... // or condition +} +}); +{% endhighlight %} + + + +{% highlight scala %} +pattern.where(event => ... /* some condition */).or(event => ... /* or condition */) +{% endhighlight %} + + + Next, we can append further states to detect complex patterns. We can control the contiguity of two succeeding events to be accepted by the pattern. @@ -285,6 +312,25 @@ patternState.where(new FilterFunction() { {% endhighlight %} + +Or + +Adds a new filter condition which is ORed with an existing filter condition. Only if an event passes the filter condition, it can match the state: +{% highlight java %} +patternState.where(new FilterFunction() { +@Override +public boolean filter(Event value) throws Exception { +return ... // some condition +} +}).or(new FilterFunction() { +@Override +public boolean filter(Event value) throws Exception { +return ... // alternative condition +} +}); +{% endhighlight %} + + Subtype @@ -352,6 +398,16 @@ patternState.where(event => ... /* some condition */) {% endhighlight %} + +Or + +Adds a new filter condition which is ORed with an existing filter condition. Only if an event passes the filter condition, it can match the state: +{% highlight scala %} +patternState.where(event => ... /* some condition */) +.or(event => ... /* alternative condition */) +{% endhighlight %} + + Subtype http://git-wip-us.apache.org/repos/asf/flink/blob/f0fef6f4/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala -- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala index 592599c..cc3b03c 100644 --- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala @@ -127,6 +127,17 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) { } /** +* Specifies a filter condition which is ORed with an existing filter function. +* +* @param filter Or filter function +* @return The same pattern operator where the new filter condition is set +*/ + def or(filter: FilterFunction[F]): Pattern[T, F] = { +jPattern.or(filter) +this + } + + /** * Specifies a filter condition which has to be fulfilled by an event in order to be matched. * * @param filterFun Filter condition http://git-wip-us.apache.org/repos/asf/flink/blob/f0fef6f4/flink-libraries/flink-cep/src/main/java/org/apac
[1/2] flink git commit: [FLINK-3319] [cep] Add or function to CEP's pattern api
Repository: flink Updated Branches: refs/heads/master 7dbcffb90 -> f0fef6f44 [FLINK-3319] [cep] Add or function to CEP's pattern api This closes #2171. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/266c76b5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/266c76b5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/266c76b5 Branch: refs/heads/master Commit: 266c76b55e410965d7a5332acd3b2a0dfd3b24bb Parents: 7dbcffb Author: Bob Thorman Authored: Fri Aug 19 15:23:03 2016 +0200 Committer: Till Rohrmann Committed: Fri Aug 19 15:26:02 2016 +0200 -- .../flink/cep/pattern/OrFilterFunction.java | 52 .../org/apache/flink/cep/pattern/Pattern.java | 18 ++ .../java/org/apache/flink/cep/CEPITCase.java| 65 .../apache/flink/cep/pattern/PatternTest.java | 42 + 4 files changed, 177 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/266c76b5/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java new file mode 100644 index 000..c42ecb1 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern; + +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * A filter function which combines two filter functions with a logical or. Thus, the filter + * function only returns true, iff at least one of the filter functions holds true. + * + * @param Type of the element to filter + */ +public class OrFilterFunction implements FilterFunction { + private static final long serialVersionUID = -2109562093871155005L; + + private final FilterFunction left; + private final FilterFunction right; + + public OrFilterFunction(final FilterFunction left, final FilterFunction right) { + this.left = left; + this.right = right; + } + + @Override + public boolean filter(T value) throws Exception { + return left.filter(value) || right.filter(value); + } + + public FilterFunction getLeft() { + return left; + } + + public FilterFunction getRight() { + return right; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/266c76b5/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java index 696518e..14aed5d 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -93,6 +93,24 @@ public class Pattern { } /** +* Specifies a filter condition if fulfilled by an event will match. +* +* @param newFilterFunction Filter condition +* @return The same pattern operator where the new filter condition is set +*/ + public Pattern or(FilterFunction newFilterFunction) { + ClosureCleaner.clean(newFilterFunction, true); + + if (this.filterFunction == null) { + this.filterFunction = newFilterFunction; + } else { + this.filterFunction = new OrFilterFunction<>(this.filterFunction, newFilterFunction); + } + + return this; + } + + /** * Applies a subtype constraint on the current pattern
[1/2] flink git commit: [FLINK-4021] [network] Consume staged buffers on shutdown
Repository: flink Updated Branches: refs/heads/master 5d7f88031 -> 7dbcffb90 [FLINK-4021] [network] Consume staged buffers on shutdown If we have staged buffers and one consuming channel is closed, all others will be skipped and auto read will not be set back to true. This is currently not a problem, because failures bring down the whole job, but it would become an issue with partial recovery. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6615ee70 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6615ee70 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6615ee70 Branch: refs/heads/master Commit: 6615ee70c677914e5b8c6f251ec0b93fe0991472 Parents: 5d7f880 Author: æ·æ± Authored: Tue Jun 21 17:13:37 2016 +0800 Committer: Ufuk Celebi Committed: Fri Aug 19 12:23:33 2016 +0200 -- .../netty/PartitionRequestClientHandler.java | 18 -- 1 file changed, 8 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6615ee70/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java index 953405f..52775d4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.netty; import com.google.common.collect.Maps; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; - import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -161,7 +160,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { if (!bufferListener.hasStagedBufferOrEvent() && stagedMessages.isEmpty()) { - decodeMsg(msg); + decodeMsg(msg, false); } else { stagedMessages.add(msg); @@ -201,7 +200,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter { super.channelReadComplete(ctx); } - private boolean decodeMsg(Object msg) throws Throwable { + private boolean decodeMsg(Object msg, boolean isStagedBuffer) throws Throwable { final Class msgClazz = msg.getClass(); // Buffer @@ -217,7 +216,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter { return true; } - return decodeBufferOrEvent(inputChannel, bufferOrEvent); + return decodeBufferOrEvent(inputChannel, bufferOrEvent, isStagedBuffer); } // Error - else if (msgClazz == NettyMessage.ErrorResponse.class) { @@ -252,7 +251,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter { return true; } - private boolean decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent) throws Throwable { + private boolean decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent, boolean isStagedBuffer) throws Throwable { boolean releaseNettyBuffer = true; try { @@ -269,10 +268,9 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter { BufferProvider bufferProvider = inputChannel.getBufferProvider(); if (bufferProvider == null) { - + // receiver has been cancelled/failed cancelRequestFor(bufferOrEvent.receiverId); - - return false; // receiver has been cancelled/failed + return isStagedBuffer; } while (true) { @@ -292,7 +290,7 @@ class PartitionRequestClientHandle
[2/2] flink git commit: [FLINK-4021] [network] Add test for staged buffers auto read behaviour
[FLINK-4021] [network] Add test for staged buffers auto read behaviour This closes #2141. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7dbcffb9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7dbcffb9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7dbcffb9 Branch: refs/heads/master Commit: 7dbcffb901a2a0e1b64b9ae78977726febe2 Parents: 6615ee7 Author: Ufuk Celebi Authored: Mon Aug 15 15:59:09 2016 +0200 Committer: Ufuk Celebi Committed: Fri Aug 19 12:28:15 2016 +0200 -- .../runtime/io/network/netty/NettyMessage.java | 2 +- .../PartitionRequestClientHandlerTest.java | 86 .../runtime/testutils/DiscardingRecycler.java | 2 + 3 files changed, 89 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/7dbcffb9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java index 3a24181..2b03f1d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java @@ -192,7 +192,7 @@ abstract class NettyMessage { buffer = null; } - BufferResponse(Buffer buffer, int sequenceNumber, InputChannelID receiverId) { + public BufferResponse(Buffer buffer, int sequenceNumber, InputChannelID receiverId) { this.buffer = buffer; this.sequenceNumber = sequenceNumber; this.receiverId = receiverId; http://git-wip-us.apache.org/repos/asf/flink/blob/7dbcffb9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java index 2c08cc5..26d791f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java @@ -22,6 +22,9 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.flink.core.memory.HeapMemorySegment; +import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; @@ -31,11 +34,17 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.io.network.util.TestBufferFactory; +import org.apache.flink.runtime.testutils.DiscardingRecycler; import org.apache.flink.runtime.util.event.EventListener; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -158,8 +167,85 @@ public class PartitionRequestClientHandlerTest { client.cancelRequestFor(inputChannel.getInputChannelId()); } + /** +* Tests that an unsuccessful message decode call for a staged message +* does not leave the channel with auto read set to false. +*/ + @Test + @SuppressWarnings("unchecked") + public void testAutoReadAfterUnsuccessfulStagedMessage() throws Exception { + PartitionRequestClientHandler handler = new PartitionRequestClientHandler(); + EmbeddedChannel channel = new EmbeddedChannel(handler); + + final AtomicReference> listener = new AtomicReference<>(); + + BufferProvider bufferProvider = mock(BufferProvider.class); +
flink git commit: [FLINK-4414] [cluster] Add getAddress method to RpcGateway
Repository: flink Updated Branches: refs/heads/flip-6 b7259d617 -> 682ebdb57 [FLINK-4414] [cluster] Add getAddress method to RpcGateway The RpcGateway.getAddress method allows to retrieve the fully qualified address of the associated RpcEndpoint. This closes #2392. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/682ebdb5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/682ebdb5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/682ebdb5 Branch: refs/heads/flip-6 Commit: 682ebdb5744b01a4d126ce4a66846c12588a1714 Parents: b7259d6 Author: Till Rohrmann Authored: Thu Aug 18 16:34:47 2016 +0200 Committer: Till Rohrmann Committed: Fri Aug 19 13:52:57 2016 +0200 -- .../apache/flink/runtime/rpc/RpcEndpoint.java | 6 +- .../apache/flink/runtime/rpc/RpcGateway.java| 7 +++ .../apache/flink/runtime/rpc/RpcService.java| 11 -- .../runtime/rpc/akka/AkkaInvocationHandler.java | 14 +++-- .../flink/runtime/rpc/akka/AkkaRpcService.java | 21 ++-- .../runtime/rpc/akka/AkkaRpcActorTest.java | 16 +++ 6 files changed, 42 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/682ebdb5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index a28bc14..7b3f8a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -69,9 +69,6 @@ public abstract class RpcEndpoint { /** Self gateway which can be used to schedule asynchronous calls on yourself */ private final C self; - /** the fully qualified address of the this RPC endpoint */ - private final String selfAddress; - /** The main thread execution context to be used to execute future callbacks in the main thread * of the executing rpc server. */ private final ExecutionContext mainThreadExecutionContext; @@ -92,7 +89,6 @@ public abstract class RpcEndpoint { this.selfGatewayType = ReflectionUtil.getTemplateType1(getClass()); this.self = rpcService.startServer(this); - this.selfAddress = rpcService.getAddress(self); this.mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self); } @@ -156,7 +152,7 @@ public abstract class RpcEndpoint { * @return Fully qualified address of the underlying RPC endpoint */ public String getAddress() { - return selfAddress; + return self.getAddress(); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/682ebdb5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java index e3a16b4..81075ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java @@ -22,4 +22,11 @@ package org.apache.flink.runtime.rpc; * Rpc gateway interface which has to be implemented by Rpc gateways. */ public interface RpcGateway { + + /** +* Returns the fully qualified address under which the associated rpc endpoint is reachable. +* +* @return Fully qualified address under which the associated rpc endpoint is reachable +*/ + String getAddress(); } http://git-wip-us.apache.org/repos/asf/flink/blob/682ebdb5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index fabdb05..bc0f5cb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -65,17 +65,6 @@ public interface RpcService { void stopService(); /** -* Get the fully qualified address of the underlying rpc server represented by the self gateway. -* It must be possible to connect from a remote host to the rpc server via the returned fully -* qualified address. -* -* @param
[1/2] flink git commit: [FLINK-4322] [checkpointing] Ignore minimum delay for savepoints
Repository: flink Updated Branches: refs/heads/master 0d53daa2f -> 5d7f88031 [FLINK-4322] [checkpointing] Ignore minimum delay for savepoints Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8854d75c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8854d75c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8854d75c Branch: refs/heads/master Commit: 8854d75c13a6e7a593baf6d0d5ae4f47c90718e3 Parents: 0d53daa Author: Ramkrishna Authored: Thu Aug 18 17:27:41 2016 +0530 Committer: Ufuk Celebi Committed: Fri Aug 19 12:03:44 2016 +0200 -- .../checkpoint/CheckpointCoordinator.java | 20 +++- .../checkpoint/CheckpointCoordinatorTest.java | 2 +- 2 files changed, 12 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/8854d75c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 2a1ece0..ff54bad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -333,17 +333,19 @@ public class CheckpointCoordinator { } return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS); } - } - // make sure the minimum interval between checkpoints has passed - if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) { - if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(); - currentPeriodicTrigger = null; + // make sure the minimum interval between checkpoints has passed + if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) { + if (currentPeriodicTrigger != null) { + currentPeriodicTrigger.cancel(); + currentPeriodicTrigger = null; + } + ScheduledTrigger trigger = new ScheduledTrigger(); + // Reassign the new trigger to the currentPeriodicTrigger + currentPeriodicTrigger = trigger; + timer.scheduleAtFixedRate(trigger, minPauseBetweenCheckpoints, baseInterval); + return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); } - ScheduledTrigger trigger = new ScheduledTrigger(); - timer.scheduleAtFixedRate(trigger, minPauseBetweenCheckpoints, baseInterval); - return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8854d75c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 3341095..f243803 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -1408,7 +1408,7 @@ public class CheckpointCoordinatorTest { assertTrue(coord.triggerCheckpoint(timestamp + 3)); assertEquals(2, coord.getNumberOfPendingCheckpoints()); - Future savepointFuture2 = coord.triggerSavepoint(timestamp); + Future savepointFuture2 = coord.triggerSavepoint(timestamp + 4); long savepointId2 = counter.getLast(); assertEquals(3, coord.getNumberOfPendingCheckpoints());
[2/2] flink git commit: [FLINK-4322] [checkpointing] Extend CheckpointCoordinatorTest
[FLINK-4322] [checkpointing] Extend CheckpointCoordinatorTest The added tests check that savepoints ignore the maximum number of concurrent checkpoints and minimum delay between checkpoints. This closes #2385. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d7f8803 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d7f8803 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d7f8803 Branch: refs/heads/master Commit: 5d7f8803155d2eb8865cce9a60dd677c2400261c Parents: 8854d75 Author: Ufuk Celebi Authored: Fri Aug 19 12:01:24 2016 +0200 Committer: Ufuk Celebi Committed: Fri Aug 19 12:04:12 2016 +0200 -- .../checkpoint/CheckpointCoordinatorTest.java | 90 +++- 1 file changed, 89 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/5d7f8803/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index f243803..09c53d6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -36,6 +36,7 @@ import org.mockito.stubbing.Answer; import scala.concurrent.Future; import java.io.Serializable; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -1646,7 +1647,94 @@ public class CheckpointCoordinatorTest { fail(e.getMessage()); } } - + + /** +* Tests that the savepoints can be triggered concurrently. +*/ + @Test + public void testConcurrentSavepoints() throws Exception { + JobID jobId = new JobID(); + + final ExecutionAttemptID attemptID1 = new ExecutionAttemptID(); + ExecutionVertex vertex1 = mockExecutionVertex(attemptID1); + + StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter(); + + CheckpointCoordinator coord = new CheckpointCoordinator( + jobId, + 10, + 20, + 0L, + 1, // max one checkpoint at a time => should not affect savepoints + 42, + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + cl, + checkpointIDCounter, + new StandaloneCompletedCheckpointStore(2, cl), + new HeapSavepointStore(), + new DisabledCheckpointStatsTracker()); + + List> savepointFutures = new ArrayList<>(); + + int numSavepoints = 5; + + // Trigger savepoints + for (int i = 0; i < numSavepoints; i++) { + savepointFutures.add(coord.triggerSavepoint(i)); + } + + // After triggering multiple savepoints, all should in progress + for (Future savepointFuture : savepointFutures) { + assertFalse(savepointFuture.isCompleted()); + } + + // ACK all savepoints + long checkpointId = checkpointIDCounter.getLast(); + for (int i = 0; i < numSavepoints; i++, checkpointId--) { + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId)); + } + + // After ACKs, all should be completed + for (Future savepointFuture : savepointFutures) { + assertTrue(savepointFuture.isCompleted()); + } + } + + /** +* Tests that no minimum delay between savepoints is enforced. +*/ + @Test + public void testMinDelayBetweenSavepoints() throws Exception { + JobID jobId = new JobID(); + + final ExecutionAttemptID attemptID1 = new ExecutionAttemptID(); + ExecutionVertex vertex1 = mockExecutionVertex(attemptID1); + + CheckpointCoordinator coord = new CheckpointCoordinator( + jobId, + 10, + 20, +