http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java new file mode 100644 index 0000000..a196984 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.flink.api.java.typeutils.runtime;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

import java.util.Arrays;

import org.apache.flink.api.common.typeutils.SerializerTestInstance;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple;
import org.junit.Assert;

/**
 * A {@link SerializerTestInstance} for {@link Tuple} types whose {@code deepEquals} compares two
 * tuples field by field, using array-aware assertions for fields that hold arrays.
 *
 * @param <T> the tuple type under test
 */
public class TupleSerializerTestInstance<T extends Tuple> extends SerializerTestInstance<T> {

    /**
     * Passes all arguments straight through to the {@link SerializerTestInstance} constructor.
     */
    public TupleSerializerTestInstance(TypeSerializer<T> serializer, Class<T> typeClass, int length, T[] testData) {
        super(serializer, typeClass, length, testData);
    }

    /**
     * Asserts that both tuples have the same arity and that every field is equal.
     * Array-valued fields are compared element-wise; all other fields (including
     * {@code null}) are compared with {@code assertEquals}.
     *
     * @param message     message attached to every assertion failure
     * @param shouldTuple the expected tuple
     * @param isTuple     the actual tuple
     */
    protected void deepEquals(String message, T shouldTuple, T isTuple) {
        Assert.assertEquals(message, shouldTuple.getArity(), isTuple.getArity());

        for (int i = 0; i < shouldTuple.getArity(); i++) {
            Object should = shouldTuple.getField(i);
            Object is = isTuple.getField(i);

            // A null expected field has no class to inspect; let assertEquals handle it
            // instead of failing with a NullPointerException.
            if (should != null && should.getClass().isArray()) {
                assertArrayFieldEquals(message, should, is);
            } else {
                assertEquals(message, should, is);
            }
        }
    }

    /**
     * Compares two array-valued fields with the assertion matching the component type of
     * the expected array. Floating-point arrays are compared with a delta of zero.
     */
    private static void assertArrayFieldEquals(String message, Object should, Object is) {
        if (should instanceof boolean[]) {
            Assert.assertTrue(message, Arrays.equals((boolean[]) should, (boolean[]) is));
        } else if (should instanceof byte[]) {
            assertArrayEquals(message, (byte[]) should, (byte[]) is);
        } else if (should instanceof short[]) {
            assertArrayEquals(message, (short[]) should, (short[]) is);
        } else if (should instanceof int[]) {
            assertArrayEquals(message, (int[]) should, (int[]) is);
        } else if (should instanceof long[]) {
            assertArrayEquals(message, (long[]) should, (long[]) is);
        } else if (should instanceof float[]) {
            assertArrayEquals(message, (float[]) should, (float[]) is, 0.0f);
        } else if (should instanceof double[]) {
            assertArrayEquals(message, (double[]) should, (double[]) is, 0.0);
        } else if (should instanceof char[]) {
            assertArrayEquals(message, (char[]) should, (char[]) is);
        } else {
            assertArrayEquals(message, (Object[]) should, (Object[]) is);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java new file mode 100644 index 0000000..cf9874d --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.StringValue; + +public class ValueComparatorTest extends ComparatorTestBase<StringValue> { + + StringValue[] data = new StringValue[]{ + new StringValue(""), + new StringValue("Lorem Ipsum Dolor Omit Longer"), + new StringValue("aaaa"), + new StringValue("abcd"), + new StringValue("abce"), + new StringValue("abdd"), + new StringValue("accd"), + new StringValue("bbcd") + }; + + @Override + protected TypeComparator<StringValue> createComparator(boolean ascending) { + return new ValueComparator<StringValue>(ascending, StringValue.class); + } + + @Override + protected TypeSerializer<StringValue> createSerializer() { + return new ValueSerializer<StringValue>(StringValue.class); + } + + @Override + protected StringValue[] getSortedTestData() { + return data; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java new file mode 100644 index 0000000..a2c3e1e --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
package org.apache.flink.api.java.typeutils.runtime;

import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;

import java.util.UUID;

/**
 * Supplies a {@link ValueComparator}, a {@link ValueSerializer} and test data for
 * {@link ValueID} to {@link ComparatorTestBase}.
 */
public class ValueComparatorUUIDTest extends ComparatorTestBase<ValueID> {

    /** Creates the comparator under test with the requested sort direction. */
    @Override
    protected TypeComparator<ValueID> createComparator(boolean ascending) {
        return new ValueComparator<ValueID>(ascending, ValueID.class);
    }

    /** Creates the serializer matching the comparator's type. */
    @Override
    protected TypeSerializer<ValueID> createSerializer() {
        return new ValueSerializer<ValueID>(ValueID.class);
    }

    /** Builds a fresh array of three IDs wrapping the UUIDs (0,0), (1,0) and (1,1). */
    @Override
    protected ValueID[] getSortedTestData() {
        ValueID[] sorted = new ValueID[3];
        sorted[0] = new ValueID(new UUID(0, 0));
        sorted[1] = new ValueID(new UUID(1, 0));
        sorted[2] = new ValueID(new UUID(1, 1));
        return sorted;
    }
}
package org.apache.flink.api.java.typeutils.runtime;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Value;

import java.io.IOException;
import java.util.UUID;

/**
 * A {@link Value} wrapping a {@link UUID}. Ordering, equality and hashing all delegate to the
 * wrapped UUID; serialization writes the UUID as two longs (most significant bits first).
 */
public class ValueID implements Value, Comparable<ValueID> {
    private static final long serialVersionUID = -562791433077971752L;

    private UUID id;

    /** Creates an ID backed by a random UUID. */
    public ValueID() {
        id = UUID.randomUUID();
    }

    /**
     * Creates an ID backed by the given UUID.
     *
     * @param id the UUID to wrap
     */
    public ValueID(UUID id) {
        this.id = id;
    }

    /** Orders IDs by the natural ordering of their UUIDs. */
    @Override
    public int compareTo(ValueID o) {
        return id.compareTo(o.id);
    }

    /** Writes the most significant and then the least significant 64 bits of the UUID. */
    @Override
    public void write(DataOutputView out) throws IOException {
        out.writeLong(id.getMostSignificantBits());
        out.writeLong(id.getLeastSignificantBits());
    }

    /** Replaces the wrapped UUID with one rebuilt from two longs, in the order written. */
    @Override
    public void read(DataInputView in) throws IOException {
        id = new UUID(in.readLong(), in.readLong());
    }

    /** Two IDs are equal when their UUIDs are equal; any other object is unequal. */
    @Override
    public boolean equals(Object obj) {
        if (obj instanceof ValueID) {
            ValueID other = (ValueID) obj;

            return id.equals(other.id);
        } else {
            return false;
        }
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    /**
     * Returns the textual form of the wrapped UUID, so that failed assertions on this type
     * print a readable value rather than an identity hash.
     */
    @Override
    public String toString() {
        return String.valueOf(id);
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java new file mode 100644 index 0000000..9c07a5e --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.flink.api.java.typeutils.runtime;

import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;

import java.util.UUID;

/**
 * Supplies a {@link ValueSerializer} for {@link ValueID} and matching test data to
 * {@link SerializerTestBase}.
 */
public class ValueSerializerUUIDTest extends SerializerTestBase<ValueID> {

    /** Creates the serializer under test. */
    @Override
    protected TypeSerializer<ValueID> createSerializer() {
        return new ValueSerializer<ValueID>(ValueID.class);
    }

    /** Reports -1 as the length (presumably "not fixed"; see the base class to confirm). */
    @Override
    protected int getLength() {
        final int length = -1;
        return length;
    }

    /** Returns the class of the type under test. */
    @Override
    protected Class<ValueID> getTypeClass() {
        return ValueID.class;
    }

    /** Builds a fresh array of three IDs wrapping the UUIDs (0,0), (1,0) and (1,1). */
    @Override
    protected ValueID[] getTestData() {
        ValueID[] samples = new ValueID[3];
        samples[0] = new ValueID(new UUID(0, 0));
        samples[1] = new ValueID(new UUID(1, 0));
        samples[2] = new ValueID(new UUID(1, 1));
        return samples;
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +public class WritableComparatorTest extends ComparatorTestBase<StringArrayWritable> { + + StringArrayWritable[] data = new StringArrayWritable[]{ + new StringArrayWritable(new String[]{}), + new StringArrayWritable(new String[]{""}), + new StringArrayWritable(new String[]{"a","a"}), + new StringArrayWritable(new String[]{"a","b"}), + new StringArrayWritable(new String[]{"c","c"}), + new StringArrayWritable(new String[]{"d","f"}), + new StringArrayWritable(new String[]{"d","m"}), + new StringArrayWritable(new String[]{"z","x"}), + new StringArrayWritable(new String[]{"a","a", "a"}) + }; + + @Override + protected TypeComparator<StringArrayWritable> createComparator(boolean ascending) { + return new WritableComparator<StringArrayWritable>(ascending, StringArrayWritable.class); + } + + @Override + protected TypeSerializer<StringArrayWritable> createSerializer() { + return new WritableSerializer<StringArrayWritable>(StringArrayWritable.class); + } + + @Override + protected StringArrayWritable[] getSortedTestData() { + return data; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java ---------------------------------------------------------------------- diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java new file mode 100644 index 0000000..94e759d --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.flink.api.java.typeutils.runtime;

import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;

import java.util.UUID;

/**
 * Supplies a {@link WritableComparator}, a {@link WritableSerializer} and test data for
 * {@link WritableID} to {@link ComparatorTestBase}.
 */
public class WritableComparatorUUIDTest extends ComparatorTestBase<WritableID> {

    /** Creates the comparator under test with the requested sort direction. */
    @Override
    protected TypeComparator<WritableID> createComparator(boolean ascending) {
        return new WritableComparator<WritableID>(ascending, WritableID.class);
    }

    /** Creates the serializer matching the comparator's type. */
    @Override
    protected TypeSerializer<WritableID> createSerializer() {
        return new WritableSerializer<WritableID>(WritableID.class);
    }

    /** Builds a fresh array of three IDs wrapping the UUIDs (0,0), (1,0) and (1,1). */
    @Override
    protected WritableID[] getSortedTestData() {
        WritableID[] sorted = new WritableID[3];
        sorted[0] = new WritableID(new UUID(0, 0));
        sorted[1] = new WritableID(new UUID(1, 0));
        sorted[2] = new WritableID(new UUID(1, 1));
        return sorted;
    }
}
package org.apache.flink.api.java.typeutils.runtime;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.UUID;

/**
 * A {@link WritableComparable} wrapping a {@link UUID}. Ordering delegates to the wrapped UUID;
 * serialization writes the UUID as two longs (most significant bits first).
 */
public class WritableID implements WritableComparable<WritableID> {
    private UUID uuid;

    /** Creates an ID backed by a random UUID. */
    public WritableID() {
        this.uuid = UUID.randomUUID();
    }

    /**
     * Creates an ID backed by the given UUID.
     *
     * @param uuid the UUID to wrap
     */
    public WritableID(UUID uuid) {
        this.uuid = uuid;
    }

    /** Orders IDs by the natural ordering of their UUIDs. */
    @Override
    public int compareTo(WritableID o) {
        return uuid.compareTo(o.uuid);
    }

    /** Writes the most significant and then the least significant 64 bits of the UUID. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        final long high = uuid.getMostSignificantBits();
        dataOutput.writeLong(high);
        final long low = uuid.getLeastSignificantBits();
        dataOutput.writeLong(low);
    }

    /** Replaces the wrapped UUID with one rebuilt from two longs, in the order written. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        final long high = dataInput.readLong();
        final long low = dataInput.readLong();
        this.uuid = new UUID(high, low);
    }

    /** Returns the textual form of the wrapped UUID. */
    @Override
    public String toString() {
        return uuid.toString();
    }

    /**
     * Equal only to another {@code WritableID} of exactly the same class whose UUID is equal
     * (two {@code null} UUIDs count as equal).
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null) {
            return false;
        }
        if (o.getClass() != getClass()) {
            return false;
        }

        WritableID other = (WritableID) o;
        if (uuid == null) {
            return other.uuid == null;
        }
        return uuid.equals(other.uuid);
    }

    /** Hash of the wrapped UUID, or 0 when there is none. */
    @Override
    public int hashCode() {
        if (uuid == null) {
            return 0;
        }
        return uuid.hashCode();
    }
}
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.SerializerTestInstance; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.WritableTypeInfo; +import org.junit.Test; + +public class WritableSerializerTest { + + @Test + public void testStringArrayWritable() { + StringArrayWritable[] data = new StringArrayWritable[]{ + new StringArrayWritable(new String[]{}), + new StringArrayWritable(new String[]{""}), + new StringArrayWritable(new String[]{"a","a"}), + new StringArrayWritable(new String[]{"a","b"}), + new StringArrayWritable(new String[]{"c","c"}), + new StringArrayWritable(new String[]{"d","f"}), + new StringArrayWritable(new String[]{"d","m"}), + new StringArrayWritable(new String[]{"z","x"}), + new StringArrayWritable(new String[]{"a","a", "a"}) + }; + + WritableTypeInfo<StringArrayWritable> writableTypeInfo = (WritableTypeInfo<StringArrayWritable>) TypeExtractor.getForObject(data[0]); + WritableSerializer<StringArrayWritable> writableSerializer = (WritableSerializer<StringArrayWritable>) writableTypeInfo.createSerializer(new ExecutionConfig()); + + SerializerTestInstance<StringArrayWritable> testInstance = new SerializerTestInstance<StringArrayWritable>(writableSerializer,writableTypeInfo.getTypeClass(), -1, data); + + testInstance.testAll(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java new file mode 100644 index 0000000..2af7730 --- /dev/null +++ 
package org.apache.flink.api.java.typeutils.runtime;

import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;

import java.util.UUID;

/**
 * Supplies a {@link WritableSerializer} for {@link WritableID} and matching test data to
 * {@link SerializerTestBase}.
 */
public class WritableSerializerUUIDTest extends SerializerTestBase<WritableID> {

    /** Creates the serializer under test. */
    @Override
    protected TypeSerializer<WritableID> createSerializer() {
        return new WritableSerializer<WritableID>(WritableID.class);
    }

    /** Reports -1 as the length (presumably "not fixed"; see the base class to confirm). */
    @Override
    protected int getLength() {
        final int length = -1;
        return length;
    }

    /** Returns the class of the type under test. */
    @Override
    protected Class<WritableID> getTypeClass() {
        return WritableID.class;
    }

    /** Builds a fresh array of three IDs wrapping the UUIDs (0,0), (1,0) and (1,1). */
    @Override
    protected WritableID[] getTestData() {
        WritableID[] samples = new WritableID[3];
        samples[0] = new WritableID(new UUID(0, 0));
        samples[1] = new WritableID(new UUID(1, 0));
        samples[2] = new WritableID(new UUID(1, 1));
        return samples;
    }
}
package org.apache.flink.api.java.typeutils.runtime.kryo;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;

import org.junit.Assert;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;

/**
 * Verifies that {@link KryoSerializer} discards partially buffered output when writing to the
 * target fails with an {@link EOFException}.
 */
public class KryoClearedBufferTest {

    /**
     * Tests that the kryo output buffer is cleared in case of an exception. Flink uses the
     * EOFException to signal that a buffer is full. In such a case, the record which was tried
     * to be written will be rewritten. Therefore, any data of this record that was already
     * buffered has to be cleared.
     */
    @Test
    public void testOutputBufferedBeingClearedInCaseOfException() throws Exception {
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.registerTypeWithKryoSerializer(TestRecord.class, new TestRecordSerializer());
        executionConfig.registerKryoType(TestRecord.class);

        KryoSerializer<TestRecord> kryoSerializer = new KryoSerializer<TestRecord>(
            TestRecord.class,
            executionConfig);

        int size = 94;
        int bufferSize = 150;

        TestRecord testRecord = new TestRecord(size);

        TestDataOutputView target = new TestDataOutputView(bufferSize);

        // the first record fits into the target
        kryoSerializer.serialize(testRecord, target);

        try {
            // the second record does not fit any more
            kryoSerializer.serialize(testRecord, target);
            Assert.fail("Expected an EOFException.");
        } catch (EOFException eofException) {
            // expected exception
            // now the Kryo Output should have been cleared
        }

        TestRecord actualRecord = kryoSerializer.deserialize(
            new DataInputViewStreamWrapper(new ByteArrayInputStream(target.getBuffer())));

        Assert.assertEquals(testRecord, actualRecord);

        target.clear();

        // if the kryo output has been cleared then we can serialize our test record into the target
        // because the target buffer 150 bytes can host one TestRecord (total serialization size 100)
        kryoSerializer.serialize(testRecord, target);

        // every payload byte of the record is 42, so exactly 'size' such bytes must be present
        int counter = 0;
        for (byte value : target.getBuffer()) {
            if (value == 42) {
                counter++;
            }
        }

        Assert.assertEquals(size, counter);
    }

    /** A record consisting of a single byte array; equality is by array content. */
    public static class TestRecord {
        private byte[] buffer;

        /** Creates a record of {@code size} bytes, each set to 42. */
        public TestRecord(int size) {
            buffer = new byte[size];

            Arrays.fill(buffer, (byte) 42);
        }

        /** Creates a record that uses the given array directly (no copy). */
        public TestRecord(byte[] buffer) {
            this.buffer = buffer;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof TestRecord) {
                TestRecord record = (TestRecord) obj;

                return Arrays.equals(buffer, record.buffer);
            } else {
                return false;
            }
        }

        /** Content-based hash, kept consistent with {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            return Arrays.hashCode(buffer);
        }
    }

    /** Kryo serializer writing a {@link TestRecord} as its length followed by its bytes. */
    public static class TestRecordSerializer extends Serializer<TestRecord> implements Serializable {

        private static final long serialVersionUID = 6971996565421454985L;

        @Override
        public void write(Kryo kryo, Output output, TestRecord object) {
            output.writeInt(object.buffer.length);
            output.write(object.buffer);
        }

        @Override
        public TestRecord read(Kryo kryo, Input input, Class<TestRecord> type) {
            int length = input.readInt();
            byte[] buffer = input.readBytes(length);

            return new TestRecord(buffer);
        }
    }

    /**
     * A {@link DataOutputView} backed by a fixed-size byte array. Every write first checks the
     * remaining capacity and throws an {@link EOFException} when the data does not fit; in that
     * case nothing is written and the position is unchanged.
     */
    public static class TestDataOutputView implements DataOutputView {

        private byte[] buffer;
        private int position;

        /** Creates a view over a new array of {@code size} bytes. */
        public TestDataOutputView(int size) {
            buffer = new byte[size];
            position = 0;
        }

        /** Resets the write position to the start; the array contents are left untouched. */
        public void clear() {
            position = 0;
        }

        /** Returns the backing array itself (no copy). */
        public byte[] getBuffer() {
            return buffer;
        }

        /**
         * Ensures that {@code numBytes} more bytes fit behind the current position.
         *
         * @throws EOFException if they do not fit
         */
        public void checkSize(int numBytes) throws EOFException {
            if (position + numBytes > buffer.length) {
                throw new EOFException();
            }
        }

        @Override
        public void skipBytesToWrite(int numBytes) throws IOException {
            checkSize(numBytes);

            position += numBytes;
        }

        @Override
        public void write(DataInputView source, int numBytes) throws IOException {
            checkSize(numBytes);

            // readFully guarantees that all numBytes are transferred; a plain read(byte[])
            // may return after filling only part of the array.
            source.readFully(buffer, position, numBytes);

            position += numBytes;
        }

        /** Writes the low-order eight bits of {@code b} as a single byte. */
        @Override
        public void write(int b) throws IOException {
            checkSize(1);

            buffer[position++] = (byte) b;
        }

        @Override
        public void write(byte[] b) throws IOException {
            checkSize(b.length);

            System.arraycopy(b, 0, buffer, position, b.length);
            position += b.length;
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            checkSize(len);

            System.arraycopy(b, off, buffer, position, len);

            position += len;
        }

        @Override
        public void writeBoolean(boolean v) throws IOException {
            checkSize(1);

            buffer[position++] = (byte) (v ? 1 : 0);
        }

        @Override
        public void writeByte(int v) throws IOException {
            checkSize(1);

            buffer[position++] = (byte) v;
        }

        @Override
        public void writeShort(int v) throws IOException {
            writeBigEndian(v, 2);
        }

        @Override
        public void writeChar(int v) throws IOException {
            // a char occupies two bytes
            writeBigEndian(v, 2);
        }

        @Override
        public void writeInt(int v) throws IOException {
            writeBigEndian(v, 4);
        }

        @Override
        public void writeLong(long v) throws IOException {
            writeBigEndian(v, 8);
        }

        @Override
        public void writeFloat(float v) throws IOException {
            writeBigEndian(Float.floatToIntBits(v), 4);
        }

        @Override
        public void writeDouble(double v) throws IOException {
            writeBigEndian(Double.doubleToLongBits(v), 8);
        }

        /**
         * Simplified: writes the raw bytes of {@code s.getBytes()} rather than the encoding
         * described by {@code java.io.DataOutput}.
         */
        @Override
        public void writeBytes(String s) throws IOException {
            writeStringBytes(s);
        }

        /** Simplified in the same way as {@link #writeBytes(String)}. */
        @Override
        public void writeChars(String s) throws IOException {
            writeStringBytes(s);
        }

        /** Simplified in the same way as {@link #writeBytes(String)}; no length prefix is written. */
        @Override
        public void writeUTF(String s) throws IOException {
            writeStringBytes(s);
        }

        /** Writes the low {@code numBytes} bytes of {@code value}, most significant byte first. */
        private void writeBigEndian(long value, int numBytes) throws EOFException {
            checkSize(numBytes);

            for (int shift = (numBytes - 1) * 8; shift >= 0; shift -= 8) {
                buffer[position++] = (byte) (value >>> shift);
            }
        }

        /** Appends the bytes of {@code s.getBytes()} at the current position. */
        private void writeStringBytes(String s) throws IOException {
            write(s.getBytes());
        }
    }
}
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java new file mode 100644 index 0000000..9d3eef1 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericArraySerializerTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime.kryo; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.AbstractGenericArraySerializerTest; + +public class KryoGenericArraySerializerTest extends AbstractGenericArraySerializerTest { + @Override + protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type) { + return new KryoSerializer<T>(type, new ExecutionConfig()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java new file mode 100644 index 0000000..19aed78 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeComparatorTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime.kryo; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeComparatorTest; + +public class KryoGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest { + @Override + protected <T> TypeSerializer<T> createSerializer(Class<T> type) { + return new KryoSerializer<T>(type, new ExecutionConfig()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java new file mode 100644 index 0000000..8ff0b1b --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime.kryo; + +import com.esotericsoftware.kryo.Kryo; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest; +import org.apache.flink.api.java.typeutils.runtime.TestDataOutputSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Random; + +import static org.junit.Assert.*; + +@SuppressWarnings("unchecked") +public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest { + + ExecutionConfig ec = new ExecutionConfig(); + + @Test + public void testJavaList(){ + Collection<Integer> a = new ArrayList<>(); + + fillCollection(a); + + runTests(a); + } + + @Test + public void testJavaSet(){ + Collection<Integer> b = new HashSet<>(); + + fillCollection(b); + + runTests(b); + } + + + + @Test + public void testJavaDequeue(){ + Collection<Integer> c = new LinkedList<>(); + fillCollection(c); + runTests(c); + } + + private void fillCollection(Collection<Integer> coll) { + coll.add(42); + coll.add(1337); + coll.add(49); + coll.add(1); + } + + @Override + protected <T> TypeSerializer<T> createSerializer(Class<T> type) { + return new KryoSerializer<T>(type, ec); + } + + /** + * Make sure that the kryo serializer forwards EOF exceptions properly when serializing + */ + @Test + public void testForwardEOFExceptionWhileSerializing() { + try { + // construct a long string + String str; + { + char[] charData = new char[40000]; + Random rnd = new Random(); + + for (int i = 0; i < charData.length; i++) { + charData[i] = (char) rnd.nextInt(10000); + } + + str = new 
String(charData); + } + + // construct a memory target that is too small for the string + TestDataOutputSerializer target = new TestDataOutputSerializer(10000, 30000); + KryoSerializer<String> serializer = new KryoSerializer<String>(String.class, new ExecutionConfig()); + + try { + serializer.serialize(str, target); + fail("should throw a java.io.EOFException"); + } + catch (java.io.EOFException e) { + // that is how we like it + } + catch (Exception e) { + fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName()); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + /** + * Make sure that the kryo serializer forwards EOF exceptions properly when serializing + */ + @Test + public void testForwardEOFExceptionWhileDeserializing() { + try { + int numElements = 100; + // construct a memory target that is too small for the string + TestDataOutputSerializer target = new TestDataOutputSerializer(5*numElements, 5*numElements); + KryoSerializer<Integer> serializer = new KryoSerializer<>(Integer.class, new ExecutionConfig()); + + for(int i = 0; i < numElements; i++){ + serializer.serialize(i, target); + } + + ComparatorTestBase.TestInputView source = new ComparatorTestBase.TestInputView(target.copyByteBuffer()); + + for(int i = 0; i < numElements; i++){ + int value = serializer.deserialize(source); + assertEquals(i, value); + } + + try { + serializer.deserialize(source); + fail("should throw a java.io.EOFException"); + } + catch (java.io.EOFException e) { + // that is how we like it :-) + } + catch (Exception e) { + fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName()); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void validateReferenceMappingDisabled() { + KryoSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig()); + Kryo kryo = 
serializer.getKryo(); + assertFalse(kryo.getReferences()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java new file mode 100644 index 0000000..d68afd6 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoWithCustomSerializersTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime.kryo; + +import java.util.Collection; +import java.util.HashSet; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest; +import org.joda.time.LocalDate; +import org.junit.Test; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +@SuppressWarnings("unchecked") +public class KryoWithCustomSerializersTest extends AbstractGenericTypeSerializerTest { + + + @Test + public void testJodaTime(){ + Collection<LocalDate> b = new HashSet<LocalDate>(); + + b.add(new LocalDate(1L)); + b.add(new LocalDate(2L)); + + runTests(b); + } + + @Override + protected <T> TypeSerializer<T> createSerializer(Class<T> type) { + ExecutionConfig conf = new ExecutionConfig(); + conf.registerTypeWithKryoSerializer(LocalDate.class, LocalDateSerializer.class); + TypeInformation<T> typeInfo = new GenericTypeInfo<T>(type); + return typeInfo.createSerializer(conf); + } + + public static final class LocalDateSerializer extends Serializer<LocalDate> implements java.io.Serializable { + + private static final long serialVersionUID = 1L; + + @Override + public void write(Kryo kryo, Output output, LocalDate object) { + output.writeInt(object.getYear()); + output.writeInt(object.getMonthOfYear()); + output.writeInt(object.getDayOfMonth()); + } + + @Override + public LocalDate read(Kryo kryo, Input input, Class<LocalDate> type) { + return new LocalDate(input.readInt(), input.readInt(), input.readInt()); + } + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java new file mode 100644 index 0000000..7c6d023 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime.kryo; + +import org.apache.flink.api.common.ExecutionConfig; + +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.core.fs.Path; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashSet; + +import static org.junit.Assert.assertTrue; + +public class SerializersTest { + + // recursive + public static class Node { + private Node parent; + } + + public static class FromNested { + Node recurseMe; + } + + public static class FromGeneric1 {} + public static class FromGeneric2 {} + + public static class Nested1 { + private FromNested fromNested; + private Path yodaIntervall; + } + + public static class ClassWithNested { + + Nested1 nested; + int ab; + + ArrayList<FromGeneric1> addGenType; + FromGeneric2[] genericArrayType; + } + + @Test + public void testTypeRegistration() { + ExecutionConfig conf = new ExecutionConfig(); + Serializers.recursivelyRegisterType(ClassWithNested.class, conf, new HashSet<Class<?>>()); + + KryoSerializer<String> kryo = new KryoSerializer<>(String.class, conf); // we create Kryo from another type. + + Assert.assertTrue(kryo.getKryo().getRegistration(FromNested.class).getId() > 0); + Assert.assertTrue(kryo.getKryo().getRegistration(ClassWithNested.class).getId() > 0); + Assert.assertTrue(kryo.getKryo().getRegistration(Path.class).getId() > 0); + + // check if the generic type from one field is also registered (its very likely that + // generic types are also used as fields somewhere. 
+ Assert.assertTrue(kryo.getKryo().getRegistration(FromGeneric1.class).getId() > 0); + Assert.assertTrue(kryo.getKryo().getRegistration(FromGeneric2.class).getId() > 0); + Assert.assertTrue(kryo.getKryo().getRegistration(Node.class).getId() > 0); + + + // register again and make sure classes are still registered + ExecutionConfig conf2 = new ExecutionConfig(); + Serializers.recursivelyRegisterType(ClassWithNested.class, conf2, new HashSet<Class<?>>()); + KryoSerializer<String> kryo2 = new KryoSerializer<>(String.class, conf); + assertTrue(kryo2.getKryo().getRegistration(FromNested.class).getId() > 0); + } + + @Test + public void testTypeRegistrationFromTypeInfo() { + ExecutionConfig conf = new ExecutionConfig(); + Serializers.recursivelyRegisterType(new GenericTypeInfo<>(ClassWithNested.class), conf, new HashSet<Class<?>>()); + + KryoSerializer<String> kryo = new KryoSerializer<>(String.class, conf); // we create Kryo from another type. + + assertTrue(kryo.getKryo().getRegistration(FromNested.class).getId() > 0); + assertTrue(kryo.getKryo().getRegistration(ClassWithNested.class).getId() > 0); + assertTrue(kryo.getKryo().getRegistration(Path.class).getId() > 0); + + // check if the generic type from one field is also registered (its very likely that + // generic types are also used as fields somewhere. 
+ assertTrue(kryo.getKryo().getRegistration(FromGeneric1.class).getId() > 0); + assertTrue(kryo.getKryo().getRegistration(FromGeneric2.class).getId() > 0); + assertTrue(kryo.getKryo().getRegistration(Node.class).getId() > 0); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java new file mode 100644 index 0000000..faab26a --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TupleComparatorTestBase.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime.tuple.base; + +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.runtime.TupleComparator; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; + +import static org.junit.Assert.assertEquals; + +public abstract class TupleComparatorTestBase<T extends Tuple> extends ComparatorTestBase<T> { + + @Override + protected void deepEquals(String message, T should, T is) { + for (int x = 0; x < should.getArity(); x++) { + assertEquals(should.getField(x), is.getField(x)); + } + } + + @Override + protected abstract TupleComparator<T> createComparator(boolean ascending); + + @Override + protected abstract TupleSerializer<T> createSerializer(); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java new file mode 100644 index 0000000..1d414d8 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/tuple/base/TuplePairComparatorTestBase.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime.tuple.base; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +/** + * Abstract test base for TuplePairComparators. + * + * @param <T> + * @param <R> + */ +public abstract class TuplePairComparatorTestBase<T extends Tuple, R extends Tuple> extends TestLogger { + + protected abstract TypePairComparator<T, R> createComparator(boolean ascending); + + protected abstract Tuple2<T[], R[]> getSortedTestData(); + + @Test + public void testEqualityWithReference() { + try { + TypePairComparator<T, R> comparator = getComparator(true); + Tuple2<T[], R[]> data = getSortedData(); + for (int x = 0; x < data.f0.length; x++) { + comparator.setReference(data.f0[x]); + + assertTrue(comparator.equalToReference(data.f1[x])); + } + } catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Exception in test: " + e.getMessage()); + } + } + + @Test + public void testInequalityWithReference() { + testGreatSmallAscDescWithReference(true); + testGreatSmallAscDescWithReference(false); + } + + protected void testGreatSmallAscDescWithReference(boolean ascending) { + try { + Tuple2<T[], R[]> data = getSortedData(); + + TypePairComparator<T, R> comparator = getComparator(ascending); + + //compares every element in high with every 
element in low + for (int x = 0; x < data.f0.length - 1; x++) { + for (int y = x + 1; y < data.f1.length; y++) { + comparator.setReference(data.f0[x]); + if (ascending) { + assertTrue(comparator.compareToReference(data.f1[y]) > 0); + } else { + assertTrue(comparator.compareToReference(data.f1[y]) < 0); + } + } + } + } catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Exception in test: " + e.getMessage()); + } + } + + // -------------------------------------------------------------------------------------------- + protected TypePairComparator<T, R> getComparator(boolean ascending) { + TypePairComparator<T, R> comparator = createComparator(ascending); + if (comparator == null) { + throw new RuntimeException("Test case corrupt. Returns null as comparator."); + } + return comparator; + } + + protected Tuple2<T[], R[]> getSortedData() { + Tuple2<T[], R[]> data = getSortedTestData(); + if (data == null || data.f0 == null || data.f1 == null) { + throw new RuntimeException("Test case corrupt. Returns null as test data."); + } + if (data.f0.length < 2 || data.f1.length < 2) { + throw new RuntimeException("Test case does not provide enough sorted test data."); + } + + return data; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/pom.xml ---------------------------------------------------------------------- diff --git a/flink-java/pom.xml b/flink-java/pom.xml index a31e89d..3203d75 100644 --- a/flink-java/pom.xml +++ b/flink-java/pom.xml @@ -40,22 +40,12 @@ under the License. 
<artifactId>flink-core</artifactId> <version>${project.version}</version> </dependency> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>${shading-artifact.name}</artifactId> <version>${project.version}</version> </dependency> - - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - <!-- version is derived from base module --> - </dependency> - - <dependency> - <groupId>com.esotericsoftware.kryo</groupId> - <artifactId>kryo</artifactId> - </dependency> <dependency> <groupId>org.ow2.asm</groupId> @@ -64,12 +54,6 @@ under the License. </dependency> <dependency> - <groupId>com.twitter</groupId> - <artifactId>chill-java</artifactId> - <version>${chill.version}</version> - </dependency> - - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version> @@ -88,20 +72,6 @@ under the License. <type>test-jar</type> <scope>test</scope> </dependency> - - <dependency> - <groupId>joda-time</groupId> - <artifactId>joda-time</artifactId> - <version>2.5</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.joda</groupId> - <artifactId>joda-convert</artifactId> - <version>1.7</version> - <scope>test</scope> - </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index 6db32c5..6bcdb52 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -63,7 +63,7 @@ import org.apache.flink.api.java.operators.GroupCombineOperator; import org.apache.flink.api.java.operators.GroupReduceOperator; import org.apache.flink.api.java.operators.IterativeDataSet; import 
org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets; -import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.operators.MapOperator; import org.apache.flink.api.java.operators.MapPartitionOperator; import org.apache.flink.api.java.operators.PartitionOperator; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/Utils.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java index 2edc533..80f8199 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java @@ -33,7 +33,6 @@ import org.apache.flink.configuration.Configuration; import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Modifier; -import java.util.List; import java.util.Random; import static org.apache.flink.api.java.functions.FunctionAnnotation.SkipCodeAnalysis; @@ -61,24 +60,6 @@ public final class Utils { return String.format("%s(%s:%d)", elem.getMethodName(), elem.getFileName(), elem.getLineNumber()); } - /** - * Returns all GenericTypeInfos contained in a composite type. 
- * - * @param typeInfo {@link CompositeType} - */ - public static void getContainedGenericTypes(CompositeType<?> typeInfo, List<GenericTypeInfo<?>> target) { - for (int i = 0; i < typeInfo.getArity(); i++) { - TypeInformation<?> type = typeInfo.getTypeAt(i); - if (type instanceof CompositeType) { - getContainedGenericTypes((CompositeType<?>) type, target); - } else if (type instanceof GenericTypeInfo) { - if (!target.contains(type)) { - target.add((GenericTypeInfo<?>) type); - } - } - } - } - // -------------------------------------------------------------------------------------------- /** http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java deleted file mode 100644 index 3d06c59..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.functions; - -import org.apache.flink.api.common.functions.Function; - -import java.io.Serializable; - -/** - * The {@link KeySelector} allows to use arbitrary objects for operations such as - * reduce, reduceGroup, join, coGoup, etc. - * - * The extractor takes an object and returns the key for that object. - * - * @param <IN> Type of objects to extract the key from. - * @param <KEY> Type of key. - */ -public interface KeySelector<IN, KEY> extends Function, Serializable { - - /** - * User-defined function that extracts the key from an arbitrary object. - * - * For example for a class: - * <pre> - * public class Word { - * String word; - * int count; - * } - * </pre> - * The key extractor could return the word as - * a key to group all Word objects by the String they contain. - * - * The code would look like this - * <pre> - * public String getKey(Word w) { - * return w.word; - * } - * </pre> - * - * @param value The object to get the key from. - * @return The extracted key. - * - * @throws Exception Throwing an exception will cause the execution of the respective task to fail, - * and trigger recovery or cancellation of the program. 
- */ - KEY getKey(IN value) throws Exception; -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java index 761eeb3..a75b8e0 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java @@ -37,7 +37,7 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsFirst; import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsSecond; -import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import java.lang.annotation.Annotation; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java index c44929f..6763cdf 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java @@ -25,7 +25,7 @@ import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.typeinfo.TypeInformation; import 
org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.common.operators.Keys; import java.util.Arrays; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java index 00e0d3b..6485936 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.UnaryOperatorInformation; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java index 845deb4..6c6b051 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java @@ -30,6 +30,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction; 
import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.DualInputSemanticProperties; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; @@ -40,9 +41,9 @@ import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.SemanticPropUtil; import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.operators.Keys.ExpressionKeys; -import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException; -import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys; +import org.apache.flink.api.common.operators.Keys.ExpressionKeys; +import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException; +import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys; import org.apache.flink.api.java.operators.translation.PlanBothUnwrappingCoGroupOperator; import org.apache.flink.api.java.operators.translation.PlanLeftUnwrappingCoGroupOperator; import org.apache.flink.api.java.operators.translation.PlanRightUnwrappingCoGroupOperator; @@ -297,11 +298,11 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU @SuppressWarnings("unchecked") final SelectorFunctionKeys<I2, K> keys2 = (SelectorFunctionKeys<I2, K>) rawKeys2; - final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = SelectorFunctionKeys.createTypeWithKey(keys1); - final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = SelectorFunctionKeys.createTypeWithKey(keys2); + final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = KeyFunctions.createTypeWithKey(keys1); + final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = KeyFunctions.createTypeWithKey(keys2); - 
final Operator<Tuple2<K, I1>> keyedInput1 = SelectorFunctionKeys.appendKeyExtractor(input1, keys1); - final Operator<Tuple2<K, I2>> keyedInput2 = SelectorFunctionKeys.appendKeyExtractor(input2, keys2); + final Operator<Tuple2<K, I1>> keyedInput1 = KeyFunctions.appendKeyExtractor(input1, keys1); + final Operator<Tuple2<K, I2>> keyedInput2 = KeyFunctions.appendKeyExtractor(input2, keys2); final PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup = new PlanBothUnwrappingCoGroupOperator<>(function, keys1, keys2, name, outputType, typeInfoWithKey1, typeInfoWithKey2); @@ -324,8 +325,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU @SuppressWarnings("unchecked") final SelectorFunctionKeys<I2, K> keys2 = (SelectorFunctionKeys<I2, K>) rawKeys2; - final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = SelectorFunctionKeys.createTypeWithKey(keys2); - final Operator<Tuple2<K, I2>> keyedInput2 = SelectorFunctionKeys.appendKeyExtractor(input2, keys2); + final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = KeyFunctions.createTypeWithKey(keys2); + final Operator<Tuple2<K, I2>> keyedInput2 = KeyFunctions.appendKeyExtractor(input2, keys2); final PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup = new PlanRightUnwrappingCoGroupOperator<>( @@ -355,8 +356,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU @SuppressWarnings("unchecked") final SelectorFunctionKeys<I1, K> keys1 = (SelectorFunctionKeys<I1, K>) rawKeys1; - final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = SelectorFunctionKeys.createTypeWithKey(keys1); - final Operator<Tuple2<K, I1>> keyedInput1 = SelectorFunctionKeys.appendKeyExtractor(input1, keys1); + final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = KeyFunctions.createTypeWithKey(keys1); + final Operator<Tuple2<K, I1>> keyedInput1 = KeyFunctions.appendKeyExtractor(input1, keys1); final PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup = new 
PlanLeftUnwrappingCoGroupOperator<>( http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java index 30639c3..74f54b8 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java @@ -21,11 +21,12 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.operators.BinaryOperatorInformation; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.base.CoGroupRawOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException; +import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException; /** * A {@link DataSet} that is the result of a CoGroup transformation. 
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java index 915a053..8745271 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java @@ -21,6 +21,7 @@ package org.apache.flink.api.java.operators; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.operators.GenericDataSinkBase; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java index a302478..7b3001f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java @@ -23,6 +23,7 @@ import java.util.Arrays; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.aggregators.Aggregator; import org.apache.flink.api.common.aggregators.AggregatorRegistry; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import 
org.apache.flink.api.java.ExecutionEnvironment; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java index 6717c6d..4af9108 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIterationResultSet.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.operators; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java index 5102c80..9979f59 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java @@ -20,12 +20,13 @@ package org.apache.flink.api.java.operators; import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import 
org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys; +import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys; import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; @@ -116,8 +117,8 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera @SuppressWarnings("unchecked") final SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys; - TypeInformation<Tuple2<K, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys); - Operator<Tuple2<K, IN>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys); + TypeInformation<Tuple2<K, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys); + Operator<Tuple2<K, IN>> keyedInput = KeyFunctions.appendKeyExtractor(input, keys); PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer = new PlanUnwrappingReduceGroupOperator<>(function, keys, name, outputType, typeInfoWithKey, true); http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java index ef0c12f..a43b869 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java @@ -19,6 +19,7 @@ package org.apache.flink.api.java.operators; import 
org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; @@ -30,7 +31,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.SemanticPropUtil; import org.apache.flink.api.java.operators.translation.PlanUnwrappingGroupCombineOperator; import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedGroupCombineOperator; -import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys; +import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; @@ -195,8 +196,8 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU { final SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys; - TypeInformation<Tuple2<K, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys); - Operator<Tuple2<K, IN>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys); + TypeInformation<Tuple2<K, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys); + Operator<Tuple2<K, IN>> keyedInput = KeyFunctions.appendKeyExtractor(input, keys); PlanUnwrappingGroupCombineOperator<IN, OUT, K> reducer = new PlanUnwrappingGroupCombineOperator<>(function, keys, name, outputType, typeInfoWithKey); @@ -217,9 +218,9 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU { final SelectorFunctionKeys<IN, K1> groupingKey = (SelectorFunctionKeys<IN, K1>) rawGroupingKey; final SelectorFunctionKeys<IN, K2> sortingKey = (SelectorFunctionKeys<IN, K2>)rawSortingKeys; - TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(groupingKey, sortingKey); + TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = 
KeyFunctions.createTypeWithKey(groupingKey, sortingKey); - Operator<Tuple3<K1, K2, IN>> inputWithKey = SelectorFunctionKeys.appendKeyExtractor(input, groupingKey, sortingKey); + Operator<Tuple3<K1, K2, IN>> inputWithKey = KeyFunctions.appendKeyExtractor(input, groupingKey, sortingKey); PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> reducer = new PlanUnwrappingSortedGroupCombineOperator<>(function, groupingKey, sortingKey, name, outputType, typeInfoWithKey); http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java index b1bf844..42553a0 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java @@ -20,6 +20,7 @@ package org.apache.flink.api.java.operators; import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; @@ -28,7 +29,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.SemanticPropUtil; -import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys; +import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys; import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator; import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedReduceGroupOperator; import org.apache.flink.api.java.tuple.Tuple2; @@ -281,9 +282,9 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT boolean combinable) { SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys; - TypeInformation<Tuple2<K, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys); + TypeInformation<Tuple2<K, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys); - Operator<Tuple2<K, IN>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys); + Operator<Tuple2<K, IN>> keyedInput = KeyFunctions.appendKeyExtractor(input, keys); PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer = new PlanUnwrappingReduceGroupOperator(function, keys, name, outputType, typeInfoWithKey, combinable); @@ -305,9 +306,9 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT { final SelectorFunctionKeys<IN, K1> groupingKey = (SelectorFunctionKeys<IN, K1>) rawGroupingKey; final SelectorFunctionKeys<IN, K2> sortingKey = (SelectorFunctionKeys<IN, K2>) rawSortingKey; - TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(groupingKey,sortingKey); + TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(groupingKey,sortingKey); - Operator<Tuple3<K1, K2, IN>> inputWithKey = SelectorFunctionKeys.appendKeyExtractor(input, groupingKey, sortingKey); + Operator<Tuple3<K1, K2, IN>> inputWithKey = KeyFunctions.appendKeyExtractor(input, groupingKey, sortingKey); PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> reducer = new PlanUnwrappingSortedReduceGroupOperator<>( http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java 
---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java index c117458..823aee4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java @@ -20,6 +20,7 @@ package org.apache.flink.api.java.operators; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.DataSet; /**