Hi Gordon,

Thanks for the advice. Following it, I’ve implemented a KeyedDeserializationSchema and am now able to emit the record metadata to downstream operators.
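For the archives, here is a minimal sketch of what such a schema can look like. The Tuple4 output type, the UTF-8 string decoding, and the class name are illustrative choices on my part, not the only way to do it:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

    // Emits (topic, partition, offset, value) so the Kafka metadata
    // travels together with the record value.
    public class MetadataDeserializationSchema
            implements KeyedDeserializationSchema<Tuple4<String, Integer, Long, String>> {

        @Override
        public Tuple4<String, Integer, Long, String> deserialize(
                byte[] messageKey, byte[] message,
                String topic, int partition, long offset) throws IOException {
            // Assumes UTF-8 encoded string values; adapt to your own value format.
            String value = message == null ? null : new String(message, StandardCharsets.UTF_8);
            return new Tuple4<>(topic, partition, offset, value);
        }

        @Override
        public boolean isEndOfStream(Tuple4<String, Integer, Long, String> nextElement) {
            return false; // the Kafka stream is unbounded
        }

        @Override
        public TypeInformation<Tuple4<String, Integer, Long, String>> getProducedType() {
            return TypeInformation.of(new TypeHint<Tuple4<String, Integer, Long, String>>() {});
        }
    }

Wiring it into the consumer then looks roughly like this (topic name, group id, and the 0.10 consumer version are placeholders for whatever your setup uses):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "metadata-example");

    // env is your StreamExecutionEnvironment
    DataStream<Tuple4<String, Integer, Long, String>> stream = env.addSource(
            new FlinkKafkaConsumer010<>("my-topic", new MetadataDeserializationSchema(), props));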
Regards,
Dominik

> On 7 Mar 2017, at 07:08, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
> Hi Dominik,
>
> I would recommend implementing a `KeyedDeserializationSchema` and supplying
> it to the constructor when initializing your FlinkKafkaConsumer.
>
> The `KeyedDeserializationSchema` exposes the metadata of the record, such as
> offset, partition, and key. In the schema, you can implement your own logic
> for turning the binary data from Kafka into your own data types that carry
> the metadata information along with the record value, e.g. POJOs or Tuples.
>
> Some links for more info on this:
> 1. https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#the-deserializationschema
> 2. https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#flinks-typeinformation-class
>
> The metadata `KeyedDeserializationSchema` exposes is extracted from
> `ConsumerRecord`s within the Kafka connector, so it doesn’t make sense to
> wrap it up again into a `ConsumerRecord`. The schema interface exposes all
> available metadata of the record, so it should be sufficient.
>
> Cheers,
> Gordon
>
> On March 7, 2017 at 3:51:59 AM, Dominik Safaric (dominiksafa...@gmail.com) wrote:
>
>> Hi,
>>
>> Unfortunately, I cannot find the option of using raw ConsumerRecord<K,V>
>> instances when creating a Kafka data stream.
>>
>> In general, I would like to use an instance of the mentioned type because
>> our use case requires certain metadata, such as the record offset and
>> partition.
>>
>> So far I’ve examined the source code of the Kafka connector and checked the
>> docs, but unfortunately I could not find the option of creating a data
>> stream of the type DataStream<ConsumerRecord<K,V>>.
>>
>> Am I missing something, or do I have to implement this myself and build
>> Flink from source in order to have this ability?
>>
>> Thanks in advance,
>> Dominik