This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new ba752b9e8b3 [FLINK-32380] Support Java Records with PojoTypeInfo/Serializer ba752b9e8b3 is described below commit ba752b9e8b3fa0fbbe67d6d1bd70cccbc74e6ca0 Author: Gyula Fora <g_f...@apple.com> AuthorDate: Thu Nov 9 23:54:15 2023 -0800 [FLINK-32380] Support Java Records with PojoTypeInfo/Serializer --- .../apache/flink/api/common/typeinfo/Types.java | 4 + .../flink/api/java/typeutils/PojoTypeInfo.java | 4 + .../flink/api/java/typeutils/TypeExtractor.java | 20 +- .../runtime/JavaRecordBuilderFactory.java | 171 +++++++++++++++ .../api/java/typeutils/runtime/PojoSerializer.java | 164 +++++++++++---- .../api/common/typeutils/SerializerTestBase.java | 8 +- .../runtime/Java17PojoRecordSerializerTest.java | 230 +++++++++++++++++++++ .../Java17PojoRecordSerializerUpgradeTest.java | 63 ++++++ ...oRecordSerializerUpgradeTestSpecifications.java | 151 ++++++++++++++ .../runtime/Java17RecordBuilderFactoryTest.java | 112 ++++++++++ .../serializer-snapshot | Bin 0 -> 389 bytes .../test-data | Bin 0 -> 14 bytes .../serializer-snapshot | Bin 0 -> 380 bytes .../pojo-serializer-to-record-1.19/test-data | Bin 0 -> 14 bytes pom.xml | 10 + tools/maven/suppressions-core.xml | 3 + 16 files changed, 894 insertions(+), 46 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java index a5c509b9ad0..a66e3be2b13 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java @@ -269,6 +269,10 @@ public class Types { * * <p>The generic types for all fields of the POJO can be defined in a hierarchy of subclasses. * + * <p>Java Record classes can also be used as valid POJOs (even though they don't fulfill some + * of the above criteria). 
In this case Flink will use the record canonical constructor to + * create the objects. + * * <p>If Flink's type analyzer is unable to extract a valid POJO type information with type * information for all fields, an {@link * org.apache.flink.api.common.functions.InvalidTypesException} is thrown. Alternatively, you diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java index 021f9dfe333..97aa069b778 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java @@ -59,6 +59,10 @@ import static org.apache.flink.util.Preconditions.checkState; * field can be null independent of the field's type. * </ul> * + * Java Record classes can also be used as valid POJOs (even though they don't fulfill some of the + * above criteria). In this case Flink will use the record canonical constructor to create the + * objects. + * * @param <T> The type represented by this type information. 
*/ @Public diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 939c988df62..6fc71cdd5bc 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -2072,10 +2072,11 @@ public class TypeExtractor { return new GenericTypeInfo<>(clazz); } + boolean isRecord = isRecord(clazz); List<PojoField> pojoFields = new ArrayList<>(); for (Field field : fields) { Type fieldType = field.getGenericType(); - if (!isValidPojoField(field, clazz, typeHierarchy) && clazz != Row.class) { + if (!isRecord && !isValidPojoField(field, clazz, typeHierarchy) && clazz != Row.class) { LOG.info( "Class " + clazz @@ -2140,6 +2141,11 @@ public class TypeExtractor { } } + if (isRecord) { + // no default constructor extraction needs to be applied for Java records + return pojoType; + } + // Try retrieving the default constructor, if it does not have one // we cannot use this because the serializer uses it. Constructor<OUT> defaultConstructor = null; @@ -2174,6 +2180,18 @@ public class TypeExtractor { return pojoType; } + /** + * Determine whether the given class is a valid Java record. + * + * @param clazz class to check + * @return True if the class is a Java record + */ + @PublicEvolving + public static boolean isRecord(Class<?> clazz) { + return clazz.getSuperclass().getName().equals("java.lang.Record") + && (clazz.getModifiers() & Modifier.FINAL) != 0; + } + /** * Recursively determine all declared fields This is required because class.getFields() is not * returning fields defined in parent classes. 
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordBuilderFactory.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordBuilderFactory.java new file mode 100644 index 00000000000..ff5eefe2ee1 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordBuilderFactory.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** Utility class for constructing Java records in the {@link PojoSerializer}. 
*/ +@Internal +final class JavaRecordBuilderFactory<T> { + + private final Constructor<T> canonicalConstructor; + + /** + * Record constructor parameter index mapping in case the new constructor has a different + * parameter order than the serialized data. Used for schema evolution or `null` if no schema + * evolution is applied for that record class. + */ + @Nullable private final int[] paramIndexMapping; + + /** + * Default record args used for newly introduced primitive fields during schema evolution. + * `null` if no schema evolution is applied for that record class. + */ + @Nullable private final Object[] defaultConstructorArgs; + + private JavaRecordBuilderFactory(Constructor<T> canonicalConstructor) { + this(canonicalConstructor, null, null); + } + + private JavaRecordBuilderFactory( + Constructor<T> canonicalConstructor, + @Nullable int[] argIndexMapping, + @Nullable Object[] defaultConstructorArgs) { + Preconditions.checkArgument((argIndexMapping == null) == (defaultConstructorArgs == null)); + this.canonicalConstructor = canonicalConstructor; + this.paramIndexMapping = argIndexMapping; + this.defaultConstructorArgs = defaultConstructorArgs; + } + + JavaRecordBuilder newBuilder() { + return new JavaRecordBuilder(); + } + + /** Builder class for incremental record construction. */ + @Internal + final class JavaRecordBuilder { + private final Object[] args; + + JavaRecordBuilder() { + if (defaultConstructorArgs == null) { + args = new Object[canonicalConstructor.getParameterCount()]; + } else { + args = Arrays.copyOf(defaultConstructorArgs, defaultConstructorArgs.length); + } + } + + T build() { + try { + return canonicalConstructor.newInstance(args); + } catch (Exception e) { + throw new RuntimeException("Could not instantiate record", e); + } + } + + /** + * Set record field by index. If parameter index mapping is provided, the index is mapped, + * otherwise it is used as is. 
+ * + * @param i index of field to be set + * @param value field value + */ + void setField(int i, Object value) { + if (paramIndexMapping != null) { + args[paramIndexMapping[i]] = value; + } else { + args[i] = value; + } + } + } + + static <T> JavaRecordBuilderFactory<T> create(Class<T> clazz, Field[] fields) { + try { + Object[] recordComponents = + (Object[]) Class.class.getMethod("getRecordComponents").invoke(clazz); + + Class<?>[] componentTypes = new Class[recordComponents.length]; + List<String> componentNames = new ArrayList<>(recordComponents.length); + + // We need to use reflection to access record components as they are not available in + // before Java 14 + Method getType = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getType"); + Method getName = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getName"); + for (int i = 0; i < recordComponents.length; i++) { + componentNames.add((String) getName.invoke(recordComponents[i])); + componentTypes[i] = (Class<?>) getType.invoke(recordComponents[i]); + } + Constructor<T> recordConstructor = clazz.getDeclaredConstructor(componentTypes); + recordConstructor.setAccessible(true); + + List<String> previousFields = + Arrays.stream(fields) + // There may be (removed) null fields due to schema evolution + .filter(Objects::nonNull) + .map(Field::getName) + .collect(Collectors.toList()); + + // If the field names / order changed we know that we are migrating the records and arg + // index remapping may be necessary + boolean migrating = !previousFields.equals(componentNames); + if (migrating) { + // If the order / index of arguments changed in the new record class we have to map + // it, otherwise we pass the wrong arguments to the constructor + int[] argIndexMapping = new int[fields.length]; + for (int i = 0; i < fields.length; i++) { + Field field = fields[i]; + // There may be (removed) null fields due to schema evolution + argIndexMapping[i] = + field == null ? 
-1 : componentNames.indexOf(fields[i].getName()); + } + + // We have to initialize newly added primitive fields to their correct default value + Object[] defaultValues = new Object[componentNames.size()]; + for (int i = 0; i < componentNames.size(); i++) { + Class<?> fieldType = componentTypes[i]; + boolean newPrimitive = + fieldType.isPrimitive() + && !previousFields.contains(componentNames.get(i)); + defaultValues[i] = newPrimitive ? Defaults.defaultValue(fieldType) : null; + } + return new JavaRecordBuilderFactory<>( + recordConstructor, argIndexMapping, defaultValues); + } else { + return new JavaRecordBuilderFactory<>(recordConstructor); + } + } catch (Exception e) { + throw new RuntimeException("Could not find record canonical constructor", e); + } + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index d7475ea7b11..0531af8a0e1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -26,6 +26,8 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.CollectionUtil; +import javax.annotation.Nullable; + import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; @@ -89,6 +91,8 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { private transient ClassLoader cl; + @Nullable private transient JavaRecordBuilderFactory<T> recordFactory; + /** Constructor to create a new {@link PojoSerializer}. 
*/ @SuppressWarnings("unchecked") public PojoSerializer( @@ -118,6 +122,9 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { createRegisteredSubclassSerializers(registeredSubclasses, executionConfig); this.subclassSerializerCache = new HashMap<>(); + if (TypeExtractor.isRecord(clazz)) { + this.recordFactory = JavaRecordBuilderFactory.create(clazz, fields); + } } /** @@ -142,6 +149,9 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { this.subclassSerializerCache = checkNotNull(subclassSerializerCache); this.executionConfig = checkNotNull(executionConfig); this.cl = Thread.currentThread().getContextClassLoader(); + if (TypeExtractor.isRecord(clazz)) { + this.recordFactory = JavaRecordBuilderFactory.create(clazz, fields); + } } @Override @@ -149,6 +159,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { return false; } + private boolean isRecord() { + return this.recordFactory != null; + } + @Override public PojoSerializer<T> duplicate() { TypeSerializer<Object>[] duplicateFieldSerializers = duplicateSerializers(fieldSerializers); @@ -189,7 +203,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { @Override public T createInstance() { - if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) { + if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers()) || isRecord()) { return null; } try { @@ -221,7 +235,20 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { } Class<?> actualType = from.getClass(); - if (actualType == clazz) { + if (isRecord()) { + try { + JavaRecordBuilderFactory<T>.JavaRecordBuilder builder = recordFactory.newBuilder(); + for (int i = 0; i < numFields; i++) { + if (fields[i] != null) { + builder.setField(i, copyField(i, from)); + } + } + return builder.build(); + } catch (IllegalAccessException e) { + throw new RuntimeException( + "Error during POJO copy, this should not happen since we check the fields before."); + } + } else if 
(actualType == clazz) { T target; try { target = (T) from.getClass().newInstance(); @@ -232,13 +259,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { try { for (int i = 0; i < numFields; i++) { if (fields[i] != null) { - Object value = fields[i].get(from); - if (value != null) { - Object copy = fieldSerializers[i].copy(value); - fields[i].set(target, copy); - } else { - fields[i].set(target, null); - } + fields[i].set(target, copyField(i, from)); } } } catch (IllegalAccessException e) { @@ -266,23 +287,24 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { return copy(from); } - if (actualType == clazz) { + if (isRecord()) { try { + JavaRecordBuilderFactory<T>.JavaRecordBuilder builder = recordFactory.newBuilder(); for (int i = 0; i < numFields; i++) { if (fields[i] != null) { - Object value = fields[i].get(from); - if (value != null) { - Object reuseValue = fields[i].get(reuse); - Object copy; - if (reuseValue != null) { - copy = fieldSerializers[i].copy(value, reuseValue); - } else { - copy = fieldSerializers[i].copy(value); - } - fields[i].set(reuse, copy); - } else { - fields[i].set(reuse, null); - } + builder.setField(i, copyField(reuse, i, from)); + } + } + return builder.build(); + } catch (IllegalAccessException e) { + throw new RuntimeException( + "Error during POJO copy, this should not happen since we check the fields before."); + } + } else if (actualType == clazz) { + try { + for (int i = 0; i < numFields; i++) { + if (fields[i] != null) { + fields[i].set(reuse, copyField(reuse, i, from)); } } } catch (IllegalAccessException e) { @@ -298,6 +320,30 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { return reuse; } + private Object copyField(int i, Object from) throws IllegalAccessException { + Object value = fields[i].get(from); + if (value != null) { + return fieldSerializers[i].copy(value); + } else { + return null; + } + } + + private Object copyField(T reuse, int i, Object from) throws 
IllegalAccessException { + Object value = fields[i].get(from); + if (value != null) { + Object reuseValue = fields[i].get(reuse); + + if (reuseValue != null) { + return fieldSerializers[i].copy(value, reuseValue); + } else { + return fieldSerializers[i].copy(value); + } + } else { + return null; + } + } + @Override public int getLength() { return -1; @@ -400,21 +446,23 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { target = createInstance(); } - if ((flags & NO_SUBCLASS) != 0) { + if (isRecord()) { + JavaRecordBuilderFactory<T>.JavaRecordBuilder builder = recordFactory.newBuilder(); + for (int i = 0; i < numFields; i++) { + boolean isNull = source.readBoolean(); + Object fieldValue = isNull ? null : fieldSerializers[i].deserialize(source); + if (fields[i] != null) { + builder.setField(i, fieldValue); + } + } + target = builder.build(); + } else if ((flags & NO_SUBCLASS) != 0) { try { for (int i = 0; i < numFields; i++) { boolean isNull = source.readBoolean(); - + Object fieldValue = isNull ? null : fieldSerializers[i].deserialize(source); if (fields[i] != null) { - if (isNull) { - fields[i].set(target, null); - } else { - Object field = fieldSerializers[i].deserialize(source); - fields[i].set(target, field); - } - } else if (!isNull) { - // read and dump a pre-existing field value - fieldSerializers[i].deserialize(source); + fields[i].set(target, fieldValue); } } } catch (IllegalAccessException e) { @@ -473,25 +521,41 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { } } - if ((flags & NO_SUBCLASS) != 0) { + if (isRecord()) { try { + JavaRecordBuilderFactory<T>.JavaRecordBuilder builder = recordFactory.newBuilder(); for (int i = 0; i < numFields; i++) { boolean isNull = source.readBoolean(); if (fields[i] != null) { if (isNull) { - fields[i].set(reuse, null); + builder.setField(i, null); } else { - Object field; + Object reuseField = reuse == null ? 
null : fields[i].get(reuse); + builder.setField(i, deserializeField(reuseField, i, source)); + } + } else if (!isNull) { + // read and dump a pre-existing field value + fieldSerializers[i].deserialize(source); + } + } - Object reuseField = fields[i].get(reuse); - if (reuseField != null) { - field = fieldSerializers[i].deserialize(reuseField, source); - } else { - field = fieldSerializers[i].deserialize(source); - } + reuse = builder.build(); + } catch (IllegalAccessException e) { + throw new RuntimeException( + "Error during POJO copy, this should not happen since we check the fields before.", + e); + } + } else if ((flags & NO_SUBCLASS) != 0) { + try { + for (int i = 0; i < numFields; i++) { + boolean isNull = source.readBoolean(); - fields[i].set(reuse, field); + if (fields[i] != null) { + if (isNull) { + fields[i].set(reuse, null); + } else { + fields[i].set(reuse, deserializeField(fields[i].get(reuse), i, source)); } } else if (!isNull) { // read and dump a pre-existing field value @@ -512,6 +576,15 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { return reuse; } + private Object deserializeField(Object reuseField, int i, DataInputView source) + throws IllegalAccessException, IOException { + if (reuseField != null) { + return fieldSerializers[i].deserialize(reuseField, source); + } else { + return fieldSerializers[i].deserialize(source); + } + } + @Override public void copy(DataInputView source, DataOutputView target) throws IOException { // copy the flags @@ -617,6 +690,9 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { cl = Thread.currentThread().getContextClassLoader(); subclassSerializerCache = new HashMap<>(); + if (TypeExtractor.isRecord(clazz)) { + this.recordFactory = JavaRecordBuilderFactory.create(clazz, fields); + } } // -------------------------------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java index e495fb4ff68..6dfdde50e6f 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java @@ -186,8 +186,11 @@ public abstract class SerializerTestBase<T> { @Test protected void testCopy() { + testCopy(getSerializer()); + } + + protected void testCopy(TypeSerializer<T> serializer) { try { - TypeSerializer<T> serializer = getSerializer(); T[] testData = getData(); for (T datum : testData) { @@ -445,6 +448,9 @@ public abstract class SerializerTestBase<T> { assertEquals( "The copy of the serializer is not equal to the original one.", ser1, ser2); + + // Make sure the serializer can be used after cloning + testCopy(ser2); } catch (Exception e) { System.err.println(e.getMessage()); e.printStackTrace(); diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java new file mode 100644 index 00000000000..58e2c2a8535 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.Keys.ExpressionKeys; +import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Objects; +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; + +/** A test for the {@link PojoSerializer} with Java Records. 
*/ +class Java17PojoRecordSerializerTest + extends SerializerTestBase<Java17PojoRecordSerializerTest.TestUserClass> { + private final TypeInformation<TestUserClass> type = + TypeExtractor.getForClass(TestUserClass.class); + + @Override + protected TypeSerializer<TestUserClass> createSerializer() { + TypeSerializer<TestUserClass> serializer = type.createSerializer(new ExecutionConfig()); + assertThat(serializer).isInstanceOf(PojoSerializer.class); + return serializer; + } + + @Override + protected boolean allowNullInstances(TypeSerializer<TestUserClass> serializer) { + return true; + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class<TestUserClass> getTypeClass() { + return TestUserClass.class; + } + + @Override + protected TestUserClass[] getTestData() { + Random rnd = new Random(874597969123412341L); + + return new TestUserClass[] { + new TestUserClass( + rnd.nextInt(), + "foo", + rnd.nextDouble(), + new Date(), + new NestedTestUserClass( + rnd.nextInt(), "foo@boo", rnd.nextDouble(), new int[] {10, 11, 12})), + new TestUserClass( + rnd.nextInt(), + "bar", + rnd.nextDouble(), + null, + new NestedTestUserClass( + rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[] {20, 21, 22})), + new TestUserClass(rnd.nextInt(), null, rnd.nextDouble(), null, null), + new TestUserClass( + rnd.nextInt(), + "bar", + rnd.nextDouble(), + new Date(), + new NestedTestUserClass( + rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[] {20, 21, 22})) + }; + } + + // User code class for testing the serializer + public record TestUserClass( + int dumm1, String dumm2, double dumm3, Date dumm5, NestedTestUserClass nestedClass) {} + + public static class NestedTestUserClass { + public int dumm1; + public String dumm2; + public double dumm3; + public int[] dumm4; + + public NestedTestUserClass() {} + + public NestedTestUserClass(int dumm1, String dumm2, double dumm3, int[] dumm4) { + this.dumm1 = dumm1; + this.dumm2 = dumm2; + this.dumm3 = dumm3; + 
this.dumm4 = dumm4; + } + + @Override + public int hashCode() { + return Objects.hash(dumm1, dumm2, dumm3, dumm4); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof NestedTestUserClass)) { + return false; + } + NestedTestUserClass otherTUC = (NestedTestUserClass) other; + if (dumm1 != otherTUC.dumm1) { + return false; + } + if (!dumm2.equals(otherTUC.dumm2)) { + return false; + } + if (dumm3 != otherTUC.dumm3) { + return false; + } + if (dumm4.length != otherTUC.dumm4.length) { + return false; + } + for (int i = 0; i < dumm4.length; i++) { + if (dumm4[i] != otherTUC.dumm4[i]) { + return false; + } + } + return true; + } + } + + /** This tests if the hashes returned by the pojo and tuple comparators are the same. */ + @SuppressWarnings({"rawtypes", "unchecked"}) + @Test + void testTuplePojoTestEquality() throws IncompatibleKeysException { + + // test with a simple, string-key first. + PojoTypeInfo<TestUserClass> pType = (PojoTypeInfo<TestUserClass>) type; + List<FlatFieldDescriptor> result = new ArrayList<>(); + pType.getFlatFields("nestedClass.dumm2", 0, result); + int[] fields = new int[1]; // see below + fields[0] = result.get(0).getPosition(); + TypeComparator<TestUserClass> pojoComp = + pType.createComparator(fields, new boolean[] {true}, 0, new ExecutionConfig()); + + TestUserClass pojoTestRecord = + new TestUserClass( + 0, + "abc", + 3d, + new Date(), + new NestedTestUserClass(1, "haha", 4d, new int[] {5, 4, 3})); + int pHash = pojoComp.hash(pojoTestRecord); + + Tuple1<String> tupleTest = new Tuple1<>("haha"); + TupleTypeInfo<Tuple1<String>> tType = + (TupleTypeInfo<Tuple1<String>>) TypeExtractor.getForObject(tupleTest); + TypeComparator<Tuple1<String>> tupleComp = + tType.createComparator( + new int[] {0}, new boolean[] {true}, 0, new ExecutionConfig()); + + int tHash = tupleComp.hash(tupleTest); + + assertThat(tHash) + .isEqualTo(pHash) + .withFailMessage( + "The hashing for tuples and pojos must be the same, so that they 
are mixable"); + + Tuple3<Integer, String, Double> multiTupleTest = + new Tuple3<>(1, "haha", 4d); // its important here to use the same values. + TupleTypeInfo<Tuple3<Integer, String, Double>> multiTupleType = + (TupleTypeInfo<Tuple3<Integer, String, Double>>) + TypeExtractor.getForObject(multiTupleTest); + + ExpressionKeys fieldKey = new ExpressionKeys(new int[] {1, 0, 2}, multiTupleType); + ExpressionKeys expressKey = + new ExpressionKeys( + new String[] { + "nestedClass.dumm2", "nestedClass.dumm1", "nestedClass.dumm3" + }, + pType); + + assertThat(fieldKey.areCompatible(expressKey)) + .isTrue() + .withFailMessage("Expecting the keys to be compatible"); + TypeComparator<TestUserClass> multiPojoComp = + pType.createComparator( + expressKey.computeLogicalKeyPositions(), + new boolean[] {true, true, true}, + 0, + new ExecutionConfig()); + int multiPojoHash = multiPojoComp.hash(pojoTestRecord); + + // pojo order is: dumm2 (str), dumm1 (int), dumm3 (double). + TypeComparator<Tuple3<Integer, String, Double>> multiTupleComp = + multiTupleType.createComparator( + fieldKey.computeLogicalKeyPositions(), + new boolean[] {true, true, true}, + 0, + new ExecutionConfig()); + int multiTupleHash = multiTupleComp.hash(multiTupleTest); + + assertThat(multiPojoHash) + .isEqualTo(multiTupleHash) + .withFailMessage( + "The hashing for tuples and pojos must be the same, so that they are mixable. 
Also for those with multiple key fields"); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerUpgradeTest.java new file mode 100644 index 00000000000..455c2bda826 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerUpgradeTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.FlinkVersion; +import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** A {@link TypeSerializerUpgradeTestBase} for the {@link PojoSerializer}. 
*/ +class Java17PojoRecordSerializerUpgradeTest + extends TypeSerializerUpgradeTestBase< + Java17PojoRecordSerializerUpgradeTestSpecifications.RecordMigrationSetup + .RecordBeforeMigration, + Java17PojoRecordSerializerUpgradeTestSpecifications.RecordMigrationVerifier + .RecordAfterSchemaUpgrade> { + + @Override + public Collection<FlinkVersion> getMigrationVersions() { + List<FlinkVersion> testVersions = new ArrayList<>(); + testVersions.add(FlinkVersion.v1_19); + return testVersions; + } + + public Collection<TestSpecification<?, ?>> createTestSpecifications(FlinkVersion flinkVersion) + throws Exception { + Collection<TestSpecification<?, ?>> testSpecifications = new ArrayList<>(); + testSpecifications.add( + new TestSpecification<>( + "pojo-serializer-to-record", + flinkVersion, + Java17PojoRecordSerializerUpgradeTestSpecifications.PojoToRecordSetup.class, + Java17PojoRecordSerializerUpgradeTestSpecifications.PojoToRecordVerifier + .class)); + testSpecifications.add( + new TestSpecification<>( + "pojo-serializer-record-migration", + flinkVersion, + Java17PojoRecordSerializerUpgradeTestSpecifications.RecordMigrationSetup + .class, + Java17PojoRecordSerializerUpgradeTestSpecifications.RecordMigrationVerifier + .class)); + return testSpecifications; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerUpgradeTestSpecifications.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerUpgradeTestSpecifications.java new file mode 100644 index 00000000000..158795a2125 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerUpgradeTestSpecifications.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.FlinkVersion; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.ClassRelocator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerMatchers; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +import org.hamcrest.Matcher; + +import static org.hamcrest.Matchers.is; + +import static org.junit.Assert.assertSame; + +/** Test specifications (setups and verifiers) used by the {@link PojoSerializer} record upgrade tests. 
*/ +class Java17PojoRecordSerializerUpgradeTestSpecifications { + + public static final class PojoToRecordSetup + implements TypeSerializerUpgradeTestBase.PreUpgradeSetup< + PojoToRecordSetup.PojoBeforeUpgrade> { + + @ClassRelocator.RelocateClass("TestPojoToRecord") + @SuppressWarnings("WeakerAccess") + public static class PojoBeforeUpgrade { + public int id; + public String name; + + public PojoBeforeUpgrade() {} + + public PojoBeforeUpgrade(int id, String name) { + this.id = id; + this.name = name; + } + } + + @Override + public TypeSerializer<PojoBeforeUpgrade> createPriorSerializer() { + TypeSerializer<PojoBeforeUpgrade> serializer = + TypeExtractor.createTypeInfo(PojoBeforeUpgrade.class) + .createSerializer(new ExecutionConfig()); + assertSame(PojoSerializer.class, serializer.getClass()); + return serializer; + } + + @Override + public PojoBeforeUpgrade createTestData() { + return new PojoBeforeUpgrade(911108, "Gordon"); + } + } + + public static final class PojoToRecordVerifier + implements TypeSerializerUpgradeTestBase.UpgradeVerifier< + PojoToRecordVerifier.PojoAfterUpgrade> { + + @ClassRelocator.RelocateClass("TestPojoToRecord") + @SuppressWarnings("WeakerAccess") + public record PojoAfterUpgrade(int id, String name) {} + + @Override + public TypeSerializer<PojoAfterUpgrade> createUpgradedSerializer() { + TypeSerializer<PojoAfterUpgrade> serializer = + TypeExtractor.createTypeInfo(PojoAfterUpgrade.class) + .createSerializer(new ExecutionConfig()); + assertSame(PojoSerializer.class, serializer.getClass()); + return serializer; + } + + @Override + public Matcher<PojoAfterUpgrade> testDataMatcher() { + return is(new PojoAfterUpgrade(911108, "Gordon")); + } + + @Override + public Matcher<TypeSerializerSchemaCompatibility<PojoAfterUpgrade>> + schemaCompatibilityMatcher(FlinkVersion version) { + return TypeSerializerMatchers.isCompatibleAsIs(); + } + } + + public static final class RecordMigrationSetup + implements TypeSerializerUpgradeTestBase.PreUpgradeSetup< 
+ RecordMigrationSetup.RecordBeforeMigration> { + + @ClassRelocator.RelocateClass("TestRecordMigration") + @SuppressWarnings("WeakerAccess") + public record RecordBeforeMigration(int id, String name) {} + + @Override + public TypeSerializer<RecordBeforeMigration> createPriorSerializer() { + TypeSerializer<RecordBeforeMigration> serializer = + TypeExtractor.createTypeInfo(RecordBeforeMigration.class) + .createSerializer(new ExecutionConfig()); + assertSame(PojoSerializer.class, serializer.getClass()); + return serializer; + } + + @Override + public RecordBeforeMigration createTestData() { + return new RecordBeforeMigration(911108, "Gordon"); + } + } + + public static final class RecordMigrationVerifier + implements TypeSerializerUpgradeTestBase.UpgradeVerifier< + RecordMigrationVerifier.RecordAfterSchemaUpgrade> { + + @ClassRelocator.RelocateClass("TestRecordMigration") + @SuppressWarnings("WeakerAccess") + public record RecordAfterSchemaUpgrade(String name, int age, String newField) {} + + @Override + public TypeSerializer<RecordAfterSchemaUpgrade> createUpgradedSerializer() { + TypeSerializer<RecordAfterSchemaUpgrade> serializer = + TypeExtractor.createTypeInfo(RecordAfterSchemaUpgrade.class) + .createSerializer(new ExecutionConfig()); + assertSame(PojoSerializer.class, serializer.getClass()); + return serializer; + } + + @Override + public Matcher<RecordAfterSchemaUpgrade> testDataMatcher() { + return is(new RecordAfterSchemaUpgrade("Gordon", 0, null)); + } + + @Override + public Matcher<TypeSerializerSchemaCompatibility<RecordAfterSchemaUpgrade>> + schemaCompatibilityMatcher(FlinkVersion version) { + return TypeSerializerMatchers.isCompatibleAfterMigration(); + } + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17RecordBuilderFactoryTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17RecordBuilderFactoryTest.java new file mode 100644 index 00000000000..f40f5b02eae --- /dev/null +++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17RecordBuilderFactoryTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for the {@link JavaRecordBuilderFactory}. 
*/ +class Java17RecordBuilderFactoryTest { + + Field[] fields; + + record TestRecord(int i1, int i2, String s1, String s2) {} + + @BeforeEach + void setup() { + fields = TestRecord.class.getDeclaredFields(); + } + + @Test + void testNoDefaultOrParamMapping() { + JavaRecordBuilderFactory<TestRecord> helper = + JavaRecordBuilderFactory.create(TestRecord.class, fields); + JavaRecordBuilderFactory<TestRecord>.JavaRecordBuilder builder = helper.newBuilder(); + builder.setField(1, 100); + builder.setField(0, 50); + builder.setField(3, "test"); + + assertThat(builder.build()).isEqualTo(new TestRecord(50, 100, null, "test")); + } + + @Test + void testNewFieldsAdded() { + // Test restoring from fields [i2, s1] + JavaRecordBuilderFactory<TestRecord> helper = + JavaRecordBuilderFactory.create(TestRecord.class, Arrays.copyOfRange(fields, 1, 3)); + + JavaRecordBuilderFactory<TestRecord>.JavaRecordBuilder builder = helper.newBuilder(); + builder.setField(0, 100); + builder.setField(1, "test"); + + assertThat(builder.build()).isEqualTo(new TestRecord(0, 100, "test", null)); + } + + @Test + void testFieldsAddedRemovedAndRearranged() { + Field[] oldFields = new Field[] {fields[3], null, fields[0]}; + JavaRecordBuilderFactory<TestRecord> helper = + JavaRecordBuilderFactory.create(TestRecord.class, oldFields); + + JavaRecordBuilderFactory<TestRecord>.JavaRecordBuilder builder = helper.newBuilder(); + builder.setField(0, "test"); + builder.setField(2, 100); + + assertThat(builder.build()).isEqualTo(new TestRecord(100, 0, null, "test")); + } + + @Test + void testReorderFields() { + // Swap first and last field + Field temp = fields[0]; + fields[0] = fields[3]; + fields[3] = temp; + + JavaRecordBuilderFactory<TestRecord> helper = + JavaRecordBuilderFactory.create(TestRecord.class, fields); + + JavaRecordBuilderFactory<TestRecord>.JavaRecordBuilder builder = helper.newBuilder(); + builder.setField(0, "4"); + builder.setField(1, 2); + builder.setField(2, "3"); + builder.setField(3, 1); + 
+ assertThat(builder.build()).isEqualTo(new TestRecord(1, 2, "3", "4")); + } + + @Test + void testMissingRequiredField() { + JavaRecordBuilderFactory<TestRecord> helper = + JavaRecordBuilderFactory.create(TestRecord.class, fields); + JavaRecordBuilderFactory<TestRecord>.JavaRecordBuilder builder = helper.newBuilder(); + + builder.setField(0, 50); + // Do not set required param 1 + + assertThatThrownBy(builder::build) + .hasMessage("Could not instantiate record") + .hasCause(new IllegalArgumentException()); + } +} diff --git a/flink-core/src/test/resources/pojo-serializer-record-migration-1.19/serializer-snapshot b/flink-core/src/test/resources/pojo-serializer-record-migration-1.19/serializer-snapshot new file mode 100644 index 00000000000..ef1b4cf9203 Binary files /dev/null and b/flink-core/src/test/resources/pojo-serializer-record-migration-1.19/serializer-snapshot differ diff --git a/flink-core/src/test/resources/pojo-serializer-record-migration-1.19/test-data b/flink-core/src/test/resources/pojo-serializer-record-migration-1.19/test-data new file mode 100644 index 00000000000..c817b845cdb Binary files /dev/null and b/flink-core/src/test/resources/pojo-serializer-record-migration-1.19/test-data differ diff --git a/flink-core/src/test/resources/pojo-serializer-to-record-1.19/serializer-snapshot b/flink-core/src/test/resources/pojo-serializer-to-record-1.19/serializer-snapshot new file mode 100644 index 00000000000..ab14aa6a500 Binary files /dev/null and b/flink-core/src/test/resources/pojo-serializer-to-record-1.19/serializer-snapshot differ diff --git a/flink-core/src/test/resources/pojo-serializer-to-record-1.19/test-data b/flink-core/src/test/resources/pojo-serializer-to-record-1.19/test-data new file mode 100644 index 00000000000..c817b845cdb Binary files /dev/null and b/flink-core/src/test/resources/pojo-serializer-to-record-1.19/test-data differ diff --git a/pom.xml b/pom.xml index 66ceba2669c..dedac06e23e 100644 --- a/pom.xml +++ b/pom.xml @@ -207,6 +207,7 
@@ under the License. <!-- Can be set to any value to reproduce a specific build. --> <test.randomization.seed/> <test.unit.pattern>**/*Test.*</test.unit.pattern> + <test.exclusion.pattern>**/Java17*.java</test.exclusion.pattern> </properties> <dependencies> @@ -1104,6 +1105,12 @@ under the License. <profile> <id>java17-target</id> + + <!-- Include Java 17 specific tests (by not excluding them) --> + <properties> + <test.exclusion.pattern>nothing</test.exclusion.pattern> + </properties> + <build> <plugins> <plugin> @@ -2060,6 +2067,9 @@ under the License. <!-- Prevents recompilation due to missing package-info.class, see MCOMPILER-205 --> <arg>-Xpkginfo:always</arg> </compilerArgs> + <testExcludes> + <testExclude>${test.exclusion.pattern}</testExclude> + </testExcludes> </configuration> </plugin> diff --git a/tools/maven/suppressions-core.xml b/tools/maven/suppressions-core.xml index dc6e275dc62..fffe480a956 100644 --- a/tools/maven/suppressions-core.xml +++ b/tools/maven/suppressions-core.xml @@ -121,4 +121,7 @@ under the License. <suppress files="(.*)test[/\\](.*)testutils[/\\](.*)" checks="RedundantModifier|JavadocParagraph|JavadocType|JavadocStyle|StaticVariableNameCheck|LocalFinalVariableName|EmptyLineSeparator"/> + + <!-- Suppress for Java17-specific test classes where Java records are used. --> + <suppress files="(.*)test[/\\].*[/\\]Java17.*.java" checks="MethodNameCheck"/> </suppressions>