http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java deleted file mode 100644 index 19fac43..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeSerializer; - -public class AvroGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest { - @Override - protected <T> TypeSerializer<T> createSerializer(Class<T> type) { - return new AvroSerializer<T>(type); - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java deleted file mode 100644 index df1ff60..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeSerializer; - -public class AvroGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest { - - @Override - protected <T> TypeSerializer<T> createSerializer(Class<T> type) { - return new AvroSerializer<T>(type); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java deleted file mode 100644 index 8a89410..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import static org.junit.Assert.*; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.avro.reflect.Nullable; -import org.apache.flink.api.common.typeutils.SerializerTestInstance; -import org.junit.Test; - -public class AvroSerializerEmptyArrayTest { - - @Test - public void testBookSerialization() { - try { - Book b = new Book(123, "This is a test book", 26382648); - AvroSerializer<Book> serializer = new AvroSerializer<Book>(Book.class); - SerializerTestInstance<Book> test = new SerializerTestInstance<Book>(serializer, Book.class, -1, b); - test.testAll(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testSerialization() { - try { - List<String> titles = new ArrayList<String>(); - - List<Book> books = new ArrayList<Book>(); - books.add(new Book(123, "This is a test book", 1)); - books.add(new Book(24234234, "This is a test book", 1)); - books.add(new Book(1234324, "This is a test book", 3)); - - BookAuthor a = new BookAuthor(1, titles, "Test Author"); - a.books = books; - a.bookType = BookAuthor.BookType.journal; - - AvroSerializer<BookAuthor> serializer = new AvroSerializer<BookAuthor>(BookAuthor.class); - - SerializerTestInstance<BookAuthor> test = new SerializerTestInstance<BookAuthor>(serializer, BookAuthor.class, -1, a); - test.testAll(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - public static class Book { - - long bookId; - @Nullable - String title; - long authorId; - - public Book() {} - - public Book(long bookId, String title, long authorId) { - this.bookId = bookId; - this.title = title; - this.authorId = authorId; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + (int) (authorId ^ (authorId >>> 32)); - result = prime * result + (int) (bookId ^ (bookId >>> 32)); - result = prime * result + ((title == null) ? 0 : title.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - Book other = (Book) obj; - if (authorId != other.authorId) - return false; - if (bookId != other.bookId) - return false; - if (title == null) { - if (other.title != null) - return false; - } else if (!title.equals(other.title)) - return false; - return true; - } - } - - public static class BookAuthor { - - enum BookType { - book, - article, - journal - } - - long authorId; - - @Nullable - List<String> bookTitles; - - @Nullable - List<Book> books; - - String authorName; - - BookType bookType; - - public BookAuthor() {} - - public BookAuthor(long authorId, List<String> bookTitles, String authorName) { - this.authorId = authorId; - this.bookTitles = bookTitles; - this.authorName = authorName; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + (int) (authorId ^ (authorId >>> 32)); - result = prime * result + ((authorName == null) ? 0 : authorName.hashCode()); - result = prime * result + ((bookTitles == null) ? 0 : bookTitles.hashCode()); - result = prime * result + ((bookType == null) ? 0 : bookType.hashCode()); - result = prime * result + ((books == null) ? 0 : books.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - BookAuthor other = (BookAuthor) obj; - if (authorId != other.authorId) - return false; - if (authorName == null) { - if (other.authorName != null) - return false; - } else if (!authorName.equals(other.authorName)) - return false; - if (bookTitles == null) { - if (other.bookTitles != null) - return false; - } else if (!bookTitles.equals(other.bookTitles)) - return false; - if (bookType != other.bookType) - return false; - if (books == null) { - if (other.books != null) - return false; - } else if (!books.equals(other.books)) - return false; - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparatorTest.java deleted file mode 100644 index b317275..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparatorTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.ComparatorTestBase; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.types.StringValue; - -public class CopyableValueComparatorTest extends ComparatorTestBase<StringValue> { - - StringValue[] data = new StringValue[]{ - new StringValue(""), - new StringValue("Lorem Ipsum Dolor Omit Longer"), - new StringValue("aaaa"), - new StringValue("abcd"), - new StringValue("abce"), - new StringValue("abdd"), - new StringValue("accd"), - new StringValue("bbcd") - }; - - @Override - protected TypeComparator<StringValue> createComparator(boolean ascending) { - return new CopyableValueComparator<StringValue>(ascending, StringValue.class); - } - - @Override - protected TypeSerializer<StringValue> createSerializer() { - return new CopyableValueSerializer<StringValue>(StringValue.class); - } - - @Override - protected StringValue[] getSortedTestData() { - return data; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java deleted file mode 100644 index e4672cc..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; -import static org.apache.flink.api.java.typeutils.Either.Left; -import static org.apache.flink.api.java.typeutils.Either.Right; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.SerializerTestInstance; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.Either; -import org.apache.flink.api.java.typeutils.EitherTypeInfo; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.junit.Test; - -public class EitherSerializerTest { - - @SuppressWarnings("unchecked") - @Test - public void testStringDoubleEither() { - - Either<String, Double>[] testData = new Either[] { - Left("banana"), - Left(""), - Right(32.0), - Right(Double.MIN_VALUE), - Right(Double.MAX_VALUE)}; - - EitherTypeInfo<String, Double> eitherTypeInfo = (EitherTypeInfo<String, Double>) new EitherTypeInfo<String, Double>( - BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO); - EitherSerializer<String, Double> eitherSerializer = - (EitherSerializer<String, Double>) eitherTypeInfo.createSerializer(new ExecutionConfig()); - SerializerTestInstance<Either<String, Double>> testInstance = - new EitherSerializerTestInstance<Either<String, Double>>(eitherSerializer, eitherTypeInfo.getTypeClass(), -1, testData); - testInstance.testAll(); - } - - @SuppressWarnings("unchecked") - @Test - public void testEitherWithTuple() { - - Either<Tuple2<Long, Long>, Double>[] testData = new Either[] { - Either.Left(new Tuple2<>(2l, 9l)), - new Left<>(new Tuple2<>(Long.MIN_VALUE, Long.MAX_VALUE)), - new Right<>(32.0), - Right(Double.MIN_VALUE), - Right(Double.MAX_VALUE)}; - - EitherTypeInfo<Tuple2<Long, Long>, Double> eitherTypeInfo = (EitherTypeInfo<Tuple2<Long, Long>, Double>) - new EitherTypeInfo<Tuple2<Long, Long>, Double>( - new TupleTypeInfo<Tuple2<Long, Long>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO), - BasicTypeInfo.DOUBLE_TYPE_INFO); - EitherSerializer<Tuple2<Long, Long>, Double> eitherSerializer = - (EitherSerializer<Tuple2<Long, Long>, Double>) eitherTypeInfo.createSerializer(new ExecutionConfig()); - SerializerTestInstance<Either<Tuple2<Long, Long>, Double>> testInstance = - new EitherSerializerTestInstance<Either<Tuple2<Long, Long>, Double>>( - eitherSerializer, eitherTypeInfo.getTypeClass(), -1, testData); - testInstance.testAll(); - } - - /** - * {@link org.apache.flink.api.common.typeutils.SerializerTestBase#testInstantiate()} - * checks that the type of the created instance is the same as the type class parameter. - * Since we arbitrarily create always create a Left instance we override this test. - */ - private class EitherSerializerTestInstance<T> extends SerializerTestInstance<T> { - - public EitherSerializerTestInstance(TypeSerializer<T> serializer, - Class<T> typeClass, int length, T[] testData) { - super(serializer, typeClass, length, testData); - } - - @Override - @Test - public void testInstantiate() { - try { - TypeSerializer<T> serializer = getSerializer(); - - T instance = serializer.createInstance(); - assertNotNull("The created instance must not be null.", instance); - - Class<T> type = getTypeClass(); - assertNotNull("The test is corrupt: type class is null.", type); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Exception in test: " + e.getMessage()); - } - } - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparatorTest.java deleted file mode 100644 index 06330d3..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparatorTest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.GenericPairComparator; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.DoubleComparator; -import org.apache.flink.api.common.typeutils.base.DoubleSerializer; -import org.apache.flink.api.common.typeutils.base.IntComparator; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple4; -import org.apache.flink.api.java.typeutils.runtime.tuple.base.TuplePairComparatorTestBase; - -public class GenericPairComparatorTest extends TuplePairComparatorTestBase<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> { - - @SuppressWarnings("unchecked") - private Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{ - new Tuple3<Integer, String, Double>(4, "hello", 20.0), - new Tuple3<Integer, String, Double>(4, "world", 23.2), - new Tuple3<Integer, String, Double>(5, "hello", 18.0), - new Tuple3<Integer, String, Double>(5, "world", 19.2), - new Tuple3<Integer, String, Double>(6, "hello", 16.0), - new Tuple3<Integer, String, Double>(6, "world", 17.2), - new Tuple3<Integer, String, Double>(7, "hello", 14.0), - new Tuple3<Integer, String, Double>(7, "world", 15.2) - }; - - @SuppressWarnings("unchecked") - private Tuple4<Integer, Float, Long, Double>[] dataIDL = new Tuple4[]{ - new Tuple4<Integer, Float, Long, Double>(4, 0.11f, 14L, 20.0), - new Tuple4<Integer, Float, Long, Double>(4, 0.221f, 15L, 23.2), - new Tuple4<Integer, Float, Long, Double>(5, 0.33f, 15L, 18.0), - new Tuple4<Integer, Float, Long, Double>(5, 0.44f, 20L, 19.2), - new Tuple4<Integer, Float, Long, Double>(6, 0.55f, 20L, 16.0), - new Tuple4<Integer, Float, Long, Double>(6, 0.66f, 29L, 17.2), - new Tuple4<Integer, Float, Long, Double>(7, 0.77f, 29L, 14.0), - new Tuple4<Integer, Float, Long, Double>(7, 0.88f, 34L, 15.2) - }; - - @SuppressWarnings("rawtypes") - @Override - protected GenericPairComparator<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> createComparator(boolean ascending) { - int[] fields1 = new int[]{0, 2}; - int[] fields2 = new int[]{0, 3}; - TypeComparator[] comps1 = new TypeComparator[]{ - new IntComparator(ascending), - new DoubleComparator(ascending) - }; - TypeComparator[] comps2 = new TypeComparator[]{ - new IntComparator(ascending), - new DoubleComparator(ascending) - }; - TypeSerializer[] sers1 = new TypeSerializer[]{ - IntSerializer.INSTANCE, - DoubleSerializer.INSTANCE - }; - TypeSerializer[] sers2= new TypeSerializer[]{ - IntSerializer.INSTANCE, - DoubleSerializer.INSTANCE - }; - TypeComparator<Tuple3<Integer, String, Double>> comp1 = new TupleComparator<Tuple3<Integer, String, Double>>(fields1, comps1, sers1); - TypeComparator<Tuple4<Integer, Float, Long, Double>> comp2 = new TupleComparator<Tuple4<Integer, Float, Long, Double>>(fields2, comps2, sers2); - return new GenericPairComparator<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>>(comp1, comp2); - } - - @Override - protected Tuple2<Tuple3<Integer, String, Double>[], Tuple4<Integer, Float, Long, Double>[]> getSortedTestData() { - return new Tuple2<Tuple3<Integer, String, Double>[], Tuple4<Integer, Float, Long, Double>[]>(dataISD, dataIDL); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java deleted file mode 100644 index 1c97816..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.SerializerTestInstance; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.junit.Test; - - -/** - * A serialization test for multidimensional arrays. - */ -public class MultidimensionalArraySerializerTest { - - @Test - public void testStringArray() { - String[][] array = new String[][]{{null,"b"},{"c","d"},{"e","f"},{"g","h"},null}; - TypeInformation<String[][]> ti = TypeExtractor.getForClass(String[][].class); - - SerializerTestInstance<String[][]> testInstance = new SerializerTestInstance<String[][]>(ti.createSerializer(new ExecutionConfig()), String[][].class, -1, array); - testInstance.testAll(); - } - - @Test - public void testPrimitiveArray() { - int[][] array = new int[][]{{12,1},{48,42},{23,80},{484,849},{987,4}}; - TypeInformation<int[][]> ti = TypeExtractor.getForClass(int[][].class); - - SerializerTestInstance<int[][]> testInstance = new SerializerTestInstance<int[][]>(ti.createSerializer(new ExecutionConfig()), int[][].class, -1, array); - testInstance.testAll(); - } - - public static class MyPojo { - public String field1; - public int field2; - - public MyPojo(String field1, int field2) { - this.field1 = field1; - this.field2 = field2; - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof MyPojo)) { - return false; - } - MyPojo other = (MyPojo) obj; - return ((field1 == null && other.field1 == null) || (field1 != null && field1.equals(other.field1))) - && field2 == other.field2; - } - } - - @Test - public void testObjectArrays() { - Integer[][] array = new Integer[][]{{0,1}, null, {null, 42}}; - TypeInformation<Integer[][]> ti = TypeExtractor.getForClass(Integer[][].class); - - SerializerTestInstance<Integer[][]> testInstance = new SerializerTestInstance<Integer[][]>(ti.createSerializer(new ExecutionConfig()), Integer[][].class, -1, array); - testInstance.testAll(); - - MyPojo[][] array2 = new MyPojo[][]{{new MyPojo(null, 42), new MyPojo("test2", -1)}, {null, null}, null}; - TypeInformation<MyPojo[][]> ti2 = TypeExtractor.getForClass(MyPojo[][].class); - - SerializerTestInstance<MyPojo[][]> testInstance2 = new SerializerTestInstance<MyPojo[][]>(ti2.createSerializer(new ExecutionConfig()), MyPojo[][].class, -1, array2); - testInstance2.testAll(); - } - - public static class MyGenericPojo<T> { - public T[][] field; - - public MyGenericPojo() { - // nothing to do - } - - public MyGenericPojo(T[][] field) { - this.field = field; - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof MyGenericPojo)) { - return false; - } - MyGenericPojo<?> other = (MyGenericPojo<?>) obj; - return (field == null && other.field == null) || (field != null && field.length == other.field.length); - } - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Test - public void testGenericObjectArrays() { - MyGenericPojo<String>[][] array = (MyGenericPojo<String>[][]) new MyGenericPojo[][]{ - { new MyGenericPojo<String>(new String[][]{{"a", "b"},{"c", "d"}}), null} - }; - TypeInformation ti = TypeInfoParser.parse("org.apache.flink.api.java.typeutils.runtime.MultidimensionalArraySerializerTest$MyGenericPojo<field=String[][]>[][]"); - - SerializerTestInstance testInstance = new SerializerTestInstance(ti.createSerializer(new ExecutionConfig()), MyGenericPojo[][].class, -1, (Object) array); - testInstance.testAll(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java deleted file mode 100644 index 1baf443..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import java.util.Arrays; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.ComparatorTestBase; -import org.apache.flink.api.common.typeutils.CompositeType; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.operators.Keys.ExpressionKeys; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.junit.Assert; - - -public class PojoComparatorTest extends ComparatorTestBase<PojoContainingTuple> { - TypeInformation<PojoContainingTuple> type = TypeExtractor.getForClass(PojoContainingTuple.class); - - PojoContainingTuple[] data = new PojoContainingTuple[]{ - new PojoContainingTuple(1, 1L, 1L), - new PojoContainingTuple(2, 2L, 2L), - new PojoContainingTuple(8519, 85190L, 85190L), - new PojoContainingTuple(8520, 85191L, 85191L), - }; - - @Override - protected TypeComparator<PojoContainingTuple> createComparator(boolean ascending) { - Assert.assertTrue(type instanceof CompositeType); - CompositeType<PojoContainingTuple> cType = (CompositeType<PojoContainingTuple>) type; - ExpressionKeys<PojoContainingTuple> keys = new ExpressionKeys<PojoContainingTuple>(new String[] {"theTuple.*"}, cType); - boolean[] orders = new boolean[keys.getNumberOfKeyFields()]; - Arrays.fill(orders, ascending); - return cType.createComparator(keys.computeLogicalKeyPositions(), orders, 0, new ExecutionConfig()); - } - - @Override - protected TypeSerializer<PojoContainingTuple> createSerializer() { - return type.createSerializer(new ExecutionConfig()); - } - - @Override - protected PojoContainingTuple[] getSortedTestData() { - return data; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoContainingTuple.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoContainingTuple.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoContainingTuple.java deleted file mode 100644 index ca17ee0..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoContainingTuple.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.java.tuple.Tuple2; - -/** - * This file belongs to the PojoComparatorTest test - * - */ -public class PojoContainingTuple { - public int someInt; - public String someString = "abc"; - public Tuple2<Long, Long> theTuple; - public PojoContainingTuple() {} - public PojoContainingTuple(int i, long l1, long l2) { - someInt = i; - theTuple = new Tuple2<Long, Long>(l1, l2); - } - - @Override - public boolean equals(Object obj) { - if(obj instanceof PojoContainingTuple) { - PojoContainingTuple other = (PojoContainingTuple) obj; - return someInt == other.someInt && theTuple.equals(other.theTuple); - } - return false; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java deleted file mode 100644 index d405412..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.TypeExtractor; - -public class PojoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest { - - @Override - protected <T> TypeSerializer<T> createSerializer(Class<T> type) { - TypeInformation<T> typeInfo = TypeExtractor.getForClass(type); - return typeInfo.createSerializer(new ExecutionConfig()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java deleted file mode 100644 index b1467b9..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Random; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.SerializerTestBase; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; -import org.apache.flink.api.java.operators.Keys.ExpressionKeys; -import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.PojoTypeInfo; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.junit.Assert; -import org.junit.Test; - -import com.google.common.base.Objects; - -/** - * A test for the {@link PojoSerializer}. - */ -public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.TestUserClass> { - private TypeInformation<TestUserClass> type = TypeExtractor.getForClass(TestUserClass.class); - - @Override - protected TypeSerializer<TestUserClass> createSerializer() { - TypeSerializer<TestUserClass> serializer = type.createSerializer(new ExecutionConfig()); - assert(serializer instanceof PojoSerializer); - return serializer; - } - - @Override - protected int getLength() { - return -1; - } - - @Override - protected Class<TestUserClass> getTypeClass() { - return TestUserClass.class; - } - - @Override - protected TestUserClass[] getTestData() { - Random rnd = new Random(874597969123412341L); - - return new TestUserClass[]{ - new TestUserClass(rnd.nextInt(), "foo", rnd.nextDouble(), new int[]{1, 2, 3}, new Date(), - new NestedTestUserClass(rnd.nextInt(), "foo@boo", rnd.nextDouble(), new int[]{10, 11, 12})), - new TestUserClass(rnd.nextInt(), "bar", rnd.nextDouble(), new int[]{4, 5, 6}, null, - new NestedTestUserClass(rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[]{20, 21, 22})), - new TestUserClass(rnd.nextInt(), null, rnd.nextDouble(), null, null, null), - new TestUserClass(rnd.nextInt(), "bar", rnd.nextDouble(), new int[]{4, 5, 6}, new Date(), - new NestedTestUserClass(rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[]{20, 21, 22})) - }; - - } - - // User code class for testing the serializer - public static class TestUserClass { - public int dumm1; - public String dumm2; - public double dumm3; - public int[] dumm4; - public Date dumm5; - - public NestedTestUserClass nestedClass; - - public TestUserClass() { - } - - public TestUserClass(int dumm1, String dumm2, double dumm3, int[] dumm4, Date dumm5, NestedTestUserClass nestedClass) { - this.dumm1 = dumm1; - this.dumm2 = dumm2; - this.dumm3 = dumm3; - this.dumm4 = dumm4; - this.dumm5 = dumm5; - this.nestedClass = nestedClass; - } - - @Override - public int hashCode() { - return Objects.hashCode(dumm1, dumm2, dumm3, dumm4, nestedClass); - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof TestUserClass)) { - return false; - } - TestUserClass otherTUC = (TestUserClass) other; - if (dumm1 != otherTUC.dumm1) { - return false; - } - if ((dumm2 == null && otherTUC.dumm2 != null) - || (dumm2 != null && !dumm2.equals(otherTUC.dumm2))) { - return false; - } - if (dumm3 != otherTUC.dumm3) { - return false; - } - if ((dumm4 != null && otherTUC.dumm4 == null) - || (dumm4 == null && otherTUC.dumm4 != null) - || (dumm4 != null && otherTUC.dumm4 != null && dumm4.length != otherTUC.dumm4.length)) { - return false; - } - if (dumm4 != null && otherTUC.dumm4 != null) { - for (int i = 0; i < dumm4.length; i++) { - if (dumm4[i] != otherTUC.dumm4[i]) { - return false; - } - } - } - - if ((nestedClass == null && otherTUC.nestedClass != null) - || (nestedClass != null && !nestedClass.equals(otherTUC.nestedClass))) { - return false; - } - return true; - } - } - - public static class NestedTestUserClass { - public int dumm1; - public String dumm2; - public double dumm3; - public int[] dumm4; - - public NestedTestUserClass() { - } - - public NestedTestUserClass(int dumm1, String dumm2, double dumm3, int[] dumm4) { - this.dumm1 = dumm1; - this.dumm2 = dumm2; - this.dumm3 = dumm3; - this.dumm4 = dumm4; - } - - @Override - public int hashCode() { - return Objects.hashCode(dumm1, dumm2, dumm3, dumm4); - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof NestedTestUserClass)) { - return false; - } - NestedTestUserClass otherTUC = (NestedTestUserClass) other; - if (dumm1 != otherTUC.dumm1) { - return false; - } - if (!dumm2.equals(otherTUC.dumm2)) { - return false; - } - if (dumm3 != otherTUC.dumm3) { - return false; - } - if (dumm4.length != otherTUC.dumm4.length) { - return false; - } - for (int i = 0; i < dumm4.length; i++) { - if (dumm4[i] != otherTUC.dumm4[i]) { - return false; - } - } - return true; - } - } - - /** - * This tests if the hashes returned by the pojo and tuple comparators are the same - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testTuplePojoTestEquality() { - - // test with a simple, string-key first. - PojoTypeInfo<TestUserClass> pType = (PojoTypeInfo<TestUserClass>) type; - List<FlatFieldDescriptor> result = new ArrayList<FlatFieldDescriptor>(); - pType.getFlatFields("nestedClass.dumm2", 0, result); - int[] fields = new int[1]; // see below - fields[0] = result.get(0).getPosition(); - TypeComparator<TestUserClass> pojoComp = pType.createComparator( fields, new boolean[]{true}, 0, new ExecutionConfig()); - - TestUserClass pojoTestRecord = new TestUserClass(0, "abc", 3d, new int[] {1,2,3}, new Date(), new NestedTestUserClass(1, "haha", 4d, new int[] {5,4,3})); - int pHash = pojoComp.hash(pojoTestRecord); - - Tuple1<String> tupleTest = new Tuple1<String>("haha"); - TupleTypeInfo<Tuple1<String>> tType = (TupleTypeInfo<Tuple1<String>>)TypeExtractor.getForObject(tupleTest); - TypeComparator<Tuple1<String>> tupleComp = tType.createComparator(new int[] {0}, new boolean[] {true}, 0, new ExecutionConfig()); - - int tHash = tupleComp.hash(tupleTest); - - Assert.assertTrue("The hashing for tuples and pojos must be the same, so that they are mixable", pHash == tHash); - - Tuple3<Integer, String, Double> multiTupleTest = new Tuple3<Integer, String, Double>(1, "haha", 4d); // its important here to use the same values. - TupleTypeInfo<Tuple3<Integer, String, Double>> multiTupleType = (TupleTypeInfo<Tuple3<Integer, String, Double>>)TypeExtractor.getForObject(multiTupleTest); - - ExpressionKeys fieldKey = new ExpressionKeys(new int[]{1,0,2}, multiTupleType); - ExpressionKeys expressKey = new ExpressionKeys(new String[] {"nestedClass.dumm2", "nestedClass.dumm1", "nestedClass.dumm3"}, pType); - try { - Assert.assertTrue("Expecting the keys to be compatible", fieldKey.areCompatible(expressKey)); - } catch (IncompatibleKeysException e) { - e.printStackTrace(); - Assert.fail("Keys must be compatible: "+e.getMessage()); - } - TypeComparator<TestUserClass> multiPojoComp = pType.createComparator( expressKey.computeLogicalKeyPositions(), new boolean[]{true, true, true}, 0, new ExecutionConfig()); - int multiPojoHash = multiPojoComp.hash(pojoTestRecord); - - - // pojo order is: dumm2 (str), dumm1 (int), dumm3 (double). - TypeComparator<Tuple3<Integer, String, Double>> multiTupleComp = multiTupleType.createComparator(fieldKey.computeLogicalKeyPositions(), new boolean[] {true, true,true}, 0, new ExecutionConfig()); - int multiTupleHash = multiTupleComp.hash(multiTupleTest); - - Assert.assertTrue("The hashing for tuples and pojos must be the same, so that they are mixable. Also for those with multiple key fields", multiPojoHash == multiTupleHash); - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassComparatorTest.java deleted file mode 100644 index 3a03683..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassComparatorTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.ComparatorTestBase; -import org.apache.flink.api.common.typeutils.CompositeType; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.operators.Keys.ExpressionKeys; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.junit.Assert; - -import java.util.Arrays; - - -public class PojoSubclassComparatorTest extends ComparatorTestBase<PojoContainingTuple> { - TypeInformation<PojoContainingTuple> type = TypeExtractor.getForClass(PojoContainingTuple.class); - - PojoContainingTuple[] data = new PojoContainingTuple[]{ - new Subclass(1, 1L, 1L, 17L), - new Subclass(2, 2L, 2L, 42L), - new Subclass(8519, 85190L, 85190L, 117L), - new Subclass(8520, 85191L, 85191L, 93L), - }; - - @Override - protected TypeComparator<PojoContainingTuple> createComparator(boolean ascending) { - Assert.assertTrue(type instanceof CompositeType); - CompositeType<PojoContainingTuple> cType = (CompositeType<PojoContainingTuple>) type; - ExpressionKeys<PojoContainingTuple> keys = new ExpressionKeys<PojoContainingTuple>(new String[] {"theTuple.*"}, cType); - boolean[] orders = new boolean[keys.getNumberOfKeyFields()]; - Arrays.fill(orders, ascending); - return cType.createComparator(keys.computeLogicalKeyPositions(), orders, 0, new ExecutionConfig()); - } - - @Override - protected TypeSerializer<PojoContainingTuple> createSerializer() { - return type.createSerializer(new ExecutionConfig()); - } - - @Override - protected PojoContainingTuple[] getSortedTestData() { - return data; - } - - public static class Subclass extends PojoContainingTuple { - - public long additionalField; - - public Subclass() { - } - - public Subclass(int i, long l1, long l2, long additionalField) { - super(i, l1, l2); - this.additionalField = additionalField; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java deleted file mode 100644 index 8c61a19..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import com.google.common.base.Objects; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.SerializerTestBase; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.junit.Test; - -import java.util.Random; - -/** - * A test for the {@link PojoSerializer}. - */ -public class PojoSubclassSerializerTest extends SerializerTestBase<PojoSubclassSerializerTest.TestUserClassBase> { - private TypeInformation<TestUserClassBase> type = TypeExtractor.getForClass(TestUserClassBase.class); - - @Override - protected TypeSerializer<TestUserClassBase> createSerializer() { - // only register one of the three child classes, the third child class is NO POJO - ExecutionConfig conf = new ExecutionConfig(); - conf.registerPojoType(TestUserClass1.class); - TypeSerializer<TestUserClassBase> serializer = type.createSerializer(conf); - assert(serializer instanceof PojoSerializer); - return serializer; - } - - @Override - protected int getLength() { - return -1; - } - - @Override - protected Class<TestUserClassBase> getTypeClass() { - return TestUserClassBase.class; - } - - @Override - protected TestUserClassBase[] getTestData() { - Random rnd = new Random(874597969123412341L); - - return new TestUserClassBase[]{ - new TestUserClass1(rnd.nextInt(), "foo", rnd.nextLong()), - new TestUserClass2(rnd.nextInt(), "bar", rnd.nextFloat()), - new TestUserClass3(rnd.nextInt(), "bar", rnd.nextFloat()) - }; - - } - - @Override - @Test - public void testInstantiate() { - // don't do anything, since the PojoSerializer with subclass will return null - } - - // User code class for testing the serializer - public static abstract class TestUserClassBase { - public int dumm1; - public String dumm2; - - - public TestUserClassBase() { - } - - public TestUserClassBase(int dumm1, String dumm2) { - this.dumm1 = dumm1; - this.dumm2 = dumm2; - } - - @Override - public int hashCode() { - return Objects.hashCode(dumm1, dumm2); - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof TestUserClassBase)) { - return false; - } - TestUserClassBase otherTUC = (TestUserClassBase) other; - if (dumm1 != otherTUC.dumm1) { - return false; - } - if (!dumm2.equals(otherTUC.dumm2)) { - return false; - } - return true; - } - } - - public static class TestUserClass1 extends TestUserClassBase { - public long dumm3; - - public TestUserClass1() { - } - - public TestUserClass1(int dumm1, String dumm2, long dumm3) { - super(dumm1, dumm2); - this.dumm3 = dumm3; - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof TestUserClass1)) { - return false; - } - TestUserClass1 otherTUC = (TestUserClass1) other; - if (dumm1 != otherTUC.dumm1) { - return false; - } - if (!dumm2.equals(otherTUC.dumm2)) { - return false; - } - if (dumm3 != otherTUC.dumm3) { - return false; - } - return true; - } - } - - public static class TestUserClass2 extends TestUserClassBase { - public float dumm4; - - public TestUserClass2() { - } - - public TestUserClass2(int dumm1, String dumm2, float dumm4) { - super(dumm1, dumm2); - this.dumm4 = dumm4; - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof TestUserClass2)) { - return false; - } - TestUserClass2 otherTUC = (TestUserClass2) other; - if (dumm1 != otherTUC.dumm1) { - return false; - } - if (!dumm2.equals(otherTUC.dumm2)) { - return false; - } - if (dumm4 != otherTUC.dumm4) { - return false; - } - return true; - } - } - - public static class TestUserClass3 extends TestUserClassBase { - public float dumm4; - - public TestUserClass3(int dumm1, String dumm2, float dumm4) { - super(dumm1, dumm2); - this.dumm4 = dumm4; - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof TestUserClass3)) { - return false; - } - TestUserClass3 otherTUC = (TestUserClass3) other; - if (dumm1 != otherTUC.dumm1) { - return false; - } - if (!dumm2.equals(otherTUC.dumm2)) { - return false; - } - if (dumm4 != otherTUC.dumm4) { - return false; - } - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java deleted file mode 100644 index 7c608f2..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.Writable; - -public class StringArrayWritable implements Writable, Comparable<StringArrayWritable> { - - private String[] array = new String[0]; - - public StringArrayWritable() { - super(); - } - - public StringArrayWritable(String[] array) { - this.array = array; - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeInt(this.array.length); - - for(String str : this.array) { - byte[] b = str.getBytes(); - out.writeInt(b.length); - out.write(b); - } - } - - @Override - public void readFields(DataInput in) throws IOException { - this.array = new String[in.readInt()]; - - for(int i = 0; i < this.array.length; i++) { - byte[] b = new byte[in.readInt()]; - in.readFully(b); - this.array[i] = new String(b); - } - } - - @Override - public int compareTo(StringArrayWritable o) { - if(this.array.length != o.array.length) { - return this.array.length - o.array.length; - } - - for(int i = 0; i < this.array.length; i++) { - int comp = this.array[i].compareTo(o.array[i]); - if(comp != 0) { - return comp; - } - } - return 0; - } - - @Override - public boolean equals(Object obj) { - if(!(obj instanceof StringArrayWritable)) { - return false; - } - return this.compareTo((StringArrayWritable) obj) == 0; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java deleted file mode 100644 index b797090..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import com.google.common.base.Objects; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.SerializerTestBase; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; -import org.junit.Test; - -import java.util.Random; - -/** - * Testing the serialization of classes which are subclasses of a class that implements an interface. - */ -public class SubclassFromInterfaceSerializerTest extends SerializerTestBase<SubclassFromInterfaceSerializerTest.TestUserInterface> { - private TypeInformation<TestUserInterface> type = TypeExtractor.getForClass(TestUserInterface.class); - - @Override - protected TypeSerializer<TestUserInterface> createSerializer() { - // only register one of the two child classes - ExecutionConfig conf = new ExecutionConfig(); - conf.registerPojoType(TestUserClass2.class); - TypeSerializer<TestUserInterface> serializer = type.createSerializer(conf); - assert(serializer instanceof KryoSerializer); - return serializer; - } - - @Override - protected int getLength() { - return -1; - } - - @Override - protected Class<TestUserInterface> getTypeClass() { - return TestUserInterface.class; - } - - @Override - protected TestUserInterface[] getTestData() { - Random rnd = new Random(874597969123412341L); - - return new TestUserInterface[]{ - new TestUserClass1(rnd.nextInt(), "foo", rnd.nextLong()), - new TestUserClass2(rnd.nextInt(), "bar", rnd.nextFloat()) - }; - - } - - @Override - @Test - public void testInstantiate() { - // don't do anything, since the PojoSerializer with subclass will return null - } - - public interface TestUserInterface {} - - // User code class for testing the serializer - public static class TestUserClassBase implements TestUserInterface { - public int dumm1; - public String dumm2; - - - public TestUserClassBase() { - } - - public TestUserClassBase(int dumm1, String dumm2) { - this.dumm1 = dumm1; - this.dumm2 = dumm2; - } - - @Override - public int hashCode() { - return Objects.hashCode(dumm1, dumm2); - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof TestUserClassBase)) { - return false; - } - TestUserClassBase otherTUC = (TestUserClassBase) other; - if (dumm1 != otherTUC.dumm1) { - return false; - } - if (!dumm2.equals(otherTUC.dumm2)) { - return false; - } - return true; - } - } - - public static class TestUserClass1 extends TestUserClassBase { - public long dumm3; - - public TestUserClass1() { - } - - public TestUserClass1(int dumm1, String dumm2, long dumm3) { - super(dumm1, dumm2); - this.dumm3 = dumm3; - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof TestUserClass1)) { - return false; - } - TestUserClass1 otherTUC = (TestUserClass1) other; - if (dumm1 != otherTUC.dumm1) { - return false; - } - if (!dumm2.equals(otherTUC.dumm2)) { - return false; - } - if (dumm3 != otherTUC.dumm3) { - return false; - } - return true; - } - } - - public static class TestUserClass2 extends TestUserClassBase { - public float dumm4; - - public TestUserClass2() { - } - - public TestUserClass2(int dumm1, String dumm2, float dumm4) { - super(dumm1, dumm2); - this.dumm4 = dumm4; - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof TestUserClass2)) { - return false; - } - TestUserClass2 otherTUC = (TestUserClass2) other; - if (dumm1 != otherTUC.dumm1) { - return false; - } - if (!dumm2.equals(otherTUC.dumm2)) { - return false; - } - if (dumm4 != otherTUC.dumm4) { - return false; - } - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java deleted file mode 100644 index 87be6db..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java +++ /dev/null @@ -1,308 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import java.io.EOFException; -import java.io.IOException; -import java.io.UTFDataFormatException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.MemoryUtils; - -public final class TestDataOutputSerializer implements DataOutputView { - - private byte[] buffer; - - private int position; - - private ByteBuffer wrapper; - - private final int maxSize; - - - public TestDataOutputSerializer(int startSize) { - this(startSize, Integer.MAX_VALUE); - } - - public TestDataOutputSerializer(int startSize, int maxSize) { - if (startSize < 1 || startSize > maxSize) { - throw new IllegalArgumentException(); - } - - this.buffer = new byte[startSize]; - this.wrapper = ByteBuffer.wrap(buffer); - this.maxSize = maxSize; - } - - public ByteBuffer wrapAsByteBuffer() { - this.wrapper.position(0); - this.wrapper.limit(this.position); - return this.wrapper; - } - - public byte[] copyByteBuffer() { - byte[] target = new byte[position]; - System.arraycopy(buffer, 0, target, 0, position); - - return target; - } - - public void clear() { - this.position = 0; - } - - public int length() { - return this.position; - } - - @Override - public String toString() { - return String.format("[pos=%d cap=%d]", this.position, this.buffer.length); - } - - // ---------------------------------------------------------------------------------------- - // Data Output - // ---------------------------------------------------------------------------------------- - - @Override - public void write(int b) throws IOException { - if (this.position >= this.buffer.length) { - resize(1); - } - this.buffer[this.position++] = (byte) (b & 0xff); - } - - @Override - public void write(byte[] b) throws IOException { - write(b, 0, b.length); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - if (len < 0 || off > b.length - len) { - throw new ArrayIndexOutOfBoundsException(); - } - if (this.position > this.buffer.length - len) { - resize(len); - } - System.arraycopy(b, off, this.buffer, this.position, len); - this.position += len; - } - - @Override - public void writeBoolean(boolean v) throws IOException { - write(v ? 1 : 0); - } - - @Override - public void writeByte(int v) throws IOException { - write(v); - } - - @Override - public void writeBytes(String s) throws IOException { - final int sLen = s.length(); - if (this.position >= this.buffer.length - sLen) { - resize(sLen); - } - - for (int i = 0; i < sLen; i++) { - writeByte(s.charAt(i)); - } - this.position += sLen; - } - - @Override - public void writeChar(int v) throws IOException { - if (this.position >= this.buffer.length - 1) { - resize(2); - } - this.buffer[this.position++] = (byte) (v >> 8); - this.buffer[this.position++] = (byte) v; - } - - @Override - public void writeChars(String s) throws IOException { - final int sLen = s.length(); - if (this.position >= this.buffer.length - 2*sLen) { - resize(2*sLen); - } - for (int i = 0; i < sLen; i++) { - writeChar(s.charAt(i)); - } - } - - @Override - public void writeDouble(double v) throws IOException { - writeLong(Double.doubleToLongBits(v)); - } - - @Override - public void writeFloat(float v) throws IOException { - writeInt(Float.floatToIntBits(v)); - } - - @SuppressWarnings("restriction") - @Override - public void writeInt(int v) throws IOException { - if (this.position >= this.buffer.length - 3) { - resize(4); - } - if (LITTLE_ENDIAN) { - v = Integer.reverseBytes(v); - } - UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v); - this.position += 4; - } - - @SuppressWarnings("restriction") - @Override - public void writeLong(long v) throws IOException { - if (this.position >= this.buffer.length - 7) { - resize(8); - } - if (LITTLE_ENDIAN) { - v = Long.reverseBytes(v); - } - UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v); - this.position += 8; - } - - @Override - public void writeShort(int v) throws IOException { - if (this.position >= this.buffer.length - 1) { - resize(2); - } - this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff); - this.buffer[this.position++] = (byte) ((v >>> 0) & 0xff); - } - - @Override - public void writeUTF(String str) throws IOException { - int strlen = str.length(); - int utflen = 0; - int c; - - /* use charAt instead of copying String to char array */ - for (int i = 0; i < strlen; i++) { - c = str.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - utflen++; - } else if (c > 0x07FF) { - utflen += 3; - } else { - utflen += 2; - } - } - - if (utflen > 65535) { - throw new UTFDataFormatException("Encoded string is too long: " + utflen); - } - else if (this.position > this.buffer.length - utflen - 2) { - resize(utflen + 2); - } - - byte[] bytearr = this.buffer; - int count = this.position; - - bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF); - bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF); - - int i = 0; - for (i = 0; i < strlen; i++) { - c = str.charAt(i); - if (!((c >= 0x0001) && (c <= 0x007F))) { - break; - } - bytearr[count++] = (byte) c; - } - - for (; i < strlen; i++) { - c = str.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - bytearr[count++] = (byte) c; - - } else if (c > 0x07FF) { - bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); - bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F)); - bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F)); - } else { - bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); - bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F)); - } - } - - this.position = count; - } - - - private void resize(int minCapacityAdd) throws IOException { - try { - int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd); - - if (newLen > maxSize) { - - if (this.buffer.length + minCapacityAdd > maxSize) { - throw new EOFException("Exceeded maximum capacity"); - } - - newLen = maxSize; - } - - final byte[] nb = new byte[newLen]; - System.arraycopy(this.buffer, 0, nb, 0, this.position); - this.buffer = nb; - this.wrapper = ByteBuffer.wrap(this.buffer); - } - catch (NegativeArraySizeException nasex) { - throw new IOException("Serialization failed because the record length would exceed 2GB (max addressable array size in Java)."); - } - } - - @SuppressWarnings("restriction") - private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; - - @SuppressWarnings("restriction") - private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); - - private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN); - - @Override - public void skipBytesToWrite(int numBytes) throws IOException { - if(buffer.length - this.position < numBytes){ - throw new EOFException("Could not skip " + numBytes + " bytes."); - } - - this.position += numBytes; - } - - @Override - public void write(DataInputView source, int numBytes) throws IOException { - if(buffer.length - this.position < numBytes){ - throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow."); - } - - source.read(this.buffer, this.position, numBytes); - this.position += numBytes; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java deleted file mode 100644 index cfc4914..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.DoubleSerializer; -import org.apache.flink.api.common.typeutils.base.IntComparator; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.LongComparator; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.java.tuple.Tuple3; - -import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; - -public class TupleComparatorILD2Test extends TupleComparatorTestBase<Tuple3<Integer, Long, Double>> { - - @SuppressWarnings("unchecked") - Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{ - new Tuple3<Integer, Long, Double>(4, 14L, 20.0), - new Tuple3<Integer, Long, Double>(4, 15L, 23.2), - new Tuple3<Integer, Long, Double>(5, 15L, 20.0), - new Tuple3<Integer, Long, Double>(5, 20L, 20.0), - new Tuple3<Integer, Long, Double>(6, 20L, 23.2), - new Tuple3<Integer, Long, Double>(6, 29L, 20.0), - new Tuple3<Integer, Long, Double>(7, 29L, 20.0), - new Tuple3<Integer, Long, Double>(7, 34L, 23.2) - }; - - @Override - protected TupleComparator<Tuple3<Integer, Long, Double>> createComparator(boolean ascending) { - return new TupleComparator<Tuple3<Integer, Long, Double>>( - new int[]{0, 1}, - new TypeComparator[]{ - new IntComparator(ascending), - new LongComparator(ascending) - }, - new TypeSerializer[]{ IntSerializer.INSTANCE, LongSerializer.INSTANCE }); - } - - @SuppressWarnings("unchecked") - @Override - protected TupleSerializer<Tuple3<Integer, Long, Double>> createSerializer() { - return new TupleSerializer<Tuple3<Integer, Long, Double>>( - (Class<Tuple3<Integer, Long, Double>>) (Class<?>) Tuple3.class, - new TypeSerializer[]{ - new IntSerializer(), - new LongSerializer(), - new DoubleSerializer()}); - } - - @Override - protected Tuple3<Integer, Long, Double>[] getSortedTestData() { - return dataISD; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java deleted file mode 100644 index e5a0e6c..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.DoubleComparator; -import org.apache.flink.api.common.typeutils.base.DoubleSerializer; -import org.apache.flink.api.common.typeutils.base.IntComparator; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.LongComparator; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.java.tuple.Tuple3; - -import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; - -public class TupleComparatorILD3Test extends TupleComparatorTestBase<Tuple3<Integer, Long, Double>> { - - @SuppressWarnings("unchecked") - Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{ - new Tuple3<Integer, Long, Double>(4, Long.valueOf(4), 20.0), - new Tuple3<Integer, Long, Double>(4, Long.valueOf(4), 23.2), - new Tuple3<Integer, Long, Double>(4, Long.valueOf(9), 20.0), - new Tuple3<Integer, Long, Double>(5, Long.valueOf(4), 20.0), - new Tuple3<Integer, Long, Double>(5, Long.valueOf(4), 23.2), - new Tuple3<Integer, Long, Double>(5, Long.valueOf(9), 20.0), - new Tuple3<Integer, Long, Double>(6, Long.valueOf(4), 20.0), - new Tuple3<Integer, Long, Double>(6, Long.valueOf(4), 23.2) - }; - - @Override - protected TupleComparator<Tuple3<Integer, Long, Double>> createComparator(boolean ascending) { - return new TupleComparator<Tuple3<Integer, Long, Double>>( - new int[]{0, 1, 2}, - new TypeComparator[]{ - new IntComparator(ascending), - new LongComparator(ascending), - new DoubleComparator(ascending) - }, - new TypeSerializer[]{ IntSerializer.INSTANCE, LongSerializer.INSTANCE, DoubleSerializer.INSTANCE }); - } - - @SuppressWarnings("unchecked") - @Override - protected TupleSerializer<Tuple3<Integer, Long, Double>> createSerializer() { - return new TupleSerializer<Tuple3<Integer, Long, Double>>( - (Class<Tuple3<Integer, Long, Double>>) (Class<?>) Tuple3.class, - new TypeSerializer[]{ - new IntSerializer(), - new LongSerializer(), - new DoubleSerializer()}); - } - - @Override - protected Tuple3<Integer, Long, Double>[] getSortedTestData() { - return dataISD; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java deleted file mode 100644 index a1e6c40..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.DoubleComparator; -import org.apache.flink.api.common.typeutils.base.DoubleSerializer; -import org.apache.flink.api.common.typeutils.base.IntComparator; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.LongComparator; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.java.tuple.Tuple3; - -import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; - -public class TupleComparatorILDC3Test extends TupleComparatorTestBase<Tuple3<Integer, Long, Double>> { - - @SuppressWarnings("unchecked") - Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{ - new Tuple3<Integer, Long, Double>(4, Long.valueOf(4), 20.0), - new Tuple3<Integer, Long, Double>(5, Long.valueOf(1), 20.0), - new Tuple3<Integer, Long, Double>(5, Long.valueOf(2), 20.0), - new Tuple3<Integer, Long, Double>(5, Long.valueOf(10), 23.0), - new Tuple3<Integer, Long, Double>(5, Long.valueOf(19), 24.0), - new Tuple3<Integer, Long, Double>(5, Long.valueOf(20), 24.0), - new Tuple3<Integer, Long, Double>(5, Long.valueOf(24), 25.0), - new Tuple3<Integer, Long, Double>(5, Long.valueOf(25), 25.0) - }; - - @Override - protected TupleComparator<Tuple3<Integer, Long, Double>> createComparator(boolean ascending) { - return new TupleComparator<Tuple3<Integer, Long, Double>>( - new int[]{2, 0, 1}, - new TypeComparator[]{ - new DoubleComparator(ascending), - new IntComparator(ascending), - new LongComparator(ascending) - }, - new TypeSerializer[]{ IntSerializer.INSTANCE, LongSerializer.INSTANCE, DoubleSerializer.INSTANCE }); - } - - @SuppressWarnings("unchecked") - @Override - protected TupleSerializer<Tuple3<Integer, Long, Double>> createSerializer() { - return new TupleSerializer<Tuple3<Integer, Long, Double>>( - (Class<Tuple3<Integer, Long, Double>>) (Class<?>) Tuple3.class, - new TypeSerializer[]{ - new IntSerializer(), - new LongSerializer(), - new DoubleSerializer()}); - } - - @Override - protected Tuple3<Integer, Long, Double>[] getSortedTestData() { - return dataISD; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java deleted file mode 100644 index b5c0c1f..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.DoubleSerializer; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.LongComparator; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.java.tuple.Tuple3; - -import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; - -public class TupleComparatorILDX1Test extends TupleComparatorTestBase<Tuple3<Integer, Long, Double>> { - - @SuppressWarnings("unchecked") - Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{ - new Tuple3<Integer, Long, Double>(4, Long.valueOf(4), 20.0), - new Tuple3<Integer, Long, Double>(4, Long.valueOf(5), 23.2), - new Tuple3<Integer, Long, Double>(4, Long.valueOf(9), 20.0), - new Tuple3<Integer, Long, Double>(4, Long.valueOf(10), 24.0), - new Tuple3<Integer, Long, Double>(4, Long.valueOf(19), 23.2), - new Tuple3<Integer, Long, Double>(4, Long.valueOf(20), 24.0), - new Tuple3<Integer, Long, Double>(4, Long.valueOf(24), 20.0), - new Tuple3<Integer, Long, Double>(4, Long.valueOf(25), 23.2) - }; - - @Override - protected TupleComparator<Tuple3<Integer, Long, Double>> createComparator(boolean ascending) { - return new TupleComparator<Tuple3<Integer, Long, Double>>( - new int[]{1}, - new TypeComparator[]{ - new LongComparator(ascending) - }, - new TypeSerializer[]{ IntSerializer.INSTANCE, LongSerializer.INSTANCE }); - } - - @SuppressWarnings("unchecked") - @Override - protected TupleSerializer<Tuple3<Integer, Long, Double>> createSerializer() { - return new TupleSerializer<Tuple3<Integer, Long, Double>>( - (Class<Tuple3<Integer, Long, Double>>) (Class<?>) Tuple3.class, - new TypeSerializer[]{ - new IntSerializer(), - new LongSerializer(), - new DoubleSerializer()}); - } - - @Override - protected Tuple3<Integer, Long, Double>[] getSortedTestData() { - return dataISD; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java deleted file mode 100644 index 793a2f4..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.DoubleComparator; -import org.apache.flink.api.common.typeutils.base.DoubleSerializer; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.LongComparator; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.java.tuple.Tuple3; - -import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; - -public class TupleComparatorILDXC2Test extends TupleComparatorTestBase<Tuple3<Integer, Long, Double>> { - - @SuppressWarnings("unchecked") - Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{ - new Tuple3<Integer, Long, Double>(4, 4L, 20.0), - new Tuple3<Integer, Long, Double>(4, 5L, 20.0), - new Tuple3<Integer, Long, Double>(4, 3L, 23.0), - new Tuple3<Integer, Long, Double>(4, 19L, 23.0), - new Tuple3<Integer, Long, Double>(4, 17L, 24.0), - new Tuple3<Integer, Long, Double>(4, 18L, 24.0), - new Tuple3<Integer, Long, Double>(4, 24L, 25.0), - new Tuple3<Integer, Long, Double>(4, 25L, 25.0) - }; - - @Override - protected TupleComparator<Tuple3<Integer, Long, Double>> createComparator(boolean ascending) { - return new TupleComparator<Tuple3<Integer, Long, Double>>( - new int[]{2, 1}, - new TypeComparator[]{ - new DoubleComparator(ascending), - new LongComparator(ascending) - }, - new TypeSerializer[]{ IntSerializer.INSTANCE, DoubleSerializer.INSTANCE, LongSerializer.INSTANCE }); - } - - @SuppressWarnings("unchecked") - @Override - protected TupleSerializer<Tuple3<Integer, Long, Double>> createSerializer() { - return new TupleSerializer<Tuple3<Integer, Long, Double>>( - (Class<Tuple3<Integer, Long, Double>>) (Class<?>) Tuple3.class, - new TypeSerializer[]{ - new IntSerializer(), - new LongSerializer(), - new DoubleSerializer()}); - } - - @Override - protected Tuple3<Integer, Long, Double>[] getSortedTestData() { - return dataISD; - } - -}