Repository: flink Updated Branches: refs/heads/master 234646844 -> 273f54ba4
[FLINK-3666] Remove all remaining Nephele references Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/273f54ba Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/273f54ba Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/273f54ba Branch: refs/heads/master Commit: 273f54ba4357ccc4e4ade35b8967f0fa607a1ea8 Parents: 2346468 Author: zentol <ches...@apache.org> Authored: Wed Jul 13 16:15:28 2016 +0200 Committer: zentol <ches...@apache.org> Committed: Fri Jul 15 11:46:34 2016 +0200 ---------------------------------------------------------------------- docs/setup/local_setup.md | 13 ++++++------- .../runtime/iterative/task/IterationHeadTask.java | 2 +- .../operators/AbstractCachedBuildSideJoinDriver.java | 8 ++++---- .../org/apache/flink/runtime/operators/BatchTask.java | 14 +++++++------- .../apache/flink/runtime/operators/CrossDriver.java | 12 ++++++------ .../flink/runtime/operators/FullOuterJoinDriver.java | 12 ++++++------ .../runtime/operators/GroupReduceCombineDriver.java | 2 +- .../apache/flink/runtime/operators/JoinDriver.java | 12 ++++++------ .../flink/runtime/operators/LeftOuterJoinDriver.java | 12 ++++++------ .../flink/runtime/operators/ReduceCombineDriver.java | 2 +- .../flink/runtime/operators/RightOuterJoinDriver.java | 12 ++++++------ .../apache/flink/runtime/operators/TaskContext.java | 2 +- .../runtime/operators/ReduceTaskExternalITCase.java | 4 ++-- .../flink/runtime/operators/ReduceTaskTest.java | 2 +- .../runtime/operators/drivers/TestTaskContext.java | 2 +- .../operators/testutils/BinaryOperatorTestBase.java | 2 +- .../runtime/operators/testutils/DriverTestBase.java | 2 +- .../operators/testutils/UnaryOperatorTestBase.java | 2 +- 18 files changed, 58 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/docs/setup/local_setup.md ---------------------------------------------------------------------- diff --git a/docs/setup/local_setup.md b/docs/setup/local_setup.md index 43a9b79..983fa91 100644 --- a/docs/setup/local_setup.md +++ b/docs/setup/local_setup.md @@ -70,18 +70,17 @@ The out of the box configuration will use your default Java installation. You ca $ tar xzf flink-*.tgz $ cd flink $ bin/start-local.sh -Starting job manager +Starting jobmanager. ~~~ You can check that the system is running by checking the log files in the `logs` directory: ~~~bash $ tail log/flink-*-jobmanager-*.log -INFO ... - Initializing memory manager with 409 megabytes of memory -INFO ... - Trying to load org.apache.flinknephele.jobmanager.scheduler.local.LocalScheduler as scheduler -INFO ... - Setting up web info server, using web-root directory ... -INFO ... - Web info server will display information about nephele job-manager on localhost, port 8081. -INFO ... - Starting web info server for JobManager on port 8081 +INFO ... - Starting JobManager +INFO ... - Starting JobManager web frontend +INFO ... - Web frontend listening at 127.0.0.1:8081 +INFO ... - Registered TaskManager at 127.0.0.1 (akka://flink/user/taskmanager) ~~~ The JobManager will also start a web frontend on port 8081, which you can check with your browser at `http://localhost:8081`. @@ -117,7 +116,7 @@ With *Cygwin* you need to start the Cygwin Terminal, navigate to your Flink dire ~~~bash $ cd flink $ bin/start-local.sh -Starting Nephele job manager +Starting jobmanager. ~~~ {% top %} http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java index 66778c9..4bc4532 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java @@ -171,7 +171,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte boolean success = false; try { int numPages = getMemoryManager().computeNumberOfPages(hashjoinMemorySize); - memSegments = getMemoryManager().allocatePages(getOwningNepheleTask(), numPages); + memSegments = getMemoryManager().allocatePages(getContainingTask(), numPages); hashTable = new CompactingHashTable<BT>(solutionTypeSerializer, solutionTypeComparator, memSegments); success = true; return hashTable; http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java index 406d430..8c66cc7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java @@ -97,7 +97,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo pairComparatorFactory.createComparator21(comparator1, comparator2), this.taskContext.getMemoryManager(), this.taskContext.getIOManager(), - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), availableMemory, false, false, @@ -113,7 +113,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo pairComparatorFactory.createComparator12(comparator1, comparator2), this.taskContext.getMemoryManager(), this.taskContext.getIOManager(), - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), availableMemory, false, false, @@ -132,7 +132,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo pairComparatorFactory.createComparator21(comparator1, comparator2), this.taskContext.getMemoryManager(), this.taskContext.getIOManager(), - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), availableMemory, false, false, @@ -148,7 +148,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo pairComparatorFactory.createComparator12(comparator1, comparator2), this.taskContext.getMemoryManager(), this.taskContext.getIOManager(), - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), availableMemory, false, false, http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java index 68995d8..29f1c20 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java @@ -1061,7 +1061,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme } @Override - public AbstractInvokable getOwningNepheleTask() { + public AbstractInvokable getContainingTask() { return this; } @@ -1154,7 +1154,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme * * @param message The main message for the log. * @param taskName The name of the task. - * @param parent The nephele task that contains the code producing the message. + * @param parent The task that contains the code producing the message. * * @return The string for logging. */ @@ -1260,7 +1260,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme * The output collector applies the configured shipping strategy. */ @SuppressWarnings("unchecked") - public static <T> Collector<T> initOutputs(AbstractInvokable nepheleTask, ClassLoader cl, TaskConfig config, + public static <T> Collector<T> initOutputs(AbstractInvokable containingTask, ClassLoader cl, TaskConfig config, List<ChainedDriver<?, ?>> chainedTasksTarget, List<RecordWriter<?>> eventualOutputs, ExecutionConfig executionConfig, @@ -1299,21 +1299,21 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme if (i == numChained - 1) { // last in chain, instantiate the output collector for this task - previous = getOutputCollector(nepheleTask, chainedStubConf, cl, eventualOutputs, 0, chainedStubConf.getNumOutputs(), reporter); + previous = getOutputCollector(containingTask, chainedStubConf, cl, eventualOutputs, 0, chainedStubConf.getNumOutputs(), reporter); } - ct.setup(chainedStubConf, taskName, previous, nepheleTask, cl, executionConfig, accumulatorMap); + ct.setup(chainedStubConf, taskName, previous, containingTask, cl, executionConfig, accumulatorMap); chainedTasksTarget.add(0, ct); previous = ct; } - // the collector of the first in the chain is the collector for the nephele task + // the collector of the first in the chain is the collector for the task return (Collector<T>) previous; } // else // instantiate the output collector the default way from this configuration - return getOutputCollector(nepheleTask , config, cl, eventualOutputs, 0, numOutputs, reporter); + return getOutputCollector(containingTask , config, cl, eventualOutputs, 0, numOutputs, reporter); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java index 3e1d01f..fee0874 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java @@ -209,12 +209,12 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT> final BlockResettableMutableObjectIterator<T1> blockVals = new BlockResettableMutableObjectIterator<T1>(this.memManager, in1, serializer1, this.memPagesForBlockSide, - this.taskContext.getOwningNepheleTask()); + this.taskContext.getContainingTask()); this.blockIter = blockVals; final SpillingResettableMutableObjectIterator<T2> spillVals = new SpillingResettableMutableObjectIterator<T2>( in2, serializer2, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide, - this.taskContext.getOwningNepheleTask()); + this.taskContext.getContainingTask()); this.spillIter = spillVals; @@ -277,12 +277,12 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT> final SpillingResettableMutableObjectIterator<T1> spillVals = new SpillingResettableMutableObjectIterator<T1>( in1, serializer1, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide, - this.taskContext.getOwningNepheleTask()); + this.taskContext.getContainingTask()); this.spillIter = spillVals; final BlockResettableMutableObjectIterator<T2> blockVals = new BlockResettableMutableObjectIterator<T2>(this.memManager, in2, serializer2, this.memPagesForBlockSide, - this.taskContext.getOwningNepheleTask()); + this.taskContext.getContainingTask()); this.blockIter = blockVals; final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub(); @@ -343,7 +343,7 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT> final SpillingResettableMutableObjectIterator<T2> spillVals = new SpillingResettableMutableObjectIterator<T2>( in2, serializer2, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide, - this.taskContext.getOwningNepheleTask()); + this.taskContext.getContainingTask()); this.spillIter = spillVals; final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub(); @@ -396,7 +396,7 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT> final SpillingResettableMutableObjectIterator<T1> spillVals = new SpillingResettableMutableObjectIterator<T1>( in1, serializer1, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide, - this.taskContext.getOwningNepheleTask()); + this.taskContext.getContainingTask()); this.spillIter = spillVals; final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub(); http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java index a41a6ec..98a72ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java @@ -68,7 +68,7 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I memoryManager, ioManager, numPages, - super.taskContext.getOwningNepheleTask() + super.taskContext.getContainingTask() ); case FULL_OUTER_HYBRIDHASH_BUILD_FIRST: return new ReusingBuildFirstHashJoinIterator<>(in1, in2, @@ -76,7 +76,7 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), driverMemFraction, true, true, @@ -87,7 +87,7 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), driverMemFraction, true, true, @@ -126,7 +126,7 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I memoryManager, ioManager, numPages, - super.taskContext.getOwningNepheleTask() + super.taskContext.getContainingTask() ); case FULL_OUTER_HYBRIDHASH_BUILD_FIRST: return new NonReusingBuildFirstHashJoinIterator<>(in1, in2, @@ -134,7 +134,7 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), driverMemFraction, true, true, @@ -145,7 +145,7 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), driverMemFraction, true, true, http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java index 320e006..8edcee2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java @@ -129,7 +129,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements Driver<GroupCombineFun MemoryManager memManager = this.taskContext.getMemoryManager(); final int numMemoryPages = memManager.computeNumberOfPages(this.taskContext.getTaskConfig().getRelativeMemoryDriver()); - this.memory = memManager.allocatePages(this.taskContext.getOwningNepheleTask(), numMemoryPages); + this.memory = memManager.allocatePages(this.taskContext.getContainingTask(), numMemoryPages); // instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter if (sortingComparator.supportsSerializationWithKeyNormalization() && http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java index efb59a7..8543723 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java @@ -135,7 +135,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), - memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask()); + memoryManager, ioManager, numPages, this.taskContext.getContainingTask()); break; case HYBRIDHASH_BUILD_FIRST: this.joinIterator = new ReusingBuildFirstHashJoinIterator<>(in1, in2, @@ -143,7 +143,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), fractionAvailableMemory, false, false, @@ -155,7 +155,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), fractionAvailableMemory, false, false, @@ -171,7 +171,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), - memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask()); + memoryManager, ioManager, numPages, this.taskContext.getContainingTask()); break; case HYBRIDHASH_BUILD_FIRST: @@ -180,7 +180,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), fractionAvailableMemory, false, false, @@ -192,7 +192,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), fractionAvailableMemory, false, false, http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java index 41bb54d..fbd7f35 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java @@ -68,7 +68,7 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I memoryManager, ioManager, numPages, - super.taskContext.getOwningNepheleTask() + super.taskContext.getContainingTask() ); case LEFT_HYBRIDHASH_BUILD_FIRST: return new ReusingBuildFirstHashJoinIterator<>(in1, in2, @@ -76,7 +76,7 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), driverMemFraction, false, true, @@ -87,7 +87,7 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), driverMemFraction, true, false, @@ -126,7 +126,7 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I memoryManager, ioManager, numPages, - super.taskContext.getOwningNepheleTask() + super.taskContext.getContainingTask() ); case LEFT_HYBRIDHASH_BUILD_FIRST: return new NonReusingBuildFirstHashJoinIterator<>(in1, in2, @@ -134,7 +134,7 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), driverMemFraction, false, true, @@ -145,7 +145,7 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), driverMemFraction, true, false, http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java index 6c4ded1..aea7ae8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java @@ -125,7 +125,7 @@ public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> { MemoryManager memManager = taskContext.getMemoryManager(); final int numMemoryPages = memManager.computeNumberOfPages( taskContext.getTaskConfig().getRelativeMemoryDriver()); - memory = memManager.allocatePages(taskContext.getOwningNepheleTask(), numMemoryPages); + memory = memManager.allocatePages(taskContext.getContainingTask(), numMemoryPages); ExecutionConfig executionConfig = taskContext.getExecutionConfig(); objectReuseEnabled = executionConfig.isObjectReuseEnabled(); http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java index 96f65b4..d684aba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java @@ -68,7 +68,7 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver< memoryManager, ioManager, numPages, - super.taskContext.getOwningNepheleTask() + super.taskContext.getContainingTask() ); case RIGHT_HYBRIDHASH_BUILD_FIRST: return new ReusingBuildFirstHashJoinIterator<>(in1, in2, @@ -76,7 +76,7 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver< serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), driverMemFraction, true, false, @@ -87,7 +87,7 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver< serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), driverMemFraction, false, true, @@ -126,7 +126,7 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver< memoryManager, ioManager, numPages, - super.taskContext.getOwningNepheleTask() + super.taskContext.getContainingTask() ); case RIGHT_HYBRIDHASH_BUILD_FIRST: return new NonReusingBuildFirstHashJoinIterator<>(in1, in2, @@ -134,7 +134,7 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver< serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), driverMemFraction, true, false, @@ -145,7 +145,7 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver< serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, - this.taskContext.getOwningNepheleTask(), + this.taskContext.getContainingTask(), driverMemFraction, false, true, http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java index df22528..bc3e4c1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java @@ -65,7 +65,7 @@ public interface TaskContext<S, OT> { Collector<OT> getOutputCollector(); - AbstractInvokable getOwningNepheleTask(); + AbstractInvokable getContainingTask(); String formatLogString(String message); http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java index 2184302..babe69e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java @@ -134,7 +134,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc try { sorter = new CombiningUnilateralSortMerger<>(new MockCombiningReduceStub(), getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), - getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), + getContainingTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem, 2, 0.8f, true /* use large record handler */, true); addInput(sorter.getIterator()); @@ -180,7 +180,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc try { sorter = new CombiningUnilateralSortMerger<>(new MockCombiningReduceStub(), getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), - getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), + getContainingTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem, 2, 0.8f, true /* use large record handler */, false); addInput(sorter.getIterator()); http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java index 7c92dba..718b446 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java @@ -129,7 +129,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor try { sorter = new CombiningUnilateralSortMerger<>(new MockCombiningReduceStub(), getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), - getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem, + getContainingTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem, 4, 0.8f, true /* use large record handler */, true); addInput(sorter.getIterator()); http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java index 89cde95..62110a7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java @@ -216,7 +216,7 @@ public class TestTaskContext<S, T> implements TaskContext<S, T> { } @Override - public AbstractInvokable getOwningNepheleTask() { + public AbstractInvokable getContainingTask() { return this.owner; } http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java index 7531b99..75f960e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java @@ -361,7 +361,7 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog } @Override - public AbstractInvokable getOwningNepheleTask() { + public AbstractInvokable getContainingTask() { return this.owner; } http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java index e9a0ba5..088435a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java @@ -358,7 +358,7 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Ta } @Override - public AbstractInvokable getOwningNepheleTask() { + public AbstractInvokable getContainingTask() { return this.owner; } http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java index ff12e76..a94e694 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java @@ -353,7 +353,7 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg } @Override - public AbstractInvokable getOwningNepheleTask() { + public AbstractInvokable getContainingTask() { return this.owner; }