Repository: flink
Updated Branches:
  refs/heads/master 98fad04ec -> c0fd36bac


[FLINK-2671] [tests] Fix unstable StreamCheckpointNotifierITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0fd36ba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0fd36ba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0fd36ba

Branch: refs/heads/master
Commit: c0fd36bac8955cff58a88e72d5bdb378f3cfdf93
Parents: a8be52a
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jan 15 17:29:46 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jan 15 18:58:57 2016 +0100

----------------------------------------------------------------------
 .../api/functions/sink/DiscardingSink.java      |  32 ++
 .../StreamCheckpointNotifierITCase.java         | 402 +++++++++++--------
 2 files changed, 276 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c0fd36ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/DiscardingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/DiscardingSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/DiscardingSink.java
new file mode 100644
index 0000000..3bbb14b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/DiscardingSink.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+/**
+ * A stream sink that ignores all elements.
+ * 
+ * @param <T> The type of elements received by the sink.
+ */
+public class DiscardingSink<T> implements SinkFunction<T> {
+       
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public void invoke(T value) {}
+}
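For context, a minimal usage sketch (not part of this commit) of how the new DiscardingSink replaces an anonymous no-op SinkFunction at the end of a pipeline; the surrounding job setup is assumed for illustration only:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class DiscardingSinkUsageSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // hypothetical pipeline: the sink only terminates the topology,
        // every element handed to DiscardingSink is dropped
        DataStream<Long> values = env.fromElements(1L, 2L, 3L);
        values.addSink(new DiscardingSink<Long>());

        env.execute("discarding-sink-sketch");
    }
}

In the updated test below, the same idea appears as .addSink(new DiscardingSink<Tuple1<Long>>()).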

http://git-wip-us.apache.org/repos/asf/flink/blob/c0fd36ba/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index 08af93a..9de5794 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -22,31 +22,41 @@ import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Integration test for the {@link CheckpointNotifier} interface. The test ensures that
- * {@link CheckpointNotifier#notifyCheckpointComplete(long)} is called for some completed
+ * {@link CheckpointNotifier#notifyCheckpointComplete(long)} is called for completed
  * checkpoints, that it is called at most once for any checkpoint id and that it is not
  * called for a deliberately failed checkpoint.
  *
@@ -60,9 +70,43 @@ import static org.junit.Assert.assertTrue;
  * successfully completed checkpoint.
  */
 @SuppressWarnings("serial")
-public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase {
+public class StreamCheckpointNotifierITCase {
+       
+       private static final int NUM_TASK_MANAGERS = 2;
+       private static final int NUM_TASK_SLOTS = 3;
+       private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
+
+       private static ForkableFlinkMiniCluster cluster;
+
+       @BeforeClass
+       public static void startCluster() {
+               try {
+                       Configuration config = new Configuration();
+                       config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+                       config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
+                       config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
+                       config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+
+                       cluster = new ForkableFlinkMiniCluster(config, false);
+                       cluster.start();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail("Failed to start test cluster: " + e.getMessage());
+               }
+       }
 
-       final long NUM_LONGS = 10_000_000L;
+       @AfterClass
+       public static void stopCluster() {
+               try {
+                       cluster.stop();
+                       cluster = null;
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail("Failed to stop test cluster: " + e.getMessage());
+               }
+       }
 
        /**
         * Runs the following program:
@@ -71,61 +115,83 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
         *     [ (source)->(filter) ] -> [ (co-map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
         * </pre>
         */
-       @Override
-       public void testProgram(StreamExecutionEnvironment env) {
-
-               DataStream<Long> stream = env.addSource(new GeneratingSourceFunction(NUM_LONGS));
-
-               stream
-                               // -------------- first vertex, chained to the src ----------------
-                               .filter(new LongRichFilterFunction())
-
-                               // -------------- second vertex, applying the co-map ----------------
-                               .connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction())
-
-                               // -------------- third vertex - the stateful one that also fails ----------------
-                               .map(new IdentityMapFunction())
-                               .startNewChain()
-
-                               // -------------- fourth vertex - reducer and the sink ----------------
-                               .keyBy(0)
-                               .reduce(new OnceFailingReducer(NUM_LONGS))
-                               .addSink(new SinkFunction<Tuple1<Long>>() {
-                                       @Override
-                                       public void invoke(Tuple1<Long> value) {
-                                               // do nothing
-                                       }
-                               });
-       }
+       @Test
+       public void testProgram() {
+               try {
+                       StreamExecutionEnvironment env = 
+                               
StreamExecutionEnvironment.createRemoteEnvironment("localhost", 
cluster.getLeaderRPCPort());
+                       
+                       env.setParallelism(PARALLELISM);
+                       env.enableCheckpointing(500);
+                       env.getConfig().disableSysoutLogging();
+                       
+                       final int numElements = 10000;
+                       final int numTaskTotal = PARALLELISM * 5; 
+                       
+                       DataStream<Long> stream = env.addSource(new GeneratingSourceFunction(numElements, numTaskTotal));
+       
+                       stream
+                                       // -------------- first vertex, chained to the src ----------------
+                                       .filter(new LongRichFilterFunction())
+       
+                                       // -------------- second vertex, applying the co-map ----------------
+                                       .connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction())
+       
+                                       // -------------- third vertex - the stateful one that also fails ----------------
+                                       .map(new IdentityMapFunction())
+                                       .startNewChain()
+       
+                                       // -------------- fourth vertex - reducer and the sink ----------------
+                                       .keyBy(0)
+                                       .reduce(new OnceFailingReducer(numElements))
+                               
+                                       .addSink(new DiscardingSink<Tuple1<Long>>());
+                       
+                       env.execute();
 
-       @Override
-       public void postSubmit() {
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               List<Long>[][] checkList = new List[][] {
+                       final long failureCheckpointID = OnceFailingReducer.failureCheckpointID;
+                       assertNotEquals(0L, failureCheckpointID);
+                       
+                       List<List<Long>[]> allLists = Arrays.asList(
                                GeneratingSourceFunction.completedCheckpoints,
-                               IdentityMapFunction.completedCheckpoints,
                                LongRichFilterFunction.completedCheckpoints,
-                               LeftIdentityCoRichFlatMapFunction.completedCheckpoints};
-
-               long failureCheckpointID = OnceFailingReducer.failureCheckpointID;
+                               LeftIdentityCoRichFlatMapFunction.completedCheckpoints,
+                               IdentityMapFunction.completedCheckpoints,
+                               OnceFailingReducer.completedCheckpoints
+                       );
 
-               for(List<Long>[] parallelNotifications : checkList) {
-                       for (int i = 0; i < PARALLELISM; i++){
-                               List<Long> notifications = parallelNotifications[i];
-                               assertTrue("No checkpoint notification was received.",
+                       for (List<Long>[] parallelNotifications : allLists) {
+                               for (List<Long> notifications : parallelNotifications) {
+                                       
+                                       assertTrue("No checkpoint notification was received.", 
                                                notifications.size() > 0);
-                               assertFalse("Failure checkpoint was marked as completed.",
+                                       
+                                       assertFalse("Failure checkpoint was marked as completed.",
                                                notifications.contains(failureCheckpointID));
-                               assertFalse("No checkpoint received before failure.",
-                                               notifications.get(0) == failureCheckpointID);
-                               assertFalse("No checkpoint received after failure.",
+                                       
+                                       assertFalse("No checkpoint received after failure.",
                                                notifications.get(notifications.size() - 1) == failureCheckpointID);
-                               assertTrue("Checkpoint notification was received multiple times",
+                                       
+                                       assertTrue("Checkpoint notification was received multiple times",
                                                notifications.size() == new HashSet<Long>(notifications).size());
+                               }
                        }
                }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
        }
 
+       static List<Long>[] createCheckpointLists(int parallelism) {
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               List<Long>[] lists = new List[parallelism];
+               for (int i = 0; i < parallelism; i++) {
+                       lists[i] = new ArrayList<>();
+               }
+               return lists;
+       }
+       
        // --------------------------------------------------------------------------------------------
        //  Custom Functions
        // --------------------------------------------------------------------------------------------
@@ -136,40 +202,35 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
         */
        private static class GeneratingSourceFunction extends RichSourceFunction<Long>
                        implements ParallelSourceFunction<Long>, CheckpointNotifier, Checkpointed<Integer> {
-
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               static List<Long>[] completedCheckpoints = new List[PARALLELISM];
                
+               static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM);
+               
+               static AtomicLong numPostFailureNotifications = new AtomicLong();
 
                // operator behaviour
                private final long numElements;
-               private long result;
+               
+               private final int notificationsToWaitFor;
 
                private int index;
                private int step;
 
-               // test behaviour
-               private int subtaskId;
-
+               private volatile boolean notificationAlready;
+               
                private volatile boolean isRunning = true;
 
-               GeneratingSourceFunction(long numElements) {
+               GeneratingSourceFunction(long numElements, int notificationsToWaitFor) {
                        this.numElements = numElements;
+                       this.notificationsToWaitFor = notificationsToWaitFor;
                }
 
                @Override
                public void open(Configuration parameters) throws IOException {
                        step = getRuntimeContext().getNumberOfParallelSubtasks();
-                       subtaskId = getRuntimeContext().getIndexOfThisSubtask();
-                       
-                       if (index == 0) {
-                               index = subtaskId;
-                       }
 
-                       // Create a collection on the first open
-                       if (completedCheckpoints[subtaskId] == null) {
-                               completedCheckpoints[subtaskId] = new ArrayList<>();
-                       }
+                       // if index has been restored, it is not 0 any more
+                       if (index == 0)
+                               index = getRuntimeContext().getIndexOfThisSubtask();
                }
 
                @Override
@@ -177,14 +238,19 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
                        final Object lockingObject = ctx.getCheckpointLock();
 
                        while (isRunning && index < numElements) {
-
-                               result = index % 10;
+                               long result = index % 10;
 
                                synchronized (lockingObject) {
                                        index += step;
                                        ctx.collect(result);
                                }
                        }
+                       
+                       // if the program goes fast and no notifications come through, we
+                       // wait until all tasks had a chance to see a notification
+                       while (isRunning && numPostFailureNotifications.get() < notificationsToWaitFor) {
+                               Thread.sleep(50);
+                       }
                }
 
                @Override
@@ -193,11 +259,6 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
                }
 
                @Override
-               public void notifyCheckpointComplete(long checkpointId) throws Exception {
-                       completedCheckpoints[subtaskId].add(checkpointId);
-               }
-
-               @Override
                public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
                        return index;
                }
@@ -206,6 +267,20 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
                public void restoreState(Integer state) {
                        index = state;
                }
+
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) {
+                       // record the ID of the completed checkpoint
+                       int partition = getRuntimeContext().getIndexOfThisSubtask();
+                       completedCheckpoints[partition].add(checkpointId);
+
+                       // if this is the first time we get a notification since the failure,
+                       // tell the source function
+                       if (OnceFailingReducer.hasFailed && !notificationAlready) {
+                               notificationAlready = true;
+                               GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet();
+                       }
+               }
        }
 
        /**
@@ -215,10 +290,9 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
        private static class IdentityMapFunction extends RichMapFunction<Long, Tuple1<Long>>
                        implements CheckpointNotifier {
 
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               public static List<Long>[] completedCheckpoints = new List[PARALLELISM];
-               
-               private int subtaskId;
+               static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM);
+
+               private volatile boolean notificationAlready;
 
                @Override
                public Tuple1<Long> map(Long value) throws Exception {
@@ -226,35 +300,106 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
                }
 
                @Override
-               public void open(Configuration conf) throws IOException {
-                       subtaskId = getRuntimeContext().getIndexOfThisSubtask();
+               public void notifyCheckpointComplete(long checkpointId) {
+                       // record the ID of the completed checkpoint
+                       int partition = getRuntimeContext().getIndexOfThisSubtask();
+                       completedCheckpoints[partition].add(checkpointId);
+
+                       // if this is the first time we get a notification since the failure,
+                       // tell the source function
+                       if (OnceFailingReducer.hasFailed && !notificationAlready) {
+                               notificationAlready = true;
+                               GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet();
+                       }
+               }
+       }
 
-                       // Create a collection on the first open
-                       if (completedCheckpoints[subtaskId] == null) {
-                               completedCheckpoints[subtaskId] = new ArrayList<>();
+       /**
+        * Filter on Long values supposedly letting all values through. As an implementation
+        * for the {@link CheckpointNotifier} interface it stores all the checkpoint ids
+        * it has seen in a static list.
+        */
+       private static class LongRichFilterFunction extends RichFilterFunction<Long> implements CheckpointNotifier {
+
+               static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM);
+               
+               private volatile boolean notificationAlready;
+               
+               @Override
+               public boolean filter(Long value) {
+                       return value < 100;
+               }
+
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) {
+                       // record the ID of the completed checkpoint
+                       int partition = getRuntimeContext().getIndexOfThisSubtask();
+                       completedCheckpoints[partition].add(checkpointId);
+                       
+                       // if this is the first time we get a notification since the failure,
+                       // tell the source function
+                       if (OnceFailingReducer.hasFailed && !notificationAlready) {
+                               notificationAlready = true;
+                               GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet();
                        }
                }
+       }
 
+       /**
+        * CoFlatMap on Long values as identity transform on the left input, while ignoring the right.
+        * As an implementation for the {@link CheckpointNotifier} interface it stores all the checkpoint
+        * ids it has seen in a static list.
+        */
+       private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<Long, Long, Long>
+                       implements CheckpointNotifier {
+
+               static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM);
+
+               private volatile boolean notificationAlready;
+               
                @Override
-               public void notifyCheckpointComplete(long checkpointId) throws Exception {
-                       completedCheckpoints[subtaskId].add(checkpointId);
+               public void flatMap1(Long value, Collector<Long> out) {
+                       out.collect(value);
+               }
+
+               @Override
+               public void flatMap2(Long value, Collector<Long> out) {
+                       // we ignore the values from the second input
+               }
+
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) {
+                       // record the ID of the completed checkpoint
+                       int partition = getRuntimeContext().getIndexOfThisSubtask();
+                       completedCheckpoints[partition].add(checkpointId);
+
+                       // if this is the first time we get a notification since the failure,
+                       // tell the source function
+                       if (OnceFailingReducer.hasFailed && !notificationAlready) {
+                               notificationAlready = true;
+                               GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet();
+                       }
                }
        }
 
        /**
         * Reducer that causes one failure between seeing 40% to 70% of the records.
         */
-       private static class OnceFailingReducer extends RichReduceFunction<Tuple1<Long>> implements Checkpointed<Long> {
-
-               private static volatile boolean hasFailed = false;
-               public static volatile long failureCheckpointID;
+       private static class OnceFailingReducer extends RichReduceFunction<Tuple1<Long>> 
+               implements Checkpointed<Long>, CheckpointNotifier
+       {
+               static volatile boolean hasFailed = false;
+               static volatile long failureCheckpointID;
 
+               static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM);
+               
                private final long numElements;
 
                private long failurePos;
                private long count;
 
-
+               private volatile boolean notificationAlready;
+               
                OnceFailingReducer(long numElements) {
                        this.numElements = numElements;
                }
@@ -265,11 +410,10 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
                        long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
 
                        failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
-                       count = 0;
                }
 
                @Override
-               public Tuple1<Long> reduce(Tuple1<Long> value1, Tuple1<Long> value2) throws Exception {
+               public Tuple1<Long> reduce(Tuple1<Long> value1, Tuple1<Long> value2) {
                        count++;
                        value1.f0 += value2.f0;
                        return value1;
@@ -277,7 +421,7 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 
                @Override
                public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-                       if (!hasFailed && count >= failurePos) {
+                       if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 0) {
                                hasFailed = true;
                                failureCheckpointID = checkpointId;
                                throw new Exception("Test Failure");
@@ -289,77 +433,19 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
                public void restoreState(Long state) {
                        count = state;
                }
-       }
-
-       /**
-        * Filter on Long values supposedly letting all values through. As an implementation
-        * for the {@link CheckpointNotifier} interface it stores all the checkpoint ids
-        * it has seen in a static list.
-        */
-       private static class LongRichFilterFunction extends RichFilterFunction<Long>
-                       implements CheckpointNotifier {
-
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               static List<Long>[] completedCheckpoints = new List[PARALLELISM];
-               
-               private int subtaskId;
-
-               @Override
-               public boolean filter(Long value) {
-                       return value < 100;
-               }
 
                @Override
-               public void open(Configuration conf) throws IOException {
-                       subtaskId = getRuntimeContext().getIndexOfThisSubtask();
-
-                       // Create a collection on the first open
-                       if (completedCheckpoints[subtaskId] == null) {
-                               completedCheckpoints[subtaskId] = new ArrayList<>();
+               public void notifyCheckpointComplete(long checkpointId) {
+                       // record the ID of the completed checkpoint
+                       int partition = getRuntimeContext().getIndexOfThisSubtask();
+                       completedCheckpoints[partition].add(checkpointId);
+
+                       // if this is the first time we get a notification since the failure,
+                       // tell the source function
+                       if (OnceFailingReducer.hasFailed && !notificationAlready) {
+                               notificationAlready = true;
+                               GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet();
                        }
                }
-
-               @Override
-               public void notifyCheckpointComplete(long checkpointId) throws Exception {
-                       completedCheckpoints[subtaskId].add(checkpointId);
-               }
-       }
-
-       /**
-        * CoFlatMap on Long values as identity transform on the left input, while ignoring the right.
-        * As an implementation for the {@link CheckpointNotifier} interface it stores all the checkpoint
-        * ids it has seen in a static list.
-        */
-       private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<Long, Long, Long>
-                       implements CheckpointNotifier {
-
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               public static List<Long>[] completedCheckpoints = new List[PARALLELISM];
-               private int subtaskId;
-
-               @Override
-               public void open(Configuration conf) throws IOException {
-                       subtaskId = getRuntimeContext().getIndexOfThisSubtask();
-
-                       // Create a collection on the first open
-                       if (completedCheckpoints[subtaskId] == null) {
-                               completedCheckpoints[subtaskId] = new ArrayList<>();
-                       }
-               }
-
-               @Override
-               public void flatMap1(Long value, Collector<Long> out) throws IOException {
-                       out.collect(value);
-               }
-
-               @Override
-               public void flatMap2(Long value, Collector<Long> out) throws IOException {
-                       // we ignore the values from the second input
-               }
-
-               @Override
-               public void notifyCheckpointComplete(long checkpointId) throws Exception {
-                       completedCheckpoints[subtaskId].add(checkpointId);
-               }
        }
 }
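For readers skimming the diff above, a condensed, standalone sketch of the stabilization pattern this commit introduces: the once-failing reducer records that the induced failure happened, each notified function counts only its first checkpoint-complete notification after that failure, and the sources keep the job alive until the expected number of notifications has been observed, so the assertions in testProgram() cannot run before post-failure notifications had a chance to arrive. The class below only mirrors the fields and control flow from the test; it is illustrative and not code from the commit:

import java.util.concurrent.atomic.AtomicLong;

// Illustrative gate mirroring the test's pattern (field names follow the diff).
class PostFailureNotificationGate {

    // set once by the once-failing reducer when it throws the deliberate failure
    static volatile boolean hasFailed;

    // incremented once per task after its first post-failure notification
    static final AtomicLong numPostFailureNotifications = new AtomicLong();

    // per-task flag so each task contributes at most one increment
    private volatile boolean notificationAlready;

    void onCheckpointComplete(long checkpointId) {
        if (hasFailed && !notificationAlready) {
            notificationAlready = true;
            numPostFailureNotifications.incrementAndGet();
        }
    }

    // called by a source after it has emitted all elements: keeps the job running
    // until every task has observed a notification for a post-failure checkpoint
    void awaitNotifications(int notificationsToWaitFor) throws InterruptedException {
        while (numPostFailureNotifications.get() < notificationsToWaitFor) {
            Thread.sleep(50);
        }
    }
}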
