Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]

2024-01-24 Thread via GitHub


X-czh commented on PR #24182:
URL: https://github.com/apache/flink/pull/24182#issuecomment-1908265269

   > Thanks for the update. +1 for merging.
   
   From the unstable UT, I just realized that the config option has to be a 
list of pairs instead of a map of pairs, since the order matters when 
registering POJO and Kryo serializers. I'll follow up with a fix PR.
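
   To illustrate the ordering point, here is a minimal sketch, assuming each 
list element holds exactly one class-name-to-config entry. The 
`registrationOrder` helper and the `org.example.*` class names are hypothetical 
and not part of Flink.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class OrderedSerializationConfig {

    /** Returns the configured class names in the order they were declared. */
    static List<String> registrationOrder(List<Map<String, String>> pairs) {
        List<String> order = new ArrayList<>();
        for (Map<String, String> pair : pairs) {
            // Assumption: every element carries a single class-name -> config entry,
            // so iterating the list yields the declaration order.
            order.addAll(pair.keySet());
        }
        return order;
    }

    public static void main(String[] args) {
        List<Map<String, String>> pairs = new ArrayList<>();
        pairs.add(Collections.singletonMap("org.example.B", "type:kryo"));
        pairs.add(Collections.singletonMap("org.example.A", "type:pojo"));
        // The list keeps B before A; a plain HashMap keyed by class name
        // makes no such guarantee about iteration order.
        System.out.println(registrationOrder(pairs));
    }
}
```

   A `List` guarantees iteration in insertion order, whereas a hash-based map 
does not, which is one way an order-sensitive registration can end up flaky in 
tests.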


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]

2024-01-24 Thread via GitHub


X-czh commented on code in PR #24182:
URL: https://github.com/apache/flink/pull/24182#discussion_r1464765997


##
flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java:
##
@@ -255,6 +256,51 @@ public class PipelineOptions {
 + " sure that only tags 
are written.")
 .build());
 
+public static final ConfigOption> SERIALIZATION_CONFIG 
=

Review Comment:
   
![image](https://github.com/apache/flink/assets/22020529/b4dfe47e-be15-4ac8-9851-9bc63ab70169)
   






Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]

2024-01-24 Thread via GitHub


JunRuiLee commented on code in PR #24182:
URL: https://github.com/apache/flink/pull/24182#discussion_r1464718228


##
flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java:
##
@@ -352,6 +361,20 @@ public void configure(ReadableConfig configuration, 
ClassLoader classLoader) {
 .getOptional(PipelineOptions.KRYO_REGISTERED_CLASSES)
 .map(c -> loadClasses(c, classLoader, "Could not load kryo 
type to be registered."))
 .ifPresent(c -> this.registeredKryoTypes = c);
+
+try {
+configuration
+.getOptional(PipelineOptions.SERIALIZATION_CONFIG)
+.ifPresent(c -> 
parseSerializationConfigWithExceptionHandling(classLoader, c));
+} catch (Exception e) {
+if (!GlobalConfiguration.isStandardYaml()) {
+throw new UnsupportedOperationException(
+String.format(
+"%s is only supported with the standard YAML 
config parser, please use \"config.yaml\" as the config file.",
+PipelineOptions.SERIALIZATION_CONFIG.key()));
+}

Review Comment:
   Makes sense.






Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]

2024-01-24 Thread via GitHub


X-czh commented on code in PR #24182:
URL: https://github.com/apache/flink/pull/24182#discussion_r1464658457


##
flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java:
##
@@ -352,6 +361,20 @@ public void configure(ReadableConfig configuration, 
ClassLoader classLoader) {
 .getOptional(PipelineOptions.KRYO_REGISTERED_CLASSES)
 .map(c -> loadClasses(c, classLoader, "Could not load kryo 
type to be registered."))
 .ifPresent(c -> this.registeredKryoTypes = c);
+
+try {
+configuration
+.getOptional(PipelineOptions.SERIALIZATION_CONFIG)
+.ifPresent(c -> 
parseSerializationConfigWithExceptionHandling(classLoader, c));
+} catch (Exception e) {
+if (!GlobalConfiguration.isStandardYaml()) {
+throw new UnsupportedOperationException(
+String.format(
+"%s is only supported with the standard YAML 
config parser, please use \"config.yaml\" as the config file.",
+PipelineOptions.SERIALIZATION_CONFIG.key()));
+}

Review Comment:
   This seems unnecessary: we are certain that it won't work with the legacy 
parser, so the message already states the root cause concisely.






Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]

2024-01-24 Thread via GitHub


reswqa commented on code in PR #24182:
URL: https://github.com/apache/flink/pull/24182#discussion_r1464593996


##
flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java:
##
@@ -255,6 +256,51 @@ public class PipelineOptions {
 + " sure that only tags 
are written.")
 .build());
 
+public static final ConfigOption> SERIALIZATION_CONFIG 
=

Review Comment:
   Could you please post a screenshot of the rendered config option?



##
flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java:
##
@@ -69,8 +75,11 @@ public final class SerializerConfig implements Serializable {
 
 private LinkedHashSet> registeredPojoTypes = new 
LinkedHashSet<>();
 
-private LinkedHashMap, Class>> 
registeredTypeFactories =
-new LinkedHashMap<>();
+// Order is not required as we will traverse the type hierarchy up to find 
the closest type
+// information factory
+// when extracting the type information.

Review Comment:
   This new line seems unnecessary.



##
flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java:
##
@@ -408,4 +431,104 @@ private  T loadClass(
 throw new IllegalArgumentException(errorMessage, e);
 }
 }
+
+private void parseSerializationConfigWithExceptionHandling(
+ClassLoader classLoader, Map serializationConfigs) 
{
+try {
+parseSerializationConfig(classLoader, serializationConfigs);
+} catch (Exception e) {
+throw new IllegalArgumentException(
+String.format("Could not configure serializers from %s.", 
serializationConfigs),
+e);
+}
+}
+
+private void parseSerializationConfig(
+ClassLoader classLoader, Map serializationConfigs) 
{
+final Map, Map> serializationConfigByClass =
+serializationConfigs.entrySet().stream()
+.collect(
+Collectors.toMap(
+e ->
+loadClass(
+e.getKey(),
+classLoader,
+"Could not load class 
for serialization config"),
+e -> 
ConfigurationUtils.parseStringToMap(e.getValue())));
+for (Map.Entry, Map> entry :
+serializationConfigByClass.entrySet()) {
+Class type = entry.getKey();
+Map config = entry.getValue();
+String configType = config.get("type");

Review Comment:
   We may need to throw a more explicit exception for the case where 
`configType` is null.
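
   For context, `switch` on a null `String` throws a bare 
`NullPointerException` in Java. A minimal sketch of an explicit guard follows; 
the `requireType` helper and its message wording are hypothetical and not taken 
from the PR.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfigTypeCheck {

    /** Reads the "type" key, failing with a descriptive message when it is absent. */
    static String requireType(String className, Map<String, String> config) {
        String configType = config.get("type");
        if (configType == null) {
            // Without this guard, `switch (configType)` would throw a
            // NullPointerException that carries no hint about the bad entry.
            throw new IllegalArgumentException(
                    "Serialization config for " + className + " must specify a 'type' key");
        }
        return configType;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("class", "org.example.MySerializer"); // hypothetical entry without "type"
        try {
            requireType("org.example.MyType", config);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```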






Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]

2024-01-24 Thread via GitHub


JunRuiLee commented on code in PR #24182:
URL: https://github.com/apache/flink/pull/24182#discussion_r1464545618


##
flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java:
##
@@ -408,4 +431,104 @@ private  T loadClass(
 throw new IllegalArgumentException(errorMessage, e);
 }
 }
+
+private void parseSerializationConfigWithExceptionHandling(
+ClassLoader classLoader, Map serializationConfigs) 
{
+try {
+parseSerializationConfig(classLoader, serializationConfigs);
+} catch (Exception e) {
+throw new IllegalArgumentException(
+String.format("Could not configure serializers from %s.", 
serializationConfigs),
+e);
+}
+}
+
+private void parseSerializationConfig(
+ClassLoader classLoader, Map serializationConfigs) 
{
+final Map, Map> serializationConfigByClass =
+serializationConfigs.entrySet().stream()
+.collect(
+Collectors.toMap(
+e ->
+loadClass(
+e.getKey(),
+classLoader,
+"Could not load class 
for serialization config"),
+e -> 
ConfigurationUtils.parseStringToMap(e.getValue())));
+for (Map.Entry, Map> entry :
+serializationConfigByClass.entrySet()) {
+Class type = entry.getKey();
+Map config = entry.getValue();
+String configType = config.get("type");
+switch (configType) {
+case "pojo":
+registerPojoType(type);
+break;
+case "kryo":
+parseAndRegisterKryoType(classLoader, type, config);
+break;
+case "typeinfo":
+parseAndRegisterTypeFactory(classLoader, type, config);
+break;
+default:
+throw new IllegalArgumentException("Unsupported type: " + 
configType);
+}
+}
+}
+
+private void parseAndRegisterKryoType(
+ClassLoader classLoader, Class t, Map m) {
+String kryoType = m.get("kryo-type");
+if (kryoType == null) {
+registerKryoType(t);
+} else {
+switch (kryoType) {
+case "default":
+addDefaultKryoSerializer(
+t,
+loadClass(
+m.get("class"),
+classLoader,
+"Could not load serializer's class"));
+break;
+case "registered":
+registerTypeWithKryoSerializer(
+t,
+loadClass(
+m.get("class"),
+classLoader,
+"Could not load serializer's class"));
+break;
+default:
+break;
+}
+}
+}
+
+private void parseAndRegisterTypeFactory(
+ClassLoader classLoader, Class t, Map m) {
+Class> factoryClass =
+loadClass(m.get("class"), classLoader, "Could not load 
TypeInfoFactory's class");
+// Register in the global static factory map of TypeExtractor for now 
so that it can be
+// accessed from
+// the static methods of TypeExtractor where SerializerConfig is 
currently not accessible

Review Comment:
   ditto



##
flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java:
##
@@ -69,8 +75,11 @@ public final class SerializerConfig implements Serializable {
 
 private LinkedHashSet> registeredPojoTypes = new 
LinkedHashSet<>();
 
-private LinkedHashMap, Class>> 
registeredTypeFactories =
-new LinkedHashMap<>();
+// Order is not required as we will traverse the type hierarchy up to find 
the closest type
+// information factory
+// when extracting the type information.

Review Comment:
   This comment can fit on a single line.



##
flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java:
##
@@ -352,6 +361,20 @@ public void configure(ReadableConfig configuration, 
ClassLoader classLoader) {
 .getOptional(PipelineOptions.KRYO_REGISTERED_CLASSES)
 .map(c -> loadClasses(c, classLoader, "Could not load kryo 
type to be registered."))
 .ifPresent(c -> this.registeredKryoTypes = c);
+
+

Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]

2024-01-23 Thread via GitHub


X-czh commented on PR #24182:
URL: https://github.com/apache/flink/pull/24182#issuecomment-1907215961

   @reswqa Could you help review it? Thx~





Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]

2024-01-23 Thread via GitHub


flinkbot commented on PR #24182:
URL: https://github.com/apache/flink/pull/24182#issuecomment-1906494842

   
   ## CI report:
   
   * db0ed2d1e4f59006cb2f6ef491d143390a5504a9 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]

2024-01-23 Thread via GitHub


X-czh commented on code in PR #24182:
URL: https://github.com/apache/flink/pull/24182#discussion_r1463601347


##
flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java:
##
@@ -229,8 +233,9 @@ public LinkedHashSet> getRegisteredPojoTypes() {
 }
 
 /** Returns the registered type info factories. */
-public LinkedHashMap, Class>> 
getRegisteredTypeFactories() {
-return registeredTypeFactories;
+public LinkedHashMap, Class>>

Review Comment:
   The type should be `Class>` instead of 
`Class>` for proper functioning. Updated in the FLIP as well






Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]

2024-01-23 Thread via GitHub


X-czh commented on code in PR #24182:
URL: https://github.com/apache/flink/pull/24182#discussion_r1463599214


##
flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java:
##
@@ -229,8 +233,9 @@ public LinkedHashSet> getRegisteredPojoTypes() {
 }
 
 /** Returns the registered type info factories. */
-public LinkedHashMap, Class>> 
getRegisteredTypeFactories() {
-return registeredTypeFactories;
+public LinkedHashMap, Class>>
+getRegisteredTypeInfoFactories() {
+return registeredTypeInfoFactories;

Review Comment:
   `getRegisteredTypeFactories` -> `getRegisteredTypeInfoFactories`: the old 
method name was a typo introduced in the previous MR; it is fixed here.









[PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]

2024-01-23 Thread via GitHub


X-czh opened a new pull request, #24182:
URL: https://github.com/apache/flink/pull/24182

   
   
   ## What is the purpose of the change
   
   Introduce a unified serialization config option for all Kryo, POJO, and 
customized serializers, so that serialization can be parameterized purely 
through configuration.
   
   ## Brief change log
   
   1. Introduce `pipeline.serialization-config`, a unified serialization config 
option for all Kryo, POJO and customized serializers.
   2. For POJO & Kryo serializers, the config is parsed and dispatched to the 
existing `SerializerConfig#registerPojoType`, 
`SerializerConfig#registerKryoType`, 
`SerializerConfig#addDefaultKryoSerializer`, and 
`SerializerConfig#registerTypeWithKryoSerializer` methods.
   3. For customized serializers, the specified type info factories are 
registered in the global static factory map of `TypeExtractor` for now so that 
they can be accessed from the static methods of `TypeExtractor`, where 
`SerializerConfig` is currently not accessible. The plan is to migrate the 
static methods to take `SerializerConfig` as one of the arguments in v1.20.
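
   The dispatch described in items 2 and 3 can be sketched roughly as follows. 
This is a simplified, self-contained illustration, not the code in the PR: it 
only returns the name of the registration target instead of calling it. The 
`type`, `kryo-type`, and `class` keys are taken from the diff quoted in the 
review comments; the `targetMethod` helper is hypothetical, and rejecting an 
unknown `kryo-type` is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

final class SerializationDispatchSketch {

    /** Names the registration target that a parsed per-class config would be routed to. */
    static String targetMethod(Map<String, String> config) {
        String type = config.get("type");
        if (type == null) {
            throw new IllegalArgumentException("Missing 'type' in " + config);
        }
        switch (type) {
            case "pojo":
                return "registerPojoType";
            case "kryo": {
                String kryoType = config.get("kryo-type");
                if (kryoType == null) {
                    // No serializer class given: plain Kryo registration.
                    return "registerKryoType";
                }
                switch (kryoType) {
                    case "default":
                        return "addDefaultKryoSerializer";
                    case "registered":
                        return "registerTypeWithKryoSerializer";
                    default:
                        // Assumption of this sketch: fail fast on unknown values.
                        throw new IllegalArgumentException("Unsupported kryo-type: " + kryoType);
                }
            }
            case "typeinfo":
                // Item 3: the factory goes into TypeExtractor's global static map.
                return "TypeExtractor global factory map";
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("type", "kryo");
        config.put("kryo-type", "registered");
        config.put("class", "org.example.MyKryoSerializer"); // hypothetical class name
        System.out.println(targetMethod(config));
    }
}
```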
   
   ## Verifying this change
   
   Added tests in `SerializerConfigTest` that parse different types of 
serializer config, both legal and illegal.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no) no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no) yes
 - The serializers: (yes / no / don't know) yes
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know) no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) 
no
 - The S3 file system connector: (yes / no / don't know) no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no) yes
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented) docs
   

