Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]
X-czh commented on PR #24182:
URL: https://github.com/apache/flink/pull/24182#issuecomment-1908265269

> Thanks for the update. +1 for merging.

From the unstable UT, I just realized that the config option has to be a list of pairs instead of a map of pairs, since the order is important for registering POJO and Kryo serializers. I'll update with a fix PR.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
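The ordering concern raised in this comment can be illustrated in isolation. The snippet below is only a sketch and is not Flink or Kryo code: it assumes a registry that hands out ids sequentially in registration order, and `RegistrationOrderSketch` and `assignIds` are hypothetical names. Under that assumption, the same entries supplied in a different order end up with different ids, which is why an ordered list of pairs is safer than a map whose iteration order is unspecified.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RegistrationOrderSketch {

    /**
     * Hypothetical stand-in for an order-sensitive registry: each class name
     * receives the next free integer id, in the order it is registered.
     */
    static Map<String, Integer> assignIds(List<String> classNamesInOrder) {
        Map<String, Integer> ids = new LinkedHashMap<>();
        int nextId = 0;
        for (String className : classNamesInOrder) {
            // putIfAbsent keeps the first id if a name is listed twice.
            if (ids.putIfAbsent(className, nextId) == null) {
                nextId++;
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // Same two entries, different order, different id assignment.
        System.out.println(assignIds(List.of("org.example.B", "org.example.A")));
        System.out.println(assignIds(List.of("org.example.A", "org.example.B")));
    }
}
```

A `List` preserves the order in which the user wrote the entries, so the ids assigned on one run match those on the next as long as the config file is unchanged.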
Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]
X-czh commented on code in PR #24182:
URL: https://github.com/apache/flink/pull/24182#discussion_r1464765997

## flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java: ##
@@ -255,6 +256,51 @@ public class PipelineOptions {
                                     + " sure that only tags are written.")
                     .build());
+    public static final ConfigOption<Map<String, String>> SERIALIZATION_CONFIG =

Review Comment:
![image](https://github.com/apache/flink/assets/22020529/b4dfe47e-be15-4ac8-9851-9bc63ab70169)
Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]
JunRuiLee commented on code in PR #24182:
URL: https://github.com/apache/flink/pull/24182#discussion_r1464718228

## flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java: ##
@@ -352,6 +361,20 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
                 .getOptional(PipelineOptions.KRYO_REGISTERED_CLASSES)
                 .map(c -> loadClasses(c, classLoader, "Could not load kryo type to be registered."))
                 .ifPresent(c -> this.registeredKryoTypes = c);
+
+        try {
+            configuration
+                    .getOptional(PipelineOptions.SERIALIZATION_CONFIG)
+                    .ifPresent(c -> parseSerializationConfigWithExceptionHandling(classLoader, c));
+        } catch (Exception e) {
+            if (!GlobalConfiguration.isStandardYaml()) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "%s is only supported with the standard YAML config parser, please use \"config.yaml\" as the config file.",
+                                PipelineOptions.SERIALIZATION_CONFIG.key()));
+            }

Review Comment:
Makes sense.
Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]
X-czh commented on code in PR #24182:
URL: https://github.com/apache/flink/pull/24182#discussion_r1464658457

## flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java: ##
@@ -352,6 +361,20 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
                 .getOptional(PipelineOptions.KRYO_REGISTERED_CLASSES)
                 .map(c -> loadClasses(c, classLoader, "Could not load kryo type to be registered."))
                 .ifPresent(c -> this.registeredKryoTypes = c);
+
+        try {
+            configuration
+                    .getOptional(PipelineOptions.SERIALIZATION_CONFIG)
+                    .ifPresent(c -> parseSerializationConfigWithExceptionHandling(classLoader, c));
+        } catch (Exception e) {
+            if (!GlobalConfiguration.isStandardYaml()) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "%s is only supported with the standard YAML config parser, please use \"config.yaml\" as the config file.",
+                                PipelineOptions.SERIALIZATION_CONFIG.key()));
+            }

Review Comment:
Seems unnecessary. We are certain that it won't work when using the legacy parser, so this is already a concise root cause.
Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]
reswqa commented on code in PR #24182:
URL: https://github.com/apache/flink/pull/24182#discussion_r1464593996

## flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java: ##
@@ -255,6 +256,51 @@ public class PipelineOptions {
                                     + " sure that only tags are written.")
                     .build());
+    public static final ConfigOption<Map<String, String>> SERIALIZATION_CONFIG =

Review Comment:
Could you please post a screenshot of the rendered config option?

## flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java: ##
@@ -69,8 +75,11 @@ public final class SerializerConfig implements Serializable {
     private LinkedHashSet<Class<?>> registeredPojoTypes = new LinkedHashSet<>();
-    private LinkedHashMap<Class<?>, Class<TypeInfoFactory<?>>> registeredTypeFactories =
-            new LinkedHashMap<>();
+    // Order is not required as we will traverse the type hierarchy up to find the closest type
+    // information factory
+    // when extracting the type information.

Review Comment:
This new line seems unnecessary.

## flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java: ##
@@ -408,4 +431,104 @@ private T loadClass(
             throw new IllegalArgumentException(errorMessage, e);
         }
     }
+
+    private void parseSerializationConfigWithExceptionHandling(
+            ClassLoader classLoader, Map<String, String> serializationConfigs) {
+        try {
+            parseSerializationConfig(classLoader, serializationConfigs);
+        } catch (Exception e) {
+            throw new IllegalArgumentException(
+                    String.format("Could not configure serializers from %s.", serializationConfigs),
+                    e);
+        }
+    }
+
+    private void parseSerializationConfig(
+            ClassLoader classLoader, Map<String, String> serializationConfigs) {
+        final Map<Class<?>, Map<String, String>> serializationConfigByClass =
+                serializationConfigs.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        e ->
+                                                loadClass(
+                                                        e.getKey(),
+                                                        classLoader,
+                                                        "Could not load class for serialization config"),
+                                        e -> ConfigurationUtils.parseStringToMap(e.getValue())));
+        for (Map.Entry<Class<?>, Map<String, String>> entry :
+                serializationConfigByClass.entrySet()) {
+            Class<?> type = entry.getKey();
+            Map<String, String> config = entry.getValue();
+            String configType = config.get("type");

Review Comment:
We may need to throw a more explicit exception for the null case.
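The null case mentioned in this comment matters because a Java `switch` on a `String` throws a bare `NullPointerException` when the value is `null`. A minimal sketch of an explicit check follows; it is not the PR's code, and the class `ConfigTypeCheckSketch`, the enum `Kind`, and the error wording are all hypothetical.

```java
import java.util.Map;

final class ConfigTypeCheckSketch {

    /** Hypothetical enum naming the three config kinds handled in the diff. */
    enum Kind { POJO, KRYO, TYPEINFO }

    /** Resolves the "type" entry, failing with a descriptive message if it is absent. */
    static Kind parseKind(String className, Map<String, String> config) {
        String configType = config.get("type");
        if (configType == null) {
            // Without this check, switch (configType) would throw a NullPointerException.
            throw new IllegalArgumentException(
                    "Serialization config for "
                            + className
                            + " is missing the required 'type' key: "
                            + config);
        }
        switch (configType) {
            case "pojo":
                return Kind.POJO;
            case "kryo":
                return Kind.KRYO;
            case "typeinfo":
                return Kind.TYPEINFO;
            default:
                throw new IllegalArgumentException("Unsupported type: " + configType);
        }
    }
}
```

With such a check, a user who forgets the `type` key sees which class entry is incomplete instead of a stack trace pointing at the `switch` statement.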
Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]
JunRuiLee commented on code in PR #24182:
URL: https://github.com/apache/flink/pull/24182#discussion_r1464545618

## flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java: ##
@@ -408,4 +431,104 @@ private T loadClass(
             throw new IllegalArgumentException(errorMessage, e);
         }
     }
+
+    private void parseSerializationConfigWithExceptionHandling(
+            ClassLoader classLoader, Map<String, String> serializationConfigs) {
+        try {
+            parseSerializationConfig(classLoader, serializationConfigs);
+        } catch (Exception e) {
+            throw new IllegalArgumentException(
+                    String.format("Could not configure serializers from %s.", serializationConfigs),
+                    e);
+        }
+    }
+
+    private void parseSerializationConfig(
+            ClassLoader classLoader, Map<String, String> serializationConfigs) {
+        final Map<Class<?>, Map<String, String>> serializationConfigByClass =
+                serializationConfigs.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        e ->
+                                                loadClass(
+                                                        e.getKey(),
+                                                        classLoader,
+                                                        "Could not load class for serialization config"),
+                                        e -> ConfigurationUtils.parseStringToMap(e.getValue())));
+        for (Map.Entry<Class<?>, Map<String, String>> entry :
+                serializationConfigByClass.entrySet()) {
+            Class<?> type = entry.getKey();
+            Map<String, String> config = entry.getValue();
+            String configType = config.get("type");
+            switch (configType) {
+                case "pojo":
+                    registerPojoType(type);
+                    break;
+                case "kryo":
+                    parseAndRegisterKryoType(classLoader, type, config);
+                    break;
+                case "typeinfo":
+                    parseAndRegisterTypeFactory(classLoader, type, config);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unsupported type: " + configType);
+            }
+        }
+    }
+
+    private void parseAndRegisterKryoType(
+            ClassLoader classLoader, Class<?> t, Map<String, String> m) {
+        String kryoType = m.get("kryo-type");
+        if (kryoType == null) {
+            registerKryoType(t);
+        } else {
+            switch (kryoType) {
+                case "default":
+                    addDefaultKryoSerializer(
+                            t,
+                            loadClass(
+                                    m.get("class"),
+                                    classLoader,
+                                    "Could not load serializer's class"));
+                    break;
+                case "registered":
+                    registerTypeWithKryoSerializer(
+                            t,
+                            loadClass(
+                                    m.get("class"),
+                                    classLoader,
+                                    "Could not load serializer's class"));
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    private void parseAndRegisterTypeFactory(
+            ClassLoader classLoader, Class<?> t, Map<String, String> m) {
+        Class<? extends TypeInfoFactory<?>> factoryClass =
+                loadClass(m.get("class"), classLoader, "Could not load TypeInfoFactory's class");
+        // Register in the global static factory map of TypeExtractor for now so that it can be
+        // accessed from
+        // the static methods of TypeExtractor where SerializerConfig is currently not accessible

Review Comment:
ditto

## flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java: ##
@@ -69,8 +75,11 @@ public final class SerializerConfig implements Serializable {
     private LinkedHashSet<Class<?>> registeredPojoTypes = new LinkedHashSet<>();
-    private LinkedHashMap<Class<?>, Class<TypeInfoFactory<?>>> registeredTypeFactories =
-            new LinkedHashMap<>();
+    // Order is not required as we will traverse the type hierarchy up to find the closest type
+    // information factory
+    // when extracting the type information.

Review Comment:
can be included as a single line

## flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java: ##
@@ -352,6 +361,20 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
                 .getOptional(PipelineOptions.KRYO_REGISTERED_CLASSES)
                 .map(c -> loadClasses(c, classLoader, "Could not load kryo type to be registered."))
                 .ifPresent(c -> this.registeredKryoTypes = c);
+
+
Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]
X-czh commented on PR #24182:
URL: https://github.com/apache/flink/pull/24182#issuecomment-1907215961

@reswqa Could you help review it? Thx~
Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]
flinkbot commented on PR #24182:
URL: https://github.com/apache/flink/pull/24182#issuecomment-1906494842

## CI report:

* db0ed2d1e4f59006cb2f6ef491d143390a5504a9 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]
X-czh commented on code in PR #24182:
URL: https://github.com/apache/flink/pull/24182#discussion_r1463601347

## flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java: ##
@@ -229,8 +233,9 @@ public LinkedHashSet<Class<?>> getRegisteredPojoTypes() {
     }
     /** Returns the registered type info factories. */
-    public LinkedHashMap<Class<?>, Class<TypeInfoFactory<?>>> getRegisteredTypeFactories() {
-        return registeredTypeFactories;
+    public LinkedHashMap<Class<?>, Class<? extends TypeInfoFactory<?>>>

Review Comment:
The type should be `Class<? extends TypeInfoFactory<?>>` instead of `Class<TypeInfoFactory<?>>` for proper functioning. Updated in the FLIP as well.
Re: [PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]
X-czh commented on code in PR #24182:
URL: https://github.com/apache/flink/pull/24182#discussion_r1463599214

## flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java: ##
@@ -229,8 +233,9 @@ public LinkedHashSet<Class<?>> getRegisteredPojoTypes() {
     }
     /** Returns the registered type info factories. */
-    public LinkedHashMap<Class<?>, Class<TypeInfoFactory<?>>> getRegisteredTypeFactories() {
-        return registeredTypeFactories;
+    public LinkedHashMap<Class<?>, Class<? extends TypeInfoFactory<?>>>
+            getRegisteredTypeInfoFactories() {
+        return registeredTypeInfoFactories;

Review Comment:
`getRegisteredTypeFactories` -> `getRegisteredTypeInfoFactories`: this is a typo in the method name from the previous PR; fixed here.
[PR] [FLINK-34120] Introduce unified serialization config option for all Kryo, POJO and customized serializers [flink]
X-czh opened a new pull request, #24182:
URL: https://github.com/apache/flink/pull/24182

## What is the purpose of the change

Introduce a unified serialization config option for all Kryo, POJO and customized serializers, so that serialization can be parameterized purely through configuration.

## Brief change log

1. Introduce `pipeline.serialization-config`, a unified serialization config option for all Kryo, POJO and customized serializers.
2. For POJO & Kryo serializers, the config is parsed and dispatched to the existing `SerializerConfig#registerPojoType`, `SerializerConfig#registerKryoType`, `SerializerConfig#addDefaultKryoSerializer`, and `SerializerConfig#registerTypeWithKryoSerializer` methods.
3. For customized serializers, the specified type info factories are registered in the global static factory map of `TypeExtractor` for now, so that they can be accessed from the static methods of `TypeExtractor`, where `SerializerConfig` is currently not accessible. The plan is to migrate the static methods to take `SerializerConfig` as one of the arguments in v1.20.

## Verifying this change

Added tests in `SerializerConfigTest` on parsing different types of serializer config, both legal and illegal.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / no) no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) yes
- The serializers: (yes / no / don't know) yes
- The runtime per-record code paths (performance sensitive): (yes / no / don't know) no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) no
- The S3 file system connector: (yes / no / don't know) no

## Documentation

- Does this pull request introduce a new feature? (yes / no) yes
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) docs
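The dispatch described in items 2 and 3 of the change log can be sketched without any Flink dependency. The following is only an illustration under stated assumptions, not the PR's implementation: `SerializationDispatchSketch` and `plan` are hypothetical names, the entries are modelled as an ordered list of (class name, options) pairs, and instead of calling `SerializerConfig` the sketch records the name of the method each entry would be routed to. The label `registerTypeInfoFactory` is a made-up placeholder for the `TypeExtractor` registration, and rejecting an unknown `kryo-type` is a choice of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class SerializationDispatchSketch {

    /** Returns, in input order, a description of the registration each entry maps to. */
    static List<String> plan(List<Map.Entry<String, Map<String, String>>> entries) {
        List<String> calls = new ArrayList<>();
        for (Map.Entry<String, Map<String, String>> entry : entries) {
            String type = entry.getKey();
            Map<String, String> config = entry.getValue();
            String kind = config.get("type");
            if (kind == null) {
                throw new IllegalArgumentException("Missing 'type' for " + type);
            }
            switch (kind) {
                case "pojo":
                    calls.add("registerPojoType(" + type + ")");
                    break;
                case "kryo":
                    calls.add(planKryo(type, config));
                    break;
                case "typeinfo":
                    // Placeholder label; the PR registers the factory with TypeExtractor.
                    calls.add("registerTypeInfoFactory(" + type + ", " + config.get("class") + ")");
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported type: " + kind);
            }
        }
        return calls;
    }

    private static String planKryo(String type, Map<String, String> config) {
        String kryoType = config.get("kryo-type");
        if (kryoType == null) {
            return "registerKryoType(" + type + ")";
        }
        switch (kryoType) {
            case "default":
                return "addDefaultKryoSerializer(" + type + ", " + config.get("class") + ")";
            case "registered":
                return "registerTypeWithKryoSerializer(" + type + ", " + config.get("class") + ")";
            default:
                // Sketch-only choice: fail loudly on an unknown value.
                throw new IllegalArgumentException("Unsupported kryo-type: " + kryoType);
        }
    }
}
```

Because the result list follows the input order, the sketch also shows how an ordered representation keeps registration order stable across runs.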