http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java deleted file mode 100644 index 89e4642..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java +++ /dev/null @@ -1,578 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.tez.runtime;

import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.TaskContext;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.runtime.operators.util.CloseableInputProvider;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.tez.runtime.input.TezReaderIterator;
import org.apache.flink.tez.runtime.output.TezChannelSelector;
import org.apache.flink.tez.runtime.output.TezOutputEmitter;
import org.apache.flink.tez.runtime.output.TezOutputCollector;
import org.apache.flink.tez.util.DummyInvokable;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.MutableObjectIterator;

import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;

import java.io.IOException;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;


/**
 * Runs one Flink batch operator (a {@link Driver} plus its user function) on top of Tez readers and writers.
 *
 * <p>The task exposes itself to the driver as a {@link TaskContext}: each Tez {@link KeyValueReader} is wrapped
 * in an iterator and optionally passed through a local strategy (sort or combining sort), and the driver's
 * output is forwarded to a {@link TezOutputCollector} that writes to the Tez {@link KeyValueWriter}s.
 *
 * @param <S> type of the user function (stub) executed by the driver
 * @param <OT> type of the records emitted by the driver
 */
public class TezTask<S extends Function, OT> implements TaskContext<S, OT> {

    protected static final Log LOG = LogFactory.getLog(TezTask.class);

    /** Stand-in invokable handed to the sorters and returned by {@link #getOwningNepheleTask()}. */
    DummyInvokable invokable = new DummyInvokable();

    /**
     * The driver that invokes the user code (the stub implementation). The central driver in this task
     * (further drivers may be chained behind this driver).
     */
    protected volatile Driver<S, OT> driver;

    /**
     * The instantiated user code of this task's main operator (driver). May be null if the operator has no udf.
     */
    protected S stub;

    /**
     * The udf's runtime context.
     */
    protected RuntimeUDFContext runtimeUdfContext;

    /**
     * The collector that forwards the user code's results. May forward to a channel or to chained drivers within
     * this task.
     */
    protected Collector<OT> output;

    /**
     * The inputs reader, wrapped in an iterator. Prior to the local strategies, etc...
     */
    protected MutableObjectIterator<?>[] inputIterators;

    /**
     * The local strategies that are applied on the inputs.
     */
    protected volatile CloseableInputProvider<?>[] localStrategies;

    /**
     * The inputs to the operator. Return the readers' data after the application of the local strategy
     * and the temp-table barrier.
     */
    protected MutableObjectIterator<?>[] inputs;

    /**
     * The serializers for the input data type.
     */
    protected TypeSerializerFactory<?>[] inputSerializers;

    /**
     * The comparators for the central driver.
     */
    protected TypeComparator<?>[] inputComparators;

    /**
     * The task configuration with the setup parameters.
     */
    protected TezTaskConfig config;

    /**
     * The class loader used to instantiate user code and user data types.
     */
    protected ClassLoader userCodeClassLoader = ClassLoader.getSystemClassLoader();

    /**
     * The execution config, taken from the runtime context passed to the constructor.
     */
    protected ExecutionConfig executionConfig;

    /*
     * Tez-specific variables given by the Processor
     */
    protected TypeSerializer<OT> outSerializer;

    protected List<Integer> numberOfSubTasksInOutputs;

    protected String taskName;

    protected int numberOfSubtasks;

    protected int indexInSubtaskGroup;

    TezRuntimeEnvironment runtimeEnvironment;

    /**
     * Creates the task: instantiates the driver and the user function from the configuration and sets up
     * the runtime environment.
     *
     * @param config the task configuration (driver class, stub wrapper, serializers, strategies)
     * @param runtimeUdfContext the runtime context handed to the user function
     * @param availableMemory the memory available to the task; 70% of it is passed to the
     *        {@link TezRuntimeEnvironment}
     */
    public TezTask(TezTaskConfig config, RuntimeUDFContext runtimeUdfContext, long availableMemory) {
        this.config = config;
        final Class<? extends Driver<S, OT>> driverClass = this.config.getDriver();
        this.driver = InstantiationUtil.instantiate(driverClass, Driver.class);

        // The system class loader is not a URLClassLoader on every JVM (e.g. Java 9+);
        // a purely informational log line must never fail task construction.
        if (this.userCodeClassLoader instanceof URLClassLoader) {
            LOG.info("ClassLoader URLs: " + Arrays.toString(((URLClassLoader) this.userCodeClassLoader).getURLs()));
        } else {
            LOG.info("ClassLoader: " + this.userCodeClassLoader);
        }

        this.stub = this.config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(Function.class, this.userCodeClassLoader); //TODO get superclass properly
        this.runtimeUdfContext = runtimeUdfContext;
        this.outSerializer = (TypeSerializer<OT>) this.config.getOutputSerializer(getClass().getClassLoader()).getSerializer();
        this.numberOfSubTasksInOutputs = this.config.getNumberSubtasksInOutput();
        this.taskName = this.config.getTaskName();
        this.numberOfSubtasks = this.runtimeUdfContext.getNumberOfParallelSubtasks();
        this.indexInSubtaskGroup = this.runtimeUdfContext.getIndexOfThisSubtask();
        this.runtimeEnvironment = new TezRuntimeEnvironment((long) (0.7 * availableMemory));
        this.executionConfig = runtimeUdfContext.getExecutionConfig();
        this.invokable.setExecutionConfig(this.executionConfig);
    }


    //-------------------------------------------------------------
    // Interface to FlinkProcessor
    //-------------------------------------------------------------

    /**
     * Executes the task: wires inputs and outputs, sets up driver and user function, and runs the driver.
     * The local strategies are closed in all cases, including failures during their own initialization.
     *
     * @param readers one Tez reader per driver input
     * @param writers the Tez writers the results are emitted to
     * @throws Exception if initialization or execution fails
     */
    public void invoke(List<KeyValueReader> readers, List<KeyValueWriter> writers) throws Exception {

        // whatever happens in this scope, make sure that the local strategies are cleaned up!
        // note that the initialization of the local strategies is in the try-finally block as well,
        // so that the thread that creates them catches its own errors that may happen in that process.
        // this is especially important, since there may be asynchronous closes (such as through canceling).
        try {
            // initialize the inputs and outputs
            initInputsOutputs(readers, writers);

            // pre main-function initialization
            initialize();

            // the work goes here
            run();
        }
        finally {
            // clean up in any case!
            closeLocalStrategies();
        }
    }


    /*
     * Initialize inputs, input serializers, input comparators, and collector
     * Assumes that the config and userCodeClassLoader has been set
     */
    private void initInputsOutputs(List<KeyValueReader> readers, List<KeyValueWriter> writers) throws Exception {

        int numInputs = readers.size();
        Preconditions.checkArgument(numInputs == driver.getNumberOfInputs());

        // Prior to local strategies
        this.inputIterators = new MutableObjectIterator[numInputs];
        //local strategies
        this.localStrategies = new CloseableInputProvider[numInputs];
        // After local strategies
        this.inputs = new MutableObjectIterator[numInputs];

        int numComparators = driver.getNumberOfDriverComparators();
        initInputsSerializersAndComparators(numInputs, numComparators);

        int index = 0;
        for (KeyValueReader reader : readers) {
            this.inputIterators[index] = new TezReaderIterator<Object>(reader);
            initInputLocalStrategy(index);
            index++;
        }

        // one channel selector per output; which constructor is used depends on whether the
        // output has a comparator and a data distribution configured
        int numOutputs = writers.size();
        ArrayList<TezChannelSelector<OT>> channelSelectors = new ArrayList<TezChannelSelector<OT>>(numOutputs);
        for (int i = 0; i < numOutputs; i++) {
            final ShipStrategyType strategy = config.getOutputShipStrategy(i);
            final TypeComparatorFactory<OT> compFactory = config.getOutputComparator(i, this.userCodeClassLoader);
            final DataDistribution dataDist = config.getOutputDataDistribution(i, this.userCodeClassLoader);
            if (compFactory == null) {
                channelSelectors.add(i, new TezOutputEmitter<OT>(strategy));
            } else if (dataDist == null) {
                final TypeComparator<OT> comparator = compFactory.createComparator();
                channelSelectors.add(i, new TezOutputEmitter<OT>(strategy, comparator));
            } else {
                final TypeComparator<OT> comparator = compFactory.createComparator();
                channelSelectors.add(i, new TezOutputEmitter<OT>(strategy, comparator, dataDist));
            }
        }
        this.output = new TezOutputCollector<OT>(writers, channelSelectors, outSerializer, numberOfSubTasksInOutputs);
    }



    // --------------------------------------------------------------------
    // TaskContext interface
    // --------------------------------------------------------------------

    @Override
    public TaskConfig getTaskConfig() {
        return this.config;
    }

    @Override
    public ExecutionConfig getExecutionConfig() {
        return executionConfig;
    }

    @Override
    public ClassLoader getUserCodeClassLoader() {
        return this.userCodeClassLoader;
    }

    @Override
    public MemoryManager getMemoryManager() {
        return runtimeEnvironment.getMemoryManager();
    }

    @Override
    public IOManager getIOManager() {
        return runtimeEnvironment.getIOManager();
    }

    @Override
    public TaskManagerRuntimeInfo getTaskManagerInfo() {
        return new TaskManagerRuntimeInfo("localhost", new Configuration());
    }

    /**
     * Returns the input with the given index, after the local strategy has been applied. If the input is
     * produced by a local strategy, its iterator is fetched on first access and cached.
     *
     * @throws IndexOutOfBoundsException if {@code index} is not in {@code [0, numberOfInputs)}
     * @throws RuntimeException if fetching the iterator from the local strategy is interrupted or fails
     */
    @Override
    public <X> MutableObjectIterator<X> getInput(int index) {
        // valid indices are 0 .. numberOfInputs - 1
        if (index < 0 || index >= this.driver.getNumberOfInputs()) {
            throw new IndexOutOfBoundsException();
        }
        // check for lazy assignment from input strategies
        if (this.inputs[index] != null) {
            @SuppressWarnings("unchecked")
            MutableObjectIterator<X> in = (MutableObjectIterator<X>) this.inputs[index];
            return in;
        } else {
            final MutableObjectIterator<X> in;
            try {
                if (this.localStrategies[index] != null) {
                    @SuppressWarnings("unchecked")
                    MutableObjectIterator<X> iter = (MutableObjectIterator<X>) this.localStrategies[index].getIterator();
                    in = iter;
                } else {
                    throw new RuntimeException("Bug: null input iterator, null temp barrier, and null local strategy.");
                }
                this.inputs[index] = in;
                return in;
            } catch (InterruptedException iex) {
                // keep the interruption visible to callers further up the stack
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while waiting for input " + index + " to become available.", iex);
            } catch (IOException ioex) {
                throw new RuntimeException("An I/O Exception occurred while obtaining input " + index + ".", ioex);
            }
        }
    }

    @Override
    public <X> TypeSerializerFactory<X> getInputSerializer(int index) {
        if (index < 0 || index >= this.driver.getNumberOfInputs()) {
            throw new IndexOutOfBoundsException();
        }

        @SuppressWarnings("unchecked")
        final TypeSerializerFactory<X> serializerFactory = (TypeSerializerFactory<X>) this.inputSerializers[index];
        return serializerFactory;
    }

    @Override
    public <X> TypeComparator<X> getDriverComparator(int index) {
        if (this.inputComparators == null) {
            throw new IllegalStateException("Comparators have not been created!");
        }
        else if (index < 0 || index >= this.driver.getNumberOfDriverComparators()) {
            throw new IndexOutOfBoundsException();
        }

        @SuppressWarnings("unchecked")
        final TypeComparator<X> comparator = (TypeComparator<X>) this.inputComparators[index];
        return comparator;
    }



    @Override
    public S getStub() {
        return this.stub;
    }

    @Override
    public Collector<OT> getOutputCollector() {
        return this.output;
    }

    @Override
    public AbstractInvokable getOwningNepheleTask() {
        return this.invokable;
    }

    /**
     * Decorates a log message with the task name and the subtask position.
     *
     * @param message the raw message
     * @return the message followed by {@code taskName (index+1/numberOfSubtasks)}
     */
    @Override
    public String formatLogString(String message) {
        // NOTE(review): the index is printed as index + 1 on the assumption that
        // getIndexOfThisSubtask() is zero-based - confirm against RuntimeUDFContext.
        return message + ":  " + this.taskName
                + " (" + (this.indexInSubtaskGroup + 1) + '/' + this.numberOfSubtasks + ')';
    }


    // --------------------------------------------------------------------
    // Adapted from BatchTask
    // --------------------------------------------------------------------

    /**
     * Sets up the local strategy configured for one input. For {@code NONE} (or no configured strategy) the
     * raw input iterator is used directly; for the sorting strategies a sorter is created and the input is
     * left {@code null} so that {@link #getInput(int)} fetches it lazily from the sorter.
     */
    private void initInputLocalStrategy(int inputNum) throws Exception {
        // check if there is already a strategy
        if (this.localStrategies[inputNum] != null) {
            throw new IllegalStateException();
        }

        // now set up the local strategy
        final LocalStrategy localStrategy = this.config.getInputLocalStrategy(inputNum);
        if (localStrategy != null) {
            switch (localStrategy) {
                case NONE:
                    // the input is as it is
                    this.inputs[inputNum] = this.inputIterators[inputNum];
                    break;
                case SORT:
                    @SuppressWarnings({ "rawtypes", "unchecked" })
                    UnilateralSortMerger<?> sorter = new UnilateralSortMerger(getMemoryManager(), getIOManager(),
                            this.inputIterators[inputNum], this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
                            this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
                            this.config.getSpillingThresholdInput(inputNum), this.executionConfig.isObjectReuseEnabled());
                    // set the input to null such that it will be lazily fetched from the input strategy
                    this.inputs[inputNum] = null;
                    this.localStrategies[inputNum] = sorter;
                    break;
                case COMBININGSORT:
                    // sanity check this special case!
                    // this still breaks a bit of the abstraction!
                    // we should have nested configurations for the local strategies to solve that
                    if (inputNum != 0) {
                        throw new IllegalStateException("Performing combining sort outside a (group)reduce task!");
                    }

                    // instantiate ourselves a combiner. we should not use the stub, because the sort and the
                    // subsequent (group)reduce would otherwise share it multi-threaded
                    final Class<S> userCodeFunctionType = this.driver.getStubType();
                    if (userCodeFunctionType == null) {
                        throw new IllegalStateException("Performing combining sort outside a reduce task!");
                    }
                    final S localStub;
                    try {
                        localStub = initStub(userCodeFunctionType);
                    } catch (Exception e) {
                        throw new RuntimeException("Initializing the user code and the configuration failed" +
                                (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
                    }

                    if (!(localStub instanceof GroupCombineFunction)) {
                        throw new IllegalStateException("Performing combining sort outside a reduce task!");
                    }

                    @SuppressWarnings({ "rawtypes", "unchecked" })
                    CombiningUnilateralSortMerger<?> cSorter = new CombiningUnilateralSortMerger(
                            (GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
                            this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
                            this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
                            this.config.getSpillingThresholdInput(inputNum), this.executionConfig.isObjectReuseEnabled());
                    cSorter.setUdfConfiguration(this.config.getStubParameters());

                    // set the input to null such that it will be lazily fetched from the input strategy
                    this.inputs[inputNum] = null;
                    this.localStrategies[inputNum] = cSorter;
                    break;
                default:
                    throw new Exception("Unrecognized local strategy provided: " + localStrategy.name());
            }
        } else {
            // no local strategy in the config
            this.inputs[inputNum] = this.inputIterators[inputNum];
        }
    }

    /** Creates the comparator used by the local strategy of the given input; fails if none is configured. */
    private <T> TypeComparator<T> getLocalStrategyComparator(int inputNum) throws Exception {
        TypeComparatorFactory<T> compFact = this.config.getInputComparator(inputNum, this.userCodeClassLoader);
        if (compFact == null) {
            throw new Exception("Missing comparator factory for local strategy on input " + inputNum);
        }
        return compFact.createComparator();
    }

    /**
     * Instantiates the user function from the configuration and attaches the runtime context to it.
     *
     * @param stubSuperClass the type the user function must be assignable to; {@code null} skips the check
     * @return the new user function instance
     * @throws Exception if the user function is not of the required type
     */
    protected S initStub(Class<? super S> stubSuperClass) throws Exception {
        try {
            S stub = config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(stubSuperClass, this.userCodeClassLoader);
            // check if the class is a subclass, if the check is required
            if (stubSuperClass != null && !stubSuperClass.isAssignableFrom(stub.getClass())) {
                throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" +
                        stubSuperClass.getName() + "' as is required.");
            }
            FunctionUtils.setFunctionRuntimeContext(stub, this.runtimeUdfContext);
            return stub;
        }
        catch (ClassCastException ccex) {
            throw new Exception("The stub class is not a proper subclass of " + stubSuperClass.getName(), ccex);
        }
    }

    /**
     * Creates all the serializers and comparators.
     */
    protected void initInputsSerializersAndComparators(int numInputs, int numComparators) throws Exception {
        this.inputSerializers = new TypeSerializerFactory<?>[numInputs];
        this.inputComparators = numComparators > 0 ? new TypeComparator[numComparators] : null;
        this.inputIterators = new MutableObjectIterator[numInputs];

        // ---------------- create the serializers first ---------------------
        for (int i = 0; i < numInputs; i++) {
            final TypeSerializerFactory<?> serializerFactory = this.config.getInputSerializer(i, this.userCodeClassLoader);
            this.inputSerializers[i] = serializerFactory;
        }
        // ---------------- create the driver's comparators ---------------------
        for (int i = 0; i < numComparators; i++) {
            if (this.inputComparators != null) {
                final TypeComparatorFactory<?> comparatorFactory = this.config.getDriverComparator(i, this.userCodeClassLoader);
                this.inputComparators[i] = comparatorFactory.createComparator();
            }
        }
    }

    /**
     * Sets up the driver with this task as its context and (re-)instantiates the user function if the driver
     * declares a stub type.
     *
     * @throws Exception if the driver setup fails
     */
    protected void initialize() throws Exception {
        // create the operator
        try {
            this.driver.setup(this);
        }
        catch (Throwable t) {
            throw new Exception("The driver setup for '" + this.taskName +
                    "' , caused an error: " + t.getMessage(), t);
        }

        // instantiate the UDF
        try {
            final Class<? super S> userCodeFunctionType = this.driver.getStubType();
            // if the class is null, the driver has no user code
            if (userCodeFunctionType != null) {
                this.stub = initStub(userCodeFunctionType);
            }
        } catch (Exception e) {
            // parentheses are required: without them the whole concatenation is compared to null
            throw new RuntimeException("Initializing the UDF" +
                    (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
        }
    }

    /** Closes every local strategy that was created; errors are logged and do not stop the clean-up. */
    protected void closeLocalStrategies() {
        if (this.localStrategies != null) {
            for (int i = 0; i < this.localStrategies.length; i++) {
                if (this.localStrategies[i] != null) {
                    try {
                        this.localStrategies[i].close();
                    } catch (Throwable t) {
                        LOG.error("Error closing local strategy for input " + i, t);
                    }
                }
            }
        }
    }

    /**
     * Runs the driver: prepare, open the user function, run, close the user function, close the output.
     * The driver's {@code cleanup()} is always called.
     *
     * @throws RuntimeException wrapping (as its cause) any exception raised during these steps
     */
    protected void run() throws Exception {
        boolean stubOpen = false;

        try {
            // run the data preparation
            try {
                this.driver.prepare();
            }
            catch (Throwable t) {
                throw new Exception("The data preparation for task '" + this.taskName +
                        "' , caused an error: " + t.getMessage(), t);
            }

            // open stub implementation
            if (this.stub != null) {
                try {
                    Configuration stubConfig = this.config.getStubParameters();
                    FunctionUtils.openFunction(this.stub, stubConfig);
                    stubOpen = true;
                }
                catch (Throwable t) {
                    throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t);
                }
            }

            // run the user code
            this.driver.run();

            // close. We close here such that a regular close throwing an exception marks a task as failed.
            if (this.stub != null) {
                // reset the flag first so that a failing close() is not attempted a second time below
                stubOpen = false;
                FunctionUtils.closeFunction(this.stub);
            }

            this.output.close();

        }
        catch (Exception ex) {
            // close the open stub, but do not report any exceptions, since we already have another root cause
            if (stubOpen) {
                try {
                    FunctionUtils.closeFunction(this.stub);
                } catch (Throwable t) {
                    LOG.warn("Error closing the user function after a failure", t);
                }
            }
            // keep the original exception as the cause so its stack trace is not lost
            throw new RuntimeException("Exception in TaskContext: " + ex.getMessage(), ex);
        }
        finally {
            this.driver.cleanup();
        }
    }

}
http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java deleted file mode 100644 index 94a8315..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java +++ /dev/null @@ -1,163 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.tez.runtime;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.InstantiationUtil;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;

/**
 * A {@link TaskConfig} with additional Tez-specific entries: the data source processor name, the number of
 * subtasks in each output, the input split provider, the input positions and the input format.
 *
 * <p>All entries except the processor name are stored as serialized objects in the wrapped
 * {@link Configuration}. Their getters throw a {@link NullPointerException} when the entry is absent.
 */
public class TezTaskConfig extends TaskConfig {

    public static final String TEZ_TASK_CONFIG = "tez.task.flink.processor.taskconfig";

    private static final String NUMBER_SUBTASKS_IN_OUTPUTS = "tez.num_subtasks_in_output";

    private static final String INPUT_SPLIT_PROVIDER = "tez.input_split_provider";

    private static final String INPUT_POSITIONS = "tez.input_positions";

    private static final String INPUT_FORMAT = "tez.input_format";

    private static final String DATASOURCE_PROCESSOR_NAME = "tez.datasource_processor_name";

    public TezTaskConfig(Configuration config) {
        super(config);
    }


    /** Stores the data source processor name; a {@code null} name is ignored. */
    public void setDatasourceProcessorName(String name) {
        if (name != null) {
            this.config.setString(DATASOURCE_PROCESSOR_NAME, name);
        }
    }

    /** Returns the data source processor name, or {@code null} if none was set. */
    public String getDatasourceProcessorName() {
        return this.config.getString(DATASOURCE_PROCESSOR_NAME, null);
    }

    public void setNumberSubtasksInOutput(ArrayList<Integer> numberSubtasksInOutputs) {
        writeObject(numberSubtasksInOutputs, NUMBER_SUBTASKS_IN_OUTPUTS, "number of subtasks in outputs");
    }

    public ArrayList<Integer> getNumberSubtasksInOutput() {
        return this.<ArrayList<Integer>>readObject(NUMBER_SUBTASKS_IN_OUTPUTS, "number of subtasks in outputs");
    }


    public void setInputSplitProvider(InputSplitProvider inputSplitProvider) {
        writeObject(inputSplitProvider, INPUT_SPLIT_PROVIDER, "input split provider");
    }

    public InputSplitProvider getInputSplitProvider() {
        return this.<InputSplitProvider>readObject(INPUT_SPLIT_PROVIDER, "input split provider");
    }


    public void setInputPositions(HashMap<String, ArrayList<Integer>> inputPositions) {
        writeObject(inputPositions, INPUT_POSITIONS, "input positions");
    }

    public HashMap<String, ArrayList<Integer>> getInputPositions() {
        return this.<HashMap<String, ArrayList<Integer>>>readObject(INPUT_POSITIONS, "input positions");
    }

    public void setInputFormat(InputFormat inputFormat) {
        writeObject(inputFormat, INPUT_FORMAT, "input format");
    }

    public InputFormat getInputFormat() {
        return this.<InputFormat>readObject(INPUT_FORMAT, "input format");
    }

    /**
     * Serializes {@code value} into the configuration under {@code key}.
     *
     * @param description human-readable name of the entry, used in the error message
     * @throws RuntimeException wrapping the {@link IOException} if serialization fails
     */
    private void writeObject(Object value, String key, String description) {
        try {
            InstantiationUtil.writeObjectToConfig(value, this.config, key);
        } catch (IOException e) {
            throw new RuntimeException(
                    "Error while writing the " + description + " object to the task configuration.", e);
        }
    }

    /**
     * Deserializes the object stored under {@code key}, using this class' class loader.
     *
     * @param description human-readable name of the entry, used in the error messages
     * @return the stored object, never {@code null}
     * @throws RuntimeException wrapping the underlying exception if deserialization fails
     * @throws NullPointerException if no object is stored under {@code key}
     */
    @SuppressWarnings("unchecked")
    private <T> T readObject(String key, String description) {
        final T value;
        try {
            // the caller's declared return type determines T; a mismatch surfaces as a ClassCastException there
            value = (T) InstantiationUtil.readObjectFromConfig(this.config, key, getClass().getClassLoader());
        } catch (IOException e) {
            throw new RuntimeException(
                    "Error while reading the " + description + " object from the task configuration.", e);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(
                    "Error while reading the " + description + " object from the task configuration: class not found.", e);
        }
        if (value == null) {
            throw new NullPointerException("No " + description + " object found in the task configuration.");
        }
        return value;
    }

}
http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java deleted file mode 100644 index 7ceeac8..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */

package org.apache.flink.tez.runtime;


import com.google.common.base.Preconditions;
import org.apache.flink.tez.util.EncodingUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Tez processor that implements a union: it copies every key/value pair from all of its inputs, one input
 * after the other, to its single output.
 */
public class UnionProcessor extends AbstractLogicalIOProcessor {

    private TezTaskConfig config;
    protected Map<String, LogicalInput> inputs;
    protected Map<String, LogicalOutput> outputs;
    private List<KeyValueReader> readers;
    private List<KeyValueWriter> writers;
    private int numInputs;
    private int numOutputs;

    public UnionProcessor(ProcessorContext context) {
        super(context);
    }

    /**
     * Decodes the {@link TezTaskConfig} from the processor's user payload and sets its task name to the
     * name of the Tez vertex.
     */
    @Override
    public void initialize() throws Exception {
        UserPayload payload = getContext().getUserPayload();
        Configuration conf = TezUtils.createConfFromUserPayload(payload);

        this.config = (TezTaskConfig) EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
        config.setTaskName(getContext().getTaskVertexName());
    }

    @Override
    public void handleEvents(List<Event> processorEvents) {
        // no events are handled by this processor
    }

    @Override
    public void close() throws Exception {
        // nothing to release
    }

    /**
     * Starts all inputs and outputs, then forwards every record of every input to the single output.
     *
     * @param inputs the logical inputs; may be {@code null}, which is treated as "no inputs"
     * @param outputs the logical outputs; must contain exactly one entry
     * @throws IllegalArgumentException if there is not exactly one output
     */
    @Override
    public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
        this.inputs = inputs;
        this.outputs = outputs;
        // size the lists null-safely; the null checks below would otherwise be unreachable
        this.numInputs = inputs == null ? 0 : inputs.size();
        this.numOutputs = outputs == null ? 0 : outputs.size();

        this.readers = new ArrayList<KeyValueReader>(numInputs);
        if (this.inputs != null) {
            for (LogicalInput input : this.inputs.values()) {
                input.start();
                readers.add((KeyValueReader) input.getReader());
            }
        }

        this.writers = new ArrayList<KeyValueWriter>(numOutputs);
        if (this.outputs != null) {
            for (LogicalOutput output : this.outputs.values()) {
                output.start();
                writers.add((KeyValueWriter) output.getWriter());
            }
        }

        Preconditions.checkArgument(writers.size() == 1,
                "UnionProcessor requires exactly one output, but got %s", writers.size());
        KeyValueWriter writer = writers.get(0);

        // drain the inputs one after the other into the single writer
        for (KeyValueReader reader : this.readers) {
            while (reader.next()) {
                Object key = reader.getCurrentKey();
                Object value = reader.getCurrentValue();
                writer.write(key, value);
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java deleted file mode 100644 index ef59fd0..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.runtime.input; - -import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.util.InstantiationUtil; -import org.apache.tez.runtime.api.AbstractLogicalInput; -import org.apache.tez.runtime.api.Event; -import org.apache.tez.runtime.api.InputContext; -import org.apache.tez.runtime.api.Reader; -import org.apache.tez.runtime.api.events.InputDataInformationEvent; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - - -public class FlinkInput extends AbstractLogicalInput { - - private static final Log LOG = LogFactory.getLog(FlinkInput.class); - - private InputSplit split; - private boolean splitIsCreated; - private final ReentrantLock rrLock = new ReentrantLock(); - private final Condition rrInited = rrLock.newCondition(); - - public FlinkInput(InputContext inputContext, int numPhysicalInputs) { - super(inputContext, numPhysicalInputs); - getContext().requestInitialMemory(0l, null); // mandatory call - split = null; - } - - @Override - public void handleEvents(List<Event> inputEvents) throws Exception { - - LOG.info("Received " + inputEvents.size() + " events (should be = 1)"); - - Event event = inputEvents.iterator().next(); - - Preconditions.checkArgument(event instanceof InputDataInformationEvent, - getClass().getSimpleName() - + " can only handle a single event of type: " - + InputDataInformationEvent.class.getSimpleName()); - - initSplitFromEvent ((InputDataInformationEvent)event); - } - - private void initSplitFromEvent (InputDataInformationEvent e) throws Exception { - rrLock.lock(); - - try { - ByteString byteString = 
ByteString.copyFrom(e.getUserPayload()); - this.split = (InputSplit) InstantiationUtil.deserializeObject(byteString.toByteArray(), getClass().getClassLoader()); - this.splitIsCreated = true; - - LOG.info ("Initializing input split " + split.getSplitNumber() + ": " + split.toString() + " from event (" + e.getSourceIndex() + "," + e.getTargetIndex() + "): " + e.toString()); - - rrInited.signal(); - } - catch (Exception ex) { - throw new IOException( - "Interrupted waiting for InputSplit initialization"); - } - finally { - rrLock.unlock(); - } - } - - @Override - public List<Event> close() throws Exception { - return null; - } - - @Override - public void start() throws Exception { - } - - @Override - public Reader getReader() throws Exception { - throw new RuntimeException("FlinkInput does not contain a Reader. Should use getSplit instead"); - } - - @Override - public List<Event> initialize() throws Exception { - return null; - } - - public InputSplit getSplit () throws Exception { - - rrLock.lock(); - try { - if (!splitIsCreated) { - checkAndAwaitSplitInitialization(); - } - } - finally { - rrLock.unlock(); - } - if (split == null) { - LOG.info("Input split has not been created. This should not happen"); - throw new RuntimeException("Input split has not been created. 
This should not happen"); - } - return split; - } - - void checkAndAwaitSplitInitialization() throws IOException { - assert rrLock.getHoldCount() == 1; - rrLock.lock(); - try { - rrInited.await(); - } catch (Exception e) { - throw new IOException( - "Interrupted waiting for InputSplit initialization"); - } finally { - rrLock.unlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java deleted file mode 100644 index db1261c..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.tez.runtime.input; - - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.tez.runtime.TezTaskConfig; -import org.apache.flink.tez.util.EncodingUtils; -import org.apache.flink.util.InstantiationUtil; -import org.apache.hadoop.conf.Configuration; -import org.apache.tez.common.TezUtils; -import org.apache.tez.dag.api.event.VertexStateUpdate; -import org.apache.tez.runtime.api.Event; -import org.apache.tez.runtime.api.InputInitializer; -import org.apache.tez.runtime.api.InputInitializerContext; -import org.apache.tez.runtime.api.events.InputDataInformationEvent; -import org.apache.tez.runtime.api.events.InputInitializerEvent; - -import java.nio.ByteBuffer; -import java.util.LinkedList; -import java.util.List; - -public class FlinkInputSplitGenerator extends InputInitializer { - - private static final Log LOG = LogFactory.getLog(FlinkInputSplitGenerator.class); - - InputFormat format; - - public FlinkInputSplitGenerator(InputInitializerContext initializerContext) { - super(initializerContext); - } - - @Override - public List<Event> initialize() throws Exception { - - Configuration tezConf = TezUtils.createConfFromUserPayload(this.getContext().getUserPayload()); - - TezTaskConfig taskConfig = (TezTaskConfig) EncodingUtils.decodeObjectFromString(tezConf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader()); - - this.format = taskConfig.getInputFormat(); - - int numTasks = this.getContext().getNumTasks(); - - LOG.info ("Creating splits for " + numTasks + " tasks for input format " + format); - - InputSplit[] splits = format.createInputSplits((numTasks > 0) ? 
numTasks : 1 ); - - LOG.info ("Created " + splits.length + " input splits" + " tasks for input format " + format); - - //LOG.info ("Created + " + splits.length + " input splits for input format " + format); - - LOG.info ("Sending input split events"); - LinkedList<Event> events = new LinkedList<Event>(); - for (int i = 0; i < splits.length; i++) { - byte [] bytes = InstantiationUtil.serializeObject(splits[i]); - ByteBuffer buf = ByteBuffer.wrap(bytes); - InputDataInformationEvent event = InputDataInformationEvent.createWithSerializedPayload(i % numTasks, buf); - event.setTargetIndex(i % numTasks); - events.add(event); - LOG.info ("Added event of index " + i + ": " + event); - } - return events; - } - - @Override - public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception { - - } - - @Override - public void onVertexStateUpdated(VertexStateUpdate stateUpdate) { - //super.onVertexStateUpdated(stateUpdate); - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java deleted file mode 100644 index 722f0a1..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.runtime.input; - - -import org.apache.flink.util.MutableObjectIterator; -import org.apache.hadoop.io.IntWritable; -import org.apache.tez.runtime.library.api.KeyValueReader; - -import java.io.IOException; - -public class TezReaderIterator<T> implements MutableObjectIterator<T>{ - - private KeyValueReader kvReader; - - public TezReaderIterator(KeyValueReader kvReader) { - this.kvReader = kvReader; - } - - @Override - public T next(T reuse) throws IOException { - if (kvReader.next()) { - Object key = kvReader.getCurrentKey(); - Object value = kvReader.getCurrentValue(); - if (!(key instanceof IntWritable)) { - throw new IllegalStateException("Wrong key type"); - } - reuse = (T) value; - return reuse; - } - else { - return null; - } - } - - @Override - public T next() throws IOException { - if (kvReader.next()) { - Object key = kvReader.getCurrentKey(); - Object value = kvReader.getCurrentValue(); - if (!(key instanceof IntWritable)) { - throw new IllegalStateException("Wrong key type"); - } - return (T) value; - } - else { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java deleted file mode 100644 index 
2358f29..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.runtime.output; - - -import org.apache.hadoop.io.IntWritable; -import org.apache.tez.runtime.library.api.Partitioner; - -public class SimplePartitioner implements Partitioner { - - @Override - public int getPartition(Object key, Object value, int numPartitions) { - if (!(key instanceof IntWritable)) { - throw new IllegalStateException("Partitioning key should be int"); - } - IntWritable channel = (IntWritable) key; - return channel.get(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java deleted file mode 100644 index 7e5cd55..0000000 --- 
/**
 * Strategy that decides, per record, which output channels the record is sent to.
 * Implementations are serializable.
 *
 * @param <T> the type of the records being routed
 */
public interface TezChannelSelector<T> extends Serializable {

    /**
     * Determines the output channels to which the given record shall be forwarded.
     *
     * @param record
     *        the record to determine the output channels for
     * @param numberOfOutputChannels
     *        the total number of output channels that are available for the record
     * @return a (possibly empty) array of indices of the output channels through
     *         which the record shall be forwarded
     */
    int[] selectChannels(T record, int numberOfOutputChannels);
}
- */ - -package org.apache.flink.tez.runtime.output; - - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.util.Collector; -import org.apache.hadoop.io.IntWritable; -import org.apache.tez.runtime.library.api.KeyValueWriter; - -import java.io.IOException; -import java.util.List; - -public class TezOutputCollector<T> implements Collector<T> { - - private List<KeyValueWriter> writers; - - private List<TezChannelSelector<T>> outputEmitters; - - private List<Integer> numberOfStreamsInOutputs; - - private int numOutputs; - - private TypeSerializer<T> serializer; - - public TezOutputCollector(List<KeyValueWriter> writers, List<TezChannelSelector<T>> outputEmitters, TypeSerializer<T> serializer, List<Integer> numberOfStreamsInOutputs) { - this.writers = writers; - this.outputEmitters = outputEmitters; - this.numberOfStreamsInOutputs = numberOfStreamsInOutputs; - this.serializer = serializer; - this.numOutputs = writers.size(); - } - - @Override - public void collect(T record) { - for (int i = 0; i < numOutputs; i++) { - KeyValueWriter writer = writers.get(i); - TezChannelSelector<T> outputEmitter = outputEmitters.get(i); - int numberOfStreamsInOutput = numberOfStreamsInOutputs.get(i); - try { - for (int channel : outputEmitter.selectChannels(record, numberOfStreamsInOutput)) { - IntWritable key = new IntWritable(channel); - writer.write(key, record); - } - } - catch (IOException e) { - throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e); - } - } - } - - @Override - public void close() { - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java deleted file mode 100644 index 6dcee0b..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.tez.runtime.output; - -import org.apache.flink.api.common.distributions.DataDistribution; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; - -public class TezOutputEmitter<T> implements TezChannelSelector<T> { - - private final ShipStrategyType strategy; // the shipping strategy used by this output emitter - - private int[] channels; // the reused array defining target channels - - private int nextChannelToSendTo = 0; // counter to go over channels round robin - - private final TypeComparator<T> comparator; // the comparator for hashing / sorting - - // ------------------------------------------------------------------------ - // Constructors - // ------------------------------------------------------------------------ - - /** - * Creates a new channel selector that distributes data round robin. - */ - public TezOutputEmitter() { - this(ShipStrategyType.NONE); - } - - /** - * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...). - * - * @param strategy The distribution strategy to be used. - */ - public TezOutputEmitter(ShipStrategyType strategy) { - this(strategy, null); - } - - /** - * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...) - * and uses the supplied comparator to hash / compare records for partitioning them deterministically. - * - * @param strategy The distribution strategy to be used. - * @param comparator The comparator used to hash / compare the records. - */ - public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator) { - this(strategy, comparator, null); - } - - /** - * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...) - * and uses the supplied comparator to hash / compare records for partitioning them deterministically. - * - * @param strategy The distribution strategy to be used. 
- * @param comparator The comparator used to hash / compare the records. - * @param distr The distribution pattern used in the case of a range partitioning. - */ - public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator, DataDistribution distr) { - if (strategy == null) { - throw new NullPointerException(); - } - - this.strategy = strategy; - this.comparator = comparator; - - switch (strategy) { - case FORWARD: - case PARTITION_HASH: - case PARTITION_RANGE: - case PARTITION_RANDOM: - case PARTITION_FORCED_REBALANCE: - case BROADCAST: - break; - default: - throw new IllegalArgumentException("Invalid shipping strategy for OutputEmitter: " + strategy.name()); - } - - if ((strategy == ShipStrategyType.PARTITION_RANGE) && distr == null) { - throw new NullPointerException("Data distribution must not be null when the ship strategy is range partitioning."); - } - } - - // ------------------------------------------------------------------------ - // Channel Selection - // ------------------------------------------------------------------------ - - @Override - public final int[] selectChannels(T record, int numberOfChannels) { - switch (strategy) { - case FORWARD: - case PARTITION_RANDOM: - case PARTITION_FORCED_REBALANCE: - return robin(numberOfChannels); - case PARTITION_HASH: - return hashPartitionDefault(record, numberOfChannels); - case PARTITION_RANGE: - return rangePartition(record, numberOfChannels); - case BROADCAST: - return broadcast(numberOfChannels); - default: - throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name()); - } - } - - // -------------------------------------------------------------------------------------------- - - private final int[] robin(int numberOfChannels) { - if (this.channels == null || this.channels.length != 1) { - this.channels = new int[1]; - } - - int nextChannel = nextChannelToSendTo + 1; - nextChannel = nextChannel < numberOfChannels ? 
nextChannel : 0; - - this.nextChannelToSendTo = nextChannel; - this.channels[0] = nextChannel; - return this.channels; - } - - private final int[] broadcast(int numberOfChannels) { - if (channels == null || channels.length != numberOfChannels) { - channels = new int[numberOfChannels]; - for (int i = 0; i < numberOfChannels; i++) { - channels[i] = i; - } - } - - return channels; - } - - private final int[] hashPartitionDefault(T record, int numberOfChannels) { - if (channels == null || channels.length != 1) { - channels = new int[1]; - } - - int hash = this.comparator.hash(record); - - hash = murmurHash(hash); - - if (hash >= 0) { - this.channels[0] = hash % numberOfChannels; - } - else if (hash != Integer.MIN_VALUE) { - this.channels[0] = -hash % numberOfChannels; - } - else { - this.channels[0] = 0; - } - - return this.channels; - } - - private final int murmurHash(int k) { - k *= 0xcc9e2d51; - k = Integer.rotateLeft(k, 15); - k *= 0x1b873593; - - k = Integer.rotateLeft(k, 13); - k *= 0xe6546b64; - - k ^= 4; - k ^= k >>> 16; - k *= 0x85ebca6b; - k ^= k >>> 13; - k *= 0xc2b2ae35; - k ^= k >>> 16; - - return k; - } - - private final int[] rangePartition(T record, int numberOfChannels) { - throw new UnsupportedOperationException(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java deleted file mode 100644 index 39d247c..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.util; - - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; - -public class DummyInvokable extends AbstractInvokable { - - private ExecutionConfig executionConfig; - - public DummyInvokable() { - } - - public DummyInvokable(ExecutionConfig executionConfig) { - this.executionConfig = executionConfig; - } - - public void setExecutionConfig(ExecutionConfig executionConfig) { - this.executionConfig = executionConfig; - } - - @Override - public void registerInputOutput() {} - - - @Override - public void invoke() throws Exception {} - - @Override - public ExecutionConfig getExecutionConfig() { - return executionConfig; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java deleted file mode 100644 index 202cb24..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java +++ /dev/null @@ -1,64 +0,0 @@ 
-/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.util; - -import org.apache.flink.util.InstantiationUtil; -import org.apache.commons.codec.binary.Base64; - -import java.io.IOException; - -public class EncodingUtils { - - public static Object decodeObjectFromString(String encoded, ClassLoader cl) { - - try { - if (encoded == null) { - return null; - } - byte[] bytes = Base64.decodeBase64(encoded); - - return InstantiationUtil.deserializeObject(bytes, cl); - } - catch (IOException e) { - e.printStackTrace(); - System.exit(-1); - throw new RuntimeException(); - } - catch (ClassNotFoundException e) { - e.printStackTrace(); - System.exit(-1); - throw new RuntimeException(); - } - } - - public static String encodeObjectToString(Object o) { - - try { - byte[] bytes = InstantiationUtil.serializeObject(o); - - String encoded = Base64.encodeBase64String(bytes); - return encoded; - } - catch (IOException e) { - e.printStackTrace(); - System.exit(-1); - throw new RuntimeException(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java 
---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java deleted file mode 100644 index 07c5f97..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java +++ /dev/null @@ -1,310 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.tez.util; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.io.serializer.Deserializer; -import org.apache.hadoop.io.serializer.Serialization; -import org.apache.hadoop.io.serializer.Serializer; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -public class FlinkSerialization<T> extends Configured implements Serialization<T>{ - - @Override - public boolean accept(Class<?> c) { - TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader()); - T instance = typeSerializer.createInstance(); - return instance.getClass().isAssignableFrom(c); - } - - @Override - public Serializer<T> getSerializer(Class<T> c) { - TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader()); - return new FlinkSerializer<T>(typeSerializer); - } - - @Override - public Deserializer<T> getDeserializer(Class<T> c) { - TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader()); - return new FlinkDeserializer<T>(typeSerializer); - } - - public static class FlinkSerializer<T> implements Serializer<T> { - - private OutputStream dataOut; - private DataOutputViewOutputStreamWrapper dataOutputView; - private TypeSerializer<T> typeSerializer; - - public FlinkSerializer(TypeSerializer<T> typeSerializer) { - this.typeSerializer = typeSerializer; - } - - @Override - public void open(OutputStream out) throws IOException { - this.dataOut = out; - this.dataOutputView = new 
DataOutputViewOutputStreamWrapper(out); - } - - @Override - public void serialize(T t) throws IOException { - typeSerializer.serialize(t, dataOutputView); - } - - @Override - public void close() throws IOException { - this.dataOut.close(); - } - } - - public static class FlinkDeserializer<T> implements Deserializer<T> { - - private InputStream dataIn; - private TypeSerializer<T> typeSerializer; - private DataInputViewInputStreamWrapper dataInputView; - - public FlinkDeserializer(TypeSerializer<T> typeSerializer) { - this.typeSerializer = typeSerializer; - } - - @Override - public void open(InputStream in) throws IOException { - this.dataIn = in; - this.dataInputView = new DataInputViewInputStreamWrapper(in); - } - - @Override - public T deserialize(T t) throws IOException { - T reuse = t; - if (reuse == null) { - reuse = typeSerializer.createInstance(); - } - return typeSerializer.deserialize(reuse, dataInputView); - } - - @Override - public void close() throws IOException { - this.dataIn.close(); - } - } - - private static final class DataOutputViewOutputStreamWrapper implements DataOutputView { - - private final DataOutputStream dos; - - public DataOutputViewOutputStreamWrapper(OutputStream output) { - this.dos = new DataOutputStream(output); - } - - @Override - public void write(int b) throws IOException { - dos.write(b); - } - - @Override - public void write(byte[] b) throws IOException { - dos.write(b); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - dos.write(b, off, len); - } - - @Override - public void writeBoolean(boolean v) throws IOException { - dos.writeBoolean(v); - } - - @Override - public void writeByte(int v) throws IOException { - dos.writeByte(v); - } - - @Override - public void writeShort(int v) throws IOException { - dos.writeShort(v); - } - - @Override - public void writeChar(int v) throws IOException { - dos.writeChar(v); - } - - @Override - public void writeInt(int v) throws IOException { - 
dos.writeInt(v); - } - - @Override - public void writeLong(long v) throws IOException { - dos.writeLong(v); - } - - @Override - public void writeFloat(float v) throws IOException { - dos.writeFloat(v); - } - - @Override - public void writeDouble(double v) throws IOException { - dos.writeDouble(v); - } - - @Override - public void writeBytes(String s) throws IOException { - dos.writeBytes(s); - } - - @Override - public void writeChars(String s) throws IOException { - dos.writeChars(s); - } - - @Override - public void writeUTF(String s) throws IOException { - dos.writeUTF(s); - } - - @Override - public void skipBytesToWrite(int num) throws IOException { - for (int i = 0; i < num; i++) { - dos.write(0); - } - } - - @Override - public void write(DataInputView inview, int num) throws IOException { - for (int i = 0; i < num; i++) { - dos.write(inview.readByte()); - } - } - } - - private static final class DataInputViewInputStreamWrapper implements DataInputView { - - private final DataInputStream dis; - - - public DataInputViewInputStreamWrapper(InputStream input) { - this.dis = new DataInputStream(input); - } - - @Override - public void readFully(byte[] b) throws IOException { - dis.readFully(b); - } - - @Override - public void readFully(byte[] b, int off, int len) throws IOException { - dis.readFully(b, off, len); - } - - @Override - public int skipBytes(int n) throws IOException { - return dis.skipBytes(n); - } - - @Override - public boolean readBoolean() throws IOException { - return dis.readBoolean(); - } - - @Override - public byte readByte() throws IOException { - return dis.readByte(); - } - - @Override - public int readUnsignedByte() throws IOException { - return dis.readUnsignedByte(); - } - - @Override - public short readShort() throws IOException { - return dis.readShort(); - } - - @Override - public int readUnsignedShort() throws IOException { - return dis.readUnsignedShort(); - } - - @Override - public char readChar() throws IOException { - return 
dis.readChar(); - } - - @Override - public int readInt() throws IOException { - return dis.readInt(); - } - - @Override - public long readLong() throws IOException { - return dis.readLong(); - } - - @Override - public float readFloat() throws IOException { - return dis.readFloat(); - } - - @Override - public double readDouble() throws IOException { - return dis.readDouble(); - } - - @Override - public String readLine() throws IOException { - return dis.readLine(); - } - - @Override - public String readUTF() throws IOException { - return dis.readUTF(); - } - - @Override - public void skipBytesToRead(int numBytes) throws IOException { - while (numBytes > 0) { - numBytes -= dis.skipBytes(numBytes); - } - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - dis.readFully(b, off, len); - return len; - } - - @Override - public int read(byte[] b) throws IOException { - return read(b, 0, b.length); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/resources/log4j.properties b/flink-contrib/flink-tez/src/main/resources/log4j.properties deleted file mode 100644 index 0845c81..0000000 --- a/flink-contrib/flink-tez/src/main/resources/log4j.properties +++ /dev/null @@ -1,30 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -# Root logger level is currently INFO (verbose, for debugging purposes) -# set it to OFF to not flood build logs -log4j.rootLogger=INFO, testlogger - -# testlogger is set to be a ConsoleAppender. -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java deleted file mode 100644 index 9124faa..0000000 --- a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.test; - -import org.apache.flink.test.testdata.ConnectedComponentsData; -import org.apache.flink.tez.examples.ConnectedComponentsStep; -import org.junit.Assert; - -import java.io.BufferedReader; -import java.io.IOException; -import java.util.regex.Pattern; - -/* - * Note: This does not test whether the program computes one step of the - * Weakly Connected Components program correctly. It only tests whether - * the program assigns a wrong component to a vertex. 
- */ - -public class ConnectedComponentsStepITCase extends TezProgramTestBase { - - private static final long SEED = 0xBADC0FFEEBEEFL; - - private static final int NUM_VERTICES = 1000; - - private static final int NUM_EDGES = 10000; - - - private String verticesPath; - private String edgesPath; - private String resultPath; - - - @Override - protected void preSubmit() throws Exception { - verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES)); - edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED)); - resultPath = getTempFilePath("results"); - } - - @Override - protected void testProgram() throws Exception { - ConnectedComponentsStep.main(verticesPath, edgesPath, resultPath, "100"); - } - - @Override - protected void postSubmit() throws Exception { - for (BufferedReader reader : getResultReader(resultPath)) { - checkOddEvenResult(reader); - } - } - - private static void checkOddEvenResult(BufferedReader result) throws IOException { - Pattern split = Pattern.compile(" "); - String line; - while ((line = result.readLine()) != null) { - String[] res = split.split(line); - Assert.assertEquals("Malformed result: Wrong number of tokens in line.", 2, res.length); - try { - int vertex = Integer.parseInt(res[0]); - int component = Integer.parseInt(res[1]); - Assert.assertTrue(((vertex % 2) == (component % 2))); - } catch (NumberFormatException e) { - Assert.fail("Malformed result."); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java deleted file mode 100644 index 
9a203fe..0000000 --- a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.test; - -import org.apache.flink.test.testdata.PageRankData; -import org.apache.flink.tez.examples.PageRankBasicStep; - -public class PageRankBasicStepITCase extends TezProgramTestBase { - - private String verticesPath; - private String edgesPath; - private String resultPath; - private String expectedResult; - - public static final String RANKS_AFTER_1_ITERATION = "1 0.2\n" + - "2 0.25666666666666665\n" + - "3 0.1716666666666667\n" + - "4 0.1716666666666667\n" + - "5 0.2"; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES); - edgesPath = createTempFile("edges.txt", PageRankData.EDGES); - } - - @Override - protected void testProgram() throws Exception { - PageRankBasicStep.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "-1"}); - expectedResult = RANKS_AFTER_1_ITERATION; - } - - @Override - protected void postSubmit() throws 
Exception { - compareKeyValuePairsWithDelta(expectedResult, resultPath, " ", 0.001); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java deleted file mode 100644 index eda9d1a..0000000 --- a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.tez.test; - -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.test.util.AbstractTestBase; -import org.apache.flink.tez.client.LocalTezEnvironment; -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Test; - -public abstract class TezProgramTestBase extends AbstractTestBase { - - private static final int DEFAULT_DEGREE_OF_PARALLELISM = 4; - - private JobExecutionResult latestExecutionResult; - - private int degreeOfParallelism = DEFAULT_DEGREE_OF_PARALLELISM; - - - public TezProgramTestBase() { - this(new Configuration()); - } - - public TezProgramTestBase(Configuration config) { - super (config); - } - - - public void setParallelism(int degreeOfParallelism) { - this.degreeOfParallelism = degreeOfParallelism; - } - - public JobExecutionResult getLatestExecutionResult() { - return this.latestExecutionResult; - } - - - protected abstract void testProgram() throws Exception; - - protected void preSubmit() throws Exception {} - - protected void postSubmit() throws Exception {} - - // -------------------------------------------------------------------------------------------- - // Test entry point - // -------------------------------------------------------------------------------------------- - - // Ignored due to deadlocks in Tez 0.6.1 (https://s3.amazonaws.com/archive.travis-ci.org/jobs/67848151/log.txt) - // TODO Reactivate with future Tez versions - @Ignore - @Test - public void testJob() throws Exception { - // pre-submit - try { - preSubmit(); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - Assert.fail("Pre-submit work caused an error: " + e.getMessage()); - } - - // prepare the test environment - LocalTezEnvironment env = LocalTezEnvironment.create(); - env.setParallelism(degreeOfParallelism); - env.setAsContext(); - - // call the test program - try { - testProgram(); - } - catch 
(Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - Assert.fail("Error while calling the test program: " + e.getMessage()); - } - - // post-submit - try { - postSubmit(); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - Assert.fail("Post-submit work caused an error: " + e.getMessage()); - } - } - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java deleted file mode 100644 index 35aa54a..0000000 --- a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.tez.test; - - -import org.apache.flink.examples.java.relational.WebLogAnalysis; -import org.apache.flink.test.testdata.WebLogAnalysisData; - -public class WebLogAnalysisITCase extends TezProgramTestBase { - - private String docsPath; - private String ranksPath; - private String visitsPath; - private String resultPath; - - @Override - protected void preSubmit() throws Exception { - docsPath = createTempFile("docs", WebLogAnalysisData.DOCS); - ranksPath = createTempFile("ranks", WebLogAnalysisData.RANKS); - visitsPath = createTempFile("visits", WebLogAnalysisData.VISITS); - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(WebLogAnalysisData.EXCEPTED_RESULT, resultPath); - } - @Override - protected void testProgram() throws Exception { - WebLogAnalysis.main(new String[]{docsPath, ranksPath, visitsPath, resultPath}); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java deleted file mode 100644 index d73aa8b..0000000 --- a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.test; - -import org.apache.flink.examples.java.wordcount.WordCount; -import org.apache.flink.test.testdata.WordCountData; - -public class WordCountITCase extends TezProgramTestBase { - - protected String textPath; - protected String resultPath; - - public WordCountITCase(){ - } - - @Override - protected void preSubmit() throws Exception { - textPath = createTempFile("text.txt", WordCountData.TEXT); - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath); - } - - @Override - protected void testProgram() throws Exception { - WordCount.main(new String[]{textPath, resultPath}); - } -}