The FlinkKafkaProducer API seems more difficult to use than it should be.

The API requires that you pass it a SerializationSchema or a
KeyedSerializationSchema, but the Kafka producer already has its own
serialization API.  Requiring a serializer in the Flink API precludes the
use of the Kafka serializers.  For instance, it precludes the use of the
Confluent KafkaAvroSerializer class, which makes use of the Confluent
Schema Registry.  Ideally, the serializer would be optional, so as to
allow the Kafka producer serializers to handle the task.
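
For reference, this is roughly how serializers are selected when using the
Kafka producer directly, through producer properties.  It is only a sketch:
the class and method names (ProducerSerializerConfig,
avroProducerProperties) are made up for illustration, and the addresses
passed in are placeholders.  The property keys are the standard Kafka
producer and Confluent serializer settings.

```java
import java.util.Properties;

// Hypothetical helper, not part of Flink or Kafka: builds the producer
// properties that would let Kafka's own serializers do the work.
final class ProducerSerializerConfig {

    static Properties avroProducerProperties(String bootstrapServers,
                                             String schemaRegistryUrl) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Keys are serialized as plain strings by Kafka's StringSerializer.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Values are serialized by the Confluent Avro serializer, which
        // looks up and registers schemas in the Schema Registry.
        props.setProperty("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.setProperty("schema.registry.url", schemaRegistryUrl);
        return props;
    }
}
```

With an optional Flink-side serializer, properties like these could be
handed to the sink and used as they are.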

In addition, the KeyedSerializationSchema conflates message key extraction
with key serialization.  If the serializer were optional, to allow the
Kafka producer serializers to take over, you'd still need to extract a key
from the message.

And given that the key may not be part of the message you want to write to
Kafka, an upstream step may have to package the key with the message to
make both available to the sink, for instance in a tuple. That means you
also need to define a method to extract the message to write to Kafka from
the element passed into the sink by Flink.
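
To make the idea concrete, here is a minimal sketch of what an
extraction-only hook could look like.  RecordExtractor and
RecordExtractorExample are hypothetical names, not existing Flink classes,
and a Map.Entry stands in for the tuple an upstream step might produce.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.function.Function;

// Hypothetical: pulls the Kafka key and value out of the element handed to
// the sink.  It performs no serialization at all.
final class RecordExtractor<T, K, V> {
    private final Function<T, K> keyExtractor;
    private final Function<T, V> valueExtractor;

    RecordExtractor(Function<T, K> keyExtractor, Function<T, V> valueExtractor) {
        this.keyExtractor = keyExtractor;
        this.valueExtractor = valueExtractor;
    }

    K key(T element) {
        return keyExtractor.apply(element);
    }

    V value(T element) {
        return valueExtractor.apply(element);
    }
}

final class RecordExtractorExample {

    // The upstream step packaged key and message together as an entry.
    static RecordExtractor<Map.Entry<String, String>, String, String> entryExtractor() {
        return new RecordExtractor<>(e -> e.getKey(), e -> e.getValue());
    }

    public static void main(String[] args) {
        Map.Entry<String, String> element =
                new SimpleImmutableEntry<>("user-42", "payload");
        RecordExtractor<Map.Entry<String, String>, String, String> extractor =
                entryExtractor();
        // The extracted key and value would then go either to an optional
        // Flink serializer or straight to the Kafka producer's serializers.
        System.out.println(extractor.key(element) + " -> " + extractor.value(element));
    }
}
```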

In summary, extracting the key and message from the element passed into
the sink should be separate from serializing them, and the serialization
step should be optional.

