Repository: flink Updated Branches: refs/heads/master e494c2795 -> b08669abf
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java new file mode 100644 index 0000000..fd5d238 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.runtime.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializerFactory; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.util.TaskConfig; +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + + +/** + * The task context gives a driver (e.g., {@link MapDriver}, or {@link JoinDriver}) access to + * the runtime components and configuration that they can use to fulfil their task. + * + * @param <S> The UDF type. + * @param <OT> The produced data type. + * + * @see Driver + */ +public interface TaskContext<S, OT> { + + TaskConfig getTaskConfig(); + + TaskManagerRuntimeInfo getTaskManagerInfo(); + + ClassLoader getUserCodeClassLoader(); + + MemoryManager getMemoryManager(); + + IOManager getIOManager(); + + <X> MutableObjectIterator<X> getInput(int index); + + <X> TypeSerializerFactory<X> getInputSerializer(int index); + + <X> TypeComparator<X> getDriverComparator(int index); + + S getStub(); + + ExecutionConfig getExecutionConfig(); + + Collector<OT> getOutputCollector(); + + AbstractInvokable getOwningNepheleTask(); + + String formatLogString(String message); +} http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java index 098686c..4791761 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java @@ -22,18 +22,18 @@ import org.apache.flink.api.common.functions.Function; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; -public class UnionWithTempOperator<T> implements PactDriver<Function, T> { +public class UnionWithTempOperator<T> implements Driver<Function, T> { private static final int CACHED_INPUT = 0; private static final int STREAMED_INPUT = 1; - private PactTaskContext<Function, T> taskContext; + private TaskContext<Function, T> taskContext; private volatile boolean running; @Override - public void setup(PactTaskContext<Function, T> context) { + public void setup(TaskContext<Function, T> context) { this.taskContext = context; this.running = true; } http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java index 4641fce..46ee41b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.operators.RegularPactTask; +import org.apache.flink.runtime.operators.BatchTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +41,7 @@ public class ChainedAllReduceDriver<IT> extends ChainedDriver<IT, IT> { @Override public void setup(AbstractInvokable parent) { @SuppressWarnings("unchecked") - final ReduceFunction<IT> red = RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, ReduceFunction.class); + final ReduceFunction<IT> red = BatchTask.instantiateUserCode(this.config, userCodeClassLoader, ReduceFunction.class); this.reducer = red; FunctionUtils.setFunctionRuntimeContext(red, getUdfRuntimeContext()); @@ -56,12 +56,12 @@ public class ChainedAllReduceDriver<IT> extends ChainedDriver<IT, IT> { @Override public void openTask() throws Exception { Configuration stubConfig = this.config.getStubParameters(); - RegularPactTask.openUserCode(this.reducer, stubConfig); + BatchTask.openUserCode(this.reducer, stubConfig); } @Override public void closeTask() throws Exception { - RegularPactTask.closeUserCode(this.reducer); + BatchTask.closeUserCode(this.reducer); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java index 482103c..8900ed7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.GenericCollectorMap; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.operators.RegularPactTask; +import org.apache.flink.runtime.operators.BatchTask; @SuppressWarnings("deprecation") public class ChainedCollectorMapDriver<IT, OT> extends ChainedDriver<IT, OT> { @@ -35,7 +35,7 @@ public class ChainedCollectorMapDriver<IT, OT> extends ChainedDriver<IT, OT> { public void setup(AbstractInvokable parent) { @SuppressWarnings("unchecked") final GenericCollectorMap<IT, OT> mapper = - RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GenericCollectorMap.class); + BatchTask.instantiateUserCode(this.config, userCodeClassLoader, GenericCollectorMap.class); this.mapper = mapper; mapper.setRuntimeContext(getUdfRuntimeContext()); } @@ -43,12 +43,12 @@ public class ChainedCollectorMapDriver<IT, OT> extends ChainedDriver<IT, OT> { @Override public void openTask() throws Exception { Configuration stubConfig = this.config.getStubParameters(); - RegularPactTask.openUserCode(this.mapper, stubConfig); + BatchTask.openUserCode(this.mapper, stubConfig); } @Override public void closeTask() throws Exception { - RegularPactTask.closeUserCode(this.mapper); + BatchTask.closeUserCode(this.mapper); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java index ea6cfe3..6edeb84 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.operators.RegularPactTask; +import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.util.Collector; @@ -32,7 +32,7 @@ import org.apache.flink.util.Collector; import java.util.Map; /** - * The interface to be implemented by drivers that do not run in an own pact task context, but are chained to other + * The interface to be implemented by drivers that do not run in an own task context, but are chained to other * tasks. */ public abstract class ChainedDriver<IT, OT> implements Collector<IT> { @@ -63,8 +63,8 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> { Environment env = parent.getEnvironment(); - if (parent instanceof RegularPactTask) { - this.udfContext = ((RegularPactTask<?, ?>) parent).createRuntimeContext(taskName); + if (parent instanceof BatchTask) { + this.udfContext = ((BatchTask<?, ?>) parent).createRuntimeContext(taskName); } else { this.udfContext = new DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader, parent.getExecutionConfig(), http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java index bc3b6a1..f51cb68 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.operators.RegularPactTask; +import org.apache.flink.runtime.operators.BatchTask; public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> { @@ -36,7 +36,7 @@ public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> { public void setup(AbstractInvokable parent) { @SuppressWarnings("unchecked") final FlatMapFunction<IT, OT> mapper = - RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, FlatMapFunction.class); + BatchTask.instantiateUserCode(this.config, userCodeClassLoader, FlatMapFunction.class); this.mapper = mapper; FunctionUtils.setFunctionRuntimeContext(mapper, getUdfRuntimeContext()); } @@ -44,12 +44,12 @@ public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> { @Override public void openTask() throws Exception { Configuration stubConfig = this.config.getStubParameters(); - RegularPactTask.openUserCode(this.mapper, stubConfig); + BatchTask.openUserCode(this.mapper, stubConfig); } @Override public void closeTask() throws Exception { - RegularPactTask.closeUserCode(this.mapper); + BatchTask.closeUserCode(this.mapper); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java index db192df..9b888f2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.operators.RegularPactTask; +import org.apache.flink.runtime.operators.BatchTask; public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> { @@ -36,7 +36,7 @@ public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> { public void setup(AbstractInvokable parent) { @SuppressWarnings("unchecked") final MapFunction<IT, OT> mapper = - RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, MapFunction.class); + BatchTask.instantiateUserCode(this.config, userCodeClassLoader, MapFunction.class); this.mapper = mapper; FunctionUtils.setFunctionRuntimeContext(mapper, getUdfRuntimeContext()); } @@ -44,12 +44,12 @@ public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> { @Override public void openTask() throws Exception { Configuration stubConfig = this.config.getStubParameters(); - RegularPactTask.openUserCode(this.mapper, stubConfig); + BatchTask.openUserCode(this.mapper, stubConfig); } @Override public void closeTask() throws Exception { - RegularPactTask.closeUserCode(this.mapper); + BatchTask.closeUserCode(this.mapper); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java index cf0fc85..4a04fb5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java @@ -29,7 +29,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.RegularPactTask; +import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter; import org.apache.flink.runtime.operators.sort.InMemorySorter; import org.apache.flink.runtime.operators.sort.NormalizedKeySorter; @@ -87,7 +87,7 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> { @SuppressWarnings("unchecked") final GroupReduceFunction<IN, OUT> combiner = - RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GroupReduceFunction.class); + BatchTask.instantiateUserCode(this.config, userCodeClassLoader, GroupReduceFunction.class); this.reducer = combiner; FunctionUtils.setFunctionRuntimeContext(combiner, getUdfRuntimeContext()); } @@ -96,7 +96,7 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> { public void openTask() throws Exception { // open the stub first final Configuration stubConfig = this.config.getStubParameters(); - RegularPactTask.openUserCode(this.reducer, stubConfig); + BatchTask.openUserCode(this.reducer, stubConfig); // ----------------- Set up the sorter ------------------------- @@ -135,7 +135,7 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> { this.parent.getEnvironment().getMemoryManager().release(this.memory); if (this.running) { - RegularPactTask.closeUserCode(this.reducer); + BatchTask.closeUserCode(this.reducer); } } http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java index da9698c..408abc2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java @@ -32,7 +32,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.RegularPactTask; +import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter; import org.apache.flink.runtime.operators.sort.InMemorySorter; import org.apache.flink.runtime.operators.sort.NormalizedKeySorter; @@ -87,7 +87,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN, @SuppressWarnings("unchecked") final GroupCombineFunction<IN, OUT> combiner = - RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GroupCombineFunction.class); + BatchTask.instantiateUserCode(this.config, userCodeClassLoader, GroupCombineFunction.class); this.combiner = combiner; FunctionUtils.setFunctionRuntimeContext(combiner, getUdfRuntimeContext()); } @@ -96,7 +96,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN, public void openTask() throws Exception { // open the stub first final Configuration stubConfig = this.config.getStubParameters(); - RegularPactTask.openUserCode(this.combiner, stubConfig); + BatchTask.openUserCode(this.combiner, stubConfig); // ----------------- Set up the sorter ------------------------- @@ -134,7 +134,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN, this.parent.getEnvironment().getMemoryManager().release(this.memory); if (this.running) { - RegularPactTask.closeUserCode(this.combiner); + BatchTask.closeUserCode(this.combiner); } } http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java index 6c97097..0254c8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java @@ -48,7 +48,7 @@ import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.runtime.operators.PactDriver; +import org.apache.flink.runtime.operators.Driver; import org.apache.flink.runtime.operators.chaining.ChainedDriver; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.types.Value; @@ -307,11 +307,11 @@ public class TaskConfig implements Serializable { // Driver // -------------------------------------------------------------------------------------------- - public void setDriver(@SuppressWarnings("rawtypes") Class<? extends PactDriver> driver) { + public void setDriver(@SuppressWarnings("rawtypes") Class<? extends Driver> driver) { this.config.setString(DRIVER_CLASS, driver.getName()); } - public <S extends Function, OT> Class<? extends PactDriver<S, OT>> getDriver() { + public <S extends Function, OT> Class<? extends Driver<S, OT>> getDriver() { final String className = this.config.getString(DRIVER_CLASS, null); if (className == null) { throw new CorruptConfigurationException("The pact driver class is missing."); @@ -319,7 +319,7 @@ public class TaskConfig implements Serializable { try { @SuppressWarnings("unchecked") - final Class<PactDriver<S, OT>> pdClazz = (Class<PactDriver<S, OT>>) (Class<?>) PactDriver.class; + final Class<Driver<S, OT>> pdClazz = (Class<Driver<S, OT>>) (Class<?>) Driver.class; return Class.forName(className).asSubclass(pdClazz); } catch (ClassNotFoundException cnfex) { throw new CorruptConfigurationException("The given driver class cannot be found."); http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java index 3a36fe8..58755f3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java @@ -31,7 +31,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.operators.RegularPactTask; +import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.core.testutils.CommonTestUtils; import org.junit.Test; @@ -47,7 +47,7 @@ public class TaskDeploymentDescriptorTest { final int currentNumberOfSubtasks = 1; final Configuration jobConfiguration = new Configuration(); final Configuration taskConfiguration = new Configuration(); - final Class<? extends AbstractInvokable> invokableClass = RegularPactTask.class; + final Class<? extends AbstractInvokable> invokableClass = BatchTask.class; final List<ResultPartitionDeploymentDescriptor> producedResults = new ArrayList<ResultPartitionDeploymentDescriptor>(0); final List<InputGateDeploymentDescriptor> inputGates = new ArrayList<InputGateDeploymentDescriptor>(0); final List<BlobKey> requiredJars = new ArrayList<BlobKey>(0); http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index e3fc852..bea7c22 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -43,7 +43,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; -import org.apache.flink.runtime.operators.RegularPactTask; +import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.Test; @@ -69,10 +69,10 @@ public class ExecutionGraphDeploymentTest { v3.setParallelism(10); v4.setParallelism(10); - v1.setInvokableClass(RegularPactTask.class); - v2.setInvokableClass(RegularPactTask.class); - v3.setInvokableClass(RegularPactTask.class); - v4.setInvokableClass(RegularPactTask.class); + v1.setInvokableClass(BatchTask.class); + v2.setInvokableClass(BatchTask.class); + v3.setInvokableClass(BatchTask.class); + v4.setInvokableClass(BatchTask.class); v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL); v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL); @@ -111,7 +111,7 @@ public class ExecutionGraphDeploymentTest { assertEquals(jid2, descr.getVertexID()); assertEquals(3, descr.getIndexInSubtaskGroup()); assertEquals(10, descr.getNumberOfSubtasks()); - assertEquals(RegularPactTask.class.getName(), descr.getInvokableClassName()); + assertEquals(BatchTask.class.getName(), descr.getInvokableClassName()); assertEquals("v2", descr.getTaskName()); List<ResultPartitionDeploymentDescriptor> producedPartitions = descr.getProducedPartitions(); @@ -276,8 +276,8 @@ public class ExecutionGraphDeploymentTest { v1.setParallelism(dop1); v2.setParallelism(dop2); - v1.setInvokableClass(RegularPactTask.class); - v2.setInvokableClass(RegularPactTask.class); + v1.setInvokableClass(BatchTask.class); + v2.setInvokableClass(BatchTask.class); // execution graph that executes actions synchronously ExecutionGraph eg = new ExecutionGraph( http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java index 88a71c4..1f19699 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java @@ -30,7 +30,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.operators.CollectorMapDriver; import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.runtime.operators.RegularPactTask; +import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.runtime.operators.MapTaskTest.MockMapStub; import org.apache.flink.runtime.operators.ReduceTaskTest.MockReduceStub; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; @@ -102,8 +102,8 @@ public class ChainTaskTest extends TaskTestBase { // chained map+combine { - RegularPactTask<GenericCollectorMap<Record, Record>, Record> testTask = - new RegularPactTask<GenericCollectorMap<Record, Record>, Record>(); + BatchTask<GenericCollectorMap<Record, Record>, Record> testTask = + new BatchTask<GenericCollectorMap<Record, Record>, Record>(); registerTask(testTask, CollectorMapDriver.class, MockMapStub.class); try { @@ -163,8 +163,8 @@ public class ChainTaskTest extends TaskTestBase { // chained map+combine { - final RegularPactTask<GenericCollectorMap<Record, Record>, Record> testTask = - new RegularPactTask<GenericCollectorMap<Record, Record>, Record>(); + final BatchTask<GenericCollectorMap<Record, Record>, Record> testTask = + new BatchTask<GenericCollectorMap<Record, Record>, Record>(); super.registerTask(testTask, CollectorMapDriver.class, MockMapStub.class); http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java index 0a02f30..9be957a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java @@ -29,14 +29,14 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.runtime.operators.PactTaskContext; +import org.apache.flink.runtime.operators.TaskContext; import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; -public class TestTaskContext<S, T> implements PactTaskContext<S, T> { +public class TestTaskContext<S, T> implements TaskContext<S, T> { private final AbstractInvokable owner = new DummyInvokable(); http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java index 5136aea..7043a63 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java @@ -30,9 +30,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.PactDriver; -import org.apache.flink.runtime.operators.PactTaskContext; -import org.apache.flink.runtime.operators.ResettablePactDriver; +import org.apache.flink.runtime.operators.Driver; +import org.apache.flink.runtime.operators.TaskContext; +import org.apache.flink.runtime.operators.ResettableDriver; import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; @@ -51,7 +51,7 @@ import java.util.LinkedList; import java.util.List; @RunWith(Parameterized.class) -public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogger implements PactTaskContext<S, OUT> { +public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogger implements TaskContext<S, OUT> { protected static final int PAGE_SIZE = 32 * 1024; @@ -81,7 +81,7 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog private S stub; - private PactDriver<S, IN> driver; + private Driver<S, IN> driver; private volatile boolean running = true; @@ -176,12 +176,12 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog } @SuppressWarnings("rawtypes") - public void testDriver(PactDriver driver, Class stubClass) throws Exception { + public void testDriver(Driver driver, Class stubClass) throws Exception { testDriverInternal(driver, stubClass); } @SuppressWarnings({"unchecked", "rawtypes"}) - public void testDriverInternal(PactDriver driver, Class stubClass) throws Exception { + public void testDriverInternal(Driver driver, Class stubClass) throws Exception { this.driver = driver; driver.setup(this); @@ -232,8 +232,8 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog } // if resettable driver invoke tear down - if (this.driver instanceof ResettablePactDriver) { - final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver; + if (this.driver instanceof ResettableDriver) { + final ResettableDriver<?, ?> resDriver = (ResettableDriver<?, ?>) this.driver; try { resDriver.teardown(); } catch (Throwable t) { @@ -252,7 +252,7 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog } @SuppressWarnings({"unchecked", "rawtypes"}) - public void testResettableDriver(ResettablePactDriver driver, Class stubClass, int iterations) throws Exception { + public void testResettableDriver(ResettableDriver driver, Class stubClass, int iterations) throws Exception { driver.setup(this); for (int i = 0; i < iterations; i++) { http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java index 116fdec..c442940 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java @@ -24,6 +24,7 @@ import java.util.LinkedList; import java.util.List; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.operators.Driver; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.util.TestLogger; import org.junit.Assert; @@ -38,9 +39,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.PactDriver; -import org.apache.flink.runtime.operators.PactTaskContext; -import org.apache.flink.runtime.operators.ResettablePactDriver; +import org.apache.flink.runtime.operators.TaskContext; +import org.apache.flink.runtime.operators.ResettableDriver; import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.types.Record; @@ -51,7 +51,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @RunWith(Parameterized.class) -public class DriverTestBase<S extends Function> extends TestLogger implements PactTaskContext<S, Record> { +public class DriverTestBase<S extends Function> extends TestLogger implements TaskContext<S, Record> { protected static final long DEFAULT_PER_SORT_MEM = 16 * 1024 * 1024; @@ -83,7 +83,7 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa private S stub; - private PactDriver<S, Record> driver; + private Driver<S, Record> driver; private volatile boolean running = true; @@ -168,12 +168,12 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa } @SuppressWarnings("rawtypes") - public void testDriver(PactDriver driver, Class stubClass) throws Exception { + public void testDriver(Driver driver, Class stubClass) throws Exception { testDriverInternal(driver, stubClass); } @SuppressWarnings({"unchecked","rawtypes"}) - public void testDriverInternal(PactDriver driver, Class stubClass) throws Exception { + public void testDriverInternal(Driver driver, Class stubClass) throws Exception { this.driver = driver; driver.setup(this); @@ -226,8 +226,8 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa } // if resettable driver invoke tear down - if (this.driver instanceof ResettablePactDriver) { - final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver; + if (this.driver instanceof ResettableDriver) { + final ResettableDriver<?, ?> resDriver = (ResettableDriver<?, ?>) this.driver; try { resDriver.teardown(); } catch (Throwable t) { @@ -247,7 +247,7 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa } @SuppressWarnings({"unchecked","rawtypes"}) - public void testResettableDriver(ResettablePactDriver driver, Class stubClass, int iterations) throws Exception { + public void testResettableDriver(ResettableDriver driver, Class stubClass, int iterations) throws Exception { driver.setup(this); http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java index 4662762..777bfc8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java @@ -30,7 +30,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.PactDriver; +import org.apache.flink.runtime.operators.Driver; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.types.Record; @@ -89,7 +89,7 @@ public abstract class TaskTestBase extends TestLogger { } public void registerTask(AbstractInvokable task, - @SuppressWarnings("rawtypes") Class<? extends PactDriver> driver, + @SuppressWarnings("rawtypes") Class<? extends Driver> driver, Class<? extends RichFunction> stubClass) { final TaskConfig config = new TaskConfig(this.mockEnv.getTaskConfiguration()); http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java index e2b2430..886c881 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java @@ -30,9 +30,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.PactDriver; -import org.apache.flink.runtime.operators.PactTaskContext; -import org.apache.flink.runtime.operators.ResettablePactDriver; +import org.apache.flink.runtime.operators.Driver; +import org.apache.flink.runtime.operators.TaskContext; +import org.apache.flink.runtime.operators.ResettableDriver; import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; @@ -51,7 +51,7 @@ import java.util.Collection; import java.util.List; @RunWith(Parameterized.class) -public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogger implements PactTaskContext<S, OUT> { +public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogger implements TaskContext<S, OUT> { protected static final long DEFAULT_PER_SORT_MEM = 16 * 1024 * 1024; @@ -85,7 +85,7 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg private S stub; - private PactDriver<S, OUT> driver; + private Driver<S, OUT> driver; private volatile boolean running; @@ -170,12 +170,12 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg } @SuppressWarnings("rawtypes") - public void testDriver(PactDriver driver, Class stubClass) throws Exception { + public void testDriver(Driver driver, Class stubClass) throws Exception { testDriverInternal(driver, stubClass); } @SuppressWarnings({"unchecked","rawtypes"}) - public void testDriverInternal(PactDriver driver, Class stubClass) throws Exception { + public void testDriverInternal(Driver driver, Class stubClass) throws Exception { this.driver = driver; driver.setup(this); @@ -227,8 +227,8 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg } // if resettable driver invoke tear-down - if (this.driver instanceof ResettablePactDriver) { - final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver; + if (this.driver instanceof ResettableDriver) { + final ResettableDriver<?, ?> resDriver = (ResettableDriver<?, ?>) this.driver; try { resDriver.teardown(); } catch (Throwable t) { @@ -248,7 +248,7 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg } @SuppressWarnings({"unchecked","rawtypes"}) - public void testResettableDriver(ResettablePactDriver driver, Class stubClass, int iterations) throws Exception { + public void testResettableDriver(ResettableDriver driver, Class stubClass, int iterations) throws Exception { driver.setup(this); for (int i = 0; i < iterations; i++) { http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/resources/logback-test.xml b/flink-runtime/src/test/resources/logback-test.xml index 17f7020..1d64d46 100644 --- a/flink-runtime/src/test/resources/logback-test.xml +++ b/flink-runtime/src/test/resources/logback-test.xml @@ -31,7 +31,7 @@ throw error to test failing scenarios. Logging those would overflow the log. --> <!----> <logger name="org.apache.flink.runtime.operators.DataSinkTask" level="OFF"/> - <logger name="org.apache.flink.runtime.operators.RegularPactTask" level="OFF"/> + <logger name="org.apache.flink.runtime.operators.BatchTask" level="OFF"/> <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/> <logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/> <logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/> http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-staging/flink-ml/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/resources/logback-test.xml b/flink-staging/flink-ml/src/test/resources/logback-test.xml index 17f7020..1d64d46 100644 --- a/flink-staging/flink-ml/src/test/resources/logback-test.xml +++ b/flink-staging/flink-ml/src/test/resources/logback-test.xml @@ -31,7 +31,7 @@ throw error to test failing scenarios. Logging those would overflow the log. --> <!----> <logger name="org.apache.flink.runtime.operators.DataSinkTask" level="OFF"/> - <logger name="org.apache.flink.runtime.operators.RegularPactTask" level="OFF"/> + <logger name="org.apache.flink.runtime.operators.BatchTask" level="OFF"/> <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/> <logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/> <logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/> http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java index b117bab..287129d 100644 --- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java +++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.Function; import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.operators.PactDriver; +import org.apache.flink.runtime.operators.Driver; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.tez.util.EncodingUtils; import org.apache.flink.util.InstantiationUtil; @@ -93,8 +93,8 @@ public class RegularProcessor<S extends Function, OT> extends AbstractLogicalIOP this.inputs = inputs; this.outputs = outputs; - final Class<? extends PactDriver<S, OT>> driverClass = this.task.getTaskConfig().getDriver(); - PactDriver<S,OT> driver = InstantiationUtil.instantiate(driverClass, PactDriver.class); + final Class<? extends Driver<S, OT>> driverClass = this.task.getTaskConfig().getDriver(); + Driver<S,OT> driver = InstantiationUtil.instantiate(driverClass, Driver.class); this.numInputs = driver.getNumberOfInputs(); this.numOutputs = outputs.size(); http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java index b7cbfb4..89e4642 100644 --- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java +++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java @@ -36,8 +36,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.PactDriver; -import org.apache.flink.runtime.operators.PactTaskContext; +import org.apache.flink.runtime.operators.Driver; +import org.apache.flink.runtime.operators.TaskContext; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger; import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; @@ -64,7 +64,7 @@ import java.util.Arrays; import java.util.List; -public class TezTask<S extends Function,OT> implements PactTaskContext<S, OT> { +public class TezTask<S extends Function,OT> implements TaskContext<S, OT> { protected static final Log LOG = LogFactory.getLog(TezTask.class); @@ -74,7 +74,7 @@ public class TezTask<S extends Function,OT> implements PactTaskContext<S, OT> { * The driver that invokes the user code (the stub implementation). The central driver in this task * (further drivers may be chained behind this driver). */ - protected volatile PactDriver<S, OT> driver; + protected volatile Driver<S, OT> driver; /** * The instantiated user code of this task's main operator (driver). May be null if the operator has no udf. @@ -150,8 +150,8 @@ public class TezTask<S extends Function,OT> implements PactTaskContext<S, OT> { public TezTask(TezTaskConfig config, RuntimeUDFContext runtimeUdfContext, long availableMemory) { this.config = config; - final Class<? extends PactDriver<S, OT>> driverClass = this.config.getDriver(); - this.driver = InstantiationUtil.instantiate(driverClass, PactDriver.class); + final Class<? extends Driver<S, OT>> driverClass = this.config.getDriver(); + this.driver = InstantiationUtil.instantiate(driverClass, Driver.class); LOG.info("ClassLoader URLs: " + Arrays.toString(((URLClassLoader) this.userCodeClassLoader).getURLs())); @@ -244,7 +244,7 @@ public class TezTask<S extends Function,OT> implements PactTaskContext<S, OT> { // -------------------------------------------------------------------- - // PactTaskContext interface + // TaskContext interface // -------------------------------------------------------------------- @Override @@ -356,7 +356,7 @@ public class TezTask<S extends Function,OT> implements PactTaskContext<S, OT> { // -------------------------------------------------------------------- - // Adapted from RegularPactTask + // Adapted from BatchTask // -------------------------------------------------------------------- private void initInputLocalStrategy(int inputNum) throws Exception { @@ -402,7 +402,7 @@ public class TezTask<S extends Function,OT> implements PactTaskContext<S, OT> { localStub = initStub(userCodeFunctionType); } catch (Exception e) { throw new RuntimeException("Initializing the user code and the configuration failed" + - e.getMessage() == null ? "." : ": " + e.getMessage(), e); + (e.getMessage() == null ? "." : ": " + e.getMessage()), e); } if (!(localStub instanceof GroupCombineFunction)) { http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-staging/flink-tez/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/test/resources/logback-test.xml b/flink-staging/flink-tez/src/test/resources/logback-test.xml index 9c2e75f..48e4374 100644 --- a/flink-staging/flink-tez/src/test/resources/logback-test.xml +++ b/flink-staging/flink-tez/src/test/resources/logback-test.xml @@ -27,7 +27,7 @@ <appender-ref ref="STDOUT"/> </root> - <!--<logger name="org.apache.flink.runtime.operators.RegularPactTask" level="OFF"/>--> + <!--<logger name="org.apache.flink.runtime.operators.BatchTask" level="OFF"/>--> <!--<logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>--> <!--<logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>--> <!--<logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>--> http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-tests/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/resources/logback-test.xml b/flink-tests/src/test/resources/logback-test.xml index 9c2e75f..48e4374 100644 --- a/flink-tests/src/test/resources/logback-test.xml +++ b/flink-tests/src/test/resources/logback-test.xml @@ -27,7 +27,7 @@ <appender-ref ref="STDOUT"/> </root> - <!--<logger name="org.apache.flink.runtime.operators.RegularPactTask" level="OFF"/>--> + <!--<logger name="org.apache.flink.runtime.operators.BatchTask" level="OFF"/>--> <!--<logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>--> <!--<logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>--> <!--<logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>-->