Repository: flink
Updated Branches:
  refs/heads/master b198c33a7 -> 42a4df2d4


[hotfix] Fix shaky EventTimeAllWindowCheckpointingITCase

In very rare cases, a checkpoint could be performed after the
ValidatingSink had signaled that it had seen all expected elements. If
this happened, the job would be restarted with the already complete
state and the test would never finish, since no more elements would
arrive.

This adds a check in open() of ValidatingSink that signals success if we
already have the final state.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42a4df2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42a4df2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42a4df2d

Branch: refs/heads/master
Commit: 42a4df2d475910a9092a5f7251d4897d00b4fba9
Parents: b198c33
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Oct 21 15:06:42 2015 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Thu Oct 22 10:56:41 2015 +0200

----------------------------------------------------------------------
 .../EventTimeAllWindowCheckpointingITCase.java  | 25 ++++++++++++++------
 .../EventTimeWindowCheckpointingITCase.java     |  5 ----
 2 files changed, 18 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/42a4df2d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 2733349..84022f0 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -432,17 +432,12 @@ public class EventTimeAllWindowCheckpointingITCase 
extends TestLogger {
                                        synchronized (ctx.getCheckpointLock()) {
                                                int next = numElementsEmitted++;
                                                for (long i = 0; i < numKeys; 
i++) {
-                                                       
ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next);
+                                                       
ctx.collectWithTimestamp(new Tuple2<>(i, new IntType(next)), next);
                                                }
                                                ctx.emitWatermark(new 
Watermark(next));
                                        }
                                }
                                else {
-                                       // exit at some point so that we don't 
deadlock
-                                       if (numElementsEmitted > 
numElementsToEmit * 5) {
-//                                             running = false;
-                                               System.err.println("Succ 
Checkpoints: " + numSuccessfulCheckpoints + " numElemEmitted: " + 
numElementsEmitted + "num elements to emit: " + numElementsToEmit);
-                                       }
                                        // if our work is done, delay a bit to 
prevent busy waiting
                                        Thread.sleep(1);
                                }
@@ -491,6 +486,22 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
                public void open(Configuration parameters) throws Exception {
                        // this sink can only work with DOP 1
                        assertEquals(1, 
getRuntimeContext().getNumberOfParallelSubtasks());
+
+                       // it can happen that a checkpoint happens when the 
complete success state is
+                       // already set. In that case we restart with the final 
state and would never
+                       // finish because no more elements arrive.
+                       if (windowCounts.size() == numKeys) {
+                               boolean seenAll = true;
+                               for (Integer windowCount: 
windowCounts.values()) {
+                                       if (windowCount != numWindowsExpected) {
+                                               seenAll = false;
+                                               break;
+                                       }
+                               }
+                               if (seenAll) {
+                                       throw new SuccessException();
+                               }
+                       }
                }
 
                @Override
@@ -498,7 +509,7 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
                        boolean seenAll = true;
                        if (windowCounts.size() == numKeys) {
                                for (Integer windowCount: 
windowCounts.values()) {
-                                       if (windowCount < numWindowsExpected) {
+                                       if (windowCount != numWindowsExpected) {
                                                seenAll = false;
                                                break;
                                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/42a4df2d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 4d1d2c3..6cf04f5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -439,11 +439,6 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                                        }
                                }
                                else {
-                                       // exit at some point so that we don't 
deadlock
-                                       if (numElementsEmitted > 
numElementsToEmit * 5) {
-//                                             running = false;
-                                               System.err.println("Succ 
Checkpoints: " + numSuccessfulCheckpoints + " numElemEmitted: " + 
numElementsEmitted + "num elements to emit: " + numElementsToEmit);
-                                       }
 
                                        // if our work is done, delay a bit to 
prevent busy waiting
                                        Thread.sleep(1);

Reply via email to