RE: RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-02-01 Thread Kirti Dhar Upadhyay K via user
Thanks Jiabao and Yaroslav for your quick responses.

Regards,
Kirti Dhar

From: Yaroslav Tkachenko 
Sent: 01 February 2024 21:42
Cc: user@flink.apache.org
Subject: Re: RE: Flink Kafka Sink + Schema Registry + Message Headers

The schema registry support is provided in 
ConfluentRegistryAvroSerializationSchema class (flink-avro-confluent-registry 
package).
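
For illustration, a minimal sketch of wiring that class into a KafkaSink could look like the following (the broker address, topic, subject name, and the Avro-generated class MyRecord are placeholders, not from this thread):

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;

// MyRecord is assumed to be an Avro-generated SpecificRecord class.
KafkaSink<MyRecord> sink = KafkaSink.<MyRecord>builder()
        .setBootstrapServers("broker:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("topic-name")
                // Serializes the value as Confluent wire-format Avro and registers/looks up
                // the schema under the given subject in the schema registry.
                .setValueSerializationSchema(ConfluentRegistryAvroSerializationSchema.forSpecific(
                        MyRecord.class, "topic-name-value", "http://schema-registry:8081"))
                .build())
        .build();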

On Thu, Feb 1, 2024 at 8:04 AM Yaroslav Tkachenko <yaros...@goldsky.com> wrote:
You can also implement a custom KafkaRecordSerializationSchema, which allows 
creating a ProducerRecord (see "serialize" method) - you can set message key, 
headers, etc. manually. It's supported in older versions.
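
As a rough sketch of that approach (the topic name, key choice, and the "source" header below are made up for illustration), a custom serializer could look like this:

import java.nio.charset.StandardCharsets;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HeaderSettingSerializationSchema implements KafkaRecordSerializationSchema<String> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
        byte[] payload = element.getBytes(StandardCharsets.UTF_8);
        // Key, partition (null = let Kafka pick), timestamp, and value are all under our control here.
        ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>("topic-name", null, timestamp, payload, payload);
        // Arbitrary headers can be attached before the record is handed to the producer.
        record.headers().add("source", "flink".getBytes(StandardCharsets.UTF_8));
        return record;
    }
}

It can then be passed to the sink via KafkaSinkBuilder#setRecordSerializer.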

On Thu, Feb 1, 2024 at 4:49 AM Jiabao Sun <jiabao@xtransfer.cn> wrote:
Sorry, I didn't notice the version information.
This feature was completed in FLINK-31049[1] and will be released in version 
3.1.0 of the Kafka connector.
The release process[2] is currently underway and will be completed soon.

However, version 3.1.0 does not promise support for Flink 1.16.
If you need to use this feature, you can consider cherry-picking this commit[3] 
to the v3.0 branch and packaging it for your own use.

Regarding Schema Registry, I am not familiar with this feature and I apologize 
for not being able to provide an answer.

Best,
Jiabao

[1] https://issues.apache.org/jira/browse/FLINK-31049
[2] 
https://lists.apache.org/list?d...@flink.apache.org:lte=1M:flink-connector-kafka%20v3.1.0
[3] 
https://github.com/apache/flink-connector-kafka/pull/18


On 2024/02/01 11:58:29 Kirti Dhar Upadhyay K via user wrote:
> Hi Jiabao,
>
> Thanks for reply.
>
> Currently I am using Flink 1.16.1 and I am not able to find any 
> HeaderProvider setter method in class KafkaRecordSerializationSchemaBuilder.
> Although on github I found this support here: 
> https://github.com/apache/flink-connector-kafka/blob/v3.1/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
> But this doesn't seem released yet. Can you please point me towards correct 
> Flink version?
>
> Also, any help on question 1 regarding Schema Registry?
>
> Regards,
> Kirti Dhar
>
> -Original Message-
> From: Jiabao Sun <ji...@xtransfer.cn>
> Sent: 01 February 2024 13:29
> To: user@flink.apache.org
> Subject: RE: Flink Kafka Sink + Schema Registry + Message Headers
>
> Hi Kirti,
>
> Kafka Sink supports sending messages with headers.
> You should implement a HeaderProvider to extract headers from the input element.
>
>
> KafkaSink<String> sink = KafkaSink.<String>builder()
>         .setBootstrapServers(brokers)
>         .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>                 .setTopic("topic-name")
>                 .setValueSerializationSchema(new SimpleStringSchema())
>                 .setHeaderProvider(new HeaderProvider<String>() {
>                     @Override
>                     public Headers getHeaders(String input) {
>                         // TODO: implement it
>                         return null;
>                     }
>                 })
>                 .build()
>         )
>         .build();
>
> Best,
> Jiabao
>
>
> On 2024/02/01 07:46:38 Kirti Dhar Upadhyay K via user wrote:
> > Hi Mates,
> >
> > I have below queries regarding Flink Kafka Sink.
> >
> >
> >   1.  Does Kafka Sink support schema registry? If yes, is there any 
> > documentations to configure the same?
> >   2.  Does Kafka Sink support sending  messages (ProducerRecord) with 
> > headers?
> >
> >
> > Regards,
> > Kirti Dhar
> >
> >
>


Jobmanager restart after it has been requested to stop

2024-02-01 Thread Liting Liu (litiliu) via user
Hi, community:
  I'm running a Flink 1.14.3 job with flink-kubernetes-operator 1.6.0 on AWS. I found that my Flink jobmanager container's process restarted after this FlinkDeployment had been requested to stop. Here is the jobmanager log:

2024-02-01 21:57:48,977 tn="flink-akka.actor.default-dispatcher-107478" INFO  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application CANCELED:
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: CANCELED
  at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$unwrapJobResultException$6(ApplicationDispatcherBootstrap.java:353) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) ~[?:1.8.0_322]
2024-02-01 21:57:48,984 tn="flink-akka.actor.default-dispatcher-107484" INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
2024-02-01 21:57:49,103 tn="flink-akka.actor.default-dispatcher-107478" INFO  org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent [] - Closing components.
2024-02-01 21:57:49,105 tn="flink-akka.actor.default-dispatcher-107484" INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Stopped dispatcher akka.tcp://flink@
2024-02-01 21:57:49,112 tn="AkkaRpcService-Supervisor-Termination-Future-Executor-thread-1" INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Stopping Akka RPC service.
2024-02-01 21:57:49,286 tn="flink-metrics-15" INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting shut down.
2024-02-01 21:57:49,387 tn="main" INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Terminating cluster entrypoint process KubernetesApplicationClusterEntrypoint with exit code 0.
2024-02-01 21:57:53,828 tn="main" INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties
2024-02-01 21:57:54,287 tn="main" INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Starting KubernetesApplicationClusterEntrypoint.


I found that the JM main container's containerId remained the same after the JM auto-restarted.
Why did this process start running again after it had been requested to stop?



Re: Parallelism and Target TPS

2024-02-01 Thread Zhanghao Chen
Hi Patricia,

Flink will create one Kafka consumer per parallel subtask; however, you'll need some 
testing to measure the capability of a single task. Usually, one consumer can 
consume at a much higher rate than 1 record per second.
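
For example (broker, topic, and group id below are placeholders), a rough sketch of pinning the source parallelism, where each parallel subtask runs its own Kafka consumer, could look like:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

// Parallelism 4 -> 4 source subtasks, i.e. 4 Kafka consumers reading in parallel.
env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
        .setParallelism(4);

Whether you actually need 4 (or 200) subtasks depends on the per-subtask throughput you measure.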

Best,
Zhanghao Chen

From: patricia lee 
Sent: Thursday, February 1, 2024 15:26
To: user@flink.apache.org 
Subject: Parallelism and Target TPS

Hi,

I have a Flink job that consumes from Kafka and sinks to an API. I need to 
ensure that my Flink job sends within the rate limit of 200 TPS. We are 
planning to increase the parallelism, but I do not know the right number to 
set. Does a parallelism of 1 equal 1 consumer? So if we need 200 TPS, should 
we set the parallelism to 200 too?

I only created a simple retry that sends the request again if an HTTP 421 
error is received.


Any advice is appreciated. Thanks


Re: Case for a Broadcast join + hint in Flink Streaming SQL

2024-02-01 Thread Prabhjot Bharaj via user
Hi Feng,

Thanks for your prompt response.
If we were to solve this in Flink, my higher level viewpoint is:

1. First, implement a Broadcast join in Flink Streaming SQL that works
across the Table API (e.g. via a `left.join(right, ..., join_type="broadcast")`)
2. Then, support a Broadcast hint that would utilize this new join based on
the hint type

What do you think about this?
Would you have some pointers on how/where to start on the first part? (I'm
thinking we'd have to extend the Broadcast state pattern for this purpose.)

Thanks,
Prabhjot

On Thu, Feb 1, 2024 at 11:40 AM Feng Jin  wrote:

> Hi Prabhjot
>
> I think this is a reasonable scenario. If there is a large table and a
> very small table for regular join, without broadcasting the regular join,
> it can easily cause data skew.
> We have also encountered similar problems too. Currently, we can only copy
> multiple copies of the small table using the union all and append random
> values to alleviate data skewness.
>
>
> Best,
> Feng
>
> On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user <
> user@flink.apache.org> wrote:
>
>> Hello folks,
>>
>>
>> We have a use case where we have a few stream-stream joins, requiring us
>> to join a very large table with a much smaller table, essentially enriching
>> the large table with a permutation on the smaller table (Consider deriving
>> all orders/sessions for a new location). Given the nature of the dataset,
>> if we use a typical join that uses Hash distribution to co-locate the
>> records for each join key, we end up with a very skewed join (a few task
>> slots getting all of the work, as against a good distribution).
>>
>>
>> We’ve internally implemented a Salting based solution where we salt the
>> smaller table and join it with the larger table. While this works in the
>> POC stage, we’d like to leverage flink as much as possible to do such a
>> join.
>>
>>
>> By the nature of the problem, a broadcast join seems theoretically
>> helpful. We’ve done an exploration on query hints supported in Flink,
>> starting with this FLIP
>> 
>> and this FLIP
>> 
>> .
>>
>>
>> Currently, the Optimizer doesn't consider the Broadcast hint in the
>> `Exchange` step of the join, when creating the physical plan (Possibly
>> because the hint would require the stream-stream join to also support
>> Broadcast join with SQL)
>>
>>
>> Notice that the Query AST (Abstract Syntax Tree) has the broadcast hint
>> parsed from the query:
>>
>>
>> ```sql
>>
>> ...
>>
>> ...
>>
>> joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0]
>> options:[gpla)
>>
>> ...
>>
>> ```
>>
>>
>> However, the Flink optimizer ignores the hint and still represents the
>> join as a regular `hash` join in the `Exchange` step:
>>
>>
>> ```sql
>>
>> ...
>>
>> ...
>>
>> :- Exchange(distribution=[hash[shop_id, join_key]])
>>
>> ...
>>
>> ```
>>
>>
>> In Flink `StreamExecExchange`, the translation happens only via the
>> `HASH` distribution type
>> .
>> unlike in the Flink `BatchExecExchange`, the translation can happen via a
>> multitude of options
>> 
>> (`HASH/BROADCAST`).
>>
>>
>>
>> Quoting this Flink mailing list discussion
>>  for
>> the FLIP that implemented the Broadcast join hint for batch sql:
>>
>>
>> > But currently, only in batch the optimizer has different Join
>> strategies for Join and
>>
>> > there is no choice of join strategies in the stream. The join hints
>> listed in the current
>>
>> > flip should be ignored (maybe can be warned) in streaming mode. When in
>> the
>>
>> > future the stream mode has the choice of join strategies, I think
>> that's a good time > to discuss that the join hint can affect the streaming
>> SQL.
>>
>>
>> What do you folks think about the possibility of a Broadcast join for
>> Streaming Sql along with its corresponding Broadcast hint, that lets the
>> user choose the kind of distribution they’d want with the dataset ?
>>
>> Happy to learn more about this and hopefully implement it, if it doesn’t
>> sound like a terrible idea.
>>
>>
>> Thanks,
>>
>> Prabhjot
>>
>>
>>
>>


Re: Case for a Broadcast join + hint in Flink Streaming SQL

2024-02-01 Thread Feng Jin
Hi Prabhjot

I think this is a reasonable scenario. If a large table and a very small table
are joined with a regular join, not broadcasting the small side can easily
cause data skew.
We have also encountered similar problems. Currently, we can only make
multiple copies of the small table using UNION ALL and append random values
to alleviate the data skew.
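
As a rough illustration of that workaround (table and column names here are hypothetical), the small table is replicated once per salt value and the large table picks a random salt per row, so a hot join key is spread over several subtasks:

```sql
SELECT b.*, s.extra_col
FROM (
  -- large table: attach a random salt in [0, 2] to every row
  SELECT big_t.*, CAST(FLOOR(RAND() * 3) AS INT) AS salt FROM big_t
) AS b
JOIN (
  -- small table: replicate it once per salt value via UNION ALL
  SELECT small_t.*, 0 AS salt FROM small_t
  UNION ALL
  SELECT small_t.*, 1 AS salt FROM small_t
  UNION ALL
  SELECT small_t.*, 2 AS salt FROM small_t
) AS s
ON b.join_key = s.join_key AND b.salt = s.salt
```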


Best,
Feng

On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user <
user@flink.apache.org> wrote:

> Hello folks,
>
>
> We have a use case where we have a few stream-stream joins, requiring us
> to join a very large table with a much smaller table, essentially enriching
> the large table with a permutation on the smaller table (Consider deriving
> all orders/sessions for a new location). Given the nature of the dataset,
> if we use a typical join that uses Hash distribution to co-locate the
> records for each join key, we end up with a very skewed join (a few task
> slots getting all of the work, as against a good distribution).
>
>
> We’ve internally implemented a Salting based solution where we salt the
> smaller table and join it with the larger table. While this works in the
> POC stage, we’d like to leverage flink as much as possible to do such a
> join.
>
>
> By the nature of the problem, a broadcast join seems theoretically
> helpful. We’ve done an exploration on query hints supported in Flink,
> starting with this FLIP
> 
> and this FLIP
> 
> .
>
>
> Currently, the Optimizer doesn't consider the Broadcast hint in the
> `Exchange` step of the join, when creating the physical plan (Possibly
> because the hint would require the stream-stream join to also support
> Broadcast join with SQL)
>
>
> Notice that the Query AST (Abstract Syntax Tree) has the broadcast hint
> parsed from the query:
>
>
> ```sql
>
> ...
>
> ...
>
> joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[gpla)
>
> ...
>
> ```
>
>
> However, the Flink optimizer ignores the hint and still represents the
> join as a regular `hash` join in the `Exchange` step:
>
>
> ```sql
>
> ...
>
> ...
>
> :- Exchange(distribution=[hash[shop_id, join_key]])
>
> ...
>
> ```
>
>
> In Flink `StreamExecExchange`, the translation happens only via the
> `HASH` distribution type
> .
> unlike in the Flink `BatchExecExchange`, the translation can happen via a
> multitude of options
> 
> (`HASH/BROADCAST`).
>
>
>
> Quoting this Flink mailing list discussion
>  for
> the FLIP that implemented the Broadcast join hint for batch sql:
>
>
> > But currently, only in batch the optimizer has different Join strategies
> for Join and
>
> > there is no choice of join strategies in the stream. The join hints
> listed in the current
>
> > flip should be ignored (maybe can be warned) in streaming mode. When in
> the
>
> > future the stream mode has the choice of join strategies, I think that's
> a good time > to discuss that the join hint can affect the streaming SQL.
>
>
> What do you folks think about the possibility of a Broadcast join for
> Streaming Sql along with its corresponding Broadcast hint, that lets the
> user choose the kind of distribution they’d want with the dataset ?
>
> Happy to learn more about this and hopefully implement it, if it doesn’t
> sound like a terrible idea.
>
>
> Thanks,
>
> Prabhjot
>
>
>
>


Case for a Broadcast join + hint in Flink Streaming SQL

2024-02-01 Thread Prabhjot Bharaj via user
Hello folks,


We have a use case where we have a few stream-stream joins, requiring us to
join a very large table with a much smaller table, essentially enriching
the large table with a permutation on the smaller table (Consider deriving
all orders/sessions for a new location). Given the nature of the dataset,
if we use a typical join that uses Hash distribution to co-locate the
records for each join key, we end up with a very skewed join (a few task
slots getting all of the work, as against a good distribution).


We’ve internally implemented a Salting based solution where we salt the
smaller table and join it with the larger table. While this works in the
POC stage, we’d like to leverage flink as much as possible to do such a
join.


By the nature of the problem, a broadcast join seems theoretically helpful.
We’ve done an exploration on query hints supported in Flink, starting with this
FLIP and this FLIP.
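
For reference, in batch mode the broadcast hint from those FLIPs is expressed roughly like this (table names here are illustrative); the question below is about making the same hint effective in streaming mode:

```sql
-- /*+ BROADCAST(dim_locations) */ asks the optimizer to broadcast the small
-- table to every subtask instead of hash-partitioning both join inputs.
SELECT /*+ BROADCAST(dim_locations) */
  o.order_id, d.location_name
FROM orders AS o
JOIN dim_locations AS d
  ON o.location_id = d.location_id
```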


Currently, the Optimizer doesn't consider the Broadcast hint in the
`Exchange` step of the join, when creating the physical plan (Possibly
because the hint would require the stream-stream join to also support
Broadcast join with SQL)


Notice that the Query AST (Abstract Syntax Tree) has the broadcast hint
parsed from the query:


```sql

...

...

joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[gpla)

...

```


However, the Flink optimizer ignores the hint and still represents the join
as a regular `hash` join in the `Exchange` step:


```sql

...

...

:- Exchange(distribution=[hash[shop_id, join_key]])

...

```


In the Flink `StreamExecExchange`, the translation happens only via the `HASH`
distribution type, unlike in the Flink `BatchExecExchange`, where the translation
can happen via a multitude of options (`HASH`/`BROADCAST`).



Quoting this Flink mailing list discussion
 for the
FLIP that implemented the Broadcast join hint for batch SQL:


> But currently, only in batch the optimizer has different Join strategies for Join and
> there is no choice of join strategies in the stream. The join hints listed in the current
> flip should be ignored (maybe can be warned) in streaming mode. When in the
> future the stream mode has the choice of join strategies, I think that's a good time
> to discuss that the join hint can affect the streaming SQL.


What do you folks think about the possibility of a Broadcast join for
Streaming SQL along with its corresponding Broadcast hint that lets the
user choose the kind of distribution they’d want with the dataset?

Happy to learn more about this and hopefully implement it, if it doesn’t
sound like a terrible idea.


Thanks,

Prabhjot


Re: RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-02-01 Thread Yaroslav Tkachenko
The schema registry support is provided
in ConfluentRegistryAvroSerializationSchema class
(flink-avro-confluent-registry package).

On Thu, Feb 1, 2024 at 8:04 AM Yaroslav Tkachenko 
wrote:

> You can also implement a custom KafkaRecordSerializationSchema, which
> allows creating a ProducerRecord (see "serialize" method) - you can set
> message key, headers, etc. manually. It's supported in older versions.
>
> On Thu, Feb 1, 2024 at 4:49 AM Jiabao Sun  wrote:
>
>> Sorry, I didn't notice the version information.
>> This feature was completed in FLINK-31049[1] and will be released in
>> version 3.1.0 of Kafka.
>> The release process[2] is currently underway and will be completed soon.
>>
>> However, version 3.1.0 does not promise support for Flink 1.16.
>> If you need to use this feature, you can consider cherry-picking this
>> commit[3] to the v3.0 branch and package it for your own use.
>>
>> Regarding Schema Registry, I am not familiar with this feature and I
>> apologize for not being able to provide an answer.
>>
>> Best,
>> Jiabao
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-31049
>> [2]
>> https://lists.apache.org/list?d...@flink.apache.org:lte=1M:flink-connector-kafka%20v3.1.0
>> [3] https://github.com/apache/flink-connector-kafka/pull/18
>>
>>
>> On 2024/02/01 11:58:29 Kirti Dhar Upadhyay K via user wrote:
>> > Hi Jiabao,
>> >
>> > Thanks for reply.
>> >
>> > Currently I am using Flink 1.16.1 and I am not able to find any
>> HeaderProvider setter method in class KafkaRecordSerializationSchemaBuilder.
>> > Although on github I found this support here:
>> https://github.com/apache/flink-connector-kafka/blob/v3.1/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
>> > But this doesn't seem released yet. Can you please point me towards
>> correct Flink version?
>> >
>> > Also, any help on question 1 regarding Schema Registry?
>> >
>> > Regards,
>> > Kirti Dhar
>> >
>> > -Original Message-
>> > From: Jiabao Sun 
>> > Sent: 01 February 2024 13:29
>> > To: user@flink.apache.org
>> > Subject: RE: Flink Kafka Sink + Schema Registry + Message Headers
>> >
>> > Hi Kirti,
>> >
>> > Kafka Sink supports sending messages with headers.
>> > You should implement a HeaderProvider to extract headers from input
>> element.
>> >
>> >
>> > KafkaSink sink = KafkaSink.builder()
>> > .setBootstrapServers(brokers)
>> > .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>> > .setTopic("topic-name")
>> > .setValueSerializationSchema(new SimpleStringSchema())
>> > .setHeaderProvider(new HeaderProvider() {
>> > @Override
>> > public Headers getHeaders(String input) {
>> > //TODO: implements it
>> > return null;
>> > }
>> > })
>> > .build()
>> > )
>> > .build();
>> >
>> > Best,
>> > Jiabao
>> >
>> >
>> > On 2024/02/01 07:46:38 Kirti Dhar Upadhyay K via user wrote:
>> > > Hi Mates,
>> > >
>> > > I have below queries regarding Flink Kafka Sink.
>> > >
>> > >
>> > >   1.  Does Kafka Sink support schema registry? If yes, is there any
>> documentations to configure the same?
>> > >   2.  Does Kafka Sink support sending  messages (ProducerRecord)
>> with headers?
>> > >
>> > >
>> > > Regards,
>> > > Kirti Dhar
>> > >
>> > >
>> >
>>
>


Re: RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-02-01 Thread Yaroslav Tkachenko
You can also implement a custom KafkaRecordSerializationSchema, which
allows creating a ProducerRecord (see "serialize" method) - you can set
message key, headers, etc. manually. It's supported in older versions.

On Thu, Feb 1, 2024 at 4:49 AM Jiabao Sun  wrote:

> Sorry, I didn't notice the version information.
> This feature was completed in FLINK-31049[1] and will be released in
> version 3.1.0 of Kafka.
> The release process[2] is currently underway and will be completed soon.
>
> However, version 3.1.0 does not promise support for Flink 1.16.
> If you need to use this feature, you can consider cherry-picking this
> commit[3] to the v3.0 branch and package it for your own use.
>
> Regarding Schema Registry, I am not familiar with this feature and I
> apologize for not being able to provide an answer.
>
> Best,
> Jiabao
>
> [1] https://issues.apache.org/jira/browse/FLINK-31049
> [2]
> https://lists.apache.org/list?d...@flink.apache.org:lte=1M:flink-connector-kafka%20v3.1.0
> [3] https://github.com/apache/flink-connector-kafka/pull/18
>
>
> On 2024/02/01 11:58:29 Kirti Dhar Upadhyay K via user wrote:
> > Hi Jiabao,
> >
> > Thanks for reply.
> >
> > Currently I am using Flink 1.16.1 and I am not able to find any
> HeaderProvider setter method in class KafkaRecordSerializationSchemaBuilder.
> > Although on github I found this support here:
> https://github.com/apache/flink-connector-kafka/blob/v3.1/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
> > But this doesn't seem released yet. Can you please point me towards
> correct Flink version?
> >
> > Also, any help on question 1 regarding Schema Registry?
> >
> > Regards,
> > Kirti Dhar
> >
> > -Original Message-
> > From: Jiabao Sun 
> > Sent: 01 February 2024 13:29
> > To: user@flink.apache.org
> > Subject: RE: Flink Kafka Sink + Schema Registry + Message Headers
> >
> > Hi Kirti,
> >
> > Kafka Sink supports sending messages with headers.
> > You should implement a HeaderProvider to extract headers from input
> element.
> >
> >
> > KafkaSink sink = KafkaSink.builder()
> > .setBootstrapServers(brokers)
> > .setRecordSerializer(KafkaRecordSerializationSchema.builder()
> > .setTopic("topic-name")
> > .setValueSerializationSchema(new SimpleStringSchema())
> > .setHeaderProvider(new HeaderProvider() {
> > @Override
> > public Headers getHeaders(String input) {
> > //TODO: implements it
> > return null;
> > }
> > })
> > .build()
> > )
> > .build();
> >
> > Best,
> > Jiabao
> >
> >
> > On 2024/02/01 07:46:38 Kirti Dhar Upadhyay K via user wrote:
> > > Hi Mates,
> > >
> > > I have below queries regarding Flink Kafka Sink.
> > >
> > >
> > >   1.  Does Kafka Sink support schema registry? If yes, is there any
> documentations to configure the same?
> > >   2.  Does Kafka Sink support sending  messages (ProducerRecord) with
> headers?
> > >
> > >
> > > Regards,
> > > Kirti Dhar
> > >
> > >
> >
>


RE: RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-02-01 Thread Jiabao Sun
Sorry, I didn't notice the version information. 
This feature was completed in FLINK-31049[1] and will be released in version 
3.1.0 of the Kafka connector.
The release process[2] is currently underway and will be completed soon.

However, version 3.1.0 does not promise support for Flink 1.16.
If you need to use this feature, you can consider cherry-picking this commit[3] 
to the v3.0 branch and packaging it for your own use.

Regarding Schema Registry, I am not familiar with this feature and I apologize 
for not being able to provide an answer.

Best,
Jiabao

[1] https://issues.apache.org/jira/browse/FLINK-31049
[2] 
https://lists.apache.org/list?d...@flink.apache.org:lte=1M:flink-connector-kafka%20v3.1.0
[3] https://github.com/apache/flink-connector-kafka/pull/18


On 2024/02/01 11:58:29 Kirti Dhar Upadhyay K via user wrote:
> Hi Jiabao,
> 
> Thanks for reply.
> 
> Currently I am using Flink 1.16.1 and I am not able to find any 
> HeaderProvider setter method in class KafkaRecordSerializationSchemaBuilder.
> Although on github I found this support here: 
> https://github.com/apache/flink-connector-kafka/blob/v3.1/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
> But this doesn't seem released yet. Can you please point me towards correct 
> Flink version?
> 
> Also, any help on question 1 regarding Schema Registry?
> 
> Regards,
> Kirti Dhar
> 
> -Original Message-
> From: Jiabao Sun  
> Sent: 01 February 2024 13:29
> To: user@flink.apache.org
> Subject: RE: Flink Kafka Sink + Schema Registry + Message Headers
> 
> Hi Kirti,
> 
> Kafka Sink supports sending messages with headers.
> You should implement a HeaderProvider to extract headers from input element.
> 
> 
> KafkaSink sink = KafkaSink.builder()
> .setBootstrapServers(brokers)
> .setRecordSerializer(KafkaRecordSerializationSchema.builder()
> .setTopic("topic-name")
> .setValueSerializationSchema(new SimpleStringSchema())
> .setHeaderProvider(new HeaderProvider() {
> @Override
> public Headers getHeaders(String input) {
> //TODO: implements it
> return null;
> }
> })
> .build()
> )
> .build();
> 
> Best,
> Jiabao
> 
> 
> On 2024/02/01 07:46:38 Kirti Dhar Upadhyay K via user wrote:
> > Hi Mates,
> > 
> > I have below queries regarding Flink Kafka Sink.
> > 
> > 
> >   1.  Does Kafka Sink support schema registry? If yes, is there any 
> > documentations to configure the same?
> >   2.  Does Kafka Sink support sending  messages (ProducerRecord) with 
> > headers?
> > 
> > 
> > Regards,
> > Kirti Dhar
> > 
> > 
> 

RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-02-01 Thread Kirti Dhar Upadhyay K via user
Hi Jiabao,

Thanks for reply.

Currently I am using Flink 1.16.1 and I am not able to find any HeaderProvider 
setter method in class KafkaRecordSerializationSchemaBuilder.
Although on github I found this support here: 
https://github.com/apache/flink-connector-kafka/blob/v3.1/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
But this doesn't seem released yet. Can you please point me towards correct 
Flink version?

Also, any help on question 1 regarding Schema Registry?

Regards,
Kirti Dhar

-Original Message-
From: Jiabao Sun  
Sent: 01 February 2024 13:29
To: user@flink.apache.org
Subject: RE: Flink Kafka Sink + Schema Registry + Message Headers

Hi Kirti,

Kafka Sink supports sending messages with headers.
You should implement a HeaderProvider to extract headers from the input element.


KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("topic-name")
                .setValueSerializationSchema(new SimpleStringSchema())
                .setHeaderProvider(new HeaderProvider<String>() {
                    @Override
                    public Headers getHeaders(String input) {
                        // TODO: implement it
                        return null;
                    }
                })
                .build()
        )
        .build();
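
For what it's worth, a minimal sketch of a filled-in getHeaders (the "trace-id" header name and the use of the input value are purely illustrative) could be:

.setHeaderProvider(new HeaderProvider<String>() {
    @Override
    public Headers getHeaders(String input) {
        // org.apache.kafka.common.header.internals.RecordHeaders is Kafka's
        // standard mutable Headers implementation; "trace-id" is illustrative.
        return new org.apache.kafka.common.header.internals.RecordHeaders()
                .add("trace-id", input.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
})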

Best,
Jiabao


On 2024/02/01 07:46:38 Kirti Dhar Upadhyay K via user wrote:
> Hi Mates,
> 
> I have below queries regarding Flink Kafka Sink.
> 
> 
>   1.  Does Kafka Sink support schema registry? If yes, is there any 
> documentations to configure the same?
>   2.  Does Kafka Sink support sending  messages (ProducerRecord) with headers?
> 
> 
> Regards,
> Kirti Dhar
> 
> 


[ANNOUNCE] Apache flink-connector-opensearch 1.1.0 released

2024-02-01 Thread Danny Cranmer
The Apache Flink community is very happy to announce the release of Apache
flink-connector-opensearch 1.1.0. This release supports Apache Flink 1.17
and 1.18.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353141

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Danny