Hello, why does Flink implement different serialization schemes for keyed and non-keyed Kafka messages?
I'm loading messages into Kafka in two ways.

The first way is on-the-fly loading with plain Kafka, without Flink. In that case I use something like:

    props.put("partitioner.class", KafkaPartitioner.class.getCanonicalName());
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producer = new KafkaProducer<>(props);

    String key = event.getUserId();
    String value = DummyEvent.eventToString(event);
    producer.send(new ProducerRecord<>(topic, key, value));

On the Flink side I can read these messages, without the key, with code like:

    DataStream<String> dataStream = env
        .addSource(new FlinkKafkaConsumer08<String>(
            "topic",
            new SimpleStringSchema(),
            kafkaProps));

As a result I get the plain message without a key. I actually need the key only for partitioning inside Kafka, and I have an appropriate partitioner class for that: https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/KafkaPartitioner.java . It is the standard Java hash of the String key.

The other case is loading messages from Hadoop into Kafka, and for that I use Flink. Everything is fine when I use:

    dataStream.addSink(new FlinkKafkaProducer08<>(config.getProperty("topic", Config.INPUT_TOPIC_NAME),
        new SimpleStringSchema(), kafkaProps));

But I need partitioning in Kafka, so I changed it to:

    TypeInformation<Tuple2<String, String>> stringStringInfo =
        TypeInfoParser.parse("org.apache.flink.api.java.tuple.Tuple2<String, String>");

    KeyedSerializationSchema<Tuple2<String, String>> schema =
        new TypeInformationKeyValueSerializationSchema<>(String.class, String.class, env.getConfig());

    dataStream
        .map(json -> {
            Event event = gson.fromJson(json, Event.class);
            return new Tuple2<String, String>(event.getUserId(), json);
        }).returns(stringStringInfo)
        .setParallelism(partitions)
        .addSink(new FlinkKafkaProducer08<>(config.getProperty("topic", Config.INPUT_TOPIC_NAME),
            schema, kafkaProps));

As a result I see that a message serialized by TypeInformationKeyValueSerializationSchema can be deserialized by Flink's SimpleStringSchema, or by Kafka's String deserializer, only with an additional first character. I guess this is the length of the String, which org.apache.flink.types.StringValue#writeString writes in front of the data. In other words, the value of the message is no longer readable by Spark, Storm, or a plain Kafka consumer with the standard deserializers.

The question is: is this correct behavior of Flink? And do I have to implement my own serializer and partitioner for Flink's Kafka sink if I want plain String serialization that other tools can read without Flink? A sketch of what I mean is at the end of this mail.

The second question: why does Flink require the custom partitioner to work on the serialized byte[] stream instead of the original objects, as Kafka's partitioner does? Or why not simply allow using Kafka's partitioner class?

PS: I can give a link to the sources if you have access to the private repos under https://github.com/stratosphere/ .

Thanks, best regards
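P.P.S. To make the first question concrete, this is roughly the serializer I would expect to have to write myself: a minimal, untested sketch against the Flink 1.x KeyedSerializationSchema interface (the class name PlainStringKeyValueSchema is just mine; older Flink versions may not have getTargetTopic). It writes key and value as plain UTF-8 bytes, i.e. the same wire format as Kafka's StringSerializer.

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

    // Writes key and value as plain UTF-8 bytes, matching Kafka's StringSerializer,
    // so non-Flink consumers can read the value without a length prefix.
    public class PlainStringKeyValueSchema implements KeyedSerializationSchema<Tuple2<String, String>> {

        @Override
        public byte[] serializeKey(Tuple2<String, String> element) {
            // f0 is the user id used as the Kafka key
            return element.f0 == null ? null : element.f0.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public byte[] serializeValue(Tuple2<String, String> element) {
            // f1 is the original JSON string
            return element.f1.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String getTargetTopic(Tuple2<String, String> element) {
            // null means: use the topic configured on the producer
            return null;
        }
    }

The sink from above would then become something like:

    dataStream
        .map(json -> new Tuple2<String, String>(gson.fromJson(json, Event.class).getUserId(), json))
        .returns(stringStringInfo)
        .setParallelism(partitions)
        .addSink(new FlinkKafkaProducer08<>(config.getProperty("topic", Config.INPUT_TOPIC_NAME),
            new PlainStringKeyValueSchema(), kafkaProps));

Is that the expected approach, or is there a ready-made schema for this case?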