Hi Gordon,

Thanks for the advice. Following it, I’ve implemented a 
KeyedDeserializationSchema and can now emit the record metadata to 
downstream operators. 

Regards,
Dominik

> On 7 Mar 2017, at 07:08, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> 
> Hi Dominik,
> 
> I would recommend implementing a `KeyedDeserializationSchema` and supplying it to 
> the constructor when initializing your FlinkKafkaConsumer.
> 
> The `KeyedDeserializationSchema` exposes the metadata of the record such as 
> offset, partition, and key. In the schema, you can implement your own logic 
> of turning the binary data from Kafka into your own data types that carry the 
> metadata information along with the record value, e.g. POJOs or Tuples.
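> 
> For example, a minimal sketch of such a schema might look like the following 
> (the POJO and class names are just illustrative, not part of Flink):
> 
>     // e.g. RecordWithMetadata.java -- a plain POJO carrying the value plus metadata
>     public class RecordWithMetadata {
>         public String key;
>         public String value;
>         public String topic;
>         public int partition;
>         public long offset;
> 
>         // no-arg constructor so Flink recognizes this as a POJO type
>         public RecordWithMetadata() {}
> 
>         public RecordWithMetadata(String key, String value,
>                                   String topic, int partition, long offset) {
>             this.key = key;
>             this.value = value;
>             this.topic = topic;
>             this.partition = partition;
>             this.offset = offset;
>         }
>     }
> 
>     // e.g. RecordWithMetadataSchema.java
>     import java.io.IOException;
>     import java.nio.charset.StandardCharsets;
>     import org.apache.flink.api.common.typeinfo.TypeInformation;
>     import org.apache.flink.api.java.typeutils.TypeExtractor;
>     import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
> 
>     public class RecordWithMetadataSchema
>             implements KeyedDeserializationSchema<RecordWithMetadata> {
> 
>         @Override
>         public RecordWithMetadata deserialize(byte[] messageKey, byte[] message,
>                 String topic, int partition, long offset) throws IOException {
>             // Decodes key/value as UTF-8 strings; substitute your own decoding logic.
>             String key = messageKey == null ? null : new String(messageKey, StandardCharsets.UTF_8);
>             String value = message == null ? null : new String(message, StandardCharsets.UTF_8);
>             return new RecordWithMetadata(key, value, topic, partition, offset);
>         }
> 
>         @Override
>         public boolean isEndOfStream(RecordWithMetadata nextElement) {
>             return false; // the Kafka stream is unbounded
>         }
> 
>         @Override
>         public TypeInformation<RecordWithMetadata> getProducedType() {
>             return TypeExtractor.getForClass(RecordWithMetadata.class);
>         }
>     }
> 
> You would then supply it when creating the consumer, e.g.:
> 
>     DataStream<RecordWithMetadata> stream = env.addSource(
>         new FlinkKafkaConsumer09<>("my-topic", new RecordWithMetadataSchema(), properties));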
> 
> Some links for more info on this:
> 1. https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#the-deserializationschema
> 2. https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#flinks-typeinformation-class
> 
> The metadata `KeyedDeserializationSchema` exposes is extracted from 
> `ConsumerRecord`s within the Kafka connector, so it doesn’t make sense to 
> wrap it up again into a `ConsumerRecord`. The schema interface exposes all 
> available metadata of the record, so it should be sufficient.
> 
> Cheers,
> Gordon
> 
> On March 7, 2017 at 3:51:59 AM, Dominik Safaric (dominiksafa...@gmail.com) wrote:
> 
>> Hi, 
>> 
>> Unfortunately, I cannot find an option to use raw ConsumerRecord<K,V> 
>> instances when creating a Kafka data stream. 
>> 
>> I would like to use this type because our use case requires record 
>> metadata such as the offset and partition. 
>> 
>> So far I’ve examined the source code of the Kafka connector and checked the 
>> docs, but unfortunately I could not find a way to create a data stream of 
>> the type DataStream<ConsumerRecord<K,V>>. 
>> 
>> Am I missing something, or do I need to implement this myself and build 
>> Flink from source? 
>> 
>> Thanks in advance, 
>> Dominik 
