http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
deleted file mode 100644
index 89e4642..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
+++ /dev/null
@@ -1,578 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.Driver;
-import org.apache.flink.runtime.operators.TaskContext;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
-import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
-import org.apache.flink.runtime.operators.util.CloseableInputProvider;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import org.apache.flink.tez.runtime.input.TezReaderIterator;
-import org.apache.flink.tez.runtime.output.TezChannelSelector;
-import org.apache.flink.tez.runtime.output.TezOutputEmitter;
-import org.apache.flink.tez.runtime.output.TezOutputCollector;
-import org.apache.flink.tez.util.DummyInvokable;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.MutableObjectIterator;
-
-import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
-
-import java.io.IOException;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-
-public class TezTask<S extends Function,OT>  implements TaskContext<S, OT> {
-
-       protected static final Log LOG = LogFactory.getLog(TezTask.class);
-
-       DummyInvokable invokable = new DummyInvokable();
-
-       /**
-        * The driver that invokes the user code (the stub implementation). The 
central driver in this task
-        * (further drivers may be chained behind this driver).
-        */
-       protected volatile Driver<S, OT> driver;
-
-       /**
-        * The instantiated user code of this task's main operator (driver). 
May be null if the operator has no udf.
-        */
-       protected S stub;
-
-       /**
-        * The udf's runtime context.
-        */
-       protected RuntimeUDFContext runtimeUdfContext;
-
-       /**
-        * The collector that forwards the user code's results. May forward to 
a channel or to chained drivers within
-        * this task.
-        */
-       protected Collector<OT> output;
-
-       /**
-        * The inputs reader, wrapped in an iterator. Prior to the local 
strategies, etc...
-        */
-       protected MutableObjectIterator<?>[] inputIterators;
-
-       /**
-        * The local strategies that are applied on the inputs.
-        */
-       protected volatile CloseableInputProvider<?>[] localStrategies;
-
-       /**
-        * The inputs to the operator. Return the readers' data after the 
application of the local strategy
-        * and the temp-table barrier.
-        */
-       protected MutableObjectIterator<?>[] inputs;
-
-       /**
-        * The serializers for the input data type.
-        */
-       protected TypeSerializerFactory<?>[] inputSerializers;
-
-       /**
-        * The comparators for the central driver.
-        */
-       protected TypeComparator<?>[] inputComparators;
-
-       /**
-        * The task configuration with the setup parameters.
-        */
-       protected TezTaskConfig config;
-
-       /**
-        * The class loader used to instantiate user code and user data types.
-        */
-       protected ClassLoader userCodeClassLoader = 
ClassLoader.getSystemClassLoader();
-
-       /**
-        * For now, create a default ExecutionConfig 
-        */
-       protected ExecutionConfig executionConfig;
-
-       /*
-        * Tez-specific variables given by the Processor
-        */
-       protected TypeSerializer<OT> outSerializer;
-
-       protected List<Integer> numberOfSubTasksInOutputs;
-
-       protected String taskName;
-
-       protected int numberOfSubtasks;
-
-       protected int indexInSubtaskGroup;
-
-       TezRuntimeEnvironment runtimeEnvironment;
-
-       public TezTask(TezTaskConfig config, RuntimeUDFContext 
runtimeUdfContext, long availableMemory) {
-               this.config = config;
-               final Class<? extends Driver<S, OT>> driverClass = 
this.config.getDriver();
-               this.driver = InstantiationUtil.instantiate(driverClass, 
Driver.class);
-               
-               LOG.info("ClassLoader URLs: " + 
Arrays.toString(((URLClassLoader) this.userCodeClassLoader).getURLs()));
-               
-               this.stub = 
this.config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(Function.class,
 this.userCodeClassLoader); //TODO get superclass properly
-               this.runtimeUdfContext = runtimeUdfContext;
-               this.outSerializer = (TypeSerializer<OT>) 
this.config.getOutputSerializer(getClass().getClassLoader()).getSerializer();
-               this.numberOfSubTasksInOutputs = 
this.config.getNumberSubtasksInOutput();
-               this.taskName = this.config.getTaskName();
-               this.numberOfSubtasks = 
this.runtimeUdfContext.getNumberOfParallelSubtasks();
-               this.indexInSubtaskGroup = 
this.runtimeUdfContext.getIndexOfThisSubtask();
-               this.runtimeEnvironment = new TezRuntimeEnvironment((long) (0.7 
* availableMemory));
-               this.executionConfig = runtimeUdfContext.getExecutionConfig();
-               this.invokable.setExecutionConfig(this.executionConfig);
-       }
-
-
-       //-------------------------------------------------------------
-       // Interface to FlinkProcessor
-       //-------------------------------------------------------------
-
-       public void invoke(List<KeyValueReader> readers, List<KeyValueWriter> 
writers) throws Exception {
-
-               // whatever happens in this scope, make sure that the local 
strategies are cleaned up!
-               // note that the initialization of the local strategies is in 
the try-finally block as well,
-               // so that the thread that creates them catches its own errors 
that may happen in that process.
-               // this is especially important, since there may be 
asynchronous closes (such as through canceling).
-               try {
-                       // initialize the inputs and outputs
-                       initInputsOutputs(readers, writers);
-
-                       // pre main-function initialization
-                       initialize();
-
-                       // the work goes here
-                       run();
-               }
-               finally {
-                       // clean up in any case!
-                       closeLocalStrategies();
-               }
-       }
-
-
-       /*
-        * Initialize inputs, input serializers, input comparators, and 
collector
-        * Assumes that the config and userCodeClassLoader have been set
-        */
-       private void initInputsOutputs (List<KeyValueReader> readers, 
List<KeyValueWriter> writers) throws Exception {
-
-               int numInputs = readers.size();
-               Preconditions.checkArgument(numInputs == 
driver.getNumberOfInputs());
-
-               // Prior to local strategies
-               this.inputIterators = new MutableObjectIterator[numInputs];
-               //local strategies
-               this.localStrategies = new CloseableInputProvider[numInputs];
-               // After local strategies
-               this.inputs = new MutableObjectIterator[numInputs];
-
-               int numComparators = driver.getNumberOfDriverComparators();
-               initInputsSerializersAndComparators(numInputs, numComparators);
-
-               int index = 0;
-               for (KeyValueReader reader : readers) {
-                       this.inputIterators[index] = new 
TezReaderIterator<Object>(reader);
-                       initInputLocalStrategy(index);
-                       index++;
-               }
-
-               int numOutputs = writers.size();
-               ArrayList<TezChannelSelector<OT>> channelSelectors = new 
ArrayList<TezChannelSelector<OT>>(numOutputs);
-               //ArrayList<Integer> numStreamsInOutputs = new 
ArrayList<Integer>(numOutputs);
-               for (int i = 0; i < numOutputs; i++) {
-                       final ShipStrategyType strategy = 
config.getOutputShipStrategy(i);
-                       final TypeComparatorFactory<OT> compFactory = 
config.getOutputComparator(i, this.userCodeClassLoader);
-                       final DataDistribution dataDist = 
config.getOutputDataDistribution(i, this.userCodeClassLoader);
-                       if (compFactory == null) {
-                               channelSelectors.add(i, new 
TezOutputEmitter<OT>(strategy));
-                       } else if (dataDist == null){
-                               final TypeComparator<OT> comparator = 
compFactory.createComparator();
-                               channelSelectors.add(i, new 
TezOutputEmitter<OT>(strategy, comparator));
-                       } else {
-                               final TypeComparator<OT> comparator = 
compFactory.createComparator();
-                               channelSelectors.add(i,new 
TezOutputEmitter<OT>(strategy, comparator, dataDist));
-                       }
-               }
-               this.output = new TezOutputCollector<OT>(writers, 
channelSelectors, outSerializer, numberOfSubTasksInOutputs);
-       }
-
-
-
-       // --------------------------------------------------------------------
-       // TaskContext interface
-       // --------------------------------------------------------------------
-
-       @Override
-       public TaskConfig getTaskConfig() {
-               return (TaskConfig) this.config;
-       }
-
-       @Override
-       public ExecutionConfig getExecutionConfig() {
-               return executionConfig;
-       }
-
-       @Override
-       public ClassLoader getUserCodeClassLoader() {
-               return this.userCodeClassLoader;
-       }
-
-       @Override
-       public MemoryManager getMemoryManager() {
-               return runtimeEnvironment.getMemoryManager();
-       }
-
-       @Override
-       public IOManager getIOManager() {
-               return runtimeEnvironment.getIOManager();
-       }
-
-       @Override
-       public TaskManagerRuntimeInfo getTaskManagerInfo() {
-               return new TaskManagerRuntimeInfo("localhost", new 
Configuration());
-       }
-
-       @Override
-       public <X> MutableObjectIterator<X> getInput(int index) {
-               if (index < 0 || index >= this.driver.getNumberOfInputs()) {
-                       throw new IndexOutOfBoundsException();
-               }
-               // check for lazy assignment from input strategies
-               if (this.inputs[index] != null) {
-                       @SuppressWarnings("unchecked")
-                       MutableObjectIterator<X> in = 
(MutableObjectIterator<X>) this.inputs[index];
-                       return in;
-               } else {
-                       final MutableObjectIterator<X> in;
-                       try {
-                               if (this.localStrategies[index] != null) {
-                                       @SuppressWarnings("unchecked")
-                                       MutableObjectIterator<X> iter = 
(MutableObjectIterator<X>) this.localStrategies[index].getIterator();
-                                       in = iter;
-                               } else {
-                                       throw new RuntimeException("Bug: null 
input iterator, null temp barrier, and null local strategy.");
-                               }
-                               this.inputs[index] = in;
-                               return in;
-                       } catch (InterruptedException iex) {
-                               throw new RuntimeException("Interrupted while 
waiting for input " + index + " to become available.");
-                       } catch (IOException ioex) {
-                               throw new RuntimeException("An I/O Exception 
occurred whily obaining input " + index + ".");
-                       }
-               }
-       }
-
-       @Override
-       public <X> TypeSerializerFactory<X> getInputSerializer(int index) {
-               if (index < 0 || index >= this.driver.getNumberOfInputs()) {
-                       throw new IndexOutOfBoundsException();
-               }
-
-               @SuppressWarnings("unchecked")
-               final TypeSerializerFactory<X> serializerFactory = 
(TypeSerializerFactory<X>) this.inputSerializers[index];
-               return serializerFactory;
-       }
-
-       @Override
-       public <X> TypeComparator<X> getDriverComparator(int index) {
-               if (this.inputComparators == null) {
-                       throw new IllegalStateException("Comparators have not 
been created!");
-               }
-               else if (index < 0 || index >= 
this.driver.getNumberOfDriverComparators()) {
-                       throw new IndexOutOfBoundsException();
-               }
-
-               @SuppressWarnings("unchecked")
-               final TypeComparator<X> comparator = (TypeComparator<X>) 
this.inputComparators[index];
-               return comparator;
-       }
-
-
-
-       @Override
-       public S getStub() {
-               return this.stub;
-       }
-
-       @Override
-       public Collector<OT> getOutputCollector() {
-               return this.output;
-       }
-
-       @Override
-       public AbstractInvokable getOwningNepheleTask() {
-               return this.invokable;
-       }
-
-       @Override
-       public String formatLogString(String message) {
-               return null;
-       }
-
-
-       // --------------------------------------------------------------------
-       // Adapted from BatchTask
-       // --------------------------------------------------------------------
-
-       private void initInputLocalStrategy(int inputNum) throws Exception {
-               // check if there is already a strategy
-               if (this.localStrategies[inputNum] != null) {
-                       throw new IllegalStateException();
-               }
-
-               // now set up the local strategy
-               final LocalStrategy localStrategy = 
this.config.getInputLocalStrategy(inputNum);
-               if (localStrategy != null) {
-                       switch (localStrategy) {
-                               case NONE:
-                                       // the input is as it is
-                                       this.inputs[inputNum] = 
this.inputIterators[inputNum];
-                                       break;
-                               case SORT:
-                                       @SuppressWarnings({ "rawtypes", 
"unchecked" })
-                                       UnilateralSortMerger<?> sorter = new 
UnilateralSortMerger(getMemoryManager(), getIOManager(),
-                                                       
this.inputIterators[inputNum], this.invokable, this.inputSerializers[inputNum], 
getLocalStrategyComparator(inputNum),
-                                                       
this.config.getRelativeMemoryInput(inputNum), 
this.config.getFilehandlesInput(inputNum),
-                                                       
this.config.getSpillingThresholdInput(inputNum), 
this.executionConfig.isObjectReuseEnabled());
-                                       // set the input to null such that it 
will be lazily fetched from the input strategy
-                                       this.inputs[inputNum] = null;
-                                       this.localStrategies[inputNum] = sorter;
-                                       break;
-                               case COMBININGSORT:
-                                       // sanity check this special case!
-                                       // this still breaks a bit of the 
abstraction!
-                                       // we should have nested configurations 
for the local strategies to solve that
-                                       if (inputNum != 0) {
-                                               throw new 
IllegalStateException("Performing combining sort outside a (group)reduce 
task!");
-                                       }
-
-                                       // instantiate ourselves a combiner. we 
should not use the stub, because the sort and the
-                                       // subsequent (group)reduce would 
otherwise share it multi-threaded
-                                       final Class<S> userCodeFunctionType = 
this.driver.getStubType();
-                                       if (userCodeFunctionType == null) {
-                                               throw new 
IllegalStateException("Performing combining sort outside a reduce task!");
-                                       }
-                                       final S localStub;
-                                       try {
-                                               localStub = 
initStub(userCodeFunctionType);
-                                       } catch (Exception e) {
-                                               throw new 
RuntimeException("Initializing the user code and the configuration failed" +
-                                                               (e.getMessage() 
== null ? "." : ": " + e.getMessage()), e);
-                                       }
-
-                                       if (!(localStub instanceof 
GroupCombineFunction)) {
-                                               throw new 
IllegalStateException("Performing combining sort outside a reduce task!");
-                                       }
-
-                                       @SuppressWarnings({ "rawtypes", 
"unchecked" })
-                                       CombiningUnilateralSortMerger<?> 
cSorter = new CombiningUnilateralSortMerger(
-                                                       (GroupCombineFunction) 
localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
-                                                       this.invokable, 
this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
-                                                       
this.config.getRelativeMemoryInput(inputNum), 
this.config.getFilehandlesInput(inputNum),
-                                                       
this.config.getSpillingThresholdInput(inputNum), 
this.executionConfig.isObjectReuseEnabled());
-                                       
cSorter.setUdfConfiguration(this.config.getStubParameters());
-
-                                       // set the input to null such that it 
will be lazily fetched from the input strategy
-                                       this.inputs[inputNum] = null;
-                                       this.localStrategies[inputNum] = 
cSorter;
-                                       break;
-                               default:
-                                       throw new Exception("Unrecognized local 
strategy provided: " + localStrategy.name());
-                       }
-               } else {
-                       // no local strategy in the config
-                       this.inputs[inputNum] = this.inputIterators[inputNum];
-               }
-       }
-
-       private <T> TypeComparator<T> getLocalStrategyComparator(int inputNum) 
throws Exception {
-               TypeComparatorFactory<T> compFact = 
this.config.getInputComparator(inputNum, this.userCodeClassLoader);
-               if (compFact == null) {
-                       throw new Exception("Missing comparator factory for 
local strategy on input " + inputNum);
-               }
-               return compFact.createComparator();
-       }
-
-       protected S initStub(Class<? super S> stubSuperClass) throws Exception {
-               try {
-                       S stub = 
config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(stubSuperClass,
 this.userCodeClassLoader);
-                       // check if the class is a subclass, if the check is 
required
-                       if (stubSuperClass != null && 
!stubSuperClass.isAssignableFrom(stub.getClass())) {
-                               throw new RuntimeException("The class '" + 
stub.getClass().getName() + "' is not a subclass of '" +
-                                               stubSuperClass.getName() + "' 
as is required.");
-                       }
-                       FunctionUtils.setFunctionRuntimeContext(stub, 
this.runtimeUdfContext);
-                       return stub;
-               }
-               catch (ClassCastException ccex) {
-                       throw new Exception("The stub class is not a proper 
subclass of " + stubSuperClass.getName(), ccex);
-               }
-       }
-
-       /**
-        * Creates all the serializers and comparators.
-        */
-       protected void initInputsSerializersAndComparators(int numInputs, int 
numComparators) throws Exception {
-               this.inputSerializers = new TypeSerializerFactory<?>[numInputs];
-               this.inputComparators = numComparators > 0 ? new 
TypeComparator[numComparators] : null;
-               //this.inputComparators = 
this.driver.requiresComparatorOnInput() ? new TypeComparator[numInputs] : null;
-               this.inputIterators = new MutableObjectIterator[numInputs];
-
-               for (int i = 0; i < numInputs; i++) {
-                       //  ---------------- create the serializer first 
---------------------
-                       final TypeSerializerFactory<?> serializerFactory = 
this.config.getInputSerializer(i, this.userCodeClassLoader);
-                       this.inputSerializers[i] = serializerFactory;
-                       // this.inputIterators[i] = 
createInputIterator(this.inputReaders[i], this.inputSerializers[i]);
-               }
-               //  ---------------- create the driver's comparators 
---------------------
-               for (int i = 0; i < numComparators; i++) {
-                       if (this.inputComparators != null) {
-                               final TypeComparatorFactory<?> 
comparatorFactory = this.config.getDriverComparator(i, 
this.userCodeClassLoader);
-                               this.inputComparators[i] = 
comparatorFactory.createComparator();
-                       }
-               }
-       }
-
-       protected void initialize() throws Exception {
-               // create the operator
-               try {
-                       this.driver.setup(this);
-               }
-               catch (Throwable t) {
-                       throw new Exception("The driver setup for '" + //TODO 
put taks name here
-                                       "' , caused an error: " + 
t.getMessage(), t);
-               }
-
-               //this.runtimeUdfContext = createRuntimeContext();
-
-               // instantiate the UDF
-               try {
-                       final Class<? super S> userCodeFunctionType = 
this.driver.getStubType();
-                       // if the class is null, the driver has no user code
-                       if (userCodeFunctionType != null) {
-                               this.stub = initStub(userCodeFunctionType);
-                       }
-               } catch (Exception e) {
-                       throw new RuntimeException("Initializing the UDF" +
-                                       e.getMessage() == null ? "." : ": " + 
e.getMessage(), e);
-               }
-       }
-
-       /*
-       public RuntimeUDFContext createRuntimeContext() {
-               return new RuntimeUDFContext(this.taskName, 
this.numberOfSubtasks, this.indexInSubtaskGroup, null);
-       }
-       */
-
-       protected void closeLocalStrategies() {
-               if (this.localStrategies != null) {
-                       for (int i = 0; i < this.localStrategies.length; i++) {
-                               if (this.localStrategies[i] != null) {
-                                       try {
-                                               this.localStrategies[i].close();
-                                       } catch (Throwable t) {
-                                               LOG.error("Error closing local 
strategy for input " + i, t);
-                                       }
-                               }
-                       }
-               }
-       }
-
-       protected void run() throws Exception {
-               // ---------------------------- Now, the actual processing 
starts ------------------------
-               // check for asynchronous canceling
-
-               boolean stubOpen = false;
-
-               try {
-                       // run the data preparation
-                       try {
-                               this.driver.prepare();
-                       }
-                       catch (Throwable t) {
-                               // if the preparation caused an error, clean up
-                               // errors during clean-up are swallowed, 
because we already have a root exception
-                               throw new Exception("The data preparation for 
task '" + this.taskName +
-                                               "' , caused an error: " + 
t.getMessage(), t);
-                       }
-
-                       // open stub implementation
-                       if (this.stub != null) {
-                               try {
-                                       Configuration stubConfig = 
this.config.getStubParameters();
-                                       FunctionUtils.openFunction(this.stub, 
stubConfig);
-                                       stubOpen = true;
-                               }
-                               catch (Throwable t) {
-                                       throw new Exception("The user defined 
'open()' method caused an exception: " + t.getMessage(), t);
-                               }
-                       }
-
-                       // run the user code
-                       this.driver.run();
-
-                       // close. We close here such that a regular close 
throwing an exception marks a task as failed.
-                       if (this.stub != null) {
-                               FunctionUtils.closeFunction(this.stub);
-                               stubOpen = false;
-                       }
-
-                       this.output.close();
-
-               }
-               catch (Exception ex) {
-                       // close the input, but do not report any exceptions, 
since we already have another root cause
-                       ex.printStackTrace();
-                       throw new RuntimeException("Exception in TaskContext: " 
+ ex.getMessage() + " "+  ex.getStackTrace());
-               }
-               finally {
-                       this.driver.cleanup();
-               }
-       }
-
-}
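
The deleted TezTask is essentially the Tez-side counterpart of Flink's BatchTask: the constructor instantiates the driver and the user function from the TezTaskConfig, and invoke() runs the init/run/cleanup lifecycle against the Tez readers and writers. Roughly, a caller looked like the sketch below; the helper method, the MapFunction type parameters, and the way the config, context and memory budget are obtained are illustrative assumptions, not the real processor code (which is outside this hunk).

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
    import org.apache.flink.tez.runtime.TezTask;
    import org.apache.flink.tez.runtime.TezTaskConfig;
    import org.apache.tez.runtime.library.api.KeyValueReader;
    import org.apache.tez.runtime.library.api.KeyValueWriter;

    import java.util.List;

    // Hypothetical caller of the deleted class, for illustration only.
    public class TezTaskUsageSketch {

        public static <T> void runTask(TezTaskConfig config,
                                       RuntimeUDFContext udfContext,
                                       long availableMemory,
                                       List<KeyValueReader> readers,
                                       List<KeyValueWriter> writers) throws Exception {
            // The constructor instantiates the driver and the user function from the config.
            TezTask<MapFunction<T, T>, T> task =
                    new TezTask<MapFunction<T, T>, T>(config, udfContext, availableMemory);
            // invoke() wires the readers/writers, sets up and runs the driver,
            // and always closes local strategies (e.g. sorters) in its finally block.
            task.invoke(readers, writers);
        }
    }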

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
deleted file mode 100644
index 94a8315..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-public class TezTaskConfig extends TaskConfig {
-
-       public static final String TEZ_TASK_CONFIG = 
"tez.task.flink.processor.taskconfig";
-
-       private static final String NUMBER_SUBTASKS_IN_OUTPUTS = 
"tez.num_subtasks_in_output";
-
-       private static final String INPUT_SPLIT_PROVIDER = 
"tez.input_split_provider";
-
-       private static final String INPUT_POSITIONS = "tez.input_positions";
-
-       private static final String INPUT_FORMAT = "tez.input_format";
-
-       private static final String DATASOURCE_PROCESSOR_NAME = 
"tez.datasource_processor_name";
-
-       public TezTaskConfig(Configuration config) {
-               super(config);
-       }
-
-
-       public void setDatasourceProcessorName(String name) {
-               if (name != null) {
-                       this.config.setString(DATASOURCE_PROCESSOR_NAME, name);
-               }
-       }
-
-       public String getDatasourceProcessorName() {
-               return this.config.getString(DATASOURCE_PROCESSOR_NAME, null);
-       }
-
-       public void setNumberSubtasksInOutput(ArrayList<Integer> 
numberSubtasksInOutputs) {
-               try {
-                       
InstantiationUtil.writeObjectToConfig(numberSubtasksInOutputs, this.config, 
NUMBER_SUBTASKS_IN_OUTPUTS);
-               } catch (IOException e) {
-                       throw new RuntimeException("Error while writing the 
input split provider object to the task configuration.");
-               }
-       }
-
-       public ArrayList<Integer> getNumberSubtasksInOutput() {
-               ArrayList<Integer> numberOfSubTasksInOutputs = null;
-               try {
-                       numberOfSubTasksInOutputs = (ArrayList) 
InstantiationUtil.readObjectFromConfig(this.config, NUMBER_SUBTASKS_IN_OUTPUTS, 
getClass().getClassLoader());
-               }
-               catch (IOException e) {
-                       throw new RuntimeException("Error while reading the 
number of subtasks in outputs object from the task configuration.");
-               } catch (ClassNotFoundException e) {
-                       throw new RuntimeException("Error while reading the 
number of subtasks in outpurs object from the task configuration. " +
-                                       "class not found.");
-               }
-               if (numberOfSubTasksInOutputs == null) {
-                       throw new NullPointerException();
-               }
-               return numberOfSubTasksInOutputs;
-
-       }
-
-
-       public void setInputSplitProvider (InputSplitProvider 
inputSplitProvider) {
-               try {
-                       
InstantiationUtil.writeObjectToConfig(inputSplitProvider, this.config, 
INPUT_SPLIT_PROVIDER);
-               } catch (IOException e) {
-                       throw new RuntimeException("Error while writing the 
input split provider object to the task configuration.");
-               }
-       }
-
-       public InputSplitProvider getInputSplitProvider () {
-               InputSplitProvider inputSplitProvider = null;
-               try {
-                       inputSplitProvider = (InputSplitProvider) 
InstantiationUtil.readObjectFromConfig(this.config, INPUT_SPLIT_PROVIDER, 
getClass().getClassLoader());
-               }
-               catch (IOException e) {
-                       throw new RuntimeException("Error while reading the 
input split provider object from the task configuration.");
-               } catch (ClassNotFoundException e) {
-                       throw new RuntimeException("Error while reading the 
input split provider object from the task configuration. " +
-                                       "ChannelSelector class not found.");
-               }
-               if (inputSplitProvider == null) {
-                       throw new NullPointerException();
-               }
-               return inputSplitProvider;
-       }
-
-
-       public void setInputPositions(HashMap<String,ArrayList<Integer>> 
inputPositions) {
-               try {
-                       InstantiationUtil.writeObjectToConfig(inputPositions, 
this.config, INPUT_POSITIONS);
-               } catch (IOException e) {
-                       throw new RuntimeException("Error while writing the 
input positions object to the task configuration.");
-               }
-       }
-
-       public HashMap<String,ArrayList<Integer>> getInputPositions () {
-               HashMap<String,ArrayList<Integer>> inputPositions = null;
-               try {
-                       inputPositions = (HashMap) 
InstantiationUtil.readObjectFromConfig(this.config, INPUT_POSITIONS, 
getClass().getClassLoader());
-               }
-               catch (IOException e) {
-                       throw new RuntimeException("Error while reading the 
input positions object from the task configuration.");
-               } catch (ClassNotFoundException e) {
-                       throw new RuntimeException("Error while reading the 
input positions object from the task configuration. " +
-                                       "ChannelSelector class not found.");
-               }
-               if (inputPositions == null) {
-                       throw new NullPointerException();
-               }
-               return inputPositions;
-       }
-
-       public void setInputFormat (InputFormat inputFormat) {
-               try {
-                       InstantiationUtil.writeObjectToConfig(inputFormat, 
this.config, INPUT_FORMAT);
-               } catch (IOException e) {
-                       throw new RuntimeException("Error while writing the 
input format object to the task configuration.");
-               }
-       }
-
-       public InputFormat getInputFormat () {
-               InputFormat inputFormat = null;
-               try {
-                       inputFormat = (InputFormat) 
InstantiationUtil.readObjectFromConfig(this.config, INPUT_FORMAT, 
getClass().getClassLoader());
-               }
-               catch (IOException e) {
-                       throw new RuntimeException("Error while reading the 
input split provider object from the task configuration.");
-               } catch (ClassNotFoundException e) {
-                       throw new RuntimeException("Error while reading the 
input split provider object from the task configuration. " +
-                                       "ChannelSelector class not found.");
-               }
-               if (inputFormat == null) {
-                       throw new NullPointerException();
-               }
-               return inputFormat;
-       }
-
-}
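
Every accessor in TezTaskConfig follows the same pattern: Java-serialize the object into the wrapped Configuration under a fixed key, and read it back with a class loader. A self-contained sketch of that round trip, assuming the InstantiationUtil signatures used in this commit and an arbitrary key name:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.InstantiationUtil;

    import java.util.ArrayList;

    // Minimal illustration of the serialize-into-Configuration pattern.
    public class ConfigSerializationSketch {

        public static void main(String[] args) throws Exception {
            Configuration config = new Configuration();

            ArrayList<Integer> subtasksPerOutput = new ArrayList<Integer>();
            subtasksPerOutput.add(4);
            subtasksPerOutput.add(2);

            // Write: the object is serialized and stored under the given key.
            InstantiationUtil.writeObjectToConfig(subtasksPerOutput, config, "example.key");

            // Read: deserialization needs a class loader, here the current one.
            @SuppressWarnings("unchecked")
            ArrayList<Integer> restored = (ArrayList<Integer>) InstantiationUtil.readObjectFromConfig(
                    config, "example.key", ConfigSerializationSketch.class.getClassLoader());

            System.out.println(restored);   // [4, 2]
        }
    }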

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
deleted file mode 100644
index 7ceeac8..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime;
-
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class UnionProcessor extends AbstractLogicalIOProcessor {
-
-       private TezTaskConfig config;
-       protected Map<String, LogicalInput> inputs;
-       protected Map<String, LogicalOutput> outputs;
-       private List<KeyValueReader> readers;
-       private List<KeyValueWriter> writers;
-       private int numInputs;
-       private int numOutputs;
-
-       public UnionProcessor(ProcessorContext context) {
-               super(context);
-       }
-
-       @Override
-       public void initialize() throws Exception {
-               UserPayload payload = getContext().getUserPayload();
-               Configuration conf = 
TezUtils.createConfFromUserPayload(payload);
-
-               this.config = (TezTaskConfig) 
EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), 
getClass().getClassLoader());
-               config.setTaskName(getContext().getTaskVertexName());
-       }
-
-       @Override
-       public void handleEvents(List<Event> processorEvents) {
-
-       }
-
-       @Override
-       public void close() throws Exception {
-
-       }
-
-       @Override
-       public void run(Map<String, LogicalInput> inputs, Map<String, 
LogicalOutput> outputs) throws Exception {
-               this.inputs = inputs;
-               this.outputs = outputs;
-               this.numInputs = inputs.size();
-               this.numOutputs = outputs.size();
-
-               this.readers = new ArrayList<KeyValueReader>(numInputs);
-               if (this.inputs != null) {
-                       for (LogicalInput input: this.inputs.values()) {
-                               input.start();
-                               readers.add((KeyValueReader) input.getReader());
-                       }
-               }
-
-               this.writers = new ArrayList<KeyValueWriter>(numOutputs);
-               if (this.outputs != null) {
-                       for (LogicalOutput output : this.outputs.values()) {
-                               output.start();
-                               writers.add((KeyValueWriter) 
output.getWriter());
-                       }
-               }
-
-               Preconditions.checkArgument(writers.size() == 1);
-               KeyValueWriter writer = writers.get(0);
-
-               for (KeyValueReader reader: this.readers) {
-                       while (reader.next()) {
-                               Object key = reader.getCurrentKey();
-                               Object value = reader.getCurrentValue();
-                               writer.write(key, value);
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
deleted file mode 100644
index ef59fd0..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.input;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.tez.runtime.api.AbstractLogicalInput;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.InputContext;
-import org.apache.tez.runtime.api.Reader;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-
-public class FlinkInput extends AbstractLogicalInput {
-
-       private static final Log LOG = LogFactory.getLog(FlinkInput.class);
-       
-       private InputSplit split;
-       private boolean splitIsCreated;
-       private final ReentrantLock rrLock = new ReentrantLock();
-       private final Condition rrInited = rrLock.newCondition();
-
-       public FlinkInput(InputContext inputContext, int numPhysicalInputs) {
-               super(inputContext, numPhysicalInputs);
-               getContext().requestInitialMemory(0l, null); // mandatory call
-               split = null;
-       }
-
-       @Override
-       public void handleEvents(List<Event> inputEvents) throws Exception {
-               
-               LOG.info("Received " + inputEvents.size() + " events (should be 
= 1)");
-               
-               Event event = inputEvents.iterator().next();
-               
-               Preconditions.checkArgument(event instanceof 
InputDataInformationEvent,
-                               getClass().getSimpleName()
-                                               + " can only handle a single 
event of type: "
-                                               + 
InputDataInformationEvent.class.getSimpleName());
-
-               initSplitFromEvent ((InputDataInformationEvent)event);
-       }
-
-       private void initSplitFromEvent (InputDataInformationEvent e) throws 
Exception {
-               rrLock.lock();
-
-               try {
-                       ByteString byteString = 
ByteString.copyFrom(e.getUserPayload());
-                       this.split =  (InputSplit) 
InstantiationUtil.deserializeObject(byteString.toByteArray(), 
getClass().getClassLoader());
-                       this.splitIsCreated = true;
-                       
-                       LOG.info ("Initializing input split " + 
split.getSplitNumber() + ": " + split.toString() + " from event (" + 
e.getSourceIndex() + "," + e.getTargetIndex() + "): " + e.toString());
-                       
-                       rrInited.signal();
-               }
-               catch (Exception ex) {
-                       throw new IOException(
-                                       "Could not initialize InputSplit from event", ex);
-               }
-               finally {
-                       rrLock.unlock();
-               }
-       }
-
-       @Override
-       public List<Event> close() throws Exception {
-               return null;
-       }
-
-       @Override
-       public void start() throws Exception {
-       }
-
-       @Override
-       public Reader getReader() throws Exception {
-               throw new RuntimeException("FlinkInput does not contain a 
Reader. Should use getSplit instead");
-       }
-
-       @Override
-       public List<Event> initialize() throws Exception {
-               return null;
-       }
-
-       public InputSplit getSplit () throws Exception {
-
-               rrLock.lock();
-               try {
-                       if (!splitIsCreated) {
-                               checkAndAwaitSplitInitialization();
-                       }
-               }
-               finally {
-                       rrLock.unlock();
-               }
-               if (split == null) {
-                       LOG.info("Input split has not been created. This should 
not happen");
-                       throw new RuntimeException("Input split has not been 
created. This should not happen");
-               }
-               return split;
-       }
-
-       void checkAndAwaitSplitInitialization() throws IOException {
-               assert rrLock.getHoldCount() == 1;
-               rrLock.lock();
-               try {
-                       rrInited.await();
-               } catch (Exception e) {
-                       throw new IOException(
-                                       "Interrupted waiting for InputSplit 
initialization");
-               } finally {
-                       rrLock.unlock();
-               }
-       }
-}
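
The split hand-off in FlinkInput is a standard ReentrantLock/Condition rendezvous: the event-handling thread stores the split and signals, while getSplit() waits under the same lock. Stripped of the Tez specifics, the pattern looks like this plain-JDK sketch (the names are placeholders; the sketch re-checks the flag in a loop, which also guards against spurious wake-ups):

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    public class SplitHandoffSketch {

        private final ReentrantLock lock = new ReentrantLock();
        private final Condition initialized = lock.newCondition();
        private Object split;                 // stands in for the InputSplit

        // Called from the event-handling thread (compare initSplitFromEvent()).
        public void publish(Object value) {
            lock.lock();
            try {
                split = value;
                initialized.signal();
            } finally {
                lock.unlock();
            }
        }

        // Called from the processing thread (compare getSplit()).
        public Object awaitValue() throws InterruptedException {
            lock.lock();
            try {
                while (split == null) {       // loop protects against spurious wake-ups
                    initialized.await();
                }
                return split;
            } finally {
                lock.unlock();
            }
        }
    }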

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
deleted file mode 100644
index db1261c..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.input;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.InputInitializer;
-import org.apache.tez.runtime.api.InputInitializerContext;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.events.InputInitializerEvent;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-
-public class FlinkInputSplitGenerator extends InputInitializer {
-
-       private static final Log LOG = 
LogFactory.getLog(FlinkInputSplitGenerator.class);
-
-       InputFormat format;
-
-       public FlinkInputSplitGenerator(InputInitializerContext 
initializerContext) {
-               super(initializerContext);
-       }
-
-       @Override
-       public List<Event> initialize() throws Exception {
-
-               Configuration tezConf = 
TezUtils.createConfFromUserPayload(this.getContext().getUserPayload());
-
-               TezTaskConfig taskConfig = (TezTaskConfig) 
EncodingUtils.decodeObjectFromString(tezConf.get(TezTaskConfig.TEZ_TASK_CONFIG),
 getClass().getClassLoader());
-
-               this.format = taskConfig.getInputFormat();
-
-               int numTasks = this.getContext().getNumTasks();
-
-               LOG.info ("Creating splits for " + numTasks + " tasks for input 
format " + format);
-               
-               InputSplit[] splits = format.createInputSplits((numTasks > 0) ? 
numTasks : 1 );
-
-               LOG.info ("Created " + splits.length + " input splits" + " 
tasks for input format " + format);
-               
-               //LOG.info ("Created + " + splits.length + " input splits for 
input format " + format);
-
-               LOG.info ("Sending input split events");
-               LinkedList<Event> events = new LinkedList<Event>();
-               for (int i = 0; i < splits.length; i++) {
-                       byte [] bytes = 
InstantiationUtil.serializeObject(splits[i]);
-                       ByteBuffer buf = ByteBuffer.wrap(bytes);
-                       InputDataInformationEvent event = 
InputDataInformationEvent.createWithSerializedPayload(i % numTasks, buf);
-                       event.setTargetIndex(i % numTasks);
-                       events.add(event);
-                       LOG.info ("Added event of index " + i + ": " + event);
-               }
-               return events;
-       }
-
-       @Override
-       public void handleInputInitializerEvent(List<InputInitializerEvent> 
events) throws Exception {
-
-       }
-
-       @Override
-       public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
-               //super.onVertexStateUpdated(stateUpdate);
-
-       }
-}
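
On the vertex side the initializer serializes each Flink InputSplit with InstantiationUtil and ships it in the InputDataInformationEvent payload; FlinkInput (above) deserializes it again on the task side. The serialization round trip in isolation, using GenericInputSplit purely as a convenient serializable example split (signatures assumed to match this Flink version):

    import org.apache.flink.core.io.GenericInputSplit;
    import org.apache.flink.core.io.InputSplit;
    import org.apache.flink.util.InstantiationUtil;

    public class SplitPayloadSketch {

        public static void main(String[] args) throws Exception {
            InputSplit split = new GenericInputSplit(0, 4);   // split 0 of 4

            // Vertex side: serialize the split for the event payload.
            byte[] payload = InstantiationUtil.serializeObject(split);

            // Task side: deserialize it with the user-code class loader.
            InputSplit restored = (InputSplit) InstantiationUtil.deserializeObject(
                    payload, SplitPayloadSketch.class.getClassLoader());

            System.out.println(restored.getSplitNumber());    // 0
        }
    }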

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
deleted file mode 100644
index 722f0a1..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.input;
-
-
-import org.apache.flink.util.MutableObjectIterator;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.tez.runtime.library.api.KeyValueReader;
-
-import java.io.IOException;
-
-public class TezReaderIterator<T> implements MutableObjectIterator<T>{
-
-       private KeyValueReader kvReader;
-
-       public TezReaderIterator(KeyValueReader kvReader) {
-               this.kvReader = kvReader;
-       }
-
-       @Override
-       public T next(T reuse) throws IOException {
-               if (kvReader.next()) {
-                       Object key = kvReader.getCurrentKey();
-                       Object value = kvReader.getCurrentValue();
-                       if (!(key instanceof IntWritable)) {
-                               throw new IllegalStateException("Wrong key 
type");
-                       }
-                       reuse = (T) value;
-                       return reuse;
-               }
-               else {
-                       return null;
-               }
-       }
-
-       @Override
-       public T next() throws IOException {
-               if (kvReader.next()) {
-                       Object key = kvReader.getCurrentKey();
-                       Object value = kvReader.getCurrentValue();
-                       if (!(key instanceof IntWritable)) {
-                               throw new IllegalStateException("Wrong key 
type");
-                       }
-                       return (T) value;
-               }
-               else {
-                       return null;
-               }
-       }
-}
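
A MutableObjectIterator such as the one above is consumed by looping until next() returns null. A small usage sketch (the String element type and the counting are arbitrary choices for illustration):

    import org.apache.flink.tez.runtime.input.TezReaderIterator;
    import org.apache.flink.util.MutableObjectIterator;
    import org.apache.tez.runtime.library.api.KeyValueReader;

    public class ReaderIterationSketch {

        public static long countRecords(KeyValueReader reader) throws Exception {
            MutableObjectIterator<String> values = new TezReaderIterator<String>(reader);
            long count = 0;
            // next() returns null once the underlying KeyValueReader is exhausted;
            // next(reuse) could be used instead to recycle record objects.
            while (values.next() != null) {
                count++;
            }
            return count;
        }
    }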

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
deleted file mode 100644
index 2358f29..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.output;
-
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.tez.runtime.library.api.Partitioner;
-
-public class SimplePartitioner implements Partitioner {
-
-       @Override
-       public int getPartition(Object key, Object value, int numPartitions) {
-               if (!(key instanceof IntWritable)) {
-                       throw new IllegalStateException("Partitioning key should be int");
-               }
-               IntWritable channel = (IntWritable) key;
-               return channel.get();
-       }
-}
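
SimplePartitioner relies on the convention that the upstream collector has already chosen the target channel and stored it in the IntWritable key, so partitioning reduces to unwrapping that index. A small, self-contained illustration; the record value and partition count below are made up:

    // Illustrative only: the key carries the channel index chosen upstream,
    // so getPartition simply unwraps it ("some record" and 8 are made-up values).
    public static void main(String[] args) {
            org.apache.tez.runtime.library.api.Partitioner partitioner = new SimplePartitioner();
            org.apache.hadoop.io.IntWritable key = new org.apache.hadoop.io.IntWritable(3);
            int partition = partitioner.getPartition(key, "some record", 8);
            System.out.println(partition); // prints 3, independent of the value and the partition count
    }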

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
deleted file mode 100644
index 7e5cd55..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.output;
-
-import java.io.Serializable;
-
-public interface TezChannelSelector<T> extends Serializable {
-
-       /**
-        * Called to determine to which attached {@link org.apache.flink.runtime.io.network.channels.OutputChannel} objects the given record shall be forwarded.
-        *
-        * @param record
-        *        the record to determine the output channels for
-        * @param numberOfOutputChannels
-        *        the total number of output channels which are attached to the respective output gate
-        * @return a (possibly empty) array of integer numbers which indicate the indices of the output channels through
-        *         which the record shall be forwarded
-        */
-       int[] selectChannels(T record, int numberOfOutputChannels);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
deleted file mode 100644
index b68e6c8..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.output;
-
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
-
-import java.io.IOException;
-import java.util.List;
-
-public class TezOutputCollector<T> implements Collector<T> {
-
-       private List<KeyValueWriter> writers;
-
-       private List<TezChannelSelector<T>> outputEmitters;
-
-       private List<Integer> numberOfStreamsInOutputs;
-
-       private int numOutputs;
-
-       private TypeSerializer<T> serializer;
-
-       public TezOutputCollector(List<KeyValueWriter> writers, List<TezChannelSelector<T>> outputEmitters, TypeSerializer<T> serializer, List<Integer> numberOfStreamsInOutputs) {
-               this.writers = writers;
-               this.outputEmitters = outputEmitters;
-               this.numberOfStreamsInOutputs = numberOfStreamsInOutputs;
-               this.serializer = serializer;
-               this.numOutputs = writers.size();
-       }
-
-       @Override
-       public void collect(T record) {
-               for (int i = 0; i < numOutputs; i++) {
-                       KeyValueWriter writer = writers.get(i);
-                       TezChannelSelector<T> outputEmitter = outputEmitters.get(i);
-                       int numberOfStreamsInOutput = numberOfStreamsInOutputs.get(i);
-                       try {
-                               for (int channel : outputEmitter.selectChannels(record, numberOfStreamsInOutput)) {
-                                       IntWritable key = new IntWritable(channel);
-                                       writer.write(key, record);
-                               }
-                       }
-                       catch (IOException e) {
-                               throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e);
-                       }
-               }
-       }
-
-       @Override
-       public void close() {
-
-       }
-}
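
For each registered output, TezOutputCollector asks the corresponding TezChannelSelector which channels should receive the record and writes the record once per channel, encoding the channel index as the IntWritable key that SimplePartitioner later unwraps. A hedged sketch of wiring a single hash-partitioned output; the writer, comparator and serializer are passed in because obtaining them from the Tez and Flink runtimes is outside the scope of this sketch:

    // Illustrative only: wire one hash-partitioned output and emit a single record.
    static <T> void emitOnce(org.apache.tez.runtime.library.api.KeyValueWriter writer,
                    org.apache.flink.api.common.typeutils.TypeComparator<T> comparator,
                    org.apache.flink.api.common.typeutils.TypeSerializer<T> serializer,
                    int parallelism, T record) {
            java.util.List<org.apache.tez.runtime.library.api.KeyValueWriter> writers =
                    java.util.Collections.singletonList(writer);
            java.util.List<TezChannelSelector<T>> emitters =
                    java.util.Collections.<TezChannelSelector<T>>singletonList(
                            new TezOutputEmitter<T>(
                                    org.apache.flink.runtime.operators.shipping.ShipStrategyType.PARTITION_HASH,
                                    comparator));
            java.util.List<Integer> streamsPerOutput = java.util.Collections.singletonList(parallelism);

            org.apache.flink.util.Collector<T> out =
                    new TezOutputCollector<T>(writers, emitters, serializer, streamsPerOutput);
            out.collect(record); // written once, to the channel selected by the record's hash
            out.close();
    }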

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
deleted file mode 100644
index 6dcee0b..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.output;
-
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-
-public class TezOutputEmitter<T> implements TezChannelSelector<T> {
-
-       private final ShipStrategyType strategy;                // the shipping strategy used by this output emitter
-
-       private int[] channels;                                         // the reused array defining target channels
-
-       private int nextChannelToSendTo = 0;            // counter to go over channels round robin
-
-       private final TypeComparator<T> comparator;     // the comparator for hashing / sorting
-
-       // ------------------------------------------------------------------------
-       // Constructors
-       // ------------------------------------------------------------------------
-
-       /**
-        * Creates a new channel selector that distributes data round robin.
-        */
-       public TezOutputEmitter() {
-               this(ShipStrategyType.NONE);
-       }
-
-       /**
-        * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...).
-        *
-        * @param strategy The distribution strategy to be used.
-        */
-       public TezOutputEmitter(ShipStrategyType strategy) {
-               this(strategy, null);
-       }
-
-       /**
-        * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...)
-        * and uses the supplied comparator to hash / compare records for partitioning them deterministically.
-        *
-        * @param strategy The distribution strategy to be used.
-        * @param comparator The comparator used to hash / compare the records.
-        */
-       public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator) {
-               this(strategy, comparator, null);
-       }
-
-       /**
-        * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...)
-        * and uses the supplied comparator to hash / compare records for partitioning them deterministically.
-        *
-        * @param strategy The distribution strategy to be used.
-        * @param comparator The comparator used to hash / compare the records.
-        * @param distr The distribution pattern used in the case of a range partitioning.
-        */
-       public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator, DataDistribution distr) {
-               if (strategy == null) {
-                       throw new NullPointerException();
-               }
-
-               this.strategy = strategy;
-               this.comparator = comparator;
-
-               switch (strategy) {
-                       case FORWARD:
-                       case PARTITION_HASH:
-                       case PARTITION_RANGE:
-                       case PARTITION_RANDOM:
-                       case PARTITION_FORCED_REBALANCE:
-                       case BROADCAST:
-                               break;
-                       default:
-                               throw new IllegalArgumentException("Invalid shipping strategy for OutputEmitter: " + strategy.name());
-               }
-
-               if ((strategy == ShipStrategyType.PARTITION_RANGE) && distr == null) {
-                       throw new NullPointerException("Data distribution must not be null when the ship strategy is range partitioning.");
-               }
-       }
-
-       // ------------------------------------------------------------------------
-       // Channel Selection
-       // ------------------------------------------------------------------------
-
-       @Override
-       public final int[] selectChannels(T record, int numberOfChannels) {
-               switch (strategy) {
-                       case FORWARD:
-                       case PARTITION_RANDOM:
-                       case PARTITION_FORCED_REBALANCE:
-                               return robin(numberOfChannels);
-                       case PARTITION_HASH:
-                               return hashPartitionDefault(record, numberOfChannels);
-                       case PARTITION_RANGE:
-                               return rangePartition(record, numberOfChannels);
-                       case BROADCAST:
-                               return broadcast(numberOfChannels);
-                       default:
-                               throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name());
-               }
-       }
-
-       // --------------------------------------------------------------------------------------------
-
-       private final int[] robin(int numberOfChannels) {
-               if (this.channels == null || this.channels.length != 1) {
-                       this.channels = new int[1];
-               }
-
-               int nextChannel = nextChannelToSendTo + 1;
-               nextChannel = nextChannel < numberOfChannels ? nextChannel : 0;
-
-               this.nextChannelToSendTo = nextChannel;
-               this.channels[0] = nextChannel;
-               return this.channels;
-       }
-
-       private final int[] broadcast(int numberOfChannels) {
-               if (channels == null || channels.length != numberOfChannels) {
-                       channels = new int[numberOfChannels];
-                       for (int i = 0; i < numberOfChannels; i++) {
-                               channels[i] = i;
-                       }
-               }
-
-               return channels;
-       }
-
-       private final int[] hashPartitionDefault(T record, int numberOfChannels) {
-               if (channels == null || channels.length != 1) {
-                       channels = new int[1];
-               }
-
-               int hash = this.comparator.hash(record);
-
-               hash = murmurHash(hash);
-
-               if (hash >= 0) {
-                       this.channels[0] = hash % numberOfChannels;
-               }
-               else if (hash != Integer.MIN_VALUE) {
-                       this.channels[0] = -hash % numberOfChannels;
-               }
-               else {
-                       this.channels[0] = 0;
-               }
-
-               return this.channels;
-       }
-
-       private final int murmurHash(int k) {
-               k *= 0xcc9e2d51;
-               k = Integer.rotateLeft(k, 15);
-               k *= 0x1b873593;
-
-               k = Integer.rotateLeft(k, 13);
-               k *= 0xe6546b64;
-
-               k ^= 4;
-               k ^= k >>> 16;
-               k *= 0x85ebca6b;
-               k ^= k >>> 13;
-               k *= 0xc2b2ae35;
-               k ^= k >>> 16;
-
-               return k;
-       }
-
-       private final int[] rangePartition(T record, int numberOfChannels) {
-               throw new UnsupportedOperationException();
-       }
-}
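
The hash partitioning path combines the comparator's hash with a MurmurHash3-style finalization and then maps the result onto the channel range, treating negative hashes and Integer.MIN_VALUE explicitly. The same mapping, extracted into a standalone method using the constants from the removed code:

    // Illustrative only: the hash-to-channel mapping implemented by the emitter above.
    static int channelFor(int comparatorHash, int numberOfChannels) {
            int k = comparatorHash;
            k *= 0xcc9e2d51;
            k = Integer.rotateLeft(k, 15);
            k *= 0x1b873593;
            k = Integer.rotateLeft(k, 13);
            k *= 0xe6546b64;
            k ^= 4; // presumably the 4-byte length of an int, mixed in as in MurmurHash3 finalization
            k ^= k >>> 16;
            k *= 0x85ebca6b;
            k ^= k >>> 13;
            k *= 0xc2b2ae35;
            k ^= k >>> 16;

            if (k >= 0) {
                    return k % numberOfChannels;
            } else if (k != Integer.MIN_VALUE) {
                    return -k % numberOfChannels;
            } else {
                    return 0; // Integer.MIN_VALUE cannot be negated, fall back to channel 0
            }
    }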

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
deleted file mode 100644
index 39d247c..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.util;
-
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-public class DummyInvokable extends AbstractInvokable {
-
-       private ExecutionConfig executionConfig;
-
-       public DummyInvokable() {
-       }
-
-       public DummyInvokable(ExecutionConfig executionConfig) {
-               this.executionConfig = executionConfig;
-       }
-
-       public void setExecutionConfig(ExecutionConfig executionConfig) {
-               this.executionConfig = executionConfig;
-       }
-
-       @Override
-       public void registerInputOutput() {}
-
-
-       @Override
-       public void invoke() throws Exception {}
-
-       @Override
-       public ExecutionConfig getExecutionConfig() {
-               return executionConfig;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
deleted file mode 100644
index 202cb24..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.util;
-
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.commons.codec.binary.Base64;
-
-import java.io.IOException;
-
-public class EncodingUtils {
-
-       public static Object decodeObjectFromString(String encoded, ClassLoader cl) {
-
-               try {
-                       if (encoded == null) {
-                               return null;
-                       }
-                       byte[] bytes = Base64.decodeBase64(encoded);
-
-                       return InstantiationUtil.deserializeObject(bytes, cl);
-               }
-               catch (IOException e) {
-                       e.printStackTrace();
-                       System.exit(-1);
-                       throw new RuntimeException();
-               }
-               catch (ClassNotFoundException e) {
-                       e.printStackTrace();
-                       System.exit(-1);
-                       throw new RuntimeException();
-               }
-       }
-
-       public static String encodeObjectToString(Object o) {
-
-               try {
-                       byte[] bytes = InstantiationUtil.serializeObject(o);
-
-                       String encoded = Base64.encodeBase64String(bytes);
-                       return encoded;
-               }
-               catch (IOException e) {
-                       e.printStackTrace();
-                       System.exit(-1);
-                       throw new RuntimeException();
-               }
-       }
-}
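
EncodingUtils moves arbitrary serializable objects (serializers, comparators, task configurations) through string-typed Hadoop/Tez configuration entries by combining Flink's InstantiationUtil with commons-codec Base64. A minimal round trip, using a made-up configuration key:

    // Illustrative only: ship a Serializable object through a String-valued configuration
    // entry and recover it again. "example.flink.payload" is a made-up key.
    public static void main(String[] args) {
            org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
            java.util.ArrayList<String> payload = new java.util.ArrayList<String>();
            payload.add("hello");

            conf.set("example.flink.payload", EncodingUtils.encodeObjectToString(payload));

            Object restored = EncodingUtils.decodeObjectFromString(
                    conf.get("example.flink.payload"),
                    Thread.currentThread().getContextClassLoader());
            System.out.println(restored); // [hello]
    }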

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java
deleted file mode 100644
index 07c5f97..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.util;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public class FlinkSerialization<T> extends Configured implements Serialization<T>{
-
-       @Override
-       public boolean accept(Class<?> c) {
-               TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader());
-               T instance = typeSerializer.createInstance();
-               return instance.getClass().isAssignableFrom(c);
-       }
-
-       @Override
-       public Serializer<T> getSerializer(Class<T> c) {
-               TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader());
-               return new FlinkSerializer<T>(typeSerializer);
-       }
-
-       @Override
-       public Deserializer<T> getDeserializer(Class<T> c) {
-               TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader());
-               return new FlinkDeserializer<T>(typeSerializer);
-       }
-
-       public static class FlinkSerializer<T> implements Serializer<T> {
-
-               private OutputStream dataOut;
-               private DataOutputViewOutputStreamWrapper dataOutputView;
-               private TypeSerializer<T> typeSerializer;
-
-               public FlinkSerializer(TypeSerializer<T> typeSerializer) {
-                       this.typeSerializer = typeSerializer;
-               }
-
-               @Override
-               public void open(OutputStream out) throws IOException {
-                       this.dataOut = out;
-                       this.dataOutputView = new DataOutputViewOutputStreamWrapper(out);
-               }
-
-               @Override
-               public void serialize(T t) throws IOException {
-                       typeSerializer.serialize(t, dataOutputView);
-               }
-
-               @Override
-               public void close() throws IOException {
-                       this.dataOut.close();
-               }
-       }
-
-       public static class FlinkDeserializer<T> implements Deserializer<T> {
-
-               private InputStream dataIn;
-               private TypeSerializer<T> typeSerializer;
-               private DataInputViewInputStreamWrapper dataInputView;
-
-               public FlinkDeserializer(TypeSerializer<T> typeSerializer) {
-                       this.typeSerializer = typeSerializer;
-               }
-
-               @Override
-               public void open(InputStream in) throws IOException {
-                       this.dataIn = in;
-                       this.dataInputView = new DataInputViewInputStreamWrapper(in);
-               }
-
-               @Override
-               public T deserialize(T t) throws IOException {
-                       T reuse = t;
-                       if (reuse == null) {
-                               reuse = typeSerializer.createInstance();
-                       }
-                       return typeSerializer.deserialize(reuse, dataInputView);
-               }
-
-               @Override
-               public void close() throws IOException {
-                       this.dataIn.close();
-               }
-       }
-
-       private static final class DataOutputViewOutputStreamWrapper implements DataOutputView {
-
-               private final DataOutputStream dos;
-
-               public DataOutputViewOutputStreamWrapper(OutputStream output) {
-                       this.dos = new DataOutputStream(output);
-               }
-
-               @Override
-               public void write(int b) throws IOException {
-                       dos.write(b);
-               }
-
-               @Override
-               public void write(byte[] b) throws IOException {
-                       dos.write(b);
-               }
-
-               @Override
-               public void write(byte[] b, int off, int len) throws IOException {
-                       dos.write(b, off, len);
-               }
-
-               @Override
-               public void writeBoolean(boolean v) throws IOException {
-                       dos.writeBoolean(v);
-               }
-
-               @Override
-               public void writeByte(int v) throws IOException {
-                       dos.writeByte(v);
-               }
-
-               @Override
-               public void writeShort(int v) throws IOException {
-                       dos.writeShort(v);
-               }
-
-               @Override
-               public void writeChar(int v) throws IOException {
-                       dos.writeChar(v);
-               }
-
-               @Override
-               public void writeInt(int v) throws IOException {
-                       dos.writeInt(v);
-               }
-
-               @Override
-               public void writeLong(long v) throws IOException {
-                       dos.writeLong(v);
-               }
-
-               @Override
-               public void writeFloat(float v) throws IOException {
-                       dos.writeFloat(v);
-               }
-
-               @Override
-               public void writeDouble(double v) throws IOException {
-                       dos.writeDouble(v);
-               }
-
-               @Override
-               public void writeBytes(String s) throws IOException {
-                       dos.writeBytes(s);
-               }
-
-               @Override
-               public void writeChars(String s) throws IOException {
-                       dos.writeChars(s);
-               }
-
-               @Override
-               public void writeUTF(String s) throws IOException {
-                       dos.writeUTF(s);
-               }
-
-               @Override
-               public void skipBytesToWrite(int num) throws IOException {
-                       for (int i = 0; i < num; i++) {
-                               dos.write(0);
-                       }
-               }
-
-               @Override
-               public void write(DataInputView inview, int num) throws IOException {
-                       for (int i = 0; i < num; i++) {
-                               dos.write(inview.readByte());
-                       }
-               }
-       }
-
-       private static final class DataInputViewInputStreamWrapper implements DataInputView {
-
-               private final DataInputStream dis;
-
-
-               public DataInputViewInputStreamWrapper(InputStream input) {
-                       this.dis = new DataInputStream(input);
-               }
-
-               @Override
-               public void readFully(byte[] b) throws IOException {
-                       dis.readFully(b);
-               }
-
-               @Override
-               public void readFully(byte[] b, int off, int len) throws IOException {
-                       dis.readFully(b, off, len);
-               }
-
-               @Override
-               public int skipBytes(int n) throws IOException {
-                       return dis.skipBytes(n);
-               }
-
-               @Override
-               public boolean readBoolean() throws IOException {
-                       return dis.readBoolean();
-               }
-
-               @Override
-               public byte readByte() throws IOException {
-                       return dis.readByte();
-               }
-
-               @Override
-               public int readUnsignedByte() throws IOException {
-                       return dis.readUnsignedByte();
-               }
-
-               @Override
-               public short readShort() throws IOException {
-                       return dis.readShort();
-               }
-
-               @Override
-               public int readUnsignedShort() throws IOException {
-                       return dis.readUnsignedShort();
-               }
-
-               @Override
-               public char readChar() throws IOException {
-                       return dis.readChar();
-               }
-
-               @Override
-               public int readInt() throws IOException {
-                       return dis.readInt();
-               }
-
-               @Override
-               public long readLong() throws IOException {
-                       return dis.readLong();
-               }
-
-               @Override
-               public float readFloat() throws IOException {
-                       return dis.readFloat();
-               }
-
-               @Override
-               public double readDouble() throws IOException {
-                       return dis.readDouble();
-               }
-
-               @Override
-               public String readLine() throws IOException {
-                       return dis.readLine();
-               }
-
-               @Override
-               public String readUTF() throws IOException {
-                       return dis.readUTF();
-               }
-
-               @Override
-               public void skipBytesToRead(int numBytes) throws IOException {
-                       while (numBytes > 0) {
-                               numBytes -= dis.skipBytes(numBytes);
-                       }
-               }
-
-               @Override
-               public int read(byte[] b, int off, int len) throws IOException {
-                       dis.readFully(b, off, len);
-                       return len;
-               }
-
-               @Override
-               public int read(byte[] b) throws IOException {
-                       return read(b, 0, b.length);
-               }
-       }
-
-}
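
FlinkSerialization bridges a Flink TypeSerializer, recovered from the string-encoded "io.flink.typeserializer" entry, to Hadoop's Serialization interface so that Tez can serialize Flink records on its shuffle path. For Hadoop's SerializationFactory to pick it up, it has to be listed under the standard "io.serializations" key; the sketch below shows that registration under the stated assumption that this is how the removed runtime wired it, since the actual DAG/vertex setup is not shown in this diff:

    // Illustrative only: register FlinkSerialization with Hadoop/Tez and hand it the
    // Flink TypeSerializer it expects. The registration spot is an assumption; the
    // "io.flink.typeserializer" key is the one the class above actually reads.
    static org.apache.hadoop.conf.Configuration configureFlinkSerialization(
                    org.apache.flink.api.common.typeutils.TypeSerializer<?> typeSerializer) {
            org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
            conf.set("io.serializations",
                    FlinkSerialization.class.getName() + ","
                            + org.apache.hadoop.io.serializer.WritableSerialization.class.getName());
            conf.set("io.flink.typeserializer", EncodingUtils.encodeObjectToString(typeSerializer));
            return conf;
    }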

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/main/resources/log4j.properties 
b/flink-contrib/flink-tez/src/main/resources/log4j.properties
deleted file mode 100644
index 0845c81..0000000
--- a/flink-contrib/flink-tez/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,30 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to OFF to not flood build logs
-# set manually to INFO for debugging purposes
-log4j.rootLogger=INFO, testlogger
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java
 
b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java
deleted file mode 100644
index 9124faa..0000000
--- 
a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-import org.apache.flink.test.testdata.ConnectedComponentsData;
-import org.apache.flink.tez.examples.ConnectedComponentsStep;
-import org.junit.Assert;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.util.regex.Pattern;
-
-/*
- * Note: This does not test whether the program computes one step of the
- * Weakly Connected Components program correctly. It only tests that
- * the program does not assign a wrong component to any vertex.
- */
-
-public class ConnectedComponentsStepITCase extends TezProgramTestBase {
-
-    private static final long SEED = 0xBADC0FFEEBEEFL;
-
-    private static final int NUM_VERTICES = 1000;
-
-    private static final int NUM_EDGES = 10000;
-
-
-    private String verticesPath;
-    private String edgesPath;
-    private String resultPath;
-
-
-    @Override
-    protected void preSubmit() throws Exception {
-        verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
-        edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
-        resultPath = getTempFilePath("results");
-    }
-
-    @Override
-    protected void testProgram() throws Exception {
-        ConnectedComponentsStep.main(verticesPath, edgesPath, resultPath, "100");
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        for (BufferedReader reader : getResultReader(resultPath)) {
-            checkOddEvenResult(reader);
-        }
-    }
-
-    private static void checkOddEvenResult(BufferedReader result) throws IOException {
-        Pattern split = Pattern.compile(" ");
-        String line;
-        while ((line = result.readLine()) != null) {
-            String[] res = split.split(line);
-            Assert.assertEquals("Malformed result: Wrong number of tokens in line.", 2, res.length);
-            try {
-                int vertex = Integer.parseInt(res[0]);
-                int component = Integer.parseInt(res[1]);
-                Assert.assertTrue(((vertex % 2) == (component % 2)));
-            } catch (NumberFormatException e) {
-                Assert.fail("Malformed result.");
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
 
b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
deleted file mode 100644
index 9a203fe..0000000
--- 
a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-import org.apache.flink.test.testdata.PageRankData;
-import org.apache.flink.tez.examples.PageRankBasicStep;
-
-public class PageRankBasicStepITCase extends TezProgramTestBase {
-
-    private String verticesPath;
-    private String edgesPath;
-    private String resultPath;
-    private String expectedResult;
-
-    public static final String RANKS_AFTER_1_ITERATION = "1 0.2\n" +
-            "2 0.25666666666666665\n" +
-            "3 0.1716666666666667\n" +
-            "4 0.1716666666666667\n" +
-            "5 0.2";
-
-    @Override
-    protected void preSubmit() throws Exception {
-        resultPath = getTempDirPath("result");
-        verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES);
-        edgesPath = createTempFile("edges.txt", PageRankData.EDGES);
-    }
-
-    @Override
-    protected void testProgram() throws Exception {
-        PageRankBasicStep.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "-1"});
-        expectedResult = RANKS_AFTER_1_ITERATION;
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        compareKeyValuePairsWithDelta(expectedResult, resultPath, " ", 0.001);
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
 
b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
deleted file mode 100644
index eda9d1a..0000000
--- 
a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.tez.client.LocalTezEnvironment;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-public abstract class TezProgramTestBase extends AbstractTestBase {
-
-    private static final int DEFAULT_DEGREE_OF_PARALLELISM = 4;
-
-    private JobExecutionResult latestExecutionResult;
-
-    private int degreeOfParallelism = DEFAULT_DEGREE_OF_PARALLELISM;
-
-
-    public TezProgramTestBase() {
-        this(new Configuration());
-    }
-
-    public TezProgramTestBase(Configuration config) {
-        super (config);
-    }
-
-
-    public void setParallelism(int degreeOfParallelism) {
-        this.degreeOfParallelism = degreeOfParallelism;
-    }
-
-    public JobExecutionResult getLatestExecutionResult() {
-        return this.latestExecutionResult;
-    }
-
-
-    protected abstract void testProgram() throws Exception;
-
-    protected void preSubmit() throws Exception {}
-
-    protected void postSubmit() throws Exception {}
-
-    // --------------------------------------------------------------------------------------------
-    //  Test entry point
-    // --------------------------------------------------------------------------------------------
-
-    // Ignored due to deadlocks in Tez 0.6.1 (https://s3.amazonaws.com/archive.travis-ci.org/jobs/67848151/log.txt)
-    // TODO Reactivate with future Tez versions
-    @Ignore
-    @Test
-    public void testJob() throws Exception {
-        // pre-submit
-        try {
-            preSubmit();
-        }
-        catch (Exception e) {
-            System.err.println(e.getMessage());
-            e.printStackTrace();
-            Assert.fail("Pre-submit work caused an error: " + e.getMessage());
-        }
-
-        // prepare the test environment
-        LocalTezEnvironment env = LocalTezEnvironment.create();
-        env.setParallelism(degreeOfParallelism);
-        env.setAsContext();
-
-        // call the test program
-        try {
-            testProgram();
-        }
-        catch (Exception e) {
-            System.err.println(e.getMessage());
-            e.printStackTrace();
-            Assert.fail("Error while calling the test program: " + e.getMessage());
-        }
-
-        // post-submit
-        try {
-            postSubmit();
-        }
-        catch (Exception e) {
-            System.err.println(e.getMessage());
-            e.printStackTrace();
-            Assert.fail("Post-submit work caused an error: " + e.getMessage());
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java
 
b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java
deleted file mode 100644
index 35aa54a..0000000
--- 
a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-
-import org.apache.flink.examples.java.relational.WebLogAnalysis;
-import org.apache.flink.test.testdata.WebLogAnalysisData;
-
-public class WebLogAnalysisITCase extends TezProgramTestBase {
-
-    private String docsPath;
-    private String ranksPath;
-    private String visitsPath;
-    private String resultPath;
-
-    @Override
-    protected void preSubmit() throws Exception {
-        docsPath = createTempFile("docs", WebLogAnalysisData.DOCS);
-        ranksPath = createTempFile("ranks", WebLogAnalysisData.RANKS);
-        visitsPath = createTempFile("visits", WebLogAnalysisData.VISITS);
-        resultPath = getTempDirPath("result");
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        compareResultsByLinesInMemory(WebLogAnalysisData.EXCEPTED_RESULT, resultPath);
-    }
-    @Override
-    protected void testProgram() throws Exception {
-        WebLogAnalysis.main(new String[]{docsPath, ranksPath, visitsPath, resultPath});
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java
 
b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java
deleted file mode 100644
index d73aa8b..0000000
--- 
a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-import org.apache.flink.examples.java.wordcount.WordCount;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class WordCountITCase extends TezProgramTestBase {
-
-    protected String textPath;
-    protected String resultPath;
-
-    public WordCountITCase(){
-    }
-
-    @Override
-    protected void preSubmit() throws Exception {
-        textPath = createTempFile("text.txt", WordCountData.TEXT);
-        resultPath = getTempDirPath("result");
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath);
-    }
-
-    @Override
-    protected void testProgram() throws Exception {
-        WordCount.main(new String[]{textPath, resultPath});
-    }
-}
