[FLINK-3169] Remove Key class

This closes #1667.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe117afd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe117afd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe117afd Branch: refs/heads/master Commit: fe117afd31e823006f680d145c8f75033816ed17 Parents: d7aa989 Author: zentol <s.mo...@web.de> Authored: Tue Feb 16 12:23:41 2016 +0100 Committer: zentol <s.mo...@web.de> Committed: Tue Mar 8 13:49:01 2016 +0100 ---------------------------------------------------------------------- .../distributions/SimpleDistribution.java | 182 -------------- .../SimpleIntegerDistribution.java | 160 ------------ .../UniformDoubleDistribution.java | 65 ----- .../UniformIntegerDistribution.java | 66 ----- .../flink/api/common/operators/Ordering.java | 11 +- .../org/apache/flink/types/BooleanValue.java | 2 - .../java/org/apache/flink/types/ByteValue.java | 2 - .../java/org/apache/flink/types/CharValue.java | 2 - .../org/apache/flink/types/DoubleValue.java | 4 +- .../java/org/apache/flink/types/FloatValue.java | 4 +- .../java/org/apache/flink/types/IntValue.java | 2 - .../main/java/org/apache/flink/types/Key.java | 59 ----- .../java/org/apache/flink/types/LongValue.java | 2 - .../org/apache/flink/types/NormalizableKey.java | 4 +- .../java/org/apache/flink/types/NullValue.java | 2 - .../main/java/org/apache/flink/types/Pair.java | 177 -------------- .../java/org/apache/flink/types/ShortValue.java | 2 - .../org/apache/flink/types/StringValue.java | 3 +- .../SimpleDataDistributionTest.java | 228 ----------------- .../flink/types/CollectionsDataTypeTest.java | 243 ------------------- .../flink/optimizer/postpass/PostPassUtils.java | 47 ---- .../optimizer/postpass/SparseKeySchema.java | 86 ------- .../dataproperties/MockDistribution.java | 5 +- .../runtime/operators/CachedMatchTaskTest.java | 6 +- .../operators/CoGroupTaskExternalITCase.java | 6 +- .../runtime/operators/CoGroupTaskTest.java | 6 +- 
.../operators/CombineTaskExternalITCase.java | 4 +- .../operators/JoinTaskExternalITCase.java | 6 +- .../flink/runtime/operators/JoinTaskTest.java | 6 +- .../operators/ReduceTaskExternalITCase.java | 5 +- .../flink/runtime/operators/ReduceTaskTest.java | 4 +- .../runtime/operators/hash/HashTableITCase.java | 4 +- .../testutils/recordutils/RecordComparator.java | 46 ++-- .../recordutils/RecordComparatorFactory.java | 16 +- .../recordutils/RecordPairComparator.java | 18 +- .../RecordPairComparatorFactory.java | 12 +- .../org/apache/flink/test/util/CoordVector.java | 4 +- 37 files changed, 84 insertions(+), 1417 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleDistribution.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleDistribution.java b/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleDistribution.java deleted file mode 100644 index bf759d9..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleDistribution.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.common.distributions; - -import java.io.IOException; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.types.Key; -import org.apache.flink.util.InstantiationUtil; - -@PublicEvolving -public class SimpleDistribution implements DataDistribution { - - private static final long serialVersionUID = 1L; - - protected Key<?>[][] boundaries; - - protected int dim; - - - public SimpleDistribution() { - boundaries = new Key[0][]; - } - - public SimpleDistribution(Key<?>[] bucketBoundaries) { - if (bucketBoundaries == null) { - throw new IllegalArgumentException("Bucket boundaries must not be null."); - } - if (bucketBoundaries.length == 0) { - throw new IllegalArgumentException("Bucket boundaries must not be empty."); - } - - // dimensionality is one in this case - dim = 1; - - @SuppressWarnings("unchecked") - Class<? extends Key<?>> clazz = (Class<? extends Key<?>>) bucketBoundaries[0].getClass(); - - // make the array 2-dimensional - boundaries = new Key[bucketBoundaries.length][]; - for (int i = 0; i < bucketBoundaries.length; i++) { - if (bucketBoundaries[i].getClass() != clazz) { - throw new IllegalArgumentException("The bucket boundaries are of different class types."); - } - - boundaries[i] = new Key[] { bucketBoundaries[i] }; - } - } - - @SuppressWarnings("unchecked") - public SimpleDistribution(Key<?>[][] bucketBoundaries) { - if (bucketBoundaries == null) { - throw new IllegalArgumentException("Bucket boundaries must not be null."); - } - if (bucketBoundaries.length == 0) { - throw new IllegalArgumentException("Bucket boundaries must not be empty."); - } - - // dimensionality is one in this case - dim = bucketBoundaries[0].length; - - Class<? 
extends Key<?>>[] types = new Class[dim]; - for (int i = 0; i < dim; i++) { - types[i] = (Class<? extends Key<?>>) bucketBoundaries[0][i].getClass(); - } - - // check the array - for (int i = 1; i < bucketBoundaries.length; i++) { - if (bucketBoundaries[i].length != dim) { - throw new IllegalArgumentException("All bucket boundaries must have the same dimensionality."); - } - for (int d = 0; d < dim; d++) { - if (types[d] != bucketBoundaries[i][d].getClass()) { - throw new IllegalArgumentException("The bucket boundaries are of different class types."); - } - } - } - - boundaries = bucketBoundaries; - } - - @Override - public int getNumberOfFields() { - return this.dim; - } - - @Override - public Key<?>[] getBucketBoundary(int bucketNum, int totalNumBuckets) { - // check validity of arguments - if(bucketNum < 0) { - throw new IllegalArgumentException("Requested bucket must be greater than or equal to 0."); - } else if(bucketNum >= (totalNumBuckets - 1)) { - throw new IllegalArgumentException("Request bucket must be smaller than the total number of buckets minus 1."); - } - if(totalNumBuckets < 1) { - throw new IllegalArgumentException("Total number of bucket must be larger than 0."); - } - - final int maxNumBuckets = boundaries.length + 1; - - // check if max number of buckets is equal to or an even multiple of the requested number of buckets - if((maxNumBuckets % totalNumBuckets) == 0) { - // easy case, just use each n-th boundary - final int n = maxNumBuckets / totalNumBuckets; - final int bucketId = bucketNum * n + (n - 1); - - return boundaries[bucketId]; - } else { - throw new IllegalArgumentException("Interpolation of bucket boundaries currently not supported. 
" + - "Please use an even divider of the maximum possible buckets (here: "+maxNumBuckets+") as totalBuckets."); - // TODO: might be relaxed if much more boundary records are available than requested - } - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeInt(this.dim); - out.writeInt(boundaries.length); - - // write types - for (int i = 0; i < dim; i++) { - out.writeUTF(boundaries[0][i].getClass().getName()); - } - - for (int i = 0; i < boundaries.length; i++) { - for (int d = 0; d < dim; d++) { - boundaries[i][d].write(out); - } - } - } - - @SuppressWarnings("unchecked") - @Override - public void read(DataInputView in) throws IOException { - this.dim = in.readInt(); - final int len = in.readInt(); - - boundaries = new Key[len][]; - - // read types - Class<? extends Key<?>>[] types = new Class[dim]; - for (int i = 0; i < dim; i++) { - String className = in.readUTF(); - try { - types[i] = (Class<? extends Key<?>>) Class.forName(className, true, getClass().getClassLoader()).asSubclass(Key.class); - } catch (ClassNotFoundException e) { - throw new IOException("Could not load type class '" + className + "'."); - } catch (Throwable t) { - throw new IOException("Error loading type class '" + className + "'.", t); - } - } - - for (int i = 0; i < len; i++) { - Key<?>[] bucket = new Key[dim]; - for (int d = 0; d < dim; d++) { - Key<?> val = InstantiationUtil.instantiate(types[d], Key.class); - val.read(in); - bucket[d] = val; - } - - boundaries[i] = bucket; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleIntegerDistribution.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleIntegerDistribution.java b/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleIntegerDistribution.java deleted file mode 100644 index 
cd1a8c5..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/distributions/SimpleIntegerDistribution.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.common.distributions; - -import java.io.IOException; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.types.IntValue; - -@PublicEvolving -public class SimpleIntegerDistribution extends SimpleDistribution { - - private static final long serialVersionUID = 1L; - - - public SimpleIntegerDistribution() { - boundaries = new IntValue[0][]; - } - - public SimpleIntegerDistribution(int[] bucketBoundaries) { - if (bucketBoundaries == null) { - throw new IllegalArgumentException("Bucket boundaries must not be null."); - } - if (bucketBoundaries.length == 0) { - throw new IllegalArgumentException("Bucket boundaries must not be empty."); - } - - // dimensionality is one in this case - dim = 1; - - // make the array 2-dimensional - boundaries = packIntegers(bucketBoundaries); - } - - public SimpleIntegerDistribution(IntValue[] bucketBoundaries) { - if 
(bucketBoundaries == null) { - throw new IllegalArgumentException("Bucket boundaries must not be null."); - } - if (bucketBoundaries.length == 0) { - throw new IllegalArgumentException("Bucket boundaries must not be empty."); - } - - // dimensionality is one in this case - dim = 1; - - // make the array 2-dimensional - boundaries = new IntValue[bucketBoundaries.length][]; - for (int i = 0; i < bucketBoundaries.length; i++) { - boundaries[i] = new IntValue[] { bucketBoundaries[i] }; - } - } - - public SimpleIntegerDistribution(IntValue[][] bucketBoundaries) { - if (bucketBoundaries == null) { - throw new IllegalArgumentException("Bucket boundaries must not be null."); - } - if (bucketBoundaries.length == 0) { - throw new IllegalArgumentException("Bucket boundaries must not be empty."); - } - - // dimensionality is one in this case - dim = bucketBoundaries[0].length; - - // check the array - for (int i = 1; i < bucketBoundaries.length; i++) { - if (bucketBoundaries[i].length != dim) { - throw new IllegalArgumentException("All bucket boundaries must have the same dimensionality."); - } - } - } - - @Override - public int getNumberOfFields() { - return this.dim; - } - - @Override - public IntValue[] getBucketBoundary(int bucketNum, int totalNumBuckets) { - // check validity of arguments - if(bucketNum < 0) { - throw new IllegalArgumentException("Requested bucket must be greater than or equal to 0."); - } else if(bucketNum >= (totalNumBuckets - 1)) { - throw new IllegalArgumentException("Request bucket must be smaller than the total number of buckets minus 1."); - } - if(totalNumBuckets < 1) { - throw new IllegalArgumentException("Total number of bucket must be larger than 0."); - } - - final int maxNumBuckets = boundaries.length + 1; - - // check if max number of buckets is equal to or an even multiple of the requested number of buckets - if((maxNumBuckets % totalNumBuckets) == 0) { - // easy case, just use each n-th boundary - final int n = maxNumBuckets / 
totalNumBuckets; - final int bucketId = bucketNum * n + (n - 1); - - return (IntValue[]) boundaries[bucketId]; - } else { - throw new IllegalArgumentException("Interpolation of bucket boundaries currently not supported. " + - "Please use an even divider of the maximum possible buckets (here: "+maxNumBuckets+") as totalBuckets."); - // TODO: might be relaxed if much more boundary records are available than requested - } - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeInt(this.dim); - out.writeInt(boundaries.length); - - for (int i = 0; i < boundaries.length; i++) { - for (int d = 0; d < dim; d++) { - out.writeInt(((IntValue) boundaries[i][d]).getValue()); - } - } - } - - @Override - public void read(DataInputView in) throws IOException { - this.dim = in.readInt(); - final int len = in.readInt(); - - boundaries = new IntValue[len][]; - - for (int i = 0; i < len; i++) { - IntValue[] bucket = new IntValue[dim]; - for (int d = 0; d < dim; d++) { - bucket[d] = new IntValue(in.readInt()); - } - - boundaries[i] = bucket; - } - } - - private static IntValue[][] packIntegers(int[] values) { - IntValue[][] packed = new IntValue[values.length][]; - for (int i = 0; i < values.length; i++) { - packed[i] = new IntValue[] { new IntValue(values[i]) }; - } - - return packed; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformDoubleDistribution.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformDoubleDistribution.java b/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformDoubleDistribution.java deleted file mode 100644 index df1f095..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformDoubleDistribution.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation 
(ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.common.distributions; - -import java.io.IOException; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.types.DoubleValue; - -@PublicEvolving -public class UniformDoubleDistribution implements DataDistribution { - - private static final long serialVersionUID = 1L; - - private double min, max; - - - public UniformDoubleDistribution() {} - - public UniformDoubleDistribution(double min, double max) { - this.min = min; - this.max = max; - } - - @Override - public DoubleValue[] getBucketBoundary(int bucketNum, int totalNumBuckets) { - double bucketSize = (max - min) / totalNumBuckets; - return new DoubleValue[] {new DoubleValue(min + (bucketNum+1) * bucketSize) }; - } - - @Override - public int getNumberOfFields() { - return 1; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeDouble(min); - out.writeDouble(max); - } - - @Override - public void read(DataInputView in) throws IOException { - min = in.readDouble(); - max = in.readDouble(); - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformIntegerDistribution.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformIntegerDistribution.java b/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformIntegerDistribution.java deleted file mode 100644 index 504f65b..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/distributions/UniformIntegerDistribution.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.common.distributions; - -import java.io.IOException; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.types.IntValue; - -@PublicEvolving -public class UniformIntegerDistribution implements DataDistribution { - - private static final long serialVersionUID = 1L; - - private int min, max; - - - public UniformIntegerDistribution() {} - - public UniformIntegerDistribution(int min, int max) { - this.min = min; - this.max = max; - } - - @Override - public IntValue[] getBucketBoundary(int bucketNum, int totalNumBuckets) { - long diff = ((long) max) - ((long) min) + 1; - double bucketSize = diff / ((double) totalNumBuckets); - return new IntValue[] {new IntValue(min + (int) ((bucketNum+1) * bucketSize)) }; - } - - @Override - public int getNumberOfFields() { - return 1; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeInt(min); - out.writeInt(max); - } - - @Override - public void read(DataInputView in) throws IOException { - min = in.readInt(); - max = in.readInt(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java index 23928b3..7332698 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.common.operators.util.FieldSet; -import 
org.apache.flink.types.Key; /** * This class represents an ordering on a set of fields. It specifies the fields and order direction @@ -34,7 +33,7 @@ public class Ordering implements Cloneable { protected FieldList indexes = new FieldList(); - protected final ArrayList<Class<? extends Key<?>>> types = new ArrayList<Class<? extends Key<?>>>(); + protected final ArrayList<Class<? extends Comparable<?>>> types = new ArrayList<Class<? extends Comparable<?>>>(); protected final ArrayList<Order> orders = new ArrayList<Order>(); @@ -50,7 +49,7 @@ public class Ordering implements Cloneable { * @param type * @param order */ - public Ordering(int index, Class<? extends Key<?>> type, Order order) { + public Ordering(int index, Class<? extends Comparable<?>> type, Order order) { appendOrdering(index, type, order); } @@ -63,7 +62,7 @@ public class Ordering implements Cloneable { * * @return This ordering with an additional appended order requirement. */ - public Ordering appendOrdering(Integer index, Class<? extends Key<?>> type, Order order) { + public Ordering appendOrdering(Integer index, Class<? extends Comparable<?>> type, Order order) { if (index.intValue() < 0) { throw new IllegalArgumentException("The key index must not be negative."); } @@ -97,7 +96,7 @@ public class Ordering implements Cloneable { return this.indexes.get(index); } - public Class<? extends Key<?>> getType(int index) { + public Class<? extends Comparable<?>> getType(int index) { if (index < 0 || index >= this.types.size()) { throw new IndexOutOfBoundsException(String.valueOf(index)); } @@ -114,7 +113,7 @@ public class Ordering implements Cloneable { // -------------------------------------------------------------------------------------------- @SuppressWarnings("unchecked") - public Class<? extends Key<?>>[] getTypes() { + public Class<? 
extends Comparable<?>>[] getTypes() { return this.types.toArray(new Class[this.types.size()]); } http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java b/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java index 10ca069..4bc387b 100644 --- a/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java @@ -29,8 +29,6 @@ import org.apache.flink.core.memory.MemorySegment; /** * Boxed serializable and comparable boolean type, representing the primitive * type {@code boolean}. - * - * @see org.apache.flink.types.Key */ @Public public class BooleanValue implements NormalizableKey<BooleanValue>, ResettableValue<BooleanValue>, CopyableValue<BooleanValue> { http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/ByteValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/ByteValue.java b/flink-core/src/main/java/org/apache/flink/types/ByteValue.java index a1a9e32..cb1c669 100644 --- a/flink-core/src/main/java/org/apache/flink/types/ByteValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/ByteValue.java @@ -29,8 +29,6 @@ import org.apache.flink.core.memory.MemorySegment; /** * Boxed serializable and comparable byte type, representing the primitive * type {@code byte} (signed 8 bit integer). 
- * - * @see org.apache.flink.types.Key */ @Public public class ByteValue implements NormalizableKey<ByteValue>, ResettableValue<ByteValue>, CopyableValue<ByteValue> { http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/CharValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/CharValue.java b/flink-core/src/main/java/org/apache/flink/types/CharValue.java index 8b2ab29..f800832 100644 --- a/flink-core/src/main/java/org/apache/flink/types/CharValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/CharValue.java @@ -29,8 +29,6 @@ import org.apache.flink.core.memory.MemorySegment; /** * Boxed serializable and comparable character type, representing the primitive * type {@code char}. - * - * @see org.apache.flink.types.Key */ @Public public class CharValue implements NormalizableKey<CharValue>, ResettableValue<CharValue>, CopyableValue<CharValue> { http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java b/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java index 80fb77b..7dba25b 100644 --- a/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java @@ -28,11 +28,9 @@ import org.apache.flink.core.memory.DataOutputView; /** * Boxed serializable and comparable double precision floating point type, representing the primitive * type {@code double}. 
- * - * @see org.apache.flink.types.Key */ @Public -public class DoubleValue implements Key<DoubleValue>, ResettableValue<DoubleValue>, CopyableValue<DoubleValue> { +public class DoubleValue implements Comparable<DoubleValue>, ResettableValue<DoubleValue>, CopyableValue<DoubleValue> { private static final long serialVersionUID = 1L; private double value; http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/FloatValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/FloatValue.java b/flink-core/src/main/java/org/apache/flink/types/FloatValue.java index 371ce52..e3e49ce 100644 --- a/flink-core/src/main/java/org/apache/flink/types/FloatValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/FloatValue.java @@ -28,11 +28,9 @@ import org.apache.flink.core.memory.DataOutputView; /** * Boxed serializable and comparable single precision floating point type, representing the primitive * type {@code float}. 
- * - * @see org.apache.flink.types.Key */ @Public -public class FloatValue implements Key<FloatValue>, ResettableValue<FloatValue>, CopyableValue<FloatValue> { +public class FloatValue implements Comparable<FloatValue>, ResettableValue<FloatValue>, CopyableValue<FloatValue> { private static final long serialVersionUID = 1L; private float value; http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/IntValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/IntValue.java b/flink-core/src/main/java/org/apache/flink/types/IntValue.java index 0f63117..347fd1d 100644 --- a/flink-core/src/main/java/org/apache/flink/types/IntValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/IntValue.java @@ -29,8 +29,6 @@ import org.apache.flink.core.memory.MemorySegment; /** * Boxed serializable and comparable integer type, representing the primitive * type {@code int}. - * - * @see org.apache.flink.types.Key */ @Public public class IntValue implements NormalizableKey<IntValue>, ResettableValue<IntValue>, CopyableValue<IntValue> { http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/Key.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/Key.java b/flink-core/src/main/java/org/apache/flink/types/Key.java deleted file mode 100644 index c1e0626..0000000 --- a/flink-core/src/main/java/org/apache/flink/types/Key.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.types; - -import org.apache.flink.annotation.PublicEvolving; - -/** - * This interface has to be implemented by all data types that act as key. Keys are used to establish - * relationships between values. A key must always be {@link java.lang.Comparable} to other keys of - * the same type. In addition, keys must implement a correct {@link java.lang.Object#hashCode()} method - * and {@link java.lang.Object#equals(Object)} method to ensure that grouping on keys works properly. - * <p> - * This interface extends {@link org.apache.flink.types.Value} and requires to implement - * the serialization of its value. - * - * @see org.apache.flink.types.Value - * @see org.apache.flink.core.io.IOReadableWritable - * @see java.lang.Comparable - * - * @deprecated The Key type is a relict of a deprecated and removed API and will be removed - * in future versions as well. - */ -@Deprecated -@PublicEvolving -public interface Key<T> extends Value, Comparable<T> { - - /** - * All keys must override the hash-code function to generate proper deterministic hash codes, - * based on their contents. - * - * @return The hash code of the key - */ - public int hashCode(); - - /** - * Compares the object on equality with another object. - * - * @param other The other object to compare against. - * - * @return True, iff this object is identical to the other object, false otherwise. 
- */ - public boolean equals(Object other); -} http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/LongValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/LongValue.java b/flink-core/src/main/java/org/apache/flink/types/LongValue.java index ef93864..dfb7845 100644 --- a/flink-core/src/main/java/org/apache/flink/types/LongValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/LongValue.java @@ -29,8 +29,6 @@ import org.apache.flink.core.memory.MemorySegment; /** * Boxed serializable and comparable long integer type, representing the primitive * type {@code long}. - * - * @see org.apache.flink.types.Key */ @Public public class LongValue implements NormalizableKey<LongValue>, ResettableValue<LongValue>, CopyableValue<LongValue> { http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/NormalizableKey.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/NormalizableKey.java b/flink-core/src/main/java/org/apache/flink/types/NormalizableKey.java index 6c5e204..e040599 100644 --- a/flink-core/src/main/java/org/apache/flink/types/NormalizableKey.java +++ b/flink-core/src/main/java/org/apache/flink/types/NormalizableKey.java @@ -36,8 +36,8 @@ import org.apache.flink.core.memory.MemorySegment; * key length. */ @Public -public interface NormalizableKey<T> extends Key<T> { - +public interface NormalizableKey<T> extends Comparable<T> { + /** * Gets the maximal length of normalized keys that the data type would produce to determine * the order of instances solely by the normalized key. 
A value of {@link java.lang.Integer}.MAX_VALUE http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/NullValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/NullValue.java b/flink-core/src/main/java/org/apache/flink/types/NullValue.java index 9a3885d..e188929 100644 --- a/flink-core/src/main/java/org/apache/flink/types/NullValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/NullValue.java @@ -28,8 +28,6 @@ import org.apache.flink.core.memory.MemorySegment; /** * Null base type for programs that implements the Key interface. - * - * @see org.apache.flink.types.Key */ @Public public final class NullValue implements NormalizableKey<NullValue>, CopyableValue<NullValue> { http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/Pair.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/Pair.java b/flink-core/src/main/java/org/apache/flink/types/Pair.java deleted file mode 100644 index 9bf2387..0000000 --- a/flink-core/src/main/java/org/apache/flink/types/Pair.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.types; - -import java.io.IOException; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.util.ReflectionUtil; - -/** - * Generic pair base type. - * - * @see org.apache.flink.types.Key - * - * @param <U> Type of the pair's first element. - * @param <V> Type of the pair's second element. - */ -@PublicEvolving -public abstract class Pair<U extends Key<U>, V extends Key<V>> implements Key<Pair<U, V>> { - private static final long serialVersionUID = 1L; - - // class of the first pair element - private final Class<U> firstClass; - // class of the second pair element - private final Class<V> secondClass; - - // the first pair element - private U first; - // the second pair element - private V second; - - /** - * Initializes both encapsulated pair elements with empty objects. - */ - public Pair() { - this.firstClass = ReflectionUtil.<U> getTemplateType1(this.getClass()); - this.secondClass = ReflectionUtil.<V> getTemplateType2(this.getClass()); - - try { - this.first = this.firstClass.newInstance(); - this.second = this.secondClass.newInstance(); - } catch (final InstantiationException e) { - throw new RuntimeException(e); - } catch (final IllegalAccessException e) { - throw new RuntimeException(e); - } - } - - /** - * Initializes the encapsulated pair elements with the provided values. - * - * @param first Initial value of the first encapsulated pair element. - * @param second Initial value of the second encapsulated pair element. 
- */ - public Pair(U first, V second) { - this.firstClass = ReflectionUtil.<U> getTemplateType1(this.getClass()); - this.secondClass = ReflectionUtil.<V> getTemplateType1(this.getClass()); - - this.first = first; - this.second = second; - } - - /** - * Returns the first encapsulated pair element. - * - * @return The first encapsulated pair element. - */ - public U getFirst() { - return this.first; - } - - /** - * Sets the first encapsulated pair element to the specified value. - * - * @param first - * The new value of the first encapsulated pair element. - */ - public void setFirst(final U first) { - if (first == null) { - throw new NullPointerException("first must not be null"); - } - - this.first = first; - } - - /** - * Returns the second encapsulated pair element. - * - * @return The second encapsulated pair element. - */ - public V getSecond() { - return this.second; - } - - /** - * Sets the second encapsulated pair element to the specified value. - * - * @param second - * The new value of the second encapsulated pair element. 
- */ - public void setSecond(final V second) { - if (second == null) { - throw new NullPointerException("second must not be null"); - } - - this.second = second; - } - - @Override - public String toString() { - return "<" + this.first.toString() + "|" + this.second.toString() + ">"; - } - - @Override - public void read(final DataInputView in) throws IOException { - this.first.read(in); - this.second.read(in); - } - - @Override - public void write(final DataOutputView out) throws IOException { - this.first.write(out); - this.second.write(out); - } - - @Override - public int compareTo(Pair<U, V> o) { - int result = this.first.compareTo(o.first); - if (result == 0) { - result = this.second.compareTo(o.second); - } - return result; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + this.first.hashCode(); - result = prime * result + this.second.hashCode(); - return result; - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (this.getClass() != obj.getClass()) { - return false; - } - final Pair<?, ?> other = (Pair<?, ?>) obj; - return this.first.equals(other.first) && this.second.equals(other.second); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/ShortValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/ShortValue.java b/flink-core/src/main/java/org/apache/flink/types/ShortValue.java index b91aaac..970e7e4 100644 --- a/flink-core/src/main/java/org/apache/flink/types/ShortValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/ShortValue.java @@ -29,8 +29,6 @@ import org.apache.flink.core.memory.MemorySegment; /** * Boxed serializable and comparable short integer type, representing the primitive * type {@code short}. 
- * - * @see org.apache.flink.types.Key */ @Public public class ShortValue implements NormalizableKey<ShortValue>, ResettableValue<ShortValue>, CopyableValue<ShortValue> { http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/main/java/org/apache/flink/types/StringValue.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/StringValue.java b/flink-core/src/main/java/org/apache/flink/types/StringValue.java index 0f9105c..e20083e 100644 --- a/flink-core/src/main/java/org/apache/flink/types/StringValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/StringValue.java @@ -37,8 +37,7 @@ import com.google.common.base.Preconditions; * The mutability allows to reuse the object inside the user code, also across invocations. Reusing a StringValue object * helps to increase the performance, as string objects are rather heavy-weight objects and incur a lot of garbage * collection overhead, if created and destroyed in masses. - * - * @see org.apache.flink.types.Key + * * @see org.apache.flink.types.NormalizableKey * @see java.lang.String * @see java.lang.CharSequence http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/test/java/org/apache/flink/api/distributions/SimpleDataDistributionTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/distributions/SimpleDataDistributionTest.java b/flink-core/src/test/java/org/apache/flink/api/distributions/SimpleDataDistributionTest.java deleted file mode 100644 index a58e6ab..0000000 --- a/flink-core/src/test/java/org/apache/flink/api/distributions/SimpleDataDistributionTest.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.distributions; - -import org.apache.flink.api.common.distributions.SimpleDistribution; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.types.DoubleValue; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Key; -import org.apache.flink.types.StringValue; - -import org.junit.Assert; -import org.junit.Test; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; - -public class SimpleDataDistributionTest { - - @Test - public void testConstructorSingleKey() { - - // check correct data distribution - try { - SimpleDistribution dd = new SimpleDistribution(new Key<?>[] {new IntValue(1), new IntValue(2), new IntValue(3)}); - Assert.assertEquals(1, dd.getNumberOfFields()); - } - catch (Throwable t) { - Assert.fail(); - } - - // check incorrect key types - try { - new SimpleDistribution(new Key<?>[] {new IntValue(1), new StringValue("ABC"), new IntValue(3)}); - Assert.fail("Data distribution accepts inconsistent key types"); - } catch(IllegalArgumentException iae) { - // do nothing - } - - // check inconsistent number of keys - try { - new SimpleDistribution(new Key<?>[][] 
{{new IntValue(1)}, {new IntValue(2), new IntValue(2)}, {new IntValue(3)}}); - Assert.fail("Data distribution accepts inconsistent many keys"); - } catch(IllegalArgumentException iae) { - // do nothing - } - } - - @Test - public void testConstructorMultiKey() { - - // check correct data distribution - SimpleDistribution dd = new SimpleDistribution( - new Key<?>[][] {{new IntValue(1), new StringValue("A"), new IntValue(1)}, - {new IntValue(2), new StringValue("A"), new IntValue(1)}, - {new IntValue(3), new StringValue("A"), new IntValue(1)}}); - Assert.assertEquals(3, dd.getNumberOfFields()); - - // check inconsistent key types - try { - new SimpleDistribution( - new Key<?>[][] {{new IntValue(1), new StringValue("A"), new DoubleValue(1.3d)}, - {new IntValue(2), new StringValue("B"), new IntValue(1)}}); - Assert.fail("Data distribution accepts incorrect key types"); - } catch(IllegalArgumentException iae) { - // do nothing - } - - // check inconsistent number of keys - try { - new SimpleDistribution( - new Key<?>[][] {{new IntValue(1), new IntValue(2)}, - {new IntValue(2), new IntValue(2)}, - {new IntValue(3)}}); - Assert.fail("Data distribution accepts bucket boundaries with inconsistent many keys"); - } catch(IllegalArgumentException iae) { - // do nothing - } - - } - - @Test - public void testWriteRead() { - - SimpleDistribution ddWrite = new SimpleDistribution( - new Key<?>[][] {{new IntValue(1), new StringValue("A"), new IntValue(1)}, - {new IntValue(2), new StringValue("A"), new IntValue(1)}, - {new IntValue(2), new StringValue("B"), new IntValue(4)}, - {new IntValue(2), new StringValue("B"), new IntValue(3)}, - {new IntValue(2), new StringValue("B"), new IntValue(2)}}); - Assert.assertEquals(3, ddWrite.getNumberOfFields()); - - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - try { - ddWrite.write(new DataOutputViewStreamWrapper(baos)); - } catch (IOException e) { - Assert.fail("Error serializing the DataDistribution: " + e.getMessage()); - 
} - - byte[] seralizedDD = baos.toByteArray(); - - final ByteArrayInputStream bais = new ByteArrayInputStream(seralizedDD); - - SimpleDistribution ddRead = new SimpleDistribution(); - - try { - ddRead.read(new DataInputViewStreamWrapper(bais)); - } catch (Exception ex) { - Assert.fail("The deserialization of the encoded data distribution caused an error"); - } - - Assert.assertEquals(3, ddRead.getNumberOfFields()); - - // compare written and read distributions - for(int i=0;i<6;i++) { - Key<?>[] recW = ddWrite.getBucketBoundary(0, 6); - Key<?>[] recR = ddWrite.getBucketBoundary(0, 6); - - Assert.assertEquals(recW[0], recR[0]); - Assert.assertEquals(recW[1], recR[1]); - Assert.assertEquals(recW[2], recR[2]); - } - } - - @Test - public void testGetBucketBoundary() { - - SimpleDistribution dd = new SimpleDistribution( - new Key<?>[][] {{new IntValue(1), new StringValue("A")}, - {new IntValue(2), new StringValue("B")}, - {new IntValue(3), new StringValue("C")}, - {new IntValue(4), new StringValue("D")}, - {new IntValue(5), new StringValue("E")}, - {new IntValue(6), new StringValue("F")}, - {new IntValue(7), new StringValue("G")}}); - - Key<?>[] boundRec = dd.getBucketBoundary(0, 8); - Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 1); - Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("A")); - - boundRec = dd.getBucketBoundary(1, 8); - Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 2); - Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("B")); - - boundRec = dd.getBucketBoundary(2, 8); - Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 3); - Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("C")); - - boundRec = dd.getBucketBoundary(3, 8); - Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 4); - Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("D")); - - boundRec = dd.getBucketBoundary(4, 8); - Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 5); - 
Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("E")); - - boundRec = dd.getBucketBoundary(5, 8); - Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 6); - Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("F")); - - boundRec = dd.getBucketBoundary(6, 8); - Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 7); - Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("G")); - - boundRec = dd.getBucketBoundary(0, 4); - Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 2); - Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("B")); - - boundRec = dd.getBucketBoundary(1, 4); - Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 4); - Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("D")); - - boundRec = dd.getBucketBoundary(2, 4); - Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 6); - Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("F")); - - boundRec = dd.getBucketBoundary(0, 2); - Assert.assertEquals(((IntValue) boundRec[0]).getValue(), 4); - Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("D")); - - try { - dd.getBucketBoundary(0, 7); - Assert.fail(); - } catch(IllegalArgumentException iae) { - // nothing to do - } - - try { - dd.getBucketBoundary(3, 4); - Assert.fail(); - } catch(IllegalArgumentException iae) { - // nothing to do - } - - try { - dd.getBucketBoundary(-1, 4); - Assert.fail(); - } catch(IllegalArgumentException iae) { - // nothing to do - } - - try { - dd.getBucketBoundary(0, 0); - Assert.fail(); - } catch(IllegalArgumentException iae) { - // nothing to do - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java 
b/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java deleted file mode 100644 index b61dd6e..0000000 --- a/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.types; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Map.Entry; - -public class CollectionsDataTypeTest { - - private DataOutputView out; - - private DataInputView in; - - @Before - public void setup() throws Exception { - PipedInputStream input = new PipedInputStream(1000); - in = new DataInputViewStreamWrapper(input); - out = new DataOutputViewStreamWrapper(new PipedOutputStream(input)); - } - - @Test - public void testPair() { - NfIntStringPair pair1 = new NfIntStringPair(); - - pair1.setFirst(new IntValue(10)); - pair1.setSecond(new StringValue("This is a string")); - - // test data retrieval - Assert.assertEquals(pair1.getFirst(), new IntValue(10)); - Assert.assertEquals(pair1.getSecond(), new StringValue("This is a string")); - - // test serialization - try { - NfIntStringPair mPairActual = new NfIntStringPair(); - - pair1.write(out); - mPairActual.read(in); - - Assert.assertEquals(pair1, mPairActual); - } catch (IOException e) { - Assert.fail("Unexpected IOException"); - } - - // test comparison - NfIntStringPair pair2 = new NfIntStringPair(); - NfIntStringPair pair3 = new NfIntStringPair(); - NfIntStringPair pair4 = new NfIntStringPair(); - NfIntStringPair pair5 = new NfIntStringPair(); - NfIntStringPair pair6 = new NfIntStringPair(); - - pair2.setFirst(new IntValue(10)); - pair2.setSecond(new StringValue("This is a string")); - - pair3.setFirst(new IntValue(5)); - pair3.setSecond(new StringValue("This is a string")); - - pair4.setFirst(new IntValue(15)); - pair4.setSecond(new 
StringValue("This is a string")); - - pair5.setFirst(new IntValue(10)); - pair5.setSecond(new StringValue("This is a strina")); - - pair6.setFirst(new IntValue(10)); - pair6.setSecond(new StringValue("This is a strinz")); - - Assert.assertTrue(pair1.compareTo(pair2) == 0); - Assert.assertTrue(pair1.compareTo(pair3) > 0); - Assert.assertTrue(pair1.compareTo(pair4) < 0); - Assert.assertTrue(pair1.compareTo(pair5) > 0); - Assert.assertTrue(pair1.compareTo(pair6) < 0); - - Assert.assertTrue(pair1.equals(pair2)); - Assert.assertFalse(pair1.equals(pair3)); - Assert.assertFalse(pair1.equals(pair4)); - Assert.assertFalse(pair1.equals(pair5)); - Assert.assertFalse(pair1.equals(pair6)); - - // test incorrect comparison - NfDoubleStringPair mPair7 = new NfDoubleStringPair(); - mPair7.setFirst(new DoubleValue(2.3)); - - // this is caught by the compiler now -// try { -// pair1.compareTo(mPair7); -// Assert.fail(); -// } catch (Exception e) { -// Assert.assertTrue(e instanceof ClassCastException); -// } - - // test sorting - NfIntStringPair[] pairs = new NfIntStringPair[5]; - - pairs[0] = pair1; - pairs[1] = pair2; - pairs[2] = pair3; - pairs[3] = pair4; - pairs[4] = pair5; - - Arrays.sort(pairs); - - NfIntStringPair p1, p2; - - for (int i = 1; i < 5; i++) { - p1 = pairs[i - 1]; - p2 = pairs[i]; - - Assert.assertTrue(p1.compareTo(p2) <= 0); - } - - // test hashing - HashSet<NfIntStringPair> pairSet = new HashSet<NfIntStringPair>(); - - Assert.assertTrue(pairSet.add(pair2)); - Assert.assertTrue(pairSet.add(pair3)); - Assert.assertTrue(pairSet.add(pair4)); - Assert.assertTrue(pairSet.add(pair5)); - Assert.assertTrue(pairSet.add(pair6)); - Assert.assertFalse(pairSet.add(pair1)); - - Assert.assertTrue(pairSet.containsAll(Arrays.asList(pairs))); - } - - @Test - public void testPactMap() { - NfIntStringMap map0 = new NfIntStringMap(); - map0.put(new IntValue(10), new StringValue("20")); - - // test data retrieval - for (Entry<IntValue, StringValue> entry : map0.entrySet()) { - 
Assert.assertEquals(entry.getValue(), new StringValue("20")); - Assert.assertEquals(entry.getKey(), new IntValue(10)); - } - - // test data overwriting - map0.put(new IntValue(10), new StringValue("10")); - for (Entry<IntValue, StringValue> entry : map0.entrySet()) { - Assert.assertEquals(entry.getValue(), new StringValue("10")); - Assert.assertEquals(entry.getKey(), new IntValue(10)); - } - - // now test data retrieval of multiple values - map0.put(new IntValue(20), new StringValue("20")); - map0.put(new IntValue(30), new StringValue("30")); - map0.put(new IntValue(40), new StringValue("40")); - - // construct an inverted map - NfStringIntMap mapinv = new NfStringIntMap(); - for (Entry<IntValue, StringValue> entry : map0.entrySet()) { - Assert.assertEquals(entry.getKey().getValue(), Integer.parseInt(entry.getValue().toString())); - mapinv.put(entry.getValue(), entry.getKey()); - } - - for (Entry<StringValue, IntValue> entry : mapinv.entrySet()) { - Assert.assertEquals(entry.getValue().getValue(), Integer.parseInt(entry.getKey().toString())); - } - - // now test data transfer - NfIntStringMap nMap = new NfIntStringMap(); - try { - map0.write(out); - nMap.read(in); - } catch (Exception e) { - Assert.fail(); - } - for (Entry<IntValue, StringValue> entry : map0.entrySet()) { - Assert.assertEquals(entry.getKey().getValue(), Integer.parseInt(entry.getValue().toString())); - } - } - - @Test - public void testPactList() { - NfStringList list = new NfStringList(); - list.add(new StringValue("Hello!")); - - for (StringValue value : list) { - Assert.assertEquals(value, new StringValue("Hello!")); - } - - list.add(new StringValue("Hello2!")); - list.add(new StringValue("Hello3!")); - list.add(new StringValue("Hello4!")); - - // test data transfer - NfStringList mList2 = new NfStringList(); - try { - list.write(out); - mList2.read(in); - } - catch (Exception e) { - Assert.fail(); - } - Assert.assertTrue(list.equals(mList2)); - } - - private class NfIntStringPair extends 
Pair<IntValue, StringValue> { - private static final long serialVersionUID = 1L; - } - - private class NfDoubleStringPair extends Pair<DoubleValue, StringValue> { - private static final long serialVersionUID = 1L; - } - - private class NfStringList extends ListValue<StringValue> { - private static final long serialVersionUID = 1L; - } - - private class NfIntStringMap extends MapValue<IntValue, StringValue> { - private static final long serialVersionUID = 1L; - } - - private class NfStringIntMap extends MapValue<StringValue, IntValue> { - private static final long serialVersionUID = 1L; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java deleted file mode 100644 index 1fc4c34..0000000 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.optimizer.postpass; - -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.types.Key; - - -public class PostPassUtils { - - public static <X> Class<? extends Key<?>>[] getKeys(AbstractSchema<Class< ? extends X>> schema, int[] fields) throws MissingFieldTypeInfoException { - @SuppressWarnings("unchecked") - Class<? extends Key<?>>[] keyTypes = new Class[fields.length]; - - for (int i = 0; i < fields.length; i++) { - Class<? extends X> type = schema.getType(fields[i]); - if (type == null) { - throw new MissingFieldTypeInfoException(i); - } else if (Key.class.isAssignableFrom(type)) { - @SuppressWarnings("unchecked") - Class<? extends Key<?>> keyType = (Class<? extends Key<?>>) type; - keyTypes[i] = keyType; - } else { - throw new CompilerException("The field type " + type.getName() + - " cannot be used as a key because it does not implement the interface 'Key'"); - } - } - - return keyTypes; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java deleted file mode 100644 index 1545d6f..0000000 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.postpass; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.flink.types.Key; - -/** - * Class encapsulating a schema map (int column position -> column type) and a reference counter. - */ -public class SparseKeySchema extends AbstractSchema<Class<? extends Key<?>>> { - - private final Map<Integer, Class<? extends Key<?>>> schema; - - - public SparseKeySchema() { - this.schema = new HashMap<Integer, Class<? extends Key<?>>>(); - } - - // -------------------------------------------------------------------------------------------- - - @Override - public void addType(int key, Class<? extends Key<?>> type) throws ConflictingFieldTypeInfoException { - Class<? extends Key<?>> previous = this.schema.put(key, type); - if (previous != null && previous != type) { - throw new ConflictingFieldTypeInfoException(key, previous, type); - } - } - - @Override - public Class<? extends Key<?>> getType(int field) { - return this.schema.get(field); - } - - @Override - public Iterator<Entry<Integer, Class<? 
extends Key<?>>>> iterator() { - return this.schema.entrySet().iterator(); - } - - public int getNumTypes() { - return this.schema.size(); - } - - // -------------------------------------------------------------------------------------------- - - @Override - public int hashCode() { - return this.schema.hashCode() ^ getNumConnectionsThatContributed(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof SparseKeySchema) { - SparseKeySchema other = (SparseKeySchema) obj; - return this.schema.equals(other.schema) && - this.getNumConnectionsThatContributed() == other.getNumConnectionsThatContributed(); - } else { - return false; - } - } - - @Override - public String toString() { - return "<" + getNumConnectionsThatContributed() + "> : " + this.schema.toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java index 74126f8..483bc51 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java @@ -21,7 +21,6 @@ package org.apache.flink.optimizer.dataproperties; import org.apache.flink.api.common.distributions.DataDistribution; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.types.Key; import java.io.IOException; @@ -29,8 +28,8 @@ import java.io.IOException; public class MockDistribution implements DataDistribution { @Override - public Key<?>[] getBucketBoundary(int bucketNum, int totalNumBuckets) { - return new Key<?>[0]; + public Object[] 
getBucketBoundary(int bucketNum, int totalNumBuckets) { + return new Object[0]; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java index 9ccb899..c3c898d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java @@ -33,8 +33,8 @@ import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.runtime.operators.testutils.NirvanaOutputList; import org.apache.flink.runtime.operators.testutils.TaskCancelThread; import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; +import org.apache.flink.types.Value; import org.apache.flink.types.IntValue; -import org.apache.flink.types.Key; import org.apache.flink.types.Record; import org.apache.flink.util.Collector; import org.junit.Assert; @@ -48,11 +48,11 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, @SuppressWarnings("unchecked") private final RecordComparator comparator1 = new RecordComparator( - new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class }); + new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class }); @SuppressWarnings("unchecked") private final RecordComparator comparator2 = new RecordComparator( - new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class }); + new int[]{0}, (Class<? 
extends Value>[])new Class[]{ IntValue.class }); private final List<Record> outList = new ArrayList<Record>(); http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java index a4e4fd5..21e7dc4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.types.Value; import org.junit.Assert; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.RichCoGroupFunction; @@ -27,7 +28,6 @@ import org.apache.flink.runtime.testutils.recordutils.RecordPairComparatorFactor import org.apache.flink.runtime.operators.testutils.DriverTestBase; import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; import org.apache.flink.types.IntValue; -import org.apache.flink.types.Key; import org.apache.flink.types.Record; import org.apache.flink.util.Collector; import org.junit.Test; @@ -38,11 +38,11 @@ public class CoGroupTaskExternalITCase extends DriverTestBase<CoGroupFunction<Re @SuppressWarnings("unchecked") private final RecordComparator comparator1 = new RecordComparator( - new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class }); + new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class }); @SuppressWarnings("unchecked") private final RecordComparator comparator2 = new RecordComparator( - new int[]{0}, (Class<? 
extends Key<?>>[])new Class[]{ IntValue.class }); + new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class }); private final CountingOutputCollector output = new CountingOutputCollector(); http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java index bf7d467..f178e6b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.types.Value; import org.junit.Assert; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.RichCoGroupFunction; @@ -33,7 +34,6 @@ import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.runtime.operators.testutils.TaskCancelThread; import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; import org.apache.flink.types.IntValue; -import org.apache.flink.types.Key; import org.apache.flink.types.Record; import org.apache.flink.util.Collector; import org.junit.Test; @@ -44,11 +44,11 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco @SuppressWarnings("unchecked") private final RecordComparator comparator1 = new RecordComparator( - new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class }); + new int[]{0}, (Class<? 
extends Value>[])new Class[]{ IntValue.class }); @SuppressWarnings("unchecked") private final RecordComparator comparator2 = new RecordComparator( - new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class }); + new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class }); private final CountingOutputCollector output = new CountingOutputCollector(); http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java index 1699f79..f5a61a8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.types.Value; import org.apache.flink.util.Collector; import org.junit.Assert; import org.apache.flink.api.common.functions.RichGroupReduceFunction; @@ -32,7 +33,6 @@ import org.apache.flink.runtime.testutils.recordutils.RecordComparator; import org.apache.flink.runtime.operators.testutils.DriverTestBase; import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; import org.apache.flink.types.IntValue; -import org.apache.flink.types.Key; import org.apache.flink.types.Record; import org.junit.Test; @@ -47,7 +47,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun @SuppressWarnings("unchecked") private final 
RecordComparator comparator = new RecordComparator( - new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class }); + new int[]{0}, (Class<? extends Value>[])new Class<?>[]{ IntValue.class }); public CombineTaskExternalITCase(ExecutionConfig config) { super(config, COMBINE_MEM, 0); http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java index 5dc3772..21f6ac2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.types.Value; import org.junit.Assert; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.runtime.testutils.recordutils.RecordComparator; @@ -27,7 +28,6 @@ import org.apache.flink.runtime.testutils.recordutils.RecordPairComparatorFactor import org.apache.flink.runtime.operators.testutils.DriverTestBase; import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; import org.apache.flink.types.IntValue; -import org.apache.flink.types.Key; import org.apache.flink.types.Record; import org.apache.flink.util.Collector; import org.junit.Test; @@ -46,11 +46,11 @@ public class JoinTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Reco @SuppressWarnings("unchecked") private final RecordComparator comparator1 = new RecordComparator( - new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class }); + new int[]{0}, (Class<? 
extends Value>[])new Class[]{ IntValue.class }); @SuppressWarnings("unchecked") private final RecordComparator comparator2 = new RecordComparator( - new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class }); + new int[]{0}, (Class<? extends Value>[])new Class[]{ IntValue.class }); private final CountingOutputCollector output = new CountingOutputCollector(); http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java index 4ce4fd1..753863c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java @@ -34,8 +34,8 @@ import org.apache.flink.runtime.operators.testutils.NirvanaOutputList; import org.apache.flink.runtime.operators.testutils.TaskCancelThread; import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; import org.apache.flink.types.IntValue; -import org.apache.flink.types.Key; import org.apache.flink.types.Record; +import org.apache.flink.types.Value; import org.apache.flink.util.Collector; import org.junit.Assert; @@ -59,11 +59,11 @@ public class JoinTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record @SuppressWarnings("unchecked") private final RecordComparator comparator1 = new RecordComparator( - new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class }); + new int[]{0}, (Class<? extends Value>[])new Class<?>[]{ IntValue.class }); @SuppressWarnings("unchecked") private final RecordComparator comparator2 = new RecordComparator( - new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class }); + new int[]{0}, (Class<? 
extends Value>[])new Class<?>[]{ IntValue.class }); private final List<Record> outList = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java index 6ebafee..1d7fdb8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java @@ -21,9 +21,11 @@ package org.apache.flink.runtime.operators; import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang.Validate; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.types.Value; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +36,6 @@ import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger; import org.apache.flink.runtime.operators.testutils.DriverTestBase; import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; import org.apache.flink.types.IntValue; -import org.apache.flink.types.Key; import org.apache.flink.types.Record; import org.apache.flink.util.Collector; import org.junit.Test; @@ -45,7 +46,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc @SuppressWarnings("unchecked") private final RecordComparator comparator = new RecordComparator( - new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class }); + new int[]{0}, (Class<? 
extends Value>[])new Class[]{ IntValue.class }); private final List<Record> outList = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java index 904b81e..e05f7d2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.types.Value; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +41,6 @@ import org.apache.flink.runtime.operators.testutils.TaskCancelThread; import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; import org.apache.flink.types.IntValue; -import org.apache.flink.types.Key; import org.apache.flink.types.Record; import org.apache.flink.util.Collector; import org.junit.Test; @@ -51,7 +51,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor @SuppressWarnings("unchecked") private final RecordComparator comparator = new RecordComparator( - new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class }); + new int[]{0}, (Class<? 
extends Value>[])new Class[]{ IntValue.class }); private final List<Record> outList = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/fe117afd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java index d9d6a25..dd6b4c2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java @@ -48,9 +48,9 @@ import org.apache.flink.runtime.operators.testutils.types.IntPairComparator; import org.apache.flink.runtime.operators.testutils.types.IntPairPairComparator; import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer; import org.apache.flink.types.IntValue; -import org.apache.flink.types.Key; import org.apache.flink.types.NullKeyFieldException; import org.apache.flink.types.Record; +import org.apache.flink.types.Value; import org.apache.flink.util.MutableObjectIterator; import org.junit.After; import org.junit.Assert; @@ -81,7 +81,7 @@ public class HashTableITCase { { final int[] keyPos = new int[] {0}; @SuppressWarnings("unchecked") - final Class<? extends Key<?>>[] keyType = (Class<? extends Key<?>>[]) new Class[] { IntValue.class }; + final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[] { IntValue.class }; this.recordBuildSideAccesssor = RecordSerializer.get(); this.recordProbeSideAccesssor = RecordSerializer.get();