This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bc16d52  [FLINK-18075][kafka] Call open method of SerializationSchema in Kafka producer
bc16d52 is described below

commit bc16d526b36a37915d01005f849e7c7faf805ee1
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Wed Jun 3 15:31:20 2020 +0200

    [FLINK-18075][kafka] Call open method of SerializationSchema in Kafka producer
    
    The open method of SerializationSchema was not called in the universal
    Kafka producer.
    
    This closes #12450
---
 .../flink/streaming/connectors/kafka/FlinkKafkaProducer.java  | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index b095195..25b359c 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -759,12 +759,21 @@ public class FlinkKafkaProducer<IN>
                }
 
                if (kafkaSchema != null) {
-                       kafkaSchema.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));
+                       kafkaSchema.open(createSerializationInitContext());
+               }
+
+               if (keyedSchema != null && keyedSchema instanceof KeyedSerializationSchemaWrapper) {
+                       ((KeyedSerializationSchemaWrapper<IN>) keyedSchema).getSerializationSchema()
+                               .open(createSerializationInitContext());
                }
 
                super.open(configuration);
        }
 
+       private SerializationSchema.InitializationContext createSerializationInitContext() {
+               return () -> getRuntimeContext().getMetricGroup().addGroup("user");
+       }
+
        @Override
        public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException {
                checkErroneous();
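
For readers of this notification, the following is a minimal, Flink-free sketch of the pattern the patch applies: when a producer holds a wrapper around a schema, it has to unwrap it and forward the open() call, or the inner schema is used uninitialized. Every type here (Schema, KeyedSchema, KeyedSchemaWrapper, PrefixSchema, Producer) is a hypothetical stand-in written for illustration; none of them is Flink's actual API, and the real open method also takes an initialization context, which this sketch leaves out.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a schema with an open() lifecycle hook.
interface Schema<T> {
    default void open() {}
    byte[] serialize(T element);
}

// Hypothetical stand-in for a keyed schema, which has no open() of its own.
interface KeyedSchema<T> {
    byte[] serializeValue(T element);
}

// Adapter from Schema to KeyedSchema, similar in spirit to a wrapper class.
final class KeyedSchemaWrapper<T> implements KeyedSchema<T> {
    private final Schema<T> inner;

    KeyedSchemaWrapper(Schema<T> inner) {
        this.inner = inner;
    }

    Schema<T> getSerializationSchema() {
        return inner;
    }

    @Override
    public byte[] serializeValue(T element) {
        return inner.serialize(element);
    }
}

// A schema that only works after open() has initialized its state.
final class PrefixSchema implements Schema<String> {
    private String prefix;

    @Override
    public void open() {
        prefix = "v1:";
    }

    @Override
    public byte[] serialize(String element) {
        if (prefix == null) {
            throw new IllegalStateException("open() was not called");
        }
        return (prefix + element).getBytes(StandardCharsets.UTF_8);
    }
}

// Hypothetical producer: open() unwraps the keyed schema and forwards the call.
final class Producer<T> {
    private final KeyedSchema<T> keyedSchema;

    Producer(KeyedSchema<T> keyedSchema) {
        this.keyedSchema = keyedSchema;
    }

    void open() {
        if (keyedSchema instanceof KeyedSchemaWrapper) {
            ((KeyedSchemaWrapper<T>) keyedSchema).getSerializationSchema().open();
        }
    }

    byte[] send(T element) {
        return keyedSchema.serializeValue(element);
    }
}

final class OpenPropagationSketch {
    // Opens the producer, serializes one element, and returns it as text.
    static String roundTrip(String element) {
        Producer<String> producer =
                new Producer<>(new KeyedSchemaWrapper<>(new PrefixSchema()));
        producer.open();
        return new String(producer.send(element), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

If the forwarding in Producer.open() is removed, the first send() in this sketch fails with IllegalStateException. That is the kind of failure one would expect from a schema whose open method is never invoked.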
