http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java deleted file mode 100644 index 8cdee9b..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.DoubleSerializer; -import org.apache.flink.api.common.typeutils.base.IntComparator; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.tuple.Tuple3; - -import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; - -public class TupleComparatorISD1Test extends TupleComparatorTestBase<Tuple3<Integer, String, Double>> { - - @SuppressWarnings("unchecked") - Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{ - new Tuple3<Integer, String, Double>(4, "hello", 20.0), - new Tuple3<Integer, String, Double>(5, "hello", 23.2), - new Tuple3<Integer, String, Double>(6, "world", 20.0), - new Tuple3<Integer, String, Double>(7, "hello", 20.0), - new Tuple3<Integer, String, Double>(8, "hello", 23.2), - new Tuple3<Integer, String, Double>(9, "world", 20.0), - new Tuple3<Integer, String, Double>(10, "hello", 20.0), - new Tuple3<Integer, String, Double>(11, "hello", 23.2) - }; - - @Override - protected TupleComparator<Tuple3<Integer, String, Double>> createComparator(boolean ascending) { - return new TupleComparator<Tuple3<Integer, String, Double>>( - new int[]{0}, - new TypeComparator[]{ new IntComparator(ascending) }, - new TypeSerializer[]{ IntSerializer.INSTANCE }); - } - - @SuppressWarnings("unchecked") - @Override - protected TupleSerializer<Tuple3<Integer, String, Double>> createSerializer() { - return new TupleSerializer<Tuple3<Integer, String, Double>>( - (Class<Tuple3<Integer, String, Double>>) (Class<?>) Tuple3.class, - new TypeSerializer[]{ - new IntSerializer(), - new StringSerializer(), - new DoubleSerializer()}); - } - - @Override - protected Tuple3<Integer, String, Double>[] getSortedTestData() { - return dataISD; - } - -}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java deleted file mode 100644 index 06c292f..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.DoubleSerializer; -import org.apache.flink.api.common.typeutils.base.IntComparator; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.StringComparator; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.tuple.Tuple3; - -import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; - -public class TupleComparatorISD2Test extends TupleComparatorTestBase<Tuple3<Integer, String, Double>> { - - @SuppressWarnings("unchecked") - Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{ - new Tuple3<Integer, String, Double>(4, "hello", 20.0), - new Tuple3<Integer, String, Double>(4, "world", 23.2), - new Tuple3<Integer, String, Double>(5, "hello", 20.0), - new Tuple3<Integer, String, Double>(5, "world", 20.0), - new Tuple3<Integer, String, Double>(6, "hello", 23.2), - new Tuple3<Integer, String, Double>(6, "world", 20.0), - new Tuple3<Integer, String, Double>(7, "hello", 20.0), - new Tuple3<Integer, String, Double>(7, "world", 23.2) - }; - - @Override - protected TupleComparator<Tuple3<Integer, String, Double>> createComparator(boolean ascending) { - return new TupleComparator<Tuple3<Integer, String, Double>>( - new int[]{0, 1}, - new TypeComparator[]{ - new IntComparator(ascending), - new StringComparator(ascending) - }, - new TypeSerializer[]{ IntSerializer.INSTANCE, StringSerializer.INSTANCE, DoubleSerializer.INSTANCE }); - } - - @SuppressWarnings("unchecked") - @Override - protected TupleSerializer<Tuple3<Integer, String, Double>> createSerializer() { - return new TupleSerializer<Tuple3<Integer, String, Double>>( - (Class<Tuple3<Integer, String, Double>>) (Class<?>) Tuple3.class, - new TypeSerializer[]{ - new IntSerializer(), - new StringSerializer(), - new DoubleSerializer()}); - } - - @Override - protected Tuple3<Integer, String, Double>[] getSortedTestData() { - return dataISD; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java deleted file mode 100644 index d823a29..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.DoubleComparator; -import org.apache.flink.api.common.typeutils.base.DoubleSerializer; -import org.apache.flink.api.common.typeutils.base.IntComparator; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.StringComparator; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.tuple.Tuple3; - -import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; - -public class TupleComparatorISD3Test extends TupleComparatorTestBase<Tuple3<Integer, String, Double>> { - - @SuppressWarnings("unchecked") - Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{ - new Tuple3<Integer, String, Double>(4, "hello", 20.0), - new Tuple3<Integer, String, Double>(4, "hello", 23.2), - new Tuple3<Integer, String, Double>(4, "world", 20.0), - new Tuple3<Integer, String, Double>(5, "hello", 20.0), - new Tuple3<Integer, String, Double>(5, "hello", 23.2), - new Tuple3<Integer, String, Double>(5, "world", 20.0), - new Tuple3<Integer, String, Double>(6, "hello", 20.0), - new Tuple3<Integer, String, Double>(6, "hello", 23.2) - }; - - @Override - protected TupleComparator<Tuple3<Integer, String, Double>> createComparator(boolean ascending) { - return new TupleComparator<Tuple3<Integer, String, Double>>( - new int[]{0, 1, 2}, - new TypeComparator[]{ - new IntComparator(ascending), - new StringComparator(ascending), - new DoubleComparator(ascending) - }, - new TypeSerializer[]{ IntSerializer.INSTANCE, StringSerializer.INSTANCE, DoubleSerializer.INSTANCE }); - } - - @SuppressWarnings("unchecked") - @Override - protected TupleSerializer<Tuple3<Integer, String, Double>> createSerializer() { - return new TupleSerializer<Tuple3<Integer, String, Double>>( - (Class<Tuple3<Integer, String, Double>>) (Class<?>) Tuple3.class, - new TypeSerializer[]{ - new IntSerializer(), - new StringSerializer(), - new DoubleSerializer()}); - } - - @Override - protected Tuple3<Integer, String, Double>[] getSortedTestData() { - return dataISD; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java deleted file mode 100644 index cf73be2..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.java.typeutils.runtime; - -import static org.junit.Assert.assertEquals; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.DoubleComparator; -import org.apache.flink.api.common.typeutils.base.DoubleSerializer; -import org.apache.flink.api.common.typeutils.base.IntComparator; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.LongComparator; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.common.typeutils.base.StringComparator; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; - -public class TupleComparatorTTT1Test extends TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> { - - @SuppressWarnings("unchecked") - Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] dataISD = new Tuple3[]{ - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 1.0), new Tuple2<Long, Long>(1L, 1L), new Tuple2<Integer, Long>(4, -10L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 2.0), new Tuple2<Long, Long>(1L, 2L), new Tuple2<Integer, Long>(4, -5L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.0), new Tuple2<Long, Long>(1L, 3L), new Tuple2<Integer, Long>(4, 0L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.5), new Tuple2<Long, Long>(1L, 4L), new Tuple2<Integer, Long>(4, 5L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 4325.12), new Tuple2<Long, Long>(1L, 5L), new Tuple2<Integer, Long>(4, 15L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 1.0), new Tuple2<Long, Long>(2L, 4L), new Tuple2<Integer, Long>(45, -5L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 2.0), new Tuple2<Long, Long>(2L, 6L), new Tuple2<Integer, Long>(45, 5L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.0), new Tuple2<Long, Long>(2L, 8L), new Tuple2<Integer, Long>(323, 2L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.5), new Tuple2<Long, Long>(2L, 9L), new Tuple2<Integer, Long>(323, 5L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 4325.12), new Tuple2<Long, Long>(2L, 123L), new Tuple2<Integer, Long>(555, 1L)) - - }; - - @SuppressWarnings("unchecked") - @Override - protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createComparator( - boolean ascending) { - return new TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>( - new int[] { 0 }, - new TypeComparator[] { - new TupleComparator<Tuple2<String, Double>>( - new int[] { 0, 1 }, - new TypeComparator[] { - new StringComparator(ascending), - new DoubleComparator(ascending) }, - new TypeSerializer[] { - StringSerializer.INSTANCE, - DoubleSerializer.INSTANCE }) }, - new TypeSerializer[] { - new TupleSerializer<Tuple2<String, Double>>( - (Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class, - new TypeSerializer[] { - StringSerializer.INSTANCE, - DoubleSerializer.INSTANCE }), - new TupleSerializer<Tuple2<Long, Long>>( - (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class, - new TypeSerializer[] { - LongSerializer.INSTANCE, - LongSerializer.INSTANCE }), - new TupleSerializer<Tuple2<Integer, Long>>( - (Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class, - new TypeSerializer[] { - IntSerializer.INSTANCE, - LongSerializer.INSTANCE }) }); - } - - @SuppressWarnings("unchecked") - @Override - protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createSerializer() { - return new TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>( - (Class<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>) (Class<?>) Tuple3.class, - new TypeSerializer[]{ - new TupleSerializer<Tuple2<String, Double>> ( - (Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class, - new TypeSerializer[]{ - StringSerializer.INSTANCE, - DoubleSerializer.INSTANCE - }), - new TupleSerializer<Tuple2<Long, Long>> ( - (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class, - new TypeSerializer[]{ - LongSerializer.INSTANCE, - LongSerializer.INSTANCE - }), - new TupleSerializer<Tuple2<Integer, Long>> ( - (Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class, - new TypeSerializer[]{ - IntSerializer.INSTANCE, - LongSerializer.INSTANCE - }) - }); - } - - @Override - protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] getSortedTestData() { - return this.dataISD; - } - - @Override - protected void deepEquals( - String message, - Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> should, - Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> is) { - - for (int x = 0; x < should.getArity(); x++) { - // Check whether field is of type Tuple2 because assertEquals must be called on the non Tuple2 fields. - if(should.getField(x) instanceof Tuple2) { - this.deepEquals(message, (Tuple2<?,?>) should.getField(x), (Tuple2<?,?>)is.getField(x)); - } - else { - assertEquals(message, should.getField(x), is.getField(x)); - } - }// For - } - - protected void deepEquals(String message, Tuple2<?,?> should, Tuple2<?,?> is) { - for (int x = 0; x < should.getArity(); x++) { - assertEquals(message, should.getField(x), is.getField(x)); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java deleted file mode 100644 index 4b07c61..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.java.typeutils.runtime; - -import static org.junit.Assert.assertEquals; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.DoubleComparator; -import org.apache.flink.api.common.typeutils.base.DoubleSerializer; -import org.apache.flink.api.common.typeutils.base.IntComparator; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.LongComparator; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.common.typeutils.base.StringComparator; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; - -public class TupleComparatorTTT2Test extends TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> { - - @SuppressWarnings("unchecked") - Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] dataISD = new Tuple3[]{ - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 1.0), new Tuple2<Long, Long>(1L, 1L), new Tuple2<Integer, Long>(4, -10L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 2.0), new Tuple2<Long, Long>(1L, 2L), new Tuple2<Integer, Long>(4, -5L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.0), new Tuple2<Long, Long>(1L, 3L), new Tuple2<Integer, Long>(4, 0L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.5), new Tuple2<Long, Long>(1L, 4L), new Tuple2<Integer, Long>(4, 5L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 4325.12), new Tuple2<Long, Long>(1L, 5L), new Tuple2<Integer, Long>(4, 15L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 1.0), new Tuple2<Long, Long>(2L, 4L), new Tuple2<Integer, Long>(45, -5L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 2.0), new Tuple2<Long, Long>(2L, 6L), new Tuple2<Integer, Long>(45, 5L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.0), new Tuple2<Long, Long>(2L, 8L), new Tuple2<Integer, Long>(323, 2L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.5), new Tuple2<Long, Long>(2L, 9L), new Tuple2<Integer, Long>(323, 5L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 4325.12), new Tuple2<Long, Long>(2L, 123L), new Tuple2<Integer, Long>(555, 1L)) - - }; - - @SuppressWarnings("unchecked") - @Override - protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createComparator( - boolean ascending) { - return new TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>( - new int[] { 0, 2 }, - new TypeComparator[] { - new TupleComparator<Tuple2<String, Double>>( - new int[] { 0, 1 }, - new TypeComparator[] { - new StringComparator(ascending), - new DoubleComparator(ascending) }, - new TypeSerializer[] { - StringSerializer.INSTANCE, - DoubleSerializer.INSTANCE }), - new TupleComparator<Tuple2<Integer, Long>>( - new int[] { 0, 1 }, - new TypeComparator[] { - new IntComparator(ascending), - new LongComparator(ascending) }, - new TypeSerializer[] { - IntSerializer.INSTANCE, - LongSerializer.INSTANCE }) }, - new TypeSerializer[] { - new TupleSerializer<Tuple2<String, Double>>( - (Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class, - new TypeSerializer[] { - StringSerializer.INSTANCE, - DoubleSerializer.INSTANCE }), - new TupleSerializer<Tuple2<Long, Long>>( - (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class, - new TypeSerializer[] { - LongSerializer.INSTANCE, - LongSerializer.INSTANCE }), - new TupleSerializer<Tuple2<Integer, Long>>( - (Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class, - new TypeSerializer[] { - IntSerializer.INSTANCE, - LongSerializer.INSTANCE }) }); - } - - @SuppressWarnings("unchecked") - @Override - protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createSerializer() { - return new TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>( - (Class<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>) (Class<?>) Tuple3.class, - new TypeSerializer[]{ - new TupleSerializer<Tuple2<String, Double>> ( - (Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class, - new TypeSerializer[]{ - StringSerializer.INSTANCE, - DoubleSerializer.INSTANCE}), - new TupleSerializer<Tuple2<Long, Long>> ( - (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class, - new TypeSerializer[]{ - LongSerializer.INSTANCE, - LongSerializer.INSTANCE}), - new TupleSerializer<Tuple2<Integer, Long>> ( - (Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class, - new TypeSerializer[]{ - IntSerializer.INSTANCE, - LongSerializer.INSTANCE}) - }); - } - - @Override - protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] getSortedTestData() { - return this.dataISD; - } - - @Override - protected void deepEquals( - String message, - Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> should, - Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> is) { - - for (int x = 0; x < should.getArity(); x++) { - // Check whether field is of type Tuple2 because assertEquals must be called on the non Tuple2 fields. - if(should.getField(x) instanceof Tuple2) { - this.deepEquals(message, (Tuple2<?,?>) should.getField(x), (Tuple2<?,?>)is.getField(x)); - } - else { - assertEquals(message, should.getField(x), is.getField(x)); - } - }// For - } - - protected void deepEquals(String message, Tuple2<?,?> should, Tuple2<?,?> is) { - for (int x = 0; x < should.getArity(); x++) { - assertEquals(message, should.getField(x), is.getField(x)); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java deleted file mode 100644 index 0dfc094..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.java.typeutils.runtime; - -import static org.junit.Assert.assertEquals; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.DoubleComparator; -import org.apache.flink.api.common.typeutils.base.DoubleSerializer; -import org.apache.flink.api.common.typeutils.base.IntComparator; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.LongComparator; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.common.typeutils.base.StringComparator; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase; - -public class TupleComparatorTTT3Test extends TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>{ - @SuppressWarnings("unchecked") - Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] dataISD = new Tuple3[]{ - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 1.0), new Tuple2<Long, Long>(1L, 1L), new Tuple2<Integer, Long>(4, -10L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 2.0), new Tuple2<Long, Long>(1L, 2L), new Tuple2<Integer, Long>(4, -5L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.0), new Tuple2<Long, Long>(1L, 3L), new Tuple2<Integer, Long>(4, 0L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.5), new Tuple2<Long, Long>(1L, 4L), new Tuple2<Integer, Long>(4, 5L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 4325.12), new Tuple2<Long, Long>(1L, 5L), new Tuple2<Integer, Long>(4, 15L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 1.0), new Tuple2<Long, Long>(2L, 4L), new Tuple2<Integer, Long>(45, -5L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 2.0), new Tuple2<Long, Long>(2L, 6L), new Tuple2<Integer, Long>(45, 5L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.0), new Tuple2<Long, Long>(2L, 8L), new Tuple2<Integer, Long>(323, 2L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.5), new Tuple2<Long, Long>(2L, 9L), new Tuple2<Integer, Long>(323, 5L)), - new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 4325.12), new Tuple2<Long, Long>(2L, 123L), new Tuple2<Integer, Long>(555, 1L)) - - }; - - @SuppressWarnings("unchecked") - @Override - protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createComparator( - boolean ascending) { - return new TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>( - new int[] { 0, 1, 2 }, - new TypeComparator[] { - new TupleComparator<Tuple2<String, Double>>( - new int[] { 0, 1 }, - new TypeComparator[] { - new StringComparator(ascending), - new DoubleComparator(ascending) }, - new TypeSerializer[] { - StringSerializer.INSTANCE, - DoubleSerializer.INSTANCE }), - new TupleComparator<Tuple2<Long, Long>>( - new int[] { 0, 1 }, - new TypeComparator[] { - new LongComparator(ascending), - new LongComparator(ascending) }, - new TypeSerializer[] { - LongSerializer.INSTANCE, - LongSerializer.INSTANCE }), - new TupleComparator<Tuple2<Integer, Long>>( - new int[] { 0, 1 }, - new TypeComparator[] { - new IntComparator(ascending), - new LongComparator(ascending) }, - new TypeSerializer[] { - IntSerializer.INSTANCE, - LongSerializer.INSTANCE }) }, - new TypeSerializer[] { - new TupleSerializer<Tuple2<String, Double>>( - (Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class, - new TypeSerializer[] { - StringSerializer.INSTANCE, - DoubleSerializer.INSTANCE }), - new TupleSerializer<Tuple2<Long, Long>>( - (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class, - new TypeSerializer[] { - LongSerializer.INSTANCE, - LongSerializer.INSTANCE }), - new TupleSerializer<Tuple2<Integer, Long>>( - (Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class, - new TypeSerializer[] { - IntSerializer.INSTANCE, - LongSerializer.INSTANCE }) }); - } - - @SuppressWarnings("unchecked") - @Override - protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createSerializer() { - return new TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>( - (Class<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>) (Class<?>) Tuple3.class, - new TypeSerializer[]{ - new TupleSerializer<Tuple2<String, Double>> ( - (Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class, - new TypeSerializer[]{ - StringSerializer.INSTANCE, - DoubleSerializer.INSTANCE - }), - new TupleSerializer<Tuple2<Long, Long>> ( - (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class, - new TypeSerializer[]{ - LongSerializer.INSTANCE, - LongSerializer.INSTANCE - }), - new TupleSerializer<Tuple2<Integer, Long>> ( - (Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class, - new TypeSerializer[]{ - IntSerializer.INSTANCE, - LongSerializer.INSTANCE - }) - }); - } - - @Override - protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] getSortedTestData() { - return this.dataISD; - } - - @Override - protected void deepEquals( - String message, - Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> should, - Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> is) { - - for (int x = 0; x < should.getArity(); x++) { - // Check whether field is of type Tuple2 because assertEquals must be called on the non Tuple2 fields. - if(should.getField(x) instanceof Tuple2) { - this.deepEquals(message, (Tuple2<?,?>) should.getField(x), (Tuple2<?,?>)is.getField(x)); - } - else { - assertEquals(message, should.getField(x), is.getField(x)); - } - }// For - } - - protected void deepEquals(String message, Tuple2<?,?> should, Tuple2<?,?> is) { - for (int x = 0; x < should.getArity(); x++) { - assertEquals(message, should.getField(x), is.getField(x)); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java deleted file mode 100644 index 017eb44..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.typeutils.runtime; - -import java.util.ArrayList; -import java.util.Random; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.tuple.Tuple0; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.Book; -import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.BookAuthor; -import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.ComplexNestedObject1; -import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.ComplexNestedObject2; -import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.SimpleTypes; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.util.StringUtils; -import org.junit.Assert; -import org.junit.Test; - -public class TupleSerializerTest { - - @Test - public void testTuple0() { - Tuple0[] testTuples = new Tuple0[] { Tuple0.INSTANCE, Tuple0.INSTANCE, Tuple0.INSTANCE }; - - runTests(testTuples); - } - - @Test - public void testTuple1Int() { - @SuppressWarnings({"unchecked", "rawtypes"}) - Tuple1<Integer>[] testTuples = new Tuple1[] { - new Tuple1<Integer>(42), new Tuple1<Integer>(1), new Tuple1<Integer>(0), new Tuple1<Integer>(-1), - new Tuple1<Integer>(Integer.MAX_VALUE), new Tuple1<Integer>(Integer.MIN_VALUE) - }; - - runTests(testTuples); - } - - @Test - public void testTuple1String() { - Random rnd = new Random(68761564135413L); - - @SuppressWarnings({"unchecked", "rawtypes"}) - Tuple1<String>[] testTuples = new Tuple1[] { - new Tuple1<String>(StringUtils.getRandomString(rnd, 10, 100)), - new Tuple1<String>("abc"), - new Tuple1<String>(""), - new Tuple1<String>(StringUtils.getRandomString(rnd, 30, 170)), - new Tuple1<String>(StringUtils.getRandomString(rnd, 15, 50)), - new Tuple1<String>("") - }; - - runTests(testTuples); - } - - @Test - public void testTuple1StringArray() { - Random rnd = new Random(289347567856686223L); - - String[] arr1 = new String[] {"abc", "", - StringUtils.getRandomString(rnd, 10, 100), - StringUtils.getRandomString(rnd, 15, 50), - StringUtils.getRandomString(rnd, 30, 170), - StringUtils.getRandomString(rnd, 14, 15), - ""}; - - String[] arr2 = new String[] {"foo", "", - StringUtils.getRandomString(rnd, 10, 100), - StringUtils.getRandomString(rnd, 1000, 5000), - StringUtils.getRandomString(rnd, 30000, 35000), - StringUtils.getRandomString(rnd, 100*1024, 105*1024), - "bar"}; - - @SuppressWarnings("unchecked") - Tuple1<String[]>[] testTuples = new Tuple1[] { - new Tuple1<String[]>(arr1), - new Tuple1<String[]>(arr2) - }; - - runTests(testTuples); - } - - @Test - public void testTuple2StringDouble() { - Random rnd = new Random(807346528946L); - - @SuppressWarnings("unchecked") - Tuple2<String, Double>[] testTuples = new Tuple2[] { - new Tuple2<String, Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()), - new Tuple2<String, Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()), - new Tuple2<String, Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()), - new Tuple2<String, Double>("", rnd.nextDouble()), - new Tuple2<String, Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()), - new Tuple2<String, Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()) - }; - - runTests(testTuples); - } - - @Test - public void testTuple2StringStringArray() { - Random rnd = new Random(289347567856686223L); - - String[] arr1 = new String[] {"abc", "", - StringUtils.getRandomString(rnd, 10, 100), - StringUtils.getRandomString(rnd, 15, 50), - StringUtils.getRandomString(rnd, 30, 170), - StringUtils.getRandomString(rnd, 14, 15), - ""}; - - String[] arr2 = new String[] {"foo", "", - StringUtils.getRandomString(rnd, 10, 100), - StringUtils.getRandomString(rnd, 1000, 5000), - StringUtils.getRandomString(rnd, 30000, 35000), - StringUtils.getRandomString(rnd, 100*1024, 105*1024), - "bar"}; - - @SuppressWarnings("unchecked") - Tuple2<String, String[]>[] testTuples = new Tuple2[] { - new Tuple2<String, String[]>(StringUtils.getRandomString(rnd, 30, 170), arr1), - new Tuple2<String, String[]>(StringUtils.getRandomString(rnd, 30, 170), arr2), - new Tuple2<String, String[]>(StringUtils.getRandomString(rnd, 30, 170), arr1), - new Tuple2<String, String[]>(StringUtils.getRandomString(rnd, 30, 170), arr2), - new Tuple2<String, String[]>(StringUtils.getRandomString(rnd, 30, 170), arr2) - }; - - runTests(testTuples); - } - - - @Test - public void testTuple5CustomObjects() { - Random rnd = new Random(807346528946L); - - SimpleTypes a = new SimpleTypes(); - SimpleTypes b = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(), - StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble()); - SimpleTypes c = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(), - StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble()); - SimpleTypes d = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(), - StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble()); - SimpleTypes e = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(), - StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble()); - SimpleTypes f = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(), - StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble()); - SimpleTypes g = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(), - StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble()); - - ComplexNestedObject1 o1 = new ComplexNestedObject1(5626435); - ComplexNestedObject1 o2 = new ComplexNestedObject1(76923); - ComplexNestedObject1 o3 = new ComplexNestedObject1(-1100); - ComplexNestedObject1 o4 = new ComplexNestedObject1(0); - ComplexNestedObject1 o5 = new ComplexNestedObject1(44); - - ComplexNestedObject2 co1 = new ComplexNestedObject2(rnd); - ComplexNestedObject2 co2 = new ComplexNestedObject2(); - ComplexNestedObject2 co3 = new ComplexNestedObject2(rnd); - ComplexNestedObject2 co4 = new ComplexNestedObject2(rnd); - - Book b1 = new Book(976243875L, "The Serialization Odysse", 42); - Book b2 = new Book(0L, "Debugging byte streams", 1337); - Book b3 = new Book(-1L, "Low level interfaces", 0xC0FFEE); - Book b4 = new Book(Long.MAX_VALUE, "The joy of bits and bytes", 0xDEADBEEF); - Book b5 = new Book(Long.MIN_VALUE, "Winnign a prize for creative test strings", 0xBADF00); - Book b6 = new Book(-2L, "Distributed Systems", 0xABCDEF0123456789L); - - ArrayList<String> list = new ArrayList<String>(); - list.add("A"); - list.add("B"); - list.add("C"); - list.add("D"); - list.add("E"); - - BookAuthor ba1 = new BookAuthor(976243875L, list, "Arno Nym"); - - ArrayList<String> list2 = new ArrayList<String>(); - BookAuthor ba2 = new BookAuthor(987654321L, list2, "The Saurus"); - - - @SuppressWarnings("unchecked") - Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>[] testTuples = new Tuple5[] { - new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(a, b1, o1, ba1, co1), - new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(b, b2, o2, ba2, co2), - new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(c, b3, o3, ba1, co3), - new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(d, b2, o4, ba1, co4), - new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(e, b4, o5, ba2, co4), - new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(f, b5, o1, ba2, co4), - new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(g, b6, o4, ba1, co2) - }; - - runTests(testTuples); - } - - private <T extends Tuple> void runTests(T... instances) { - try { - TupleTypeInfo<T> tupleTypeInfo = (TupleTypeInfo<T>) TypeExtractor.getForObject(instances[0]); - TypeSerializer<T> serializer = tupleTypeInfo.createSerializer(new ExecutionConfig()); - - Class<T> tupleClass = tupleTypeInfo.getTypeClass(); - - int length = -1; - if(tupleClass == Tuple0.class) { - length = 1; - } - TupleSerializerTestInstance<T> test = new TupleSerializerTestInstance<T>(serializer, tupleClass, length, instances); - test.testAll(); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java deleted file mode 100644 index a196984..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.typeutils.runtime; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -import java.util.Arrays; - -import org.apache.flink.api.common.typeutils.SerializerTestInstance; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple; -import org.junit.Assert; - -public class TupleSerializerTestInstance<T extends Tuple> extends SerializerTestInstance<T> { - - public TupleSerializerTestInstance(TypeSerializer<T> serializer, Class<T> typeClass, int length, T[] testData) { - super(serializer, typeClass, length, testData); - } - - protected void deepEquals(String message, T shouldTuple, T isTuple) { - Assert.assertEquals(shouldTuple.getArity(), isTuple.getArity()); - - for (int i = 0; i < shouldTuple.getArity(); i++) { - Object should = shouldTuple.getField(i); - Object is = isTuple.getField(i); - - if (should.getClass().isArray()) { - if (should instanceof boolean[]) { - Assert.assertTrue(message, Arrays.equals((boolean[]) should, (boolean[]) is)); - } - else if (should instanceof byte[]) { - assertArrayEquals(message, (byte[]) should, (byte[]) is); - } - else if (should instanceof short[]) { - assertArrayEquals(message, (short[]) should, (short[]) is); - } - else if (should instanceof int[]) { - assertArrayEquals(message, (int[]) should, (int[]) is); - } - else if (should instanceof long[]) { - assertArrayEquals(message, (long[]) should, (long[]) is); - } - else if (should instanceof float[]) { - assertArrayEquals(message, (float[]) should, (float[]) is, 0.0f); - } - else if (should instanceof double[]) { - assertArrayEquals(message, (double[]) should, (double[]) is, 0.0); - } - else if (should instanceof char[]) { - assertArrayEquals(message, (char[]) should, (char[]) is); - } - else { - assertArrayEquals(message, (Object[]) should, (Object[]) is); - } - } - else { - assertEquals(message, should, is); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java deleted file mode 100644 index cf9874d..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.ComparatorTestBase; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.types.StringValue; - -public class ValueComparatorTest extends ComparatorTestBase<StringValue> { - - StringValue[] data = new StringValue[]{ - new StringValue(""), - new StringValue("Lorem Ipsum Dolor Omit Longer"), - new StringValue("aaaa"), - new StringValue("abcd"), - new StringValue("abce"), - new StringValue("abdd"), - new StringValue("accd"), - new StringValue("bbcd") - }; - - @Override - protected TypeComparator<StringValue> createComparator(boolean ascending) { - return new ValueComparator<StringValue>(ascending, StringValue.class); - } - - @Override - protected TypeSerializer<StringValue> createSerializer() { - return new ValueSerializer<StringValue>(StringValue.class); - } - - @Override - protected StringValue[] getSortedTestData() { - return data; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java deleted file mode 100644 index a2c3e1e..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.ComparatorTestBase; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -import java.util.UUID; - -public class ValueComparatorUUIDTest extends ComparatorTestBase<ValueID> { - @Override - protected TypeComparator<ValueID> createComparator(boolean ascending) { - return new ValueComparator<>(ascending, ValueID.class); - } - - @Override - protected TypeSerializer<ValueID> createSerializer() { - return new ValueSerializer<>(ValueID.class); - } - - @Override - protected ValueID[] getSortedTestData() { - return new ValueID[] { - new ValueID(new UUID(0, 0)), - new ValueID(new UUID(1, 0)), - new ValueID(new UUID(1, 1)) - }; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java deleted file mode 100644 index d644485..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.types.Value; - -import java.io.IOException; -import java.util.UUID; - -public class ValueID implements Value, Comparable<ValueID> { - private static final long serialVersionUID = -562791433077971752L; - - private UUID id; - - public ValueID() { - id = UUID.randomUUID(); - } - - public ValueID(UUID id) { - this.id = id; - } - - @Override - public int compareTo(ValueID o) { - return id.compareTo(o.id); - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeLong(id.getMostSignificantBits()); - out.writeLong(id.getLeastSignificantBits()); - } - - @Override - public void read(DataInputView in) throws IOException { - id = new UUID(in.readLong(), in.readLong()); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof ValueID) { - ValueID other = (ValueID) obj; - - return id.equals(other.id); - } else { - return false; - } - } - - @Override - public int hashCode() { - return id.hashCode(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java deleted file mode 100644 index 9c07a5e..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.SerializerTestBase; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -import java.util.UUID; - -public class ValueSerializerUUIDTest extends SerializerTestBase<ValueID> { - @Override - protected TypeSerializer<ValueID> createSerializer() { - return new ValueSerializer<>(ValueID.class); - } - - @Override - protected int getLength() { - return -1; - } - - @Override - protected Class<ValueID> getTypeClass() { - return ValueID.class; - } - - @Override - protected ValueID[] getTestData() { - return new ValueID[] { - new ValueID(new UUID(0, 0)), - new ValueID(new UUID(1, 0)), - new ValueID(new UUID(1, 1)) - }; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java deleted file mode 100644 index f5a90b7..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.ComparatorTestBase; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -public class WritableComparatorTest extends ComparatorTestBase<StringArrayWritable> { - - StringArrayWritable[] data = new StringArrayWritable[]{ - new StringArrayWritable(new String[]{}), - new StringArrayWritable(new String[]{""}), - new StringArrayWritable(new String[]{"a","a"}), - new StringArrayWritable(new String[]{"a","b"}), - new StringArrayWritable(new String[]{"c","c"}), - new StringArrayWritable(new String[]{"d","f"}), - new StringArrayWritable(new String[]{"d","m"}), - new StringArrayWritable(new String[]{"z","x"}), - new StringArrayWritable(new String[]{"a","a", "a"}) - }; - - @Override - protected TypeComparator<StringArrayWritable> createComparator(boolean ascending) { - return new WritableComparator<StringArrayWritable>(ascending, StringArrayWritable.class); - } - - @Override - protected TypeSerializer<StringArrayWritable> createSerializer() { - return new WritableSerializer<StringArrayWritable>(StringArrayWritable.class); - } - - @Override - protected StringArrayWritable[] getSortedTestData() { - return data; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java deleted file mode 100644 index 94e759d..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.ComparatorTestBase; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -import java.util.UUID; - -public class WritableComparatorUUIDTest extends ComparatorTestBase<WritableID> { - @Override - protected TypeComparator<WritableID> createComparator(boolean ascending) { - return new WritableComparator<>(ascending, WritableID.class); - } - - @Override - protected TypeSerializer<WritableID> createSerializer() { - return new WritableSerializer<>(WritableID.class); - } - - @Override - protected WritableID[] getSortedTestData() { - return new WritableID[] { - new WritableID(new UUID(0, 0)), - new WritableID(new UUID(1, 0)), - new WritableID(new UUID(1, 1)) - }; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java deleted file mode 100644 index 4274cf6..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.hadoop.io.WritableComparable; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.UUID; - -public class WritableID implements WritableComparable<WritableID> { - private UUID uuid; - - public WritableID() { - this.uuid = UUID.randomUUID(); - } - - public WritableID(UUID uuid) { - this.uuid = uuid; - } - - @Override - public int compareTo(WritableID o) { - return this.uuid.compareTo(o.uuid); - } - - @Override - public void write(DataOutput dataOutput) throws IOException { - dataOutput.writeLong(uuid.getMostSignificantBits()); - dataOutput.writeLong(uuid.getLeastSignificantBits()); - } - - @Override - public void readFields(DataInput dataInput) throws IOException { - this.uuid = new UUID(dataInput.readLong(), dataInput.readLong()); - } - - @Override - public String toString() { - return uuid.toString(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - WritableID id = (WritableID) o; - - return !(uuid != null ? !uuid.equals(id.uuid) : id.uuid != null); - } - - @Override - public int hashCode() { - return uuid != null ? uuid.hashCode() : 0; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java deleted file mode 100644 index bb5f4d4..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.SerializerTestInstance; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.java.typeutils.WritableTypeInfo; -import org.junit.Test; - -public class WritableSerializerTest { - - @Test - public void testStringArrayWritable() { - StringArrayWritable[] data = new StringArrayWritable[]{ - new StringArrayWritable(new String[]{}), - new StringArrayWritable(new String[]{""}), - new StringArrayWritable(new String[]{"a","a"}), - new StringArrayWritable(new String[]{"a","b"}), - new StringArrayWritable(new String[]{"c","c"}), - new StringArrayWritable(new String[]{"d","f"}), - new StringArrayWritable(new String[]{"d","m"}), - new StringArrayWritable(new String[]{"z","x"}), - new StringArrayWritable(new String[]{"a","a", "a"}) - }; - - WritableTypeInfo<StringArrayWritable> writableTypeInfo = (WritableTypeInfo<StringArrayWritable>) TypeExtractor.getForObject(data[0]); - WritableSerializer<StringArrayWritable> writableSerializer = (WritableSerializer<StringArrayWritable>) writableTypeInfo.createSerializer(new ExecutionConfig()); - - SerializerTestInstance<StringArrayWritable> testInstance = new SerializerTestInstance<StringArrayWritable>(writableSerializer,writableTypeInfo.getTypeClass(), -1, data); - - testInstance.testAll(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java deleted file mode 100644 index 2af7730..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.SerializerTestBase; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -import java.util.UUID; - -public class WritableSerializerUUIDTest extends SerializerTestBase<WritableID> { - @Override - protected TypeSerializer<WritableID> createSerializer() { - return new WritableSerializer<>(WritableID.class); - } - - @Override - protected int getLength() { - return -1; - } - - @Override - protected Class<WritableID> getTypeClass() { - return WritableID.class; - } - - @Override - protected WritableID[] getTestData() { - return new WritableID[] { - new WritableID(new UUID(0, 0)), - new WritableID(new UUID(1, 0)), - new WritableID(new UUID(1, 1)) - }; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java deleted file mode 100644 index 7572408..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java +++ /dev/null @@ -1,287 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime.kryo; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputView; - -import org.junit.Assert; -import org.junit.Test; - -import java.io.ByteArrayInputStream; -import java.io.EOFException; -import java.io.IOException; -import java.io.Serializable; -import java.util.Arrays; - -public class KryoClearedBufferTest { - - /** - * Tests that the kryo output buffer is cleared in case of an exception. Flink uses the - * EOFException to signal that a buffer is full. In such a case, the record which was tried - * to be written will be rewritten. Therefore, eventually buffered data of this record has - * to be cleared. - */ - @Test - public void testOutputBufferedBeingClearedInCaseOfException() throws Exception { - ExecutionConfig executionConfig = new ExecutionConfig(); - executionConfig.registerTypeWithKryoSerializer(TestRecord.class, new TestRecordSerializer()); - executionConfig.registerKryoType(TestRecord.class); - - KryoSerializer<TestRecord> kryoSerializer = new KryoSerializer<TestRecord>( - TestRecord.class, - executionConfig); - - int size = 94; - int bufferSize = 150; - - TestRecord testRecord = new TestRecord(size); - - TestDataOutputView target = new TestDataOutputView(bufferSize); - - kryoSerializer.serialize(testRecord, target); - - try { - kryoSerializer.serialize(testRecord, target); - Assert.fail("Expected an EOFException."); - } catch(EOFException eofException) { - // expected exception - // now the Kryo Output should have been cleared - } - - TestRecord actualRecord = kryoSerializer.deserialize( - new DataInputViewStreamWrapper(new ByteArrayInputStream(target.getBuffer()))); - - Assert.assertEquals(testRecord, actualRecord); - - target.clear(); - - // if the kryo output has been cleared then we can serialize our test record into the target - // because the target buffer 150 bytes can host one TestRecord (total serialization size 100) - kryoSerializer.serialize(testRecord, target); - - byte[] buffer = target.getBuffer(); - int counter = 0; - - for (int i = 0; i < buffer.length; i++) { - if(buffer[i] == 42) { - counter++; - } - } - - Assert.assertEquals(size, counter); - } - - public static class TestRecord { - private byte[] buffer; - - public TestRecord(int size) { - buffer = new byte[size]; - - Arrays.fill(buffer, (byte)42); - } - - public TestRecord(byte[] buffer){ - this.buffer = buffer; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof TestRecord) { - TestRecord record = (TestRecord) obj; - - return Arrays.equals(buffer, record.buffer); - } else { - return false; - } - } - } - - public static class TestRecordSerializer extends Serializer<TestRecord> implements Serializable { - - private static final long serialVersionUID = 6971996565421454985L; - - @Override - public void write(Kryo kryo, Output output, TestRecord object) { - output.writeInt(object.buffer.length); - output.write(object.buffer); - } - - @Override - public TestRecord read(Kryo kryo, Input input, Class<TestRecord> type) { - int length = input.readInt(); - byte[] buffer = input.readBytes(length); - - return new TestRecord(buffer); - } - } - - public static class TestDataOutputView implements DataOutputView { - - private byte[] buffer; - private int position; - - public TestDataOutputView(int size) { - buffer = new byte[size]; - position = 0; - } - - public void clear() { - position = 0; - } - - public byte[] getBuffer() { - return buffer; - } - - public void checkSize(int numBytes) throws EOFException { - if (position + numBytes > buffer.length) { - throw new EOFException(); - } - } - - @Override - public void skipBytesToWrite(int numBytes) throws IOException { - checkSize(numBytes); - - position += numBytes; - } - - @Override - public void write(DataInputView source, int numBytes) throws IOException { - checkSize(numBytes); - - byte[] tempBuffer = new byte[numBytes]; - - source.read(tempBuffer); - - System.arraycopy(tempBuffer, 0, buffer, position, numBytes); - - position += numBytes; - } - - @Override - public void write(int b) throws IOException { - checkSize(4); - - position += 4; - } - - @Override - public void write(byte[] b) throws IOException { - checkSize(b.length); - - System.arraycopy(b, 0, buffer, position, b.length); - position += b.length; - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - checkSize(len); - - System.arraycopy(b, off, buffer, position, len); - - position += len; - } - - @Override - public void writeBoolean(boolean v) throws IOException { - checkSize(1); - position += 1; - } - - @Override - public void writeByte(int v) throws IOException { - checkSize(1); - - buffer[position] = (byte)v; - - position++; - } - - @Override - public void writeShort(int v) throws IOException { - checkSize(2); - - position += 2; - } - - @Override - public void writeChar(int v) throws IOException { - checkSize(1); - position++; - } - - @Override - public void writeInt(int v) throws IOException { - checkSize(4); - - position += 4; - } - - @Override - public void writeLong(long v) throws IOException { - checkSize(8); - position += 8; - } - - @Override - public void writeFloat(float v) throws IOException { - checkSize(4); - position += 4; - } - - @Override - public void writeDouble(double v) throws IOException { - checkSize(8); - position += 8; - } - - @Override - public void writeBytes(String s) throws IOException { - byte[] sBuffer = s.getBytes(); - checkSize(sBuffer.length); - System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length); - position += sBuffer.length; - } - - @Override - public void writeChars(String s) throws IOException { - byte[] sBuffer = s.getBytes(); - checkSize(sBuffer.length); - System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length); - position += sBuffer.length; - } - - @Override - public void writeUTF(String s) throws IOException { - byte[] sBuffer = s.getBytes(); - checkSize(sBuffer.length); - System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length); - position += sBuffer.length; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java deleted file mode 100644 index 9d3eef1..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime.kryo; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.AbstractGenericArraySerializerTest; - -public class KryoGenericArraySerializerTest extends AbstractGenericArraySerializerTest { - @Override - protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type) { - return new KryoSerializer<T>(type, new ExecutionConfig()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java deleted file mode 100644 index 19aed78..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime.kryo; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeComparatorTest; - -public class KryoGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest { - @Override - protected <T> TypeSerializer<T> createSerializer(Class<T> type) { - return new KryoSerializer<T>(type, new ExecutionConfig()); - } -} \ No newline at end of file