This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ba752b9e8b3 [FLINK-32380] Support Java Records with 
PojoTypeInfo/Serializer
ba752b9e8b3 is described below

commit ba752b9e8b3fa0fbbe67d6d1bd70cccbc74e6ca0
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Thu Nov 9 23:54:15 2023 -0800

    [FLINK-32380] Support Java Records with PojoTypeInfo/Serializer
---
 .../apache/flink/api/common/typeinfo/Types.java    |   4 +
 .../flink/api/java/typeutils/PojoTypeInfo.java     |   4 +
 .../flink/api/java/typeutils/TypeExtractor.java    |  20 +-
 .../runtime/JavaRecordBuilderFactory.java          | 171 +++++++++++++++
 .../api/java/typeutils/runtime/PojoSerializer.java | 164 +++++++++++----
 .../api/common/typeutils/SerializerTestBase.java   |   8 +-
 .../runtime/Java17PojoRecordSerializerTest.java    | 230 +++++++++++++++++++++
 .../Java17PojoRecordSerializerUpgradeTest.java     |  63 ++++++
 ...oRecordSerializerUpgradeTestSpecifications.java | 151 ++++++++++++++
 .../runtime/Java17RecordBuilderFactoryTest.java    | 112 ++++++++++
 .../serializer-snapshot                            | Bin 0 -> 389 bytes
 .../test-data                                      | Bin 0 -> 14 bytes
 .../serializer-snapshot                            | Bin 0 -> 380 bytes
 .../pojo-serializer-to-record-1.19/test-data       | Bin 0 -> 14 bytes
 pom.xml                                            |  10 +
 tools/maven/suppressions-core.xml                  |   3 +
 16 files changed, 894 insertions(+), 46 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
index a5c509b9ad0..a66e3be2b13 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
@@ -269,6 +269,10 @@ public class Types {
      *
      * <p>The generic types for all fields of the POJO can be defined in a 
hierarchy of subclasses.
      *
+     * <p>Java Record classes can also be used as valid POJOs (even though 
they don't fulfill some
+     * of the above criteria). In this case Flink will use the record 
canonical constructor to
+     * create the objects.
+     *
      * <p>If Flink's type analyzer is unable to extract a valid POJO type 
information with type
      * information for all fields, an {@link
      * org.apache.flink.api.common.functions.InvalidTypesException} is thrown. 
Alternatively, you
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 021f9dfe333..97aa069b778 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -59,6 +59,10 @@ import static org.apache.flink.util.Preconditions.checkState;
  *       field can be null independent of the field's type.
  * </ul>
  *
+ * Java Record classes can also be used as valid POJOs (even though they don't 
fulfill some of the
+ * above criteria). In this case Flink will use the record canonical 
constructor to create the
+ * objects.
+ *
  * @param <T> The type represented by this type information.
  */
 @Public
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 939c988df62..6fc71cdd5bc 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -2072,10 +2072,11 @@ public class TypeExtractor {
             return new GenericTypeInfo<>(clazz);
         }
 
+        boolean isRecord = isRecord(clazz);
         List<PojoField> pojoFields = new ArrayList<>();
         for (Field field : fields) {
             Type fieldType = field.getGenericType();
-            if (!isValidPojoField(field, clazz, typeHierarchy) && clazz != 
Row.class) {
+            if (!isRecord && !isValidPojoField(field, clazz, typeHierarchy) && 
clazz != Row.class) {
                 LOG.info(
                         "Class "
                                 + clazz
@@ -2140,6 +2141,11 @@ public class TypeExtractor {
             }
         }
 
+        if (isRecord) {
+            // no default constructor extraction needs to be applied for Java 
records
+            return pojoType;
+        }
+
         // Try retrieving the default constructor, if it does not have one
         // we cannot use this because the serializer uses it.
         Constructor<OUT> defaultConstructor = null;
@@ -2174,6 +2180,18 @@ public class TypeExtractor {
         return pojoType;
     }
 
+    /**
+     * Determine whether the given class is a valid Java record.
+     *
+     * @param clazz class to check
+     * @return True if the class is a Java record
+     */
+    @PublicEvolving
+    public static boolean isRecord(Class<?> clazz) {
+        return clazz.getSuperclass().getName().equals("java.lang.Record")
+                && (clazz.getModifiers() & Modifier.FINAL) != 0;
+    }
+
     /**
      * Recursively determine all declared fields This is required because 
class.getFields() is not
      * returning fields defined in parent classes.
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordBuilderFactory.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordBuilderFactory.java
new file mode 100644
index 00000000000..ff5eefe2ee1
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordBuilderFactory.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Utility class for constructing Java records in the {@link PojoSerializer}. 
*/
+@Internal
+final class JavaRecordBuilderFactory<T> {
+
+    private final Constructor<T> canonicalConstructor;
+
+    /**
+     * Record constructor parameter index mapping in case the new constructor 
has a different
+     * parameter order than the serialized data. Used for schema evolution or 
`null` if no schema
+     * evolution is applied for that record class.
+     */
+    @Nullable private final int[] paramIndexMapping;
+
+    /**
+     * Default record args used for newly introduced primitive fields during 
schema evolution.
+     * `null` if no schema evolution is applied for that record class.
+     */
+    @Nullable private final Object[] defaultConstructorArgs;
+
+    private JavaRecordBuilderFactory(Constructor<T> canonicalConstructor) {
+        this(canonicalConstructor, null, null);
+    }
+
+    private JavaRecordBuilderFactory(
+            Constructor<T> canonicalConstructor,
+            @Nullable int[] argIndexMapping,
+            @Nullable Object[] defaultConstructorArgs) {
+        Preconditions.checkArgument((argIndexMapping == null) == 
(defaultConstructorArgs == null));
+        this.canonicalConstructor = canonicalConstructor;
+        this.paramIndexMapping = argIndexMapping;
+        this.defaultConstructorArgs = defaultConstructorArgs;
+    }
+
+    JavaRecordBuilder newBuilder() {
+        return new JavaRecordBuilder();
+    }
+
+    /** Builder class for incremental record construction. */
+    @Internal
+    final class JavaRecordBuilder {
+        private final Object[] args;
+
+        JavaRecordBuilder() {
+            if (defaultConstructorArgs == null) {
+                args = new Object[canonicalConstructor.getParameterCount()];
+            } else {
+                args = Arrays.copyOf(defaultConstructorArgs, 
defaultConstructorArgs.length);
+            }
+        }
+
+        T build() {
+            try {
+                return canonicalConstructor.newInstance(args);
+            } catch (Exception e) {
+                throw new RuntimeException("Could not instantiate record", e);
+            }
+        }
+
+        /**
+         * Set record field by index. If parameter index mapping is provided, 
the index is mapped,
+         * otherwise it is used as is.
+         *
+         * @param i index of field to be set
+         * @param value field value
+         */
+        void setField(int i, Object value) {
+            if (paramIndexMapping != null) {
+                args[paramIndexMapping[i]] = value;
+            } else {
+                args[i] = value;
+            }
+        }
+    }
+
+    static <T> JavaRecordBuilderFactory<T> create(Class<T> clazz, Field[] 
fields) {
+        try {
+            Object[] recordComponents =
+                    (Object[]) 
Class.class.getMethod("getRecordComponents").invoke(clazz);
+
+            Class<?>[] componentTypes = new Class[recordComponents.length];
+            List<String> componentNames = new 
ArrayList<>(recordComponents.length);
+
+            // We need to use reflection to access record components as they 
are not available in
+            // before Java 14
+            Method getType =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getType");
+            Method getName =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getName");
+            for (int i = 0; i < recordComponents.length; i++) {
+                componentNames.add((String) 
getName.invoke(recordComponents[i]));
+                componentTypes[i] = (Class<?>) 
getType.invoke(recordComponents[i]);
+            }
+            Constructor<T> recordConstructor = 
clazz.getDeclaredConstructor(componentTypes);
+            recordConstructor.setAccessible(true);
+
+            List<String> previousFields =
+                    Arrays.stream(fields)
+                            // There may be (removed) null fields due to 
schema evolution
+                            .filter(Objects::nonNull)
+                            .map(Field::getName)
+                            .collect(Collectors.toList());
+
+            // If the field names / order changed we know that we are 
migrating the records and arg
+            // index remapping may be necessary
+            boolean migrating = !previousFields.equals(componentNames);
+            if (migrating) {
+                // If the order / index of arguments changed in the new record 
class we have to map
+                // it, otherwise we pass the wrong arguments to the constructor
+                int[] argIndexMapping = new int[fields.length];
+                for (int i = 0; i < fields.length; i++) {
+                    Field field = fields[i];
+                    // There may be (removed) null fields due to schema 
evolution
+                    argIndexMapping[i] =
+                            field == null ? -1 : 
componentNames.indexOf(fields[i].getName());
+                }
+
+                // We have to initialize newly added primitive fields to their 
correct default value
+                Object[] defaultValues = new Object[componentNames.size()];
+                for (int i = 0; i < componentNames.size(); i++) {
+                    Class<?> fieldType = componentTypes[i];
+                    boolean newPrimitive =
+                            fieldType.isPrimitive()
+                                    && 
!previousFields.contains(componentNames.get(i));
+                    defaultValues[i] = newPrimitive ? 
Defaults.defaultValue(fieldType) : null;
+                }
+                return new JavaRecordBuilderFactory<>(
+                        recordConstructor, argIndexMapping, defaultValues);
+            } else {
+                return new JavaRecordBuilderFactory<>(recordConstructor);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Could not find record canonical 
constructor", e);
+        }
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index d7475ea7b11..0531af8a0e1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -26,6 +26,8 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.CollectionUtil;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -89,6 +91,8 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
 
     private transient ClassLoader cl;
 
+    @Nullable private transient JavaRecordBuilderFactory<T> recordFactory;
+
     /** Constructor to create a new {@link PojoSerializer}. */
     @SuppressWarnings("unchecked")
     public PojoSerializer(
@@ -118,6 +122,9 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                 createRegisteredSubclassSerializers(registeredSubclasses, 
executionConfig);
 
         this.subclassSerializerCache = new HashMap<>();
+        if (TypeExtractor.isRecord(clazz)) {
+            this.recordFactory = JavaRecordBuilderFactory.create(clazz, 
fields);
+        }
     }
 
     /**
@@ -142,6 +149,9 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
         this.subclassSerializerCache = checkNotNull(subclassSerializerCache);
         this.executionConfig = checkNotNull(executionConfig);
         this.cl = Thread.currentThread().getContextClassLoader();
+        if (TypeExtractor.isRecord(clazz)) {
+            this.recordFactory = JavaRecordBuilderFactory.create(clazz, 
fields);
+        }
     }
 
     @Override
@@ -149,6 +159,10 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
         return false;
     }
 
+    private boolean isRecord() {
+        return this.recordFactory != null;
+    }
+
     @Override
     public PojoSerializer<T> duplicate() {
         TypeSerializer<Object>[] duplicateFieldSerializers = 
duplicateSerializers(fieldSerializers);
@@ -189,7 +203,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
 
     @Override
     public T createInstance() {
-        if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) {
+        if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers()) 
|| isRecord()) {
             return null;
         }
         try {
@@ -221,7 +235,20 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
         }
 
         Class<?> actualType = from.getClass();
-        if (actualType == clazz) {
+        if (isRecord()) {
+            try {
+                JavaRecordBuilderFactory<T>.JavaRecordBuilder builder = 
recordFactory.newBuilder();
+                for (int i = 0; i < numFields; i++) {
+                    if (fields[i] != null) {
+                        builder.setField(i, copyField(i, from));
+                    }
+                }
+                return builder.build();
+            } catch (IllegalAccessException e) {
+                throw new RuntimeException(
+                        "Error during POJO copy, this should not happen since 
we check the fields before.");
+            }
+        } else if (actualType == clazz) {
             T target;
             try {
                 target = (T) from.getClass().newInstance();
@@ -232,13 +259,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
             try {
                 for (int i = 0; i < numFields; i++) {
                     if (fields[i] != null) {
-                        Object value = fields[i].get(from);
-                        if (value != null) {
-                            Object copy = fieldSerializers[i].copy(value);
-                            fields[i].set(target, copy);
-                        } else {
-                            fields[i].set(target, null);
-                        }
+                        fields[i].set(target, copyField(i, from));
                     }
                 }
             } catch (IllegalAccessException e) {
@@ -266,23 +287,24 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
             return copy(from);
         }
 
-        if (actualType == clazz) {
+        if (isRecord()) {
             try {
+                JavaRecordBuilderFactory<T>.JavaRecordBuilder builder = 
recordFactory.newBuilder();
                 for (int i = 0; i < numFields; i++) {
                     if (fields[i] != null) {
-                        Object value = fields[i].get(from);
-                        if (value != null) {
-                            Object reuseValue = fields[i].get(reuse);
-                            Object copy;
-                            if (reuseValue != null) {
-                                copy = fieldSerializers[i].copy(value, 
reuseValue);
-                            } else {
-                                copy = fieldSerializers[i].copy(value);
-                            }
-                            fields[i].set(reuse, copy);
-                        } else {
-                            fields[i].set(reuse, null);
-                        }
+                        builder.setField(i, copyField(reuse, i, from));
+                    }
+                }
+                return builder.build();
+            } catch (IllegalAccessException e) {
+                throw new RuntimeException(
+                        "Error during POJO copy, this should not happen since 
we check the fields before.");
+            }
+        } else if (actualType == clazz) {
+            try {
+                for (int i = 0; i < numFields; i++) {
+                    if (fields[i] != null) {
+                        fields[i].set(reuse, copyField(reuse, i, from));
                     }
                 }
             } catch (IllegalAccessException e) {
@@ -298,6 +320,30 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
         return reuse;
     }
 
+    private Object copyField(int i, Object from) throws IllegalAccessException 
{
+        Object value = fields[i].get(from);
+        if (value != null) {
+            return fieldSerializers[i].copy(value);
+        } else {
+            return null;
+        }
+    }
+
+    private Object copyField(T reuse, int i, Object from) throws 
IllegalAccessException {
+        Object value = fields[i].get(from);
+        if (value != null) {
+            Object reuseValue = fields[i].get(reuse);
+
+            if (reuseValue != null) {
+                return fieldSerializers[i].copy(value, reuseValue);
+            } else {
+                return fieldSerializers[i].copy(value);
+            }
+        } else {
+            return null;
+        }
+    }
+
     @Override
     public int getLength() {
         return -1;
@@ -400,21 +446,23 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
             target = createInstance();
         }
 
-        if ((flags & NO_SUBCLASS) != 0) {
+        if (isRecord()) {
+            JavaRecordBuilderFactory<T>.JavaRecordBuilder builder = 
recordFactory.newBuilder();
+            for (int i = 0; i < numFields; i++) {
+                boolean isNull = source.readBoolean();
+                Object fieldValue = isNull ? null : 
fieldSerializers[i].deserialize(source);
+                if (fields[i] != null) {
+                    builder.setField(i, fieldValue);
+                }
+            }
+            target = builder.build();
+        } else if ((flags & NO_SUBCLASS) != 0) {
             try {
                 for (int i = 0; i < numFields; i++) {
                     boolean isNull = source.readBoolean();
-
+                    Object fieldValue = isNull ? null : 
fieldSerializers[i].deserialize(source);
                     if (fields[i] != null) {
-                        if (isNull) {
-                            fields[i].set(target, null);
-                        } else {
-                            Object field = 
fieldSerializers[i].deserialize(source);
-                            fields[i].set(target, field);
-                        }
-                    } else if (!isNull) {
-                        // read and dump a pre-existing field value
-                        fieldSerializers[i].deserialize(source);
+                        fields[i].set(target, fieldValue);
                     }
                 }
             } catch (IllegalAccessException e) {
@@ -473,25 +521,41 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
             }
         }
 
-        if ((flags & NO_SUBCLASS) != 0) {
+        if (isRecord()) {
             try {
+                JavaRecordBuilderFactory<T>.JavaRecordBuilder builder = 
recordFactory.newBuilder();
                 for (int i = 0; i < numFields; i++) {
                     boolean isNull = source.readBoolean();
 
                     if (fields[i] != null) {
                         if (isNull) {
-                            fields[i].set(reuse, null);
+                            builder.setField(i, null);
                         } else {
-                            Object field;
+                            Object reuseField = reuse == null ? null : 
fields[i].get(reuse);
+                            builder.setField(i, deserializeField(reuseField, 
i, source));
+                        }
+                    } else if (!isNull) {
+                        // read and dump a pre-existing field value
+                        fieldSerializers[i].deserialize(source);
+                    }
+                }
 
-                            Object reuseField = fields[i].get(reuse);
-                            if (reuseField != null) {
-                                field = 
fieldSerializers[i].deserialize(reuseField, source);
-                            } else {
-                                field = 
fieldSerializers[i].deserialize(source);
-                            }
+                reuse = builder.build();
+            } catch (IllegalAccessException e) {
+                throw new RuntimeException(
+                        "Error during POJO copy, this should not happen since 
we check the fields before.",
+                        e);
+            }
+        } else if ((flags & NO_SUBCLASS) != 0) {
+            try {
+                for (int i = 0; i < numFields; i++) {
+                    boolean isNull = source.readBoolean();
 
-                            fields[i].set(reuse, field);
+                    if (fields[i] != null) {
+                        if (isNull) {
+                            fields[i].set(reuse, null);
+                        } else {
+                            fields[i].set(reuse, 
deserializeField(fields[i].get(reuse), i, source));
                         }
                     } else if (!isNull) {
                         // read and dump a pre-existing field value
@@ -512,6 +576,15 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
         return reuse;
     }
 
+    private Object deserializeField(Object reuseField, int i, DataInputView 
source)
+            throws IllegalAccessException, IOException {
+        if (reuseField != null) {
+            return fieldSerializers[i].deserialize(reuseField, source);
+        } else {
+            return fieldSerializers[i].deserialize(source);
+        }
+    }
+
     @Override
     public void copy(DataInputView source, DataOutputView target) throws 
IOException {
         // copy the flags
@@ -617,6 +690,9 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
 
         cl = Thread.currentThread().getContextClassLoader();
         subclassSerializerCache = new HashMap<>();
+        if (TypeExtractor.isRecord(clazz)) {
+            this.recordFactory = JavaRecordBuilderFactory.create(clazz, 
fields);
+        }
     }
 
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index e495fb4ff68..6dfdde50e6f 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -186,8 +186,11 @@ public abstract class SerializerTestBase<T> {
 
     @Test
     protected void testCopy() {
+        testCopy(getSerializer());
+    }
+
+    protected void testCopy(TypeSerializer<T> serializer) {
         try {
-            TypeSerializer<T> serializer = getSerializer();
             T[] testData = getData();
 
             for (T datum : testData) {
@@ -445,6 +448,9 @@ public abstract class SerializerTestBase<T> {
 
             assertEquals(
                     "The copy of the serializer is not equal to the original 
one.", ser1, ser2);
+
+            // Make sure the serializer can be used after cloning
+            testCopy(ser2);
         } catch (Exception e) {
             System.err.println(e.getMessage());
             e.printStackTrace();
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java
new file mode 100644
index 00000000000..58e2c2a8535
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** A test for the {@link PojoSerializer} with Java Records. */
+class Java17PojoRecordSerializerTest
+        extends 
SerializerTestBase<Java17PojoRecordSerializerTest.TestUserClass> {
+    private final TypeInformation<TestUserClass> type =
+            TypeExtractor.getForClass(TestUserClass.class);
+
+    @Override
+    protected TypeSerializer<TestUserClass> createSerializer() {
+        TypeSerializer<TestUserClass> serializer = type.createSerializer(new 
ExecutionConfig());
+        assertThat(serializer).isInstanceOf(PojoSerializer.class);
+        return serializer;
+    }
+
+    @Override
+    protected boolean allowNullInstances(TypeSerializer<TestUserClass> 
serializer) {
+        return true;
+    }
+
+    @Override
+    protected int getLength() {
+        return -1;
+    }
+
+    @Override
+    protected Class<TestUserClass> getTypeClass() {
+        return TestUserClass.class;
+    }
+
+    @Override
+    protected TestUserClass[] getTestData() {
+        Random rnd = new Random(874597969123412341L);
+
+        return new TestUserClass[] {
+            new TestUserClass(
+                    rnd.nextInt(),
+                    "foo",
+                    rnd.nextDouble(),
+                    new Date(),
+                    new NestedTestUserClass(
+                            rnd.nextInt(), "foo@boo", rnd.nextDouble(), new 
int[] {10, 11, 12})),
+            new TestUserClass(
+                    rnd.nextInt(),
+                    "bar",
+                    rnd.nextDouble(),
+                    null,
+                    new NestedTestUserClass(
+                            rnd.nextInt(), "bar@bas", rnd.nextDouble(), new 
int[] {20, 21, 22})),
+            new TestUserClass(rnd.nextInt(), null, rnd.nextDouble(), null, 
null),
+            new TestUserClass(
+                    rnd.nextInt(),
+                    "bar",
+                    rnd.nextDouble(),
+                    new Date(),
+                    new NestedTestUserClass(
+                            rnd.nextInt(), "bar@bas", rnd.nextDouble(), new 
int[] {20, 21, 22}))
+        };
+    }
+
+    // User code class for testing the serializer
+    public record TestUserClass(
+            int dumm1, String dumm2, double dumm3, Date dumm5, 
NestedTestUserClass nestedClass) {}
+
+    public static class NestedTestUserClass {
+        public int dumm1;
+        public String dumm2;
+        public double dumm3;
+        public int[] dumm4;
+
+        public NestedTestUserClass() {}
+
+        public NestedTestUserClass(int dumm1, String dumm2, double dumm3, 
int[] dumm4) {
+            this.dumm1 = dumm1;
+            this.dumm2 = dumm2;
+            this.dumm3 = dumm3;
+            this.dumm4 = dumm4;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dumm1, dumm2, dumm3, dumm4);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof NestedTestUserClass)) {
+                return false;
+            }
+            NestedTestUserClass otherTUC = (NestedTestUserClass) other;
+            if (dumm1 != otherTUC.dumm1) {
+                return false;
+            }
+            if (!dumm2.equals(otherTUC.dumm2)) {
+                return false;
+            }
+            if (dumm3 != otherTUC.dumm3) {
+                return false;
+            }
+            if (dumm4.length != otherTUC.dumm4.length) {
+                return false;
+            }
+            for (int i = 0; i < dumm4.length; i++) {
+                if (dumm4[i] != otherTUC.dumm4[i]) {
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
+    /** This tests if the hashes returned by the pojo and tuple comparators 
are the same. */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    @Test
+    void testTuplePojoTestEquality() throws IncompatibleKeysException {
+
+        // test with a simple, string-key first.
+        PojoTypeInfo<TestUserClass> pType = (PojoTypeInfo<TestUserClass>) type;
+        List<FlatFieldDescriptor> result = new ArrayList<>();
+        pType.getFlatFields("nestedClass.dumm2", 0, result);
+        int[] fields = new int[1]; // see below
+        fields[0] = result.get(0).getPosition();
+        TypeComparator<TestUserClass> pojoComp =
+                pType.createComparator(fields, new boolean[] {true}, 0, new 
ExecutionConfig());
+
+        TestUserClass pojoTestRecord =
+                new TestUserClass(
+                        0,
+                        "abc",
+                        3d,
+                        new Date(),
+                        new NestedTestUserClass(1, "haha", 4d, new int[] {5, 
4, 3}));
+        int pHash = pojoComp.hash(pojoTestRecord);
+
+        Tuple1<String> tupleTest = new Tuple1<>("haha");
+        TupleTypeInfo<Tuple1<String>> tType =
+                (TupleTypeInfo<Tuple1<String>>) 
TypeExtractor.getForObject(tupleTest);
+        TypeComparator<Tuple1<String>> tupleComp =
+                tType.createComparator(
+                        new int[] {0}, new boolean[] {true}, 0, new 
ExecutionConfig());
+
+        int tHash = tupleComp.hash(tupleTest);
+
+        assertThat(tHash)
+                .isEqualTo(pHash)
+                .withFailMessage(
+                        "The hashing for tuples and pojos must be the same, so 
that they are mixable");
+
+        Tuple3<Integer, String, Double> multiTupleTest =
+                new Tuple3<>(1, "haha", 4d); // its important here to use the 
same values.
+        TupleTypeInfo<Tuple3<Integer, String, Double>> multiTupleType =
+                (TupleTypeInfo<Tuple3<Integer, String, Double>>)
+                        TypeExtractor.getForObject(multiTupleTest);
+
+        ExpressionKeys fieldKey = new ExpressionKeys(new int[] {1, 0, 2}, 
multiTupleType);
+        ExpressionKeys expressKey =
+                new ExpressionKeys(
+                        new String[] {
+                            "nestedClass.dumm2", "nestedClass.dumm1", 
"nestedClass.dumm3"
+                        },
+                        pType);
+
+        assertThat(fieldKey.areCompatible(expressKey))
+                .isTrue()
+                .withFailMessage("Expecting the keys to be compatible");
+        TypeComparator<TestUserClass> multiPojoComp =
+                pType.createComparator(
+                        expressKey.computeLogicalKeyPositions(),
+                        new boolean[] {true, true, true},
+                        0,
+                        new ExecutionConfig());
+        int multiPojoHash = multiPojoComp.hash(pojoTestRecord);
+
+        // pojo order is: dumm2 (str), dumm1 (int), dumm3 (double).
+        TypeComparator<Tuple3<Integer, String, Double>> multiTupleComp =
+                multiTupleType.createComparator(
+                        fieldKey.computeLogicalKeyPositions(),
+                        new boolean[] {true, true, true},
+                        0,
+                        new ExecutionConfig());
+        int multiTupleHash = multiTupleComp.hash(multiTupleTest);
+
+        assertThat(multiPojoHash)
+                .isEqualTo(multiTupleHash)
+                .withFailMessage(
+                        "The hashing for tuples and pojos must be the same, so 
that they are mixable. Also for those with multiple key fields");
+    }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerUpgradeTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerUpgradeTest.java
new file mode 100644
index 00000000000..455c2bda826
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerUpgradeTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/** A {@link TypeSerializerUpgradeTestBase} for the {@link PojoSerializer}. */
+class Java17PojoRecordSerializerUpgradeTest
+        extends TypeSerializerUpgradeTestBase<
+                
Java17PojoRecordSerializerUpgradeTestSpecifications.RecordMigrationSetup
+                        .RecordBeforeMigration,
+                
Java17PojoRecordSerializerUpgradeTestSpecifications.RecordMigrationVerifier
+                        .RecordAfterSchemaUpgrade> {
+
+    @Override
+    public Collection<FlinkVersion> getMigrationVersions() {
+        List<FlinkVersion> testVersions = new ArrayList<>();
+        testVersions.add(FlinkVersion.v1_19);
+        return testVersions;
+    }
+
+    public Collection<TestSpecification<?, ?>> 
createTestSpecifications(FlinkVersion flinkVersion)
+            throws Exception {
+        Collection<TestSpecification<?, ?>> testSpecifications = new 
ArrayList<>();
+        testSpecifications.add(
+                new TestSpecification<>(
+                        "pojo-serializer-to-record",
+                        flinkVersion,
+                        
Java17PojoRecordSerializerUpgradeTestSpecifications.PojoToRecordSetup.class,
+                        
Java17PojoRecordSerializerUpgradeTestSpecifications.PojoToRecordVerifier
+                                .class));
+        testSpecifications.add(
+                new TestSpecification<>(
+                        "pojo-serializer-record-migration",
+                        flinkVersion,
+                        
Java17PojoRecordSerializerUpgradeTestSpecifications.RecordMigrationSetup
+                                .class,
+                        
Java17PojoRecordSerializerUpgradeTestSpecifications.RecordMigrationVerifier
+                                .class));
+        return testSpecifications;
+    }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerUpgradeTestSpecifications.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerUpgradeTestSpecifications.java
new file mode 100644
index 00000000000..158795a2125
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerUpgradeTestSpecifications.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.ClassRelocator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerMatchers;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import org.hamcrest.Matcher;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertSame;
+
+/** A {@link TypeSerializerUpgradeTestBase} for the {@link PojoSerializer}. */
+class Java17PojoRecordSerializerUpgradeTestSpecifications {
+
+    public static final class PojoToRecordSetup
+            implements TypeSerializerUpgradeTestBase.PreUpgradeSetup<
+                    PojoToRecordSetup.PojoBeforeUpgrade> {
+
+        @ClassRelocator.RelocateClass("TestPojoToRecord")
+        @SuppressWarnings("WeakerAccess")
+        public static class PojoBeforeUpgrade {
+            public int id;
+            public String name;
+
+            public PojoBeforeUpgrade() {}
+
+            public PojoBeforeUpgrade(int id, String name) {
+                this.id = id;
+                this.name = name;
+            }
+        }
+
+        @Override
+        public TypeSerializer<PojoBeforeUpgrade> createPriorSerializer() {
+            TypeSerializer<PojoBeforeUpgrade> serializer =
+                    TypeExtractor.createTypeInfo(PojoBeforeUpgrade.class)
+                            .createSerializer(new ExecutionConfig());
+            assertSame(PojoSerializer.class, serializer.getClass());
+            return serializer;
+        }
+
+        @Override
+        public PojoBeforeUpgrade createTestData() {
+            return new PojoBeforeUpgrade(911108, "Gordon");
+        }
+    }
+
+    public static final class PojoToRecordVerifier
+            implements TypeSerializerUpgradeTestBase.UpgradeVerifier<
+                    PojoToRecordVerifier.PojoAfterUpgrade> {
+
+        @ClassRelocator.RelocateClass("TestPojoToRecord")
+        @SuppressWarnings("WeakerAccess")
+        public record PojoAfterUpgrade(int id, String name) {}
+
+        @Override
+        public TypeSerializer<PojoAfterUpgrade> createUpgradedSerializer() {
+            TypeSerializer<PojoAfterUpgrade> serializer =
+                    TypeExtractor.createTypeInfo(PojoAfterUpgrade.class)
+                            .createSerializer(new ExecutionConfig());
+            assertSame(PojoSerializer.class, serializer.getClass());
+            return serializer;
+        }
+
+        @Override
+        public Matcher<PojoAfterUpgrade> testDataMatcher() {
+            return is(new PojoAfterUpgrade(911108, "Gordon"));
+        }
+
+        @Override
+        public Matcher<TypeSerializerSchemaCompatibility<PojoAfterUpgrade>>
+                schemaCompatibilityMatcher(FlinkVersion version) {
+            return TypeSerializerMatchers.isCompatibleAsIs();
+        }
+    }
+
+    public static final class RecordMigrationSetup
+            implements TypeSerializerUpgradeTestBase.PreUpgradeSetup<
+                    RecordMigrationSetup.RecordBeforeMigration> {
+
+        @ClassRelocator.RelocateClass("TestRecordMigration")
+        @SuppressWarnings("WeakerAccess")
+        public record RecordBeforeMigration(int id, String name) {}
+
+        @Override
+        public TypeSerializer<RecordBeforeMigration> createPriorSerializer() {
+            TypeSerializer<RecordBeforeMigration> serializer =
+                    TypeExtractor.createTypeInfo(RecordBeforeMigration.class)
+                            .createSerializer(new ExecutionConfig());
+            assertSame(PojoSerializer.class, serializer.getClass());
+            return serializer;
+        }
+
+        @Override
+        public RecordBeforeMigration createTestData() {
+            return new RecordBeforeMigration(911108, "Gordon");
+        }
+    }
+
+    public static final class RecordMigrationVerifier
+            implements TypeSerializerUpgradeTestBase.UpgradeVerifier<
+                    RecordMigrationVerifier.RecordAfterSchemaUpgrade> {
+
+        @ClassRelocator.RelocateClass("TestRecordMigration")
+        @SuppressWarnings("WeakerAccess")
+        public record RecordAfterSchemaUpgrade(String name, int age, String 
newField) {}
+
+        @Override
+        public TypeSerializer<RecordAfterSchemaUpgrade> 
createUpgradedSerializer() {
+            TypeSerializer<RecordAfterSchemaUpgrade> serializer =
+                    
TypeExtractor.createTypeInfo(RecordAfterSchemaUpgrade.class)
+                            .createSerializer(new ExecutionConfig());
+            assertSame(PojoSerializer.class, serializer.getClass());
+            return serializer;
+        }
+
+        @Override
+        public Matcher<RecordAfterSchemaUpgrade> testDataMatcher() {
+            return is(new RecordAfterSchemaUpgrade("Gordon", 0, null));
+        }
+
+        @Override
+        public 
Matcher<TypeSerializerSchemaCompatibility<RecordAfterSchemaUpgrade>>
+                schemaCompatibilityMatcher(FlinkVersion version) {
+            return TypeSerializerMatchers.isCompatibleAfterMigration();
+        }
+    }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17RecordBuilderFactoryTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17RecordBuilderFactoryTest.java
new file mode 100644
index 00000000000..f40f5b02eae
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17RecordBuilderFactoryTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for the @{@link JavaRecordBuilderFactory}. */
+class Java17RecordBuilderFactoryTest {
+
+    Field[] fields;
+
+    record TestRecord(int i1, int i2, String s1, String s2) {}
+
+    @BeforeEach
+    void setup() {
+        fields = TestRecord.class.getDeclaredFields();
+    }
+
+    @Test
+    void testNoDefaultOrParamMapping() {
+        JavaRecordBuilderFactory<TestRecord> helper =
+                JavaRecordBuilderFactory.create(TestRecord.class, fields);
+        JavaRecordBuilderFactory<TestRecord>.JavaRecordBuilder builder = 
helper.newBuilder();
+        builder.setField(1, 100);
+        builder.setField(0, 50);
+        builder.setField(3, "test");
+
+        assertThat(builder.build()).isEqualTo(new TestRecord(50, 100, null, 
"test"));
+    }
+
+    @Test
+    void testNewFieldsAdded() {
+        // Test restoring from fields [i2, s1]
+        JavaRecordBuilderFactory<TestRecord> helper =
+                JavaRecordBuilderFactory.create(TestRecord.class, 
Arrays.copyOfRange(fields, 1, 3));
+
+        JavaRecordBuilderFactory<TestRecord>.JavaRecordBuilder builder = 
helper.newBuilder();
+        builder.setField(0, 100);
+        builder.setField(1, "test");
+
+        assertThat(builder.build()).isEqualTo(new TestRecord(0, 100, "test", 
null));
+    }
+
+    @Test
+    void testFieldsAddedRemovedAndRearranged() {
+        Field[] oldFields = new Field[] {fields[3], null, fields[0]};
+        JavaRecordBuilderFactory<TestRecord> helper =
+                JavaRecordBuilderFactory.create(TestRecord.class, oldFields);
+
+        JavaRecordBuilderFactory<TestRecord>.JavaRecordBuilder builder = 
helper.newBuilder();
+        builder.setField(0, "test");
+        builder.setField(2, 100);
+
+        assertThat(builder.build()).isEqualTo(new TestRecord(100, 0, null, 
"test"));
+    }
+
+    @Test
+    void testReorderFields() {
+        // Swap first and last field
+        Field temp = fields[0];
+        fields[0] = fields[3];
+        fields[3] = temp;
+
+        JavaRecordBuilderFactory<TestRecord> helper =
+                JavaRecordBuilderFactory.create(TestRecord.class, fields);
+
+        JavaRecordBuilderFactory<TestRecord>.JavaRecordBuilder builder = 
helper.newBuilder();
+        builder.setField(0, "4");
+        builder.setField(1, 2);
+        builder.setField(2, "3");
+        builder.setField(3, 1);
+
+        assertThat(builder.build()).isEqualTo(new TestRecord(1, 2, "3", "4"));
+    }
+
+    @Test
+    void testMissingRequiredField() {
+        JavaRecordBuilderFactory<TestRecord> helper =
+                JavaRecordBuilderFactory.create(TestRecord.class, fields);
+        JavaRecordBuilderFactory<TestRecord>.JavaRecordBuilder builder = 
helper.newBuilder();
+
+        builder.setField(0, 50);
+        // Do not set required param 1
+
+        assertThatThrownBy(builder::build)
+                .hasMessage("Could not instantiate record")
+                .hasCause(new IllegalArgumentException());
+    }
+}
diff --git 
a/flink-core/src/test/resources/pojo-serializer-record-migration-1.19/serializer-snapshot
 
b/flink-core/src/test/resources/pojo-serializer-record-migration-1.19/serializer-snapshot
new file mode 100644
index 00000000000..ef1b4cf9203
Binary files /dev/null and 
b/flink-core/src/test/resources/pojo-serializer-record-migration-1.19/serializer-snapshot
 differ
diff --git 
a/flink-core/src/test/resources/pojo-serializer-record-migration-1.19/test-data 
b/flink-core/src/test/resources/pojo-serializer-record-migration-1.19/test-data
new file mode 100644
index 00000000000..c817b845cdb
Binary files /dev/null and 
b/flink-core/src/test/resources/pojo-serializer-record-migration-1.19/test-data 
differ
diff --git 
a/flink-core/src/test/resources/pojo-serializer-to-record-1.19/serializer-snapshot
 
b/flink-core/src/test/resources/pojo-serializer-to-record-1.19/serializer-snapshot
new file mode 100644
index 00000000000..ab14aa6a500
Binary files /dev/null and 
b/flink-core/src/test/resources/pojo-serializer-to-record-1.19/serializer-snapshot
 differ
diff --git 
a/flink-core/src/test/resources/pojo-serializer-to-record-1.19/test-data 
b/flink-core/src/test/resources/pojo-serializer-to-record-1.19/test-data
new file mode 100644
index 00000000000..c817b845cdb
Binary files /dev/null and 
b/flink-core/src/test/resources/pojo-serializer-to-record-1.19/test-data differ
diff --git a/pom.xml b/pom.xml
index 66ceba2669c..dedac06e23e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -207,6 +207,7 @@ under the License.
                <!-- Can be set to any value to reproduce a specific build. -->
                <test.randomization.seed/>
                <test.unit.pattern>**/*Test.*</test.unit.pattern>
+               <test.exclusion.pattern>**/Java17*.java</test.exclusion.pattern>
        </properties>
 
        <dependencies>
@@ -1104,6 +1105,12 @@ under the License.
 
                <profile>
                        <id>java17-target</id>
+
+                       <!-- Include Java 17 specific tests (by not excluding 
them) -->
+                       <properties>
+                               
<test.exclusion.pattern>nothing</test.exclusion.pattern>
+                       </properties>
+
                        <build>
                                <plugins>
                                        <plugin>
@@ -2060,6 +2067,9 @@ under the License.
                                                        <!-- Prevents 
recompilation due to missing package-info.class, see MCOMPILER-205 -->
                                                        
<arg>-Xpkginfo:always</arg>
                                                </compilerArgs>
+                                               <testExcludes>
+                                                       
<testExclude>${test.exclusion.pattern}</testExclude>
+                                               </testExcludes>
                                        </configuration>
                                </plugin>
 
diff --git a/tools/maven/suppressions-core.xml 
b/tools/maven/suppressions-core.xml
index dc6e275dc62..fffe480a956 100644
--- a/tools/maven/suppressions-core.xml
+++ b/tools/maven/suppressions-core.xml
@@ -121,4 +121,7 @@ under the License.
        <suppress
                files="(.*)test[/\\](.*)testutils[/\\](.*)"
                
checks="RedundantModifier|JavadocParagraph|JavadocType|JavadocStyle|StaticVariableNameCheck|LocalFinalVariableName|EmptyLineSeparator"/>
+
+       <!-- Suppress for Java17-specific test classes where Java records are 
used. -->
+       <suppress files="(.*)test[/\\].*[/\\]Java17.*.java" 
checks="MethodNameCheck"/>
 </suppressions>

Reply via email to