http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java index c6a872c..988e903 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java @@ -55,14 +55,14 @@ import java.util.List; * @param <IN> The data type consumed by the combiner. * @param <OUT> The data type produced by the combiner. */ -public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombineFunction<IN, OUT>, OUT> { +public class GroupReduceCombineDriver<IN, OUT> implements Driver<GroupCombineFunction<IN, OUT>, OUT> { private static final Logger LOG = LoggerFactory.getLogger(GroupReduceCombineDriver.class); /** Fix length records with a length below this threshold will be in-place sorted, if possible. */ private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; - private PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext; + private TaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext; private InMemorySorter<IN> sorter; @@ -87,7 +87,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin // ------------------------------------------------------------------------ @Override - public void setup(PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> context) { + public void setup(TaskContext<GroupCombineFunction<IN, OUT>, OUT> context) { this.taskContext = context; this.running = true; }
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java index 59fb603..a03e42d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java @@ -40,11 +40,11 @@ import org.apache.flink.util.MutableObjectIterator; * * @see org.apache.flink.api.common.functions.GroupReduceFunction */ -public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction<IT, OT>, OT> { +public class GroupReduceDriver<IT, OT> implements Driver<GroupReduceFunction<IT, OT>, OT> { private static final Logger LOG = LoggerFactory.getLogger(GroupReduceDriver.class); - private PactTaskContext<GroupReduceFunction<IT, OT>, OT> taskContext; + private TaskContext<GroupReduceFunction<IT, OT>, OT> taskContext; private MutableObjectIterator<IT> input; @@ -59,7 +59,7 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction // ------------------------------------------------------------------------ @Override - public void setup(PactTaskContext<GroupReduceFunction<IT, OT>, OT> context) { + public void setup(TaskContext<GroupReduceFunction<IT, OT>, OT> context) { this.taskContext = context; this.running = true; } http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java index 
811f00c..7a9c8e6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java @@ -46,11 +46,11 @@ import org.slf4j.LoggerFactory; * * @see org.apache.flink.api.common.functions.FlatJoinFunction */ -public class JoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> { +public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT2, OT>, OT> { protected static final Logger LOG = LoggerFactory.getLogger(JoinDriver.class); - protected PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext; + protected TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext; private volatile JoinTaskIterator<IT1, IT2, OT> joinIterator; // the iterator that does the actual join @@ -59,7 +59,7 @@ public class JoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1 // ------------------------------------------------------------------------ @Override - public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) { + public void setup(TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) { this.taskContext = context; this.running = true; } http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java index fe926cb..51f9197 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java @@ -27,15 +27,15 @@ import 
org.apache.flink.api.common.typeutils.TypePairComparator; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker; -import org.apache.flink.runtime.iterative.task.AbstractIterativePactTask; +import org.apache.flink.runtime.iterative.task.AbstractIterativeTask; import org.apache.flink.runtime.operators.hash.CompactingHashTable; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; -public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> { +public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableDriver<FlatJoinFunction<IT1, IT2, OT>, OT> { - private PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext; + private TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext; private CompactingHashTable<IT1> hashTable; @@ -55,7 +55,7 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP // -------------------------------------------------------------------------------------------- @Override - public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) { + public void setup(TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) { this.taskContext = context; this.running = true; } @@ -99,8 +99,8 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP final TypeComparator<IT1> solutionSetComparator; // grab a handle to the hash table from the iteration broker - if (taskContext instanceof AbstractIterativePactTask) { - AbstractIterativePactTask<?, ?> iterativeTaskContext = (AbstractIterativePactTask<?, ?>) taskContext; + if (taskContext instanceof AbstractIterativeTask) { + AbstractIterativeTask<?, ?> iterativeTaskContext = (AbstractIterativeTask<?, ?>) 
taskContext; String identifier = iterativeTaskContext.brokerKey(); Object table = SolutionSetBroker.instance().get(identifier); http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java index 20079fc..e1fad47 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java @@ -27,15 +27,15 @@ import org.apache.flink.api.common.typeutils.TypePairComparator; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker; -import org.apache.flink.runtime.iterative.task.AbstractIterativePactTask; +import org.apache.flink.runtime.iterative.task.AbstractIterativeTask; import org.apache.flink.runtime.operators.hash.CompactingHashTable; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; -public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> { +public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettableDriver<FlatJoinFunction<IT1, IT2, OT>, OT> { - private PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext; + private TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext; private CompactingHashTable<IT2> hashTable; @@ -55,7 +55,7 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> 
implements Resettable // -------------------------------------------------------------------------------------------- @Override - public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) { + public void setup(TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) { this.taskContext = context; this.running = true; } @@ -99,8 +99,8 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable final TypeComparator<IT2> solutionSetComparator; // grab a handle to the hash table from the iteration broker - if (taskContext instanceof AbstractIterativePactTask) { - AbstractIterativePactTask<?, ?> iterativeTaskContext = (AbstractIterativePactTask<?, ?>) taskContext; + if (taskContext instanceof AbstractIterativeTask) { + AbstractIterativeTask<?, ?> iterativeTaskContext = (AbstractIterativeTask<?, ?>) taskContext; String identifier = iterativeTaskContext.brokerKey(); Object table = SolutionSetBroker.instance().get(identifier); http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java index d861cbd..eefe8e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java @@ -36,9 +36,9 @@ import org.apache.flink.util.MutableObjectIterator; * @param <IT> The mapper's input data type. * @param <OT> The mapper's output data type. 
*/ -public class MapDriver<IT, OT> implements PactDriver<MapFunction<IT, OT>, OT> { +public class MapDriver<IT, OT> implements Driver<MapFunction<IT, OT>, OT> { - private PactTaskContext<MapFunction<IT, OT>, OT> taskContext; + private TaskContext<MapFunction<IT, OT>, OT> taskContext; private volatile boolean running; @@ -46,7 +46,7 @@ public class MapDriver<IT, OT> implements PactDriver<MapFunction<IT, OT>, OT> { @Override - public void setup(PactTaskContext<MapFunction<IT, OT>, OT> context) { + public void setup(TaskContext<MapFunction<IT, OT>, OT> context) { this.taskContext = context; this.running = true; http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java index eaab904..8792ef1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java @@ -41,16 +41,16 @@ import org.slf4j.LoggerFactory; * @param <IT> The mapper's input data type. * @param <OT> The mapper's output data type. 
*/ -public class MapPartitionDriver<IT, OT> implements PactDriver<MapPartitionFunction<IT, OT>, OT> { +public class MapPartitionDriver<IT, OT> implements Driver<MapPartitionFunction<IT, OT>, OT> { private static final Logger LOG = LoggerFactory.getLogger(MapPartitionDriver.class); - private PactTaskContext<MapPartitionFunction<IT, OT>, OT> taskContext; + private TaskContext<MapPartitionFunction<IT, OT>, OT> taskContext; private boolean objectReuseEnabled = false; @Override - public void setup(PactTaskContext<MapPartitionFunction<IT, OT>, OT> context) { + public void setup(TaskContext<MapPartitionFunction<IT, OT>, OT> context) { this.taskContext = context; } http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java index 1fb4813..fcd2716 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java @@ -32,18 +32,18 @@ import org.slf4j.LoggerFactory; * * @param <T> The data type. 
*/ -public class NoOpDriver<T> implements PactDriver<AbstractRichFunction, T> { +public class NoOpDriver<T> implements Driver<AbstractRichFunction, T> { private static final Logger LOG = LoggerFactory.getLogger(MapPartitionDriver.class); - private PactTaskContext<AbstractRichFunction, T> taskContext; + private TaskContext<AbstractRichFunction, T> taskContext; private volatile boolean running; private boolean objectReuseEnabled = false; @Override - public void setup(PactTaskContext<AbstractRichFunction, T> context) { + public void setup(TaskContext<AbstractRichFunction, T> context) { this.taskContext = context; this.running = true; } http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java deleted file mode 100644 index 288f7ca..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.operators; - -import org.apache.flink.api.common.functions.Function; - -/** - * The interface to be implemented by all drivers that run alone (or as the primary driver) in a task. - * A driver implements the actual code to perform a batch operation, like <i>map()</i>, - * <i>reduce()</i>, <i>join()</i>, or <i>coGroup()</i>. - * - * @see PactTaskContext - * - * @param <S> The type of stub driven by this driver. - * @param <OT> The data type of the records produced by this driver. - */ -public interface PactDriver<S extends Function, OT> { - - void setup(PactTaskContext<S, OT> context); - - /** - * Gets the number of inputs that the task has. - * - * @return The number of inputs. - */ - int getNumberOfInputs(); - - /** - * Gets the number of comparators required for this driver. - * - * @return The number of comparators required for this driver. - */ - int getNumberOfDriverComparators(); - - /** - * Gets the class of the stub type that is run by this task. For example, a <tt>MapTask</tt> should return - * <code>MapFunction.class</code>. - * - * @return The class of the stub type run by the task. - */ - Class<S> getStubType(); - - /** - * This method is called before the user code is opened. An exception thrown by this method - * signals failure of the task. - * - * @throws Exception Exceptions may be forwarded and signal task failure. - */ - void prepare() throws Exception; - - /** - * The main operation method of the task. It should call the user code with the data subsets until - * the input is depleted. - * - * @throws Exception Any exception thrown by this method signals task failure. Because exceptions in the user - * code typically signal situations where this instance in unable to proceed, exceptions - * from the user code should be forwarded. - */ - void run() throws Exception; - - /** - * This method is invoked in any case (clean termination and exception) at the end of the tasks operation. 
- * - * @throws Exception Exceptions may be forwarded. - */ - void cleanup() throws Exception; - - /** - * This method is invoked when the driver must aborted in mid processing. It is invoked asynchronously by a different thread. - * - * @throws Exception Exceptions may be forwarded. - */ - void cancel() throws Exception; -} http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java deleted file mode 100644 index baeda3a..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.runtime.operators; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.util.TaskConfig; -import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; -import org.apache.flink.util.Collector; -import org.apache.flink.util.MutableObjectIterator; - - -/** - * The task context gives a driver (e.g., {@link MapDriver}, or {@link JoinDriver}) access to - * the runtime components and configuration that they can use to fulfil their task. - * - * @param <S> The UDF type. - * @param <OT> The produced data type. - * - * @see PactDriver - */ -public interface PactTaskContext<S, OT> { - - TaskConfig getTaskConfig(); - - TaskManagerRuntimeInfo getTaskManagerInfo(); - - ClassLoader getUserCodeClassLoader(); - - MemoryManager getMemoryManager(); - - IOManager getIOManager(); - - <X> MutableObjectIterator<X> getInput(int index); - - <X> TypeSerializerFactory<X> getInputSerializer(int index); - - <X> TypeComparator<X> getDriverComparator(int index); - - S getStub(); - - ExecutionConfig getExecutionConfig(); - - Collector<OT> getOutputCollector(); - - AbstractInvokable getOwningNepheleTask(); - - String formatLogString(String message); -} http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java index f990156..c77e746 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java @@ -45,7 +45,7 @@ import org.apache.flink.util.MutableObjectIterator; * * @param <T> The data type consumed and produced by the combiner. */ -public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T> { +public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> { private static final Logger LOG = LoggerFactory.getLogger(ReduceCombineDriver.class); @@ -53,7 +53,7 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T> private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; - private PactTaskContext<ReduceFunction<T>, T> taskContext; + private TaskContext<ReduceFunction<T>, T> taskContext; private TypeSerializer<T> serializer; @@ -77,7 +77,7 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T> // ------------------------------------------------------------------------ @Override - public void setup(PactTaskContext<ReduceFunction<T>, T> context) { + public void setup(TaskContext<ReduceFunction<T>, T> context) { this.taskContext = context; this.running = true; } http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java index 8d15ef2..395beab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java @@ -39,11 +39,11 @@ import org.apache.flink.util.MutableObjectIterator; * * @see org.apache.flink.api.common.functions.ReduceFunction */ -public 
class ReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> { +public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> { private static final Logger LOG = LoggerFactory.getLogger(ReduceDriver.class); - private PactTaskContext<ReduceFunction<T>, T> taskContext; + private TaskContext<ReduceFunction<T>, T> taskContext; private MutableObjectIterator<T> input; @@ -58,7 +58,7 @@ public class ReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> { // ------------------------------------------------------------------------ @Override - public void setup(PactTaskContext<ReduceFunction<T>, T> context) { + public void setup(TaskContext<ReduceFunction<T>, T> context) { this.taskContext = context; this.running = true; } http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java deleted file mode 100644 index 89963af..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java +++ /dev/null @@ -1,1499 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.operators; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.distributions.DataDistribution; -import org.apache.flink.api.common.functions.GroupCombineFunction; -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.common.functions.util.FunctionUtils; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeComparatorFactory; -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; -import org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization; -import org.apache.flink.runtime.execution.CancelTaskException; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.api.reader.MutableReader; -import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; -import org.apache.flink.runtime.io.network.api.writer.ChannelSelector; -import org.apache.flink.runtime.io.network.api.writer.RecordWriter; -import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; -import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.chaining.ChainedDriver; -import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException; -import org.apache.flink.runtime.operators.resettable.SpillingResettableMutableObjectIterator; -import org.apache.flink.runtime.operators.shipping.OutputCollector; -import org.apache.flink.runtime.operators.shipping.OutputEmitter; -import org.apache.flink.runtime.operators.shipping.RecordOutputCollector; -import org.apache.flink.runtime.operators.shipping.RecordOutputEmitter; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger; -import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; -import org.apache.flink.runtime.operators.util.CloseableInputProvider; -import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext; -import org.apache.flink.runtime.operators.util.LocalStrategy; -import org.apache.flink.runtime.operators.util.ReaderIterator; -import org.apache.flink.runtime.operators.util.TaskConfig; -import org.apache.flink.runtime.plugable.DeserializationDelegate; -import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; -import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.MutableObjectIterator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** - * The base class for all tasks. Encapsulated common behavior and implements the main life-cycle - * of the user code. 
- */ -public class RegularPactTask<S extends Function, OT> extends AbstractInvokable implements PactTaskContext<S, OT> { - - protected static final Logger LOG = LoggerFactory.getLogger(RegularPactTask.class); - - // -------------------------------------------------------------------------------------------- - - /** - * The driver that invokes the user code (the stub implementation). The central driver in this task - * (further drivers may be chained behind this driver). - */ - protected volatile PactDriver<S, OT> driver; - - /** - * The instantiated user code of this task's main operator (driver). May be null if the operator has no udf. - */ - protected S stub; - - /** - * The udf's runtime context. - */ - protected DistributedRuntimeUDFContext runtimeUdfContext; - - /** - * The collector that forwards the user code's results. May forward to a channel or to chained drivers within - * this task. - */ - protected Collector<OT> output; - - /** - * The output writers for the data that this task forwards to the next task. The latest driver (the central, if no chained - * drivers exist, otherwise the last chained driver) produces its output to these writers. - */ - protected List<RecordWriter<?>> eventualOutputs; - - /** - * The input readers of this task. - */ - protected MutableReader<?>[] inputReaders; - - /** - * The input readers for the configured broadcast variables for this task. - */ - protected MutableReader<?>[] broadcastInputReaders; - - /** - * The inputs reader, wrapped in an iterator. Prior to the local strategies, etc... - */ - protected MutableObjectIterator<?>[] inputIterators; - - /** - * The indices of the iterative inputs. Empty, if the task is not iterative. - */ - protected int[] iterativeInputs; - - /** - * The indices of the iterative broadcast inputs. Empty, if non of the inputs is iteratve. - */ - protected int[] iterativeBroadcastInputs; - - /** - * The local strategies that are applied on the inputs. 
- */ - protected volatile CloseableInputProvider<?>[] localStrategies; - - /** - * The optional temp barriers on the inputs for dead-lock breaking. Are - * optionally resettable. - */ - protected volatile TempBarrier<?>[] tempBarriers; - - /** - * The resettable inputs in the case where no temp barrier is needed. - */ - protected volatile SpillingResettableMutableObjectIterator<?>[] resettableInputs; - - /** - * The inputs to the operator. Return the readers' data after the application of the local strategy - * and the temp-table barrier. - */ - protected MutableObjectIterator<?>[] inputs; - - /** - * The serializers for the input data type. - */ - protected TypeSerializerFactory<?>[] inputSerializers; - - /** - * The serializers for the broadcast input data types. - */ - protected TypeSerializerFactory<?>[] broadcastInputSerializers; - - /** - * The comparators for the central driver. - */ - protected TypeComparator<?>[] inputComparators; - - /** - * The task configuration with the setup parameters. - */ - protected TaskConfig config; - - /** - * A list of chained drivers, if there are any. - */ - protected ArrayList<ChainedDriver<?, ?>> chainedTasks; - - /** - * Certain inputs may be excluded from resetting. For example, the initial partial solution - * in an iteration head must not be reseted (it is read through the back channel), when all - * others are reseted. - */ - private boolean[] excludeFromReset; - - /** - * Flag indicating for each input whether it is cached and can be reseted. - */ - private boolean[] inputIsCached; - - /** - * flag indicating for each input whether it must be asynchronously materialized. - */ - private boolean[] inputIsAsyncMaterialized; - - /** - * The amount of memory per input that is dedicated to the materialization. - */ - private int[] materializationMemory; - - /** - * The flag that tags the task as still running. Checked periodically to abort processing. 
- */ - protected volatile boolean running = true; - - /** - * The accumulator map used in the RuntimeContext. - */ - protected Map<String, Accumulator<?,?>> accumulatorMap; - - // -------------------------------------------------------------------------------------------- - // Task Interface - // -------------------------------------------------------------------------------------------- - - - /** - * Initialization method. Runs in the execution graph setup phase in the JobManager - * and as a setup method on the TaskManager. - */ - @Override - public void registerInputOutput() throws Exception { - if (LOG.isDebugEnabled()) { - LOG.debug(formatLogString("Start registering input and output.")); - } - - // obtain task configuration (including stub parameters) - Configuration taskConf = getTaskConfiguration(); - this.config = new TaskConfig(taskConf); - - // now get the operator class which drives the operation - final Class<? extends PactDriver<S, OT>> driverClass = this.config.getDriver(); - this.driver = InstantiationUtil.instantiate(driverClass, PactDriver.class); - - // initialize the readers. - // this does not yet trigger any stream consuming or processing. - initInputReaders(); - initBroadcastInputReaders(); - - // initialize the writers. - initOutputs(); - - if (LOG.isDebugEnabled()) { - LOG.debug(formatLogString("Finished registering input and output.")); - } - } - - - /** - * The main work method. - */ - @Override - public void invoke() throws Exception { - - if (LOG.isDebugEnabled()) { - LOG.debug(formatLogString("Start task code.")); - } - - Environment env = getEnvironment(); - - this.runtimeUdfContext = createRuntimeContext(env.getTaskName()); - - // whatever happens in this scope, make sure that the local strategies are cleaned up! - // note that the initialization of the local strategies is in the try-finally block as well, - // so that the thread that creates them catches its own errors that may happen in that process. 
- // this is especially important, since there may be asynchronous closes (such as through canceling). - try { - // initialize the remaining data structures on the input and trigger the local processing - // the local processing includes building the dams / caches - try { - int numInputs = driver.getNumberOfInputs(); - int numComparators = driver.getNumberOfDriverComparators(); - int numBroadcastInputs = this.config.getNumBroadcastInputs(); - - initInputsSerializersAndComparators(numInputs, numComparators); - initBroadcastInputsSerializers(numBroadcastInputs); - - // set the iterative status for inputs and broadcast inputs - { - List<Integer> iterativeInputs = new ArrayList<Integer>(); - - for (int i = 0; i < numInputs; i++) { - final int numberOfEventsUntilInterrupt = getTaskConfig().getNumberOfEventsUntilInterruptInIterativeGate(i); - - if (numberOfEventsUntilInterrupt < 0) { - throw new IllegalArgumentException(); - } - else if (numberOfEventsUntilInterrupt > 0) { - this.inputReaders[i].setIterativeReader(); - iterativeInputs.add(i); - - if (LOG.isDebugEnabled()) { - LOG.debug(formatLogString("Input [" + i + "] reads in supersteps with [" + - + numberOfEventsUntilInterrupt + "] event(s) till next superstep.")); - } - } - } - this.iterativeInputs = asArray(iterativeInputs); - } - - { - List<Integer> iterativeBcInputs = new ArrayList<Integer>(); - - for (int i = 0; i < numBroadcastInputs; i++) { - final int numberOfEventsUntilInterrupt = getTaskConfig().getNumberOfEventsUntilInterruptInIterativeBroadcastGate(i); - - if (numberOfEventsUntilInterrupt < 0) { - throw new IllegalArgumentException(); - } - else if (numberOfEventsUntilInterrupt > 0) { - this.broadcastInputReaders[i].setIterativeReader(); - iterativeBcInputs.add(i); - - if (LOG.isDebugEnabled()) { - LOG.debug(formatLogString("Broadcast input [" + i + "] reads in supersteps with [" + - + numberOfEventsUntilInterrupt + "] event(s) till next superstep.")); - } - } - } - this.iterativeBroadcastInputs = 
asArray(iterativeBcInputs); - } - - initLocalStrategies(numInputs); - } - catch (Exception e) { - throw new RuntimeException("Initializing the input processing failed" + - (e.getMessage() == null ? "." : ": " + e.getMessage()), e); - } - - if (!this.running) { - if (LOG.isDebugEnabled()) { - LOG.debug(formatLogString("Task cancelled before task code was started.")); - } - return; - } - - // pre main-function initialization - initialize(); - - // read the broadcast variables. they will be released in the finally clause - for (int i = 0; i < this.config.getNumBroadcastInputs(); i++) { - final String name = this.config.getBroadcastInputName(i); - readAndSetBroadcastInput(i, name, this.runtimeUdfContext, 1 /* superstep one for the start */); - } - - // the work goes here - run(); - } - finally { - // clean up in any case! - closeLocalStrategiesAndCaches(); - - clearReaders(inputReaders); - clearWriters(eventualOutputs); - - } - - if (this.running) { - if (LOG.isDebugEnabled()) { - LOG.debug(formatLogString("Finished task code.")); - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug(formatLogString("Task code cancelled.")); - } - } - } - - @Override - public void cancel() throws Exception { - this.running = false; - - if (LOG.isDebugEnabled()) { - LOG.debug(formatLogString("Cancelling task code")); - } - - try { - if (this.driver != null) { - this.driver.cancel(); - } - } finally { - closeLocalStrategiesAndCaches(); - } - } - - // -------------------------------------------------------------------------------------------- - // Main Work Methods - // -------------------------------------------------------------------------------------------- - - protected void initialize() throws Exception { - // create the operator - try { - this.driver.setup(this); - } - catch (Throwable t) { - throw new Exception("The driver setup for '" + this.getEnvironment().getTaskName() + - "' , caused an error: " + t.getMessage(), t); - } - - // instantiate the UDF - try { - final Class<? 
super S> userCodeFunctionType = this.driver.getStubType(); - // if the class is null, the driver has no user code - if (userCodeFunctionType != null) { - this.stub = initStub(userCodeFunctionType); - } - } catch (Exception e) { - throw new RuntimeException("Initializing the UDF" + - (e.getMessage() == null ? "." : ": " + e.getMessage()), e); - } - } - - protected <X> void readAndSetBroadcastInput(int inputNum, String bcVarName, DistributedRuntimeUDFContext context, int superstep) throws IOException { - - if (LOG.isDebugEnabled()) { - LOG.debug(formatLogString("Setting broadcast variable '" + bcVarName + "'" + - (superstep > 1 ? ", superstep " + superstep : ""))); - } - - @SuppressWarnings("unchecked") - final TypeSerializerFactory<X> serializerFactory = (TypeSerializerFactory<X>) this.broadcastInputSerializers[inputNum]; - - final MutableReader<?> reader = this.broadcastInputReaders[inputNum]; - - BroadcastVariableMaterialization<X, ?> variable = getEnvironment().getBroadcastVariableManager().materializeBroadcastVariable(bcVarName, superstep, this, reader, serializerFactory); - context.setBroadcastVariable(bcVarName, variable); - } - - protected void releaseBroadcastVariables(String bcVarName, int superstep, DistributedRuntimeUDFContext context) { - if (LOG.isDebugEnabled()) { - LOG.debug(formatLogString("Releasing broadcast variable '" + bcVarName + "'" + - (superstep > 1 ? 
", superstep " + superstep : ""))); - } - - getEnvironment().getBroadcastVariableManager().releaseReference(bcVarName, superstep, this); - context.clearBroadcastVariable(bcVarName); - } - - - protected void run() throws Exception { - // ---------------------------- Now, the actual processing starts ------------------------ - // check for asynchronous canceling - if (!this.running) { - return; - } - - boolean stubOpen = false; - - try { - // run the data preparation - try { - this.driver.prepare(); - } - catch (Throwable t) { - // if the preparation caused an error, clean up - // errors during clean-up are swallowed, because we have already a root exception - throw new Exception("The data preparation for task '" + this.getEnvironment().getTaskName() + - "' , caused an error: " + t.getMessage(), t); - } - - // check for canceling - if (!this.running) { - return; - } - - // start all chained tasks - RegularPactTask.openChainedTasks(this.chainedTasks, this); - - // open stub implementation - if (this.stub != null) { - try { - Configuration stubConfig = this.config.getStubParameters(); - FunctionUtils.openFunction(this.stub, stubConfig); - stubOpen = true; - } - catch (Throwable t) { - throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t); - } - } - - // run the user code - this.driver.run(); - - // close. We close here such that a regular close throwing an exception marks a task as failed. 
- if (this.running && this.stub != null) { - FunctionUtils.closeFunction(this.stub); - stubOpen = false; - } - - this.output.close(); - - // close all chained tasks letting them report failure - RegularPactTask.closeChainedTasks(this.chainedTasks, this); - } - catch (Exception ex) { - // close the input, but do not report any exceptions, since we already have another root cause - if (stubOpen) { - try { - FunctionUtils.closeFunction(this.stub); - } - catch (Throwable t) { - // do nothing - } - } - - // if resettable driver invoke teardown - if (this.driver instanceof ResettablePactDriver) { - final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver; - try { - resDriver.teardown(); - } catch (Throwable t) { - throw new Exception("Error while shutting down an iterative operator: " + t.getMessage(), t); - } - } - - RegularPactTask.cancelChainedTasks(this.chainedTasks); - - ex = ExceptionInChainedStubException.exceptionUnwrap(ex); - - if (ex instanceof CancelTaskException) { - // forward canceling exception - throw ex; - } - else if (this.running) { - // throw only if task was not cancelled. in the case of canceling, exceptions are expected - RegularPactTask.logAndThrowException(ex, this); - } - } - finally { - this.driver.cleanup(); - } - } - - protected void closeLocalStrategiesAndCaches() { - - // make sure that all broadcast variable references held by this task are released - if (LOG.isDebugEnabled()) { - LOG.debug(formatLogString("Releasing all broadcast variables.")); - } - - getEnvironment().getBroadcastVariableManager().releaseAllReferencesFromTask(this); - if (runtimeUdfContext != null) { - runtimeUdfContext.clearAllBroadcastVariables(); - } - - // clean all local strategies and caches/pipeline breakers. 
- - if (this.localStrategies != null) { - for (int i = 0; i < this.localStrategies.length; i++) { - if (this.localStrategies[i] != null) { - try { - this.localStrategies[i].close(); - } catch (Throwable t) { - LOG.error("Error closing local strategy for input " + i, t); - } - } - } - } - if (this.tempBarriers != null) { - for (int i = 0; i < this.tempBarriers.length; i++) { - if (this.tempBarriers[i] != null) { - try { - this.tempBarriers[i].close(); - } catch (Throwable t) { - LOG.error("Error closing temp barrier for input " + i, t); - } - } - } - } - if (this.resettableInputs != null) { - for (int i = 0; i < this.resettableInputs.length; i++) { - if (this.resettableInputs[i] != null) { - try { - this.resettableInputs[i].close(); - } catch (Throwable t) { - LOG.error("Error closing cache for input " + i, t); - } - } - } - } - } - - // -------------------------------------------------------------------------------------------- - // Task Setup and Teardown - // -------------------------------------------------------------------------------------------- - - /** - * @return the last output collector in the collector chain - */ - @SuppressWarnings("unchecked") - protected Collector<OT> getLastOutputCollector() { - int numChained = this.chainedTasks.size(); - return (numChained == 0) ? output : (Collector<OT>) chainedTasks.get(numChained - 1).getOutputCollector(); - } - - /** - * Sets the last output {@link Collector} of the collector chain of this {@link RegularPactTask}. - * <p> - * In case of chained tasks, the output collector of the last {@link ChainedDriver} is set. Otherwise it is the - * single collector of the {@link RegularPactTask}. 
- * - * @param newOutputCollector new output collector to set as last collector - */ - protected void setLastOutputCollector(Collector<OT> newOutputCollector) { - int numChained = this.chainedTasks.size(); - - if (numChained == 0) { - output = newOutputCollector; - return; - } - - chainedTasks.get(numChained - 1).setOutputCollector(newOutputCollector); - } - - public TaskConfig getLastTasksConfig() { - int numChained = this.chainedTasks.size(); - return (numChained == 0) ? config : chainedTasks.get(numChained - 1).getTaskConfig(); - } - - protected S initStub(Class<? super S> stubSuperClass) throws Exception { - try { - ClassLoader userCodeClassLoader = getUserCodeClassLoader(); - S stub = config.<S>getStubWrapper(userCodeClassLoader).getUserCodeObject(stubSuperClass, userCodeClassLoader); - // check if the class is a subclass, if the check is required - if (stubSuperClass != null && !stubSuperClass.isAssignableFrom(stub.getClass())) { - throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" + - stubSuperClass.getName() + "' as is required."); - } - FunctionUtils.setFunctionRuntimeContext(stub, this.runtimeUdfContext); - return stub; - } - catch (ClassCastException ccex) { - throw new Exception("The stub class is not a proper subclass of " + stubSuperClass.getName(), ccex); - } - } - - /** - * Creates the record readers for the number of inputs as defined by {@link #getNumTaskInputs()}. - * - * This method requires that the task configuration, the driver, and the user-code class loader are set. 
- */ - protected void initInputReaders() throws Exception { - final int numInputs = getNumTaskInputs(); - final MutableReader<?>[] inputReaders = new MutableReader<?>[numInputs]; - - int currentReaderOffset = 0; - - AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); - AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter(); - - for (int i = 0; i < numInputs; i++) { - // ---------------- create the input readers --------------------- - // in case where a logical input unions multiple physical inputs, create a union reader - final int groupSize = this.config.getGroupSize(i); - - if (groupSize == 1) { - // non-union case - inputReaders[i] = new MutableRecordReader<IOReadableWritable>(getEnvironment().getInputGate(currentReaderOffset)); - } else if (groupSize > 1){ - // union case - InputGate[] readers = new InputGate[groupSize]; - for (int j = 0; j < groupSize; ++j) { - readers[j] = getEnvironment().getInputGate(currentReaderOffset + j); - } - inputReaders[i] = new MutableRecordReader<IOReadableWritable>(new UnionInputGate(readers)); - } else { - throw new Exception("Illegal input group size in task configuration: " + groupSize); - } - - inputReaders[i].setReporter(reporter); - - currentReaderOffset += groupSize; - } - this.inputReaders = inputReaders; - - // final sanity check - if (currentReaderOffset != this.config.getNumInputs()) { - throw new Exception("Illegal configuration: Number of input gates and group sizes are not consistent."); - } - } - - /** - * Creates the record readers for the extra broadcast inputs as configured by {@link TaskConfig#getNumBroadcastInputs()}. - * - * This method requires that the task configuration, the driver, and the user-code class loader are set. 
- */ - protected void initBroadcastInputReaders() throws Exception { - final int numBroadcastInputs = this.config.getNumBroadcastInputs(); - final MutableReader<?>[] broadcastInputReaders = new MutableReader<?>[numBroadcastInputs]; - - int currentReaderOffset = config.getNumInputs(); - - for (int i = 0; i < this.config.getNumBroadcastInputs(); i++) { - // ---------------- create the input readers --------------------- - // in case where a logical input unions multiple physical inputs, create a union reader - final int groupSize = this.config.getBroadcastGroupSize(i); - if (groupSize == 1) { - // non-union case - broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(getEnvironment().getInputGate(currentReaderOffset)); - } else if (groupSize > 1){ - // union case - InputGate[] readers = new InputGate[groupSize]; - for (int j = 0; j < groupSize; ++j) { - readers[j] = getEnvironment().getInputGate(currentReaderOffset + j); - } - broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(new UnionInputGate(readers)); - } else { - throw new Exception("Illegal input group size in task configuration: " + groupSize); - } - - currentReaderOffset += groupSize; - } - this.broadcastInputReaders = broadcastInputReaders; - } - - /** - * Creates all the serializers and comparators. - */ - protected void initInputsSerializersAndComparators(int numInputs, int numComparators) throws Exception { - this.inputSerializers = new TypeSerializerFactory<?>[numInputs]; - this.inputComparators = numComparators > 0 ? 
new TypeComparator<?>[numComparators] : null; - this.inputIterators = new MutableObjectIterator<?>[numInputs]; - - ClassLoader userCodeClassLoader = getUserCodeClassLoader(); - - for (int i = 0; i < numInputs; i++) { - - final TypeSerializerFactory<?> serializerFactory = this.config.getInputSerializer(i, userCodeClassLoader); - this.inputSerializers[i] = serializerFactory; - - this.inputIterators[i] = createInputIterator(this.inputReaders[i], this.inputSerializers[i]); - } - - // ---------------- create the driver's comparators --------------------- - for (int i = 0; i < numComparators; i++) { - - if (this.inputComparators != null) { - final TypeComparatorFactory<?> comparatorFactory = this.config.getDriverComparator(i, userCodeClassLoader); - this.inputComparators[i] = comparatorFactory.createComparator(); - } - } - } - - /** - * Creates the serializer factories for the broadcast inputs. - */ - protected void initBroadcastInputsSerializers(int numBroadcastInputs) throws Exception { - this.broadcastInputSerializers = new TypeSerializerFactory<?>[numBroadcastInputs]; - - ClassLoader userCodeClassLoader = getUserCodeClassLoader(); - - for (int i = 0; i < numBroadcastInputs; i++) { - // ---------------- create the serializer first --------------------- - final TypeSerializerFactory<?> serializerFactory = this.config.getBroadcastInputSerializer(i, userCodeClassLoader); - this.broadcastInputSerializers[i] = serializerFactory; - } - } - - /** - * - * NOTE: This method must be invoked after the invocation of {@code #initInputReaders()} and - * {@code #initInputsSerializersAndComparators(int, int)}! 
- * - * @param numInputs - */ - protected void initLocalStrategies(int numInputs) throws Exception { - - final MemoryManager memMan = getMemoryManager(); - final IOManager ioMan = getIOManager(); - - this.localStrategies = new CloseableInputProvider<?>[numInputs]; - this.inputs = new MutableObjectIterator<?>[numInputs]; - this.excludeFromReset = new boolean[numInputs]; - this.inputIsCached = new boolean[numInputs]; - this.inputIsAsyncMaterialized = new boolean[numInputs]; - this.materializationMemory = new int[numInputs]; - - // set up the local strategies first, such that they can work before any temp barrier is created - for (int i = 0; i < numInputs; i++) { - initInputLocalStrategy(i); - } - - // we do another loop over the inputs, because we want to instantiate all - // sorters, etc before requesting the first input (as this call may block) - - // we have two types of materialized inputs, and both are replayable (can act as a cache) - // The first variant materializes in a different thread and hence - // acts as a pipeline breaker. this one should only be there, if a pipeline breaker is needed. - // the second variant spills to the side and will not read unless the result is also consumed - // in a pipelined fashion. 
- this.resettableInputs = new SpillingResettableMutableObjectIterator<?>[numInputs]; - this.tempBarriers = new TempBarrier<?>[numInputs]; - - for (int i = 0; i < numInputs; i++) { - final int memoryPages; - final boolean async = this.config.isInputAsynchronouslyMaterialized(i); - final boolean cached = this.config.isInputCached(i); - - this.inputIsAsyncMaterialized[i] = async; - this.inputIsCached[i] = cached; - - if (async || cached) { - memoryPages = memMan.computeNumberOfPages(this.config.getRelativeInputMaterializationMemory(i)); - if (memoryPages <= 0) { - throw new Exception("Input marked as materialized/cached, but no memory for materialization provided."); - } - this.materializationMemory[i] = memoryPages; - } else { - memoryPages = 0; - } - - if (async) { - @SuppressWarnings({ "unchecked", "rawtypes" }) - TempBarrier<?> barrier = new TempBarrier(this, getInput(i), this.inputSerializers[i], memMan, ioMan, memoryPages); - barrier.startReading(); - this.tempBarriers[i] = barrier; - this.inputs[i] = null; - } else if (cached) { - @SuppressWarnings({ "unchecked", "rawtypes" }) - SpillingResettableMutableObjectIterator<?> iter = new SpillingResettableMutableObjectIterator( - getInput(i), this.inputSerializers[i].getSerializer(), getMemoryManager(), getIOManager(), memoryPages, this); - this.resettableInputs[i] = iter; - this.inputs[i] = iter; - } - } - } - - protected void resetAllInputs() throws Exception { - - // first we need to make sure that caches consume remaining data - // NOTE: we need to do this before closing the local strategies - for (int i = 0; i < this.inputs.length; i++) { - - if (this.inputIsCached[i] && this.resettableInputs[i] != null) { - this.resettableInputs[i].consumeAndCacheRemainingData(); - } - } - - // close all local-strategies. 
they will either get re-initialized, or we have - // read them now and their data is cached - for (int i = 0; i < this.localStrategies.length; i++) { - if (this.localStrategies[i] != null) { - this.localStrategies[i].close(); - this.localStrategies[i] = null; - } - } - - final MemoryManager memMan = getMemoryManager(); - final IOManager ioMan = getIOManager(); - - // reset the caches, or re-run the input local strategy - for (int i = 0; i < this.inputs.length; i++) { - if (this.excludeFromReset[i]) { - if (this.tempBarriers[i] != null) { - this.tempBarriers[i].close(); - this.tempBarriers[i] = null; - } else if (this.resettableInputs[i] != null) { - this.resettableInputs[i].close(); - this.resettableInputs[i] = null; - } - } else { - // make sure the input is not available directly, but are lazily fetched again - this.inputs[i] = null; - - if (this.inputIsCached[i]) { - if (this.tempBarriers[i] != null) { - this.inputs[i] = this.tempBarriers[i].getIterator(); - } else if (this.resettableInputs[i] != null) { - this.resettableInputs[i].reset(); - this.inputs[i] = this.resettableInputs[i]; - } else { - throw new RuntimeException("Found a resettable input, but no temp barrier and no resettable iterator."); - } - } else { - // close the async barrier if there is one - if (this.tempBarriers[i] != null) { - this.tempBarriers[i].close(); - } - - // recreate the local strategy - initInputLocalStrategy(i); - - if (this.inputIsAsyncMaterialized[i]) { - final int pages = this.materializationMemory[i]; - @SuppressWarnings({ "unchecked", "rawtypes" }) - TempBarrier<?> barrier = new TempBarrier(this, getInput(i), this.inputSerializers[i], memMan, ioMan, pages); - barrier.startReading(); - this.tempBarriers[i] = barrier; - this.inputs[i] = null; - } - } - } - } - } - - protected void excludeFromReset(int inputNum) { - this.excludeFromReset[inputNum] = true; - } - - private void initInputLocalStrategy(int inputNum) throws Exception { - // check if there is already a strategy - if 
(this.localStrategies[inputNum] != null) { - throw new IllegalStateException(); - } - - // now set up the local strategy - final LocalStrategy localStrategy = this.config.getInputLocalStrategy(inputNum); - if (localStrategy != null) { - switch (localStrategy) { - case NONE: - // the input is as it is - this.inputs[inputNum] = this.inputIterators[inputNum]; - break; - case SORT: - @SuppressWarnings({ "rawtypes", "unchecked" }) - UnilateralSortMerger<?> sorter = new UnilateralSortMerger(getMemoryManager(), getIOManager(), - this.inputIterators[inputNum], this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum), - this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum), - this.config.getSpillingThresholdInput(inputNum), this.getExecutionConfig().isObjectReuseEnabled()); - // set the input to null such that it will be lazily fetched from the input strategy - this.inputs[inputNum] = null; - this.localStrategies[inputNum] = sorter; - break; - case COMBININGSORT: - // sanity check this special case! - // this still breaks a bit of the abstraction! - // we should have nested configurations for the local strategies to solve that - if (inputNum != 0) { - throw new IllegalStateException("Performing combining sort outside a (group)reduce task!"); - } - - // instantiate ourselves a combiner. we should not use the stub, because the sort and the - // subsequent (group)reduce would otherwise share it multi-threaded - final Class<S> userCodeFunctionType = this.driver.getStubType(); - if (userCodeFunctionType == null) { - throw new IllegalStateException("Performing combining sort outside a reduce task!"); - } - final S localStub; - try { - localStub = initStub(userCodeFunctionType); - } catch (Exception e) { - throw new RuntimeException("Initializing the user code and the configuration failed" + - (e.getMessage() == null ? "." 
: ": " + e.getMessage()), e); - } - - if (!(localStub instanceof GroupCombineFunction)) { - throw new IllegalStateException("Performing combining sort outside a reduce task!"); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - CombiningUnilateralSortMerger<?> cSorter = new CombiningUnilateralSortMerger( - (GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum], - this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum), - this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum), - this.config.getSpillingThresholdInput(inputNum), this.getExecutionConfig().isObjectReuseEnabled()); - cSorter.setUdfConfiguration(this.config.getStubParameters()); - - // set the input to null such that it will be lazily fetched from the input strategy - this.inputs[inputNum] = null; - this.localStrategies[inputNum] = cSorter; - break; - default: - throw new Exception("Unrecognized local strategy provided: " + localStrategy.name()); - } - } else { - // no local strategy in the config - this.inputs[inputNum] = this.inputIterators[inputNum]; - } - } - - private <T> TypeComparator<T> getLocalStrategyComparator(int inputNum) throws Exception { - TypeComparatorFactory<T> compFact = this.config.getInputComparator(inputNum, getUserCodeClassLoader()); - if (compFact == null) { - throw new Exception("Missing comparator factory for local strategy on input " + inputNum); - } - return compFact.createComparator(); - } - - protected MutableObjectIterator<?> createInputIterator(MutableReader<?> inputReader, TypeSerializerFactory<?> serializerFactory) { - @SuppressWarnings("unchecked") - MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader; - @SuppressWarnings({ "unchecked", "rawtypes" }) - final MutableObjectIterator<?> iter = new ReaderIterator(reader, serializerFactory.getSerializer()); - return iter; - } - - protected int getNumTaskInputs() { - 
return this.driver.getNumberOfInputs(); - } - - /** - * Creates a writer for each output. Creates an OutputCollector which forwards its input to all writers. - * The output collector applies the configured shipping strategies for each writer. - */ - protected void initOutputs() throws Exception { - this.chainedTasks = new ArrayList<ChainedDriver<?, ?>>(); - this.eventualOutputs = new ArrayList<RecordWriter<?>>(); - - ClassLoader userCodeClassLoader = getUserCodeClassLoader(); - - AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry(); - AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter(); - - this.accumulatorMap = accumulatorRegistry.getUserMap(); - - this.output = initOutputs(this, userCodeClassLoader, this.config, this.chainedTasks, this.eventualOutputs, - this.getExecutionConfig(), reporter, this.accumulatorMap); - } - - public DistributedRuntimeUDFContext createRuntimeContext(String taskName) { - Environment env = getEnvironment(); - - return new DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(), - env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(), - env.getDistributedCacheEntries(), this.accumulatorMap); - } - - // -------------------------------------------------------------------------------------------- - // Task Context Signature - // ------------------------------------------------------------------------------------------- - - @Override - public TaskConfig getTaskConfig() { - return this.config; - } - - @Override - public TaskManagerRuntimeInfo getTaskManagerInfo() { - return getEnvironment().getTaskManagerInfo(); - } - - @Override - public MemoryManager getMemoryManager() { - return getEnvironment().getMemoryManager(); - } - - @Override - public IOManager getIOManager() { - return getEnvironment().getIOManager(); - } - - @Override - public S getStub() { - return this.stub; - } - - @Override - public Collector<OT> getOutputCollector() { - return 
this.output; - } - - @Override - public AbstractInvokable getOwningNepheleTask() { - return this; - } - - @Override - public String formatLogString(String message) { - return constructLogString(message, getEnvironment().getTaskName(), this); - } - - @Override - public <X> MutableObjectIterator<X> getInput(int index) { - if (index < 0 || index > this.driver.getNumberOfInputs()) { - throw new IndexOutOfBoundsException(); - } - - // check for lazy assignment from input strategies - if (this.inputs[index] != null) { - @SuppressWarnings("unchecked") - MutableObjectIterator<X> in = (MutableObjectIterator<X>) this.inputs[index]; - return in; - } else { - final MutableObjectIterator<X> in; - try { - if (this.tempBarriers[index] != null) { - @SuppressWarnings("unchecked") - MutableObjectIterator<X> iter = (MutableObjectIterator<X>) this.tempBarriers[index].getIterator(); - in = iter; - } else if (this.localStrategies[index] != null) { - @SuppressWarnings("unchecked") - MutableObjectIterator<X> iter = (MutableObjectIterator<X>) this.localStrategies[index].getIterator(); - in = iter; - } else { - throw new RuntimeException("Bug: null input iterator, null temp barrier, and null local strategy."); - } - this.inputs[index] = in; - return in; - } catch (InterruptedException iex) { - throw new RuntimeException("Interrupted while waiting for input " + index + " to become available."); - } catch (IOException ioex) { - throw new RuntimeException("An I/O Exception occurred while obtaining input " + index + "."); - } - } - } - - - @Override - public <X> TypeSerializerFactory<X> getInputSerializer(int index) { - if (index < 0 || index >= this.driver.getNumberOfInputs()) { - throw new IndexOutOfBoundsException(); - } - - @SuppressWarnings("unchecked") - final TypeSerializerFactory<X> serializerFactory = (TypeSerializerFactory<X>) this.inputSerializers[index]; - return serializerFactory; - } - - - @Override - public <X> TypeComparator<X> getDriverComparator(int index) { - if 
(this.inputComparators == null) { - throw new IllegalStateException("Comparators have not been created!"); - } - else if (index < 0 || index >= this.driver.getNumberOfDriverComparators()) { - throw new IndexOutOfBoundsException(); - } - - @SuppressWarnings("unchecked") - final TypeComparator<X> comparator = (TypeComparator<X>) this.inputComparators[index]; - return comparator; - } - - // ============================================================================================ - // Static Utilities - // - // Utilities are consolidated here to ensure a uniform way of running, - // logging, exception handling, and error messages. - // ============================================================================================ - - // -------------------------------------------------------------------------------------------- - // Logging - // -------------------------------------------------------------------------------------------- - /** - * Utility function that composes a string for logging purposes. The string includes the given message, - * the given name of the task and the index in its subtask group as well as the number of instances - * that exist in its subtask group. - * - * @param message The main message for the log. - * @param taskName The name of the task. - * @param parent The nephele task that contains the code producing the message. - * - * @return The string for logging. - */ - public static String constructLogString(String message, String taskName, AbstractInvokable parent) { - return message + ": " + taskName + " (" + (parent.getEnvironment().getIndexInSubtaskGroup() + 1) + - '/' + parent.getEnvironment().getNumberOfSubtasks() + ')'; - } - - /** - * Prints an error message and throws the given exception. If the exception is of the type - * {@link ExceptionInChainedStubException} then the chain of contained exceptions is followed - * until an exception of a different type is found. - * - * @param ex The exception to be thrown. 
- * @param parent The parent task, whose information is included in the log message. - * @throws Exception Always thrown. - */ - public static void logAndThrowException(Exception ex, AbstractInvokable parent) throws Exception { - String taskName; - if (ex instanceof ExceptionInChainedStubException) { - do { - ExceptionInChainedStubException cex = (ExceptionInChainedStubException) ex; - taskName = cex.getTaskName(); - ex = cex.getWrappedException(); - } while (ex instanceof ExceptionInChainedStubException); - } else { - taskName = parent.getEnvironment().getTaskName(); - } - - if (LOG.isErrorEnabled()) { - LOG.error(constructLogString("Error in task code", taskName, parent), ex); - } - - throw ex; - } - - // -------------------------------------------------------------------------------------------- - // Result Shipping and Chained Tasks - // -------------------------------------------------------------------------------------------- - - /** - * Creates the {@link Collector} for the given task, as described by the given configuration. The - * output collector contains the writers that forward the data to the different tasks that the given task - * is connected to. Each writer applies a the partitioning as described in the configuration. - * - * @param task The task that the output collector is created for. - * @param config The configuration describing the output shipping strategies. - * @param cl The classloader used to load user defined types. - * @param eventualOutputs The output writers that this task forwards to the next task for each output. - * @param outputOffset The offset to start to get the writers for the outputs - * @param numOutputs The number of outputs described in the configuration. - * - * @return The OutputCollector that data produced in this task is submitted to. 
- */ - public static <T> Collector<T> getOutputCollector(AbstractInvokable task, TaskConfig config, ClassLoader cl, - List<RecordWriter<?>> eventualOutputs, int outputOffset, int numOutputs, AccumulatorRegistry.Reporter reporter) throws Exception - { - if (numOutputs == 0) { - return null; - } - - // get the factory for the serializer - final TypeSerializerFactory<T> serializerFactory = config.getOutputSerializer(cl); - - // special case the Record - if (serializerFactory.getDataType().equals(Record.class)) { - final List<RecordWriter<Record>> writers = new ArrayList<RecordWriter<Record>>(numOutputs); - - // create a writer for each output - for (int i = 0; i < numOutputs; i++) { - // create the OutputEmitter from output ship strategy - final ShipStrategyType strategy = config.getOutputShipStrategy(i); - final TypeComparatorFactory<?> compFact = config.getOutputComparator(i, cl); - final RecordOutputEmitter oe; - if (compFact == null) { - oe = new RecordOutputEmitter(strategy); - } else { - @SuppressWarnings("unchecked") - TypeComparator<Record> comparator = (TypeComparator<Record>) compFact.createComparator(); - if (!comparator.supportsCompareAgainstReference()) { - throw new Exception("Incompatibe serializer-/comparator factories."); - } - final DataDistribution distribution = config.getOutputDataDistribution(i, cl); - final Partitioner<?> partitioner = config.getOutputPartitioner(i, cl); - - oe = new RecordOutputEmitter(strategy, comparator, partitioner, distribution); - } - - // setup accumulator counters - final RecordWriter<Record> recordWriter = new RecordWriter<Record>(task.getEnvironment().getWriter(outputOffset + i), oe); - recordWriter.setReporter(reporter); - - writers.add(recordWriter); - } - if (eventualOutputs != null) { - eventualOutputs.addAll(writers); - } - - @SuppressWarnings("unchecked") - final Collector<T> outColl = (Collector<T>) new RecordOutputCollector(writers); - return outColl; - } - else { - // generic case - final 
List<RecordWriter<SerializationDelegate<T>>> writers = new ArrayList<RecordWriter<SerializationDelegate<T>>>(numOutputs); - - // create a writer for each output - for (int i = 0; i < numOutputs; i++) - { - // create the OutputEmitter from output ship strategy - final ShipStrategyType strategy = config.getOutputShipStrategy(i); - final TypeComparatorFactory<T> compFactory = config.getOutputComparator(i, cl); - - final ChannelSelector<SerializationDelegate<T>> oe; - if (compFactory == null) { - oe = new OutputEmitter<T>(strategy); - } - else { - final DataDistribution dataDist = config.getOutputDataDistribution(i, cl); - final Partitioner<?> partitioner = config.getOutputPartitioner(i, cl); - - final TypeComparator<T> comparator = compFactory.createComparator(); - oe = new OutputEmitter<T>(strategy, comparator, partitioner, dataDist); - } - - final RecordWriter<SerializationDelegate<T>> recordWriter = - new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe); - - // setup live accumulator counters - recordWriter.setReporter(reporter); - - writers.add(recordWriter); - } - if (eventualOutputs != null) { - eventualOutputs.addAll(writers); - } - return new OutputCollector<T>(writers, serializerFactory.getSerializer()); - } - } - - /** - * Creates a writer for each output. Creates an OutputCollector which forwards its input to all writers. - * The output collector applies the configured shipping strategy. 
- */ - @SuppressWarnings("unchecked") - public static <T> Collector<T> initOutputs(AbstractInvokable nepheleTask, ClassLoader cl, TaskConfig config, - List<ChainedDriver<?, ?>> chainedTasksTarget, - List<RecordWriter<?>> eventualOutputs, - ExecutionConfig executionConfig, - AccumulatorRegistry.Reporter reporter, - Map<String, Accumulator<?,?>> accumulatorMap) - throws Exception - { - final int numOutputs = config.getNumOutputs(); - - // check whether we got any chained tasks - final int numChained = config.getNumberOfChainedStubs(); - if (numChained > 0) { - // got chained stubs. that means that this one may only have a single forward connection - if (numOutputs != 1 || config.getOutputShipStrategy(0) != ShipStrategyType.FORWARD) { - throw new RuntimeException("Plan Generation Bug: Found a chained stub that is not connected via an only forward connection."); - } - - // instantiate each task - @SuppressWarnings("rawtypes") - Collector previous = null; - for (int i = numChained - 1; i >= 0; --i) - { - // get the task first - final ChainedDriver<?, ?> ct; - try { - Class<? 
extends ChainedDriver<?, ?>> ctc = config.getChainedTask(i); - ct = ctc.newInstance(); - } - catch (Exception ex) { - throw new RuntimeException("Could not instantiate chained task driver.", ex); - } - - // get the configuration for the task - final TaskConfig chainedStubConf = config.getChainedStubConfig(i); - final String taskName = config.getChainedTaskName(i); - - if (i == numChained - 1) { - // last in chain, instantiate the output collector for this task - previous = getOutputCollector(nepheleTask, chainedStubConf, cl, eventualOutputs, 0, chainedStubConf.getNumOutputs(), reporter); - } - - ct.setup(chainedStubConf, taskName, previous, nepheleTask, cl, executionConfig, accumulatorMap); - chainedTasksTarget.add(0, ct); - - previous = ct; - } - // the collector of the first in the chain is the collector for the nephele task - return (Collector<T>) previous; - } - // else - - // instantiate the output collector the default way from this configuration - return getOutputCollector(nepheleTask , config, cl, eventualOutputs, 0, numOutputs, reporter); - } - - // -------------------------------------------------------------------------------------------- - // User Code LifeCycle - // -------------------------------------------------------------------------------------------- - - /** - * Opens the given stub using its {@link org.apache.flink.api.common.functions.RichFunction#open(Configuration)} method. If the open call produces - * an exception, a new exception with a standard error message is created, using the encountered exception - * as its cause. - * - * @param stub The user code instance to be opened. - * @param parameters The parameters supplied to the user code. - * - * @throws Exception Thrown, if the user code's open method produces an exception. 
- */ - public static void openUserCode(Function stub, Configuration parameters) throws Exception { - try { - FunctionUtils.openFunction(stub, parameters); - } catch (Throwable t) { - throw new Exception("The user defined 'open(Configuration)' method in " + stub.getClass().toString() + " caused an exception: " + t.getMessage(), t); - } - } - - /** - * Closes the given stub using its {@link org.apache.flink.api.common.functions.RichFunction#close()} method. If the close call produces - * an exception, a new exception with a standard error message is created, using the encountered exception - * as its cause. - * - * @param stub The user code instance to be closed. - * - * @throws Exception Thrown, if the user code's close method produces an exception. - */ - public static void closeUserCode(Function stub) throws Exception { - try { - FunctionUtils.closeFunction(stub); - } catch (Throwable t) { - throw new Exception("The user defined 'close()' method caused an exception: " + t.getMessage(), t); - } - } - - // -------------------------------------------------------------------------------------------- - // Chained Task LifeCycle - // -------------------------------------------------------------------------------------------- - - /** - * Opens all chained tasks, in the order as they are stored in the array. The opening process - * creates a standardized log info message. - * - * @param tasks The tasks to be opened. - * @param parent The parent task, used to obtain parameters to include in the log message. - * @throws Exception Thrown, if the opening encounters an exception. 
- */ - public static void openChainedTasks(List<ChainedDriver<?, ?>> tasks, AbstractInvokable parent) throws Exception { - // start all chained tasks - for (int i = 0; i < tasks.size(); i++) { - final ChainedDriver<?, ?> task = tasks.get(i); - if (LOG.isDebugEnabled()) { - LOG.debug(constructLogString("Start task code", task.getTaskName(), parent)); - } - task.openTask(); - } - } - - /** - * Closes all chained tasks, in the order as they are stored in the array. The closing process - * creates a standardized log info message. - * - * @param tasks The tasks to be closed. - * @param parent The parent task, used to obtain parameters to include in the log message. - * @throws Exception Thrown, if the closing encounters an exception. - */ - public static void closeChainedTasks(List<ChainedDriver<?, ?>> tasks, AbstractInvokable parent) throws Exception { - for (int i = 0; i < tasks.size(); i++) { - final ChainedDriver<?, ?> task = tasks.get(i); - task.closeTask(); - - if (LOG.isDebugEnabled()) { - LOG.debug(constructLogString("Finished task code", task.getTaskName(), parent)); - } - } - } - - /** - * Cancels all tasks via their {@link ChainedDriver#cancelTask()} method. Any occurring exception - * and error is suppressed, such that the canceling method of every task is invoked in all cases. - * - * @param tasks The tasks to be canceled. - */ - public static void cancelChainedTasks(List<ChainedDriver<?, ?>> tasks) { - for (int i = 0; i < tasks.size(); i++) { - try { - tasks.get(i).cancelTask(); - } catch (Throwable t) { - // do nothing - } - } - } - - // -------------------------------------------------------------------------------------------- - // Miscellaneous Utilities - // -------------------------------------------------------------------------------------------- - - /** - * Instantiates a user code class from is definition in the task configuration. - * The class is instantiated without arguments using the null-ary constructor. 
Instantiation - * will fail if this constructor does not exist or is not public. - * - * @param <T> The generic type of the user code class. - * @param config The task configuration containing the class description. - * @param cl The class loader to be used to load the class. - * @param superClass The super class that the user code class extends or implements, for type checking. - * - * @return An instance of the user code class. - */ - public static <T> T instantiateUserCode(TaskConfig config, ClassLoader cl, Class<? super T> superClass) { - try { - T stub = config.<T>getStubWrapper(cl).getUserCodeObject(superClass, cl); - // check if the class is a subclass, if the check is required - if (superClass != null && !superClass.isAssignableFrom(stub.getClass())) { - throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" + - superClass.getName() + "' as is required."); - } - return stub; - } - catch (ClassCastException ccex) { - throw new RuntimeException("The UDF class is not a proper subclass of " + superClass.getName(), ccex); - } - } - - private static int[] asArray(List<Integer> list) { - int[] a = new int[list.size()]; - - int i = 0; - for (int val : list) { - a[i++] = val; - } - return a; - } - - public static void clearWriters(List<RecordWriter<?>> writers) { - for (RecordWriter<?> writer : writers) { - writer.clearBuffers(); - } - } - - public static void clearReaders(MutableReader<?>[] readers) { - for (MutableReader<?> reader : readers) { - reader.clearBuffers(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettableDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettableDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettableDriver.java new file mode 100644 index 0000000..0ca7994 --- 
/dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettableDriver.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.runtime.operators; + +import org.apache.flink.api.common.functions.Function; + + +/** + * This interface marks a {@code Driver} as resettable, meaning that it will reset part of its internal state but + * otherwise reuse existing data structures. + * + * @see Driver + * @see TaskContext + * + * @param <S> The type of stub driven by this driver. + * @param <OT> The data type of the records produced by this driver. 
+ */ +public interface ResettableDriver<S extends Function, OT> extends Driver<S, OT> { + + boolean isInputResettable(int inputNum); + + void initialize() throws Exception; + + void reset() throws Exception; + + void teardown() throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettablePactDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettablePactDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettablePactDriver.java deleted file mode 100644 index 85cde1b..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettablePactDriver.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.operators; - -import org.apache.flink.api.common.functions.Function; - - -/** - * This interface marks a {@code PactDriver} as resettable, meaning that will reset part of their internal state but - * otherwise reuse existing data structures. 
- * - * @see PactDriver - * @see PactTaskContext - * - * @param <S> The type of stub driven by this driver. - * @param <OT> The data type of the records produced by this driver. - */ -public interface ResettablePactDriver<S extends Function, OT> extends PactDriver<S, OT> { - - boolean isInputResettable(int inputNum); - - void initialize() throws Exception; - - void reset() throws Exception; - - void teardown() throws Exception; -}