[FLINK-785] ChainedAllReduce

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3a7350d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3a7350d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3a7350d

Branch: refs/heads/master
Commit: a3a7350d55a3e9b20a6c53aedd8d1c24fb188122
Parents: b4152d7
Author: zentol <s.mo...@web.de>
Authored: Wed Feb 11 16:02:15 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 14:00:49 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/operators/DriverStrategy.java |   3 +-
 .../chaining/ChainedAllReduceDriver.java        | 112 +++++++++++++++++++
 2 files changed, 114 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a3a7350d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index 7942b3b..4a0035c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators;
 import static org.apache.flink.runtime.operators.DamBehavior.FULL_DAM;
 import static org.apache.flink.runtime.operators.DamBehavior.MATERIALIZING;
 import static org.apache.flink.runtime.operators.DamBehavior.PIPELINED;
+import org.apache.flink.runtime.operators.chaining.ChainedAllReduceDriver;
 
 import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
@@ -51,7 +52,7 @@ public enum DriverStrategy {
        FLAT_MAP(FlatMapDriver.class, ChainedFlatMapDriver.class, PIPELINED, 0),
 
        // group everything together into one group and apply the Reduce 
function
-       ALL_REDUCE(AllReduceDriver.class, null, PIPELINED, 0),
+       ALL_REDUCE(AllReduceDriver.class, ChainedAllReduceDriver.class, 
PIPELINED, 0),
        // group everything together into one group and apply the GroupReduce 
function
        ALL_GROUP_REDUCE(AllGroupReduceDriver.class, null, PIPELINED, 0),
        // group everything together into one group and apply the GroupReduce's 
combine function

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a7350d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
new file mode 100644
index 0000000..4641fce
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.RegularPactTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ChainedAllReduceDriver<IT> extends ChainedDriver<IT, IT> {
+       private static final Logger LOG = 
LoggerFactory.getLogger(ChainedAllReduceDriver.class);
+
+       // 
--------------------------------------------------------------------------------------------
+       private ReduceFunction<IT> reducer;
+       private TypeSerializer<IT> serializer;
+
+       private IT base;
+
+       // 
--------------------------------------------------------------------------------------------
+       @Override
+       public void setup(AbstractInvokable parent) {
+               @SuppressWarnings("unchecked")
+               final ReduceFunction<IT> red = 
RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, 
ReduceFunction.class);
+               this.reducer = red;
+               FunctionUtils.setFunctionRuntimeContext(red, 
getUdfRuntimeContext());
+
+               TypeSerializerFactory<IT> serializerFactory = 
this.config.getInputSerializer(0, userCodeClassLoader);
+               this.serializer = serializerFactory.getSerializer();
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("ChainedAllReduceDriver object reuse: " + 
(this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+               }
+       }
+
+       @Override
+       public void openTask() throws Exception {
+               Configuration stubConfig = this.config.getStubParameters();
+               RegularPactTask.openUserCode(this.reducer, stubConfig);
+       }
+
+       @Override
+       public void closeTask() throws Exception {
+               RegularPactTask.closeUserCode(this.reducer);
+       }
+
+       @Override
+       public void cancelTask() {
+               try {
+                       FunctionUtils.closeFunction(this.reducer);
+               } catch (Throwable t) {
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       @Override
+       public Function getStub() {
+               return this.reducer;
+       }
+
+       @Override
+       public String getTaskName() {
+               return this.taskName;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       @Override
+       public void collect(IT record) {
+               try {
+                       if (base == null) {
+                               base = objectReuseEnabled ? record : 
serializer.copy(record);
+                       } else {
+                               base = objectReuseEnabled ? 
reducer.reduce(base, record) : serializer.copy(reducer.reduce(base, record));
+                       }
+               } catch (Exception e) {
+                       throw new ExceptionInChainedStubException(taskName, e);
+               }
+       }
+
+       @Override
+       public void close() {
+               try {
+                       if (base != null) {
+                               this.outputCollector.collect(base);
+                               base = null;
+                       }
+               } catch (Exception e) {
+                       throw new 
ExceptionInChainedStubException(this.taskName, e);
+               }
+               this.outputCollector.close();
+       }
+}

Reply via email to