[FLINK-7505] Use lambdas in suppressed exception idiom

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5456cf9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5456cf9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5456cf9f

Branch: refs/heads/master
Commit: 5456cf9f8fc9156fd10e7542e8a2497a285cbff7
Parents: ca87bec
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Thu Aug 24 17:27:29 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Thu Aug 24 20:17:08 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/util/LambdaUtil.java  | 63 ++++++++++++++++++++
 .../org/apache/flink/util/ThrowingConsumer.java | 37 ++++++++++++
 .../apache/flink/runtime/state/StateUtil.java   | 25 +-------
 3 files changed, 103 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5456cf9f/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java
new file mode 100644
index 0000000..8ac0f0e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+/**
+ * This class offers utility functions for Java's lambda features.
+ */
+public final class LambdaUtil {
+
+       private LambdaUtil() {
+               throw new AssertionError();
+       }
+
+       /**
+        * This method supplies all elements from the input to the consumer. 
Exceptions that happen on elements are
+        * suppressed until all elements are processed. If exceptions happened 
for one or more of the inputs, they are
+        * reported in a combining suppressed exception.
+        *
+        * @param inputs iterator for all inputs to the throwingConsumer.
+        * @param throwingConsumer this consumer will be called for all 
elements delivered by the input iterator.
+        * @param <T> the type of input.
+        * @throws Exception collected exceptions that happened during the 
invocation of the consumer on the input elements.
+        */
+       public static <T> void applyToAllWhileSuppressingExceptions(
+               Iterable<T> inputs,
+               ThrowingConsumer<T> throwingConsumer) throws Exception {
+
+               if (inputs != null && throwingConsumer != null) {
+                       Exception exception = null;
+
+                       for (T input : inputs) {
+
+                               if (input != null) {
+                                       try {
+                                               throwingConsumer.accept(input);
+                                       } catch (Exception ex) {
+                                               exception = 
ExceptionUtils.firstOrSuppressed(ex, exception);
+                                       }
+                               }
+                       }
+
+                       if (exception != null) {
+                               throw exception;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5456cf9f/flink-core/src/main/java/org/apache/flink/util/ThrowingConsumer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/ThrowingConsumer.java 
b/flink-core/src/main/java/org/apache/flink/util/ThrowingConsumer.java
new file mode 100644
index 0000000..a180a12
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/ThrowingConsumer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+/**
+ * This interface is basically Java's {@link java.util.function.Consumer} 
interface enhanced with the ability to throw
+ * an exception.
+ *
+ * @param <T> type of the consumed elements.
+ */
+@FunctionalInterface
+public interface ThrowingConsumer<T> {
+
+       /**
+        * Performs this operation on the given argument.
+        *
+        * @param t the input argument
+        * @throws Exception on errors during consumption
+        */
+       void accept(T t) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5456cf9f/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
index 6f231e4..09d195a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FutureUtil;
+import org.apache.flink.util.LambdaUtil;
 
 import java.util.concurrent.RunnableFuture;
 
@@ -49,27 +49,8 @@ public class StateUtil {
         * @throws Exception exception that is a collection of all suppressed 
exceptions that were caught during iteration
         */
        public static void bestEffortDiscardAllStateObjects(
-                       Iterable<? extends StateObject> handlesToDiscard) 
throws Exception {
-
-               if (handlesToDiscard != null) {
-                       Exception exception = null;
-
-                       for (StateObject state : handlesToDiscard) {
-
-                               if (state != null) {
-                                       try {
-                                               state.discardState();
-                                       }
-                                       catch (Exception ex) {
-                                               exception = 
ExceptionUtils.firstOrSuppressed(ex, exception);
-                                       }
-                               }
-                       }
-
-                       if (exception != null) {
-                               throw exception;
-                       }
-               }
+               Iterable<? extends StateObject> handlesToDiscard) throws 
Exception {
+               
LambdaUtil.applyToAllWhileSuppressingExceptions(handlesToDiscard, 
StateObject::discardState);
        }
 
        /**

Reply via email to