Re: Proposal to add support for Kafka Headers to KafkaRecordSerializationSchemaWrapper

2023-02-17 Thread Jing Ge
Hi Alex,

I just assigned the ticket to you. Kafka headers should have been supported
from the time we started implementing the KafkaSink. Thanks for driving this!

Best regards,
Jing

On Fri, Feb 17, 2023 at 3:35 PM Alex Gout 
wrote:

> Hey :)
> I created a JIRA for it. Can someone assign it to me?
>
> On Mon, Feb 13, 2023 at 3:17 PM Márton Balassi 
> wrote:
>
> > Hi Alex,
> >
> > Please do share, this comes up somewhat frequently.
> >
> > Marton
> >
> > On Mon, Feb 13, 2023 at 7:44 PM Őrhidi Mátyás 
> > wrote:
> >
> > > Hi Alex,
> > >
> > > This is a reasonable request IMO. I've recently bumped into this topic
> > > myself. This could be handy for supporting schema registries in Kafka to
> > > Kafka scenarios for example. Looking forward to your proposal.
> > >
> > > Cheers,
> > > Matyas
> > >
> > > On Mon, Feb 13, 2023 at 7:08 AM Alex Gout
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'm currently working on a few pipelines sinking to Kafka. The downstream
> > > > consumers of the sink topics expect some Kafka headers to be set. However
> > > > the default org.apache.flink.connector.kafka.sink.KafkaSink does
> > > > not support adding Kafka record headers.
> > > >
> > > > I tracked the code path down to
> > > > org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaWrapper
> > > > where the ProducerRecord is created.
> > > > It is relatively simple to add support for record headers by adding a
> > > > "HeaderProducer" next to the key and value serializers and using the
> > > > appropriate ProducerRecord constructor.
> > > >
> > > > For the benefit of my own projects, I have implemented this header support
> > > > and would be eager to share my implementation as a proposal if there's a
> > > > consensus this would indeed be a valuable addition.
> > > >
> > > > Please let me know what you think.
> > > > Thanks,
> > > > - Alex
> > > >
> > >
> >
>


Re: Proposal to add support for Kafka Headers to KafkaRecordSerializationSchemaWrapper

2023-02-17 Thread Alex Gout
Hey :)
I created a JIRA for it. Can someone assign it to me?

On Mon, Feb 13, 2023 at 3:17 PM Márton Balassi 
wrote:

> Hi Alex,
>
> Please do share, this comes up somewhat frequently.
>
> Marton
>
> On Mon, Feb 13, 2023 at 7:44 PM Őrhidi Mátyás 
> wrote:
>
> > Hi Alex,
> >
> > This is a reasonable request IMO. I've recently bumped into this topic
> > myself. This could be handy for supporting schema registries in Kafka to
> > Kafka scenarios for example. Looking forward to your proposal.
> >
> > Cheers,
> > Matyas
> >
> > On Mon, Feb 13, 2023 at 7:08 AM Alex Gout
> > wrote:
> >
> > > Hi all,
> > >
> > > I'm currently working on a few pipelines sinking to Kafka. The downstream
> > > consumers of the sink topics expect some Kafka headers to be set. However
> > > the default org.apache.flink.connector.kafka.sink.KafkaSink does
> > > not support adding Kafka record headers.
> > >
> > > I tracked the code path down to
> > > org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaWrapper
> > > where the ProducerRecord is created.
> > > It is relatively simple to add support for record headers by adding a
> > > "HeaderProducer" next to the key and value serializers and using the
> > > appropriate ProducerRecord constructor.
> > >
> > > For the benefit of my own projects, I have implemented this header support
> > > and would be eager to share my implementation as a proposal if there's a
> > > consensus this would indeed be a valuable addition.
> > >
> > > Please let me know what you think.
> > > Thanks,
> > > - Alex
> > >
> >
>


Re: Proposal to add support for Kafka Headers to KafkaRecordSerializationSchemaWrapper

2023-02-13 Thread Márton Balassi
Hi Alex,

Please do share, this comes up somewhat frequently.

Marton

On Mon, Feb 13, 2023 at 7:44 PM Őrhidi Mátyás 
wrote:

> Hi Alex,
>
> This is a reasonable request IMO. I've recently bumped into this topic
> myself. This could be handy for supporting schema registries in Kafka to
> Kafka scenarios for example. Looking forward to your proposal.
>
> Cheers,
> Matyas
>
> On Mon, Feb 13, 2023 at 7:08 AM Alex Gout 
> wrote:
>
> > Hi all,
> >
> > I'm currently working on a few pipelines sinking to Kafka. The downstream
> > consumers of the sink topics expect some Kafka headers to be set. However
> > the default org.apache.flink.connector.kafka.sink.KafkaSink does
> > not support adding Kafka record headers.
> >
> > I tracked the code path down to
> > org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaWrapper
> > where the ProducerRecord is created.
> > It is relatively simple to add support for record headers by adding a
> > "HeaderProducer" next to the key and value serializers and using the
> > appropriate ProducerRecord constructor.
> >
> > For the benefit of my own projects, I have implemented this header support
> > and would be eager to share my implementation as a proposal if there's a
> > consensus this would indeed be a valuable addition.
> >
> > Please let me know what you think.
> > Thanks,
> > - Alex
> >
>


Re: Proposal to add support for Kafka Headers to KafkaRecordSerializationSchemaWrapper

2023-02-13 Thread Őrhidi Mátyás
Hi Alex,

This is a reasonable request IMO. I've recently bumped into this topic
myself. This could be handy for supporting schema registries in Kafka to
Kafka scenarios for example. Looking forward to your proposal.

Cheers,
Matyas

On Mon, Feb 13, 2023 at 7:08 AM Alex Gout 
wrote:

> Hi all,
>
> I'm currently working on a few pipelines sinking to Kafka. The downstream
> consumers of the sink topics expect some Kafka headers to be set. However
> the default org.apache.flink.connector.kafka.sink.KafkaSink does
> not support adding Kafka record headers.
>
> I tracked the code path down to
> org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaWrapper
> where the ProducerRecord is created.
> It is relatively simple to add support for record headers by adding a
> "HeaderProducer" next to the key and value serializers and using the
> appropriate ProducerRecord constructor.
>
> For the benefit of my own projects, I have implemented this header support
> and would be eager to share my implementation as a proposal if there's a
> consensus this would indeed be a valuable addition.
>
> Please let me know what you think.
> Thanks,
> - Alex
>


Proposal to add support for Kafka Headers to KafkaRecordSerializationSchemaWrapper

2023-02-13 Thread Alex Gout
Hi all,

I'm currently working on a few pipelines sinking to Kafka. The downstream
consumers of the sink topics expect some Kafka headers to be set. However
the default org.apache.flink.connector.kafka.sink.KafkaSink does
not support adding Kafka record headers.

I tracked the code path down to
org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaWrapper
where the ProducerRecord is created.
It is relatively simple to add support for record headers by adding a
"HeaderProducer" next to the key and value serializers and using the
appropriate ProducerRecord constructor.
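
A minimal sketch of the idea described above, not the implementation
mentioned in this thread. It avoids the Kafka and Flink dependencies, so
SketchHeader and SketchRecord are stand-ins for Kafka's Header and
ProducerRecord, and the HeaderProducer interface and its produceHeaders
method are hypothetical names chosen for illustration only.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Stand-in for org.apache.kafka.common.header.Header: a key plus raw bytes.
final class SketchHeader {
    final String key;
    final byte[] value;

    SketchHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Stand-in for Kafka's ProducerRecord, reduced to the fields used here.
final class SketchRecord {
    final String topic;
    final byte[] key;
    final byte[] value;
    final List<SketchHeader> headers;

    SketchRecord(String topic, byte[] key, byte[] value, List<SketchHeader> headers) {
        this.topic = topic;
        this.key = key;
        this.value = value;
        this.headers = headers;
    }
}

// Hypothetical hook: derives the headers for one element.
interface HeaderProducer<T> {
    List<SketchHeader> produceHeaders(T element);
}

// Simplified wrapper that keeps a header producer next to the key and
// value serializers and passes its output to the record constructor.
final class SketchSerializationWrapper<T> {
    private final String topic;
    private final Function<T, byte[]> keySerializer;
    private final Function<T, byte[]> valueSerializer;
    private final HeaderProducer<T> headerProducer; // null means "no headers"

    SketchSerializationWrapper(
            String topic,
            Function<T, byte[]> keySerializer,
            Function<T, byte[]> valueSerializer,
            HeaderProducer<T> headerProducer) {
        this.topic = topic;
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
        this.headerProducer = headerProducer;
    }

    SketchRecord serialize(T element) {
        List<SketchHeader> headers = headerProducer == null
                ? Collections.<SketchHeader>emptyList()
                : headerProducer.produceHeaders(element);
        return new SketchRecord(
                topic,
                keySerializer.apply(element),
                valueSerializer.apply(element),
                headers);
    }
}

class HeaderSketchDemo {
    public static void main(String[] args) {
        SketchSerializationWrapper<String> wrapper = new SketchSerializationWrapper<String>(
                "sink-topic",
                element -> null, // no record key in this demo
                element -> element.getBytes(StandardCharsets.UTF_8),
                element -> Collections.singletonList(
                        new SketchHeader("source", "flink".getBytes(StandardCharsets.UTF_8))));

        SketchRecord record = wrapper.serialize("hello");
        SketchHeader first = record.headers.get(0);
        System.out.println(first.key + "=" + new String(first.value, StandardCharsets.UTF_8));
    }
}
```

With the real Kafka client, the stand-in record would presumably be replaced
by the ProducerRecord constructor that takes topic, partition, timestamp,
key, value and an Iterable of Header, with RecordHeader as the concrete
header type.
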

For the benefit of my own projects, I have implemented this header support
and would be eager to share my implementation as a proposal if there's a
consensus this would indeed be a valuable addition.

Please let me know what you think.
Thanks,
- Alex