Hi,

If you are using Flink SQL, you can use the SQL API to read or write the headers of a Kafka record through the connector's headers metadata column [1].

Best,
Shengkai

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#available-metadata

On Thu, Oct 13, 2022 at 02:21, Yaroslav Tkachenko <yaros...@goldsky.com> wrote:

> Hi,
>
> You can implement a custom KafkaRecordDeserializationSchema (example:
> https://docs.immerok.cloud/docs/cookbook/reading-apache-kafka-headers-with-apache-flink/#the-custom-deserializer)
> and simply skip emitting the record unless the header value matches what
> you need.
>
> On Wed, Oct 12, 2022 at 11:04 AM Great Info <gubt...@gmail.com> wrote:
>
>> I have some Flink applications that read streams from Kafka. The
>> producer-side code now adds some additional information to the Kafka
>> headers when producing records.
>> I need to change my consumer-side logic to process a record only if the
>> header contains a specific value; if the header value is different from
>> the one I am looking for, I just need to move on to the next record in
>> the stream.
>>
>> I found some sample reference code
>> <https://issues.apache.org/jira/browse/KAFKA-4208>, but this logic needs
>> to deserialize the record and verify the header. Is there a simple way to
>> ignore the record before deserializing it?
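
For illustration, the "check the header first, deserialize only on a match" idea from this thread can be sketched in plain Java. This is a minimal sketch under stated assumptions, not the code from the linked cookbook: the class name HeaderFilter, the header key "event-type", and the value "order" are hypothetical, and a Map<String, byte[]> stands in for the Kafka headers so the snippet runs without Flink or Kafka on the classpath. In a real job the same check would sit at the top of KafkaRecordDeserializationSchema.deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out), reading the bytes via record.headers().lastHeader(key) and returning without calling out.collect(...) when the header does not match.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Sketch: decide from the raw header bytes whether to decode the payload at all. */
class HeaderFilter {

    /** True only if the header is present and its bytes equal the expected UTF-8 value. */
    public static boolean headerMatches(Map<String, byte[]> headers, String key, String expected) {
        byte[] actual = headers.get(key);
        return actual != null
                && Arrays.equals(actual, expected.getBytes(StandardCharsets.UTF_8));
    }

    /** Runs the (possibly expensive) payload decoder only for records whose header matches. */
    public static <T> Optional<T> decodeIfMatching(
            Map<String, byte[]> headers,
            byte[] payload,
            String key,
            String expected,
            Function<byte[], T> decoder) {
        if (!headerMatches(headers, key, expected)) {
            // Skip the record: the payload bytes are never touched.
            return Optional.empty();
        }
        return Optional.ofNullable(decoder.apply(payload));
    }

    public static void main(String[] args) {
        // Hypothetical header key and value, chosen only for this example.
        Map<String, byte[]> headers = new HashMap<>();
        headers.put("event-type", "order".getBytes(StandardCharsets.UTF_8));
        byte[] payload = "{\"id\": 1}".getBytes(StandardCharsets.UTF_8);

        Function<byte[], String> decoder = bytes -> new String(bytes, StandardCharsets.UTF_8);

        System.out.println(decodeIfMatching(headers, payload, "event-type", "order", decoder));
        System.out.println(decodeIfMatching(headers, payload, "event-type", "refund", decoder));
    }
}
```

Comparing raw bytes avoids decoding even the header value into a String; whether that matters depends on the header encoding the producer actually uses, which the thread does not specify.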