Hello,

  why does Flink implement different serialization schemes for keyed and non-keyed messages for Kafka?

  I'm loading messages into Kafka in two ways. The first is on-the-fly loading without Flink, using Kafka's producer API only. In this case I'm using something like:

Properties props = new Properties();
// broker settings (bootstrap.servers, etc.) omitted here
props.put("partitioner.class", KafkaPartitioner.class.getCanonicalName());
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
String key = event.getUserId();
String value = DummyEvent.eventToString(event);
producer.send(new ProducerRecord<>(topic, key, value));


 And on the Flink side I can read it, without the key, with code like:

DataStream<String> dataStream = env
        .addSource(new FlinkKafkaConsumer08<String>(
                "topic",
                new SimpleStringSchema(), kafkaProps));

As a result I get the plain message without the key. Actually I need the key only for partitioning on the Kafka side, and I have an appropriate class for that:
https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/KafkaPartitioner.java
It is just the standard Java hash of the String key.
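
The gist of that class is plain hash partitioning on the String key; a simplified sketch (assuming the new producer's org.apache.kafka.clients.producer.Partitioner interface, not the exact code from the repository) looks like:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Simplified sketch, not the exact class from the repository:
// partition by the standard Java hash of the String key.
public class KafkaPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // keep the hash non-negative before taking the modulo
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}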


  I also have another case: loading messages from Hadoop to Kafka. I'm using Flink for this purpose. Everything is fine when I use

dataStream.addSink(new FlinkKafkaProducer08<>(
        config.getProperty("topic", Config.INPUT_TOPIC_NAME),
        new SimpleStringSchema(),
        kafkaProps));

But I need partitioning in Kafka, so I changed it to

TypeInformation<Tuple2<String, String>> stringStringInfo =
        TypeInfoParser.parse("org.apache.flink.api.java.tuple.Tuple2<String, String>");

KeyedSerializationSchema<Tuple2<String, String>> schema =
        new TypeInformationKeyValueSerializationSchema<>(
                String.class, String.class, env.getConfig());

dataStream
        .map(json -> {
            Event event = gson.fromJson(json, Event.class);
            return new Tuple2<String, String>(event.getUserId(), json);
        })
        .returns(stringStringInfo)
        .setParallelism(partitions)
        .addSink(new FlinkKafkaProducer08<>(
                config.getProperty("topic", Config.INPUT_TOPIC_NAME),
                schema,
                kafkaProps));

As a result I see that a message serialized by
TypeInformationKeyValueSerializationSchema can be deserialized by Flink's
SimpleStringSchema or by Kafka's StringDeserializer only with an extra
leading character. I guess this is the String length that is prepended by
org.apache.flink.types.StringValue#writeString. That means the value of a
message is no longer readable by Spark, Storm, or a plain Kafka consumer
with standard deserialization.
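
A quick way to check that guess is to take the raw value bytes from a plain Kafka consumer (ByteArrayDeserializer) and read them back through StringValue, e.g. something like:

// bytes = raw value taken from a plain Kafka consumer (ByteArrayDeserializer)
String restored = org.apache.flink.types.StringValue.readString(
        new java.io.DataInputStream(new java.io.ByteArrayInputStream(bytes)));
// restored is the original JSON, while new String(bytes, "UTF-8")
// still carries the Flink length prefix at the front.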

   The first question: is this correct behavior of Flink? And do I have to implement my own serializer and partitioner for Flink's Kafka sink if I want plain String serialization that can be read by all other tools without Flink?
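
If a custom serializer is indeed the way to go, I assume something as simple as the following would do (a sketch, the class name is mine):

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

// Sketch: key and value are written as plain UTF-8 bytes with no Flink
// framing, so any Kafka consumer with a StringDeserializer can read them.
public class PlainStringKeyValueSchema
        implements KeyedSerializationSchema<Tuple2<String, String>> {

    @Override
    public byte[] serializeKey(Tuple2<String, String> element) {
        return element.f0.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(Tuple2<String, String> element) {
        return element.f1.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(Tuple2<String, String> element) {
        return null; // use the topic configured on the producer
    }
}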

   And the second question: why does Flink require implementing a custom partitioner that works on the serialized byte[] stream instead of on the original objects, as Kafka's partitioner does? Or instead of simply allowing Kafka's own partitioner class to be used?
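
For comparison, what I understand I would have to write on the Flink side, if I read the connector's KafkaPartitioner<T> base class correctly (the element is passed in together with its already-serialized key/value), is roughly:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

// Sketch only, assuming the connector's KafkaPartitioner<T> base class:
// the same hash partitioning as on the Kafka side, with the serialized
// key/value bytes ignored.
public class FlinkSideHashPartitioner extends KafkaPartitioner<Tuple2<String, String>> {

    @Override
    public int partition(Tuple2<String, String> next, byte[] serializedKey,
                         byte[] serializedValue, int numPartitions) {
        return (next.f0.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}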

  PS: I can give a link to the sources if you have access to the private repos at https://github.com/stratosphere/.

Thanks,
best regards
