[FLINK-3169] Remove Key class

This closes #1667.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe117afd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe117afd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe117afd

Branch: refs/heads/master
Commit: fe117afd31e823006f680d145c8f75033816ed17
Parents: d7aa989
Author: zentol <s.mo...@web.de>
Authored: Tue Feb 16 12:23:41 2016 +0100
Committer: zentol <s.mo...@web.de>
Committed: Tue Mar 8 13:49:01 2016 +0100

----------------------------------------------------------------------
 .../distributions/SimpleDistribution.java       | 182 --------------
 .../SimpleIntegerDistribution.java              | 160 ------------
 .../UniformDoubleDistribution.java              |  65 -----
 .../UniformIntegerDistribution.java             |  66 -----
 .../flink/api/common/operators/Ordering.java    |  11 +-
 .../org/apache/flink/types/BooleanValue.java    |   2 -
 .../java/org/apache/flink/types/ByteValue.java  |   2 -
 .../java/org/apache/flink/types/CharValue.java  |   2 -
 .../org/apache/flink/types/DoubleValue.java     |   4 +-
 .../java/org/apache/flink/types/FloatValue.java |   4 +-
 .../java/org/apache/flink/types/IntValue.java   |   2 -
 .../main/java/org/apache/flink/types/Key.java   |  59 -----
 .../java/org/apache/flink/types/LongValue.java  |   2 -
 .../org/apache/flink/types/NormalizableKey.java |   4 +-
 .../java/org/apache/flink/types/NullValue.java  |   2 -
 .../main/java/org/apache/flink/types/Pair.java  | 177 --------------
 .../java/org/apache/flink/types/ShortValue.java |   2 -
 .../org/apache/flink/types/StringValue.java     |   3 +-
 .../SimpleDataDistributionTest.java             | 228 -----------------
 .../flink/types/CollectionsDataTypeTest.java    | 243 -------------------
 .../flink/optimizer/postpass/PostPassUtils.java |  47 ----
 .../optimizer/postpass/SparseKeySchema.java     |  86 -------
 .../dataproperties/MockDistribution.java        |   5 +-
 .../runtime/operators/CachedMatchTaskTest.java  |   6 +-
 .../operators/CoGroupTaskExternalITCase.java    |   6 +-
 .../runtime/operators/CoGroupTaskTest.java      |   6 +-
 .../operators/CombineTaskExternalITCase.java    |   4 +-
 .../operators/JoinTaskExternalITCase.java       |   6 +-
 .../flink/runtime/operators/JoinTaskTest.java   |   6 +-
 .../operators/ReduceTaskExternalITCase.java     |   5 +-
 .../flink/runtime/operators/ReduceTaskTest.java |   4 +-
 .../runtime/operators/hash/HashTableITCase.java |   4 +-
 .../testutils/recordutils/RecordComparator.java |  46 ++--
 .../recordutils/RecordComparatorFactory.java    |  16 +-
 .../recordutils/RecordPairComparator.java       |  18 +-
 .../RecordPairComparatorFactory.java            |  12 +-
 .../org/apache/flink/test/util/CoordVector.java |   4 +-
 37 files changed, 84 insertions(+), 1417 deletions(-)
----------------------------------------------------------------------
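
Summary of the change: the deprecated org.apache.flink.types.Key interface bundled Value serialization with java.lang.Comparable. This commit deletes it together with the classes built on it (Pair, SimpleDistribution, SimpleIntegerDistribution, UniformDoubleDistribution, UniformIntegerDistribution and their tests) and relaxes the remaining signatures from Key<?> to Comparable<?>. A minimal sketch of the resulting pattern, using a hypothetical MyIntValue type (the real Flink value types additionally implement Value subinterfaces such as ResettableValue and CopyableValue):

// Hypothetical sketch: after this commit, a comparable value type
// declares Comparable<T> directly instead of the removed Key<T>.
public class MyIntValue implements Comparable<MyIntValue> {

    private int value;

    public MyIntValue(int value) {
        this.value = value;
    }

    @Override
    public int compareTo(MyIntValue other) {
        return Integer.compare(this.value, other.value);
    }

    // Grouping on keys still requires deterministic hashCode/equals,
    // exactly as the removed Key javadoc spelled out.
    @Override
    public int hashCode() {
        return value;
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof MyIntValue && ((MyIntValue) obj).value == this.value;
    }
}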


http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleDistribution.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleDistribution.java b/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleDistribution.java
deleted file mode 100644
index bf759d9..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleDistribution.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.distributions;
-
-import java.io.IOException;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Key;
-import org.apache.flink.util.InstantiationUtil;
-
-@PublicEvolving
-public class SimpleDistribution implements DataDistribution {
-       
-       private static final long serialVersionUID = 1L;
-
-       protected Key<?>[][] boundaries; 
-       
-       protected int dim;
-       
-       
-       public SimpleDistribution() {
-               boundaries = new Key[0][];
-       }
-       
-       public SimpleDistribution(Key<?>[] bucketBoundaries) {
-               if (bucketBoundaries == null) {
-                       throw new IllegalArgumentException("Bucket boundaries must not be null.");
-               }
-               if (bucketBoundaries.length == 0) {
-                       throw new IllegalArgumentException("Bucket boundaries must not be empty.");
-               }
-
-               // dimensionality is one in this case
-               dim = 1;
-               
-               @SuppressWarnings("unchecked")
-               Class<? extends Key<?>> clazz = (Class<? extends Key<?>>) bucketBoundaries[0].getClass();
-               
-               // make the array 2-dimensional
-               boundaries = new Key[bucketBoundaries.length][];
-               for (int i = 0; i < bucketBoundaries.length; i++) {
-                       if (bucketBoundaries[i].getClass() != clazz) {
-                               throw new IllegalArgumentException("The bucket boundaries are of different class types.");
-                       }
-                       
-                       boundaries[i] = new Key[] { bucketBoundaries[i] };
-               }
-       }
-       
-       @SuppressWarnings("unchecked")
-       public SimpleDistribution(Key<?>[][] bucketBoundaries) {
-               if (bucketBoundaries == null) {
-                       throw new IllegalArgumentException("Bucket boundaries must not be null.");
-               }
-               if (bucketBoundaries.length == 0) {
-                       throw new IllegalArgumentException("Bucket boundaries must not be empty.");
-               }
-
-               // dimensionality is one in this case
-               dim = bucketBoundaries[0].length;
-               
-               Class<? extends Key<?>>[] types = new Class[dim];
-               for (int i = 0; i < dim; i++) {
-                       types[i] = (Class<? extends Key<?>>) bucketBoundaries[0][i].getClass();
-               }
-               
-               // check the array
-               for (int i = 1; i < bucketBoundaries.length; i++) {
-                       if (bucketBoundaries[i].length != dim) {
-                               throw new IllegalArgumentException("All bucket boundaries must have the same dimensionality.");
-                       }
-                       for (int d = 0; d < dim; d++) {
-                               if (types[d] != bucketBoundaries[i][d].getClass()) {
-                                       throw new IllegalArgumentException("The bucket boundaries are of different class types.");
-                               }
-                       }
-               }
-               
-               boundaries = bucketBoundaries;
-       }
-       
-       @Override
-       public int getNumberOfFields() {
-               return this.dim;
-       }
-
-       @Override
-       public Key<?>[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
-               // check validity of arguments
-               if(bucketNum < 0) {
-                       throw new IllegalArgumentException("Requested bucket must be greater than or equal to 0.");
-               } else if(bucketNum >= (totalNumBuckets - 1)) {
-                       throw new IllegalArgumentException("Request bucket must be smaller than the total number of buckets minus 1.");
-               }
-               if(totalNumBuckets < 1) {
-                       throw new IllegalArgumentException("Total number of bucket must be larger than 0.");
-               }
-               
-               final int maxNumBuckets = boundaries.length + 1;
-               
-               // check if max number of buckets is equal to or an even multiple of the requested number of buckets
-               if((maxNumBuckets % totalNumBuckets) == 0) {
-                       // easy case, just use each n-th boundary
-                       final int n = maxNumBuckets / totalNumBuckets;
-                       final int bucketId = bucketNum * n + (n -  1); 
-                       
-                       return boundaries[bucketId];
-               } else {
-                       throw new IllegalArgumentException("Interpolation of bucket boundaries currently not supported. " +
-                                       "Please use an even divider of the maximum possible buckets (here: "+maxNumBuckets+") as totalBuckets.");
-                       // TODO: might be relaxed if much more boundary records are available than requested
-               }
-       }
-
-       @Override
-       public void write(DataOutputView out) throws IOException {
-               out.writeInt(this.dim);
-               out.writeInt(boundaries.length);
-               
-               // write types
-               for (int i = 0; i < dim; i++) {
-                       out.writeUTF(boundaries[0][i].getClass().getName());
-               }
-               
-               for (int i = 0; i < boundaries.length; i++) {
-                       for (int d = 0; d < dim; d++) {
-                               boundaries[i][d].write(out);
-                       }
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void read(DataInputView in) throws IOException {
-               this.dim = in.readInt();
-               final int len = in.readInt();
-               
-               boundaries = new Key[len][];
-               
-               // read types
-               Class<? extends Key<?>>[] types = new Class[dim];
-               for (int i = 0; i < dim; i++) {
-                       String className = in.readUTF();
-                       try {
-                               types[i] = (Class<? extends Key<?>>) Class.forName(className, true, getClass().getClassLoader()).asSubclass(Key.class);
-                       } catch (ClassNotFoundException e) {
-                               throw new IOException("Could not load type class '" + className + "'.");
-                       } catch (Throwable t) {
-                               throw new IOException("Error loading type class '" + className + "'.", t);
-                       }
-               }
-               
-               for (int i = 0; i < len; i++) {
-                       Key<?>[] bucket = new Key[dim]; 
-                       for (int d = 0; d < dim; d++) {
-                               Key<?> val = InstantiationUtil.instantiate(types[d], Key.class);
-                               val.read(in);
-                               bucket[d] = val;
-                       }
-                       
-                       boundaries[i] = bucket;
-               }
-       }
-}
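
For reference, the boundary-selection arithmetic of the getBucketBoundary method deleted above, as a standalone sketch (the concrete numbers are chosen for illustration only):

public class BucketBoundaryMath {
    public static void main(String[] args) {
        int maxNumBuckets = 8;       // boundaries.length + 1, with 7 boundary rows
        int totalNumBuckets = 4;     // legal request: 8 % 4 == 0
        int n = maxNumBuckets / totalNumBuckets;          // n == 2: every 2nd boundary
        for (int bucketNum = 0; bucketNum < totalNumBuckets - 1; bucketNum++) {
            int bucketId = bucketNum * n + (n - 1);       // yields 1, 3, 5
            System.out.println("bucket " + bucketNum + " -> boundaries[" + bucketId + "]");
        }
        // totalNumBuckets == 7 would throw in the original code, since 8 % 7 != 0
        // and interpolation between boundary records is not supported.
    }
}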

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleIntegerDistribution.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleIntegerDistribution.java b/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleIntegerDistribution.java
deleted file mode 100644
index cd1a8c5..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleIntegerDistribution.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.distributions;
-
-import java.io.IOException;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.IntValue;
-
-@PublicEvolving
-public class SimpleIntegerDistribution extends SimpleDistribution {
-       
-       private static final long serialVersionUID = 1L;
-       
-       
-       public SimpleIntegerDistribution() {
-               boundaries = new IntValue[0][];
-       }
-       
-       public SimpleIntegerDistribution(int[] bucketBoundaries) {
-               if (bucketBoundaries == null) {
-                       throw new IllegalArgumentException("Bucket boundaries must not be null.");
-               }
-               if (bucketBoundaries.length == 0) {
-                       throw new IllegalArgumentException("Bucket boundaries must not be empty.");
-               }
-
-               // dimensionality is one in this case
-               dim = 1;
-               
-               // make the array 2-dimensional
-               boundaries = packIntegers(bucketBoundaries);
-       }
-       
-       public SimpleIntegerDistribution(IntValue[] bucketBoundaries) {
-               if (bucketBoundaries == null) {
-                       throw new IllegalArgumentException("Bucket boundaries must not be null.");
-               }
-               if (bucketBoundaries.length == 0) {
-                       throw new IllegalArgumentException("Bucket boundaries must not be empty.");
-               }
-
-               // dimensionality is one in this case
-               dim = 1;
-               
-               // make the array 2-dimensional
-               boundaries = new IntValue[bucketBoundaries.length][];
-               for (int i = 0; i < bucketBoundaries.length; i++) {
-                       boundaries[i] = new IntValue[] { bucketBoundaries[i] };
-               }
-       }
-       
-       public SimpleIntegerDistribution(IntValue[][] bucketBoundaries) {
-               if (bucketBoundaries == null) {
-                       throw new IllegalArgumentException("Bucket boundaries must not be null.");
-               }
-               if (bucketBoundaries.length == 0) {
-                       throw new IllegalArgumentException("Bucket boundaries must not be empty.");
-               }
-
-               // dimensionality is one in this case
-               dim = bucketBoundaries[0].length;
-               
-               // check the array
-               for (int i = 1; i < bucketBoundaries.length; i++) {
-                       if (bucketBoundaries[i].length != dim) {
-                               throw new IllegalArgumentException("All bucket boundaries must have the same dimensionality.");
-                       }
-               }
-       }
-       
-       @Override
-       public int getNumberOfFields() {
-               return this.dim;
-       }
-
-       @Override
-       public IntValue[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
-               // check validity of arguments
-               if(bucketNum < 0) {
-                       throw new IllegalArgumentException("Requested bucket must be greater than or equal to 0.");
-               } else if(bucketNum >= (totalNumBuckets - 1)) {
-                       throw new IllegalArgumentException("Request bucket must be smaller than the total number of buckets minus 1.");
-               }
-               if(totalNumBuckets < 1) {
-                       throw new IllegalArgumentException("Total number of bucket must be larger than 0.");
-               }
-               
-               final int maxNumBuckets = boundaries.length + 1;
-               
-               // check if max number of buckets is equal to or an even multiple of the requested number of buckets
-               if((maxNumBuckets % totalNumBuckets) == 0) {
-                       // easy case, just use each n-th boundary
-                       final int n = maxNumBuckets / totalNumBuckets;
-                       final int bucketId = bucketNum * n + (n -  1); 
-                       
-                       return (IntValue[]) boundaries[bucketId];
-               } else {
-                       throw new IllegalArgumentException("Interpolation of bucket boundaries currently not supported. " +
-                                       "Please use an even divider of the maximum possible buckets (here: "+maxNumBuckets+") as totalBuckets.");
-                       // TODO: might be relaxed if much more boundary records are available than requested
-               }
-       }
-
-       @Override
-       public void write(DataOutputView out) throws IOException {
-               out.writeInt(this.dim);
-               out.writeInt(boundaries.length);
-               
-               for (int i = 0; i < boundaries.length; i++) {
-                       for (int d = 0; d < dim; d++) {
-                               out.writeInt(((IntValue) boundaries[i][d]).getValue());
-                       }
-               }
-       }
-
-       @Override
-       public void read(DataInputView in) throws IOException {
-               this.dim = in.readInt();
-               final int len = in.readInt();
-               
-               boundaries = new IntValue[len][];
-               
-               for (int i = 0; i < len; i++) {
-                       IntValue[] bucket = new IntValue[dim]; 
-                       for (int d = 0; d < dim; d++) {
-                               bucket[d] = new IntValue(in.readInt());
-                       }
-                       
-                       boundaries[i] = bucket;
-               }
-       }
-       
-       private static IntValue[][] packIntegers(int[] values) {
-               IntValue[][] packed = new IntValue[values.length][];
-               for (int i = 0; i < values.length; i++) {
-                       packed[i] = new IntValue[] { new IntValue(values[i]) };
-               }
-               
-               return packed;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformDoubleDistribution.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformDoubleDistribution.java b/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformDoubleDistribution.java
deleted file mode 100644
index df1f095..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformDoubleDistribution.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.distributions;
-
-import java.io.IOException;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.DoubleValue;
-
-@PublicEvolving
-public class UniformDoubleDistribution implements DataDistribution {
-
-       private static final long serialVersionUID = 1L;
-       
-       private double min, max; 
-
-       
-       public UniformDoubleDistribution() {}
-       
-       public UniformDoubleDistribution(double min, double max) {
-               this.min = min;
-               this.max = max;
-       }
-
-       @Override
-       public DoubleValue[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
-               double bucketSize = (max - min) / totalNumBuckets;
-               return new DoubleValue[] {new DoubleValue(min + (bucketNum+1) * bucketSize) };
-       }
-
-       @Override
-       public int getNumberOfFields() {
-               return 1;
-       }
-
-       @Override
-       public void write(DataOutputView out) throws IOException {
-               out.writeDouble(min);
-               out.writeDouble(max);
-       }
-
-       @Override
-       public void read(DataInputView in) throws IOException {
-               min = in.readDouble();
-               max = in.readDouble();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformIntegerDistribution.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformIntegerDistribution.java b/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformIntegerDistribution.java
deleted file mode 100644
index 504f65b..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformIntegerDistribution.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.distributions;
-
-import java.io.IOException;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.IntValue;
-
-@PublicEvolving
-public class UniformIntegerDistribution implements DataDistribution {
-
-       private static final long serialVersionUID = 1L;
-       
-       private int min, max; 
-
-       
-       public UniformIntegerDistribution() {}
-       
-       public UniformIntegerDistribution(int min, int max) {
-               this.min = min;
-               this.max = max;
-       }
-
-       @Override
-       public IntValue[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
-               long diff = ((long) max) - ((long) min) + 1;
-               double bucketSize = diff / ((double) totalNumBuckets);
-               return new IntValue[] {new IntValue(min + (int) ((bucketNum+1) * bucketSize)) };
-       }
-
-       @Override
-       public int getNumberOfFields() {
-               return 1;
-       }
-
-       @Override
-       public void write(DataOutputView out) throws IOException {
-               out.writeInt(min);
-               out.writeInt(max);
-       }
-
-       @Override
-       public void read(DataInputView in) throws IOException {
-               min = in.readInt();
-               max = in.readInt();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
index 23928b3..7332698 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.types.Key;
 
 /**
  * This class represents an ordering on a set of fields. It specifies the fields and order direction
@@ -34,7 +33,7 @@ public class Ordering implements Cloneable {
        
        protected FieldList indexes = new FieldList();
        
-       protected final ArrayList<Class<? extends Key<?>>> types = new ArrayList<Class<? extends Key<?>>>();
+       protected final ArrayList<Class<? extends Comparable<?>>> types = new ArrayList<Class<? extends Comparable<?>>>();
        
        protected final ArrayList<Order> orders = new ArrayList<Order>();
 
@@ -50,7 +49,7 @@ public class Ordering implements Cloneable {
         * @param type
         * @param order
         */
-       public Ordering(int index, Class<? extends Key<?>> type, Order order) {
+       public Ordering(int index, Class<? extends Comparable<?>> type, Order order) {
                appendOrdering(index, type, order);
        }
        
@@ -63,7 +62,7 @@ public class Ordering implements Cloneable {
         * 
         * @return This ordering with an additional appended order requirement.
         */
-       public Ordering appendOrdering(Integer index, Class<? extends Key<?>> type, Order order) {
+       public Ordering appendOrdering(Integer index, Class<? extends Comparable<?>> type, Order order) {
                if (index.intValue() < 0) {
                        throw new IllegalArgumentException("The key index must not be negative.");
                }
@@ -97,7 +96,7 @@ public class Ordering implements Cloneable {
                return this.indexes.get(index);
        }
        
-       public Class<? extends Key<?>> getType(int index) {
+       public Class<? extends Comparable<?>> getType(int index) {
                if (index < 0 || index >= this.types.size()) {
                        throw new IndexOutOfBoundsException(String.valueOf(index));
                }
@@ -114,7 +113,7 @@ public class Ordering implements Cloneable {
        // --------------------------------------------------------------------------------------------
        
        @SuppressWarnings("unchecked")
-       public Class<? extends Key<?>>[] getTypes() {
+       public Class<? extends Comparable<?>>[] getTypes() {
                return this.types.toArray(new Class[this.types.size()]);
        }
        

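The net effect of the Ordering changes above is that key classes only need to implement Comparable. A usage sketch under that assumption (IntValue and StringValue qualify because NormalizableKey extends Comparable after this commit):

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.StringValue;

public class OrderingExample {
    public static void main(String[] args) {
        // Class<IntValue> satisfies Class<? extends Comparable<?>>.
        Ordering ordering = new Ordering(0, IntValue.class, Order.ASCENDING)
                .appendOrdering(1, StringValue.class, Order.DESCENDING);
        System.out.println(ordering.getType(0)); // class org.apache.flink.types.IntValue
    }
}
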
http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java b/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
index 10ca069..4bc387b 100644
--- a/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
@@ -29,8 +29,6 @@ import org.apache.flink.core.memory.MemorySegment;
 /**
  * Boxed serializable and comparable boolean type, representing the primitive
  * type {@code boolean}.
- * 
- * @see org.apache.flink.types.Key
  */
 @Public
 public class BooleanValue implements NormalizableKey<BooleanValue>, ResettableValue<BooleanValue>, CopyableValue<BooleanValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/ByteValue.java b/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
index a1a9e32..cb1c669 100644
--- a/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
@@ -29,8 +29,6 @@ import org.apache.flink.core.memory.MemorySegment;
 /**
  * Boxed serializable and comparable byte type, representing the primitive
  * type {@code byte} (signed 8 bit integer).
- * 
- * @see org.apache.flink.types.Key
  */
 @Public
 public class ByteValue implements NormalizableKey<ByteValue>, ResettableValue<ByteValue>, CopyableValue<ByteValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/CharValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/CharValue.java b/flink-core/src/main/java/org/apache/flink/types/CharValue.java
index 8b2ab29..f800832 100644
--- a/flink-core/src/main/java/org/apache/flink/types/CharValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/CharValue.java
@@ -29,8 +29,6 @@ import org.apache.flink.core.memory.MemorySegment;
 /**
  * Boxed serializable and comparable character type, representing the primitive
  * type {@code char}.
- * 
- * @see org.apache.flink.types.Key
  */
 @Public
 public class CharValue implements NormalizableKey<CharValue>, ResettableValue<CharValue>, CopyableValue<CharValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java b/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
index 80fb77b..7dba25b 100644
--- a/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
@@ -28,11 +28,9 @@ import org.apache.flink.core.memory.DataOutputView;
 /**
  * Boxed serializable and comparable double precision floating point type, representing the primitive
  * type {@code double}.
- * 
- * @see org.apache.flink.types.Key
  */
 @Public
-public class DoubleValue implements Key<DoubleValue>, ResettableValue<DoubleValue>, CopyableValue<DoubleValue> {
+public class DoubleValue implements Comparable<DoubleValue>, ResettableValue<DoubleValue>, CopyableValue<DoubleValue> {
        private static final long serialVersionUID = 1L;
 
        private double value;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/FloatValue.java b/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
index 371ce52..e3e49ce 100644
--- a/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
@@ -28,11 +28,9 @@ import org.apache.flink.core.memory.DataOutputView;
 /**
  * Boxed serializable and comparable single precision floating point type, representing the primitive
  * type {@code float}.
- * 
- * @see org.apache.flink.types.Key
  */
 @Public
-public class FloatValue implements Key<FloatValue>, ResettableValue<FloatValue>, CopyableValue<FloatValue> {
+public class FloatValue implements Comparable<FloatValue>, ResettableValue<FloatValue>, CopyableValue<FloatValue> {
        private static final long serialVersionUID = 1L;
 
        private float value;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/IntValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/IntValue.java b/flink-core/src/main/java/org/apache/flink/types/IntValue.java
index 0f63117..347fd1d 100644
--- a/flink-core/src/main/java/org/apache/flink/types/IntValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/IntValue.java
@@ -29,8 +29,6 @@ import org.apache.flink.core.memory.MemorySegment;
 /**
  * Boxed serializable and comparable integer type, representing the primitive
  * type {@code int}.
- * 
- * @see org.apache.flink.types.Key
  */
 @Public
 public class IntValue implements NormalizableKey<IntValue>, ResettableValue<IntValue>, CopyableValue<IntValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/Key.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Key.java b/flink-core/src/main/java/org/apache/flink/types/Key.java
deleted file mode 100644
index c1e0626..0000000
--- a/flink-core/src/main/java/org/apache/flink/types/Key.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.types;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * This interface has to be implemented by all data types that act as key. Keys are used to establish
- * relationships between values. A key must always be {@link java.lang.Comparable} to other keys of
- * the same type. In addition, keys must implement a correct {@link java.lang.Object#hashCode()} method
- * and {@link java.lang.Object#equals(Object)} method to ensure that grouping on keys works properly.
- * <p>
- * This interface extends {@link org.apache.flink.types.Value} and requires to implement
- * the serialization of its value.
- * 
- * @see org.apache.flink.types.Value
- * @see org.apache.flink.core.io.IOReadableWritable
- * @see java.lang.Comparable
- * 
- * @deprecated The Key type is a relict of a deprecated and removed API and will be removed
- *             in future versions as well.
- */
-@Deprecated
-@PublicEvolving
-public interface Key<T> extends Value, Comparable<T> {
-       
-       /**
-        * All keys must override the hash-code function to generate proper deterministic hash codes,
-        * based on their contents.
-        * 
-        * @return The hash code of the key
-        */
-       public int hashCode();
-       
-       /**
-        * Compares the object on equality with another object.
-        * 
-        * @param other The other object to compare against.
-        * 
-        * @return True, iff this object is identical to the other object, false otherwise.
-        */
-       public boolean equals(Object other);
-}
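
With the interface gone, a user-defined key type declares Key's two former supertypes directly. A hypothetical migration sketch (CustomerId is invented for illustration, not part of this commit):

import java.io.IOException;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Value;

// Before: public class CustomerId implements Key<CustomerId> { ... }
// After:  implement Value and Comparable directly.
public class CustomerId implements Value, Comparable<CustomerId> {
    private static final long serialVersionUID = 1L;

    private long id;

    public CustomerId() {}               // Value types need a nullary constructor

    public CustomerId(long id) {
        this.id = id;
    }

    @Override
    public void write(DataOutputView out) throws IOException {
        out.writeLong(id);
    }

    @Override
    public void read(DataInputView in) throws IOException {
        id = in.readLong();
    }

    @Override
    public int compareTo(CustomerId other) {
        return Long.compare(this.id, other.id);
    }

    @Override
    public int hashCode() {
        return Long.hashCode(id);
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof CustomerId && ((CustomerId) obj).id == this.id;
    }
}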

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/LongValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/LongValue.java b/flink-core/src/main/java/org/apache/flink/types/LongValue.java
index ef93864..dfb7845 100644
--- a/flink-core/src/main/java/org/apache/flink/types/LongValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/LongValue.java
@@ -29,8 +29,6 @@ import org.apache.flink.core.memory.MemorySegment;
 /**
  * Boxed serializable and comparable long integer type, representing the primitive
  * type {@code long}.
- * 
- * @see org.apache.flink.types.Key
  */
 @Public
 public class LongValue implements NormalizableKey<LongValue>, ResettableValue<LongValue>, CopyableValue<LongValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/NormalizableKey.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/NormalizableKey.java b/flink-core/src/main/java/org/apache/flink/types/NormalizableKey.java
index 6c5e204..e040599 100644
--- a/flink-core/src/main/java/org/apache/flink/types/NormalizableKey.java
+++ b/flink-core/src/main/java/org/apache/flink/types/NormalizableKey.java
@@ -36,8 +36,8 @@ import org.apache.flink.core.memory.MemorySegment;
  * key length.
  */
 @Public
-public interface NormalizableKey<T> extends Key<T> {
-       
+public interface NormalizableKey<T> extends Comparable<T> {
+
        /**
         * Gets the maximal length of normalized keys that the data type would produce to determine
         * the order of instances solely by the normalized key. A value of {@link java.lang.Integer}.MAX_VALUE
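
A sketch of the narrowed NormalizableKey contract: the interface now extends only Comparable, so serialization comes from implementing Value (or a subinterface) separately. The method names below, getMaxNormalizedKeyLen() and copyNormalizedKey(...), are assumed from the surrounding Flink sources rather than shown in this hunk:

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.NormalizableKey;

public class NormalizedByte implements NormalizableKey<NormalizedByte> {

    private byte value;

    @Override
    public int getMaxNormalizedKeyLen() {
        return 1; // a single byte fully determines the order
    }

    @Override
    public void copyNormalizedKey(MemorySegment memory, int offset, int len) {
        if (len > 0) {
            // Flip the sign bit so unsigned byte-wise comparison matches signed order.
            memory.put(offset, (byte) (value ^ 0x80));
            for (int i = 1; i < len; i++) {
                memory.put(offset + i, (byte) 0); // pad if more bytes were requested
            }
        }
    }

    @Override
    public int compareTo(NormalizedByte other) {
        return Byte.compare(this.value, other.value);
    }
}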

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/NullValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/NullValue.java b/flink-core/src/main/java/org/apache/flink/types/NullValue.java
index 9a3885d..e188929 100644
--- a/flink-core/src/main/java/org/apache/flink/types/NullValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/NullValue.java
@@ -28,8 +28,6 @@ import org.apache.flink.core.memory.MemorySegment;
 
 /**
  * Null base type for programs that implements the Key interface.
- * 
- * @see org.apache.flink.types.Key
  */
 @Public
 public final class NullValue implements NormalizableKey<NullValue>, CopyableValue<NullValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/Pair.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Pair.java b/flink-core/src/main/java/org/apache/flink/types/Pair.java
deleted file mode 100644
index 9bf2387..0000000
--- a/flink-core/src/main/java/org/apache/flink/types/Pair.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.types;
-
-import java.io.IOException;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.ReflectionUtil;
-
-/**
- * Generic pair base type. 
- * 
- * @see org.apache.flink.types.Key
- * 
- * @param <U> Type of the pair's first element.
- * @param <V> Type of the pair's second element.
- */
-@PublicEvolving
-public abstract class Pair<U extends Key<U>, V extends Key<V>> implements Key<Pair<U, V>> {
-       private static final long serialVersionUID = 1L;
-       
-       // class of the first pair element
-       private final Class<U> firstClass;
-       // class of the second pair element
-       private final Class<V> secondClass;
-
-       // the first pair element
-       private U first;
-       // the second pair element
-       private V second;
-
-       /**
-        * Initializes both encapsulated pair elements with empty objects.
-        */
-       public Pair() {
-               this.firstClass = ReflectionUtil.<U> getTemplateType1(this.getClass());
-               this.secondClass = ReflectionUtil.<V> getTemplateType2(this.getClass());
-
-               try {
-                       this.first = this.firstClass.newInstance();
-                       this.second = this.secondClass.newInstance();
-               } catch (final InstantiationException e) {
-                       throw new RuntimeException(e);
-               } catch (final IllegalAccessException e) {
-                       throw new RuntimeException(e);
-               }
-       }
-
-       /**
-        * Initializes the encapsulated pair elements with the provided values.
-        * 
-        * @param first Initial value of the first encapsulated pair element.
-        * @param second Initial value of the second encapsulated pair element.
-        */
-       public Pair(U first, V second) {
-               this.firstClass = ReflectionUtil.<U> getTemplateType1(this.getClass());
-               this.secondClass = ReflectionUtil.<V> getTemplateType1(this.getClass());
-
-               this.first = first;
-               this.second = second;
-       }
-
-       /**
-        * Returns the first encapsulated pair element.
-        * 
-        * @return The first encapsulated pair element.
-        */
-       public U getFirst() {
-               return this.first;
-       }
-
-       /**
-        * Sets the first encapsulated pair element to the specified value.
-        * 
-        * @param first
-        *        The new value of the first encapsulated pair element.
-        */
-       public void setFirst(final U first) {
-               if (first == null) {
-                       throw new NullPointerException("first must not be null");
-               }
-
-               this.first = first;
-       }
-
-       /**
-        * Returns the second encapsulated pair element.
-        * 
-        * @return The second encapsulated pair element.
-        */
-       public V getSecond() {
-               return this.second;
-       }
-
-       /**
-        * Sets the second encapsulated pair element to the specified value.
-        * 
-        * @param second
-        *        The new value of the second encapsulated pair element.
-        */
-       public void setSecond(final V second) {
-               if (second == null) {
-                       throw new NullPointerException("second must not be null");
-               }
-
-               this.second = second;
-       }
-
-       @Override
-       public String toString() {
-               return "<" + this.first.toString() + "|" + this.second.toString() + ">";
-       }
-
-       @Override
-       public void read(final DataInputView in) throws IOException {
-               this.first.read(in);
-               this.second.read(in);
-       }
-
-       @Override
-       public void write(final DataOutputView out) throws IOException {
-               this.first.write(out);
-               this.second.write(out);
-       }
-
-       @Override
-       public int compareTo(Pair<U, V> o) {
-               int result = this.first.compareTo(o.first);
-               if (result == 0) {
-                       result = this.second.compareTo(o.second);
-               }
-               return result;
-       }
-
-       @Override
-       public int hashCode() {
-               final int prime = 31;
-               int result = 1;
-               result = prime * result + this.first.hashCode();
-               result = prime * result + this.second.hashCode();
-               return result;
-       }
-
-       @Override
-       public boolean equals(final Object obj) {
-               if (this == obj) {
-                       return true;
-               }
-               if (obj == null) {
-                       return false;
-               }
-               if (this.getClass() != obj.getClass()) {
-                       return false;
-               }
-               final Pair<?, ?> other = (Pair<?, ?>) obj;
-               return this.first.equals(other.first) && this.second.equals(other.second);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/ShortValue.java b/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
index b91aaac..970e7e4 100644
--- a/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
@@ -29,8 +29,6 @@ import org.apache.flink.core.memory.MemorySegment;
 /**
  * Boxed serializable and comparable short integer type, representing the primitive
  * type {@code short}.
- * 
- * @see org.apache.flink.types.Key
  */
 @Public
 public class ShortValue implements NormalizableKey<ShortValue>, ResettableValue<ShortValue>, CopyableValue<ShortValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/StringValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/StringValue.java b/flink-core/src/main/java/org/apache/flink/types/StringValue.java
index 0f9105c..e20083e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/StringValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/StringValue.java
@@ -37,8 +37,7 @@ import com.google.common.base.Preconditions;
  * The mutability allows to reuse the object inside the user code, also across invocations. Reusing a StringValue object
  * helps to increase the performance, as string objects are rather heavy-weight objects and incur a lot of garbage
  * collection overhead, if created and destroyed in masses.
- * 
- * @see org.apache.flink.types.Key
+ *
  * @see org.apache.flink.types.NormalizableKey
  * @see java.lang.String
  * @see java.lang.CharSequence

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/test/java/org/apache/flink/api/distributions/SimpleDataDistributionTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/distributions/SimpleDataDistributionTest.java b/flink-core/src/test/java/org/apache/flink/api/distributions/SimpleDataDistributionTest.java
deleted file mode 100644
index a58e6ab..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/distributions/SimpleDataDistributionTest.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.distributions;
-
-import org.apache.flink.api.common.distributions.SimpleDistribution;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.StringValue;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-public class SimpleDataDistributionTest {
-
-       @Test
-       public void testConstructorSingleKey() {
-
-               // check correct data distribution
-               try {
-                       SimpleDistribution dd = new SimpleDistribution(new Key<?>[] {new IntValue(1), new IntValue(2), new IntValue(3)});
-                       Assert.assertEquals(1, dd.getNumberOfFields());
-               }
-               catch (Throwable t) {
-                       Assert.fail();
-               }
-               
-               // check incorrect key types
-               try {
-                       new SimpleDistribution(new Key<?>[] {new IntValue(1), new StringValue("ABC"), new IntValue(3)});
-                       Assert.fail("Data distribution accepts inconsistent key types");
-               } catch(IllegalArgumentException iae) {
-                       // do nothing
-               }
-               
-               // check inconsistent number of keys
-               try {
-                       new SimpleDistribution(new Key<?>[][] {{new IntValue(1)}, {new IntValue(2), new IntValue(2)}, {new IntValue(3)}});
-                       Assert.fail("Data distribution accepts inconsistent many keys");
-               } catch(IllegalArgumentException iae) {
-                       // do nothing
-               }
-       }
-       
-       @Test 
-       public void testConstructorMultiKey() {
-               
-               // check correct data distribution
-               SimpleDistribution dd = new SimpleDistribution(
-                               new Key<?>[][] {{new IntValue(1), new StringValue("A"), new IntValue(1)},
-                                                       {new IntValue(2), new StringValue("A"), new IntValue(1)},
-                                                       {new IntValue(3), new StringValue("A"), new IntValue(1)}});
-               Assert.assertEquals(3, dd.getNumberOfFields());
-               
-               // check inconsistent key types
-               try {
-                       new SimpleDistribution( 
-                                       new Key<?>[][] {{new IntValue(1), new StringValue("A"), new DoubleValue(1.3d)},
-                                                               {new IntValue(2), new StringValue("B"), new IntValue(1)}});
-                       Assert.fail("Data distribution accepts incorrect key types");
-               } catch(IllegalArgumentException iae) {
-                       // do nothing
-               }
-               
-               // check inconsistent number of keys
-               try {
-                       new SimpleDistribution(
-                                       new Key<?>[][] {{new IntValue(1), new IntValue(2)},
-                                                               {new IntValue(2), new IntValue(2)},
-                                                               {new IntValue(3)}});
-                       Assert.fail("Data distribution accepts bucket boundaries with inconsistent many keys");
-               } catch(IllegalArgumentException iae) {
-                       // do nothing
-               }
-               
-       }
-       
-       @Test
-       public void testWriteRead() {
-               
-               SimpleDistribution ddWrite = new SimpleDistribution(
-                               new Key<?>[][] {{new IntValue(1), new StringValue("A"), new IntValue(1)},
-                                                       {new IntValue(2), new StringValue("A"), new IntValue(1)},
-                                                       {new IntValue(2), new StringValue("B"), new IntValue(4)},
-                                                       {new IntValue(2), new StringValue("B"), new IntValue(3)},
-                                                       {new IntValue(2), new StringValue("B"), new IntValue(2)}});
-               Assert.assertEquals(3, ddWrite.getNumberOfFields());
-               
-               final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-               
-               try {
-                       ddWrite.write(new DataOutputViewStreamWrapper(baos));
-               } catch (IOException e) {
-                       Assert.fail("Error serializing the DataDistribution: " + e.getMessage());
-               }
-
-               byte[] seralizedDD = baos.toByteArray();
-               
-               final ByteArrayInputStream bais = new ByteArrayInputStream(seralizedDD);
-               
-               SimpleDistribution ddRead = new SimpleDistribution();
-               
-               try {
-                       ddRead.read(new DataInputViewStreamWrapper(bais));
-               } catch (Exception ex) {
-                       Assert.fail("The deserialization of the encoded data distribution caused an error");
-               }
-               
-               Assert.assertEquals(3, ddRead.getNumberOfFields());
-               
-               // compare written and read distributions
-               for(int i=0;i<6;i++) {
-                       Key<?>[] recW = ddWrite.getBucketBoundary(0, 6);
-                       Key<?>[] recR = ddWrite.getBucketBoundary(0, 6);
-                       
-                       Assert.assertEquals(recW[0], recR[0]);
-                       Assert.assertEquals(recW[1], recR[1]);
-                       Assert.assertEquals(recW[2], recR[2]);
-               }
-       }
-       
-       @Test
-       public void testGetBucketBoundary() {
-               
-               SimpleDistribution dd = new SimpleDistribution(
-                               new Key<?>[][] {{new IntValue(1), new StringValue("A")},
-                                                       {new IntValue(2), new StringValue("B")},
-                                                       {new IntValue(3), new StringValue("C")},
-                                                       {new IntValue(4), new StringValue("D")},
-                                                       {new IntValue(5), new StringValue("E")},
-                                                       {new IntValue(6), new StringValue("F")},
-                                                       {new IntValue(7), new StringValue("G")}});
-               
-               Key<?>[] boundRec = dd.getBucketBoundary(0, 8);
-               Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 1);
-               Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("A"));
-               
-               boundRec = dd.getBucketBoundary(1, 8);
-               Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 2);
-               Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("B"));
-               
-               boundRec = dd.getBucketBoundary(2, 8);
-               Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 3);
-               Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("C"));
-               
-               boundRec = dd.getBucketBoundary(3, 8);
-               Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 4);
-               Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("D"));
-               
-               boundRec = dd.getBucketBoundary(4, 8);
-               Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 5);
-               Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("E"));
-               
-               boundRec = dd.getBucketBoundary(5, 8);
-               Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 6);
-               Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("F"));
-               
-               boundRec = dd.getBucketBoundary(6, 8);
-               Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 7);
-               Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("G"));
-               
-               boundRec = dd.getBucketBoundary(0, 4);
-               Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 2);
-               Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("B"));
-               
-               boundRec = dd.getBucketBoundary(1, 4);
-               Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 4);
-               Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("D"));
-               
-               boundRec = dd.getBucketBoundary(2, 4);
-               Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 6);
-               Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("F"));
-               
-               boundRec = dd.getBucketBoundary(0, 2);
-               Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 4);
-               Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("D"));
-               
-               try {
-                       dd.getBucketBoundary(0, 7);
-                       Assert.fail();
-               } catch(IllegalArgumentException iae) {
-                       // nothing to do
-               }
-               
-               try {
-                       dd.getBucketBoundary(3, 4);
-                       Assert.fail();
-               } catch(IllegalArgumentException iae) {
-                       // nothing to do
-               }
-               
-               try {
-                       dd.getBucketBoundary(-1, 4);
-                       Assert.fail();
-               } catch(IllegalArgumentException iae) {
-                       // nothing to do
-               }
-               
-               try {
-                       dd.getBucketBoundary(0, 0);
-                       Assert.fail();
-               } catch(IllegalArgumentException iae) {
-                       // nothing to do
-               }
-       }
-}
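
With SimpleDistribution and its test removed, a custom range distribution now implements DataDistribution directly, and bucket boundaries travel as plain Object[] rather than Key<?>[] (the MockDistribution change further down shows the new signature). A minimal sketch under that assumption; the class name and the single-int-field layout are illustrative, not part of this commit:

    import java.io.IOException;
    import org.apache.flink.api.common.distributions.DataDistribution;
    import org.apache.flink.core.memory.DataInputView;
    import org.apache.flink.core.memory.DataOutputView;
    import org.apache.flink.types.IntValue;

    // Hypothetical single-field distribution; boundaries[i] is the upper
    // boundary of bucket i when all stored buckets are requested.
    public class SingleIntDistribution implements DataDistribution {

        private int[] boundaries;

        public SingleIntDistribution() {}          // nullary constructor for read()

        public SingleIntDistribution(int[] boundaries) {
            this.boundaries = boundaries;
        }

        @Override
        public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
            // coarser bucket counts stride over the stored boundaries,
            // mirroring the behavior the deleted test exercised
            if (totalNumBuckets < 1
                    || (boundaries.length + 1) % totalNumBuckets != 0
                    || bucketNum < 0 || bucketNum >= totalNumBuckets - 1) {
                throw new IllegalArgumentException();
            }
            int stride = (boundaries.length + 1) / totalNumBuckets;
            return new Object[] { new IntValue(boundaries[(bucketNum + 1) * stride - 1]) };
        }

        @Override
        public int getNumberOfFields() {
            return 1;
        }

        @Override
        public void write(DataOutputView out) throws IOException {
            out.writeInt(boundaries.length);
            for (int b : boundaries) {
                out.writeInt(b);
            }
        }

        @Override
        public void read(DataInputView in) throws IOException {
            boundaries = new int[in.readInt()];
            for (int i = 0; i < boundaries.length; i++) {
                boundaries[i] = in.readInt();
            }
        }
    }

The stride arithmetic reproduces what the deleted test checked: with seven stored boundaries, asking for four buckets returns every second boundary, and a bucket count that does not divide evenly is rejected.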

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java b/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java
deleted file mode 100644
index b61dd6e..0000000
--- a/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.types;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map.Entry;
-
-public class CollectionsDataTypeTest {
-       
-       private DataOutputView out;
-
-       private DataInputView in;
-
-       @Before
-       public void setup() throws Exception {
-               PipedInputStream input = new PipedInputStream(1000);
-               in = new DataInputViewStreamWrapper(input);
-               out = new DataOutputViewStreamWrapper(new PipedOutputStream(input));
-       }
-
-       @Test
-       public void testPair() {
-               NfIntStringPair pair1 = new NfIntStringPair();
-
-               pair1.setFirst(new IntValue(10));
-               pair1.setSecond(new StringValue("This is a string"));
-
-               // test data retrieval
-               Assert.assertEquals(pair1.getFirst(), new IntValue(10));
-               Assert.assertEquals(pair1.getSecond(), new StringValue("This is a string"));
-
-               // test serialization
-               try {
-                       NfIntStringPair mPairActual = new NfIntStringPair();
-
-                       pair1.write(out);
-                       mPairActual.read(in);
-
-                       Assert.assertEquals(pair1, mPairActual);
-               } catch (IOException e) {
-                       Assert.fail("Unexpected IOException");
-               }
-
-               // test comparison
-               NfIntStringPair pair2 = new NfIntStringPair();
-               NfIntStringPair pair3 = new NfIntStringPair();
-               NfIntStringPair pair4 = new NfIntStringPair();
-               NfIntStringPair pair5 = new NfIntStringPair();
-               NfIntStringPair pair6 = new NfIntStringPair();
-
-               pair2.setFirst(new IntValue(10));
-               pair2.setSecond(new StringValue("This is a string"));
-
-               pair3.setFirst(new IntValue(5));
-               pair3.setSecond(new StringValue("This is a string"));
-
-               pair4.setFirst(new IntValue(15));
-               pair4.setSecond(new StringValue("This is a string"));
-
-               pair5.setFirst(new IntValue(10));
-               pair5.setSecond(new StringValue("This is a strina"));
-
-               pair6.setFirst(new IntValue(10));
-               pair6.setSecond(new StringValue("This is a strinz"));
-
-               Assert.assertTrue(pair1.compareTo(pair2) == 0);
-               Assert.assertTrue(pair1.compareTo(pair3) > 0);
-               Assert.assertTrue(pair1.compareTo(pair4) < 0);
-               Assert.assertTrue(pair1.compareTo(pair5) > 0);
-               Assert.assertTrue(pair1.compareTo(pair6) < 0);
-
-               Assert.assertTrue(pair1.equals(pair2));
-               Assert.assertFalse(pair1.equals(pair3));
-               Assert.assertFalse(pair1.equals(pair4));
-               Assert.assertFalse(pair1.equals(pair5));
-               Assert.assertFalse(pair1.equals(pair6));
-
-               // test incorrect comparison
-               NfDoubleStringPair mPair7 = new NfDoubleStringPair();
-               mPair7.setFirst(new DoubleValue(2.3));
-
-               // this is caught by the compiler now
-//             try {
-//                     pair1.compareTo(mPair7);
-//                     Assert.fail();
-//             } catch (Exception e) {
-//                     Assert.assertTrue(e instanceof ClassCastException);
-//             }
-
-               // test sorting
-               NfIntStringPair[] pairs = new NfIntStringPair[5];
-
-               pairs[0] = pair1;
-               pairs[1] = pair2;
-               pairs[2] = pair3;
-               pairs[3] = pair4;
-               pairs[4] = pair5;
-
-               Arrays.sort(pairs);
-
-               NfIntStringPair p1, p2;
-
-               for (int i = 1; i < 5; i++) {
-                       p1 = pairs[i - 1];
-                       p2 = pairs[i];
-
-                       Assert.assertTrue(p1.compareTo(p2) <= 0);
-               }
-
-               // test hashing
-               HashSet<NfIntStringPair> pairSet = new HashSet<NfIntStringPair>();
-
-               Assert.assertTrue(pairSet.add(pair2));
-               Assert.assertTrue(pairSet.add(pair3));
-               Assert.assertTrue(pairSet.add(pair4));
-               Assert.assertTrue(pairSet.add(pair5));
-               Assert.assertTrue(pairSet.add(pair6));
-               Assert.assertFalse(pairSet.add(pair1));
-
-               Assert.assertTrue(pairSet.containsAll(Arrays.asList(pairs)));
-       }
-
-       @Test
-       public void testPactMap() {
-               NfIntStringMap map0 = new NfIntStringMap();
-               map0.put(new IntValue(10), new StringValue("20"));
-
-               // test data retrieval
-               for (Entry<IntValue, StringValue> entry : map0.entrySet()) {
-                       Assert.assertEquals(entry.getValue(), new StringValue("20"));
-                       Assert.assertEquals(entry.getKey(), new IntValue(10));
-               }
-
-               // test data overwriting
-               map0.put(new IntValue(10), new StringValue("10"));
-               for (Entry<IntValue, StringValue> entry : map0.entrySet()) {
-                       Assert.assertEquals(entry.getValue(), new StringValue("10"));
-                       Assert.assertEquals(entry.getKey(), new IntValue(10));
-               }
-
-               // now test data retrieval of multiple values
-               map0.put(new IntValue(20), new StringValue("20"));
-               map0.put(new IntValue(30), new StringValue("30"));
-               map0.put(new IntValue(40), new StringValue("40"));
-
-               // construct an inverted map
-               NfStringIntMap mapinv = new NfStringIntMap();
-               for (Entry<IntValue, StringValue> entry : map0.entrySet()) {
-                       Assert.assertEquals(entry.getKey().getValue(), Integer.parseInt(entry.getValue().toString()));
-                       mapinv.put(entry.getValue(), entry.getKey());
-               }
-
-               for (Entry<StringValue, IntValue> entry : mapinv.entrySet()) {
-                       Assert.assertEquals(entry.getValue().getValue(), Integer.parseInt(entry.getKey().toString()));
-               }
-
-               // now test data transfer
-               NfIntStringMap nMap = new NfIntStringMap();
-               try {
-                       map0.write(out);
-                       nMap.read(in);
-               } catch (Exception e) {
-                       Assert.fail();
-               }
-               for (Entry<IntValue, StringValue> entry : nMap.entrySet()) {
-                       Assert.assertEquals(entry.getKey().getValue(), Integer.parseInt(entry.getValue().toString()));
-               }
-       }
-
-       @Test
-       public void testPactList() {
-               NfStringList list = new NfStringList();
-               list.add(new StringValue("Hello!"));
-
-               for (StringValue value : list) {
-                       Assert.assertEquals(value, new StringValue("Hello!"));
-               }
-
-               list.add(new StringValue("Hello2!"));
-               list.add(new StringValue("Hello3!"));
-               list.add(new StringValue("Hello4!"));
-
-               // test data transfer
-               NfStringList mList2 = new NfStringList();
-               try {
-                       list.write(out);
-                       mList2.read(in);
-               }
-               catch (Exception e) {
-                       Assert.fail();
-               }
-               Assert.assertTrue(list.equals(mList2));
-       }
-
-       private class NfIntStringPair extends Pair<IntValue, StringValue> {
-               private static final long serialVersionUID = 1L;
-       }
-
-       private class NfDoubleStringPair extends Pair<DoubleValue, StringValue> {
-               private static final long serialVersionUID = 1L;
-       }
-
-       private class NfStringList extends ListValue<StringValue> {
-               private static final long serialVersionUID = 1L;
-       }
-
-       private class NfIntStringMap extends MapValue<IntValue, StringValue> {
-               private static final long serialVersionUID = 1L;
-       }
-
-       private class NfStringIntMap extends MapValue<StringValue, IntValue> {
-               private static final long serialVersionUID = 1L;
-       }
-}
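
The Pair base type exercised above goes away together with Key; in user code the closest stand-in is an ordinary Flink tuple. A hedged sketch of the pair assertions redone against Tuple2, assuming flink-java's Tuple2 and boxed Java types in place of IntValue/StringValue; since Tuple2 is not Comparable, ordering needs an explicit Comparator:

    import java.util.Arrays;
    import java.util.Comparator;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class PairMigrationSketch {
        public static void main(String[] args) {
            Tuple2<Integer, String> p1 = new Tuple2<>(10, "This is a string");
            Tuple2<Integer, String> p2 = new Tuple2<>(10, "This is a string");

            // equals() and hashCode() are field-based, as the Pair test expected
            System.out.println(p1.equals(p2));                   // true
            System.out.println(p1.hashCode() == p2.hashCode());  // true

            // sorting replaces Pair's compareTo with an explicit Comparator
            @SuppressWarnings("unchecked")
            Tuple2<Integer, String>[] pairs = new Tuple2[] { p1, new Tuple2<>(5, "a"), p2 };
            Arrays.sort(pairs, new Comparator<Tuple2<Integer, String>>() {
                @Override
                public int compare(Tuple2<Integer, String> a, Tuple2<Integer, String> b) {
                    int c = a.f0.compareTo(b.f0);
                    return c != 0 ? c : a.f1.compareTo(b.f1);
                }
            });
            System.out.println(Arrays.toString(pairs));
        }
    }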

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java
deleted file mode 100644
index 1fc4c34..0000000
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.postpass;
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.types.Key;
-
-
-public class PostPassUtils {
-
-       public static <X> Class<? extends Key<?>>[] getKeys(AbstractSchema<Class<? extends X>> schema, int[] fields) throws MissingFieldTypeInfoException {
-               @SuppressWarnings("unchecked")
-               Class<? extends Key<?>>[] keyTypes = new Class[fields.length];
-
-               for (int i = 0; i < fields.length; i++) {
-                       Class<? extends X> type = schema.getType(fields[i]);
-                       if (type == null) {
-                               throw new MissingFieldTypeInfoException(i);
-                       } else if (Key.class.isAssignableFrom(type)) {
-                               @SuppressWarnings("unchecked")
-                               Class<? extends Key<?>> keyType = (Class<? extends Key<?>>) type;
-                               keyTypes[i] = keyType;
-                       } else {
-                               throw new CompilerException("The field type " + type.getName() +
-                                               " cannot be used as a key because it does not implement the interface 'Key'");
-                       }
-               }
-
-               return keyTypes;
-       }
-}
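
The deleted helper validated schema fields against the Key interface. The same pattern survives the removal with the check rewritten against Value plus Comparable, which is the contract the comparator changes below move to; a sketch with an illustrative class name and a simplified signature, not part of this commit:

    import org.apache.flink.types.Value;

    public final class KeyTypeChecks {

        @SuppressWarnings("unchecked")
        public static Class<? extends Value>[] checkKeyTypes(Class<?>[] fieldTypes) {
            Class<? extends Value>[] keyTypes = new Class[fieldTypes.length];
            for (int i = 0; i < fieldTypes.length; i++) {
                Class<?> type = fieldTypes[i];
                if (type == null) {
                    throw new IllegalArgumentException("missing type info for field " + i);
                }
                // a usable key type must be a Value and define an ordering
                if (Value.class.isAssignableFrom(type) && Comparable.class.isAssignableFrom(type)) {
                    keyTypes[i] = (Class<? extends Value>) type;
                } else {
                    throw new IllegalArgumentException("field type " + type.getName()
                            + " does not implement Value and Comparable");
                }
            }
            return keyTypes;
        }
    }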

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java
deleted file mode 100644
index 1545d6f..0000000
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.postpass;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.flink.types.Key;
-
-/**
- * Class encapsulating a schema map (int column position -&gt; column type) and a reference counter.
- */
-public class SparseKeySchema extends AbstractSchema<Class<? extends Key<?>>> {
-       
-       private final Map<Integer, Class<? extends Key<?>>> schema;
-       
-       
-       public SparseKeySchema() {
-               this.schema = new HashMap<Integer, Class<? extends Key<?>>>();
-       }
-
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public void addType(int key, Class<? extends Key<?>> type) throws ConflictingFieldTypeInfoException {
-               Class<? extends Key<?>> previous = this.schema.put(key, type);
-               if (previous != null && previous != type) {
-                       throw new ConflictingFieldTypeInfoException(key, previous, type);
-               }
-       }
-
-       @Override
-       public Class<? extends Key<?>> getType(int field) {
-               return this.schema.get(field);
-       }
-
-       @Override
-       public Iterator<Entry<Integer, Class<? extends Key<?>>>> iterator() {
-               return this.schema.entrySet().iterator();
-       }
-
-       public int getNumTypes() {
-               return this.schema.size();
-       }
-
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public int hashCode() {
-               return this.schema.hashCode() ^ getNumConnectionsThatContributed();
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof SparseKeySchema) {
-                       SparseKeySchema other = (SparseKeySchema) obj;
-                       return this.schema.equals(other.schema) &&
-                                       this.getNumConnectionsThatContributed() == other.getNumConnectionsThatContributed();
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public String toString() {
-               return "<" + getNumConnectionsThatContributed() + "> : " + this.schema.toString();
-       }
-}
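
If a sparse field-type schema is still wanted after the removal, the class parameterizes just as well over Value. A minimal sketch under that assumption (illustrative only, not part of this commit; the reference counting inherited from AbstractSchema is omitted):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.types.Value;

    // Hypothetical Value-based counterpart of the deleted SparseKeySchema.
    public class SparseValueSchema {

        private final Map<Integer, Class<? extends Value>> schema = new HashMap<>();

        public void addType(int field, Class<? extends Value> type) {
            Class<? extends Value> previous = schema.put(field, type);
            if (previous != null && previous != type) {
                throw new IllegalStateException("conflicting types for field " + field);
            }
        }

        public Class<? extends Value> getType(int field) {
            return schema.get(field);
        }

        public int getNumTypes() {
            return schema.size();
        }
    }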

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
index 74126f8..483bc51 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
@@ -21,7 +21,6 @@ package org.apache.flink.optimizer.dataproperties;
 import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Key;
 
 import java.io.IOException;
 
@@ -29,8 +28,8 @@ import java.io.IOException;
 public class MockDistribution implements DataDistribution {
 
        @Override
-       public Key<?>[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
-               return new Key<?>[0];
+       public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
+               return new Object[0];
        }
 
        @Override
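
Since getBucketBoundary now hands back Object[], casting boundary fields to their concrete Value types is the caller's job. A minimal consumption sketch, with illustrative names and an int first field as in the tests above:

    import org.apache.flink.api.common.distributions.DataDistribution;
    import org.apache.flink.types.IntValue;

    public class BoundaryConsumer {
        static int firstBoundaryKey(DataDistribution distribution) {
            Object[] boundary = distribution.getBucketBoundary(0, 2);
            return ((IntValue) boundary[0]).getValue();  // cast moved from framework to caller
        }
    }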

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
index 9ccb899..c3c898d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
@@ -33,8 +33,8 @@ import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.types.Value;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
@@ -48,11 +48,11 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
        
        @SuppressWarnings("unchecked")
        private final RecordComparator comparator1 = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+               new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
        
        @SuppressWarnings("unchecked")
        private final RecordComparator comparator2 = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+               new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
        
        private final List<Record> outList = new ArrayList<Record>();
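
The edit in this and the remaining test files is mechanical: the comparator key classes are typed Class<? extends Value> instead of Class<? extends Key<?>>. Java cannot instantiate generic arrays, so the raw new Class[] plus unchecked cast survives the rename; the recurring idiom, extracted for clarity (helper name illustrative):

    import org.apache.flink.types.IntValue;
    import org.apache.flink.types.Value;

    public class ComparatorKeyTypes {
        // there is no 'new Class<? extends Value>[1]' in Java: allocate raw, then cast
        @SuppressWarnings("unchecked")
        static Class<? extends Value>[] singleIntKey() {
            return (Class<? extends Value>[]) new Class[] { IntValue.class };
        }
    }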
        

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
index a4e4fd5..21e7dc4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.types.Value;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
@@ -27,7 +28,6 @@ import org.apache.flink.runtime.testutils.recordutils.RecordPairComparatorFactory;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -38,11 +38,11 @@ public class CoGroupTaskExternalITCase extends DriverTestBase<CoGroupFunction<Re
        
        @SuppressWarnings("unchecked")
        private final RecordComparator comparator1 = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+               new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
        
        @SuppressWarnings("unchecked")
        private final RecordComparator comparator2 = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+               new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
        
        private final CountingOutputCollector output = new CountingOutputCollector();
        

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
index bf7d467..f178e6b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.types.Value;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
@@ -33,7 +34,6 @@ import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -44,11 +44,11 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
        
        @SuppressWarnings("unchecked")
        private final RecordComparator comparator1 = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+               new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
        
        @SuppressWarnings("unchecked")
        private final RecordComparator comparator2 = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+               new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
        
        private final CountingOutputCollector output = new CountingOutputCollector();
        

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
index 1699f79..f5a61a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
@@ -32,7 +33,6 @@ import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.junit.Test;
 
@@ -47,7 +47,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
        
        @SuppressWarnings("unchecked")
        private final RecordComparator comparator = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
+               new int[]{0}, (Class<? extends Value>[])new Class<?>[]{ IntValue.class });
 
        public CombineTaskExternalITCase(ExecutionConfig config) {
                super(config, COMBINE_MEM, 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
index 5dc3772..21f6ac2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
@@ -20,6 +20,7 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.types.Value;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
@@ -27,7 +28,6 @@ import org.apache.flink.runtime.testutils.recordutils.RecordPairComparatorFactory;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -46,11 +46,11 @@ public class JoinTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Reco
 
        @SuppressWarnings("unchecked")
        private final RecordComparator comparator1 = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+               new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
        
        @SuppressWarnings("unchecked")
        private final RecordComparator comparator2 = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+               new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
        
        private final CountingOutputCollector output = new CountingOutputCollector();
        

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
index 4ce4fd1..753863c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
@@ -34,8 +34,8 @@ import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
@@ -59,11 +59,11 @@ public class JoinTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record
        
        @SuppressWarnings("unchecked")
        private final RecordComparator comparator1 = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
+               new int[]{0}, (Class<? extends Value>[])new Class<?>[]{ IntValue.class });
        
        @SuppressWarnings("unchecked")
        private final RecordComparator comparator2 = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
+               new int[]{0}, (Class<? extends Value>[])new Class<?>[]{ IntValue.class });
        
        private final List<Record> outList = new ArrayList<>();
        

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
index 6ebafee..1d7fdb8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
@@ -21,9 +21,11 @@ package org.apache.flink.runtime.operators;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.lang.Validate;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.types.Value;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,7 +36,6 @@ import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -45,7 +46,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
        
        @SuppressWarnings("unchecked")
        private final RecordComparator comparator = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+               new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
        
        private final List<Record> outList = new ArrayList<>();
        

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
index 904b81e..e05f7d2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.types.Value;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +41,6 @@ import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -51,7 +51,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
        
        @SuppressWarnings("unchecked")
        private final RecordComparator comparator = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+               new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class });
        
        private final List<Record> outList = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index d9d6a25..dd6b4c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -48,9 +48,9 @@ import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.IntPairPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.NullKeyFieldException;
 import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
 import org.junit.Assert;
@@ -81,7 +81,7 @@ public class HashTableITCase {
        {
                final int[] keyPos = new int[] {0};
                @SuppressWarnings("unchecked")
-               final Class<? extends Key<?>>[] keyType = (Class<? extends Key<?>>[]) new Class[] { IntValue.class };
+               final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[] { IntValue.class };
                
                this.recordBuildSideAccesssor = RecordSerializer.get();
                this.recordProbeSideAccesssor = RecordSerializer.get();
