Re: Network Buffers

2023-06-05 Thread Hangxiang Yu
Hi, Pritam.
This error message indicates that the current network buffer configuration
is not enough to handle the current workload.

> What is the meaning of this exception (The total number of network buffers
> is currently set to 22773 of 32768 bytes each)?
>
This just provides some information about the current status of network
buffers (22773 * 32768 bytes ~= 711MB).
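For context, the buffer count follows from the memory configuration roughly
like this (a sketch of Flink's memory model; the segment size is
taskmanager.memory.segment-size, 32 KiB by default):

  network memory = min(max(total Flink memory * fraction, network.min), network.max)
  number of buffers = network memory / segment size

Here, 22773 * 32768 B = 746,229,760 B ~= 711 MiB, which matches the message.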

> How to figure out a good combination of
> ('taskmanager.memory.network.fraction', 'taskmanager.memory.network.min',
> and 'taskmanager.memory.network.max')
> for this issue?
>
IIUC, there is no absolute standard for setting these parameters.
They may be affected by many factors, such as the data flow rate,
computational complexity, and memory usage of your job.

Some steps to set up and adjust these parameters:
1. Check the memory available to your job.
2. Evaluate the network usage and consider how much memory can be used
for network buffers.
3. Monitor the system and collect metrics such as network throughput and
memory usage.
4. Adjust these parameters if the job has high network usage or is
memory-intensive.

A personal, tentative suggestion on how to adjust when it's not enough:
1. Increase taskmanager.memory.network.fraction from 0.1 to 0.2, or just
increase taskmanager.memory.network.max slightly.
2. If the buffer size grows too large, it may affect checkpoints, so it's
recommended to combine this with buffer debloating.
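For illustration, a sketch of what such an adjustment could look like. The
keys are standard Flink options (normally set in flink-conf.yaml); the
concrete values are illustrative assumptions only:

    import org.apache.flink.configuration.Configuration;

    Configuration conf = new Configuration();
    // Give the network stack a larger share of TaskManager memory (default 0.1).
    conf.setString("taskmanager.memory.network.fraction", "0.2");
    // Or raise only the upper bound on network memory.
    conf.setString("taskmanager.memory.network.max", "1gb");
    // Buffer debloating (Flink 1.14+) keeps checkpoint barriers moving even
    // with more buffered in-flight data.
    conf.setString("taskmanager.network.memory.buffer-debloat.enabled", "true");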



On Tue, Jun 6, 2023 at 2:44 AM Pritam Agarwala 
wrote:

> Hi All,
>
>
> java.io.IOException: Insufficient number of network buffers: required 2,
> but only 0 available. The total number of network buffers is currently set
> to 22773 of 32768 bytes each.
>
> What is the meaning of this exception (The total number of network buffers
> is currently set to 22773 of 32768 bytes each)?
>
> How to figure out a good combination of
> ('taskmanager.memory.network.fraction', 'taskmanager.memory.network.min',
> and 'taskmanager.memory.network.max')
> for this issue?
>
>
> Thanks & Regards,
> Pritam
>


-- 
Best,
Hangxiang.


Re: WELCOME to user@flink.apache.org

2023-06-05 Thread Shammon FY
Hi,

Thanks Li Shao, I got it. You can analyze the specific memory usage of the
metaspace. In fact, we have also encountered the same problem when running
batch jobs in a session cluster, which resulted in metaspace growth due to
the classloader. I have created a PR [1] for FLINK-32265 [2] to try to fix
this issue.

[1] https://github.com/apache/flink/pull/22718
[2] https://issues.apache.org/jira/browse/FLINK-32265

Best,
Shammon FY


On Tue, Jun 6, 2023 at 8:58 AM Li Shao  wrote:

> Hi Shammon,
>
> Thank you for your reply. My Flink job is using batch mode. In streaming
> mode I never see the metaspace increasing.
>
>
> On Mon, Jun 5, 2023 at 5:55 PM Shammon FY  wrote:
>
>> Hi Li Shao,
>>
>> Currently, Flink creates a user classloader in the JobManager for each
>> job, and it can only be released by a full GC. I think this is why the
>> JVM metaspace keeps increasing; you can check it.
>> Are you using session mode? I have a small question: is your job SQL-only,
>> without UDFs or DataStream? Thanks
>>
>> Best,
>> Shammon FY
>>
>> On Tue, Jun 6, 2023 at 4:27 AM Li Shao  wrote:
>>
>>> Hi,
>>>
>>> Recently I noticed that my JobManager's JVM metaspace keeps increasing
>>> when running batch Flink jobs. I found a similar Stack Overflow post:
>>> https://stackoverflow.com/questions/73184042/apache-flink-job-manager-node-runs-out-of-jvm-metaspace-quickly,
>>> but there is no solution there. I am wondering whether Flink can clean up
>>> the JobManager JVM metaspace periodically or not. Please advise.
>>>
>>> Thanks,
>>> Li
>>>
>>> Version: 1.14.4 Flink HA mode
>>> JVM Metaspace: 1.88 GB / 2.00 GB
>>>
>>> JVM (Heap/Non-Heap) Memory
>>> Type     | Committed | Used    | Maximum
>>> Heap     | 6.00 GB   | 3.79 GB | 6.00 GB
>>> Non-Heap | 2.34 GB   | 2.25 GB | 3.23 GB
>>> Outside JVM Memory
>>> Type   | Count | Used    | Capacity
>>> Direct | 927   | 86.9 MB | 87.0 MB
>>> Mapped | 0     | 0 B     | 0 B
>>> Garbage Collection
>>> Collector           | Count | Time (ms)
>>> G1_Young_Generation | 1355  | 57139
>>> G1_Old_Generation   | 1     | 1325
>>>

Re: flink14 sql sink kafka error

2023-06-05 Thread Hang Ruan
Hi, 湘晗刚,

This error seems to be an error from the Kafka server. Maybe you should
check whether the Kafka server has encountered an error.
Or you could provide more details about the request; the information given
is too limited to analyze.

Best,
Hang

湘晗刚 <1016465...@qq.com> wrote on Mon, Jun 5, 2023 at 15:08:

> UnknownServerException: The server experienced an unexpected error when
> processing the request.
> Thanks
> Kobe24
>


Re: Custom Counter on Flink File Source

2023-06-05 Thread Hang Ruan
Hi, Kirti Dhar Upadhyay K.

We can get the metric group from the context, e.g. `SourceReaderContext`
and `SplitEnumeratorContext`. These contexts are available when readers and
enumerators are created; see `AbstractFileSource#createReader` and
`AbstractFileSource#createEnumerator`.
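As a rough sketch, assuming you control a custom reader implementation (the
metric name "numFilesCollected" is made up for illustration):

    import org.apache.flink.api.connector.source.SourceReaderContext;
    import org.apache.flink.metrics.Counter;

    // The SourceReaderContext handed to createReader(...) exposes the metric
    // group, playing the role of getRuntimeContext() in rich functions.
    public class FileCollectionMetrics {

        private final Counter numFilesCollected;

        public FileCollectionMetrics(SourceReaderContext context) {
            this.numFilesCollected =
                    context.metricGroup().counter("numFilesCollected");
        }

        // Call this from the reader whenever a new file split is picked up.
        public void onFileCollected() {
            numFilesCollected.inc();
        }
    }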

Best,
Hang

Kirti Dhar Upadhyay K via user  wrote on Mon, Jun 5, 2023 at 22:57:

> Hi Community,
>
>
>
> I am trying to add a new counter for the number of files collected by the
> Flink File Source.
>
> Referring to the doc
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/ I
> understand how to add a new counter on any operator.
>
>
>
> this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");
>
>
>
> But I am not able to get this RuntimeContext on the FileSource.
>
> Can someone give me a clue on this?
>
>
>
> Regards,
>
> Kirti Dhar
>


Re: python custom sink

2023-06-05 Thread 王国成
Unsubscribe
 Original message 
| From | smq<374060...@qq.com.invalid> |
| Date | 2023-05-30 12:22 |
| To | user-zh  |
| Subject | python custom sink |
In Java, a custom sink can be implemented by extending RichSinkFunction and
implementing CheckpointedFunction. How can the same be achieved in Python?
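For reference, a minimal sketch of the Java pattern the question refers to
(state handling is only outlined; this is not a complete sink):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class BufferingSink extends RichSinkFunction<String>
            implements CheckpointedFunction {

        // In-memory buffer; a real sink would flush it to an external system.
        private transient List<String> buffer;

        @Override
        public void invoke(String value, Context context) {
            buffer.add(value);
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Persist the buffer into operator state here.
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // Restore (or initialize) the buffer from state; runs before invoke().
            buffer = new ArrayList<>();
        }
    }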

Re: WELCOME to user@flink.apache.org

2023-06-05 Thread Shammon FY
Hi Li Shao,

Currently, Flink creates a user classloader in the JobManager for each job,
and it can only be released by a full GC. I think this is why the JVM
metaspace keeps increasing; you can check it.
Are you using session mode? I have a small question: is your job SQL-only,
without UDFs or DataStream? Thanks
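As a side note, the JobManager's metaspace budget can be bounded explicitly,
which makes the JVM attempt class unloading before the growth becomes
unbounded (the value below is an illustrative assumption):

    import org.apache.flink.configuration.Configuration;

    Configuration conf = new Configuration();
    conf.setString("jobmanager.memory.jvm-metaspace.size", "512mb");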

Best,
Shammon FY

On Tue, Jun 6, 2023 at 4:27 AM Li Shao  wrote:

> Hi,
>
> Recently I noticed that my JobManager's JVM metaspace keeps increasing
> when running batch Flink jobs. I found a similar Stack Overflow post:
> https://stackoverflow.com/questions/73184042/apache-flink-job-manager-node-runs-out-of-jvm-metaspace-quickly,
> but there is no solution there. I am wondering whether Flink can clean up
> the JobManager JVM metaspace periodically or not. Please advise.
>
> Thanks,
> Li
>
> Version: 1.14.4 Flink HA mode
> JVM Metaspace: 1.88 GB / 2.00 GB
>
> JVM (Heap/Non-Heap) Memory
> Type     | Committed | Used    | Maximum
> Heap     | 6.00 GB   | 3.79 GB | 6.00 GB
> Non-Heap | 2.34 GB   | 2.25 GB | 3.23 GB
> Outside JVM Memory
> Type   | Count | Used    | Capacity
> Direct | 927   | 86.9 MB | 87.0 MB
> Mapped | 0     | 0 B     | 0 B
> Garbage Collection
> Collector           | Count | Time (ms)
> G1_Young_Generation | 1355  | 57139
> G1_Old_Generation   | 1     | 1325
>

Re: Bulk storage of protobuf records in files

2023-06-05 Thread Shammon FY
Hi Ryan,

What I usually encounter is writing Protobuf-format data to systems such as
Kafka; I have never encountered writing it to files yet.

Best,
Shammon FY


On Mon, Jun 5, 2023 at 10:50 PM Martijn Visser 
wrote:

> Hey Ryan,
>
> I've never encountered a use case for writing Protobuf-encoded files to a
> filesystem.
>
> Best regards,
>
> Martijn
>
> On Fri, May 26, 2023 at 6:39 PM Ryan Skraba via user <
> user@flink.apache.org> wrote:
>
>> Hello all!
>>
>> I discovered while investigating FLINK-32008[1] that we can write to the
>> filesystem connector with the protobuf format, but today, the resulting
>> file is pretty unlikely to be useful or rereadable.
>>
>> There's no real standard for storing many protobuf messages in a single
>> file container, although the documentation mentions writing size-delimited
>> messages sequentially[2].  In practice, I've never encountered protobuf
>> binaries stored on filesystems without using some other sort of "framing"
>> (like how parquet can be accessed with either an Avro or a protobuf
>> oriented API).
>>
>> Does anyone have any use cases for bulk storage of protobuf messages on a
>> filesystem?  Should these files just be considered temporary storage for
>> Flink jobs, or do they need to be compatible with other systems?  Is there
>> a splittable / compressible file format?
>>
>> The alternative might be to just forbid file storage for protobuf
>> messages!  Any opinions?
>>
>> All my best, Ryan Skraba
>>
>> [1]: https://issues.apache.org/jira/browse/FLINK-32008
>> [2]: https://protobuf.dev/programming-guides/techniques/#streaming
>>
>


Re: WELCOME to user@flink.apache.org

2023-06-05 Thread Li Shao
Hi,

Recently I noticed that my JobManager's JVM metaspace keeps increasing when
running batch Flink jobs. I found a similar Stack Overflow post:
https://stackoverflow.com/questions/73184042/apache-flink-job-manager-node-runs-out-of-jvm-metaspace-quickly,
but there is no solution there. I am wondering whether Flink can clean up
the JobManager JVM metaspace periodically or not. Please advise.

Thanks,
Li

Version: 1.14.4 Flink HA mode
JVM Metaspace: 1.88 GB / 2.00 GB

JVM (Heap/Non-Heap) Memory
Type     | Committed | Used    | Maximum
Heap     | 6.00 GB   | 3.79 GB | 6.00 GB
Non-Heap | 2.34 GB   | 2.25 GB | 3.23 GB
Outside JVM Memory
Type   | Count | Used    | Capacity
Direct | 927   | 86.9 MB | 87.0 MB
Mapped | 0     | 0 B     | 0 B
Garbage Collection
Collector           | Count | Time (ms)
G1_Young_Generation | 1355  | 57139
G1_Old_Generation   | 1     | 1325


Network Buffers

2023-06-05 Thread Pritam Agarwala
Hi All,


java.io.IOException: Insufficient number of network buffers: required 2,
but only 0 available. The total number of network buffers is currently set
to 22773 of 32768 bytes each.

What is the meaning of this exception (The total number of network buffers
is currently set to 22773 of 32768 bytes each)?

How to figure out a good combination of
('taskmanager.memory.network.fraction', 'taskmanager.memory.network.min',
and 'taskmanager.memory.network.max'.)
for this issue ?


Thanks & Regards,
Pritam


Custom Counter on Flink File Source

2023-06-05 Thread Kirti Dhar Upadhyay K via user
Hi Community,

I am trying to add a new counter for the number of files collected by the
Flink File Source.
Referring to the doc
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/ I
understand how to add a new counter on any operator.

this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");

But I am not able to get this RuntimeContext on the FileSource.
Can someone give me a clue on this?

Regards,
Kirti Dhar


Re: Bulk storage of protobuf records in files

2023-06-05 Thread Martijn Visser
Hey Ryan,

I've never encountered a use case for writing Protobuf-encoded files to a
filesystem.

Best regards,

Martijn

On Fri, May 26, 2023 at 6:39 PM Ryan Skraba via user 
wrote:

> Hello all!
>
> I discovered while investigating FLINK-32008[1] that we can write to the
> filesystem connector with the protobuf format, but today, the resulting
> file is pretty unlikely to be useful or rereadable.
>
> There's no real standard for storing many protobuf messages in a single
> file container, although the documentation mentions writing size-delimited
> messages sequentially[2].  In practice, I've never encountered protobuf
> binaries stored on filesystems without using some other sort of "framing"
> (like how parquet can be accessed with either an Avro or a protobuf
> oriented API).
>
> Does anyone have any use cases for bulk storage of protobuf messages on a
> filesystem?  Should these files just be considered temporary storage for
> Flink jobs, or do they need to be compatible with other systems?  Is there
> a splittable / compressible file format?
>
> The alternative might be to just forbid file storage for protobuf
> messages!  Any opinions?
>
> All my best, Ryan Skraba
>
> [1]: https://issues.apache.org/jira/browse/FLINK-32008
> [2]: https://protobuf.dev/programming-guides/techniques/#streaming
>


Re: flink14 batch mode can read iceberg table but stream mode can not

2023-06-05 Thread Martijn Visser
Hi,

This question is better suited for the Iceberg community, since they've
built the Flink-Iceberg integration.

Best regards,

Martijn

On Wed, May 31, 2023 at 9:48 AM 湘晗刚 <1016465...@qq.com> wrote:

> flink14 batch mode can read an iceberg table but stream mode cannot, why?
> Thanks in advance
> Kobe24
>


Re: Using pre-registered schemas with avro-confluent-registry format is not possible

2023-06-05 Thread Martijn Visser
Hi Jannik,

Can you share how you've set those properties? I've been able to use this
without any problems.

Best regards,

Martijn

On Thu, Jun 1, 2023 at 2:43 PM Schmeier, Jannik 
wrote:

> Hello Thias,
>
>
>
> thank you for your answer.
>
>
>
> We've tested registering an existing (byte-equal) schema a second time,
> but unfortunately the schema registry still denies the request.
>
>
>
> Your last suggestion sounds promising, but I think there are some edge
> cases with this approach that will still cause an error. For example, when
> writing to a new topic that's empty, querying the topic beforehand won't
> return any records, and therefore the schema would not be put into the
> schemaRegistryClient cache.
>
>
>
> I would still prefer a flag for the "avro-confluent-registry" format that
> disables registering schemas; instead, the format would just try to get
> the ID for a schema string from the registry. If there is an ID for that
> schema, Flink will use it. If there is none, an exception should be
> thrown.
>
> What do you think of that?
>
>
>
> Best regards,
>
> Jannik
>
>
>
>
>
> *From:* Schwalbe Matthias 
> *Sent:* Wednesday, May 31, 2023 13:33
> *To:* Schmeier, Jannik ; user@flink.apache.org
> *Subject:* RE: Using pre-registered schemas with avro-confluent-registry
> format is not possible
>
>
>
>
>
> Hello Jannik,
>
>
>
> Some things to consider (I had a similar problem a couple of years ago):
>
>- The schemaRegistryClient actually caches schema ids, so it will hit
>the schema registry only once.
>- The schema registered in the schema registry needs to be byte-equal;
>otherwise the schema registry considers it a new schema (version).
>- To the best of my knowledge, writing an existing schema to the schema
>registry does not fail, because it is actually not written.
>   - This may not be entirely true, as we had to replace the whole
>   schemaRegistryClient with our own implementation because the existing
>   one could not be reconfigured to accept compressed answers from our
>   r/o proxy.
>- If you manage to fill the cache of your schemaRegistryClient with
>the exact schema (e.g. by querying it beforehand), you might never run
>into the trouble.
>
>
>
> Hope this helps … keep us posted 
>
>
>
> Thias
>
>
>
>
>
>
>
>
>
> *From:* Schmeier, Jannik 
> *Sent:* Wednesday, May 31, 2023 12:44 PM
> *To:* user@flink.apache.org
> *Subject:* Using pre-registered schemas with avro-confluent-registry
> format is not possible
>
>
> Hello,
>
>
>
> I'm trying to use the avro-confluent-registry format with the Confluent
> Cloud Schema Registry in our company.
>
> Our schemas are managed via Terraform and global write access is denied
> for all Kafka clients in our environments (or at least in production).
>
> Therefore, when using the avro-confluent-registry format, I'm getting an
> error when Flink tries to serialize a row:
>
> java.lang.RuntimeException: Failed to serialize row.
>
> at
> org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:90)
> ~[?:?]
>
> at
> org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:40)
> ~[?:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.serialize(DynamicKafkaRecordSerializationSchema.java:95)
> ~[?:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.serialize(DynamicKafkaRecordSerializationSchema.java:36)
> ~[?:?]
>
> at
> org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:196)
> ~[?:?]
>
> at
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> ~[flink-dist-1.17.0.jar:1.17.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
> ~[flink-dist-1.17.0.jar:1.17.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
> ~[flink-dist-1.17.0.jar:1.17.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> ~[flink-dist-1.17.0.jar:1.17.0]
>
> at
> org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44)
> ~[flink-table-runtime-1.17.0.jar:1.17.0]
>
> at
> org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer.processElement(ConstraintEnforcer.java:247)
> ~[flink-table-runtime-1.17.0.jar:1.17.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
> 

Re: Flink Stream processing with Dynamic Conditional Sinks

2023-06-05 Thread Andrew Otto
I've considered a similar question before: dynamic addition of sinks based
on some external configuration.

The answer I've mostly been given is: this is a bad idea. The checkpoint
state that Flink uses for job recovery depends on the topology of the job,
and dynamically adding more sinks changes this topology.

You might be able to get away with dynamically submitting multiple jobs in
a single Flink application, but you'd have to restart the application every
time you add a new job.

I've not tried this though, so hopefully someone smarter can come in and
advise as well :)

-Andrew Otto
 Wikimedia Foundation


On Mon, Jun 5, 2023 at 8:27 AM Yogesh Rao  wrote:

> Hello,
>
> I am trying out Flink for one stream processing scenario and was wondering
> if it can be achieved using Apache Flink. Any pointers regarding how it
> can be achieved will be of great help.
>
> Scenario:
>
> A Kafka topic holds the input for stream processing; multiple applications,
> let's say A & B, publish their messages to the same topic (Topic X) with
> different keys (the keys being the application names). These messages are
> read by stream processing applications and processed, eventually landing
> in sinks specific to A & B. The end result is to have this entire piece
> dynamic, so that new applications C, D, E, etc. can be accommodated
> automatically.
>
> ATM I am able to figure out the Kafka source and stream processing part.
> What I am not clear on is whether, in the streaming case, conditional
> multiple sinks would work, i.e. data for application A lands in Sink A,
> application B -> Sink B, and so on.
>
> From an implementation standpoint, I could probably split the stream and
> pass those streams to their respective tables. However, all of this needs
> to happen dynamically.
>
> Would Apache Flink be able to support this? If yes, how?
>
> I am using Apache Flink 1.17.1 with the pipeline written in Java
>
> Thank you in advance,
>
> Regards,
> -Yogesh
>


Flink Stream processing with Dynamic Conditional Sinks

2023-06-05 Thread Yogesh Rao
Hello,

I am trying out Flink for one stream processing scenario and was wondering
if it can be achieved using Apache Flink. Any pointers regarding how it can
be achieved will be of great help.

Scenario:

A Kafka topic holds the input for stream processing; multiple applications,
let's say A & B, publish their messages to the same topic (Topic X) with
different keys (the keys being the application names). These messages are
read by stream processing applications and processed, eventually landing in
sinks specific to A & B. The end result is to have this entire piece
dynamic, so that new applications C, D, E, etc. can be accommodated
automatically.

ATM I am able to figure out the Kafka source and stream processing part.
What I am not clear on is whether, in the streaming case, conditional
multiple sinks would work, i.e. data for application A lands in Sink A,
application B -> Sink B, and so on.

From an implementation standpoint, I could probably split the stream and
pass those streams to their respective tables. However, all of this needs
to happen dynamically.
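A minimal sketch of that static routing, for illustration. Assumptions: a
hypothetical Event type carrying the application key, and hypothetical
buildKafkaStream(...)/buildSinkFor(...) wiring; note that the application
list is fixed at submission time, which is exactly the limitation Andrew
describes above:

    import java.util.List;
    import org.apache.flink.api.connector.sink2.Sink;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class PerAppRouting {

        // Hypothetical event type: 'app' holds the Kafka record key.
        public static class Event {
            public String app;
            public String payload;
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Event> input = buildKafkaStream(env); // source wiring omitted

            // One filtered branch per application. The list is fixed when the
            // job is submitted; onboarding "C" means resubmitting the job.
            for (String app : List.of("A", "B")) {
                input.filter(e -> app.equals(e.app))
                     .sinkTo(buildSinkFor(app)); // hypothetical per-app sink
            }
            env.execute("per-application-routing");
        }

        private static DataStream<Event> buildKafkaStream(StreamExecutionEnvironment env) {
            throw new UnsupportedOperationException("source omitted in this sketch");
        }

        private static Sink<Event> buildSinkFor(String app) {
            throw new UnsupportedOperationException("sink omitted in this sketch");
        }
    }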

Would Apache Flink be able to support this? If yes, how?

I am using Apache Flink 1.17.1 with the pipeline written in Java

Thank you in advance,

Regards,
-Yogesh


flink14 sql sink kafka error

2023-06-05 Thread 湘晗刚
UnknownServerException: The server experienced an unexpected error when
processing the request.
Thanks
Kobe24

Re: High Start-Delay And Aligned Checkpointing Causing Timeout.

2023-06-05 Thread Hangxiang Yu
Hi, Pritam.
I think the definition works for both aligned and unaligned checkpoints:
"The alignment duration, which is defined as the time between receiving the
first and the last checkpoint barrier."
IIUC, unaligned checkpoints greatly speed up the flow of barriers by moving
barriers from the input queue into the output queue as soon as they arrive
at the operator.
But usually there is still a time gap in the propagation of the different
barriers. For example, a barrier can be put into the output queue only once
the record currently being processed is finished, so different operators
still see time gaps between barriers.
So I agree with David that it's expected.
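For reference, a minimal sketch of enabling unaligned checkpoints (the
checkpoint interval is an illustrative value):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000); // illustrative: checkpoint every 60 s
    // Let barriers overtake in-flight records instead of waiting for alignment.
    env.getCheckpointConfig().enableUnalignedCheckpoints();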

On Sat, Jun 3, 2023 at 2:30 AM David Anderson  wrote:

> I'm not 100% certain what "alignment duration" is measuring exactly in
> the context of unaligned checkpoints -- however, even with unaligned
> checkpointing each operator still has to wait until all of the
> barriers are present in the operator's input queues. It doesn't have
> to wait for the barriers to be aligned, but they do have to all be
> present in order for the operator to know which in-flight messages to
> include in the checkpoint.
>
> David
>
> On Fri, Jun 2, 2023 at 12:38 PM Pritam Agarwala
>  wrote:
> >
> > Hi All,
> >
> >
> > I have enabled checkpointing in production. In peak hours, checkpointing
> is failing due to timeouts. When checking at the sub-task level, I could
> see that it takes some time for alignment, and there is also a start
> delay. I think the start delay is due to back-pressure.
> > As a first step, I have enabled unaligned checkpointing in my test env,
> but I can still see it taking time for alignment. Is this expected
> behaviour?
> >
> > Screenshots for your reference (one from the test environment, one from
> > the prod environment; the images are not preserved in this archive).
> > Thanks & Regards,
> > Pritam
> >
>


-- 
Best,
Hangxiang.