buildbot success on flink-docs-release-0.9

2016-08-19 Thread buildbot
The Buildbot has detected a restored build on builder flink-docs-release-0.9
while building. Full details are available at:
https://ci.apache.org/builders/flink-docs-release-0.9/builds/420

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave3_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.9' 
triggered this build
Build Source Stamp: [branch release-0.9] HEAD
Blamelist: 

Build succeeded!

Sincerely,
 -The Buildbot





buildbot success on flink-docs-release-0.10

2016-08-19 Thread buildbot
The Buildbot has detected a restored build on builder flink-docs-release-0.10
while building. Full details are available at:
https://ci.apache.org/builders/flink-docs-release-0.10/builds/305

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave3_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.10' 
triggered this build
Build Source Stamp: [branch release-0.10] HEAD
Blamelist: 

Build succeeded!

Sincerely,
 -The Buildbot





buildbot success on flink-docs-master

2016-08-19 Thread buildbot
The Buildbot has detected a restored build on builder flink-docs-master while
building. Full details are available at:
https://ci.apache.org/builders/flink-docs-master/builds/430

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave1_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-master' triggered 
this build
Build Source Stamp: [branch master] HEAD
Blamelist: 

Build succeeded!

Sincerely,
 -The Buildbot





flink git commit: [FLINK-4431] [core] Introduce a "VisibleForTesting" annotation.

2016-08-19 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master 7e8de772b -> 59eb4332f


[FLINK-4431] [core] Introduce a "VisibleForTesting" annotation.

This annotation documents methods/fields that are not private because tests 
need them,
but should not be called by any non-testing code.

This closes #2390


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59eb4332
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59eb4332
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59eb4332

Branch: refs/heads/master
Commit: 59eb4332f31204b118fe95d56bcf3893ae705866
Parents: 7e8de77
Author: Stephan Ewen 
Authored: Fri Aug 19 12:16:09 2016 +0200
Committer: Stephan Ewen 
Committed: Fri Aug 19 23:52:45 2016 +0200

--
 .../flink/annotation/VisibleForTesting.java | 36 
 1 file changed, 36 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/59eb4332/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java
--
diff --git 
a/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java
 
b/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java
new file mode 100644
index 000..8f945a9
--- /dev/null
+++ 
b/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.flink.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Target;
+
+/**
+ * This annotation declares that a function, field, constructor, or entire type
+ * is only visible for testing purposes.
+ *
+ * <p>This annotation is typically attached when, for example, a method should be
+ * {@code private} (because it is not intended to be called externally) but cannot
+ * be declared private, because some tests need to have access to it.
+ */
+@Documented
+@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, 
ElementType.CONSTRUCTOR })
+@Internal
+public @interface VisibleForTesting {}
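
As a usage sketch of the annotation (the BufferPool class below is hypothetical
and not part of this commit), a member that would otherwise be private is widened
for tests and marked:

import org.apache.flink.annotation.VisibleForTesting;

public class BufferPool {

	private final int capacity;

	public BufferPool(int capacity) {
		this.capacity = capacity;
	}

	// Would be private, but tests need to read it; non-testing code
	// must not call this accessor.
	@VisibleForTesting
	int remainingCapacity() {
		return capacity;
	}
}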



flink git commit: [FLINK-4431] [core] Introduce a "VisibleForTesting" annotation.

2016-08-19 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/flip-6 682ebdb57 -> ed26ab044


[FLINK-4431] [core] Introduce a "VisibleForTesting" annotation.

This annotation documents methods/fields that are not private because tests 
need them,
but should not be called by any non-testing code.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed26ab04
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed26ab04
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed26ab04

Branch: refs/heads/flip-6
Commit: ed26ab04459bf8baea11af3c6a5733a4a8a92655
Parents: 682ebdb
Author: Stephan Ewen 
Authored: Fri Aug 19 12:16:09 2016 +0200
Committer: Stephan Ewen 
Committed: Fri Aug 19 23:48:43 2016 +0200

--
 .../flink/annotation/VisibleForTesting.java | 36 
 1 file changed, 36 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/ed26ab04/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java
--
diff --git 
a/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java
 
b/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java
new file mode 100644
index 000..8f945a9
--- /dev/null
+++ 
b/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.flink.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Target;
+
+/**
+ * This annotation declares that a function, field, constructor, or entire type
+ * is only visible for testing purposes.
+ *
+ * <p>This annotation is typically attached when, for example, a method should be
+ * {@code private} (because it is not intended to be called externally) but cannot
+ * be declared private, because some tests need to have access to it.
+ */
+@Documented
+@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, 
ElementType.CONSTRUCTOR })
+@Internal
+public @interface VisibleForTesting {}



[2/2] flink git commit: [FLINK-4282] Add Offset Parameter to WindowAssigners

2016-08-19 Thread aljoscha
[FLINK-4282] Add Offset Parameter to WindowAssigners


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09774626
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09774626
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09774626

Branch: refs/heads/master
Commit: 09774626086564b184abbe09776d3b7033badd20
Parents: 3be9a28
Author: renkai 
Authored: Thu Aug 11 18:48:50 2016 +0800
Committer: Aljoscha Krettek 
Committed: Fri Aug 19 18:23:38 2016 +0200

--
 .../assigners/SlidingEventTimeWindows.java  |  34 +++-
 .../assigners/SlidingProcessingTimeWindows.java |  34 +++-
 .../windowing/assigners/SlidingTimeWindows.java |   2 +-
 .../assigners/TumblingEventTimeWindows.java |  35 +++-
 .../TumblingProcessingTimeWindows.java  |  34 +++-
 .../assigners/TumblingTimeWindows.java  |   2 +-
 .../api/windowing/windows/TimeWindow.java   |  12 ++
 .../operators/windowing/TimeWindowTest.java |  59 +++
 .../operators/windowing/WindowOperatorTest.java | 175 +++
 9 files changed, 370 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
index 8fd0d25..16171a0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
@@ -52,16 +52,19 @@ public class SlidingEventTimeWindows extends 
WindowAssigner
 
private final long slide;
 
-   protected SlidingEventTimeWindows(long size, long slide) {
+   private final long offset;
+
+   protected SlidingEventTimeWindows(long size, long slide, long offset) {
this.size = size;
this.slide = slide;
+   this.offset = offset;
}
 
@Override
	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
		if (timestamp > Long.MIN_VALUE) {
			List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
-   long lastStart = timestamp - timestamp % slide;
+   long lastStart = 
TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
@@ -102,7 +105,32 @@ public class SlidingEventTimeWindows extends 
WindowAssigner
 * @return The time policy.
 */
public static SlidingEventTimeWindows of(Time size, Time slide) {
-   return new SlidingEventTimeWindows(size.toMilliseconds(), 
slide.toMilliseconds());
+   return new SlidingEventTimeWindows(size.toMilliseconds(), 
slide.toMilliseconds(), 0);
+   }
+
+	/**
+	 * Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to time windows based on the element timestamp and offset.
+	 *
+	 * <p>For example, if you want to window a stream by hour, but the windows should begin
+	 * at the 15th minute of each hour, you can use
+	 * {@code of(Time.hours(1), Time.hours(1), Time.minutes(15))}; you will then get time
+	 * windows that start at 0:15:00, 1:15:00, 2:15:00, etc.
+	 *
+	 * <p>If, instead, you live in a timezone that does not use UTC±00:00, such as China,
+	 * which uses UTC+08:00, and you want a time window with a size of one day that begins
+	 * at every 00:00:00 of local time, you can use
+	 * {@code of(Time.days(1), Time.days(1), Time.hours(-8))}. The offset is
+	 * {@code Time.hours(-8)} because local time in UTC+08:00 is 8 hours ahead of UTC.
+	 *
+	 * @param size The size of the generated windows.
+	 * @param slide The slide interval of the generated windows.
+	 * @param offset The offset by which window starts are shifted.
+	 * @return The time policy.
+	 */
+   public static SlidingEventTimeWindows of(Time size, Time slide, Time 
offset) {
+   return new SlidingEventTimeWindows(size.toMilliseconds(), 
slide.toMilliseconds(),
+   offset.toMilliseconds() % slide.toMilliseconds());
}
 
@Override

http://git-wip
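
To make the offset arithmetic concrete, here is a minimal self-contained sketch.
It assumes getWindowStartWithOffset computes
timestamp - (timestamp - offset + windowSize) % windowSize; the TimeWindow change
is listed in the diffstat above but not shown in this excerpt, so treat the formula
as an assumption rather than the authoritative implementation.

public class WindowOffsetDemo {

	// Assumed to mirror TimeWindow.getWindowStartWithOffset from this commit.
	static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
		return timestamp - (timestamp - offset + windowSize) % windowSize;
	}

	public static void main(String[] args) {
		long minute = 60_000L;
		long hour = 60 * minute;
		// Hourly windows with a 15-minute offset: a timestamp at 1:20:00 maps to
		// a window start of 1:15:00, i.e. 4500000 ms.
		long timestamp = hour + 20 * minute;
		System.out.println(getWindowStartWithOffset(timestamp, 15 * minute, hour));
	}
}

With the offset folded in, the window start shifts from the epoch-aligned 1:00:00
to 1:15:00, matching the Javadoc example above.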

[1/2] flink git commit: [FLINK-4282] Add doc for WindowAssigner offset parameter

2016-08-19 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/master 3be9a2851 -> 7e8de772b


[FLINK-4282] Add doc for WindowAssigner offset parameter

This closes #2333
This closes #2355


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e8de772
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e8de772
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e8de772

Branch: refs/heads/master
Commit: 7e8de772b00bbdaae5f606d51f3037c0ae5f8aae
Parents: 0977462
Author: Aljoscha Krettek 
Authored: Fri Aug 19 15:41:08 2016 +0200
Committer: Aljoscha Krettek 
Committed: Fri Aug 19 18:23:38 2016 +0200

--
 docs/apis/streaming/windows.md | 36 
 1 file changed, 36 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/7e8de772/docs/apis/streaming/windows.md
--
diff --git a/docs/apis/streaming/windows.md b/docs/apis/streaming/windows.md
index 7a93723..b9847a5 100644
--- a/docs/apis/streaming/windows.md
+++ b/docs/apis/streaming/windows.md
@@ -252,6 +252,42 @@ input
 Note, how we can specify a time interval by using one of 
`Time.milliseconds(x)`, `Time.seconds(x)`,
 `Time.minutes(x)`, and so on.
 
+The time-based window assigners also take an optional `offset` parameter that can
+be used to change the alignment of windows. For example, without offsets hourly
+windows are aligned with epoch, that is, you will get windows such as `1:00 - 1:59`,
+`2:00 - 2:59` and so on. If you want to change that you can give an offset. With an
+offset of 15 minutes you would, for example, get `1:15 - 2:14`, `2:15 - 3:14` etc.
+Another important use case for offsets is when you want to have daily windows and
+live in a timezone other than UTC-0. For example, in China you would have to
+specify an offset of `Time.hours(-8)`.
+
+This example shows how an offset can be specified for tumbling event time 
windows (the other
+windows work accordingly):
+
+
+{% highlight java %}
+DataStream<T> input = ...;
+
+// tumbling event-time windows
+input
+    .keyBy(<key selector>)
+    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
+    .<windowed transformation>(<window function>);
+{% endhighlight %}
+
+
+
+{% highlight scala %}
+val input: DataStream[T] = ...
+
+// tumbling event-time windows
+input
+    .keyBy(<key selector>)
+    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
+    .<windowed transformation>(<window function>)
+{% endhighlight %}
+
+
+
 ## Window Functions
 
 The *window function* is used to process the elements of each window (and key) 
once the system



flink git commit: [FLINK-4402] Changed the documentation for the metrics in the System Scope Section

2016-08-19 Thread rmetzger
Repository: flink
Updated Branches:
  refs/heads/master b7ae3e533 -> 3be9a2851


[FLINK-4402] Changed the documentation for the metrics in the System Scope 
Section

This closes #2382


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3be9a285
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3be9a285
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3be9a285

Branch: refs/heads/master
Commit: 3be9a2851ba616dba60b9b8d74f499b7db02debf
Parents: b7ae3e5
Author: Neelesh Srinivas Salian 
Authored: Wed Aug 17 13:30:06 2016 -0500
Committer: Robert Metzger 
Committed: Fri Aug 19 16:35:33 2016 +0200

--
 docs/apis/metrics.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/3be9a285/docs/apis/metrics.md
--
diff --git a/docs/apis/metrics.md b/docs/apis/metrics.md
index 3e32ebd..1cc7a29 100644
--- a/docs/apis/metrics.md
+++ b/docs/apis/metrics.md
@@ -196,10 +196,10 @@ Each of these keys expect a format string that may 
contain constants (e.g. "task
 - `metrics.scope.tm.job`
   - Default: <host>.taskmanager.<tm_id>.<job_name>
   - Applied to all metrics that were scoped to a task manager and job.
-- `metrics.scope.tm.task`
+- `metrics.scope.task`
   - Default: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
   - Applied to all metrics that were scoped to a task.
-- `metrics.scope.tm.operator`
+- `metrics.scope.operator`
   - Default: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
   - Applied to all metrics that were scoped to an operator.
 
@@ -209,7 +209,7 @@ The default scope for operator metrics will result in an 
identifier akin to `loc
 
 If you also want to include the task name but omit the task manager 
information you can specify the following format:
 
-`metrics.scope.tm.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>`
+`metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>`
 
 This could create the identifier 
`localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric`.
 



flink git commit: Add ManualWindowSpeedITCase For Assessing State Performance

2016-08-19 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/master f0fef6f44 -> b7ae3e533


Add ManualWindowSpeedITCase For Assessing State Performance

This should be used to test whether there are any obvious performance
regressions between releases. Like the other manual tests that have @Ignore
set, these have to be run manually (for now).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7ae3e53
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7ae3e53
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7ae3e53

Branch: refs/heads/master
Commit: b7ae3e53382258d9f811b6c34cd0df9564b00370
Parents: f0fef6f
Author: Aljoscha Krettek 
Authored: Wed Aug 17 14:24:51 2016 +0200
Committer: Aljoscha Krettek 
Committed: Fri Aug 19 16:20:24 2016 +0200

--
 .../test/state/ManualWindowSpeedITCase.java | 260 +++
 1 file changed, 260 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b7ae3e53/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
--
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
new file mode 100644
index 000..428c47c
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * A collection of manual tests that serve to assess the performance of 
windowed operations. These
+ * run in local mode with parallelism 1 with a source that emits data as fast 
as possible. Thus,
+ * these mostly test the performance of the state backend.
+ *
+ * When doing a release we should manually run these tests on the version that
+ * is to be released and on an older version to see if there are performance
+ * regressions.
+ *
+ * When a test is executed it will output how many elements of key {@code 
"Tuple 0"} have
+ * been processed in each window. This gives an estimate of the throughput.
+ */
+@Ignore
+public class ManualWindowSpeedITCase extends StreamingMultipleProgramsTestBase 
{
+
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();
+
+   @Test
+   public void testTumblingIngestionTimeWindowsWithFsBackend() throws 
Exception {
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+   env.setParallelism(1);
+
+   String checkpoints = tempFolder.newFolder().toURI().toString();
+   env.setStateBackend(new FsStateBackend(checkpoints));
+
+   env.addSource(new InfiniteTupleSource(10_000))
+   .keyBy(0)
+   .timeWindow(Time.seconds(3))
+   .reduce(new ReduceFunction>() {
+   private static final long 
serialVersionUID = 1L;
+
+   

[2/2] flink git commit: [FLINK-3319] [cep] Add documentation for or function

2016-08-19 Thread trohrmann
[FLINK-3319] [cep] Add documentation for or function


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0fef6f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0fef6f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0fef6f4

Branch: refs/heads/master
Commit: f0fef6f44a53baf24ab38f1392eb321462d3f4fa
Parents: 266c76b
Author: Till Rohrmann 
Authored: Fri Aug 19 15:47:34 2016 +0200
Committer: Till Rohrmann 
Committed: Fri Aug 19 15:47:34 2016 +0200

--
 docs/apis/streaming/libs/cep.md | 56 
 .../flink/cep/scala/pattern/Pattern.scala   | 11 
 .../org/apache/flink/cep/pattern/Pattern.java   | 12 ++---
 3 files changed, 73 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f0fef6f4/docs/apis/streaming/libs/cep.md
--
diff --git a/docs/apis/streaming/libs/cep.md b/docs/apis/streaming/libs/cep.md
index b313451..ef35d32 100644
--- a/docs/apis/streaming/libs/cep.md
+++ b/docs/apis/streaming/libs/cep.md
@@ -178,6 +178,33 @@ As it can be seen here, the subtype condition can also be 
combined with an addit
 In fact you can always provide multiple conditions by calling `where` and 
`subtype` multiple times.
 These conditions will then be combined using the logical AND operator.
 
+In order to construct OR conditions, one has to call the `or` method with a
+respective filter function. Any existing filter function is then ORed with the
+given one.
+
+
+
+{% highlight java %}
+pattern.where(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) {
+        return ... // some condition
+    }
+}).or(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) {
+        return ... // or condition
+    }
+});
+{% endhighlight %}
+
+
+
+{% highlight scala %}
+pattern.where(event => ... /* some condition */).or(event => ... /* or 
condition */)
+{% endhighlight %}
+
+
+
 Next, we can append further states to detect complex patterns.
 We can control the contiguity of two succeeding events to be accepted by the 
pattern.
 
@@ -285,6 +312,25 @@ patternState.where(new FilterFunction() {
 {% endhighlight %}
 
 
+
+Or
+
+Adds a new filter condition which is ORed with an existing 
filter condition. Only if an event passes the filter condition, it can match 
the state:
+{% highlight java %}
+patternState.where(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return ... // some condition
+    }
+}).or(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return ... // alternative condition
+    }
+});
+{% endhighlight %}
+
+

Subtype

@@ -352,6 +398,16 @@ patternState.where(event => ... /* some condition */)
 {% endhighlight %}
 
 
+
+Or
+
+Adds a new filter condition which is ORed with an existing 
filter condition. Only if an event passes the filter condition, it can match 
the state:
+{% highlight scala %}
+patternState.where(event => ... /* some condition */)
+.or(event => ... /* alternative condition */)
+{% endhighlight %}
+
+

Subtype


http://git-wip-us.apache.org/repos/asf/flink/blob/f0fef6f4/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
--
diff --git 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index 592599c..cc3b03c 100644
--- 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -127,6 +127,17 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
   }
 
   /**
+* Specifies a filter condition which is ORed with an existing filter 
function.
+*
+* @param filter Or filter function
+* @return The same pattern operator where the new filter condition is set
+*/
+  def or(filter: FilterFunction[F]): Pattern[T, F] = {
+jPattern.or(filter)
+this
+  }
+
+  /**
 * Specifies a filter condition which has to be fulfilled by an event in 
order to be matched.
 *
 * @param filterFun Filter condition

http://git-wip-us.apache.org/repos/asf/flink/blob/f0fef6f4/flink-libraries/flink-cep/src/main/java/org/apac

[1/2] flink git commit: [FLINK-3319] [cep] Add or function to CEP's pattern api

2016-08-19 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 7dbcffb90 -> f0fef6f44


[FLINK-3319] [cep] Add or function to CEP's pattern api

This closes #2171.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/266c76b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/266c76b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/266c76b5

Branch: refs/heads/master
Commit: 266c76b55e410965d7a5332acd3b2a0dfd3b24bb
Parents: 7dbcffb
Author: Bob Thorman 
Authored: Fri Aug 19 15:23:03 2016 +0200
Committer: Till Rohrmann 
Committed: Fri Aug 19 15:26:02 2016 +0200

--
 .../flink/cep/pattern/OrFilterFunction.java | 52 
 .../org/apache/flink/cep/pattern/Pattern.java   | 18 ++
 .../java/org/apache/flink/cep/CEPITCase.java| 65 
 .../apache/flink/cep/pattern/PatternTest.java   | 42 +
 4 files changed, 177 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/266c76b5/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java
--
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java
new file mode 100644
index 000..c42ecb1
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * A filter function which combines two filter functions with a logical OR. Thus,
+ * the filter function only returns true iff at least one of the filter functions
+ * holds true.
+ *
+ * @param <T> Type of the element to filter
+ */
+public class OrFilterFunction<T> implements FilterFunction<T> {
+	private static final long serialVersionUID = -2109562093871155005L;
+
+	private final FilterFunction<T> left;
+	private final FilterFunction<T> right;
+
+	public OrFilterFunction(final FilterFunction<T> left, final FilterFunction<T> right) {
+		this.left = left;
+		this.right = right;
+	}
+
+	@Override
+	public boolean filter(T value) throws Exception {
+		return left.filter(value) || right.filter(value);
+	}
+
+	public FilterFunction<T> getLeft() {
+		return left;
+	}
+
+	public FilterFunction<T> getRight() {
+		return right;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/266c76b5/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
--
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 696518e..14aed5d 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -93,6 +93,24 @@ public class Pattern {
}
 
/**
+	 * Specifies a filter condition which, if fulfilled by an event, will match.
+	 *
+	 * @param newFilterFunction Filter condition
+	 * @return The same pattern operator where the new filter condition is set
+	 */
+	public Pattern<T, F> or(FilterFunction<F> newFilterFunction) {
+		ClosureCleaner.clean(newFilterFunction, true);
+
+		if (this.filterFunction == null) {
+			this.filterFunction = newFilterFunction;
+		} else {
+			this.filterFunction = new OrFilterFunction<>(this.filterFunction, newFilterFunction);
+		}
+
+		return this;
+	}
+
+   /**
 * Applies a subtype constraint on the current pattern 

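A minimal usage sketch of the new or method (the Event type and both conditions
below are hypothetical, assuming only the Pattern API shown in this commit):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.pattern.Pattern;

public class OrPatternExample {

	// Hypothetical event type, used only for illustration.
	public static class Event {
		public String name;
		public double temperature;
	}

	public static Pattern<Event, Event> hotOrAlarm() {
		// Matches events whose temperature exceeds 100 OR whose name is "alarm";
		// the second filter is ORed with the first by the new or(...) method.
		return Pattern.<Event>begin("start")
			.where(new FilterFunction<Event>() {
				@Override
				public boolean filter(Event value) throws Exception {
					return value.temperature > 100.0;
				}
			})
			.or(new FilterFunction<Event>() {
				@Override
				public boolean filter(Event value) throws Exception {
					return "alarm".equals(value.name);
				}
			});
	}
}
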
[1/2] flink git commit: [FLINK-4021] [network] Consume staged buffers on shutdown

2016-08-19 Thread uce
Repository: flink
Updated Branches:
  refs/heads/master 5d7f88031 -> 7dbcffb90


[FLINK-4021] [network] Consume staged buffers on shutdown

If we have staged buffers and one consuming channel is closed,
all others will be skipped and auto read will not be set back
to true.

This is currently not a problem, because failures bring down the
whole job, but it would become an issue with partial recovery.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6615ee70
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6615ee70
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6615ee70

Branch: refs/heads/master
Commit: 6615ee70c677914e5b8c6f251ec0b93fe0991472
Parents: 5d7f880
Author: 淘江 
Authored: Tue Jun 21 17:13:37 2016 +0800
Committer: Ufuk Celebi 
Committed: Fri Aug 19 12:23:33 2016 +0200

--
 .../netty/PartitionRequestClientHandler.java  | 18 --
 1 file changed, 8 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6615ee70/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 953405f..52775d4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.netty;
 import com.google.common.collect.Maps;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
-
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -161,7 +160,7 @@ class PartitionRequestClientHandler extends 
ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
try {
if (!bufferListener.hasStagedBufferOrEvent() && 
stagedMessages.isEmpty()) {
-   decodeMsg(msg);
+   decodeMsg(msg, false);
}
else {
stagedMessages.add(msg);
@@ -201,7 +200,7 @@ class PartitionRequestClientHandler extends 
ChannelInboundHandlerAdapter {
super.channelReadComplete(ctx);
}
 
-   private boolean decodeMsg(Object msg) throws Throwable {
+   private boolean decodeMsg(Object msg, boolean isStagedBuffer) throws 
Throwable {
		final Class<?> msgClazz = msg.getClass();
 
//  Buffer 

@@ -217,7 +216,7 @@ class PartitionRequestClientHandler extends 
ChannelInboundHandlerAdapter {
return true;
}
 
-   return decodeBufferOrEvent(inputChannel, bufferOrEvent);
+   return decodeBufferOrEvent(inputChannel, bufferOrEvent, 
isStagedBuffer);
}
//  Error 
-
else if (msgClazz == NettyMessage.ErrorResponse.class) {
@@ -252,7 +251,7 @@ class PartitionRequestClientHandler extends 
ChannelInboundHandlerAdapter {
return true;
}
 
-   private boolean decodeBufferOrEvent(RemoteInputChannel inputChannel, 
NettyMessage.BufferResponse bufferOrEvent) throws Throwable {
+   private boolean decodeBufferOrEvent(RemoteInputChannel inputChannel, 
NettyMessage.BufferResponse bufferOrEvent, boolean isStagedBuffer) throws 
Throwable {
boolean releaseNettyBuffer = true;
 
try {
@@ -269,10 +268,9 @@ class PartitionRequestClientHandler extends 
ChannelInboundHandlerAdapter {
BufferProvider bufferProvider = 
inputChannel.getBufferProvider();
 
if (bufferProvider == null) {
-
+   // receiver has been cancelled/failed

cancelRequestFor(bufferOrEvent.receiverId);
-
-   return false; // receiver has been 
cancelled/failed
+   return isStagedBuffer;
}
 
while (true) {
@@ -292,7 +290,7 @@ class PartitionRequestClientHandle

[2/2] flink git commit: [FLINK-4021] [network] Add test for staged buffers auto read behaviour

2016-08-19 Thread uce
[FLINK-4021] [network] Add test for staged buffers auto read behaviour

This closes #2141.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7dbcffb9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7dbcffb9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7dbcffb9

Branch: refs/heads/master
Commit: 7dbcffb901a2a0e1b64b9ae78977726febe2
Parents: 6615ee7
Author: Ufuk Celebi 
Authored: Mon Aug 15 15:59:09 2016 +0200
Committer: Ufuk Celebi 
Committed: Fri Aug 19 12:28:15 2016 +0200

--
 .../runtime/io/network/netty/NettyMessage.java  |  2 +-
 .../PartitionRequestClientHandlerTest.java  | 86 
 .../runtime/testutils/DiscardingRecycler.java   |  2 +
 3 files changed, 89 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/7dbcffb9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 3a24181..2b03f1d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -192,7 +192,7 @@ abstract class NettyMessage {
buffer = null;
}
 
-   BufferResponse(Buffer buffer, int sequenceNumber, 
InputChannelID receiverId) {
+   public BufferResponse(Buffer buffer, int sequenceNumber, 
InputChannelID receiverId) {
this.buffer = buffer;
this.sequenceNumber = sequenceNumber;
this.receiverId = receiverId;

http://git-wip-us.apache.org/repos/asf/flink/blob/7dbcffb9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index 2c08cc5..26d791f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -22,6 +22,9 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
@@ -31,11 +34,17 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.apache.flink.runtime.testutils.DiscardingRecycler;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -158,8 +167,85 @@ public class PartitionRequestClientHandlerTest {
client.cancelRequestFor(inputChannel.getInputChannelId());
}
 
+   /**
+* Tests that an unsuccessful message decode call for a staged message
+* does not leave the channel with auto read set to false.
+*/
+   @Test
+   @SuppressWarnings("unchecked")
+   public void testAutoReadAfterUnsuccessfulStagedMessage() throws 
Exception {
+   PartitionRequestClientHandler handler = new 
PartitionRequestClientHandler();
+   EmbeddedChannel channel = new EmbeddedChannel(handler);
+
+		final AtomicReference<EventListener<Buffer>> listener = new AtomicReference<>();
+
+   BufferProvider bufferProvider = mock(BufferProvider.class);
+   

flink git commit: [FLINK-4414] [cluster] Add getAddress method to RpcGateway

2016-08-19 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/flip-6 b7259d617 -> 682ebdb57


[FLINK-4414] [cluster] Add getAddress method to RpcGateway

The RpcGateway.getAddress method makes it possible to retrieve the fully
qualified address of the associated RpcEndpoint.

This closes #2392.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/682ebdb5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/682ebdb5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/682ebdb5

Branch: refs/heads/flip-6
Commit: 682ebdb5744b01a4d126ce4a66846c12588a1714
Parents: b7259d6
Author: Till Rohrmann 
Authored: Thu Aug 18 16:34:47 2016 +0200
Committer: Till Rohrmann 
Committed: Fri Aug 19 13:52:57 2016 +0200

--
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |  6 +-
 .../apache/flink/runtime/rpc/RpcGateway.java|  7 +++
 .../apache/flink/runtime/rpc/RpcService.java| 11 --
 .../runtime/rpc/akka/AkkaInvocationHandler.java | 14 +++--
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 21 ++--
 .../runtime/rpc/akka/AkkaRpcActorTest.java  | 16 +++
 6 files changed, 42 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/682ebdb5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index a28bc14..7b3f8a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -69,9 +69,6 @@ public abstract class RpcEndpoint {
/** Self gateway which can be used to schedule asynchronous calls on 
yourself */
private final C self;
 
-   /** the fully qualified address of the this RPC endpoint */
-   private final String selfAddress;
-
/** The main thread execution context to be used to execute future 
callbacks in the main thread
 * of the executing rpc server. */
private final ExecutionContext mainThreadExecutionContext;
@@ -92,7 +89,6 @@ public abstract class RpcEndpoint {
this.selfGatewayType = 
ReflectionUtil.getTemplateType1(getClass());
this.self = rpcService.startServer(this);

-   this.selfAddress = rpcService.getAddress(self);
this.mainThreadExecutionContext = new 
MainThreadExecutionContext((MainThreadExecutor) self);
}
 
@@ -156,7 +152,7 @@ public abstract class RpcEndpoint {
 * @return Fully qualified address of the underlying RPC endpoint
 */
public String getAddress() {
-   return selfAddress;
+   return self.getAddress();
}
 
/**

http://git-wip-us.apache.org/repos/asf/flink/blob/682ebdb5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
index e3a16b4..81075ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
@@ -22,4 +22,11 @@ package org.apache.flink.runtime.rpc;
  * Rpc gateway interface which has to be implemented by Rpc gateways.
  */
 public interface RpcGateway {
+
+   /**
+* Returns the fully qualified address under which the associated rpc 
endpoint is reachable.
+*
+* @return Fully qualified address under which the associated rpc 
endpoint is reachable
+*/
+   String getAddress();
 }
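
A brief sketch of what this enables (the TaskExecutorGateway interface and the
address format in the comment are illustrative assumptions, not part of this
commit):

import org.apache.flink.runtime.rpc.RpcGateway;

// Hypothetical gateway; real gateways declare their own rpc methods.
interface TaskExecutorGateway extends RpcGateway {
	void doSomething();
}

class GatewayLogger {
	// Prints the fully qualified endpoint address; the URL shape
	// (e.g. "akka.tcp://flink@host:port/user/endpoint") is illustrative only.
	static void logAddress(TaskExecutorGateway gateway) {
		System.out.println("Talking to endpoint at " + gateway.getAddress());
	}
}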

http://git-wip-us.apache.org/repos/asf/flink/blob/682ebdb5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index fabdb05..bc0f5cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -65,17 +65,6 @@ public interface RpcService {
void stopService();
 
/**
-* Get the fully qualified address of the underlying rpc server 
represented by the self gateway.
-* It must be possible to connect from a remote host to the rpc server 
via the returned fully
-* qualified address.
-*
-* @param

[1/2] flink git commit: [FLINK-4322] [checkpointing] Ignore minimum delay for savepoints

2016-08-19 Thread uce
Repository: flink
Updated Branches:
  refs/heads/master 0d53daa2f -> 5d7f88031


[FLINK-4322] [checkpointing] Ignore minimum delay for savepoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8854d75c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8854d75c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8854d75c

Branch: refs/heads/master
Commit: 8854d75c13a6e7a593baf6d0d5ae4f47c90718e3
Parents: 0d53daa
Author: Ramkrishna 
Authored: Thu Aug 18 17:27:41 2016 +0530
Committer: Ufuk Celebi 
Committed: Fri Aug 19 12:03:44 2016 +0200

--
 .../checkpoint/CheckpointCoordinator.java   | 20 +++-
 .../checkpoint/CheckpointCoordinatorTest.java   |  2 +-
 2 files changed, 12 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8854d75c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 2a1ece0..ff54bad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -333,17 +333,19 @@ public class CheckpointCoordinator {
}
return new 
CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
}
-   }
 
-   // make sure the minimum interval between checkpoints 
has passed
-   if (lastTriggeredCheckpoint + 
minPauseBetweenCheckpoints > timestamp) {
-   if (currentPeriodicTrigger != null) {
-   currentPeriodicTrigger.cancel();
-   currentPeriodicTrigger = null;
+   // make sure the minimum interval between 
checkpoints has passed
+   if (lastTriggeredCheckpoint + 
minPauseBetweenCheckpoints > timestamp) {
+   if (currentPeriodicTrigger != null) {
+   currentPeriodicTrigger.cancel();
+   currentPeriodicTrigger = null;
+   }
+   ScheduledTrigger trigger = new 
ScheduledTrigger();
+   // Reassign the new trigger to the 
currentPeriodicTrigger
+   currentPeriodicTrigger = trigger;
+   timer.scheduleAtFixedRate(trigger, 
minPauseBetweenCheckpoints, baseInterval);
+   return new 
CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
}
-   ScheduledTrigger trigger = new 
ScheduledTrigger();
-   timer.scheduleAtFixedRate(trigger, 
minPauseBetweenCheckpoints, baseInterval);
-   return new 
CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
}
}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8854d75c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 3341095..f243803 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1408,7 +1408,7 @@ public class CheckpointCoordinatorTest {
assertTrue(coord.triggerCheckpoint(timestamp + 3));
assertEquals(2, coord.getNumberOfPendingCheckpoints());
 
-		Future<String> savepointFuture2 = coord.triggerSavepoint(timestamp);
+		Future<String> savepointFuture2 = coord.triggerSavepoint(timestamp + 4);
long savepointId2 = counter.getLast();
assertEquals(3, coord.getNumberOfPendingCheckpoints());
 



[2/2] flink git commit: [FLINK-4322] [checkpointing] Extend CheckpointCoordinatorTest

2016-08-19 Thread uce
[FLINK-4322] [checkpointing] Extend CheckpointCoordinatorTest

The added tests check that savepoints ignore the maximum number
of concurrent checkpoints and minimum delay between checkpoints.

This closes #2385.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d7f8803
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d7f8803
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d7f8803

Branch: refs/heads/master
Commit: 5d7f8803155d2eb8865cce9a60dd677c2400261c
Parents: 8854d75
Author: Ufuk Celebi 
Authored: Fri Aug 19 12:01:24 2016 +0200
Committer: Ufuk Celebi 
Committed: Fri Aug 19 12:04:12 2016 +0200

--
 .../checkpoint/CheckpointCoordinatorTest.java   | 90 +++-
 1 file changed, 89 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/5d7f8803/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index f243803..09c53d6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -36,6 +36,7 @@ import org.mockito.stubbing.Answer;
 import scala.concurrent.Future;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -1646,7 +1647,94 @@ public class CheckpointCoordinatorTest {
fail(e.getMessage());
}
}
-   
+
+   /**
+* Tests that the savepoints can be triggered concurrently.
+*/
+   @Test
+   public void testConcurrentSavepoints() throws Exception {
+   JobID jobId = new JobID();
+
+   final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+   ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+   StandaloneCheckpointIDCounter checkpointIDCounter = new 
StandaloneCheckpointIDCounter();
+
+   CheckpointCoordinator coord = new CheckpointCoordinator(
+   jobId,
+   10,
+   20,
+   0L,
+   1, // max one checkpoint at a time => should 
not affect savepoints
+   42,
+   new ExecutionVertex[] { vertex1 },
+   new ExecutionVertex[] { vertex1 },
+   new ExecutionVertex[] { vertex1 },
+   cl,
+   checkpointIDCounter,
+   new StandaloneCompletedCheckpointStore(2, cl),
+   new HeapSavepointStore(),
+   new DisabledCheckpointStatsTracker());
+
+		List<Future<String>> savepointFutures = new ArrayList<>();
+
+   int numSavepoints = 5;
+
+   // Trigger savepoints
+   for (int i = 0; i < numSavepoints; i++) {
+   savepointFutures.add(coord.triggerSavepoint(i));
+   }
+
+		// After triggering multiple savepoints, all should be in progress
+		for (Future<String> savepointFuture : savepointFutures) {
+   assertFalse(savepointFuture.isCompleted());
+   }
+
+   // ACK all savepoints
+   long checkpointId = checkpointIDCounter.getLast();
+   for (int i = 0; i < numSavepoints; i++, checkpointId--) {
+   coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, attemptID1, checkpointId));
+   }
+
+   // After ACKs, all should be completed
+		for (Future<String> savepointFuture : savepointFutures) {
+   assertTrue(savepointFuture.isCompleted());
+   }
+   }
+
+   /**
+* Tests that no minimum delay between savepoints is enforced.
+*/
+   @Test
+   public void testMinDelayBetweenSavepoints() throws Exception {
+   JobID jobId = new JobID();
+
+   final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+   ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+   CheckpointCoordinator coord = new CheckpointCoordinator(
+   jobId,
+   10,
+   20,
+