This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new bc16d52 [FLINK-18075][kafka] Call open method of SerializationSchema in Kafka producer bc16d52 is described below commit bc16d526b36a37915d01005f849e7c7faf805ee1 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Wed Jun 3 15:31:20 2020 +0200 [FLINK-18075][kafka] Call open method of SerializationSchema in Kafka producer The open method of SerializationSchema was not called in the universal Kafka producer. This closes #12450 --- .../flink/streaming/connectors/kafka/FlinkKafkaProducer.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java index b095195..25b359c 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java @@ -759,12 +759,21 @@ public class FlinkKafkaProducer<IN> } if (kafkaSchema != null) { - kafkaSchema.open(() -> getRuntimeContext().getMetricGroup().addGroup("user")); + kafkaSchema.open(createSerializationInitContext()); + } + + if (keyedSchema != null && keyedSchema instanceof KeyedSerializationSchemaWrapper) { + ((KeyedSerializationSchemaWrapper<IN>) keyedSchema).getSerializationSchema() + .open(createSerializationInitContext()); } super.open(configuration); } + private SerializationSchema.InitializationContext createSerializationInitContext() { + return () -> getRuntimeContext().getMetricGroup().addGroup("user"); + } + @Override public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException { checkErroneous();