http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java deleted file mode 100644 index 9cb045f..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java +++ /dev/null @@ -1,440 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.iterative.task; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.io.network.api.writer.RecordWriter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.operators.util.JoinHashMap; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeComparatorFactory; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.io.disk.InputViewIterator; -import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel; -import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker; -import org.apache.flink.runtime.iterative.concurrent.Broker; -import org.apache.flink.runtime.iterative.concurrent.IterationAggregatorBroker; -import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker; -import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier; -import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker; -import org.apache.flink.runtime.iterative.concurrent.SuperstepBarrier; -import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch; -import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker; -import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent; -import org.apache.flink.runtime.iterative.event.TerminationEvent; -import org.apache.flink.runtime.iterative.event.WorkerDoneEvent; -import 
org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer; -import org.apache.flink.runtime.operators.RegularPactTask; -import org.apache.flink.runtime.operators.hash.CompactingHashTable; -import org.apache.flink.runtime.operators.util.TaskConfig; -import org.apache.flink.types.Value; -import org.apache.flink.util.Collector; -import org.apache.flink.util.MutableObjectIterator; - -/** - * The head is responsible for coordinating an iteration and can run a - * {@link org.apache.flink.runtime.operators.PactDriver} inside. It will read - * the initial input and establish a {@link BlockingBackChannel} to the iteration's tail. After successfully processing - * the input, it will send EndOfSuperstep events to its outputs. It must also be connected to a - * synchronization task and after each superstep, it will wait - * until it receives an {@link AllWorkersDoneEvent} from the sync, which signals that all other heads have also finished - * their iteration. Starting with - * the second iteration, the input for the head is the output of the tail, transmitted through the backchannel. Once the - * iteration is done, the head - * will send a {@link TerminationEvent} to all its connected tasks, signaling them to shut down. - * <p> - * Assumption on the ordering of the outputs: - The first n output gates write to channels that go to the tasks of the - * step function. - The next m output gates go to the tasks that consume the final solution. - The last output gate - * connects to the synchronization task. - * - * @param <X> - * The type of the bulk partial solution / solution set and the final output. - * @param <Y> - * The type of the feed-back data set (bulk partial solution / workset). 
For bulk iterations, {@code Y} is the - * same as {@code X} - */ -public class IterationHeadPactTask<X, Y, S extends Function, OT> extends AbstractIterativePactTask<S, OT> { - - private static final Logger log = LoggerFactory.getLogger(IterationHeadPactTask.class); - - private Collector<X> finalOutputCollector; - - private TypeSerializerFactory<Y> feedbackTypeSerializer; - - private TypeSerializerFactory<X> solutionTypeSerializer; - - private ResultPartitionWriter toSync; - - private int feedbackDataInput; // workset or bulk partial solution - - // -------------------------------------------------------------------------------------------- - - @Override - protected int getNumTaskInputs() { - // this task has an additional input in the workset case for the initial solution set - boolean isWorkset = config.getIsWorksetIteration(); - return driver.getNumberOfInputs() + (isWorkset ? 1 : 0); - } - - @Override - protected void initOutputs() throws Exception { - // initialize the regular outputs first (the ones into the step function). 
- super.initOutputs(); - - // at this time, the outputs to the step function are created - // add the outputs for the final solution - List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>(); - final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig(); - final ClassLoader userCodeClassLoader = getUserCodeClassLoader(); - AccumulatorRegistry.Reporter reporter = getEnvironment().getAccumulatorRegistry().getReadWriteReporter(); - this.finalOutputCollector = RegularPactTask.getOutputCollector(this, finalOutConfig, - userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs(), reporter); - - // sanity check the setup - final int writersIntoStepFunction = this.eventualOutputs.size(); - final int writersIntoFinalResult = finalOutputWriters.size(); - final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput(); - - if (writersIntoStepFunction + writersIntoFinalResult != syncGateIndex) { - throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates."); - } - // now, we can instantiate the sync gate - this.toSync = getEnvironment().getWriter(syncGateIndex); - } - - /** - * the iteration head prepares the backchannel: it allocates memory, instantiates a {@link BlockingBackChannel} and - * hands it to the iteration tail via a {@link Broker} singleton - **/ - private BlockingBackChannel initBackChannel() throws Exception { - - /* get the size of the memory available to the backchannel */ - int backChannelMemoryPages = getMemoryManager().computeNumberOfPages(this.config.getRelativeBackChannelMemory()); - - /* allocate the memory available to the backchannel */ - List<MemorySegment> segments = new ArrayList<MemorySegment>(); - int segmentSize = getMemoryManager().getPageSize(); - getMemoryManager().allocatePages(this, segments, backChannelMemoryPages); - - /* instantiate the backchannel */ - BlockingBackChannel backChannel = new BlockingBackChannel(new 
SerializedUpdateBuffer(segments, segmentSize, - getIOManager())); - - /* hand the backchannel over to the iteration tail */ - Broker<BlockingBackChannel> broker = BlockingBackChannelBroker.instance(); - broker.handIn(brokerKey(), backChannel); - - return backChannel; - } - - private <BT> CompactingHashTable<BT> initCompactingHashTable() throws Exception { - // get some memory - double hashjoinMemorySize = config.getRelativeSolutionSetMemory(); - final ClassLoader userCodeClassLoader = getUserCodeClassLoader(); - - TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer(userCodeClassLoader); - TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator(userCodeClassLoader); - - TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer(); - TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator(); - - CompactingHashTable<BT> hashTable = null; - List<MemorySegment> memSegments = null; - boolean success = false; - try { - int numPages = getMemoryManager().computeNumberOfPages(hashjoinMemorySize); - memSegments = getMemoryManager().allocatePages(getOwningNepheleTask(), numPages); - hashTable = new CompactingHashTable<BT>(solutionTypeSerializer, solutionTypeComparator, memSegments); - success = true; - return hashTable; - } finally { - if (!success) { - if (hashTable != null) { - try { - hashTable.close(); - } catch (Throwable t) { - log.error("Error closing the solution set hash table after unsuccessful creation.", t); - } - } - if (memSegments != null) { - try { - getMemoryManager().release(memSegments); - } catch (Throwable t) { - log.error("Error freeing memory after error during solution set hash table creation.", t); - } - } - } - } - } - - private <BT> JoinHashMap<BT> initJoinHashMap() { - TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer - (getUserCodeClassLoader()); - TypeComparatorFactory<BT> 
solutionTypeComparatorFactory = config.getSolutionSetComparator - (getUserCodeClassLoader()); - - TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer(); - TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator(); - - return new JoinHashMap<BT>(solutionTypeSerializer, solutionTypeComparator); - } - - private void readInitialSolutionSet(CompactingHashTable<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException { - solutionSet.open(); - solutionSet.buildTableWithUniqueKey(solutionSetInput); - } - - private void readInitialSolutionSet(JoinHashMap<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException { - TypeSerializer<X> serializer = solutionTypeSerializer.getSerializer(); - - X next; - while ((next = solutionSetInput.next(serializer.createInstance())) != null) { - solutionSet.insertOrReplace(next); - } - } - - private SuperstepBarrier initSuperstepBarrier() { - SuperstepBarrier barrier = new SuperstepBarrier(getUserCodeClassLoader()); - this.toSync.subscribeToEvent(barrier, AllWorkersDoneEvent.class); - this.toSync.subscribeToEvent(barrier, TerminationEvent.class); - return barrier; - } - - @Override - public void run() throws Exception { - final String brokerKey = brokerKey(); - final int workerIndex = getEnvironment().getIndexInSubtaskGroup(); - - final boolean objectSolutionSet = config.isSolutionSetUnmanaged(); - - CompactingHashTable<X> solutionSet = null; // if workset iteration - JoinHashMap<X> solutionSetObjectMap = null; // if workset iteration with unmanaged solution set - - boolean waitForSolutionSetUpdate = config.getWaitForSolutionSetUpdate(); - boolean isWorksetIteration = config.getIsWorksetIteration(); - - try { - /* used for receiving the current iteration result from iteration tail */ - SuperstepKickoffLatch nextStepKickoff = new SuperstepKickoffLatch(); - SuperstepKickoffLatchBroker.instance().handIn(brokerKey, nextStepKickoff); - - 
BlockingBackChannel backChannel = initBackChannel(); - SuperstepBarrier barrier = initSuperstepBarrier(); - SolutionSetUpdateBarrier solutionSetUpdateBarrier = null; - - feedbackDataInput = config.getIterationHeadPartialSolutionOrWorksetInputIndex(); - feedbackTypeSerializer = this.getInputSerializer(feedbackDataInput); - excludeFromReset(feedbackDataInput); - - int initialSolutionSetInput; - if (isWorksetIteration) { - initialSolutionSetInput = config.getIterationHeadSolutionSetInputIndex(); - solutionTypeSerializer = config.getSolutionSetSerializer(getUserCodeClassLoader()); - - // setup the index for the solution set - @SuppressWarnings("unchecked") - MutableObjectIterator<X> solutionSetInput = (MutableObjectIterator<X>) createInputIterator(inputReaders[initialSolutionSetInput], solutionTypeSerializer); - - // read the initial solution set - if (objectSolutionSet) { - solutionSetObjectMap = initJoinHashMap(); - readInitialSolutionSet(solutionSetObjectMap, solutionSetInput); - SolutionSetBroker.instance().handIn(brokerKey, solutionSetObjectMap); - } else { - solutionSet = initCompactingHashTable(); - readInitialSolutionSet(solutionSet, solutionSetInput); - SolutionSetBroker.instance().handIn(brokerKey, solutionSet); - } - - if (waitForSolutionSetUpdate) { - solutionSetUpdateBarrier = new SolutionSetUpdateBarrier(); - SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, solutionSetUpdateBarrier); - } - } - else { - // bulk iteration case - @SuppressWarnings("unchecked") - TypeSerializerFactory<X> solSer = (TypeSerializerFactory<X>) feedbackTypeSerializer; - solutionTypeSerializer = solSer; - - // = termination Criterion tail - if (waitForSolutionSetUpdate) { - solutionSetUpdateBarrier = new SolutionSetUpdateBarrier(); - SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, solutionSetUpdateBarrier); - } - } - - // instantiate all aggregators and register them at the iteration global registry - RuntimeAggregatorRegistry aggregatorRegistry = new 
RuntimeAggregatorRegistry(config.getIterationAggregators - (getUserCodeClassLoader())); - IterationAggregatorBroker.instance().handIn(brokerKey, aggregatorRegistry); - - DataInputView superstepResult = null; - - while (this.running && !terminationRequested()) { - - if (log.isInfoEnabled()) { - log.info(formatLogString("starting iteration [" + currentIteration() + "]")); - } - - barrier.setup(); - - if (waitForSolutionSetUpdate) { - solutionSetUpdateBarrier.setup(); - } - - if (!inFirstIteration()) { - feedBackSuperstepResult(superstepResult); - } - - super.run(); - - // signal to connected tasks that we are done with the superstep - sendEndOfSuperstepToAllIterationOutputs(); - - if (waitForSolutionSetUpdate) { - solutionSetUpdateBarrier.waitForSolutionSetUpdate(); - } - - // blocking call to wait for the result - superstepResult = backChannel.getReadEndAfterSuperstepEnded(); - if (log.isInfoEnabled()) { - log.info(formatLogString("finishing iteration [" + currentIteration() + "]")); - } - - sendEventToSync(new WorkerDoneEvent(workerIndex, aggregatorRegistry.getAllAggregators())); - - if (log.isInfoEnabled()) { - log.info(formatLogString("waiting for other workers in iteration [" + currentIteration() + "]")); - } - - barrier.waitForOtherWorkers(); - - if (barrier.terminationSignaled()) { - if (log.isInfoEnabled()) { - log.info(formatLogString("head received termination request in iteration [" - + currentIteration() - + "]")); - } - requestTermination(); - nextStepKickoff.signalTermination(); - } else { - incrementIterationCounter(); - - String[] globalAggregateNames = barrier.getAggregatorNames(); - Value[] globalAggregates = barrier.getAggregates(); - aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates); - - nextStepKickoff.triggerNextSuperstep(); - } - } - - if (log.isInfoEnabled()) { - log.info(formatLogString("streaming out final result after [" + currentIteration() + "] iterations")); - } - - if (isWorksetIteration) { - if 
(objectSolutionSet) { - streamSolutionSetToFinalOutput(solutionSetObjectMap); - } else { - streamSolutionSetToFinalOutput(solutionSet); - } - } else { - streamOutFinalOutputBulk(new InputViewIterator<X>(superstepResult, this.solutionTypeSerializer.getSerializer())); - } - - this.finalOutputCollector.close(); - - } finally { - // make sure we unregister everything from the broker: - // - backchannel - // - aggregator registry - // - solution set index - IterationAggregatorBroker.instance().remove(brokerKey); - BlockingBackChannelBroker.instance().remove(brokerKey); - SuperstepKickoffLatchBroker.instance().remove(brokerKey); - SolutionSetBroker.instance().remove(brokerKey); - SolutionSetUpdateBarrierBroker.instance().remove(brokerKey); - - if (solutionSet != null) { - solutionSet.close(); - } - } - } - - private void streamOutFinalOutputBulk(MutableObjectIterator<X> results) throws IOException { - final Collector<X> out = this.finalOutputCollector; - X record = this.solutionTypeSerializer.getSerializer().createInstance(); - - while ((record = results.next(record)) != null) { - out.collect(record); - } - } - - private void streamSolutionSetToFinalOutput(CompactingHashTable<X> hashTable) throws IOException { - final MutableObjectIterator<X> results = hashTable.getEntryIterator(); - final Collector<X> output = this.finalOutputCollector; - X record = solutionTypeSerializer.getSerializer().createInstance(); - - while ((record = results.next(record)) != null) { - output.collect(record); - } - } - - @SuppressWarnings("unchecked") - private void streamSolutionSetToFinalOutput(JoinHashMap<X> soluionSet) throws IOException { - final Collector<X> output = this.finalOutputCollector; - for (Object e : soluionSet.values()) { - output.collect((X) e); - } - } - - private void feedBackSuperstepResult(DataInputView superstepResult) { - this.inputs[this.feedbackDataInput] = - new InputViewIterator<Y>(superstepResult, this.feedbackTypeSerializer.getSerializer()); - } - - private void 
sendEndOfSuperstepToAllIterationOutputs() throws IOException, InterruptedException { - if (log.isDebugEnabled()) { - log.debug(formatLogString("Sending end-of-superstep to all iteration outputs.")); - } - - for (RecordWriter<?> eventualOutput : this.eventualOutputs) { - eventualOutput.sendEndOfSuperstep(); - } - } - - private void sendEventToSync(WorkerDoneEvent event) throws IOException, InterruptedException { - if (log.isInfoEnabled()) { - log.info(formatLogString("sending " + WorkerDoneEvent.class.getSimpleName() + " to sync")); - } - this.toSync.writeEventToAllChannels(event); - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java new file mode 100644 index 0000000..c6268f4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.iterative.task; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.operators.Driver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.operators.util.JoinHashMap; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeComparatorFactory; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerFactory; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.InputViewIterator; +import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel; +import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker; +import org.apache.flink.runtime.iterative.concurrent.Broker; +import org.apache.flink.runtime.iterative.concurrent.IterationAggregatorBroker; +import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker; +import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier; +import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker; +import org.apache.flink.runtime.iterative.concurrent.SuperstepBarrier; +import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch; +import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker; +import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent; +import org.apache.flink.runtime.iterative.event.TerminationEvent; +import org.apache.flink.runtime.iterative.event.WorkerDoneEvent; +import 
org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer; +import org.apache.flink.runtime.operators.BatchTask; +import org.apache.flink.runtime.operators.hash.CompactingHashTable; +import org.apache.flink.runtime.operators.util.TaskConfig; +import org.apache.flink.types.Value; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +/** + * The head is responsible for coordinating an iteration and can run a + * {@link Driver} inside. It will read + * the initial input and establish a {@link BlockingBackChannel} to the iteration's tail. After successfully processing + * the input, it will send EndOfSuperstep events to its outputs. It must also be connected to a + * synchronization task and after each superstep, it will wait + * until it receives an {@link AllWorkersDoneEvent} from the sync, which signals that all other heads have also finished + * their iteration. Starting with + * the second iteration, the input for the head is the output of the tail, transmitted through the backchannel. Once the + * iteration is done, the head + * will send a {@link TerminationEvent} to all its connected tasks, signaling them to shut down. + * <p> + * Assumption on the ordering of the outputs: - The first n output gates write to channels that go to the tasks of the + * step function. - The next m output gates go to the tasks that consume the final solution. - The last output gate + * connects to the synchronization task. + * + * @param <X> + * The type of the bulk partial solution / solution set and the final output. + * @param <Y> + * The type of the feed-back data set (bulk partial solution / workset). 
For bulk iterations, {@code Y} is the + * same as {@code X} + */ +public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIterativeTask<S, OT> { + + private static final Logger log = LoggerFactory.getLogger(IterationHeadTask.class); + + private Collector<X> finalOutputCollector; + + private TypeSerializerFactory<Y> feedbackTypeSerializer; + + private TypeSerializerFactory<X> solutionTypeSerializer; + + private ResultPartitionWriter toSync; + + private int feedbackDataInput; // workset or bulk partial solution + + // -------------------------------------------------------------------------------------------- + + @Override + protected int getNumTaskInputs() { + // this task has an additional input in the workset case for the initial solution set + boolean isWorkset = config.getIsWorksetIteration(); + return driver.getNumberOfInputs() + (isWorkset ? 1 : 0); + } + + @Override + protected void initOutputs() throws Exception { + // initialize the regular outputs first (the ones into the step function). 
+ super.initOutputs(); + + // at this time, the outputs to the step function are created + // add the outputs for the final solution + List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>(); + final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig(); + final ClassLoader userCodeClassLoader = getUserCodeClassLoader(); + AccumulatorRegistry.Reporter reporter = getEnvironment().getAccumulatorRegistry().getReadWriteReporter(); + this.finalOutputCollector = BatchTask.getOutputCollector(this, finalOutConfig, + userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs(), reporter); + + // sanity check the setup + final int writersIntoStepFunction = this.eventualOutputs.size(); + final int writersIntoFinalResult = finalOutputWriters.size(); + final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput(); + + if (writersIntoStepFunction + writersIntoFinalResult != syncGateIndex) { + throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates."); + } + // now, we can instantiate the sync gate + this.toSync = getEnvironment().getWriter(syncGateIndex); + } + + /** + * the iteration head prepares the backchannel: it allocates memory, instantiates a {@link BlockingBackChannel} and + * hands it to the iteration tail via a {@link Broker} singleton + **/ + private BlockingBackChannel initBackChannel() throws Exception { + + /* get the size of the memory available to the backchannel */ + int backChannelMemoryPages = getMemoryManager().computeNumberOfPages(this.config.getRelativeBackChannelMemory()); + + /* allocate the memory available to the backchannel */ + List<MemorySegment> segments = new ArrayList<MemorySegment>(); + int segmentSize = getMemoryManager().getPageSize(); + getMemoryManager().allocatePages(this, segments, backChannelMemoryPages); + + /* instantiate the backchannel */ + BlockingBackChannel backChannel = new BlockingBackChannel(new 
SerializedUpdateBuffer(segments, segmentSize, + getIOManager())); + + /* hand the backchannel over to the iteration tail */ + Broker<BlockingBackChannel> broker = BlockingBackChannelBroker.instance(); + broker.handIn(brokerKey(), backChannel); + + return backChannel; + } + + private <BT> CompactingHashTable<BT> initCompactingHashTable() throws Exception { + // get some memory + double hashjoinMemorySize = config.getRelativeSolutionSetMemory(); + final ClassLoader userCodeClassLoader = getUserCodeClassLoader(); + + TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer(userCodeClassLoader); + TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator(userCodeClassLoader); + + TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer(); + TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator(); + + CompactingHashTable<BT> hashTable = null; + List<MemorySegment> memSegments = null; + boolean success = false; + try { + int numPages = getMemoryManager().computeNumberOfPages(hashjoinMemorySize); + memSegments = getMemoryManager().allocatePages(getOwningNepheleTask(), numPages); + hashTable = new CompactingHashTable<BT>(solutionTypeSerializer, solutionTypeComparator, memSegments); + success = true; + return hashTable; + } finally { + if (!success) { + if (hashTable != null) { + try { + hashTable.close(); + } catch (Throwable t) { + log.error("Error closing the solution set hash table after unsuccessful creation.", t); + } + } + if (memSegments != null) { + try { + getMemoryManager().release(memSegments); + } catch (Throwable t) { + log.error("Error freeing memory after error during solution set hash table creation.", t); + } + } + } + } + } + + private <BT> JoinHashMap<BT> initJoinHashMap() { + TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer + (getUserCodeClassLoader()); + TypeComparatorFactory<BT> 
solutionTypeComparatorFactory = config.getSolutionSetComparator + (getUserCodeClassLoader()); + + TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer(); + TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator(); + + return new JoinHashMap<BT>(solutionTypeSerializer, solutionTypeComparator); + } + + private void readInitialSolutionSet(CompactingHashTable<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException { + solutionSet.open(); + solutionSet.buildTableWithUniqueKey(solutionSetInput); + } + + private void readInitialSolutionSet(JoinHashMap<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException { + TypeSerializer<X> serializer = solutionTypeSerializer.getSerializer(); + + X next; + while ((next = solutionSetInput.next(serializer.createInstance())) != null) { + solutionSet.insertOrReplace(next); + } + } + + private SuperstepBarrier initSuperstepBarrier() { + SuperstepBarrier barrier = new SuperstepBarrier(getUserCodeClassLoader()); + this.toSync.subscribeToEvent(barrier, AllWorkersDoneEvent.class); + this.toSync.subscribeToEvent(barrier, TerminationEvent.class); + return barrier; + } + + @Override + public void run() throws Exception { + final String brokerKey = brokerKey(); + final int workerIndex = getEnvironment().getIndexInSubtaskGroup(); + + final boolean objectSolutionSet = config.isSolutionSetUnmanaged(); + + CompactingHashTable<X> solutionSet = null; // if workset iteration + JoinHashMap<X> solutionSetObjectMap = null; // if workset iteration with unmanaged solution set + + boolean waitForSolutionSetUpdate = config.getWaitForSolutionSetUpdate(); + boolean isWorksetIteration = config.getIsWorksetIteration(); + + try { + /* used for receiving the current iteration result from iteration tail */ + SuperstepKickoffLatch nextStepKickoff = new SuperstepKickoffLatch(); + SuperstepKickoffLatchBroker.instance().handIn(brokerKey, nextStepKickoff); + + 
BlockingBackChannel backChannel = initBackChannel(); + SuperstepBarrier barrier = initSuperstepBarrier(); + SolutionSetUpdateBarrier solutionSetUpdateBarrier = null; + + feedbackDataInput = config.getIterationHeadPartialSolutionOrWorksetInputIndex(); + feedbackTypeSerializer = this.getInputSerializer(feedbackDataInput); + excludeFromReset(feedbackDataInput); + + int initialSolutionSetInput; + if (isWorksetIteration) { + initialSolutionSetInput = config.getIterationHeadSolutionSetInputIndex(); + solutionTypeSerializer = config.getSolutionSetSerializer(getUserCodeClassLoader()); + + // setup the index for the solution set + @SuppressWarnings("unchecked") + MutableObjectIterator<X> solutionSetInput = (MutableObjectIterator<X>) createInputIterator(inputReaders[initialSolutionSetInput], solutionTypeSerializer); + + // read the initial solution set + if (objectSolutionSet) { + solutionSetObjectMap = initJoinHashMap(); + readInitialSolutionSet(solutionSetObjectMap, solutionSetInput); + SolutionSetBroker.instance().handIn(brokerKey, solutionSetObjectMap); + } else { + solutionSet = initCompactingHashTable(); + readInitialSolutionSet(solutionSet, solutionSetInput); + SolutionSetBroker.instance().handIn(brokerKey, solutionSet); + } + + if (waitForSolutionSetUpdate) { + solutionSetUpdateBarrier = new SolutionSetUpdateBarrier(); + SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, solutionSetUpdateBarrier); + } + } + else { + // bulk iteration case + @SuppressWarnings("unchecked") + TypeSerializerFactory<X> solSer = (TypeSerializerFactory<X>) feedbackTypeSerializer; + solutionTypeSerializer = solSer; + + // = termination Criterion tail + if (waitForSolutionSetUpdate) { + solutionSetUpdateBarrier = new SolutionSetUpdateBarrier(); + SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, solutionSetUpdateBarrier); + } + } + + // instantiate all aggregators and register them at the iteration global registry + RuntimeAggregatorRegistry aggregatorRegistry = new 
RuntimeAggregatorRegistry(config.getIterationAggregators + (getUserCodeClassLoader())); + IterationAggregatorBroker.instance().handIn(brokerKey, aggregatorRegistry); + + DataInputView superstepResult = null; + + while (this.running && !terminationRequested()) { + + if (log.isInfoEnabled()) { + log.info(formatLogString("starting iteration [" + currentIteration() + "]")); + } + + barrier.setup(); + + if (waitForSolutionSetUpdate) { + solutionSetUpdateBarrier.setup(); + } + + if (!inFirstIteration()) { + feedBackSuperstepResult(superstepResult); + } + + super.run(); + + // signal to connected tasks that we are done with the superstep + sendEndOfSuperstepToAllIterationOutputs(); + + if (waitForSolutionSetUpdate) { + solutionSetUpdateBarrier.waitForSolutionSetUpdate(); + } + + // blocking call to wait for the result + superstepResult = backChannel.getReadEndAfterSuperstepEnded(); + if (log.isInfoEnabled()) { + log.info(formatLogString("finishing iteration [" + currentIteration() + "]")); + } + + sendEventToSync(new WorkerDoneEvent(workerIndex, aggregatorRegistry.getAllAggregators())); + + if (log.isInfoEnabled()) { + log.info(formatLogString("waiting for other workers in iteration [" + currentIteration() + "]")); + } + + barrier.waitForOtherWorkers(); + + if (barrier.terminationSignaled()) { + if (log.isInfoEnabled()) { + log.info(formatLogString("head received termination request in iteration [" + + currentIteration() + + "]")); + } + requestTermination(); + nextStepKickoff.signalTermination(); + } else { + incrementIterationCounter(); + + String[] globalAggregateNames = barrier.getAggregatorNames(); + Value[] globalAggregates = barrier.getAggregates(); + aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates); + + nextStepKickoff.triggerNextSuperstep(); + } + } + + if (log.isInfoEnabled()) { + log.info(formatLogString("streaming out final result after [" + currentIteration() + "] iterations")); + } + + if (isWorksetIteration) { + if 
(objectSolutionSet) { + streamSolutionSetToFinalOutput(solutionSetObjectMap); + } else { + streamSolutionSetToFinalOutput(solutionSet); + } + } else { + streamOutFinalOutputBulk(new InputViewIterator<X>(superstepResult, this.solutionTypeSerializer.getSerializer())); + } + + this.finalOutputCollector.close(); + + } finally { + // make sure we unregister everything from the broker: + // - backchannel + // - aggregator registry + // - solution set index + IterationAggregatorBroker.instance().remove(brokerKey); + BlockingBackChannelBroker.instance().remove(brokerKey); + SuperstepKickoffLatchBroker.instance().remove(brokerKey); + SolutionSetBroker.instance().remove(brokerKey); + SolutionSetUpdateBarrierBroker.instance().remove(brokerKey); + + if (solutionSet != null) { + solutionSet.close(); + } + } + } + + private void streamOutFinalOutputBulk(MutableObjectIterator<X> results) throws IOException { + final Collector<X> out = this.finalOutputCollector; + X record = this.solutionTypeSerializer.getSerializer().createInstance(); + + while ((record = results.next(record)) != null) { + out.collect(record); + } + } + + private void streamSolutionSetToFinalOutput(CompactingHashTable<X> hashTable) throws IOException { + final MutableObjectIterator<X> results = hashTable.getEntryIterator(); + final Collector<X> output = this.finalOutputCollector; + X record = solutionTypeSerializer.getSerializer().createInstance(); + + while ((record = results.next(record)) != null) { + output.collect(record); + } + } + + @SuppressWarnings("unchecked") + private void streamSolutionSetToFinalOutput(JoinHashMap<X> soluionSet) throws IOException { + final Collector<X> output = this.finalOutputCollector; + for (Object e : soluionSet.values()) { + output.collect((X) e); + } + } + + private void feedBackSuperstepResult(DataInputView superstepResult) { + this.inputs[this.feedbackDataInput] = + new InputViewIterator<Y>(superstepResult, this.feedbackTypeSerializer.getSerializer()); + } + + private void 
sendEndOfSuperstepToAllIterationOutputs() throws IOException, InterruptedException { + if (log.isDebugEnabled()) { + log.debug(formatLogString("Sending end-of-superstep to all iteration outputs.")); + } + + for (RecordWriter<?> eventualOutput : this.eventualOutputs) { + eventualOutput.sendEndOfSuperstep(); + } + } + + private void sendEventToSync(WorkerDoneEvent event) throws IOException, InterruptedException { + if (log.isInfoEnabled()) { + log.info(formatLogString("sending " + WorkerDoneEvent.class.getSimpleName() + " to sync")); + } + this.toSync.writeEventToAllChannels(event); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java deleted file mode 100644 index e7801e4..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.iterative.task; - -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.runtime.io.network.api.writer.RecordWriter; -import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent; -import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel; -import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch; -import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker; -import org.apache.flink.runtime.iterative.event.TerminationEvent; -import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector; -import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * An intermediate iteration task, which runs a Driver}inside. - * <p> - * It will propagate {@link EndOfSuperstepEvent}s and {@link TerminationEvent}s to it's connected tasks. Furthermore - * intermediate tasks can also update the iteration state, either the workset or the solution set. - * <p> - * If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadPactTask} via - * a {@link BlockingBackChannel} for the workset -XOR- a eHashTable for the solution set. In this case - * this task must be scheduled on the same instance as the head. 
- */ -public class IterationIntermediatePactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT> { - - private static final Logger log = LoggerFactory.getLogger(IterationIntermediatePactTask.class); - - private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector; - - @Override - protected void initialize() throws Exception { - super.initialize(); - - // set the last output collector of this task to reflect the iteration intermediate state update - // a) workset update - // b) solution set update - // c) none - - Collector<OT> delegate = getLastOutputCollector(); - if (isWorksetUpdate) { - // sanity check: we should not have a solution set and workset update at the same time - // in an intermediate task - if (isSolutionSetUpdate) { - throw new IllegalStateException("Plan bug: Intermediate task performs workset and solutions set update."); - } - - Collector<OT> outputCollector = createWorksetUpdateOutputCollector(delegate); - - // we need the WorksetUpdateOutputCollector separately to count the collected elements - if (isWorksetIteration) { - worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector; - } - - setLastOutputCollector(outputCollector); - } else if (isSolutionSetUpdate) { - setLastOutputCollector(createSolutionSetUpdateOutputCollector(delegate)); - } - } - - @Override - public void run() throws Exception { - - SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey()); - - while (this.running && !terminationRequested()) { - - if (log.isInfoEnabled()) { - log.info(formatLogString("starting iteration [" + currentIteration() + "]")); - } - - super.run(); - - // check if termination was requested - verifyEndOfSuperstepState(); - - if (isWorksetUpdate && isWorksetIteration) { - long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset(); - worksetAggregator.aggregate(numCollected); - } - - if (log.isInfoEnabled()) { - 
log.info(formatLogString("finishing iteration [" + currentIteration() + "]")); - } - - // let the successors know that the end of this superstep data is reached - sendEndOfSuperstep(); - - if (isWorksetUpdate) { - // notify iteration head if responsible for workset update - worksetBackChannel.notifyOfEndOfSuperstep(); - } - - boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1); - - if (terminated) { - requestTermination(); - } - else { - incrementIterationCounter(); - } - } - } - - private void sendEndOfSuperstep() throws IOException, InterruptedException { - for (RecordWriter eventualOutput : this.eventualOutputs) { - eventualOutput.sendEndOfSuperstep(); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java new file mode 100644 index 0000000..60f0dcf --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.iterative.task; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent; +import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel; +import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch; +import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker; +import org.apache.flink.runtime.iterative.event.TerminationEvent; +import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector; +import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * An intermediate iteration task, which runs a Driver}inside. + * <p> + * It will propagate {@link EndOfSuperstepEvent}s and {@link TerminationEvent}s to it's connected tasks. Furthermore + * intermediate tasks can also update the iteration state, either the workset or the solution set. + * <p> + * If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadTask} via + * a {@link BlockingBackChannel} for the workset -XOR- a eHashTable for the solution set. In this case + * this task must be scheduled on the same instance as the head. 
+ */ +public class IterationIntermediateTask<S extends Function, OT> extends AbstractIterativeTask<S, OT> { + + private static final Logger log = LoggerFactory.getLogger(IterationIntermediateTask.class); + + private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector; + + @Override + protected void initialize() throws Exception { + super.initialize(); + + // set the last output collector of this task to reflect the iteration intermediate state update + // a) workset update + // b) solution set update + // c) none + + Collector<OT> delegate = getLastOutputCollector(); + if (isWorksetUpdate) { + // sanity check: we should not have a solution set and workset update at the same time + // in an intermediate task + if (isSolutionSetUpdate) { + throw new IllegalStateException("Plan bug: Intermediate task performs workset and solutions set update."); + } + + Collector<OT> outputCollector = createWorksetUpdateOutputCollector(delegate); + + // we need the WorksetUpdateOutputCollector separately to count the collected elements + if (isWorksetIteration) { + worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector; + } + + setLastOutputCollector(outputCollector); + } else if (isSolutionSetUpdate) { + setLastOutputCollector(createSolutionSetUpdateOutputCollector(delegate)); + } + } + + @Override + public void run() throws Exception { + + SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey()); + + while (this.running && !terminationRequested()) { + + if (log.isInfoEnabled()) { + log.info(formatLogString("starting iteration [" + currentIteration() + "]")); + } + + super.run(); + + // check if termination was requested + verifyEndOfSuperstepState(); + + if (isWorksetUpdate && isWorksetIteration) { + long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset(); + worksetAggregator.aggregate(numCollected); + } + + if (log.isInfoEnabled()) { + log.info(formatLogString("finishing iteration 
[" + currentIteration() + "]")); + } + + // let the successors know that the end of this superstep data is reached + sendEndOfSuperstep(); + + if (isWorksetUpdate) { + // notify iteration head if responsible for workset update + worksetBackChannel.notifyOfEndOfSuperstep(); + } + + boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1); + + if (terminated) { + requestTermination(); + } + else { + incrementIterationCounter(); + } + } + } + + private void sendEndOfSuperstep() throws IOException, InterruptedException { + for (RecordWriter eventualOutput : this.eventualOutputs) { + eventualOutput.sendEndOfSuperstep(); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java index fed0a17..a85e662 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.flink.runtime.event.TaskEvent; +import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.types.IntValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +37,6 @@ import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent; import org.apache.flink.runtime.iterative.event.TerminationEvent; import org.apache.flink.runtime.iterative.event.WorkerDoneEvent; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import 
org.apache.flink.runtime.operators.RegularPactTask; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.types.Value; @@ -204,7 +204,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen } private String formatLogString(String message) { - return RegularPactTask.constructLogString(message, getEnvironment().getTaskName(), this); + return BatchTask.constructLogString(message, getEnvironment().getTaskName(), this); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java deleted file mode 100644 index 159d3f2..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.iterative.task; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier; -import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker; -import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch; -import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker; -import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector; -import org.apache.flink.util.Collector; - -/** - * An iteration tail, which runs a driver inside. - * <p> - * If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadPactTask} via - * a BackChannel for the workset -OR- a HashTable for the solution set. Therefore this - * task must be scheduled on the same instance as the head. It's also possible for the tail to update *both* the workset - * and the solution set. - * <p> - * If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish. 
- */ -public class IterationTailPactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT> { - - private static final Logger log = LoggerFactory.getLogger(IterationTailPactTask.class); - - private SolutionSetUpdateBarrier solutionSetUpdateBarrier; - - private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector; - - - @Override - protected void initialize() throws Exception { - super.initialize(); - - // sanity check: the tail has to update either the workset or the solution set - if (!isWorksetUpdate && !isSolutionSetUpdate) { - throw new RuntimeException("The iteration tail doesn't update workset or the solution set."); - } - - // set the last output collector of this task to reflect the iteration tail state update: - // a) workset update, - // b) solution set update, or - // c) merged workset and solution set update - - Collector<OT> outputCollector = null; - if (isWorksetUpdate) { - outputCollector = createWorksetUpdateOutputCollector(); - - // we need the WorksetUpdateOutputCollector separately to count the collected elements - if (isWorksetIteration) { - worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector; - } - } - - if (isSolutionSetUpdate) { - if (isWorksetIteration) { - outputCollector = createSolutionSetUpdateOutputCollector(outputCollector); - } - // Bulk iteration with termination criterion - else { - outputCollector = new Collector<OT>() { - @Override - public void collect(OT record) {} - @Override - public void close() {} - }; - } - - if (!isWorksetUpdate) { - solutionSetUpdateBarrier = SolutionSetUpdateBarrierBroker.instance().get(brokerKey()); - } - } - - setLastOutputCollector(outputCollector); - } - - @Override - public void run() throws Exception { - - SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey()); - - while (this.running && !terminationRequested()) { - - if (log.isInfoEnabled()) { - log.info(formatLogString("starting iteration [" + 
currentIteration() + "]")); - } - - super.run(); - - // check if termination was requested - verifyEndOfSuperstepState(); - - if (isWorksetUpdate && isWorksetIteration) { - // aggregate workset update element count - long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset(); - worksetAggregator.aggregate(numCollected); - - } - - if (log.isInfoEnabled()) { - log.info(formatLogString("finishing iteration [" + currentIteration() + "]")); - } - - if (isWorksetUpdate) { - // notify iteration head if responsible for workset update - worksetBackChannel.notifyOfEndOfSuperstep(); - } else if (isSolutionSetUpdate) { - // notify iteration head if responsible for solution set update - solutionSetUpdateBarrier.notifySolutionSetUpdate(); - } - - boolean terminate = nextSuperStepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1); - if (terminate) { - requestTermination(); - } - else { - incrementIterationCounter(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java new file mode 100644 index 0000000..9e0b560 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.iterative.task; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier; +import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker; +import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch; +import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker; +import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector; +import org.apache.flink.util.Collector; + +/** + * An iteration tail, which runs a driver inside. + * <p> + * If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadTask} via + * a BackChannel for the workset -OR- a HashTable for the solution set. Therefore this + * task must be scheduled on the same instance as the head. It's also possible for the tail to update *both* the workset + * and the solution set. + * <p> + * If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish. 
+ */ +public class IterationTailTask<S extends Function, OT> extends AbstractIterativeTask<S, OT> { + + private static final Logger log = LoggerFactory.getLogger(IterationTailTask.class); + + private SolutionSetUpdateBarrier solutionSetUpdateBarrier; + + private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector; + + + @Override + protected void initialize() throws Exception { + super.initialize(); + + // sanity check: the tail has to update either the workset or the solution set + if (!isWorksetUpdate && !isSolutionSetUpdate) { + throw new RuntimeException("The iteration tail doesn't update workset or the solution set."); + } + + // set the last output collector of this task to reflect the iteration tail state update: + // a) workset update, + // b) solution set update, or + // c) merged workset and solution set update + + Collector<OT> outputCollector = null; + if (isWorksetUpdate) { + outputCollector = createWorksetUpdateOutputCollector(); + + // we need the WorksetUpdateOutputCollector separately to count the collected elements + if (isWorksetIteration) { + worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector; + } + } + + if (isSolutionSetUpdate) { + if (isWorksetIteration) { + outputCollector = createSolutionSetUpdateOutputCollector(outputCollector); + } + // Bulk iteration with termination criterion + else { + outputCollector = new Collector<OT>() { + @Override + public void collect(OT record) {} + @Override + public void close() {} + }; + } + + if (!isWorksetUpdate) { + solutionSetUpdateBarrier = SolutionSetUpdateBarrierBroker.instance().get(brokerKey()); + } + } + + setLastOutputCollector(outputCollector); + } + + @Override + public void run() throws Exception { + + SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey()); + + while (this.running && !terminationRequested()) { + + if (log.isInfoEnabled()) { + log.info(formatLogString("starting iteration [" + currentIteration() 
+ "]")); + } + + super.run(); + + // check if termination was requested + verifyEndOfSuperstepState(); + + if (isWorksetUpdate && isWorksetIteration) { + // aggregate workset update element count + long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset(); + worksetAggregator.aggregate(numCollected); + + } + + if (log.isInfoEnabled()) { + log.info(formatLogString("finishing iteration [" + currentIteration() + "]")); + } + + if (isWorksetUpdate) { + // notify iteration head if responsible for workset update + worksetBackChannel.notifyOfEndOfSuperstep(); + } else if (isSolutionSetUpdate) { + // notify iteration head if responsible for solution set update + solutionSetUpdateBarrier.notifySolutionSetUpdate(); + } + + boolean terminate = nextSuperStepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1); + if (terminate) { + requestTermination(); + } + else { + incrementIterationCounter(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java index 0e0bd26..4923b3b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobgraph.tasks; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,7 +29,7 @@ import 
org.slf4j.LoggerFactory; /** * This is the abstract base class for every task that can be executed by a TaskManager. * Concrete tasks like the stream vertices of the batch tasks - * (see {@link org.apache.flink.runtime.operators.RegularPactTask}) inherit from this class. + * (see {@link BatchTask}) inherit from this class. * * The TaskManager invokes the methods {@link #registerInputOutput()} and {@link #invoke()} in * this order when executing a task. The first method is responsible for setting up input and http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java index 4096f0c..8f72754 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java @@ -34,7 +34,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; -public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends JoinDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> { +public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends JoinDriver<IT1, IT2, OT> implements ResettableDriver<FlatJoinFunction<IT1, IT2, OT>, OT> { private volatile JoinTaskIterator<IT1, IT2, OT> matchIterator; http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java index 38c74e0..8c964d4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java @@ -38,11 +38,11 @@ import org.slf4j.LoggerFactory; * * @see FlatJoinFunction */ -public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> { +public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT2, OT>, OT> { protected static final Logger LOG = LoggerFactory.getLogger(AbstractOuterJoinDriver.class); - protected PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext; + protected TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext; protected volatile JoinTaskIterator<IT1, IT2, OT> outerJoinIterator; // the iterator that does the actual outer join protected volatile boolean running; @@ -50,7 +50,7 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements PactDrive // ------------------------------------------------------------------------ @Override - public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) { + public void setup(TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) { this.taskContext = context; this.running = true; } http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java index 7b279ee..0c8dc34 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java @@ -37,18 +37,18 @@ import org.slf4j.LoggerFactory; * * @see GroupCombineFunction */ -public class AllGroupCombineDriver<IN, OUT> implements PactDriver<GroupCombineFunction<IN, OUT>, OUT> { +public class AllGroupCombineDriver<IN, OUT> implements Driver<GroupCombineFunction<IN, OUT>, OUT> { private static final Logger LOG = LoggerFactory.getLogger(AllGroupCombineDriver.class); - private PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext; + private TaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext; private boolean objectReuseEnabled = false; // ------------------------------------------------------------------------ @Override - public void setup(PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> context) { + public void setup(TaskContext<GroupCombineFunction<IN, OUT>, OUT> context) { this.taskContext = context; } http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java index a20fddf..255c57c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java @@ -45,11 +45,11 @@ import org.apache.flink.util.MutableObjectIterator; * * @see org.apache.flink.api.common.functions.GroupReduceFunction */ -public class AllGroupReduceDriver<IT, OT> implements 
PactDriver<GroupReduceFunction<IT, OT>, OT> { +public class AllGroupReduceDriver<IT, OT> implements Driver<GroupReduceFunction<IT, OT>, OT> { private static final Logger LOG = LoggerFactory.getLogger(AllGroupReduceDriver.class); - private PactTaskContext<GroupReduceFunction<IT, OT>, OT> taskContext; + private TaskContext<GroupReduceFunction<IT, OT>, OT> taskContext; private MutableObjectIterator<IT> input; @@ -62,7 +62,7 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct // ------------------------------------------------------------------------ @Override - public void setup(PactTaskContext<GroupReduceFunction<IT, OT>, OT> context) { + public void setup(TaskContext<GroupReduceFunction<IT, OT>, OT> context) { this.taskContext = context; } http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java index 1f58c1b..f27ae34 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java @@ -38,11 +38,11 @@ import org.apache.flink.util.MutableObjectIterator; * * @see org.apache.flink.api.common.functions.ReduceFunction */ -public class AllReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> { +public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> { private static final Logger LOG = LoggerFactory.getLogger(AllReduceDriver.class); - private PactTaskContext<ReduceFunction<T>, T> taskContext; + private TaskContext<ReduceFunction<T>, T> taskContext; private MutableObjectIterator<T> input; @@ -55,7 +55,7 @@ public class AllReduceDriver<T> implements 
PactDriver<ReduceFunction<T>, T> { // ------------------------------------------------------------------------ @Override - public void setup(PactTaskContext<ReduceFunction<T>, T> context) { + public void setup(TaskContext<ReduceFunction<T>, T> context) { this.taskContext = context; this.running = true; }