Re: How to start a flink job on a long running yarn cluster from a checkpoint (with arguments)

2024-05-25 Thread Junrui Lee
Hi Sachin,

Yes, that's correct. To resume from a savepoint, use the command bin/flink
run -s <savepointPath> [runArgs]. You can find more details in the Flink
documentation on resuming from savepoints [1].

Additionally, information on how to trigger a savepoint can be found in the
section for triggering savepoints [2].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#resuming-from-savepoints
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints
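
For the concrete scenario in the question, the sequence would look roughly like
this (a sketch only - the savepoint directory and application id are taken from
the thread, and <jobId> is a placeholder for the id shown by `flink list`):

# 1. Stop the running job and take a savepoint
flink stop --savepointPath /tmp/flink-savepoints \
  -m yarn-cluster -yid application_1473169569237_0001 <jobId>

# 2. Resume from that savepoint with the updated jar and the new arguments
flink run -s /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
  -m yarn-cluster -yid application_1473169569237_0001 \
  /usr/lib/flink/examples/streaming/WordCount-Updated.jar \
  --input file:///input1.txt --output file:///output1/ --newarg value123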

Best,
Junrui

Sachin Mittal wrote on Sat, May 25, 2024 at 20:35:

> Hi,
> I have a long running yarn cluster and I submit my streaming job using the
> following command:
>
> flink run -m yarn-cluster -yid application_1473169569237_0001
> /usr/lib/flink/examples/streaming/WordCount.jar --input file:///input.txt
> --output file:///output/
>
> Let's say I want to stop this job, make updates to the jar and some new
> input arguments and restart the job from the savepoint. How would I do the
> same ?
>
> Would this be the right command ?
>
> flink run -s /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab -m
> yarn-cluster -yid application_1473169569237_0001
> /usr/lib/flink/examples/streaming/WordCount-Updated.jar --input
> file:///input1.txt --output file:///output1/ --newarg value123
>
> Thanks
> Sachin
>
>


How to start a flink job on a long running yarn cluster from a checkpoint (with arguments)

2024-05-25 Thread Sachin Mittal
Hi,
I have a long running yarn cluster and I submit my streaming job using the
following command:

flink run -m yarn-cluster -yid application_1473169569237_0001
/usr/lib/flink/examples/streaming/WordCount.jar --input file:///input.txt
--output file:///output/

> Let's say I want to stop this job, make updates to the jar, pass some new
> input arguments, and restart the job from a savepoint. How would I do
> that?
> 
> Would this be the right command?

flink run -s /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab -m
yarn-cluster -yid application_1473169569237_0001
/usr/lib/flink/examples/streaming/WordCount-Updated.jar --input
file:///input1.txt --output file:///output1/ --newarg value123

Thanks
Sachin


Re: "Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-25 Thread Péter Váry
> Could the table/database sync with schema evolution (without Flink job
restarts!) potentially work with the Iceberg sink?

Making this work would be a good addition to the Iceberg-Flink connector.
It is definitely doable, but not a single-PR-sized task. If you want to try
your hand at it, I will try to find time to review your plans/code, so
your code could be incorporated into the upcoming releases.

Thanks,
Peter



On Fri, May 24, 2024, 17:07 Andrew Otto  wrote:

> > What is not is the automatic syncing of entire databases, with schema
> evolution and detection of new (and dropped?) tables. :)
> Wait.  Is it?
>
> > Flink CDC supports synchronizing all tables of source database instance
> to downstream in one job by configuring the captured database list and
> table list.
>
>
> On Fri, May 24, 2024 at 11:04 AM Andrew Otto  wrote:
>
>> Indeed, using Flink-CDC to write to Flink Sink Tables, including Iceberg,
>> is supported.
>>
>> What is not is the automatic syncing of entire databases, with schema
>> evolution and detection of new (and dropped?) tables.  :)
>>
>>
>>
>>
>> On Fri, May 24, 2024 at 8:58 AM Giannis Polyzos 
>> wrote:
>>
>>> https://nightlies.apache.org/flink/flink-cdc-docs-stable/
>>> All these features come from Flink cdc itself. Because Paimon and Flink
>>> cdc are projects native to Flink there is a strong integration between them.
>>> (I believe it’s on the roadmap to support iceberg as well)
>>>
>>> On Fri, 24 May 2024 at 3:52 PM, Andrew Otto  wrote:
>>>
 > I’m curious if there is any reason for choosing Iceberg instead of
 Paimon

 No technical reason that I'm aware of.  We are using it mostly because
 of momentum.  We looked at Flink Table Store (before it was Paimon), but
 decided it was too early and the docs were too sparse at the time to really
 consider it.

 > Especially for a use case like CDC that iceberg struggles to support.

 We aren't doing any CDC right now (for many reasons), but I have never
 seen a feature like Paimon's database sync before.  One job to sync and
 evolve an entire database?  That is amazing.

 If we could do this with Iceberg, we might be able to make an argument
 to product managers to push for CDC.



 On Fri, May 24, 2024 at 8:36 AM Giannis Polyzos 
 wrote:

> I’m curious if there is any reason for choosing Iceberg instead of
> Paimon (other than - iceberg is more popular).
> Especially for a use case like CDC that iceberg struggles to support.
>
> On Fri, 24 May 2024 at 3:22 PM, Andrew Otto 
> wrote:
>
>> Interesting thank you!
>>
>> I asked this in the Paimon users group:
>>
>> How coupled to Paimon catalogs and tables is the cdc part of Paimon?
>> RichCdcMultiplexRecord
>> 
>>  and
>> related code seem incredibly useful even outside of the context of the
>> Paimon table format.
>>
>> I'm asking because the database sync action
>> 
>>  feature
>> is amazing.  At the Wikimedia Foundation, we are on an all-in journey 
>> with
>> Iceberg.  I'm wondering how hard it would be to extract the CDC logic 
>> from
>> Paimon and abstract the Sink bits.
>>
>> Could the table/database sync with schema evolution (without Flink
>> job restarts!) potentially work with the Iceberg sink?
>>
>>
>>
>>
>> On Thu, May 23, 2024 at 4:34 PM Péter Váry <
>> peter.vary.apa...@gmail.com> wrote:
>>
>>> If I understand correctly, Paimon is sending `CdcRecord`-s [1] on
>>> the wire which contain not only the data, but the schema as well.
>>> With Iceberg we currently only send the row data, and expect to
>>> receive the schema on job start - this is more performant than sending 
>>> the
>>> schema all the time, but has the obvious issue that it is not able to
>>> handle the schema changes. Another part of the dynamic schema
>>> synchronization is the update of the Iceberg table schema - the schema
>>> should be updated for all of the writers and the committer / but only a
>>> single schema change commit is needed (allowed) to the Iceberg table.
>>>
>>> This is a very interesting, but non-trivial change.
>>>
>>> [1]
>>> https://github.com/apache/paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
>>>
>>> Andrew Otto  ezt írta (időpont: 2024. máj. 23.,
>>> Cs, 21:59):
>>>
 Ah I see, so just auto-restarting to pick up new stuff.

 I'd love to understand how Paimon does this.  They have a database
 sync action

Unsubscribe

2024-05-24 Thread 蒋少东
Unsubscribe


Re: "Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-24 Thread Andrew Otto
> What is not is the automatic syncing of entire databases, with schema
evolution and detection of new (and dropped?) tables. :)
Wait.  Is it?

> Flink CDC supports synchronizing all tables of source database instance
to downstream in one job by configuring the captured database list and
table list.
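
For reference, a whole-database sync job in Flink CDC 3.x is described with a YAML
pipeline definition, roughly like the MySQL quickstart. The sketch below is an
assumption based on that quickstart (keys and values are illustrative only, and,
as noted elsewhere in this thread, Iceberg is not yet a supported pipeline sink):

source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: "***"
  # capture every table of app_db in a single job
  tables: app_db.\.*

sink:
  type: doris   # any supported pipeline sink; sink-specific options omitted

pipeline:
  name: sync-app_db
  parallelism: 2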


On Fri, May 24, 2024 at 11:04 AM Andrew Otto  wrote:

> Indeed, using Flink-CDC to write to Flink Sink Tables, including Iceberg,
> is supported.
>
> What is not is the automatic syncing of entire databases, with schema
> evolution and detection of new (and dropped?) tables.  :)
>
>
>
>
> On Fri, May 24, 2024 at 8:58 AM Giannis Polyzos 
> wrote:
>
>> https://nightlies.apache.org/flink/flink-cdc-docs-stable/
>> All these features come from Flink cdc itself. Because Paimon and Flink
>> cdc are projects native to Flink there is a strong integration between them.
>> (I believe it’s on the roadmap to support iceberg as well)
>>
>> On Fri, 24 May 2024 at 3:52 PM, Andrew Otto  wrote:
>>
>>> > I’m curious if there is any reason for choosing Iceberg instead of
>>> Paimon
>>>
>>> No technical reason that I'm aware of.  We are using it mostly because
>>> of momentum.  We looked at Flink Table Store (before it was Paimon), but
>>> decided it was too early and the docs were too sparse at the time to really
>>> consider it.
>>>
>>> > Especially for a use case like CDC that iceberg struggles to support.
>>>
>>> We aren't doing any CDC right now (for many reasons), but I have never
>>> seen a feature like Paimon's database sync before.  One job to sync and
>>> evolve an entire database?  That is amazing.
>>>
>>> If we could do this with Iceberg, we might be able to make an argument
>>> to product managers to push for CDC.
>>>
>>>
>>>
>>> On Fri, May 24, 2024 at 8:36 AM Giannis Polyzos 
>>> wrote:
>>>
 I’m curious if there is any reason for choosing Iceberg instead of
 Paimon (other than - iceberg is more popular).
 Especially for a use case like CDC that iceberg struggles to support.

 On Fri, 24 May 2024 at 3:22 PM, Andrew Otto  wrote:

> Interesting thank you!
>
> I asked this in the Paimon users group:
>
> How coupled to Paimon catalogs and tables is the cdc part of Paimon?
> RichCdcMultiplexRecord
> 
>  and
> related code seem incredibly useful even outside of the context of the
> Paimon table format.
>
> I'm asking because the database sync action
> 
>  feature
> is amazing.  At the Wikimedia Foundation, we are on an all-in journey with
> Iceberg.  I'm wondering how hard it would be to extract the CDC logic from
> Paimon and abstract the Sink bits.
>
> Could the table/database sync with schema evolution (without Flink job
> restarts!) potentially work with the Iceberg sink?
>
>
>
>
> On Thu, May 23, 2024 at 4:34 PM Péter Váry <
> peter.vary.apa...@gmail.com> wrote:
>
>> If I understand correctly, Paimon is sending `CdcRecord`-s [1] on the
>> wire which contain not only the data, but the schema as well.
>> With Iceberg we currently only send the row data, and expect to
>> receive the schema on job start - this is more performant than sending 
>> the
>> schema all the time, but has the obvious issue that it is not able to
>> handle the schema changes. Another part of the dynamic schema
>> synchronization is the update of the Iceberg table schema - the schema
>> should be updated for all of the writers and the committer / but only a
>> single schema change commit is needed (allowed) to the Iceberg table.
>>
>> This is a very interesting, but non-trivial change.
>>
>> [1]
>> https://github.com/apache/paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
>>
>> Andrew Otto  ezt írta (időpont: 2024. máj. 23.,
>> Cs, 21:59):
>>
>>> Ah I see, so just auto-restarting to pick up new stuff.
>>>
>>> I'd love to understand how Paimon does this.  They have a database
>>> sync action
>>> 
>>> which will sync entire databases, handle schema evolution, and I'm 
>>> pretty
>>> sure (I think I saw this in my local test) also pick up new tables.
>>>
>>>
>>> https://github.com/apache/paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java#L45
>>>
>>> I'm sure that Paimon table format is great, but at Wikimedia
>>> Foundation we are on the Iceberg train.  Imagine if there was a 
>>> 

Re: "Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-24 Thread Andrew Otto
Indeed, using Flink-CDC to write to Flink Sink Tables, including Iceberg,
is supported.

What is not is the automatic syncing of entire databases, with schema
evolution and detection of new (and dropped?) tables.  :)




On Fri, May 24, 2024 at 8:58 AM Giannis Polyzos 
wrote:

> https://nightlies.apache.org/flink/flink-cdc-docs-stable/
> All these features come from Flink cdc itself. Because Paimon and Flink
> cdc are projects native to Flink there is a strong integration between them.
> (I believe it’s on the roadmap to support iceberg as well)
>
> On Fri, 24 May 2024 at 3:52 PM, Andrew Otto  wrote:
>
>> > I’m curious if there is any reason for choosing Iceberg instead of
>> Paimon
>>
>> No technical reason that I'm aware of.  We are using it mostly because of
>> momentum.  We looked at Flink Table Store (before it was Paimon), but
>> decided it was too early and the docs were too sparse at the time to really
>> consider it.
>>
>> > Especially for a use case like CDC that iceberg struggles to support.
>>
>> We aren't doing any CDC right now (for many reasons), but I have never
>> seen a feature like Paimon's database sync before.  One job to sync and
>> evolve an entire database?  That is amazing.
>>
>> If we could do this with Iceberg, we might be able to make an argument to
>> product managers to push for CDC.
>>
>>
>>
>> On Fri, May 24, 2024 at 8:36 AM Giannis Polyzos 
>> wrote:
>>
>>> I’m curious if there is any reason for choosing Iceberg instead of
>>> Paimon (other than - iceberg is more popular).
>>> Especially for a use case like CDC that iceberg struggles to support.
>>>
>>> On Fri, 24 May 2024 at 3:22 PM, Andrew Otto  wrote:
>>>
 Interesting thank you!

 I asked this in the Paimon users group:

 How coupled to Paimon catalogs and tables is the cdc part of Paimon?
 RichCdcMultiplexRecord
 
  and
 related code seem incredibly useful even outside of the context of the
 Paimon table format.

 I'm asking because the database sync action
 
  feature
 is amazing.  At the Wikimedia Foundation, we are on an all-in journey with
 Iceberg.  I'm wondering how hard it would be to extract the CDC logic from
 Paimon and abstract the Sink bits.

 Could the table/database sync with schema evolution (without Flink job
 restarts!) potentially work with the Iceberg sink?




 On Thu, May 23, 2024 at 4:34 PM Péter Váry 
 wrote:

> If I understand correctly, Paimon is sending `CdcRecord`-s [1] on the
> wire which contain not only the data, but the schema as well.
> With Iceberg we currently only send the row data, and expect to
> receive the schema on job start - this is more performant than sending the
> schema all the time, but has the obvious issue that it is not able to
> handle the schema changes. Another part of the dynamic schema
> synchronization is the update of the Iceberg table schema - the schema
> should be updated for all of the writers and the committer / but only a
> single schema change commit is needed (allowed) to the Iceberg table.
>
> This is a very interesting, but non-trivial change.
>
> [1]
> https://github.com/apache/paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
>
> Andrew Otto  ezt írta (időpont: 2024. máj. 23.,
> Cs, 21:59):
>
>> Ah I see, so just auto-restarting to pick up new stuff.
>>
>> I'd love to understand how Paimon does this.  They have a database
>> sync action
>> 
>> which will sync entire databases, handle schema evolution, and I'm pretty
>> sure (I think I saw this in my local test) also pick up new tables.
>>
>>
>> https://github.com/apache/paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java#L45
>>
>> I'm sure that Paimon table format is great, but at Wikimedia
>> Foundation we are on the Iceberg train.  Imagine if there was a flink-cdc
>> full database sync to Flink IcebergSink!
>>
>>
>>
>>
>> On Thu, May 23, 2024 at 3:47 PM Péter Váry <
>> peter.vary.apa...@gmail.com> wrote:
>>
>>> I will ask Marton about the slides.
>>>
>>> The solution was something like this in a nutshell:
>>> - Make sure that on job start the latest Iceberg schema is read from
>>> the Iceberg table
>>> - Throw a SuppressRestartsException when data arrives with the wrong
>>> schema
>>> - Use Flink Kubernetes 

Re: "Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-24 Thread Giannis Polyzos
https://nightlies.apache.org/flink/flink-cdc-docs-stable/
All these features come from Flink CDC itself. Because Paimon and Flink CDC
are projects native to Flink, there is strong integration between them.
(I believe it’s on the roadmap to support Iceberg as well.)

On Fri, 24 May 2024 at 3:52 PM, Andrew Otto  wrote:

> > I’m curious if there is any reason for choosing Iceberg instead of
> Paimon
>
> No technical reason that I'm aware of.  We are using it mostly because of
> momentum.  We looked at Flink Table Store (before it was Paimon), but
> decided it was too early and the docs were too sparse at the time to really
> consider it.
>
> > Especially for a use case like CDC that iceberg struggles to support.
>
> We aren't doing any CDC right now (for many reasons), but I have never
> seen a feature like Paimon's database sync before.  One job to sync and
> evolve an entire database?  That is amazing.
>
> If we could do this with Iceberg, we might be able to make an argument to
> product managers to push for CDC.
>
>
>
> On Fri, May 24, 2024 at 8:36 AM Giannis Polyzos 
> wrote:
>
>> I’m curious if there is any reason for choosing Iceberg instead of Paimon
>> (other than - iceberg is more popular).
>> Especially for a use case like CDC that iceberg struggles to support.
>>
>> On Fri, 24 May 2024 at 3:22 PM, Andrew Otto  wrote:
>>
>>> Interesting thank you!
>>>
>>> I asked this in the Paimon users group:
>>>
>>> How coupled to Paimon catalogs and tables is the cdc part of Paimon?
>>> RichCdcMultiplexRecord
>>> 
>>>  and
>>> related code seem incredibly useful even outside of the context of the
>>> Paimon table format.
>>>
>>> I'm asking because the database sync action
>>> 
>>>  feature
>>> is amazing.  At the Wikimedia Foundation, we are on an all-in journey with
>>> Iceberg.  I'm wondering how hard it would be to extract the CDC logic from
>>> Paimon and abstract the Sink bits.
>>>
>>> Could the table/database sync with schema evolution (without Flink job
>>> restarts!) potentially work with the Iceberg sink?
>>>
>>>
>>>
>>>
>>> On Thu, May 23, 2024 at 4:34 PM Péter Váry 
>>> wrote:
>>>
 If I understand correctly, Paimon is sending `CdcRecord`-s [1] on the
 wire which contain not only the data, but the schema as well.
 With Iceberg we currently only send the row data, and expect to receive
 the schema on job start - this is more performant than sending the schema
 all the time, but has the obvious issue that it is not able to handle the
 schema changes. Another part of the dynamic schema synchronization is the
 update of the Iceberg table schema - the schema should be updated for all
 of the writers and the committer / but only a single schema change commit
 is needed (allowed) to the Iceberg table.

 This is a very interesting, but non-trivial change.

 [1]
 https://github.com/apache/paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java

 Andrew Otto  ezt írta (időpont: 2024. máj. 23.,
 Cs, 21:59):

> Ah I see, so just auto-restarting to pick up new stuff.
>
> I'd love to understand how Paimon does this.  They have a database
> sync action
> 
> which will sync entire databases, handle schema evolution, and I'm pretty
> sure (I think I saw this in my local test) also pick up new tables.
>
>
> https://github.com/apache/paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java#L45
>
> I'm sure that Paimon table format is great, but at Wikimedia
> Foundation we are on the Iceberg train.  Imagine if there was a flink-cdc
> full database sync to Flink IcebergSink!
>
>
>
>
> On Thu, May 23, 2024 at 3:47 PM Péter Váry <
> peter.vary.apa...@gmail.com> wrote:
>
>> I will ask Marton about the slides.
>>
>> The solution was something like this in a nutshell:
>> - Make sure that on job start the latest Iceberg schema is read from
>> the Iceberg table
>> - Throw a SuppressRestartsException when data arrives with the wrong
>> schema
>> - Use Flink Kubernetes Operator to restart your failed jobs by
>> setting
>> kubernetes.operator.job.restart.failed
>>
>> Thanks, Peter
>>
>> On Thu, May 23, 2024, 20:29 Andrew Otto  wrote:
>>
>>> Wow, I would LOVE to see this talk.  If there is no recording,
>>> perhaps there are slides somewhere?
>>>
>>> On Thu, May 23, 2024 at 11:00 AM Carlos Sanabria Miranda <
>>> 

Re: "Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-24 Thread Andrew Otto
> I’m curious if there is any reason for choosing Iceberg instead of Paimon

No technical reason that I'm aware of.  We are using it mostly because of
momentum.  We looked at Flink Table Store (before it was Paimon), but
decided it was too early and the docs were too sparse at the time to really
consider it.

> Especially for a use case like CDC that iceberg struggles to support.

We aren't doing any CDC right now (for many reasons), but I have never seen
a feature like Paimon's database sync before.  One job to sync and evolve
an entire database?  That is amazing.

If we could do this with Iceberg, we might be able to make an argument to
product managers to push for CDC.



On Fri, May 24, 2024 at 8:36 AM Giannis Polyzos 
wrote:

> I’m curious if there is any reason for choosing Iceberg instead of Paimon
> (other than - iceberg is more popular).
> Especially for a use case like CDC that iceberg struggles to support.
>
> On Fri, 24 May 2024 at 3:22 PM, Andrew Otto  wrote:
>
>> Interesting thank you!
>>
>> I asked this in the Paimon users group:
>>
>> How coupled to Paimon catalogs and tables is the cdc part of Paimon?
>> RichCdcMultiplexRecord
>> 
>>  and
>> related code seem incredibly useful even outside of the context of the
>> Paimon table format.
>>
>> I'm asking because the database sync action
>> 
>>  feature
>> is amazing.  At the Wikimedia Foundation, we are on an all-in journey with
>> Iceberg.  I'm wondering how hard it would be to extract the CDC logic from
>> Paimon and abstract the Sink bits.
>>
>> Could the table/database sync with schema evolution (without Flink job
>> restarts!) potentially work with the Iceberg sink?
>>
>>
>>
>>
>> On Thu, May 23, 2024 at 4:34 PM Péter Váry 
>> wrote:
>>
>>> If I understand correctly, Paimon is sending `CdcRecord`-s [1] on the
>>> wire which contain not only the data, but the schema as well.
>>> With Iceberg we currently only send the row data, and expect to receive
>>> the schema on job start - this is more performant than sending the schema
>>> all the time, but has the obvious issue that it is not able to handle the
>>> schema changes. Another part of the dynamic schema synchronization is the
>>> update of the Iceberg table schema - the schema should be updated for all
>>> of the writers and the committer / but only a single schema change commit
>>> is needed (allowed) to the Iceberg table.
>>>
>>> This is a very interesting, but non-trivial change.
>>>
>>> [1]
>>> https://github.com/apache/paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
>>>
>>> Andrew Otto  ezt írta (időpont: 2024. máj. 23., Cs,
>>> 21:59):
>>>
 Ah I see, so just auto-restarting to pick up new stuff.

 I'd love to understand how Paimon does this.  They have a database
 sync action
 
 which will sync entire databases, handle schema evolution, and I'm pretty
 sure (I think I saw this in my local test) also pick up new tables.


 https://github.com/apache/paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java#L45

 I'm sure that Paimon table format is great, but at Wikimedia Foundation
 we are on the Iceberg train.  Imagine if there was a flink-cdc full
 database sync to Flink IcebergSink!




 On Thu, May 23, 2024 at 3:47 PM Péter Váry 
 wrote:

> I will ask Marton about the slides.
>
> The solution was something like this in a nutshell:
> - Make sure that on job start the latest Iceberg schema is read from
> the Iceberg table
> - Throw a SuppressRestartsException when data arrives with the wrong
> schema
> - Use Flink Kubernetes Operator to restart your failed jobs by setting
> kubernetes.operator.job.restart.failed
>
> Thanks, Peter
>
> On Thu, May 23, 2024, 20:29 Andrew Otto  wrote:
>
>> Wow, I would LOVE to see this talk.  If there is no recording,
>> perhaps there are slides somewhere?
>>
>> On Thu, May 23, 2024 at 11:00 AM Carlos Sanabria Miranda <
>> sanabria.miranda.car...@gmail.com> wrote:
>>
>>> Hi everyone!
>>>
>>> I have found in the Flink Forward website the following
>>> presentation: "Self-service ingestion pipelines with evolving
>>> schema via Flink and Iceberg
>>> "
>>> by Márton Balassi from the 2023 conference in Seattle, but I cannot find
>>> the 

Re: "Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-24 Thread Giannis Polyzos
I’m curious if there is any reason for choosing Iceberg instead of Paimon
(other than that Iceberg is more popular),
especially for a use case like CDC, which Iceberg struggles to support.

On Fri, 24 May 2024 at 3:22 PM, Andrew Otto  wrote:

> Interesting thank you!
>
> I asked this in the Paimon users group:
>
> How coupled to Paimon catalogs and tables is the cdc part of Paimon?
> RichCdcMultiplexRecord
> 
>  and
> related code seem incredibly useful even outside of the context of the
> Paimon table format.
>
> I'm asking because the database sync action
> 
>  feature
> is amazing.  At the Wikimedia Foundation, we are on an all-in journey with
> Iceberg.  I'm wondering how hard it would be to extract the CDC logic from
> Paimon and abstract the Sink bits.
>
> Could the table/database sync with schema evolution (without Flink job
> restarts!) potentially work with the Iceberg sink?
>
>
>
>
> On Thu, May 23, 2024 at 4:34 PM Péter Váry 
> wrote:
>
>> If I understand correctly, Paimon is sending `CdcRecord`-s [1] on the
>> wire which contain not only the data, but the schema as well.
>> With Iceberg we currently only send the row data, and expect to receive
>> the schema on job start - this is more performant than sending the schema
>> all the time, but has the obvious issue that it is not able to handle the
>> schema changes. Another part of the dynamic schema synchronization is the
>> update of the Iceberg table schema - the schema should be updated for all
>> of the writers and the committer / but only a single schema change commit
>> is needed (allowed) to the Iceberg table.
>>
>> This is a very interesting, but non-trivial change.
>>
>> [1]
>> https://github.com/apache/paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
>>
>> Andrew Otto  ezt írta (időpont: 2024. máj. 23., Cs,
>> 21:59):
>>
>>> Ah I see, so just auto-restarting to pick up new stuff.
>>>
>>> I'd love to understand how Paimon does this.  They have a database sync
>>> action
>>> 
>>> which will sync entire databases, handle schema evolution, and I'm pretty
>>> sure (I think I saw this in my local test) also pick up new tables.
>>>
>>>
>>> https://github.com/apache/paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java#L45
>>>
>>> I'm sure that Paimon table format is great, but at Wikimedia Foundation
>>> we are on the Iceberg train.  Imagine if there was a flink-cdc full
>>> database sync to Flink IcebergSink!
>>>
>>>
>>>
>>>
>>> On Thu, May 23, 2024 at 3:47 PM Péter Váry 
>>> wrote:
>>>
 I will ask Marton about the slides.

 The solution was something like this in a nutshell:
 - Make sure that on job start the latest Iceberg schema is read from
 the Iceberg table
 - Throw a SuppressRestartsException when data arrives with the wrong
 schema
 - Use Flink Kubernetes Operator to restart your failed jobs by setting
 kubernetes.operator.job.restart.failed

 Thanks, Peter

 On Thu, May 23, 2024, 20:29 Andrew Otto  wrote:

> Wow, I would LOVE to see this talk.  If there is no recording, perhaps
> there are slides somewhere?
>
> On Thu, May 23, 2024 at 11:00 AM Carlos Sanabria Miranda <
> sanabria.miranda.car...@gmail.com> wrote:
>
>> Hi everyone!
>>
>> I have found in the Flink Forward website the following presentation:
>> "Self-service ingestion pipelines with evolving schema via Flink and
>> Iceberg
>> "
>> by Márton Balassi from the 2023 conference in Seattle, but I cannot find
>> the recording anywhere. I have found the recordings of the other
>> presentations in the Ververica Academy website
>> , but not this one.
>>
>> Does anyone know where I can find it? Or at least the slides?
>>
>> We are using Flink with the Iceberg sink connector to write streaming
>> events to Iceberg tables, and we are researching how to handle schema
>> evolution properly. I saw that presentation and I thought it could be of
>> great help to us.
>>
>> Thanks in advance!
>>
>> Carlos
>>
>


Re: "Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-24 Thread Andrew Otto
Interesting thank you!

I asked this in the Paimon users group:

How coupled to Paimon catalogs and tables is the cdc part of Paimon?
RichCdcMultiplexRecord and
related code seem incredibly useful even outside of the context of the
Paimon table format.

I'm asking because the database sync action feature
is amazing.  At the Wikimedia Foundation, we are on an all-in journey with
Iceberg.  I'm wondering how hard it would be to extract the CDC logic from
Paimon and abstract the Sink bits.

Could the table/database sync with schema evolution (without Flink job
restarts!) potentially work with the Iceberg sink?




On Thu, May 23, 2024 at 4:34 PM Péter Váry 
wrote:

> If I understand correctly, Paimon is sending `CdcRecord`-s [1] on the wire
> which contain not only the data, but the schema as well.
> With Iceberg we currently only send the row data, and expect to receive
> the schema on job start - this is more performant than sending the schema
> all the time, but has the obvious issue that it is not able to handle the
> schema changes. Another part of the dynamic schema synchronization is the
> update of the Iceberg table schema - the schema should be updated for all
> of the writers and the committer / but only a single schema change commit
> is needed (allowed) to the Iceberg table.
>
> This is a very interesting, but non-trivial change.
>
> [1]
> https://github.com/apache/paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
>
> Andrew Otto  ezt írta (időpont: 2024. máj. 23., Cs,
> 21:59):
>
>> Ah I see, so just auto-restarting to pick up new stuff.
>>
>> I'd love to understand how Paimon does this.  They have a database sync
>> action
>> 
>> which will sync entire databases, handle schema evolution, and I'm pretty
>> sure (I think I saw this in my local test) also pick up new tables.
>>
>>
>> https://github.com/apache/paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java#L45
>>
>> I'm sure that Paimon table format is great, but at Wikimedia Foundation
>> we are on the Iceberg train.  Imagine if there was a flink-cdc full
>> database sync to Flink IcebergSink!
>>
>>
>>
>>
>> On Thu, May 23, 2024 at 3:47 PM Péter Váry 
>> wrote:
>>
>>> I will ask Marton about the slides.
>>>
>>> The solution was something like this in a nutshell:
>>> - Make sure that on job start the latest Iceberg schema is read from the
>>> Iceberg table
>>> - Throw a SuppressRestartsException when data arrives with the wrong
>>> schema
>>> - Use Flink Kubernetes Operator to restart your failed jobs by setting
>>> kubernetes.operator.job.restart.failed
>>>
>>> Thanks, Peter
>>>
>>> On Thu, May 23, 2024, 20:29 Andrew Otto  wrote:
>>>
 Wow, I would LOVE to see this talk.  If there is no recording, perhaps
 there are slides somewhere?

 On Thu, May 23, 2024 at 11:00 AM Carlos Sanabria Miranda <
 sanabria.miranda.car...@gmail.com> wrote:

> Hi everyone!
>
> I have found in the Flink Forward website the following presentation: 
> "Self-service
> ingestion pipelines with evolving schema via Flink and Iceberg
> "
> by Márton Balassi from the 2023 conference in Seattle, but I cannot find
> the recording anywhere. I have found the recordings of the other
> presentations in the Ververica Academy website
> , but not this one.
>
> Does anyone know where I can find it? Or at least the slides?
>
> We are using Flink with the Iceberg sink connector to write streaming
> events to Iceberg tables, and we are researching how to handle schema
> evolution properly. I saw that presentation and I thought it could be of
> great help to us.
>
> Thanks in advance!
>
> Carlos
>



Ways to detect a scaling event within a flink operator at runtime

2024-05-23 Thread Chetas Joshi
Hello,

On a k8s cluster, I have the flink-k8s-operator 1.8 running with the autoscaler
enabled (in-place scaling) and a flinkDeployment (application mode) running Flink
1.18.1.

The flinkDeployment, i.e. the Flink streaming application, has a mock data
producer as the source. The source generates data points every X milliseconds,
which are processed (aggregated) by the downstream operators. The aggregated
results are written to Iceberg.

The pipeline starts with default-parallelism = 1, i.e. all the job vertices
start with parallelism = 1 and X = 0, so data points are generated continuously.
Due to the lag introduced by the aggregation and the sink, the
source experiences backpressure and hence the autoscaler triggers a
scale-up. I want to slow down the rate of data production by the source after
the first scale-up event. What are the ways I can detect the scale-up event
so that the source can dynamically adjust (increase) X at runtime? I am
wondering whether there is a way, from within the source operator at runtime,
to detect if the parallelism of any job vertex in the Flink execution graph
has gone above 1.

This is a test pipeline (Flink app) and the goal is to exercise the scale-up
and scale-down events, so I need to increase X in order to have a
scale-down event triggered afterwards.

Thank you
Chetas


Re: "Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-23 Thread Péter Váry
If I understand correctly, Paimon is sending `CdcRecord`-s [1] on the wire
which contain not only the data, but the schema as well.
With Iceberg we currently only send the row data, and expect to receive the
schema on job start - this is more performant than sending the schema all
the time, but has the obvious issue that it is not able to handle
schema changes. Another part of the dynamic schema synchronization is the
update of the Iceberg table schema - the schema should be updated for all
of the writers and the committer, but only a single schema change commit
is needed (allowed) to the Iceberg table.

This is a very interesting, but non-trivial change.

[1]
https://github.com/apache/paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java

Andrew Otto wrote (Thu, May 23, 2024, 21:59):

> Ah I see, so just auto-restarting to pick up new stuff.
>
> I'd love to understand how Paimon does this.  They have a database sync
> action
> 
> which will sync entire databases, handle schema evolution, and I'm pretty
> sure (I think I saw this in my local test) also pick up new tables.
>
>
> https://github.com/apache/paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java#L45
>
> I'm sure that Paimon table format is great, but at Wikimedia Foundation we
> are on the Iceberg train.  Imagine if there was a flink-cdc full database
> sync to Flink IcebergSink!
>
>
>
>
> On Thu, May 23, 2024 at 3:47 PM Péter Váry 
> wrote:
>
>> I will ask Marton about the slides.
>>
>> The solution was something like this in a nutshell:
>> - Make sure that on job start the latest Iceberg schema is read from the
>> Iceberg table
>> - Throw a SuppressRestartsException when data arrives with the wrong
>> schema
>> - Use Flink Kubernetes Operator to restart your failed jobs by setting
>> kubernetes.operator.job.restart.failed
>>
>> Thanks, Peter
>>
>> On Thu, May 23, 2024, 20:29 Andrew Otto  wrote:
>>
>>> Wow, I would LOVE to see this talk.  If there is no recording, perhaps
>>> there are slides somewhere?
>>>
>>> On Thu, May 23, 2024 at 11:00 AM Carlos Sanabria Miranda <
>>> sanabria.miranda.car...@gmail.com> wrote:
>>>
 Hi everyone!

 I have found in the Flink Forward website the following presentation: 
 "Self-service
 ingestion pipelines with evolving schema via Flink and Iceberg
 "
 by Márton Balassi from the 2023 conference in Seattle, but I cannot find
 the recording anywhere. I have found the recordings of the other
 presentations in the Ververica Academy website
 , but not this one.

 Does anyone know where I can find it? Or at least the slides?

 We are using Flink with the Iceberg sink connector to write streaming
 events to Iceberg tables, and we are researching how to handle schema
 evolution properly. I saw that presentation and I thought it could be of
 great help to us.

 Thanks in advance!

 Carlos

>>>


Pulsar connector resets existing subscription

2024-05-23 Thread Igor Basov
Hi everyone,

I have a problem with how Flink deals with the existing subscription in a
Pulsar topic.

   - Subscription has some accumulated backlog
   - Flink job is deployed from a clear state (no checkpoints)
   - Flink job uses the same subscription name as the existing one; the
   start cursor is the default one (earliest)

Based on the docs here,
the priority for setting up the cursor position should be: checkpoint >
existed subscription position > StartCursor. So, since there are no
checkpoints, I expect the job to get the existing position from Pulsar and
start reading from there.
But that’s not what I see. As soon as the job is connected to the topic, I
see the number of messages in the subscription backlog jumping to a new
high, and JM logs show messages:

Seeking subscription to the message -1:-1:-1
Successfully reset subscription to the message -1:-1:-1

Apparently, Flink ignored the existing subscription position and reset its
cursor position to the earliest.
The related code seems to be here,
but I’m not sure if it takes into account the existence of subscriptions.

Flink: 1.18.1
Pulsar connector: org.apache.flink:flink-connector-pulsar:4.1.0-1.18

Thanks in advance!

Best regards,
Igor


Re: "Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-23 Thread Andrew Otto
Ah I see, so just auto-restarting to pick up new stuff.

I'd love to understand how Paimon does this.  They have a database sync
action
which will sync entire databases, handle schema evolution, and I'm pretty
sure (I think I saw this in my local test) also pick up new tables.

https://github.com/apache/paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java#L45

I'm sure that Paimon table format is great, but at Wikimedia Foundation we
are on the Iceberg train.  Imagine if there was a flink-cdc full database
sync to Flink IcebergSink!




On Thu, May 23, 2024 at 3:47 PM Péter Váry 
wrote:

> I will ask Marton about the slides.
>
> The solution was something like this in a nutshell:
> - Make sure that on job start the latest Iceberg schema is read from the
> Iceberg table
> - Throw a SuppressRestartsException when data arrives with the wrong schema
> - Use Flink Kubernetes Operator to restart your failed jobs by setting
> kubernetes.operator.job.restart.failed
>
> Thanks, Peter
>
> On Thu, May 23, 2024, 20:29 Andrew Otto  wrote:
>
>> Wow, I would LOVE to see this talk.  If there is no recording, perhaps
>> there are slides somewhere?
>>
>> On Thu, May 23, 2024 at 11:00 AM Carlos Sanabria Miranda <
>> sanabria.miranda.car...@gmail.com> wrote:
>>
>>> Hi everyone!
>>>
>>> I have found in the Flink Forward website the following presentation: 
>>> "Self-service
>>> ingestion pipelines with evolving schema via Flink and Iceberg
>>> "
>>> by Márton Balassi from the 2023 conference in Seattle, but I cannot find
>>> the recording anywhere. I have found the recordings of the other
>>> presentations in the Ververica Academy website
>>> , but not this one.
>>>
>>> Does anyone know where I can find it? Or at least the slides?
>>>
>>> We are using Flink with the Iceberg sink connector to write streaming
>>> events to Iceberg tables, and we are researching how to handle schema
>>> evolution properly. I saw that presentation and I thought it could be of
>>> great help to us.
>>>
>>> Thanks in advance!
>>>
>>> Carlos
>>>
>>


Re: "Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-23 Thread Péter Váry
I will ask Marton about the slides.

The solution was something like this in a nutshell:
- Make sure that on job start the latest Iceberg schema is read from the
Iceberg table
- Throw a SuppressRestartsException when data arrives with the wrong schema
- Use Flink Kubernetes Operator to restart your failed jobs by setting
kubernetes.operator.job.restart.failed
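
As a sketch, the per-deployment setting would look something like this in the
FlinkDeployment resource of the Kubernetes Operator (exact placement may differ
depending on your setup):

spec:
  flinkConfiguration:
    kubernetes.operator.job.restart.failed: "true"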

Thanks, Peter

On Thu, May 23, 2024, 20:29 Andrew Otto  wrote:

> Wow, I would LOVE to see this talk.  If there is no recording, perhaps
> there are slides somewhere?
>
> On Thu, May 23, 2024 at 11:00 AM Carlos Sanabria Miranda <
> sanabria.miranda.car...@gmail.com> wrote:
>
>> Hi everyone!
>>
>> I have found in the Flink Forward website the following presentation: 
>> "Self-service
>> ingestion pipelines with evolving schema via Flink and Iceberg
>> "
>> by Márton Balassi from the 2023 conference in Seattle, but I cannot find
>> the recording anywhere. I have found the recordings of the other
>> presentations in the Ververica Academy website
>> , but not this one.
>>
>> Does anyone know where I can find it? Or at least the slides?
>>
>> We are using Flink with the Iceberg sink connector to write streaming
>> events to Iceberg tables, and we are researching how to handle schema
>> evolution properly. I saw that presentation and I thought it could be of
>> great help to us.
>>
>> Thanks in advance!
>>
>> Carlos
>>
>


Re: issues with Flink kinesis connector

2024-05-23 Thread Nick Hecht
Thank you for your help!

On Thu, May 23, 2024 at 1:40 PM Aleksandr Pilipenko 
wrote:

> Hi Nick,
>
> You need to use another method to add sink to your job - sinkTo.
> KinesisStreamsSink implements newer Sink interface, while addSink expect
> old SinkFunction. You can see this by looking at method signatures[1] and
> in usage examples in documentation[2]
>
> [1]
> https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/data_stream.py#L811-L837
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/#kinesis-streams-sink
>
> Best,
> Aleksandr
>
>
> On Thu, 23 May 2024 at 17:19, Nick Hecht 
> wrote:
>
>> Hello,
>>
>> I am currently having issues trying to use the python flink 1.18
>> Datastream api with the Amazon Kinesis Data Streams Connector.
>>
>> From the documentation
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/
>>  I have downloaded the "flink-connector-kinesis" jar
>> https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kinesis/4.2.0-1.18/flink-sql-connector-kinesis-4.2.0-1.18.jar
>>
>> and i have added it in my code:
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
>> env.enable_checkpointing(5000)
>>
>> env.add_jars(
>> "file:///workspaces/flink/flink-sql-connector-kinesis-4.2.0-1.18.jar"
>> ,
>> )
>>
>> and it has worked perfectly so far when setting up my kinesis source,  i
>> recently added a kinesis sink to complete my pipeline (was testing with
>> print before)
>>
>> # ds = ds.print() sink = KinesisStreamsSink.builder() \
>> .set_kinesis_client_properties(config) \
>> .set_serialization_schema(SimpleStringSchema()) \
>> .set_partition_key_generator(PartitionKeyGenerator.fixed()) \
>> .set_stream_name(stream_name) \
>> .build()
>>
>> ds = ds.add_sink(sink)
>>
>> s_env.execute('pipeline')
>>
>> now when i run my python flink application it throws an error at my
>> add_sink call with the following exception:
>>
>> > python locations_flink_app.py
>>
>> 2024-05-23 14:53:10,219 - INFO -
>> apache_beam.typehints.native_type_compatibility - 315 - Using Any for
>> unsupported type: typing.Sequence[~T]
>> 2024-05-23 14:53:10,287 - INFO - apache_beam.io.gcp.bigquery - 436 - No
>> module named google.cloud.bigquery_storage_v1. As a result, the
>> ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
>> Traceback (most recent call last):
>>   File "locations_flink_app.py", line 90, in 
>> setup_flink_app(s_env, props)
>>   File "locations_flink_app.py", line 71, in setup_flink_app
>> ds = ds.add_sink(create_sink(
>>   File
>> "/usr/local/lib/python3.8/site-packages/pyflink/datastream/data_stream.py",
>> line 819, in add_sink
>> return
>> DataStreamSink(self._j_data_stream.addSink(sink_func.get_java_function()))
>>   File "/usr/local/lib/python3.8/site-packages/py4j/java_gateway.py",
>> line 1322, in __call__
>> return_value = get_return_value(
>>   File
>> "/usr/local/lib/python3.8/site-packages/pyflink/util/exceptions.py", line
>> 146, in deco
>> return f(*a, **kw)
>>   File "/usr/local/lib/python3.8/site-packages/py4j/protocol.py", line
>> 330, in get_return_value
>> raise Py4JError(
>> py4j.protocol.Py4JError: An error occurred while calling o245.addSink.
>> Trace:
>> org.apache.flink.api.python.shaded.py4j.Py4JException: Method
>> addSink([class org.apache.flink.connector.kinesis.sink.KinesisStreamsSink])
>> does not exist
>> at
>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
>> at
>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
>> at
>> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)
>> at
>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>> at
>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at
>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>> at java.base/java.lang.Thread.run(Thread.java:829)
>>
>> when i open the jar i downloaded
>> (flink-sql-connector-kinesis-4.2.0-1.18.jar)   i can see it actually has
>> the classes i need
>> Downloads\flink-sql-connector-kinesis-4.2.0-1.18.zip\org\apache\flink\connector\kinesis\sink
>> has  KinesisStreamsSink.class[class
>> org.apache.flink.connector.kinesis.sink.KinesisStreamsSink]
>>
>> If I remove the sink the source still works perfectly fine
>> (FlinkKinesisConsumer),  but I don't understand what I'm missing. The jar
>> I'm using should have everything.
>>
>> anyone else have similar issues?  or know what I might need to do?
>>
>>
>> Thank you,
>>
>> Nick Hecht
>>
>


Re: issues with Flink kinesis connector

2024-05-23 Thread Aleksandr Pilipenko
Hi Nick,

You need to use another method to add the sink to your job - sinkTo.
KinesisStreamsSink implements the newer Sink interface, while addSink expects
the old SinkFunction. You can see this by looking at the method signatures [1]
and the usage examples in the documentation [2].

[1]
https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/data_stream.py#L811-L837
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/#kinesis-streams-sink
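
A minimal sketch of the change on top of Nick's snippet (the builder arguments and
the `config` / `stream_name` values are assumed to be the same as in the original mail):

from pyflink.common import SimpleStringSchema
from pyflink.datastream.connectors.kinesis import KinesisStreamsSink, PartitionKeyGenerator

sink = KinesisStreamsSink.builder() \
    .set_kinesis_client_properties(config) \
    .set_serialization_schema(SimpleStringSchema()) \
    .set_partition_key_generator(PartitionKeyGenerator.fixed()) \
    .set_stream_name(stream_name) \
    .build()

# sink_to() accepts the new Sink API that KinesisStreamsSink implements;
# add_sink() only accepts the legacy SinkFunction, hence the Py4J error.
ds.sink_to(sink)

env.execute('pipeline')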

Best,
Aleksandr


On Thu, 23 May 2024 at 17:19, Nick Hecht  wrote:

> Hello,
>
> I am currently having issues trying to use the python flink 1.18
> Datastream api with the Amazon Kinesis Data Streams Connector.
>
> From the documentation
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/
>  I have downloaded the "flink-connector-kinesis" jar
> https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kinesis/4.2.0-1.18/flink-sql-connector-kinesis-4.2.0-1.18.jar
>
> and i have added it in my code:
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
> env.enable_checkpointing(5000)
>
> env.add_jars(
> "file:///workspaces/flink/flink-sql-connector-kinesis-4.2.0-1.18.jar",
> )
>
> and it has worked perfectly so far when setting up my kinesis source,  i
> recently added a kinesis sink to complete my pipeline (was testing with
> print before)
>
> # ds = ds.print() sink = KinesisStreamsSink.builder() \
> .set_kinesis_client_properties(config) \
> .set_serialization_schema(SimpleStringSchema()) \
> .set_partition_key_generator(PartitionKeyGenerator.fixed()) \
> .set_stream_name(stream_name) \
> .build()
>
> ds = ds.add_sink(sink)
>
> s_env.execute('pipeline')
>
> now when i run my python flink application it throws an error at my
> add_sink call with the following exception:
>
> > python locations_flink_app.py
>
> 2024-05-23 14:53:10,219 - INFO -
> apache_beam.typehints.native_type_compatibility - 315 - Using Any for
> unsupported type: typing.Sequence[~T]
> 2024-05-23 14:53:10,287 - INFO - apache_beam.io.gcp.bigquery - 436 - No
> module named google.cloud.bigquery_storage_v1. As a result, the
> ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
> Traceback (most recent call last):
>   File "locations_flink_app.py", line 90, in 
> setup_flink_app(s_env, props)
>   File "locations_flink_app.py", line 71, in setup_flink_app
> ds = ds.add_sink(create_sink(
>   File
> "/usr/local/lib/python3.8/site-packages/pyflink/datastream/data_stream.py",
> line 819, in add_sink
> return
> DataStreamSink(self._j_data_stream.addSink(sink_func.get_java_function()))
>   File "/usr/local/lib/python3.8/site-packages/py4j/java_gateway.py", line
> 1322, in __call__
> return_value = get_return_value(
>   File
> "/usr/local/lib/python3.8/site-packages/pyflink/util/exceptions.py", line
> 146, in deco
> return f(*a, **kw)
>   File "/usr/local/lib/python3.8/site-packages/py4j/protocol.py", line
> 330, in get_return_value
> raise Py4JError(
> py4j.protocol.Py4JError: An error occurred while calling o245.addSink.
> Trace:
> org.apache.flink.api.python.shaded.py4j.Py4JException: Method
> addSink([class org.apache.flink.connector.kinesis.sink.KinesisStreamsSink])
> does not exist
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
> at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)
> at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.base/java.lang.Thread.run(Thread.java:829)
>
> when i open the jar i downloaded
> (flink-sql-connector-kinesis-4.2.0-1.18.jar)   i can see it actually has
> the classes i need
> Downloads\flink-sql-connector-kinesis-4.2.0-1.18.zip\org\apache\flink\connector\kinesis\sink
> has  KinesisStreamsSink.class[class
> org.apache.flink.connector.kinesis.sink.KinesisStreamsSink]
>
> If I remove the sink the source still works perfectly fine
> (FlinkKinesisConsumer),  but I don't understand what I'm missing. The jar
> I'm using should have everything.
>
> anyone else have similar issues?  or know what I might need to do?
>
>
> Thank you,
>
> Nick Hecht
>


Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-23 Thread Zhanghao Chen
Hi John,

Based on the memory config screenshot provided before, each of your TMs should
have MaxDirectMemory = 1 GB (network memory) + 128 MB (framework off-heap) = 1152 MB.
Neither taskmanager.memory.flink.size nor the total including MaxDirectMemory should
exceed the pod's physical memory; you can check the detailed TM memory model [1] and
double-check this for yourself.

Maybe you can further analyze the direct memory usage using tools like JVM 
Native Memory Tracking (NMT).

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/mem_setup_tm/#detailed-memory-model
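
If it helps, NMT can be enabled on the task managers through the JVM options, e.g.
something like this in the Flink configuration (illustrative, adjust to your setup):

env.java.opts.taskmanager: -XX:NativeMemoryTracking=summary

and a summary can then be pulled from a running TM process with:

jcmd <taskmanager-pid> VM.native_memory summary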

Best,
Zhanghao Chen

From: John Smith 
Sent: Thursday, May 23, 2024 22:40
To: Zhanghao Chen 
Cc: Biao Geng ; user 
Subject: Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

Based on these two settings...
taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m

Reading the docs here I'm not sure how to calculate the formula. My suspicion 
is that I may have allocated too much of taskmanager.memory.flink.size and the 
total including MaxDirectMemory is more than what the physical OS has, is that 
possible?
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#taskmanager-memory-task-heap-size

Are you able to tell me what these numbers come out for this formula 
MaxDirectMemory of TM = Network Memory + Task Off-Heap + Framework Off-heap?

On Thu, May 23, 2024 at 9:01 AM John Smith 
mailto:java.dev@gmail.com>> wrote:
Ok, but I still don't get why it's doing it... It's the same version of 
flink... Only difference is java 11 and also I allocated more JVM heap and the 
actual physical is has more ram. Maybe I should reduce the JVM heap by a a 
gigabyte or two?

On Wed, May 22, 2024, 12:37 PM Zhanghao Chen 
mailto:zhanghao.c...@outlook.com>> wrote:
Hi John,

A side note here: Flink will set the MaxDirectMemory of TM = Network Memory + 
Task Off-Heap + Framework Off-heap, and overwrites JVM's default setting, 
regardless of the version of JVM.

Best,
Zhanghao Chen

From: John Smith mailto:java.dev@gmail.com>>
Sent: Wednesday, May 22, 2024 22:56
To: Biao Geng mailto:biaoge...@gmail.com>>
Cc: user mailto:user@flink.apache.org>>
Subject: Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

Hi, apologies I hit reply instead of reply all. So not sure who saw this or 
didn't. We have not switched to SSL and also our assumption here would be that 
if we did switch to SSL the jobs would not work or produce invalid results. The 
jobs work absolutely fine for a week or so and then they fail.

Here is the consumer config from the task logs, which says PLAINTEXT and port 
9092 is used. Also, I attached a screenshot of the task manager memory usage. As 
well, I read up on the MaxDirectMemory setting of Java 8 vs Java 11. Java 8 by 
default calculates the direct memory size as 87% of the max heap size, while 
Java 11 sets it to 100% of the max heap size.

[Screen Shot 2024-05-22 at 9.50.38 AM.png]

 allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [xx-kafka-0001:9092, xx-0002:9092, 
xx-kafka-0003:9092]
check.crcs = true
client.dns.lookup = default
client.id = xx
client.rack =
connections.max.idle.ms = 54
default.api.timeout.ms = 6
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = xx
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 30
max.poll.records = 500
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 6
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null

Re: Task Manager memory usage

2024-05-23 Thread Zhanghao Chen
Hi Sigalit,

For states stored in memory, they will most probably stay alive for several 
rounds of GC and end up in the old generation of the heap, and won't get recycled 
until a Full GC.

As for the TM pod memory usage, most probabliy it will stop increasing at some 
point. You could try setting a larger taskmanager.memory.jvm-overhead memory, 
and monitor it for a long time. If that's not the case, then there might be 
native memory leakage somewhere, but that may not be related to the state.

Best,
Zhanghao Chen

From: Sigalit Eliazov 
Sent: Thursday, May 23, 2024 18:20
To: user 
Subject: Task Manager memory usage


Hi,

I am trying to understand the following behavior in our Flink application 
cluster. Any assistance would be appreciated.

We are running a Flink application cluster with 5 task managers, each with the 
following configuration:

  *   jobManagerMemory: 12g
  *   taskManagerMemory: 20g
  *   taskManagerMemoryHeapSize: 12g
  *   taskManagerMemoryNetworkMax: 4g
  *   taskManagerMemoryNetworkMin: 1g
  *   taskManagerMemoryManagedSize: 50m
  *   taskManagerMemoryOffHeapSize: 2g
  *   taskManagerMemoryNetworkFraction: 0.2
  *   taskManagerNetworkMemorySegmentSize: 4mb
  *   taskManagerMemoryFloatingBuffersPerGate: 64
  *   taskmanager.memory.jvm-overhead.min: 256mb
  *   taskmanager.memory.jvm-overhead.max: 2g
  *   taskmanager.memory.jvm-overhead.fraction: 0.1

Our pipeline includes stateful transformations, and we are verifying that we 
clear the state once it is no longer needed.

Through the Flink UI, we observe that the heap size increases and decreases 
during the job lifecycle.

However, there is a noticeable delay between clearing the state and the 
reduction in heap size usage, which I assume is related to the garbage 
collector frequency.

What is puzzling is the task manager pod memory usage. It appears that the 
memory usage increases intermittently and is not released. We verified the 
different state metrics and confirmed they are changing according to the logic.

Additionally, if we had a state that was never released, I would expect to see 
the heap size increasing constantly as well.

Any insights or ideas?

Thanks,

Sigalit


Re: "Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-23 Thread Andrew Otto
Wow, I would LOVE to see this talk.  If there is no recording, perhaps
there are slides somewhere?

On Thu, May 23, 2024 at 11:00 AM Carlos Sanabria Miranda <
sanabria.miranda.car...@gmail.com> wrote:

> Hi everyone!
>
> I have found in the Flink Forward website the following presentation: 
> "Self-service
> ingestion pipelines with evolving schema via Flink and Iceberg
> "
> by Márton Balassi from the 2023 conference in Seattle, but I cannot find
> the recording anywhere. I have found the recordings of the other
> presentations in the Ververica Academy website
> , but not this one.
>
> Does anyone know where I can find it? Or at least the slides?
>
> We are using Flink with the Iceberg sink connector to write streaming
> events to Iceberg tables, and we are researching how to handle schema
> evolution properly. I saw that presentation and I thought it could be of
> great help to us.
>
> Thanks in advance!
>
> Carlos
>


issues with Flink kinesis connector

2024-05-23 Thread Nick Hecht
Hello,

I am currently having issues trying to use the python flink 1.18 Datastream
api with the Amazon Kinesis Data Streams Connector.

From the documentation
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/
 I have downloaded the "flink-connector-kinesis" jar
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kinesis/4.2.0-1.18/flink-sql-connector-kinesis-4.2.0-1.18.jar

and i have added it in my code:

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.enable_checkpointing(5000)

env.add_jars(
"file:///workspaces/flink/flink-sql-connector-kinesis-4.2.0-1.18.jar",
)

and it has worked perfectly so far when setting up my kinesis source,  i
recently added a kinesis sink to complete my pipeline (was testing with
print before)

# ds = ds.print()
sink = KinesisStreamsSink.builder() \
.set_kinesis_client_properties(config) \
.set_serialization_schema(SimpleStringSchema()) \
.set_partition_key_generator(PartitionKeyGenerator.fixed()) \
.set_stream_name(stream_name) \
.build()

ds = ds.add_sink(sink)

s_env.execute('pipeline')

now when i run my python flink application it throws an error at my
add_sink call with the following exception:

> python locations_flink_app.py

2024-05-23 14:53:10,219 - INFO -
apache_beam.typehints.native_type_compatibility - 315 - Using Any for
unsupported type: typing.Sequence[~T]
2024-05-23 14:53:10,287 - INFO - apache_beam.io.gcp.bigquery - 436 - No
module named google.cloud.bigquery_storage_v1. As a result, the
ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
Traceback (most recent call last):
  File "locations_flink_app.py", line 90, in 
setup_flink_app(s_env, props)
  File "locations_flink_app.py", line 71, in setup_flink_app
ds = ds.add_sink(create_sink(
  File
"/usr/local/lib/python3.8/site-packages/pyflink/datastream/data_stream.py",
line 819, in add_sink
return
DataStreamSink(self._j_data_stream.addSink(sink_func.get_java_function()))
  File "/usr/local/lib/python3.8/site-packages/py4j/java_gateway.py", line
1322, in __call__
return_value = get_return_value(
  File "/usr/local/lib/python3.8/site-packages/pyflink/util/exceptions.py",
line 146, in deco
return f(*a, **kw)
  File "/usr/local/lib/python3.8/site-packages/py4j/protocol.py", line 330,
in get_return_value
raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o245.addSink.
Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method
addSink([class org.apache.flink.connector.kinesis.sink.KinesisStreamsSink])
does not exist
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)

When I open the jar I downloaded
(flink-sql-connector-kinesis-4.2.0-1.18.jar), I can see it actually has
the classes I need:
Downloads\flink-sql-connector-kinesis-4.2.0-1.18.zip\org\apache\flink\connector\kinesis\sink
has KinesisStreamsSink.class [class
org.apache.flink.connector.kinesis.sink.KinesisStreamsSink]

If I remove the sink the source still works perfectly fine
(FlinkKinesisConsumer),  but I don't understand what I'm missing. The jar
I'm using should have everything.

anyone else have similar issues?  or know what I might need to do?
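
A possible explanation for the Py4J error (an assumption based on the stack trace, not a confirmed diagnosis): KinesisStreamsSink implements the unified Sink interface rather than the legacy SinkFunction, and add_sink only accepts the latter, so a unified sink is attached with sink_to in PyFlink (ds.sink_to(sink)) or sinkTo in Java. A minimal Java sketch of that wiring, with the region and stream name as placeholders:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KinesisSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> ds = env.fromElements("a", "b", "c"); // placeholder source

        Properties config = new Properties();
        config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); // placeholder region

        KinesisStreamsSink<String> sink = KinesisStreamsSink.<String>builder()
                .setKinesisClientProperties(config)
                .setSerializationSchema(new SimpleStringSchema())
                .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
                .setStreamName("my-stream") // placeholder stream name
                .build();

        // KinesisStreamsSink is a unified (Sink V2) sink: attach it with sinkTo, not addSink.
        ds.sinkTo(sink);
        env.execute("kinesis-sink-sketch");
    }
}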


Thank you,

Nick Hecht


"Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-23 Thread Carlos Sanabria Miranda
Hi everyone!

I have found in the Flink Forward website the following presentation:
"Self-service
ingestion pipelines with evolving schema via Flink and Iceberg
"
by Márton Balassi from the 2023 conference in Seattle, but I cannot find
the recording anywhere. I have found the recordings of the other
presentations in the Ververica Academy website
, but not this one.

Does anyone know where I can find it? Or at least the slides?

We are using Flink with the Iceberg sink connector to write streaming
events to Iceberg tables, and we are researching how to handle schema
evolution properly. I saw that presentation and I thought it could be of
great help to us.

Thanks in advance!

Carlos


Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-23 Thread John Smith
Based on these two settings...
taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m

Reading the docs here I'm not sure how to calculate the formula. My
suspicion is that I may have allocated too much
of taskmanager.memory.flink.size and the total including MaxDirectMemory is
more than what the physical host has. Is that possible?
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#taskmanager-memory-task-heap-size

Are you able to tell me what these numbers come out to for this formula:
MaxDirectMemory
of TM = Network Memory + Task Off-Heap + Framework Off-heap?

On Thu, May 23, 2024 at 9:01 AM John Smith  wrote:

> Ok, but I still don't get why it's doing it... It's the same version of
> flink... Only difference is java 11 and also I allocated more JVM heap and
> the actual physical is has more ram. Maybe I should reduce the JVM heap by
> a a gigabyte or two?
>
> On Wed, May 22, 2024, 12:37 PM Zhanghao Chen 
> wrote:
>
>> Hi John,
>>
>> A side note here: Flink will set the MaxDirectMemory of TM = Network
>> Memory + Task Off-Heap + Framework Off-heap, and overwrites JVM's default
>> setting, regardless of the version of JVM.
>>
>> Best,
>> Zhanghao Chen
>> --
>> *From:* John Smith 
>> *Sent:* Wednesday, May 22, 2024 22:56
>> *To:* Biao Geng 
>> *Cc:* user 
>> *Subject:* Re: Would Java 11 cause Getting OutOfMemoryError: Direct
>> buffer memory?
>>
>> Hi, apologies I hit reply instead of reply all. So not sure who saw this
>> or didn't. We have not switched to SSL and also our assumption here
>> would be that if we did switch to SSL the jobs would not work or produce
>> invalid results. The jobs work absolutely fine for a week or so and then
>> they fail.
>>
>> Here is the consumer config from the task logs, which says PLAINTEXT and
>> port 9092 is used. Also I attached a screen of the task manager memory
>> usage. As well I read up on MaxDirectMemory setting of Java 8 vs Java 11.
>> Java 8 by default calculates the direct memory size to 87% of the max heap
>> size. While Java 11 set it to 100% of the max heap size.
>>
>> [image: Screen Shot 2024-05-22 at 9.50.38 AM.png]
>>
>>  allow.auto.create.topics = true
>> auto.commit.interval.ms = 5000
>> auto.offset.reset = latest
>> bootstrap.servers = [xx-kafka-0001:9092, xx-0002:9092,
>> xx-kafka-0003:9092]
>> check.crcs = true
>> client.dns.lookup = default
>> client.id = xx
>> client.rack =
>> connections.max.idle.ms = 54
>> default.api.timeout.ms = 6
>> enable.auto.commit = false
>> exclude.internal.topics = true
>> fetch.max.bytes = 52428800
>> fetch.max.wait.ms = 500
>> fetch.min.bytes = 1
>> group.id = xx
>> group.instance.id = null
>> heartbeat.interval.ms = 3000
>> interceptor.classes = []
>> internal.leave.group.on.close = true
>> isolation.level = read_uncommitted
>> key.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>> max.partition.fetch.bytes = 1048576
>> max.poll.interval.ms = 30
>> max.poll.records = 500
>> metadata.max.age.ms = 30
>> metric.reporters = []
>> metrics.num.samples = 2
>> metrics.recording.level = INFO
>> metrics.sample.window.ms = 3
>> partition.assignment.strategy = [class
>> org.apache.kafka.clients.consumer.RangeAssignor]
>> receive.buffer.bytes = 65536
>> reconnect.backoff.max.ms = 1000
>> reconnect.backoff.ms = 50
>> request.timeout.ms = 6
>> retry.backoff.ms = 100
>> sasl.client.callback.handler.class = null
>> sasl.jaas.config = null
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> sasl.kerberos.min.time.before.relogin = 6
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> sasl.login.callback.handler.class = null
>> sasl.login.class = null
>> sasl.login.refresh.buffer.seconds = 300
>> sasl.login.refresh.min.period.seconds = 60
>> sasl.login.refresh.window.factor = 0.8
>> sasl.login.refresh.window.jitter = 0.05
>> sasl.mechanism = GSSAPI
>> security.protocol = PLAINTEXT
>> security.providers = null
>> send.buffer.bytes = 131072
>> session.timeout.ms = 1
>> ssl.cipher.suites = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> ssl.endpoint.identification.algorithm = https
>> ssl.key.password = null
>> ssl.keymanager.algorithm = SunX509
>> ssl.keystore.location = null
>> ssl.keystore.password = null
>> ssl.keystore.type = JKS
>> ssl.protocol = TLS
>> ssl.provider = null
>> ssl.secure.random.implementation = null
>> ssl.trustmanager.algorithm = PKIX
>> ssl.truststore.location = null
>> ssl.truststore.password = null
>> ssl.truststore.type = JKS
>> value.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>>
>> On Thu, May 16, 2024 at 3:20 AM Biao Geng  wrote:
>>
>> Hi John,
>>
>> Just want to check, have you ever changed the kafka protocol in your job
>> after using the new cluster? The error message shows that it is caused by
>> the kafka 

kirankumarkathe...@gmail.com-unsubscribe

2024-05-23 Thread Kiran Kumar Kathe
Kindly unsubscribe this Gmail account: kirankumarkathe...@gmail.com


Re: Task Manager memory usage

2024-05-23 Thread Sigalit Eliazov
Hi,
thanks for your reply.
We are storing the data in memory; since it is short-term, we thought that
adding RocksDB would add overhead.
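
For reference, a minimal sketch of what switching to the RocksDB backend (as suggested in the reply quoted below) looks like; note that RocksDB is sized out of managed memory (taskmanager.memory.managed.size), which is only 50m in the configuration above, so that value would likely need to be increased. The checkpoint path is a placeholder.

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keyed state is kept in RocksDB (off-heap / on local disk) instead of on the JVM heap,
        // so heap usage stays bounded even with large or slowly-expiring state.
        // The boolean enables incremental checkpoints.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        // Placeholder location; equivalent to the config keys
        // state.backend: rocksdb, state.backend.incremental: true, state.checkpoints.dir: ...
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoints");
        env.enableCheckpointing(60_000);

        // ... build the stateful pipeline here, then call env.execute("job");
    }
}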


On Thu, May 23, 2024 at 4:38 PM Sachin Mittal  wrote:

> Hi
> Where are you storing the state.
> Try rocksdb.
>
> Thanks
> Sachin
>
>
> On Thu, 23 May 2024 at 6:19 PM, Sigalit Eliazov 
> wrote:
>
>> Hi,
>>
>> I am trying to understand the following behavior in our Flink application
>> cluster. Any assistance would be appreciated.
>>
>> We are running a Flink application cluster with 5 task managers, each
>> with the following configuration:
>>
>>- jobManagerMemory: 12g
>>- taskManagerMemory: 20g
>>- taskManagerMemoryHeapSize: 12g
>>- taskManagerMemoryNetworkMax: 4g
>>- taskManagerMemoryNetworkMin: 1g
>>- taskManagerMemoryManagedSize: 50m
>>- taskManagerMemoryOffHeapSize: 2g
>>- taskManagerMemoryNetworkFraction: 0.2
>>- taskManagerNetworkMemorySegmentSize: 4mb
>>- taskManagerMemoryFloatingBuffersPerGate: 64
>>- taskmanager.memory.jvm-overhead.min: 256mb
>>- taskmanager.memory.jvm-overhead.max: 2g
>>- taskmanager.memory.jvm-overhead.fraction: 0.1
>>
>> Our pipeline includes stateful transformations, and we are verifying that
>> we clear the state once it is no longer needed.
>>
>> Through the Flink UI, we observe that the heap size increases and
>> decreases during the job lifecycle.
>>
>> However, there is a noticeable delay between clearing the state and the
>> reduction in heap size usage, which I assume is related to the garbage
>> collector frequency.
>>
>> What is puzzling is the task manager pod memory usage. It appears that
>> the memory usage increases intermittently and is not released. We verified
>> the different state metrics and confirmed they are changing according to
>> the logic.
>>
>> Additionally, if we had a state that was never released, I would expect
>> to see the heap size increasing constantly as well.
>>
>> Any insights or ideas?
>>
>> Thanks,
>>
>> Sigalit
>>
>


Re: Task Manager memory usage

2024-05-23 Thread Sachin Mittal
Hi
Where are you storing the state.
Try rocksdb.

Thanks
Sachin


On Thu, 23 May 2024 at 6:19 PM, Sigalit Eliazov  wrote:

> Hi,
>
> I am trying to understand the following behavior in our Flink application
> cluster. Any assistance would be appreciated.
>
> We are running a Flink application cluster with 5 task managers, each with
> the following configuration:
>
>- jobManagerMemory: 12g
>- taskManagerMemory: 20g
>- taskManagerMemoryHeapSize: 12g
>- taskManagerMemoryNetworkMax: 4g
>- taskManagerMemoryNetworkMin: 1g
>- taskManagerMemoryManagedSize: 50m
>- taskManagerMemoryOffHeapSize: 2g
>- taskManagerMemoryNetworkFraction: 0.2
>- taskManagerNetworkMemorySegmentSize: 4mb
>- taskManagerMemoryFloatingBuffersPerGate: 64
>- taskmanager.memory.jvm-overhead.min: 256mb
>- taskmanager.memory.jvm-overhead.max: 2g
>- taskmanager.memory.jvm-overhead.fraction: 0.1
>
> Our pipeline includes stateful transformations, and we are verifying that
> we clear the state once it is no longer needed.
>
> Through the Flink UI, we observe that the heap size increases and
> decreases during the job lifecycle.
>
> However, there is a noticeable delay between clearing the state and the
> reduction in heap size usage, which I assume is related to the garbage
> collector frequency.
>
> What is puzzling is the task manager pod memory usage. It appears that the
> memory usage increases intermittently and is not released. We verified the
> different state metrics and confirmed they are changing according to the
> logic.
>
> Additionally, if we had a state that was never released, I would expect to
> see the heap size increasing constantly as well.
>
> Any insights or ideas?
>
> Thanks,
>
> Sigalit
>


Help with monitoring metrics of StateFun runtime with prometheus

2024-05-23 Thread Oliver Schmied
Dear Apache Flink community,

 

I am setting up an Apache Flink StateFun runtime on Kubernetes, following the flink-statefun-playground example: https://github.com/apache/flink-statefun-playground/tree/main/deployments/k8s.

This is the manifest I used for creating the StateFun environment:

```---

apiVersion: v1
kind: ConfigMap
metadata:
  namespace: statefun
  name: flink-config
  labels:
    app: statefun
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: statefun-master
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
    state.backend: rocksdb
    state.backend.rocksdb.timer-service.factory: ROCKSDB
    state.backend.incremental: true
    parallelism.default: 1
    s3.access-key: minioadmin
    s3.secret-key: minioadmin
    state.checkpoints.dir: s3://checkpoints/subscriptions
    s3.endpoint: http://minio.statefun.svc.cluster.local:9000
    s3.path-style-access: true
    jobmanager.memory.process.size: 1g
    taskmanager.memory.process.size: 1g

  log4j-console.properties: |+
  monitorInterval=30
  rootLogger.level = INFO
  rootLogger.appenderRef.console.ref = ConsoleAppender
  logger.akka.name = akka
  logger.akka.level = INFO
  logger.kafka.name= org.apache.kafka
  logger.kafka.level = INFO
  logger.hadoop.name = org.apache.hadoop
  logger.hadoop.level = INFO
  logger.zookeeper.name = org.apache.zookeeper
  logger.zookeeper.level = INFO
  appender.console.name = ConsoleAppender
  appender.console.type = CONSOLE
  appender.console.layout.type = PatternLayout
  appender.console.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
  logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
  logger.netty.level = OFF

---
apiVersion: v1
kind: Service
metadata:
  name: statefun-master-rest
  namespace: statefun
spec:
  type: NodePort
  ports:
    - name: rest
  port: 8081
  targetPort: 8081
  selector:
    app: statefun
    component: master
---
apiVersion: v1
kind: Service
metadata:
  name: statefun-master
  namespace: statefun
spec:
  type: ClusterIP
  ports:
    - name: rpc
  port: 6123
    - name: blob
  port: 6124
    - name: ui
  port: 8081
  selector:
    app: statefun
    component: master
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: statefun-master
  namespace: statefun
spec:
  replicas: 1
  selector:
    matchLabels:
  app: statefun
  component: master
  template:
    metadata:
  labels:
    app: statefun
    component: master
    spec:
  containers:
    - name: master
  image: apache/flink-statefun:3.3.0
  imagePullPolicy: IfNotPresent
  env:
    - name: ROLE
  value: master
    - name: MASTER_HOST
  value: statefun-master
  ports:
    - containerPort: 6123
  name: rpc
    - containerPort: 6124
  name: blob
    - containerPort: 8081
  name: ui
  livenessProbe:
    tcpSocket:
  port: 6123
    initialDelaySeconds: 30
    periodSeconds: 60
  volumeMounts:
    - name: flink-config-volume
  mountPath: /opt/flink/conf
    - name: module-config-volume
  mountPath: /opt/statefun/modules/example
  volumes:
    - name: flink-config-volume
  configMap:
    name: flink-config
    items:
  - key: flink-conf.yaml
    path: flink-conf.yaml
  - key: log4j-console.properties
    path: log4j-console.properties
    - name: module-config-volume
  configMap:
    name: module-config
    items:
  - key: module.yaml
    path: module.yaml
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: statefun-worker
  namespace: statefun
spec:
  replicas: 1
  selector:
    matchLabels:
  app: statefun
  component: worker
  template:
    metadata:
  labels:
    app: statefun
    component: worker
    spec:
  containers:
    - name: worker
  image: apache/flink-statefun:3.3.0
  imagePullPolicy: IfNotPresent
  env:
    - name: ROLE
  value: worker
    - name: MASTER_HOST
  value: statefun-master
  resources:
    requests:
  memory: "1Gi"
  ports:
    - containerPort: 6122
  name: rpc
    - containerPort: 6124
  name: blob
    - containerPort: 8081
  name: ui
  livenessProbe:
    tcpSocket:
  port: 6122
    initialDelaySeconds: 

Task Manager memory usage

2024-05-23 Thread Sigalit Eliazov
Hi,

I am trying to understand the following behavior in our Flink application
cluster. Any assistance would be appreciated.

We are running a Flink application cluster with 5 task managers, each with
the following configuration:

   - jobManagerMemory: 12g
   - taskManagerMemory: 20g
   - taskManagerMemoryHeapSize: 12g
   - taskManagerMemoryNetworkMax: 4g
   - taskManagerMemoryNetworkMin: 1g
   - taskManagerMemoryManagedSize: 50m
   - taskManagerMemoryOffHeapSize: 2g
   - taskManagerMemoryNetworkFraction: 0.2
   - taskManagerNetworkMemorySegmentSize: 4mb
   - taskManagerMemoryFloatingBuffersPerGate: 64
   - taskmanager.memory.jvm-overhead.min: 256mb
   - taskmanager.memory.jvm-overhead.max: 2g
   - taskmanager.memory.jvm-overhead.fraction: 0.1

Our pipeline includes stateful transformations, and we are verifying that
we clear the state once it is no longer needed.

Through the Flink UI, we observe that the heap size increases and decreases
during the job lifecycle.

However, there is a noticeable delay between clearing the state and the
reduction in heap size usage, which I assume is related to the garbage
collector frequency.

What is puzzling is the task manager pod memory usage. It appears that the
memory usage increases intermittently and is not released. We verified the
different state metrics and confirmed they are changing according to the
logic.

Additionally, if we had a state that was never released, I would expect to
see the heap size increasing constantly as well.

Any insights or ideas?

Thanks,

Sigalit


Re: Question about the MongoDB splitVector privilege

2024-05-23 Thread Jiabao Sun
Hi,

splitVector is an internal MongoDB command used to compute splits; in a replica-set deployment it can also be used to compute chunk ranges.
If the splitVector privilege is missing, the connector automatically falls back to the sample splitting strategy.

Best,
Jiabao

evio12...@gmail.com  wrote on Thu, May 23, 2024 at 16:57:

>
> hello~
>
>
> I am using the flink-cdc MongoDB connector 2.3.0
> 
>  (
> https://github.com/apache/flink-cdc/blob/release-2.3/docs/content/connectors/mongodb-cdc.md)
> .
> The documentation states that the MongoDB account needs these privileges: 'splitVector', 'listDatabases', 'listCollections',
> 'collStats', 'find', and 'changeStream'.
>
>
> The MongoDB deployment I am using is a replica set, but my understanding is that the splitVector privilege mainly applies to sharded clusters.
> If the DBA does not grant splitVector, what impact will that have?
>
> --
> evio12...@gmail.com
>


Question about the MongoDB splitVector privilege

2024-05-23 Thread evio12...@gmail.com

hello~


I am using the flink-cdc MongoDB connector 2.3.0 
(https://github.com/apache/flink-cdc/blob/release-2.3/docs/content/connectors/mongodb-cdc.md).
The documentation states that the MongoDB account needs these privileges: 'splitVector', 'listDatabases', 'listCollections', 
'collStats', 'find', and 'changeStream'.


The MongoDB deployment I am using is a replica set, but my understanding is that the splitVector privilege mainly applies to sharded clusters.
If the DBA does not grant splitVector, what impact will that have?



evio12...@gmail.com


Re: Flink kinesis connector 4.3.0 release estimated date

2024-05-23 Thread Vararu, Vadim
That’s great news.

Thanks.

From: Leonard Xu 
Date: Thursday, 23 May 2024 at 04:42
To: Vararu, Vadim 
Cc: user , Danny Cranmer 
Subject: Re: Flink kinesis connector 4.3.0 release estimated date
Hey, Vararu

The kinesis connector 4.3.0 release is in the vote phase, and we hope to finalize 
the release work this week if everything goes well.


Best,
Leonard



On May 22, 2024 at 11:51 PM, Vararu, Vadim <vadim.var...@adswizz.com> wrote:

Hi guys,

Any idea when the 4.3.0 kinesis connector is estimated to be released?

Cheers,
Vadim.



Re: Flink kinesis connector 4.3.0 release estimated date

2024-05-22 Thread Leonard Xu
Hey, Vararu

The kinesis connector 4.3.0 release is in the vote phase, and we hope to finalize 
the release work this week if everything goes well.


Best,
Leonard


> On May 22, 2024 at 11:51 PM, Vararu, Vadim  wrote:
> 
> Hi guys,
>  
> Any idea when the 4.3.0 kinesis connector is estimated to be released?
>  
> Cheers,
> Vadim.



Flink Kubernetes Operator Pod Disruption Budget

2024-05-22 Thread Jeremy Alvis via user
Hello,

In order to maintain at least one pod for both the Flink Kubernetes
Operator and JobManagers through Kubernetes processes that use the Eviction
API

such as when draining a node, we have deployed Pod Disruption Budgets

in the appropriate namespaces.

Here is the flink-kubernetes-operator PDB:

apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: flink-kubernetes-operator
spec:
  minAvailable: 1
  selector:
matchLabels:
  app: flink-kubernetes-operator

Where the Flink Kubernetes Operator has the flink-kubernetes-operator app
label defined:
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
app: flink-kubernetes-operator


Here is the jobmanager PDB (deployed alongside each FlinkDeployment):

apiVersion: policy/v1
kind: PodDisruptionBudget
spec:
  minAvailable: 1
  selector:
matchLabels:
  name: jobmanager

Where the FlinkDeployment has the jobmanager name label defined:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
spec:
  jobManager:
podTemplate:
  metadata:
labels:
  name: jobmanager

We were wondering if it would make sense for the Flink Kubernetes Operator
to automatically create the PDBs as they are a native Kubernetes resource
like the Ingress that the operator currently creates.

Thanks,
Jeremy


Flink kinesis connector 4.3.0 release estimated date

2024-05-22 Thread Vararu, Vadim
Hi guys,

Any idea when the 4.3.0 kinesis connector is estimated to be released?

Cheers,
Vadim.


[ANNOUNCE] Apache Celeborn 0.4.1 available

2024-05-22 Thread Nicholas Jiang
Hi all,

Apache Celeborn community is glad to announce the
new release of Apache Celeborn 0.4.1.

Celeborn is dedicated to improving the efficiency and elasticity of
different map-reduce engines and provides an elastic, high-efficient
service for intermediate data including shuffle data, spilled data,
result data, etc.


Download Link: https://celeborn.apache.org/download/

GitHub Release Tag:

- https://github.com/apache/celeborn/releases/tag/v0.4.1

Release Notes:

- https://celeborn.apache.org/community/release_notes/release_note_0.4.1


Home Page: https://celeborn.apache.org/

Celeborn Resources:

- Issue Management: https://issues.apache.org/jira/projects/CELEBORN
- Mailing List: d...@celeborn.apache.org

Regards,
Nicholas Jiang
On behalf of the Apache Celeborn community

StateMigrationException while using stateTTL

2024-05-22 Thread irakli.keshel...@sony.com
Hello,

I'm using Flink 1.17.1 and I have stateTTL enabled in one of my Flink jobs, 
where I'm using RocksDB for checkpointing. I have a value state of a POJO 
class (which is generated from an Avro schema). I added a new field to my schema 
along with a default value to make sure it is backwards compatible; however, 
when I redeployed the job, I got a StateMigrationException. I have a similar setup 
in other Flink jobs where adding a column doesn't cause any trouble.

This is my stateTTL config:

StateTtlConfig
 .newBuilder(Time.days(7))
 .cleanupInRocksdbCompactFilter(1000)
 .build

This is how I enable it:

val myStateDescriptor: ValueStateDescriptor[MyPojoClass] =
 new ValueStateDescriptor[MyPojoClass](
   "test-name",
   classOf[MyPojoClass])

myStateDescriptor.enableTimeToLive(initStateTTLConfig())

This is the exception I end up with:

Caused by: org.apache.flink.util.StateMigrationException: The new state 
serializer 
(org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a13bf51) must 
not be incompatible with the old state serializer 
(org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a13bf51).
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:755)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:667)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:883)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:870)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:222)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:145)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:129)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:69)
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:362)
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:413)
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 25 more

Does anyone know what is causing the issue?

Cheers,
Irakli
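
Not a confirmed diagnosis, but one thing worth ruling out is whether the Avro-generated class is really being handled by a schema-evolution-aware serializer rather than falling back to Kryo, which cannot handle added fields. A minimal sketch (MyPojoClass stands in for the Avro-generated SpecificRecord class from the thread) that pins the descriptor to Avro type information before enabling TTL:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;

public class TtlAvroStateSketch {
    // MyPojoClass is a placeholder for the Avro-generated class (a SpecificRecordBase subtype).
    public static ValueStateDescriptor<MyPojoClass> buildDescriptor() {
        // Pinning the descriptor to Avro type information makes the TTL wrapper wrap a
        // schema-evolution-aware serializer instead of a possible Kryo fallback.
        ValueStateDescriptor<MyPojoClass> descriptor =
                new ValueStateDescriptor<>("test-name", new AvroTypeInfo<>(MyPojoClass.class));

        StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.days(7))
                .cleanupInRocksdbCompactFilter(1000)
                .build();

        descriptor.enableTimeToLive(ttl);
        return descriptor;
    }
}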




Re: Get access to unmatching events in Apache Flink Cep

2024-05-22 Thread Anton Sidorov
In his answer, Biao said "currently there is no such API to access the middle
NFA state". Maybe such an API exists in the plans? Or could I create an issue or a pull
request that adds such an API?
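
For reference, a minimal sketch of the mechanism Biao mentioned: if the pattern is given a within() clause, a PatternProcessFunction that also implements TimedOutPartialMatchHandler receives the pruned partial matches (for example an "a b" that never reached "c d e") and can emit them to a side output. Event is a placeholder type, and this only covers timed-out partial matches, not arbitrary NFA state.

import java.util.List;
import java.util.Map;

import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Event is a placeholder for the input event type of the pattern.
public class PartialMatchCapture extends PatternProcessFunction<Event, String>
        implements TimedOutPartialMatchHandler<Event> {

    // Side output for partial matches that timed out before completing the pattern.
    public static final OutputTag<Map<String, List<Event>>> TIMED_OUT =
            new OutputTag<Map<String, List<Event>>>("timed-out-partial-matches") {};

    @Override
    public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<String> out) {
        out.collect("full match: " + match.keySet());
    }

    @Override
    public void processTimedOutMatch(Map<String, List<Event>> partialMatch, Context ctx) {
        // Called only when the pattern uses within(...); the partial match is emitted
        // to the side output instead of being silently pruned.
        ctx.output(TIMED_OUT, partialMatch);
    }
}

It would be wired in roughly as CEP.pattern(input, pattern.within(Time.minutes(10))).process(new PartialMatchCapture()), with the side output read via getSideOutput(PartialMatchCapture.TIMED_OUT).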

On Fri, May 17, 2024 at 12:04, Anton Sidorov  wrote:

> Ok, thanks for the reply.
>
> пт, 17 мая 2024 г. в 09:22, Biao Geng :
>
>> Hi Anton,
>>
>> I am afraid that currently there is no such API to access the middle NFA
>> state in your case. For patterns that contain 'within()' condition, the
>> timeout events could be retrieved via TimedOutPartialMatchHandler
>> interface, but other unmatching events would be pruned immediately once
>> they are considered as unnecessary to keep.
>>
>> Best,
>> Biao Geng
>>
>>
>> Anton Sidorov  于2024年5月16日周四 16:12写道:
>>
>>> Hello!
>>>
>>> I have a Flink Job with CEP pattern.
>>>
>>> Pattern example:
>>>
>>> // Strict Contiguity
>>> // a b+ c d e
>>> Pattern.begin("a", AfterMatchSkipStrategy.skipPastLastEvent()).where(...)
>>> .next("b").where(...).oneOrMore()
>>> .next("c").where(...)
>>> .next("d").where(...)
>>> .next("e").where(...);
>>>
>>> I have events with wrong order stream on input:
>>>
>>> a b d c e
>>>
>>> On output I haven`t any matching. But I want have access to events, that
>>> not matching.
>>>
>>> Can I have access to middle NFA state in CEP pattern, or get some other
>>> way to view unmatching events?
>>>
>>> Example project with CEP pattern on github
>>> , and my question
>>> on SO
>>> 
>>>
>>> Thanks in advance
>>>
>>
>
> --
> С уважением, Антон.
>


-- 
Best regards, Anton.


IllegalStateException: invalid BLOB

2024-05-21 Thread Lars Skjærven
Hello,

We're facing the bug reported in
https://issues.apache.org/jira/browse/FLINK-32212

More specifically, when Kubernetes decides to drain a node and the job manager
restarts (but not the task manager), the job fails with:

java.lang.IllegalStateException: The library registration references a
different set of library BLOBs than previous registrations for this job:
old:[p-46b02bb8d9740e39d3fe3b3782b0bd335b35ad9f-eefd1e86ec70ea3aaf3e3dce568fe172]
new:[p-46b02bb8d9740e39d3fe3b3782b0bd335b35ad9f-a91193b75fdb799d7b4b8de3f4984597]

deleting the pod(s) running the TM(s) resolves the problem, but is not
ideal for us.

Any status on FLINK-32212 to solve this problem?

Kind regards
Lars


Re: Question about the iterate operation in the Flink 1.19 documentation

2024-05-20 Thread Xuyang
Hi, 

The Iterate API has been deprecated as of 1.19 and is no longer supported; see [1][2] for details. The FLIP [1] describes an alternative approach [3].




[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-357%3A+Deprecate+Iteration+API+of+DataStream

[2] https://issues.apache.org/jira/browse/FLINK-33144

[3] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615300




--

Best!
Xuyang





On 2024-05-20 at 22:39:37, ""  wrote:
>Dear Flink development team,
>
>Hello!
>
>I am currently learning how to implement iterative algorithms, such as single-source shortest paths on a graph, with Apache Flink's DataStream API. In the Flink 
>1.18 documentation I noticed a section on the iterate operation, see: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/overview/#iterations
>
>However, I found that the Flink 
>1.19 documentation no longer mentions the iterate operation, which confuses me. Does this mean the iterate operation is no longer supported in the latest version? If so, how should I perform iterative computation on a data stream?
>
>Thank you very much for your time and help; I look forward to your reply.
>
>Thanks!
>
>李智诚


Re: Flink autoscaler with AWS ASG: checkpoint access issue

2024-05-20 Thread Chetas Joshi
Hello,

After digging into the 403 issue a bit, I figured out that after the
scale-up event, the flink-s3-fs-presto plugin uses the node instance profile instead of
IRSA (IAM Roles for Service Accounts) on some of the newly created TM pods.

1. Has anyone else experienced this as well?
2. I verified that this is an issue with the flink-s3-fs-presto plugin, because if
I switch to the hadoop plugin, I don't run into 403 errors after the
scale-up events.
3. What is the reason the presto plugin is recommended over the hadoop
plugin when working with checkpoint files in S3?

Thank you
Chetas

On Mon, May 13, 2024 at 6:59 PM Chetas Joshi  wrote:

> Hello,
>
> Set up
>
> I am running my Flink streaming jobs (upgradeMode = stateless) on an AWS
> EKS cluster. The node-type for the pods of the streaming jobs belongs to a
> node-group that has an AWS ASG (auto scaling group).
> The streaming jobs are the FlinkDeployments managed by the
> flink-k8s-operator (1.8) and I have enabled the job autoscaler.
>
> Scenario
>
> When the flink auto-scaler scales up a flink streaming job, new flink TMs
> are first added onto any existing nodes with available resources. If
> resources are not enough to schedule all the TM pods,  ASG adds new nodes
> to the EKS cluster and the rest of the TM pods are scheduled on these new
> nodes.
>
> Issue
>
> After the scale-up, the TM pods scheduled on the existing nodes with
> available resources successfully read the checkpoint from S3 however the TM
> pods scheduled on the new nodes added by ASG run into 403 (access denied)
> while reading the same checkpoint file from the checkpoint location in S3.
>
> Just FYI: I have disabled memory auto-tuning so the auto-scaling events
> are in place.
>
> 1. The IAM role associated with the service account being used by the
> FlinkDeployment is as expected for the new pods.
> 2. I am able to reproduce this issue every single time there is a scale-up
> that requires ASG to add new nodes to the cluster.
> 3. If I delete the FlinkDeployment and allow the operator to restart it,
> it starts and stops throwing 403.
> 4. I am also observing some 404 (not found) being reported by certain
> newly added TM pods. They are looking for an older checkpoint (for example
> looking for chk10 while a chk11 has already been created in S3 and chk10
> would have gotten subsumed by chk11)
>
> I would appreciate it if there are any pointers on how to debug this
> further.
>
> Let me know if you need more information.
>
> Thank you
> Chetas
>
>


Question about the iterate operation in the Flink 1.19 documentation

2024-05-20 Thread www
Dear Flink development team,

Hello!

I am currently learning how to implement iterative algorithms, such as single-source shortest paths on a graph, with Apache Flink's DataStream API. In the Flink 
1.18 documentation I noticed a section on the iterate operation, see: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/overview/#iterations

However, I found that the Flink 
1.19 documentation no longer mentions the iterate operation, which confuses me. Does this mean the iterate operation is no longer supported in the latest version? If so, how should I perform iterative computation on a data stream?

Thank you very much for your time and help; I look forward to your reply.

Thanks!

李智诚

Re: flinksql: group by fields reduced after optimization

2024-05-20 Thread Lincoln Lee
Hi,

You can try 1.17 or a newer version; this problem was fixed in Flink 1.17.0 [1].
Performing this remove optimization in batch mode matches the semantics, but in streaming the field cannot simply be pruned.
The documentation on the determinism of the related built-in time functions [2] has also been updated.

[1] https://issues.apache.org/jira/browse/FLINK-30006
[2]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/#%e5%86%85%e7%bd%ae%e5%87%bd%e6%95%b0%e7%9a%84%e7%a1%ae%e5%ae%9a%e6%80%a7


Best,
Lincoln Lee
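
For illustration, a minimal sketch of the custom-UDF workaround mentioned further down in the quoted thread: declaring the function non-deterministic keeps the planner from folding it into a constant at plan time, so a filter like dt = my_current_date() keeps dt in the streaming plan. The function and its name are hypothetical.

import java.time.LocalDate;

import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical UDF; returns the current date as 'yyyy-MM-dd'.
public class NonDeterministicCurrentDate extends ScalarFunction {

    public String eval() {
        return LocalDate.now().toString();
    }

    @Override
    public boolean isDeterministic() {
        // Tells the planner this expression must not be constant-folded at plan time.
        return false;
    }
}

It could be registered with tableEnv.createTemporarySystemFunction("my_current_date", NonDeterministicCurrentDate.class) and used in the filter in place of cast(CURRENT_DATE as string).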


℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> wrote on Mon, May 20, 2024 at 22:07:

> We are currently on Flink 1.16. Although that issue was merged into calcite-1.22.0, it was later
> overridden by a new PR (https://github.com/apache/calcite/pull/1735/files).
> So the problem still exists in the Flink version we use.
>
>
>
>
> ------------------ Original Email ------------------
> From: "user-zh" <libenc...@apache.org>
> Sent: Monday, May 20, 2024 at 12:51 PM
> To: "user-zh"
> Subject: Re: flinksql: group by fields reduced after optimization
>
>
>
> The Calcite issue [1] you referenced was already fixed in calcite-1.22.0, and Flink
> has been on that Calcite version since about 1.11.
>
> So which Flink version are you using? This feels like it may be a different problem. If you can
> reproduce it on the current latest version, 1.19, you could open an issue to report a bug.
>
> PS:
> I double-checked the behavior I mentioned above: the distinction should only be made at the code-generation stage, so the optimizer cannot recognize it, which means that even in batch mode the optimized plan should still contain the dt field.
>
> [1] https://issues.apache.org/jira/browse/CALCITE-3531
>
> ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> wrote on Mon, May 20, 2024 at 11:06:
> 
>  Hello, this is a streaming job. I traced the code: cast(CURRENT_DATE as string) is recognized as a constant. This was already fixed in Calcite,
>  https://github.com/apache/calcite/pull/1602/files
>  but the Calcite version referenced by Flink does not contain that fix. I worked around the problem with a custom UDF.
> 
> 
> 
> 
>  ------------------ Original Email ------------------
>  From: "user-zh"
>  Sent: Monday, May 20, 2024 at 10:32 AM
>  To: "user-zh"
>  Subject: Re: flinksql: group by fields reduced after optimization
> 
> 
> 
>  It looks like "dt = cast(CURRENT_DATE as string)" causes dt to be derived as a constant, and it is therefore optimized away.
> 
>  Folding CURRENT_DATE into a constant should only happen in batch mode; is this SQL running in batch mode?
> 
>  ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> wrote on Sun, May 19, 2024 at 01:01:
>  >
>  > create view tmp_view as
>  > SELECT
>  >     dt, -- 2
>  >     uid, -- 0
>  >     uname, -- 1
>  >     uage -- 3
>  > from
>  >     kafkaTable
>  > where dt = cast(CURRENT_DATE as string);
>  >
>  > insert into printSinkTable
>  > select
>  >     dt, uid, uname, sum(uage)
>  > from tmp_view
>  > group by
>  >     dt,
>  >     uid,
>  >     uname;
>  >
>  >
>  >
>  > The SQL is fairly simple: first filter on dt = current_date, then aggregate (sum) by the three fields dt, uid and uname.
>  > However, after optimization, the resulting physical plan is:
>  > == Optimized Execution Plan ==
>  > Sink(table=[default_catalog.default_database.printSinkTable], fields=[dt, uid, uname, EXPR$3])
>  > +- Calc(select=[CAST(CAST(CURRENT_DATE())) AS dt, uid, uname, EXPR$3])
>  >    +- GroupAggregate(groupBy=[uid, uname], select=[uid, uname, SUM(uage) AS EXPR$3])
>  >       +- Exchange(distribution=[hash[uid, uname]])
>  >          +- Calc(select=[uid, uname, uage], where=[(dt = CAST(CURRENT_DATE()))])
>  >             +- TableSourceScan(table=[[default_catalog, default_database, kafkaTable]], fields=[uid, uname, dt, uage])
>  >
>  > In this situation, how can I aggregate (sum) by all three fields dt, uid and uname? Thanks.
> 
> 
> 
>  --
> 
>  Best,
>  Benchao Li
>
>
>
> --
>
> Best,
> Benchao Li


Re: What is the best way to aggregate data over a long window

2024-05-20 Thread gongzhongqiang
Hi  Sachin,

`Performing incremental aggregation using stateful processing` is the same idea as
`windows with agg`, but the former is more flexible. If Flink's windows cannot
satisfy your performance needs, and your business logic has some features that can
be exploited for custom optimization, you can choose the former.

Best,
Zhongqiang Gong
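
For illustration, a minimal sketch of the "incremental aggregation with stateful processing" idea discussed in the quoted mail below: keep one running aggregate per key in ValueState, fold each record into it, and emit and clear it when an event-time timer fires. The Data type and its merge function are placeholders, and it assumes event-time timestamps and watermarks are already assigned.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Data is a placeholder record type with a static merge(Data, Data) combining two aggregates.
public class IncrementalAggregator extends KeyedProcessFunction<String, Data, Data> {

    private static final long WINDOW_MS = 3600 * 1000L; // 1 hour, for illustration

    private transient ValueState<Data> aggState;

    @Override
    public void open(Configuration parameters) {
        aggState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("running-agg", Data.class));
    }

    @Override
    public void processElement(Data value, Context ctx, Collector<Data> out) throws Exception {
        Data current = aggState.value();
        // Fold the new record into the single stored aggregate (at most one entry per key).
        aggState.update(current == null ? value : Data.merge(current, value));

        // Register a timer at the end of the window this element falls into
        // (re-registering the same timestamp is deduplicated by Flink).
        long windowEnd = (ctx.timestamp() / WINDOW_MS + 1) * WINDOW_MS;
        ctx.timerService().registerEventTimeTimer(windowEnd);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Data> out) throws Exception {
        Data result = aggState.value();
        if (result != null) {
            out.collect(result); // emit the aggregate for the window that just closed
        }
        aggState.clear(); // the state never holds more than one value per key
    }
}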

Sachin Mittal  wrote on Fri, May 17, 2024 at 19:39:

> Hi,
> I am doing the following
> 1. Use reduce function where the data type of output after windowing is
> the same as the input.
> 2. Where the output of data type after windowing is different from that of
> input I use the aggregate function. For example:
>
> SingleOutputStreamOperator data =
> reducedPlayerStatsData
> .keyBy(new KeySelector())
> .window(
> TumblingEventTimeWindows.of(Time.seconds(secs)))
> .aggregate(new DataAggregator())
> .name("aggregate");
>
> In this case data which is aggregated is of a different type than the
> input so I had to use aggregate function.
> However in cases where data is of the same type using reduce function is
> very simple to use.
> Is there any fundamental difference between aggregate and reduce function
> in terms of performance?
> 3. I have enable incremental checkpoints at flink conf level using:
> state.backend.type: "rocksdb"
> state.backend.incremental: "true"
>
> 4. I am really not sure how I can use TTL. I assumed that Flink would
> automatically clean the state of windows that are expired ? Is there any
> way I can use TTL in the steps I have mentioned.
> 5. When you talk about pre-aggregation is this what you mean, say first
> compute minute aggregation and use that as input for hour aggregation ? So
> my pipeline would be something like this:
>
> SingleOutputStreamOperator reducedData =
> data
> .keyBy(new KeySelector())
> .window(
> TumblingEventTimeWindows.of(Time.seconds(60)))
> .reduce(new DataReducer()).window(
>
> TumblingEventTimeWindows.of(Time.seconds(3600)))
> .reduce(new DataReducer()).name("reduce");
>
>
> I was thinking of performing incremental aggregation using stateful 
> processing.
>
> Basically, read one record, reduce it, and store it in state; then read the 
> next record, reduce it together with the current state, and update the new reduced value 
> back into the state, and so on.
>
> Fire the final reduced value from the state when the event-time timer I 
> registered fires, then update the timer to the next event time and 
> also clean the state.
>
> This way each state would always keep only one record, no matter for what 
> period we aggregate data for.
>
> Is this a better approach than windowing ?
>
>
> Thanks
> Sachin
>
>
> On Fri, May 17, 2024 at 1:14 PM gongzhongqiang 
> wrote:
>
>> Hi  Sachin,
>>
>> We can optimize this problem in the following ways:
>> -
>> use 
>> org.apache.flink.streaming.api.datastream.WindowedStream#aggregate(org.apache.flink.api.common.functions.AggregateFunction)
>> to reduce number of data
>> - use TTL to clean data which are not need
>> - enble incremental checkpoint
>> - use
>> multi-level time window granularity for pre-aggregation can significantly 
>> improve performance and reduce computation latency
>>
>> Best,
>> Zhongqiang Gong
>>
Sachin Mittal  wrote on Fri, May 17, 2024 at 03:48:
>>
>>> Hi,
>>> My pipeline step is something like this:
>>>
>>> SingleOutputStreamOperator reducedData =
>>> data
>>> .keyBy(new KeySelector())
>>> .window(
>>> TumblingEventTimeWindows.of(Time.seconds(secs)))
>>> .reduce(new DataReducer())
>>> .name("reduce");
>>>
>>>
>>> This works fine for secs = 300.
>>> However once I increase the time window to say 1 hour or 3600 the state
>>> size increases as now it has a lot more records to reduce.
>>>
>>> Hence I need to allocate much more memory to the task manager.
>>>
>>> However there is no upper limit to this memory allocated. If the volume
>>> of data increases by say 10 fold I would have no option but to again
>>> increase the memory.
>>>
>>> Is there a better way to perform long window aggregation so overall this
>>> step has a small memory footprint.
>>>
>>> Thanks
>>> Sachin
>>>
>>>


Re: Email submission

2024-05-20 Thread Hang Ruan
Hi, Michas.

Please subscribe to the mailing list by sending an email to
user-subscr...@flink.apache.org .

Best,
Hang

Michas Szacillo (BLOOMBERG/ 919 3RD A) 
于2024年5月19日周日 04:34写道:

> Sending my email to join the apache user mailing list.
>
> Email: mszaci...@bloomberg.net
>


Re: Restore from checkpoint

2024-05-20 Thread archzi lu
Hi Phil,
correction: But the error
you have is a familiar error if you have written some code to handle
directory path.  --> But the error
you have is a familiar error if you have written some code to handle
directory path with Java.

No offence.

Best regards.
Jiadong. Lu

Jiadong Lu  于2024年5月20日周一 14:42写道:
>
> Hi, Phil
>
> I don't have more expertise about the flink-python module. But the error
> you have is a familiar error if you have written some code to handle
> directory path.
>
> The correct form of Path/URI will be :
> 1. "/home/foo"
> 2. "file:///home/foo/boo"
> 3. "hdfs:///home/foo/boo"
> 4. or Win32 directory form
>
> Best regards,
> Jiadong Lu
>
> On 2024/5/20 02:28, Phil Stavridis wrote:
> > Hi Lu,
> >
> > Thanks for your reply. In what way are the paths to be passed to the job 
> > that needs to use the checkpoint? Is the standard way using -s :/ 
> > or passing the path in the module as a Python arg?
> >
> > Kind regards
> > Phil
> >
> >> On 18 May 2024, at 03:19, jiadong.lu  wrote:
> >>
> >> Hi Phil,
> >>
> >> AFAIK, the error indicated your path was incorrect.
> >> your should use '/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' or 
> >> 'file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' instead.
> >>
> >> Best.
> >> Jiadong.Lu
> >>
> >> On 5/18/24 2:37 AM, Phil Stavridis wrote:
> >>> Hi,
> >>> I am trying to test how the checkpoints work for restoring state, but not 
> >>> sure how to run a new instance of a flink job, after I have cancelled it, 
> >>> using the checkpoints which I store in the filesystem of the job manager, 
> >>> e.g. /opt/flink/checkpoints.
> >>> I have tried passing the checkpoint as an argument in the function and 
> >>> use it while setting the checkpoint but it looks like the way it is done 
> >>> is something like below:
> >>> docker-compose exec jobmanager flink run -s 
> >>> :/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py 
> >>> /opt/app/flink_job.py
> >>> But I am getting error:
> >>> Caused by: java.io.IOException: Checkpoint/savepoint path 
> >>> ':/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' is not a valid 
> >>> file URI. Either the pointer path is invalid, or the checkpoint was 
> >>> created by a different state backend.
> >>> What is wrong with the  way the job is re-submitted to the cluster?
> >>> Kind regards
> >>> Phil
> >


Re: Restore from checkpoint

2024-05-20 Thread Jiadong Lu

Hi, Phil

I don't have more expertise about the flink-python module. But the error 
you have is a familiar error if you have written some code to handle 
directory path.


The correct form of Path/URI will be :
1. "/home/foo"
2. "file:///home/foo/boo"
3. "hdfs:///home/foo/boo"
4. or Win32 directory form

Best regards,
Jiadong Lu

On 2024/5/20 02:28, Phil Stavridis wrote:

Hi Lu,

Thanks for your reply. In what way are the paths to be passed to the job that needs 
to use the checkpoint? Is the standard way using -s :/ or passing 
the path in the module as a Python arg?

Kind regards
Phil


On 18 May 2024, at 03:19, jiadong.lu  wrote:

Hi Phil,

AFAIK, the error indicated your path was incorrect.
your should use '/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' or 
'file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' instead.

Best.
Jiadong.Lu

On 5/18/24 2:37 AM, Phil Stavridis wrote:

Hi,
I am trying to test how the checkpoints work for restoring state, but not sure 
how to run a new instance of a flink job, after I have cancelled it, using the 
checkpoints which I store in the filesystem of the job manager, e.g. 
/opt/flink/checkpoints.
I have tried passing the checkpoint as an argument in the function and use it 
while setting the checkpoint but it looks like the way it is done is something 
like below:
docker-compose exec jobmanager flink run -s 
:/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py /opt/app/flink_job.py
But I am getting error:
Caused by: java.io.IOException: Checkpoint/savepoint path 
':/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' is not a valid file URI. 
Either the pointer path is invalid, or the checkpoint was created by a 
different state backend.
What is wrong with the  way the job is re-submitted to the cluster?
Kind regards
Phil




Re: flinksql: group by fields reduced after optimization

2024-05-19 Thread Benchao Li
The Calcite issue [1] you referenced was already fixed in calcite-1.22.0, and Flink
has been on that Calcite version since about 1.11.

So which Flink version are you using? This feels like it may be a different problem. If you can
reproduce it on the current latest version, 1.19, you could open an issue to report a bug.

PS:
I double-checked the behavior I mentioned above: the distinction should only be made at the code-generation stage, so the optimizer cannot recognize it, which means that even in batch mode the optimized plan should still contain the dt field.

[1] https://issues.apache.org/jira/browse/CALCITE-3531

℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> wrote on Mon, May 20, 2024 at 11:06:
>
> Hello, this is a streaming job. I traced the code: cast(CURRENT_DATE as string) is recognized as a constant. This was already fixed in Calcite,
> https://github.com/apache/calcite/pull/1602/files
> but the Calcite version referenced by Flink does not contain that fix. I worked around the problem with a custom UDF.
>
>
>
>
> ------------------ Original Email ------------------
> From: "user-zh"
> Sent: Monday, May 20, 2024 at 10:32 AM
> To: "user-zh"
> Subject: Re: flinksql: group by fields reduced after optimization
>
>
>
> It looks like "dt = cast(CURRENT_DATE as string)" causes dt to be derived as a constant, and it is therefore optimized away.
>
> Folding CURRENT_DATE into a constant should only happen in batch mode; is this SQL running in batch mode?
>
> ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> wrote on Sun, May 19, 2024 at 01:01:
> 
>  create view tmp_view as
>  SELECT
>      dt, -- 2
>      uid, -- 0
>      uname, -- 1
>      uage -- 3
>  from
>      kafkaTable
>  where dt = cast(CURRENT_DATE as string);
> 
>  insert into printSinkTable
>  select
>      dt, uid, uname, sum(uage)
>  from tmp_view
>  group by
>      dt,
>      uid,
>      uname;
> 
> 
> 
>  The SQL is fairly simple: first filter on dt = current_date, then aggregate (sum) by the three fields dt, uid and uname.
>  However, after optimization, the resulting physical plan is:
>  == Optimized Execution Plan ==
>  Sink(table=[default_catalog.default_database.printSinkTable], fields=[dt, uid, uname, EXPR$3])
>  +- Calc(select=[CAST(CAST(CURRENT_DATE())) AS dt, uid, uname, EXPR$3])
>     +- GroupAggregate(groupBy=[uid, uname], select=[uid, uname, SUM(uage) AS EXPR$3])
>        +- Exchange(distribution=[hash[uid, uname]])
>           +- Calc(select=[uid, uname, uage], where=[(dt = CAST(CURRENT_DATE()))])
>              +- TableSourceScan(table=[[default_catalog, default_database, kafkaTable]], fields=[uid, uname, dt, uage])
> 
> 
> 
>  In this situation, how can I aggregate (sum) by all three fields dt, uid and uname? Thanks.
>
>
>
> --
>
> Best,
> Benchao Li



-- 

Best,
Benchao Li


Re: Restore from checkpoint

2024-05-19 Thread Jinzhong Li
Hi Phil,

I think you can use the "-s :checkpointMetaDataPath" arg  to resume the job
from a retained checkpoint[1].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint

Best,
Jinzhong Li


On Mon, May 20, 2024 at 2:29 AM Phil Stavridis  wrote:

> Hi Lu,
>
> Thanks for your reply. In what way are the paths to be passed to the job
> that needs to use the checkpoint? Is the standard way using -s :/
> or passing the path in the module as a Python arg?
>
> Kind regards
> Phil
>
> > On 18 May 2024, at 03:19, jiadong.lu  wrote:
> >
> > Hi Phil,
> >
> > AFAIK, the error indicated your path was incorrect.
> > your should use '/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' or
> 'file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' instead.
> >
> > Best.
> > Jiadong.Lu
> >
> > On 5/18/24 2:37 AM, Phil Stavridis wrote:
> >> Hi,
> >> I am trying to test how the checkpoints work for restoring state, but
> not sure how to run a new instance of a flink job, after I have cancelled
> it, using the checkpoints which I store in the filesystem of the job
> manager, e.g. /opt/flink/checkpoints.
> >> I have tried passing the checkpoint as an argument in the function and
> use it while setting the checkpoint but it looks like the way it is done is
> something like below:
> >> docker-compose exec jobmanager flink run -s
> :/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py
> /opt/app/flink_job.py
> >> But I am getting error:
> >> Caused by: java.io.IOException: Checkpoint/savepoint path
> ':/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' is not a valid file
> URI. Either the pointer path is invalid, or the checkpoint was created by a
> different state backend.
> >> What is wrong with the  way the job is re-submitted to the cluster?
> >> Kind regards
> >> Phil
>
>


Re: flinksql: group by fields reduced after optimization

2024-05-19 Thread Benchao Li
It looks like "dt = cast(CURRENT_DATE as string)" causes dt to be derived as a constant, and it is therefore optimized away.

Folding CURRENT_DATE into a constant should only happen in batch mode; is this SQL running in batch mode?

℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> wrote on Sun, May 19, 2024 at 01:01:
>
> create view tmp_view as
> SELECT
>     dt, -- 2
>     uid, -- 0
>     uname, -- 1
>     uage -- 3
> from
>     kafkaTable
> where dt = cast(CURRENT_DATE  as string);
>
> insert into printSinkTable
> select
>     dt, uid, uname, sum(uage)
> from tmp_view
> group by
>     dt,
>     uid,
>     uname;
>
>
>
> The SQL is fairly simple: first filter on dt = current_date, then aggregate (sum) by the three fields dt, uid and uname.
> However, after optimization, the resulting physical plan is:
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.printSinkTable], fields=[dt, uid, uname, EXPR$3])
> +- Calc(select=[CAST(CAST(CURRENT_DATE())) AS dt, uid, uname, EXPR$3])
>    +- GroupAggregate(groupBy=[uid, uname], select=[uid, uname, SUM(uage) AS EXPR$3])
>       +- Exchange(distribution=[hash[uid, uname]])
>          +- Calc(select=[uid, uname, uage], where=[(dt = CAST(CURRENT_DATE()))])
>             +- TableSourceScan(table=[[default_catalog, default_database, kafkaTable]], fields=[uid, uname, dt, uage])
>
>
>
> In this situation, how can I aggregate (sum) by all three fields dt, uid and uname? Thanks.



-- 

Best,
Benchao Li


Re: Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-19 Thread Jingsong Li
CC to the Paimon community.

Best,
Jingsong

On Mon, May 20, 2024 at 9:55 AM Jingsong Li  wrote:
>
> Amazing, congrats!
>
> Best,
> Jingsong
>
> On Sat, May 18, 2024 at 3:10 PM 大卫415 <2446566...@qq.com.invalid> wrote:
> >
> > Unsubscribe
> >
> >
> >
> >
> >
> >
> >
> > Original Email
> >
> >
> >
> > Sender:"gongzhongqiang"< gongzhongqi...@apache.org ;
> >
> > Sent Time:2024/5/17 23:10
> >
> > To:"Qingsheng Ren"< re...@apache.org ;
> >
> > Cc recipient:"dev"< d...@flink.apache.org ;"user"< 
> > u...@flink.apache.org ;"user-zh"< user-zh@flink.apache.org ;"Apache 
> > Announce List"< annou...@apache.org ;
> >
> > Subject:Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released
> >
> >
> > Congratulations !
> > Thanks for all contributors.
> >
> >
> > Best,
> >
> > Zhongqiang Gong
> >
> > Qingsheng Ren  于 2024年5月17日周五 17:33写道:
> >
> >  The Apache Flink community is very happy to announce the release of
> >  Apache Flink CDC 3.1.0.
> > 
> >  Apache Flink CDC is a distributed data integration tool for real time
> >  data and batch data, bringing the simplicity and elegance of data
> >  integration via YAML to describe the data movement and transformation
> >  in a data pipeline.
> > 
> >  Please check out the release blog post for an overview of the release:
> > 
> >  
> > https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/
> > 
> >  The release is available for download at:
> >  https://flink.apache.org/downloads.html
> > 
> >  Maven artifacts for Flink CDC can be found at:
> >  https://search.maven.org/search?q=g:org.apache.flink%20cdc
> > 
> >  The full release notes are available in Jira:
> > 
> >  
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522version=12354387
> > 
> >  We would like to thank all contributors of the Apache Flink community
> >  who made this release possible!
> > 
> >  Regards,
> >  Qingsheng Ren
> > 


Re: Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-19 Thread Jingsong Li
CC to the Paimon community.

Best,
Jingsong

On Mon, May 20, 2024 at 9:55 AM Jingsong Li  wrote:
>
> Amazing, congrats!
>
> Best,
> Jingsong
>
> On Sat, May 18, 2024 at 3:10 PM 大卫415 <2446566...@qq.com.invalid> wrote:
> >
> > Unsubscribe
> >
> >
> >
> >
> >
> >
> >
> > Original Email
> >
> >
> >
> > Sender:"gongzhongqiang"< gongzhongqi...@apache.org ;
> >
> > Sent Time:2024/5/17 23:10
> >
> > To:"Qingsheng Ren"< re...@apache.org ;
> >
> > Cc recipient:"dev"< d...@flink.apache.org ;"user"< 
> > user@flink.apache.org ;"user-zh"< user...@flink.apache.org ;"Apache 
> > Announce List"< annou...@apache.org ;
> >
> > Subject:Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released
> >
> >
> > Congratulations !
> > Thanks for all contributors.
> >
> >
> > Best,
> >
> > Zhongqiang Gong
> >
> > Qingsheng Ren  于 2024年5月17日周五 17:33写道:
> >
> >  The Apache Flink community is very happy to announce the release of
> >  Apache Flink CDC 3.1.0.
> > 
> >  Apache Flink CDC is a distributed data integration tool for real time
> >  data and batch data, bringing the simplicity and elegance of data
> >  integration via YAML to describe the data movement and transformation
> >  in a data pipeline.
> > 
> >  Please check out the release blog post for an overview of the release:
> > 
> >  
> > https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/
> > 
> >  The release is available for download at:
> >  https://flink.apache.org/downloads.html
> > 
> >  Maven artifacts for Flink CDC can be found at:
> >  https://search.maven.org/search?q=g:org.apache.flink%20cdc
> > 
> >  The full release notes are available in Jira:
> > 
> >  
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522version=12354387
> > 
> >  We would like to thank all contributors of the Apache Flink community
> >  who made this release possible!
> > 
> >  Regards,
> >  Qingsheng Ren
> > 


Re: Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-19 Thread Jingsong Li
Amazing, congrats!

Best,
Jingsong

On Sat, May 18, 2024 at 3:10 PM 大卫415 <2446566...@qq.com.invalid> wrote:
>
> Unsubscribe
>
>
>
>
>
>
>
> Original Email
>
>
>
> Sender:"gongzhongqiang"< gongzhongqi...@apache.org ;
>
> Sent Time:2024/5/17 23:10
>
> To:"Qingsheng Ren"< re...@apache.org ;
>
> Cc recipient:"dev"< d...@flink.apache.org ;"user"< u...@flink.apache.org 
> ;"user-zh"< user-zh@flink.apache.org ;"Apache Announce List"< 
> annou...@apache.org ;
>
> Subject:Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released
>
>
> Congratulations !
> Thanks for all contributors.
>
>
> Best,
>
> Zhongqiang Gong
>
> Qingsheng Ren wrote on Fri, May 17, 2024 at 17:33:
>
>  The Apache Flink community is very happy to announce the release of
>  Apache Flink CDC 3.1.0.
> 
>  Apache Flink CDC is a distributed data integration tool for real time
>  data and batch data, bringing the simplicity and elegance of data
>  integration via YAML to describe the data movement and transformation
>  in a data pipeline.
> 
>  Please check out the release blog post for an overview of the release:
> 
>  
> https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/
> 
>  The release is available for download at:
>  https://flink.apache.org/downloads.html
> 
>  Maven artifacts for Flink CDC can be found at:
>  https://search.maven.org/search?q=g:org.apache.flink%20cdc
> 
>  The full release notes are available in Jira:
> 
>  
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354387
> 
>  We would like to thank all contributors of the Apache Flink community
>  who made this release possible!
> 
>  Regards,
>  Qingsheng Ren
> 


Aw: Re: Advice Needed: Setting Up Prometheus and Grafana Monitoring for Apache Flink on Kubernetes

2024-05-19 Thread Oliver Schmied
Dear Biao Geng,

 

thank you very much. With the help of your demo and the YAML configuration, I was able to successfully set up monitoring for my Apache Flink jobs.

 

Thanks again for your time and help.

 

Best regards,

Oliver

 
 

Sent: Sunday, May 19, 2024 at 17:42
From: "Biao Geng" 
To: "Oliver Schmied" 
Cc: user@flink.apache.org
Subject: Re: Advice Needed: Setting Up Prometheus and Grafana Monitoring for Apache Flink on Kubernetes


Hi Oliver,
 

I believe you are almost there. One thing I found could improve is that in your job yaml, instead of using:
    kubernetes.operator.metrics.reporter.prommetrics.reporters: prom
    kubernetes.operator.metrics.reporter.prommetrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    kubernetes.operator.metrics.reporter.prom.port: 9249-9250
, you should use 

    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: "9249"

 

Configs with the prefix, `kubernetes.operator`, is for the flink k8s operator itself(You may use it if you want to collect the metrics of the operator). For the job config, we do not need it.

 

I created a detailed demo of using Prometheus to monitor jobs started by flink k8s operator. Maybe it can be helpful.

 


Best,
Biao Geng





 


Oliver Schmied wrote on Sun, May 19, 2024 at 04:21:




Dear Apache Flink Community,

I am currently trying to monitor an Apache Flink cluster deployed on Kubernetes using Prometheus and Grafana. Despite following the official guide (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/metrics-logging/)  on how to setup prometheus I have not been able to get Flink-specific metrics to appear in Prometheus. I am reaching out to seek your assistance, as I`ve tried many things but nothing worked.

 

# My setup:

* Kubernetes

* flink v.18 deployed as FlinkDeployment

with this manifest:

```apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: default
  name: flink-cluster
spec:
  image: flink:1.18
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    #Added
    kubernetes.operator.metrics.reporter.prommetrics.reporters: prom
    kubernetes.operator.metrics.reporter.prommetrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    kubernetes.operator.metrics.reporter.prom.port: 9249-9250
  serviceAccount: flink
  jobManager:
    resource:
  memory: "1048m"
  cpu: 1
  taskManager:
    resource:
  memory: "1048m"
  cpu: 1

```

* Prometheus operator install via

helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm install prometheus prometheus-community/kube-prometheus-stack

 

* deployed a pod-monitor.yaml

```

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: flink-kubernetes-operator
  labels:
    release: prometheus
spec:
  selector:
    matchLabels:
  app: flink-cluster
  podMetricsEndpoints:
  - port: metrics

 

```

 

# The problem

 

* I can access prometheus fine and concerning the logs of the pod-monitor, it seems to collect flink specific metrics, but I can't access these metrics with flink

* Do I even setup prometheus correctly in my flink deployment manifest?


* I also added the following line to my values.yaml file, but apart from that I change nothing:

```

metrics:
  port: 


```

 

# My questions

 

* Can anyone see the mistake in my deployment?

* Or does anyone have a better idea on how to monitor my flink deployment?

 

 

I would be very grateful for your answers. Thank you very much.

 

Best regards,

Oliver

 

 

 

 

 










Re: Restore from checkpoint

2024-05-19 Thread Phil Stavridis
Hi Lu,

Thanks for your reply. In what way are the paths to get passed to the job that 
needs to used the checkpoint? Is the standard way, using -s :/ or by 
passing the path in the module as a Python arg?

Kind regards
Phil

> On 18 May 2024, at 03:19, jiadong.lu  wrote:
> 
> Hi Phil,
> 
> AFAIK, the error indicated your path was incorrect.
> your should use '/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' or 
> 'file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' instead.
> 
> Best.
> Jiadong.Lu
> 
> On 5/18/24 2:37 AM, Phil Stavridis wrote:
>> Hi,
>> I am trying to test how the checkpoints work for restoring state, but not 
>> sure how to run a new instance of a flink job, after I have cancelled it, 
>> using the checkpoints which I store in the filesystem of the job manager, 
>> e.g. /opt/flink/checkpoints.
>> I have tried passing the checkpoint as an argument in the function and use 
>> it while setting the checkpoint but it looks like the way it is done is 
>> something like below:
>> docker-compose exec jobmanager flink run -s 
>> :/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py 
>> /opt/app/flink_job.py
>> But I am getting error:
>> Caused by: java.io.IOException: Checkpoint/savepoint path 
>> ':/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' is not a valid file 
>> URI. Either the pointer path is invalid, or the checkpoint was created by a 
>> different state backend.
>> What is wrong with the  way the job is re-submitted to the cluster?
>> Kind regards
>> Phil



Re: Advice Needed: Setting Up Prometheus and Grafana Monitoring for Apache Flink on Kubernetes

2024-05-19 Thread Biao Geng
Hi Oliver,

I believe you are almost there. One thing I found that could be improved is that in
your job yaml, instead of using:
kubernetes.operator.metrics.reporter.prommetrics.reporters: prom

kubernetes.operator.metrics.reporter.prommetrics.reporter.prom.factory.class:
org.apache.flink.metrics.prometheus.PrometheusReporterFactory
kubernetes.operator.metrics.reporter.prom.port: 9249-9250
, you should use
metrics.reporter.prom.factory.class:
org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: "9249"

Configs with the `kubernetes.operator` prefix are for the Flink k8s operator itself
(you may use them if you want to collect the metrics of the operator). For the job
config, we do not need them.
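
For example, a minimal sketch of the job-level flinkConfiguration block with the keys
above (whether the PodMonitor actually scrapes port 9249 also depends on the container
ports declared on the pods):

  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: "9249"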

I created a detailed demo of using Prometheus to monitor jobs started by the flink k8s
operator. Maybe it can be helpful.

Best,
Biao Geng


Oliver Schmied wrote on Sun, May 19, 2024 at 04:21:

> Dear Apache Flink Community,
>
> I am currently trying to monitor an Apache Flink cluster deployed on
> Kubernetes using Prometheus and Grafana. Despite following the official
> guide (
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/metrics-logging/)
> on how to setup prometheus I have not been able to get Flink-specific
> metrics to appear in Prometheus. I am reaching out to seek your assistance,
> as I`ve tried many things but nothing worked.
>
>
>
> # My setup:
>
> * Kubernetes
>
> * flink v.18 deployed as FlinkDeployment
>
> with this manifest:
>
> ```apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   namespace: default
>   name: flink-cluster
> spec:
>   image: flink:1.18
>   flinkVersion: v1_18
>   flinkConfiguration:
> taskmanager.numberOfTaskSlots: "2"
> #Added
> kubernetes.operator.metrics.reporter.prommetrics.reporters: prom
>
> kubernetes.operator.metrics.reporter.prommetrics.reporter.prom.factory.class:
> org.apache.flink.metrics.prometheus.PrometheusReporterFactory
> kubernetes.operator.metrics.reporter.prom.port: 9249-9250
>   serviceAccount: flink
>   jobManager:
> resource:
>   memory: "1048m"
>   cpu: 1
>   taskManager:
> resource:
>   memory: "1048m"
>   cpu: 1
>
> ```
>
> * Prometheus operator install via
>
> helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
> helm install prometheus prometheus-community/kube-prometheus-stack
>
>
> * deployed a pod-monitor.yaml
> ```
> apiVersion: monitoring.coreos.com/v1
> kind: PodMonitor
> metadata:
>   name: flink-kubernetes-operator
>   labels:
> release: prometheus
> spec:
>   selector:
> matchLabels:
>   app: flink-cluster
>   podMetricsEndpoints:
>   - port: metrics
>
> ```
>
> # The problem
>
> * I can access prometheus fine and concerning the logs of the pod-monitor,
> it seems to collect flink specific metrics, but I can't access these
> metrics with flink
> * Do I even setup prometheus correctly in my flink deployment manifest?
> * I also added the following line to my values.yaml file, but apart from
> that I change nothing:
> ```
>
> metrics:
>   port: 
>
> ```
>
> # My questions
>
> * Can anyone see the mistake in my deployment?
> * Or does anyone have a better idea on how to monitor my flink deployment?
>
>
> I would be very grateful for your answers. Thank you very much.
>
> Best regards,
> Oliver
>
>
>
>
>
>
>


Advice Needed: Setting Up Prometheus and Grafana Monitoring for Apache Flink on Kubernetes

2024-05-18 Thread Oliver Schmied
Dear Apache Flink Community,

I am currently trying to monitor an Apache Flink cluster deployed on Kubernetes using Prometheus and Grafana. Despite following the official guide (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/metrics-logging/) on how to set up Prometheus, I have not been able to get Flink-specific metrics to appear in Prometheus. I am reaching out to seek your assistance, as I've tried many things but nothing worked.

 

# My setup:

* Kubernetes

* flink v1.18 deployed as FlinkDeployment

with this manifest:

```apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: default
  name: flink-cluster
spec:
  image: flink:1.18
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    #Added
    kubernetes.operator.metrics.reporter.prommetrics.reporters: prom
    kubernetes.operator.metrics.reporter.prommetrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    kubernetes.operator.metrics.reporter.prom.port: 9249-9250
  serviceAccount: flink
  jobManager:
    resource:
  memory: "1048m"
  cpu: 1
  taskManager:
    resource:
  memory: "1048m"
  cpu: 1

```

* Prometheus operator install via

helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm install prometheus prometheus-community/kube-prometheus-stack

 

* deployed a pod-monitor.yaml

```

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: flink-kubernetes-operator
  labels:
    release: prometheus
spec:
  selector:
    matchLabels:
  app: flink-cluster
  podMetricsEndpoints:
  - port: metrics

 

```

 

# The problem

 

* I can access prometheus fine and concerning the logs of the pod-monitor, it seems to collect flink specific metrics, but I can't access these metrics with flink

* Do I even setup prometheus correctly in my flink deployment manifest?


* I also added the following line to my values.yaml file, but apart from that I change nothing:

```

metrics:
  port: 


```

 

# My questions

 

* Can anyone see the mistake in my deployment?

* Or does anyone have a better idea on how to monitor my flink deployment?

 

 

I would be very grateful for your answers. Thank you very much.

 

Best regards,

Oliver

 

 

 

 

 


Email submission

2024-05-18 Thread Michas Szacillo (BLOOMBERG/ 919 3RD A)
Sending my email to join the apache user mailing list. 

Email: mszaci...@bloomberg.net

Re: problem with the heartbeat interval feature

2024-05-18 Thread Hongshun Wang
Hi Thomas,

I have reviewed the code and just
noticed that heartbeat.action.query is not mandatory. Debezium will
generate Heartbeat Events at regular intervals. Flink CDC will then
receive these Heartbeat Events and advance the offset[1]. Finally, the source reader
will commit the offset during checkpointing in the streaming phase[2].

Therefore, you may want to verify whether checkpointing is enabled and
if the process has entered the streaming phase (indicating that it is
only reading the WAL log).

[1]
https://github.com/apache/flink-cdc/blob/d386c7c1c2db3bb8590be6c20198286cf0567c97/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java#L119

[2]
https://github.com/apache/flink-cdc/blob/d386c7c1c2db3bb8590be6c20198286cf0567c97/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceReaderWithCommit.java#L93
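
As a quick reference, checkpointing is enabled on the execution environment; the
interval below is only an illustrative value:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // e.g. every 60s; committed offsets advance when a checkpoint completes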

On Sat, May 18, 2024 at 12:34 AM Thomas Peyric 
wrote:

> thanks Hongshun for your response !
>
> On Fri, May 17, 2024 at 07:51, Hongshun Wang wrote:
>
>> Hi Thomas,
>>
>> In debezium dos says: For the connector to detect and process events from
>> a heartbeat table, you must add the table to the PostgreSQL publication
>> specified by the publication.name
>> 
>>  property.
>> If this publication predates your Debezium deployment, the connector uses
>> the publications as defined. If the publication is not already configured
>> to automatically replicate changes FOR ALL TABLES in the database, you
>> must explicitly add the heartbeat table to the publication[2].
>>
>> Thus, if you want use heart beat in cdc:
>>
>>1. add a heartbeat table to publication: ALTER PUBLICATION
>>** ADD TABLE **;
>>2. set heartbeatInterval
>>3. add debezium.heartbeat.action.query
>>
>> 
>> [3]
>>
>> However, when I use it it CDC, some exception occurs:
>>
>> Caused by: java.lang.NullPointerException
>> at 
>> io.debezium.heartbeat.HeartbeatFactory.createHeartbeat(HeartbeatFactory.java:55)
>> at io.debezium.pipeline.EventDispatcher.(EventDispatcher.java:127)
>> at io.debezium.pipeline.EventDispatcher.(EventDispatcher.java:94)
>>
>>
>>
>>
>> It seems CDC don't add  a HeartbeatConnectionProvider  when configure
>> PostgresEventDispatcher:
>>
>> //org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext#configurethis.postgresDispatcher
>>  =
>> new PostgresEventDispatcher<>(
>> dbzConfig,
>> topicSelector,
>> schema,
>> queue,
>> dbzConfig.getTableFilters().dataCollectionFilter(),
>> DataChangeEvent::new,
>> metadataProvider,
>> schemaNameAdjuster);
>>
>>
>> In debezium, when PostgresConnectorTask start, it will  do it
>>
>> //io.debezium.connector.postgresql.PostgresConnectorTask#start  final 
>> PostgresEventDispatcher dispatcher = new PostgresEventDispatcher<>(
>> connectorConfig,
>> topicNamingStrategy,
>> schema,
>> queue,
>> connectorConfig.getTableFilters().dataCollectionFilter(),
>> DataChangeEvent::new,
>> PostgresChangeRecordEmitter::updateSchema,
>> metadataProvider,
>> connectorConfig.createHeartbeat(
>> topicNamingStrategy,
>> schemaNameAdjuster,
>> () -> new 
>> PostgresConnection(connectorConfig.getJdbcConfig(), 
>> PostgresConnection.CONNECTION_GENERAL),
>> exception -> {
>> String sqlErrorId = exception.getSQLState();
>> switch (sqlErrorId) {
>> case "57P01":
>> // Postgres error admin_shutdown, 
>> see https://www.postgresql.org/docs/12/errcodes-appendix.html
>> throw new DebeziumException("Could not execute 
>> heartbeat action query (Error: " + sqlErrorId + ")", exception);
>> case "57P03":
>> // Postgres error 
>> cannot_connect_now, see 
>> https://www.postgresql.org/docs/12/errcodes-appendix.html
>> throw new RetriableException("Could not execute 
>> heartbeat action query (Error: " + sqlErrorId + ")", exception);
>> 

Re: Restore from checkpoint

2024-05-17 Thread jiadong.lu

Hi Phil,

AFAIK, the error indicated your path was incorrect.
You should use '/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' or 
'file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' instead.
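
For example, the resubmission could then look roughly like this (the chk-<n>
subdirectory holding the checkpoint's _metadata file is an assumption; adjust the
path to your actual layout):

docker-compose exec jobmanager flink run \
  -s file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc/chk-<n> \
  -py /opt/app/flink_job.py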


Best.
Jiadong.Lu

On 5/18/24 2:37 AM, Phil Stavridis wrote:

Hi,

I am trying to test how the checkpoints work for restoring state, but not sure 
how to run a new instance of a flink job, after I have cancelled it, using the 
checkpoints which I store in the filesystem of the job manager, e.g. 
/opt/flink/checkpoints.

I have tried passing the checkpoint as an argument in the function and use it 
while setting the checkpoint but it looks like the way it is done is something 
like below:


docker-compose exec jobmanager flink run -s 
:/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py /opt/app/flink_job.py

But I am getting error:
Caused by: java.io.IOException: Checkpoint/savepoint path 
':/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' is not a valid file URI. 
Either the pointer path is invalid, or the checkpoint was created by a 
different state backend.

What is wrong with the  way the job is re-submitted to the cluster?

Kind regards
Phil


Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-17 Thread Muhammet Orazov via user

Amazing, congrats!

Thanks for your efforts!

Best,
Muhammet

On 2024-05-17 09:32, Qingsheng Ren wrote:

The Apache Flink community is very happy to announce the release of
Apache Flink CDC 3.1.0.

Apache Flink CDC is a distributed data integration tool for real time
data and batch data, bringing the simplicity and elegance of data
integration via YAML to describe the data movement and transformation
in a data pipeline.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink CDC can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20cdc

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354387

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Regards,
Qingsheng Ren


Restore from checkpoint

2024-05-17 Thread Phil Stavridis
Hi,

I am trying to test how the checkpoints work for restoring state, but not sure 
how to run a new instance of a flink job, after I have cancelled it, using the 
checkpoints which I store in the filesystem of the job manager, e.g. 
/opt/flink/checkpoints.

I have tried passing the checkpoint as an argument in the function and use it 
while setting the checkpoint but it looks like the way it is done is something 
like below:


docker-compose exec jobmanager flink run -s 
:/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py /opt/app/flink_job.py

But I am getting error:
Caused by: java.io.IOException: Checkpoint/savepoint path 
':/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' is not a valid file URI. 
Either the pointer path is invalid, or the checkpoint was created by a 
different state backend.

What is wrong with the  way the job is re-submitted to the cluster? 

Kind regards
Phil

Re: problem with the heartbeat interval feature

2024-05-17 Thread Thomas Peyric
thanks Hongshun for your response !

On Fri, May 17, 2024 at 07:51, Hongshun Wang wrote:

> Hi Thomas,
>
> In debezium dos says: For the connector to detect and process events from
> a heartbeat table, you must add the table to the PostgreSQL publication
> specified by the publication.name
> 
>  property.
> If this publication predates your Debezium deployment, the connector uses
> the publications as defined. If the publication is not already configured
> to automatically replicate changes FOR ALL TABLES in the database, you
> must explicitly add the heartbeat table to the publication[2].
>
> Thus, if you want use heart beat in cdc:
>
>1. add a heartbeat table to publication: ALTER PUBLICATION
>** ADD TABLE **;
>2. set heartbeatInterval
>3. add debezium.heartbeat.action.query
>
> 
> [3]
>
> However, when I use it it CDC, some exception occurs:
>
> Caused by: java.lang.NullPointerException
> at 
> io.debezium.heartbeat.HeartbeatFactory.createHeartbeat(HeartbeatFactory.java:55)
> at io.debezium.pipeline.EventDispatcher.(EventDispatcher.java:127)
> at io.debezium.pipeline.EventDispatcher.(EventDispatcher.java:94)
>
>
>
>
> It seems CDC don't add  a HeartbeatConnectionProvider  when configure
> PostgresEventDispatcher:
>
> //org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext#configurethis.postgresDispatcher
>  =
> new PostgresEventDispatcher<>(
> dbzConfig,
> topicSelector,
> schema,
> queue,
> dbzConfig.getTableFilters().dataCollectionFilter(),
> DataChangeEvent::new,
> metadataProvider,
> schemaNameAdjuster);
>
>
> In debezium, when PostgresConnectorTask start, it will  do it
>
> //io.debezium.connector.postgresql.PostgresConnectorTask#start  final 
> PostgresEventDispatcher dispatcher = new PostgresEventDispatcher<>(
> connectorConfig,
> topicNamingStrategy,
> schema,
> queue,
> connectorConfig.getTableFilters().dataCollectionFilter(),
> DataChangeEvent::new,
> PostgresChangeRecordEmitter::updateSchema,
> metadataProvider,
> connectorConfig.createHeartbeat(
> topicNamingStrategy,
> schemaNameAdjuster,
> () -> new 
> PostgresConnection(connectorConfig.getJdbcConfig(), 
> PostgresConnection.CONNECTION_GENERAL),
> exception -> {
> String sqlErrorId = exception.getSQLState();
> switch (sqlErrorId) {
> case "57P01":
> // Postgres error admin_shutdown, see 
> https://www.postgresql.org/docs/12/errcodes-appendix.html 
>throw new DebeziumException("Could not execute heartbeat 
> action query (Error: " + sqlErrorId + ")", exception);
> case "57P03":
> // Postgres error cannot_connect_now, 
> see https://www.postgresql.org/docs/12/errcodes-appendix.html 
>throw new RetriableException("Could not execute 
> heartbeat action query (Error: " + sqlErrorId + ")", exception);
> default:
> break;
> }
> }),
> schemaNameAdjuster,
> signalProcessor);
>
> Thus, I have create a new jira[4] to fix it.
>
>
>
>  [1]
> https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/legacy-flink-cdc-sources/postgres-cdc/
>
> [2]
> https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-interval-ms
>
> [3]
> https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-action-query
>
> [4] https://issues.apache.org/jira/browse/FLINK-35387
>
>
> Best
>
> Hongshun
>
> On Thu, May 16, 2024 at 9:03 PM Thomas Peyric 
> wrote:
>
>> Hi Flink Community !
>>
>> I am using :
>> * Flink
>> * Flink CDC posgtres Connector
>> * scala + sbt
>>
>> versions are :
>>   * orgApacheKafkaVersion = "3.2.3"
>>   * flinkVersion = "1.19.0"
>>   * flinkKafkaVersion = "3.0.2-1.18"
>>   * flinkConnectorPostgresCdcVersion = "3.0.1"
>>   * debeziumVersion = "1.9.8.Final"
>>   * scalaVersion = "2.12.13"
>>   * 

Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-17 Thread gongzhongqiang
Congratulations !
Thanks for all contributors.


Best,

Zhongqiang Gong

Qingsheng Ren wrote on Fri, May 17, 2024 at 17:33:

> The Apache Flink community is very happy to announce the release of
> Apache Flink CDC 3.1.0.
>
> Apache Flink CDC is a distributed data integration tool for real time
> data and batch data, bringing the simplicity and elegance of data
> integration via YAML to describe the data movement and transformation
> in a data pipeline.
>
> Please check out the release blog post for an overview of the release:
>
> https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Flink CDC can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20cdc
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354387
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Regards,
> Qingsheng Ren
>



RockDb - Failed to clip DB after initialization - end key comes before start key

2024-05-17 Thread Francesco Leone
Hi,

We are facing a new issue related to RockDb when deploying a new version of
our job, which is adding 3 more operators. We are using flink 1.17.1 with
RockDb on Java 11.

We get an exception from another pre-existing operator during its
initialization. That operator and the new ones have different ids and they
are not in the same path of execution. So we are struggling to correlate
the new operators with this error.

In particular the error message is

our-operator-name -> Timestamps/Watermarks (12/12)#4
(49136e1eafd63fcf33f7574ed87c768a_6c6dc6fa91330c57e3390709d51035b1_11_4)
switched from INITIALIZING to FAILED with failure cause:

and the stack trace of the error is

o.r.RocksDBException: end key comes before start key
    at org.rocksdb.RocksDB.deleteRange(RocksDB.java)
    at org.rocksdb.RocksDB.deleteRange(RocksDB.java:1493)
    at o.a.f.c.s.s.RocksDBIncrementalCheckpointUtils.deleteRange(RocksDBIncrementalCheckpointUtils.java:153)
    at o.a.f.c.s.s.RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(RocksDBIncrementalCheckpointUtils.java:129)
    at o.a.f.c.s.s.r.RocksDBIncrementalRestoreOperation.initDBWithRescaling(RocksDBIncrementalRestoreOperation.java:388)
    ... 21 common frames omitted
Wrapped by: o.a.f.r.s.BackendBuildingException: Failed to clip DB after initialization.
    at o.a.f.c.s.s.r.RocksDBIncrementalRestoreOperation.initDBWithRescaling(RocksDBIncrementalRestoreOperation.java:397)
    at o.a.f.c.s.s.r.RocksDBIncrementalRestoreOperation.restoreWithRescaling(RocksDBIncrementalRestoreOperation.java:295)
    at o.a.f.c.s.s.r.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:167)
    at o.a.f.c.s.s.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:329)
    at o.a.f.c.s.s.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:512)
    at o.a.f.c.s.s.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:99)
    at o.a.f.s.a.o.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
    at o.a.f.s.a.o.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at o.a.f.s.a.o.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 13 common frames omitted
Wrapped by: o.a.f.u.FlinkException: Could not restore keyed state backend for
KeyedProcessOperator_6c6dc6fa91330c57e3390709d51035b1_(12/12) from any of the 1 provided restore options.
    at o.a.f.s.a.o.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at o.a.f.s.a.o.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
    at o.a.f.s.a.o.StreamTaskStateInitializerImpl.stream...


This RockDb ticket https://github.com/facebook/rocksdb/issues/8239 could be
correlated to this problem.

Any help would be really appreciated.

Many Thanks


Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-17 Thread Hang Ruan
Congratulations!

Thanks for the great work.

Best,
Hang

Qingsheng Ren wrote on Fri, May 17, 2024 at 17:33:

> The Apache Flink community is very happy to announce the release of
> Apache Flink CDC 3.1.0.
>
> Apache Flink CDC is a distributed data integration tool for real time
> data and batch data, bringing the simplicity and elegance of data
> integration via YAML to describe the data movement and transformation
> in a data pipeline.
>
> Please check out the release blog post for an overview of the release:
>
> https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Flink CDC can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20cdc
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354387
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Regards,
> Qingsheng Ren
>



Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-17 Thread Leonard Xu
Congratulations !

Thanks Qingsheng for the great work and all contributors involved !!

Best,
Leonard


> On May 17, 2024, at 5:32 PM, Qingsheng Ren wrote:
> 
> The Apache Flink community is very happy to announce the release of
> Apache Flink CDC 3.1.0.
> 
> Apache Flink CDC is a distributed data integration tool for real time
> data and batch data, bringing the simplicity and elegance of data
> integration via YAML to describe the data movement and transformation
> in a data pipeline.
> 
> Please check out the release blog post for an overview of the release:
> https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/
> 
> The release is available for download at:
> https://flink.apache.org/downloads.html
> 
> Maven artifacts for Flink CDC can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20cdc
> 
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354387
> 
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
> 
> Regards,
> Qingsheng Ren




[ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-17 Thread Qingsheng Ren
The Apache Flink community is very happy to announce the release of
Apache Flink CDC 3.1.0.

Apache Flink CDC is a distributed data integration tool for real time
data and batch data, bringing the simplicity and elegance of data
integration via YAML to describe the data movement and transformation
in a data pipeline.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink CDC can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20cdc

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354387

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Regards,
Qingsheng Ren



Re: Get access to unmatching events in Apache Flink Cep

2024-05-17 Thread Anton Sidorov
Ok, thanks for the reply.

On Fri, May 17, 2024 at 09:22, Biao Geng wrote:

> Hi Anton,
>
> I am afraid that currently there is no such API to access the middle NFA
> state in your case. For patterns that contain 'within()' condition, the
> timeout events could be retrieved via TimedOutPartialMatchHandler
> interface, but other unmatching events would be pruned immediately once
> they are considered as unnecessary to keep.
>
> Best,
> Biao Geng
>
>
> Anton Sidorov wrote on Thu, May 16, 2024 at 16:12:
>
>> Hello!
>>
>> I have a Flink Job with CEP pattern.
>>
>> Pattern example:
>>
>> // Strict Contiguity
>> // a b+ c d e
>> Pattern.begin("a", AfterMatchSkipStrategy.skipPastLastEvent()).where(...)
>> .next("b").where(...).oneOrMore()
>> .next("c").where(...)
>> .next("d").where(...)
>> .next("e").where(...);
>>
>> I have events with wrong order stream on input:
>>
>> a b d c e
>>
>> On output I haven`t any matching. But I want have access to events, that
>> not matching.
>>
>> Can I have access to middle NFA state in CEP pattern, or get some other
>> way to view unmatching events?
>>
>> Example project with CEP pattern on github
>> , and my question
>> on SO
>> 
>>
>> Thanks in advance
>>
>

-- 
Best regards, Anton.


Re: SSL Kafka PyFlink

2024-05-17 Thread Evgeniy Lyutikov via user
Hi Phil

You need to specify a keystore with the CA location [1]

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kafka/#security
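
For example, assuming the PEM CA/cert/key files are first imported into JKS (or
PKCS12) stores, the relevant table options would look roughly like this (paths and
passwords are placeholders, and the keystore lines are only needed for mutual TLS):

'properties.security.protocol' = 'SSL',
'properties.ssl.truststore.location' = '/path/to/kafka.client.truststore.jks',
'properties.ssl.truststore.password' = 'truststore-password',
'properties.ssl.keystore.location' = '/path/to/kafka.client.keystore.jks',
'properties.ssl.keystore.password' = 'keystore-password',
'properties.ssl.key.password' = 'key-password'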



From: gongzhongqiang 
Sent: May 17, 2024, 10:44:18
To: Phil Stavridis
Cc: user@flink.apache.org
Subject: Re: SSL Kafka PyFlink

Hi Phil,

The kafka configuration keys of ssl maybe not correct. You can refer the kafka 
document[1] to get the ssl configurations of client.


[1] https://kafka.apache.org/documentation/#security_configclients


Best,
Zhongqiang Gong

Phil Stavridis mailto:phi...@gmail.com>> wrote on Fri, May 17, 2024 at 01:44:
Hi,

I have a PyFlink job that needs to read from a Kafka topic and the 
communication with the Kafka broker requires SSL.
I have connected to the Kafka cluster with something like this using just 
Python.

from confluent_kafka import Consumer, KafkaException, KafkaError



def get_config(bootstrap_servers, ca_file, cert_file, key_file):
config = {
'bootstrap.servers': bootstrap_servers,
'security.protocol': 'SSL',
'ssl.ca.location': ca_file,
'ssl.certificate.location': cert_file,
'ssl.key.location': key_file,
'ssl.endpoint.identification.algorithm': 'none',
'enable.ssl.certificate.verification': 'false',
'group.id': ‘my_group_id'
}


return config



And have read messages from the Kafka topic.

I am trying to set up something similar with Flink SQL:

t_env.execute_sql(f"""
CREATE TABLE logs (
`user` ROW(`user_id` BIGINT),
`timestamp` ROW(`secs` BIGINT)
) WITH (
'connector' = '{CONNECTOR_TYPE}',
'topic' = ‘{KAFKA_TOPIC}',
'properties.bootstrap.servers' = '{BOOTSTRAP_SERVERS}',
'properties.group.id' = '{CONSUMER_GROUP}',
'scan.startup.mode' = '{STARTUP_MODE_LATEST}',
'format' = '{MESSAGE_FORMAT}',
'properties.security.protocol' = 'SSL',
'properties.ssl.ca.location' = '{ca_file}',
'properties.ssl.certificate.location' = '{cert_file}',
'properties.ssl.key.location' = '{key_file}',
'properties.ssl.endpoint.identification.algorithm' = ''
)
""")


But when this runs I am getting this error:

Caused by: org.apache.flink.util.FlinkException: Global failure triggered by 
OperatorCoordinator for 'Source: kafka_action_logs[1] -> Calc[2]' (operator 
cbc357ccb763df2852fee8c4fc7d55f2).
...
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list 
subscribed topic partitions due to
at
...
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [logs].
at
...
... 3 more
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException:
 SSL handshake failed
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at 
org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
... 10 more
Caused by: 
org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException:
 SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: 
sun.security.provider.certpath.SunCertPathBuilderException: unable to find 
valid certification path to requested target
...
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:435)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:523)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:373)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:293)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:481)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1413)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1344)
at java.lang.Thread.run(Thread.java:750)
Caused by: sun.security.validator.ValidatorException: PKIX path building 
failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to 
find valid certification path to requested target
at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:456)
at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:323)

Re: What is the best way to aggregate data over a long window

2024-05-17 Thread Sachin Mittal
Hi,
I am doing the following
1. Use the reduce function where the data type of the output after windowing is the
same as the input.
2. Where the data type of the output after windowing is different from that of the
input, I use the aggregate function. For example:

SingleOutputStreamOperator data =
reducedPlayerStatsData
.keyBy(new KeySelector())
.window(
TumblingEventTimeWindows.of(Time.seconds(secs)))
.aggregate(new DataAggregator())
.name("aggregate");

In this case data which is aggregated is of a different type than the input
so I had to use aggregate function.
However in cases where data is of the same type using reduce function is
very simple to use.
Is there any fundamental difference between aggregate and reduce function
in terms of performance?
3. I have enable incremental checkpoints at flink conf level using:
state.backend.type: "rocksdb"
state.backend.incremental: "true"

4. I am really not sure how I can use TTL. I assumed that Flink would
automatically clean the state of windows that are expired ? Is there any
way I can use TTL in the steps I have mentioned.
5. When you talk about pre-aggregation is this what you mean, say first
compute minute aggregation and use that as input for hour aggregation ? So
my pipeline would be something like this:

SingleOutputStreamOperator reducedData =
data
.keyBy(new KeySelector())
.window(
TumblingEventTimeWindows.of(Time.seconds(60)))
.reduce(new DataReducer()).window(

TumblingEventTimeWindows.of(Time.seconds(3600)))
.reduce(new DataReducer()).name("reduce");


I was thinking of performing incremental aggregation using stateful processing.

Basically, read one record, reduce it, and store the result in state; then read the
next record, reduce it together with the current state, write the new reduced value
back into the state, and so on.

Fire the final reduced value from the state when the event-time timer I registered
fires, then advance the timer to the next event time and clear the state.

This way each keyed state would always keep only one record, no matter what period
we aggregate data for.

Is this a better approach than windowing ?
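
A minimal sketch of that idea with a KeyedProcessFunction (the key type, the Data
class and the reduce/endOfPeriod helpers are placeholders, not the actual job code):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class IncrementalReducer extends KeyedProcessFunction<String, Data, Data> {

    private transient ValueState<Data> aggState;

    @Override
    public void open(Configuration parameters) {
        // one value per key: the running reduced result
        aggState = getRuntimeContext().getState(new ValueStateDescriptor<>("agg", Data.class));
    }

    @Override
    public void processElement(Data value, Context ctx, Collector<Data> out) throws Exception {
        Data current = aggState.value();
        aggState.update(current == null ? value : reduce(current, value));
        // timers are deduplicated per timestamp, so this fires once per period
        ctx.timerService().registerEventTimeTimer(endOfPeriod(value.getTimestamp()));
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Data> out) throws Exception {
        Data result = aggState.value();
        if (result != null) {
            out.collect(result); // emit the reduced value for the finished period
        }
        aggState.clear();        // state never holds more than one record per key
    }

    private Data reduce(Data a, Data b) { /* combine two records */ return a; }

    private long endOfPeriod(long eventTime) { /* align to the period boundary */ return eventTime; }
}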


Thanks
Sachin


On Fri, May 17, 2024 at 1:14 PM gongzhongqiang 
wrote:

> Hi  Sachin,
>
> We can optimize this problem in the following ways:
> -
> use 
> org.apache.flink.streaming.api.datastream.WindowedStream#aggregate(org.apache.flink.api.common.functions.AggregateFunction)
> to reduce number of data
> - use TTL to clean data which are not need
> - enble incremental checkpoint
> - use
> multi-level time window granularity for pre-aggregation can significantly 
> improve performance and reduce computation latency
>
> Best,
> Zhongqiang Gong
>
Sachin Mittal wrote on Fri, May 17, 2024 at 03:48:
>
>> Hi,
>> My pipeline step is something like this:
>>
>> SingleOutputStreamOperator reducedData =
>> data
>> .keyBy(new KeySelector())
>> .window(
>> TumblingEventTimeWindows.of(Time.seconds(secs)))
>> .reduce(new DataReducer())
>> .name("reduce");
>>
>>
>> This works fine for secs = 300.
>> However once I increase the time window to say 1 hour or 3600 the state
>> size increases as now it has a lot more records to reduce.
>>
>> Hence I need to allocate much more memory to the task manager.
>>
>> However there is no upper limit to this memory allocated. If the volume
>> of data increases by say 10 fold I would have no option but to again
>> increase the memory.
>>
>> Is there a better way to perform long window aggregation so overall this
>> step has a small memory footprint.
>>
>> Thanks
>> Sachin
>>
>>


Re: Re: Re: Flink kafka connector for v 1.19.0

2024-05-17 Thread Niklas Wilcke
Hi Hang,

thanks for pointing me to the mail thread. That is indeed interesting. Can we 
maybe ping someone to get this done? Can I do something about it? Becoming a 
PMC member might be difficult. :)
Are there still three PMC votes outstanding? I'm not entirely sure how to properly
check who is part of the PMC.

Regards,
Niklas

> On 17. May 2024, at 05:03, Hang Ruan  wrote:
> 
> Hi, Niklas.
> 
> The kafka connector version 3.2.0[1] is for Flink 1.19 and it has a vote
> thread[2] already. But there are not enough votes,
> 
> Best,
> Hang
> 
> [1] https://issues.apache.org/jira/browse/FLINK-35138
> [2] https://lists.apache.org/thread/7shs2wzb0jkfdyst3mh6d9pn3z1bo93c
> 
> Niklas Wilcke mailto:niklas.wil...@uniberg.com>> 
> 于2024年5月16日周四 22:04写道:
>> Hi Ahmed,
>> 
>> are you aware of a blocker? I'm also a bit confused that after Flink 1.19 
>> being available for a month now the connectors still aren't. It would be 
>> great to get some insights or maybe a reference to an issue. From looking at 
>> the Github repos and the Jira I wasn't able to spot something obvious 
>> telling me that this matter is really in the focus. Thank you!
>> 
>> Regards,
>> Niklas
>> 
>> 
>>> On 10. May 2024, at 20:10, Ahmed Hamdy >> > wrote:
>>> 
>>> Hi Aniket
>>> 
>>> The community is currently working on releasing a new version for all the 
>>> connectors that is compatible with 1.19. Please follow the announcements in 
>>> Flink website[1] to get notified when it is available.
>>> 
>>> 1-https://flink.apache.org/posts/
>>> Best Regards
>>> Ahmed Hamdy
>>> 
>>> 
>>> On Fri, 10 May 2024 at 18:14, Aniket Sule >> > wrote:
 Hello,
 
 On the Flink downloads page, the latest stable version is Flink 1.19.0. 
 However, the Flink Kafka connector is v 3.1.0, that is compatible with 
 1.18.x.
 
 Is there a timeline when the Kafka connector for v 1.19 will be released? 
 Is it possible to use the v3.1.0 connector with Flink v 1.19?
 
  
 
 Thanks and regards,
 
 Aniket Sule
 
>> 





Re: What is the best way to aggregate data over a long window

2024-05-17 Thread gongzhongqiang
Hi  Sachin,

We can optimize this problem in the following ways:
- use
org.apache.flink.streaming.api.datastream.WindowedStream#aggregate(org.apache.flink.api.common.functions.AggregateFunction)
to reduce the amount of buffered data (see the sketch below)
- use TTL to clean up data that is no longer needed
- enable incremental checkpoints
- use multi-level time window granularity for pre-aggregation, which can
significantly improve performance and reduce computation latency
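
A bare-bones AggregateFunction for the first point might look like this (Data,
SumCountAccumulator and DataSummary are placeholder types, not classes from the
thread):

import org.apache.flink.api.common.functions.AggregateFunction;

public class SumCountAggregator implements AggregateFunction<Data, SumCountAccumulator, DataSummary> {

    @Override
    public SumCountAccumulator createAccumulator() {
        return new SumCountAccumulator();
    }

    @Override
    public SumCountAccumulator add(Data value, SumCountAccumulator acc) {
        // only the running aggregate is kept in window state, not the raw records
        acc.sum += value.getValue();
        acc.count++;
        return acc;
    }

    @Override
    public DataSummary getResult(SumCountAccumulator acc) {
        return new DataSummary(acc.sum, acc.count);
    }

    @Override
    public SumCountAccumulator merge(SumCountAccumulator a, SumCountAccumulator b) {
        a.sum += b.sum;
        a.count += b.count;
        return a;
    }
}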

Best,
Zhongqiang Gong

Sachin Mittal wrote on Fri, May 17, 2024 at 03:48:

> Hi,
> My pipeline step is something like this:
>
> SingleOutputStreamOperator reducedData =
> data
> .keyBy(new KeySelector())
> .window(
> TumblingEventTimeWindows.of(Time.seconds(secs)))
> .reduce(new DataReducer())
> .name("reduce");
>
>
> This works fine for secs = 300.
> However once I increase the time window to say 1 hour or 3600 the state
> size increases as now it has a lot more records to reduce.
>
> Hence I need to allocate much more memory to the task manager.
>
> However there is no upper limit to this memory allocated. If the volume of
> data increases by say 10 fold I would have no option but to again increase
> the memory.
>
> Is there a better way to perform long window aggregation so overall this
> step has a small memory footprint.
>
> Thanks
> Sachin
>
>


Re: Get access to unmatching events in Apache Flink Cep

2024-05-17 Thread Biao Geng
Hi Anton,

I am afraid that currently there is no such API to access the intermediate NFA
state in your case. For patterns that contain a 'within()' condition, the
timed-out partial matches can be retrieved via the TimedOutPartialMatchHandler
interface, but other non-matching events are pruned immediately once they are
considered unnecessary to keep.
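
A sketch of such a handler (Event, the output type and the tag name are placeholders;
it only helps when the pattern actually ends with a within(...) interval):

import java.util.List;
import java.util.Map;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class MatchHandler extends PatternProcessFunction<Event, String>
        implements TimedOutPartialMatchHandler<Event> {

    public static final OutputTag<String> TIMED_OUT = new OutputTag<String>("timed-out") {};

    @Override
    public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<String> out) {
        out.collect("matched: " + match);
    }

    @Override
    public void processTimedOutMatch(Map<String, List<Event>> match, Context ctx) {
        // partial matches that exceeded the within() interval are routed to a side output
        ctx.output(TIMED_OUT, "timed out partial match: " + match);
    }
}

// usage (sketch):
// CEP.pattern(stream, pattern).process(new MatchHandler()).getSideOutput(MatchHandler.TIMED_OUT)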

Best,
Biao Geng


Anton Sidorov wrote on Thu, May 16, 2024 at 16:12:

> Hello!
>
> I have a Flink Job with CEP pattern.
>
> Pattern example:
>
> // Strict Contiguity
> // a b+ c d e
> Pattern.begin("a", AfterMatchSkipStrategy.skipPastLastEvent()).where(...)
> .next("b").where(...).oneOrMore()
> .next("c").where(...)
> .next("d").where(...)
> .next("e").where(...);
>
> I have events with wrong order stream on input:
>
> a b d c e
>
> On output I haven`t any matching. But I want have access to events, that
> not matching.
>
> Can I have access to middle NFA state in CEP pattern, or get some other
> way to view unmatching events?
>
> Example project with CEP pattern on github
> , and my question
> on SO
> 
>
> Thanks in advance
>


Re: problem with the heartbeat interval feature

2024-05-16 Thread Hongshun Wang
Hi Thomas,

The Debezium docs say: For the connector to detect and process events from a
heartbeat table, you must add the table to the PostgreSQL publication
specified by the publication.name property.
If this publication predates your Debezium deployment, the connector uses
the publications as defined. If the publication is not already configured
to automatically replicate changes FOR ALL TABLES in the database, you must
explicitly add the heartbeat table to the publication[2].

Thus, if you want to use heartbeat in CDC:

   1. add a heartbeat table to the publication: ALTER PUBLICATION
   ** ADD TABLE **;
   2. set heartbeatInterval
   3. add debezium.heartbeat.action.query [3]

However, when I use it in CDC, an exception occurs:

Caused by: java.lang.NullPointerException
at 
io.debezium.heartbeat.HeartbeatFactory.createHeartbeat(HeartbeatFactory.java:55)
at io.debezium.pipeline.EventDispatcher.(EventDispatcher.java:127)
at io.debezium.pipeline.EventDispatcher.(EventDispatcher.java:94)




It seems CDC doesn't add a HeartbeatConnectionProvider when configuring the
PostgresEventDispatcher:

// org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext#configure
this.postgresDispatcher =
        new PostgresEventDispatcher<>(
dbzConfig,
topicSelector,
schema,
queue,
dbzConfig.getTableFilters().dataCollectionFilter(),
DataChangeEvent::new,
metadataProvider,
schemaNameAdjuster);


In Debezium, when PostgresConnectorTask starts, it does this:

// io.debezium.connector.postgresql.PostgresConnectorTask#start
final PostgresEventDispatcher dispatcher = new PostgresEventDispatcher<>(
connectorConfig,
topicNamingStrategy,
schema,
queue,
connectorConfig.getTableFilters().dataCollectionFilter(),
DataChangeEvent::new,
PostgresChangeRecordEmitter::updateSchema,
metadataProvider,
connectorConfig.createHeartbeat(
topicNamingStrategy,
schemaNameAdjuster,
() -> new
PostgresConnection(connectorConfig.getJdbcConfig(),
PostgresConnection.CONNECTION_GENERAL),
exception -> {
String sqlErrorId = exception.getSQLState();
switch (sqlErrorId) {
case "57P01":
// Postgres error
admin_shutdown, see
https://www.postgresql.org/docs/12/errcodes-appendix.html
  throw new DebeziumException("Could not
execute heartbeat action query (Error: " + sqlErrorId + ")",
exception);
case "57P03":
// Postgres error
cannot_connect_now, see
https://www.postgresql.org/docs/12/errcodes-appendix.html
  throw new RetriableException("Could not
execute heartbeat action query (Error: " + sqlErrorId + ")",
exception);
default:
break;
}
}),
schemaNameAdjuster,
signalProcessor);

Thus, I have created a new Jira ticket [4] to fix it.



 [1]
https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/legacy-flink-cdc-sources/postgres-cdc/

[2]
https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-interval-ms

[3]
https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-action-query

[4] https://issues.apache.org/jira/browse/FLINK-35387


Best

Hongshun

On Thu, May 16, 2024 at 9:03 PM Thomas Peyric 
wrote:

> Hi Flink Community !
>
> I am using :
> * Flink
> * Flink CDC posgtres Connector
> * scala + sbt
>
> versions are :
>   * orgApacheKafkaVersion = "3.2.3"
>   * flinkVersion = "1.19.0"
>   * flinkKafkaVersion = "3.0.2-1.18"
>   * flinkConnectorPostgresCdcVersion = "3.0.1"
>   * debeziumVersion = "1.9.8.Final"
>   * scalaVersion = "2.12.13"
>   * javaVersion = "11"
>
>
> the problem
> ---
>
> I have a problem with the heartbeat interval feature:
> * when I am querying PG with `select * from pg_replication_slots;` for
> checking if information are updated on each replication slots at defined
> interval
> * then confirmed_flush_lsn values are never updated
> PS: i have other 

Re: SSL Kafka PyFlink

2024-05-16 Thread gongzhongqiang
Hi Phil,

The Kafka SSL configuration keys may not be correct. You can refer to the
Kafka documentation [1] for the SSL client configuration.


[1] https://kafka.apache.org/documentation/#security_configclients


Best,
Zhongqiang Gong

Phil Stavridis wrote on Fri, May 17, 2024 at 01:44:

> Hi,
>
> I have a PyFlink job that needs to read from a Kafka topic and the
> communication with the Kafka broker requires SSL.
> I have connected to the Kafka cluster with something like this using just
> Python.
>
> from confluent_kafka import Consumer, KafkaException, KafkaError
>
>
>
> def get_config(bootstrap_servers, ca_file, cert_file, key_file):
> config = {
> 'bootstrap.servers': bootstrap_servers,
> 'security.protocol': 'SSL',
> 'ssl.ca.location': ca_file,
> 'ssl.certificate.location': cert_file,
> 'ssl.key.location': key_file,
> 'ssl.endpoint.identification.algorithm': 'none',
> 'enable.ssl.certificate.verification': 'false',
> 'group.id': ‘my_group_id'
> }
>
>
> return config
>
>
>
> And have read messages from the Kafka topic.
>
> I am trying to set up something similar with Flink SQL:
>
> t_env.execute_sql(f"""
> CREATE TABLE logs (
> `user` ROW(`user_id` BIGINT),
> `timestamp` ROW(`secs` BIGINT)
> ) WITH (
> 'connector' = '{CONNECTOR_TYPE}',
> 'topic' = '{KAFKA_TOPIC}',
> 'properties.bootstrap.servers' = '{BOOTSTRAP_SERVERS}',
> 'properties.group.id' = '{CONSUMER_GROUP}',
> 'scan.startup.mode' = '{STARTUP_MODE_LATEST}',
> 'format' = '{MESSAGE_FORMAT}',
> 'properties.security.protocol' = 'SSL',
> 'properties.ssl.ca.location' = '{ca_file}',
> 'properties.ssl.certificate.location' = '{cert_file}',
> 'properties.ssl.key.location' = '{key_file}',
> 'properties.ssl.endpoint.identification.algorithm' = ''
> )
> """)
>
>
> But when this runs I am getting this error:
>
> Caused by: org.apache.flink.util.FlinkException: Global failure triggered
> by OperatorCoordinator for 'Source: kafka_action_logs[1] -> Calc[2]'
> (operator cbc357ccb763df2852fee8c4fc7d55f2).
> ...
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list
> subscribed topic partitions due to
> at
> ...
> Caused by: java.lang.RuntimeException: Failed to get metadata for topics
> [logs].
> at
> ...
> ... 3 more
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException:
> SSL handshake failed
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
> at
> org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
> ... 10 more
> Caused by:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException:
> SSL handshake failed
> Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed:
> sun.security.provider.certpath.SunCertPathBuilderException: unable to find
> valid certification path to requested target
> ...
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:435)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:523)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:373)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:293)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:481)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1413)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1344)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: sun.security.validator.ValidatorException: PKIX path building
> failed: sun.security.provider.certpath.SunCertPathBuilderException: unable
> to find valid certification path to requested target
> at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:456)
> at
> sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:323)
> at sun.security.validator.Validator.validate(Validator.java:271)
> at
> sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:315)
> at
> 

Re: [EXTERNAL]Re: Flink kafka connector for v 1.19.0

2024-05-16 Thread Hang Ruan
Hi, Niklas.

The Kafka connector version 3.2.0 [1] is for Flink 1.19 and it already has a
vote thread [2]. But there are not enough votes yet.

Best,
Hang

[1] https://issues.apache.org/jira/browse/FLINK-35138
[2] https://lists.apache.org/thread/7shs2wzb0jkfdyst3mh6d9pn3z1bo93c

Niklas Wilcke wrote on Thu, May 16, 2024 at 22:04:

> Hi Ahmed,
>
> are you aware of a blocker? I'm also a bit confused that after Flink 1.19
> being available for a month now the connectors still aren't. It would be
> great to get some insights or maybe a reference to an issue. From looking
> at the Github repos and the Jira I wasn't able to spot something obvious
> telling me that this matter is really in the focus. Thank you!
>
> Regards,
> Niklas
>
>
> On 10. May 2024, at 20:10, Ahmed Hamdy  wrote:
>
> Hi Aniket
>
> The community is currently working on releasing a new version for all the
> connectors that is compatible with 1.19. Please follow the announcements in
> Flink website[1] to get notified when it is available.
>
> 1-https://flink.apache.org/posts/
> Best Regards
> Ahmed Hamdy
>
>
> On Fri, 10 May 2024 at 18:14, Aniket Sule 
> wrote:
>
>> Hello,
>>
>> On the Flink downloads page, the latest stable version is Flink 1.19.0.
>> However, the Flink Kafka connector is v 3.1.0, that is compatible with
>> 1.18.x.
>>
>> Is there a timeline when the Kafka connector for v 1.19 will be released?
>> Is it possible to use the v3.1.0 connector with Flink v 1.19?
>>
>>
>>
>> Thanks and regards,
>>
>> Aniket Sule
>> Caution: External email. Do not click or open attachments unless you know
>> and trust the sender.
>>
>
>


Re: monitoring message latency for flink sql app

2024-05-16 Thread Hang Ruan
Hi, mete.

As Feng Jin said, I think you could make use of the metric `currentEmitEventTimeLag`.
Besides that, if you develop your job with the DataStream API, you could
add a new operator to handle it by yourself.
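
For example, a minimal sketch of such an operator, assuming the record type exposes
an event-time field in epoch milliseconds (class and metric names are made up for
illustration):

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

// Passes records through unchanged and reports, as a gauge, how far the last
// record's event time lags behind the current processing time.
public abstract class EventTimeLagTracker<T> extends RichMapFunction<T, T> {

    private transient volatile long lastLagMillis;

    /** Extract the event-time field (epoch millis) from a record. */
    protected abstract long extractEventTime(T value);

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext()
                .getMetricGroup()
                .gauge("customEventTimeLag", (Gauge<Long>) () -> lastLagMillis); // metric name is illustrative
    }

    @Override
    public T map(T value) {
        lastLagMillis = System.currentTimeMillis() - extractEventTime(value);
        return value;
    }
}
```

It would be applied right after the source, e.g. `stream.map(new EventTimeLagTracker<MyEvent>() { ... })`,
and the gauge can then be alerted on like any other task metric.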

Best,
Hang

Feng Jin wrote on Fri, May 17, 2024 at 02:44:

> Hi Mete
>
> You can refer to the metrics provided by the Kafka source connector.
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka//#monitoring
>
> Best,
> Feng
>
> On Thu, May 16, 2024 at 7:55 PM mete  wrote:
>
>> Hello,
>>
>> For an sql application using kafka as source (and kafka as sink) what
>> would be the recommended way to monitor for processing delay? For example,
>> i want to be able to alert if the app has a certain delay compared to some
>> event time field in the message.
>>
>> Best,
>> Mete
>>
>>
>>


Re: Flink 1.18.1, state recovery after restart

2024-05-16 Thread Yanfei Lei
This looks like the same problem as FLINK-34063 / FLINK-33863; you can try upgrading to 1.18.2.
[1] https://issues.apache.org/jira/browse/FLINK-33863
[2] https://issues.apache.org/jira/browse/FLINK-34063

陈叶超 wrote on Thu, May 16, 2024 at 16:38:
>
> After upgrading to Flink 1.18.1, the job hits the following error when recovering state after a restart:
> 2024-04-09 13:03:48
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:258)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:256)
> at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
> at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
> state backend for 
> RowDataStoreWriteOperator_8d96fc510e75de3baf03ef7367db7d42_(2/2) from any of 
> the 1 provided restore options.
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:289)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:176)
> ... 11 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed 
> when trying to restore operator state backend
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:88)
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:533)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:380)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:280)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> ... 13 more
> Caused by: java.io.IOException: invalid stream header
> at 
> org.xerial.snappy.SnappyFramedInputStream.<init>(SnappyFramedInputStream.java:235)
> at 
> org.xerial.snappy.SnappyFramedInputStream.<init>(SnappyFramedInputStream.java:145)
> at 
> org.xerial.snappy.SnappyFramedInputStream.<init>(SnappyFramedInputStream.java:129)
> at 
> org.apache.flink.runtime.state.SnappyStreamCompressionDecorator.decorateWithCompression(SnappyStreamCompressionDecorator.java:53)
> at 
> org.apache.flink.runtime.state.StreamCompressionDecorator.decorateWithCompression(StreamCompressionDecorator.java:60)
> at 
> org.apache.flink.runtime.state.CompressibleFSDataInputStream.<init>(CompressibleFSDataInputStream.java:39)
> at 
> org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:185)
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:85)
> ... 18 more
>


-- 
Best,
Yanfei


What is the best way to aggregate data over a long window

2024-05-16 Thread Sachin Mittal
Hi,
My pipeline step is something like this:

SingleOutputStreamOperator reducedData =
data
.keyBy(new KeySelector())
.window(
TumblingEventTimeWindows.of(Time.seconds(secs)))
.reduce(new DataReducer())
.name("reduce");


This works fine for secs = 300.
However once I increase the time window to say 1 hour or 3600 the state
size increases as now it has a lot more records to reduce.

Hence I need to allocate much more memory to the task manager.

However there is no upper limit to this memory allocated. If the volume of
data increases by say 10 fold I would have no option but to again increase
the memory.

Is there a better way to perform long window aggregation so overall this
step has a small memory footprint.

Thanks
Sachin


Re: monitoring message latency for flink sql app

2024-05-16 Thread Feng Jin
Hi Mete

You can refer to the metrics provided by the Kafka source connector.

https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka//#monitoring

Best,
Feng

On Thu, May 16, 2024 at 7:55 PM mete  wrote:

> Hello,
>
> For an sql application using kafka as source (and kafka as sink) what
> would be the recommended way to monitor for processing delay? For example,
> i want to be able to alert if the app has a certain delay compared to some
> event time field in the message.
>
> Best,
> Mete
>
>
>


RE: monitoring message latency for flink sql app

2024-05-16 Thread Anton Sidorov
Hello mete.

I found this SO article
https://stackoverflow.com/questions/54293808/measuring-event-time-latency-with-flink-cep

If I'm not mistaken, you can use the Flink metrics system for operators and get
the time it takes an operator to process an event.

On 2024/05/16 11:54:44 mete wrote:

> Hello,
>
> For an sql application using kafka as source (and kafka as sink) what would
> be the recommended way to monitor for processing delay? For example, i want
> to be able to alert if the app has a certain delay compared to some event
> time field in the message.
>
> Best,
> Mete
>


problem with the heartbeat interval feature

2024-05-16 Thread Thomas Peyric
Hi Flink Community !

I am using :
* Flink
* Flink CDC postgres Connector
* scala + sbt

versions are :
  * orgApacheKafkaVersion = "3.2.3"
  * flinkVersion = "1.19.0"
  * flinkKafkaVersion = "3.0.2-1.18"
  * flinkConnectorPostgresCdcVersion = "3.0.1"
  * debeziumVersion = "1.9.8.Final"
  * scalaVersion = "2.12.13"
  * javaVersion = "11"


the problem
---

I have a problem with the heartbeat interval feature:
* when I query PG with `select * from pg_replication_slots;` to check whether the
information on each replication slot is updated at the defined interval
* the confirmed_flush_lsn values are never updated
PS: I have other replication slots managed directly with debezium (without
flink) and their confirmed_flush_lsn values are updated correctly (same pg
DB), each according to its own interval

```
 slot_name   |  plugin  | slot_type |  datoid   | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn  | confirmed_flush_lsn
-------------+----------+-----------+-----------+----------+-----------+--------+------------+------+--------------+--------------+---------------------
 slot_table1 | pgoutput | logical   | 811518778 | the_db   | f         | t      |      10870 |      |   1630392036 | 712/697C0DB8 | 712/697C0DF0
 slot_table2 | pgoutput | logical   | 811518778 | the_db   | f         | t      |      10894 |      |   1630392033 | 712/697AD0A8 | 712/697AD0E0
 slot_table3 | pgoutput | logical   | 811518778 | the_db   | f         | t      |      10978 |      |   1630392034 | 712/697AD0A8 | 712/697AD0A8
```



My setup


I have configured 3 distinct DataStreamSource on 3 pg database tables using
this common method :

```
private def initEntityDataSource(conf: Config, env: StreamExecutionEnvironment, entityName: String, columnList: String) = {

    val dbzProps: Properties = new Properties()
    dbzProps.setProperty("column.include.list", columnList)   // "public.tableX.column1,public.tableX.column2"

    val postgresIncrementalSource: PostgresSourceBuilder.PostgresIncrementalSource[String] =
      PostgresSourceBuilder.PostgresIncrementalSource.builder()
        .hostname(conf.getString("pg.hostname"))
        .port(conf.getInt("pg.port"))
        .database(conf.getString("pg.database"))
        .username(conf.getString("pg.username"))
        .password(conf.getString("pg.password"))
        .slotName(conf.getString(s"flink.${entityName}.slot_name"))                 // slot_tableX
        .decodingPluginName("pgoutput")
        .includeSchemaChanges(true)
        .deserializer(new JsonDebeziumDeserializationSchema())
        .closeIdleReaders(true)
        .heartbeatInterval(Duration.ofMillis(1))                                    // <-- 10 seconds
        .connectTimeout(Duration.ofSeconds(10))                                     // 10 seconds
        .startupOptions(StartupOptions.initial())
        .schemaList("public")
        .tableList("public." + conf.getString(s"flink.${entityName}.table_name"))   // public.tableX
        .debeziumProperties(dbzProps)                                               // <-- dbzProps
        .build()

    env.fromSource(postgresIncrementalSource, WatermarkStrategy.noWatermarks[String](), s"pg-projector-${entityName}")
      .setParallelism(1)
  }
```

After that I converted each DataStreamSource into a Table,
joined those 3 Tables, and converted the result into a DataStream[Row].

On this new DataStream I do a keyBy and process it with a custom
KeyedProcessFunction.

All of this works fine and does its job.

But the heartbeat does not seem to refresh the values in the
pg_replication_slots.confirmed_flush_lsn column.


PS: I also tried the following:

1) instead of using the .heartbeatInterval() method to set the interval value
... I used debezium properties like this

```
dbzProps.setProperty("heartbeat.interval.ms", "1")// and also
"PT10S"
```

it seems there is no effect with this

2) it seems that debezium needs to create a kafka topic for managing the
heartbeat. In theory, if the topic does not exist it will be automatically
created.
But my kafka server does not authorize this auto creation ... so I created
this topic manually with this name:
`__flink-heartbeat.postgres_cdc_source`

I also added this dbzProps entry to set the right topic prefix

```
dbzProps.setProperty("topic.heartbeat.prefix", "__flink-heartbeat")
```

it seems there is no effect with this too




So ... Do you have any ideas ?

Thanks,

Thomas


Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-16 Thread John Smith
Hi. No I have not changed the protocol.

On Thu, May 16, 2024, 3:20 AM Biao Geng  wrote:

> Hi John,
>
> Just want to check, have you ever changed the kafka protocol in your job
> after using the new cluster? The error message shows that it is caused by
> the kafka client and there is a similar error in this issue
> 
> .
>
> Best,
> Biao Geng
>
>
John Smith wrote on Thu, May 16, 2024 at 09:01:
>
>> I deployed a new cluster, same version as my old cluster(1.14.4 ), only
>> difference using Java 11 and it seems after a week of usage the below
>> exception happens.
>>
>> The task manager is...
>>
>> 32GB total
>>
>> And i have the ONLY following memory settings
>>
>> taskmanager.memory.flink.size: 16384m
>> taskmanager.memory.jvm-metaspace.size: 3072m
>>
>>
>>
>>
>> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
>> out-of-memory error has occurred. This can mean two things: either job(s)
>> require(s) a larger size of JVM direct memory or there is a direct memory
>> leak. The direct memory can be allocated by user code or some of its
>> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
>> configuration option should be increased. Flink framework and its
>> dependencies also consume the direct memory, mostly for network
>> communication. The most of network memory is managed by Flink and should
>> not result in out-of-memory error. In certain special cases, in particular
>> for jobs with high parallelism, the framework may require more direct
>> memory which is not managed by Flink. In this case
>> 'taskmanager.memory.framework.off-heap.size' configuration option should be
>> increased. If the error persists then there is probably a direct memory
>> leak in user code or some of its dependencies which has to be investigated
>> and fixed. The task executor has to be shutdown...
>> at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
>> at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
>> at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
>> at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
>> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
>> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
>> at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
>> at
>> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
>> at
>> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
>> at
>> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
>> at
>> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
>> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
>> at
>> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
>> at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>> at
>> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>> at
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> ... 1 more
>>
>


Re: [EXTERNAL]Re: Flink kafka connector for v 1.19.0

2024-05-16 Thread Niklas Wilcke
Hi Ahmed,

are you aware of a blocker? I'm also a bit confused that after Flink 1.19 being 
available for a month now the connectors still aren't. It would be great to get 
some insights or maybe a reference to an issue. From looking at the Github 
repos and the Jira I wasn't able to spot something obvious telling me that this 
matter is really in the focus. Thank you!

Regards,
Niklas


> On 10. May 2024, at 20:10, Ahmed Hamdy  wrote:
> 
> Hi Aniket
> 
> The community is currently working on releasing a new version for all the 
> connectors that is compatible with 1.19. Please follow the announcements in 
> Flink website[1] to get notified when it is available.
> 
> 1-https://flink.apache.org/posts/
> Best Regards
> Ahmed Hamdy
> 
> 
> On Fri, 10 May 2024 at 18:14, Aniket Sule  > wrote:
>> Hello,
>> 
>> On the Flink downloads page, the latest stable version is Flink 1.19.0. 
>> However, the Flink Kafka connector is v 3.1.0, that is compatible with 
>> 1.18.x.
>> 
>> Is there a timeline when the Kafka connector for v 1.19 will be released? Is 
>> it possible to use the v3.1.0 connector with Flink v 1.19?
>> 
>>  
>> 
>> Thanks and regards,
>> 
>> Aniket Sule
>> 
>> Caution: External email. Do not click or open attachments unless you know 
>> and trust the sender. 





monitoring message latency for flink sql app

2024-05-16 Thread mete
Hello,

For an sql application using kafka as source (and kafka as sink) what would
be the recommended way to monitor for processing delay? For example, i want
to be able to alert if the app has a certain delay compared to some event
time field in the message.

Best,
Mete


SSL Kafka PyFlink

2024-05-16 Thread Phil Stavridis
Hi,

I have a PyFlink job that needs to read from a Kafka topic and the 
communication with the Kafka broker requires SSL.
I have connected to the Kafka cluster with something like this using just 
Python.

from confluent_kafka import Consumer, KafkaException, KafkaError



def get_config(bootstrap_servers, ca_file, cert_file, key_file):
config = {
'bootstrap.servers': bootstrap_servers,
'security.protocol': 'SSL',
'ssl.ca.location': ca_file,
'ssl.certificate.location': cert_file,
'ssl.key.location': key_file,
'ssl.endpoint.identification.algorithm': 'none',
'enable.ssl.certificate.verification': 'false',
'group.id': 'my_group_id'
}


return config



And have read messages from the Kafka topic.

I am trying to set up something similar with Flink SQL:

t_env.execute_sql(f"""
CREATE TABLE logs (
`user` ROW(`user_id` BIGINT),
`timestamp` ROW(`secs` BIGINT)
) WITH (
'connector' = '{CONNECTOR_TYPE}',
'topic' = '{KAFKA_TOPIC}',
'properties.bootstrap.servers' = '{BOOTSTRAP_SERVERS}',
'properties.group.id' = '{CONSUMER_GROUP}',
'scan.startup.mode' = '{STARTUP_MODE_LATEST}',
'format' = '{MESSAGE_FORMAT}',
'properties.security.protocol' = 'SSL',
'properties.ssl.ca.location' = '{ca_file}',
'properties.ssl.certificate.location' = '{cert_file}',
'properties.ssl.key.location' = '{key_file}',
'properties.ssl.endpoint.identification.algorithm' = ''
)
""")


But when this runs I am getting this error:

Caused by: org.apache.flink.util.FlinkException: Global failure triggered by 
OperatorCoordinator for 'Source: kafka_action_logs[1] -> Calc[2]' (operator 
cbc357ccb763df2852fee8c4fc7d55f2).
...
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list 
subscribed topic partitions due to
at 
...
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [logs].
at 
...
... 3 more
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException:
 SSL handshake failed
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at 
org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
... 10 more
Caused by: 
org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException:
 SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: 
sun.security.provider.certpath.SunCertPathBuilderException: unable to find 
valid certification path to requested target
...
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:435)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:523)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:373)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:293)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:481)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1413)
at 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1344)
at java.lang.Thread.run(Thread.java:750)
Caused by: sun.security.validator.ValidatorException: PKIX path building 
failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to 
find valid certification path to requested target
at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:456)
at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:323)
at sun.security.validator.Validator.validate(Validator.java:271)
at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:315)
at 
sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:278)
at 
sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
at 
sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:632)
... 19 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable 
to find valid certification path to requested target
at 
sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:148)
at 

Get access to unmatching events in Apache Flink Cep

2024-05-16 Thread Anton Sidorov
Hello!

I have a Flink Job with CEP pattern.

Pattern example:

// Strict Contiguity
// a b+ c d e
Pattern.begin("a", AfterMatchSkipStrategy.skipPastLastEvent()).where(...)
.next("b").where(...).oneOrMore()
.next("c").where(...)
.next("d").where(...)
.next("e").where(...);

I have events with wrong order stream on input:

a b d c e

On output I haven`t any matching. But I want have access to events, that
not matching.

Can I have access to middle NFA state in CEP pattern, or get some other way
to view unmatching events?

Example project with CEP pattern on github
, and my question
on SO


Thanks in advance
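
Not a direct answer (there is no public hook into the intermediate NFA state as far as
I know), but one common workaround is to put a time bound on the pattern with within()
and route timed-out partial matches to a side output; that at least surfaces the events
that started a match and never completed it. A sketch, assuming String events and an
illustrative 30-second bound:

```java
import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class PartialMatchSideOutputSketch {

    // Side output for partial matches that never completed within the time bound.
    public static final OutputTag<String> UNMATCHED = new OutputTag<String>("unmatched-partial") {};

    // Emits completed matches downstream and routes timed-out partial matches
    // (the events accumulated so far, e.g. "a" plus some "b"s) to the side output.
    public static class ReportTimeouts extends PatternProcessFunction<String, String>
            implements TimedOutPartialMatchHandler<String> {

        @Override
        public void processMatch(Map<String, List<String>> match, Context ctx, Collector<String> out) {
            out.collect("matched: " + match);
        }

        @Override
        public void processTimedOutMatch(Map<String, List<String>> match, Context ctx) {
            ctx.output(UNMATCHED, "timed out partial match: " + match);
        }
    }

    public static SingleOutputStreamOperator<String> attach(DataStream<String> input, Pattern<String, ?> pattern) {
        // A time bound is required, otherwise partial matches never time out.
        PatternStream<String> patternStream = CEP.pattern(input, pattern.within(Time.seconds(30)));
        return patternStream.process(new ReportTimeouts());
    }
}
```

The caller reads completed matches from the returned stream and the partial ones via
`result.getSideOutput(PartialMatchSideOutputSketch.UNMATCHED)`.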


Flink 1.18.1, state recovery after restart

2024-05-16 Thread 陈叶超
After upgrading to Flink 1.18.1, the job hits the following error when recovering state after a restart:
2024-04-09 13:03:48
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:258)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:256)
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
state backend for 
RowDataStoreWriteOperator_8d96fc510e75de3baf03ef7367db7d42_(2/2) from any of 
the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:289)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:176)
... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore operator state backend
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:88)
at 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:533)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:380)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:280)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 13 more
Caused by: java.io.IOException: invalid stream header
at 
org.xerial.snappy.SnappyFramedInputStream.<init>(SnappyFramedInputStream.java:235)
at 
org.xerial.snappy.SnappyFramedInputStream.<init>(SnappyFramedInputStream.java:145)
at 
org.xerial.snappy.SnappyFramedInputStream.<init>(SnappyFramedInputStream.java:129)
at 
org.apache.flink.runtime.state.SnappyStreamCompressionDecorator.decorateWithCompression(SnappyStreamCompressionDecorator.java:53)
at 
org.apache.flink.runtime.state.StreamCompressionDecorator.decorateWithCompression(StreamCompressionDecorator.java:60)
at 
org.apache.flink.runtime.state.CompressibleFSDataInputStream.<init>(CompressibleFSDataInputStream.java:39)
at 
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:185)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:85)
... 18 more



Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-16 Thread Biao Geng
Hi John,

Just want to check, have you ever changed the kafka protocol in your job
after using the new cluster? The error message shows that it is caused by
the kafka client and there is a similar error in this issue

.

Best,
Biao Geng


John Smith wrote on Thu, May 16, 2024 at 09:01:

> I deployed a new cluster, same version as my old cluster(1.14.4 ), only
> difference using Java 11 and it seems after a week of usage the below
> exception happens.
>
> The task manager is...
>
> 32GB total
>
> And i have the ONLY following memory settings
>
> taskmanager.memory.flink.size: 16384m
> taskmanager.memory.jvm-metaspace.size: 3072m
>
>
>
>
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
> out-of-memory error has occurred. This can mean two things: either job(s)
> require(s) a larger size of JVM direct memory or there is a direct memory
> leak. The direct memory can be allocated by user code or some of its
> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
> configuration option should be increased. Flink framework and its
> dependencies also consume the direct memory, mostly for network
> communication. The most of network memory is managed by Flink and should
> not result in out-of-memory error. In certain special cases, in particular
> for jobs with high parallelism, the framework may require more direct
> memory which is not managed by Flink. In this case
> 'taskmanager.memory.framework.off-heap.size' configuration option should be
> increased. If the error persists then there is probably a direct memory
> leak in user code or some of its dependencies which has to be investigated
> and fixed. The task executor has to be shutdown...
> at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
> at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
> at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
> at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
> at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
> at
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
> at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
> at
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
> at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
> at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
> at
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ... 1 more
>


Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-15 Thread John Smith
I deployed a new cluster, same version as my old cluster(1.14.4 ), only
difference using Java 11 and it seems after a week of usage the below
exception happens.

The task manager is...

32GB total

And i have the ONLY following memory settings

taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m




Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
out-of-memory error has occurred. This can mean two things: either job(s)
require(s) a larger size of JVM direct memory or there is a direct memory
leak. The direct memory can be allocated by user code or some of its
dependencies. In this case 'taskmanager.memory.task.off-heap.size'
configuration option should be increased. Flink framework and its
dependencies also consume the direct memory, mostly for network
communication. The most of network memory is managed by Flink and should
not result in out-of-memory error. In certain special cases, in particular
for jobs with high parallelism, the framework may require more direct
memory which is not managed by Flink. In this case
'taskmanager.memory.framework.off-heap.size' configuration option should be
increased. If the error persists then there is probably a direct memory
leak in user code or some of its dependencies which has to be investigated
and fixed. The task executor has to be shutdown...
at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
at
org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
at
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
at
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more


Confirmation on Lambda Support for UDFs in FlinkSQL / Table API

2024-05-15 Thread Tucker Harvey via user
Hi Flink Community,

I’m writing to confirm whether lambda expressions are supported with User 
Defined Functions (UDFs) in FlinkSQL and the Table API. My current 
understanding is that they are not supported.
Can anyone verify this, or let me know if there have been any recent changes 
regarding this?
Thanks for your help.

Best regards,
Tucker
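
For reference, ScalarFunction and the other UDF base types are abstract classes rather
than single-method interfaces, so a Java lambda cannot be used in their place; a UDF is
normally written as a named class and registered by class. A minimal sketch (names are
illustrative):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class UdfSketch {

    // A named class, not a lambda: the planner resolves the eval() method reflectively.
    public static class AddOne extends ScalarFunction {
        public long eval(long x) {
            return x + 1;
        }
    }

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Registration is by class (or instance), after which the function is usable from SQL.
        tEnv.createTemporarySystemFunction("AddOne", AddOne.class);
        tEnv.executeSql("SELECT AddOne(41)").print();
    }
}
```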

Re:Re: use flink 1.19 JDBC Driver can find jdbc connector

2024-05-15 Thread Xuyang
Hi, 

> Can we write in Chinese now?

I see you sent this to the Chinese-language user mailing list, so Chinese is fine.




> I just edited the Factory file in the gateway.jar under the opt directory and registered the connector there.

Do you mean that the earlier error was something like "could not find a jdbc
connector", and that it was fixed by directly adding the jdbc connector's Factory
implementation class to the Factory file (the SPI file) under META-INF/services inside the gateway jar?




If that is the issue, it is a bit strange, because the SPI file of flink-connector-jdbc already lists the relevant class [1]; in principle, once the jar is placed in the lib directory it should be discovered via SPI.




[1] 
https://github.com/apache/flink-connector-jdbc/blob/bde28e6a92ffa75ae45bc8df6be55d299ff995a2/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory#L16
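
A quick way to check the SPI discovery that [1] relies on is to list the registered
factories from the same classpath/classloader as the failing component; a small
sketch (the jdbc connector's factory identifier is "jdbc"):

```java
import java.util.ServiceLoader;

import org.apache.flink.table.factories.Factory;

// Lists every table Factory registered via SPI on the current classpath; if the
// flink-connector-jdbc jar is visible, a factory with identifier "jdbc" should appear.
public class ListTableFactories {
    public static void main(String[] args) {
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            System.out.println(factory.factoryIdentifier() + " -> " + factory.getClass().getName());
        }
    }
}
```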




--

Best!
Xuyang





On 2024-05-15 15:51:49, abc15...@163.com wrote:
>Can we write in Chinese now? I just edited the Factory file in the gateway.jar under the opt directory and registered the connector there.
>
>
>> On May 15, 2024, at 15:36, Xuyang wrote:
>> 
>> Hi, it looks like your earlier problem was that the jdbc driver could not be found. Could you briefly describe how you solved it? "Registering the number of connections" is a bit hard to understand.
>> 
>> 
>> 
>> 
>> If you did run into such a problem and solved it this way, you could open an improvement JIRA issue [1] to help the community track and improve it. Thanks!
>> 
>> 
>> 
>> 
>> [1] https://issues.apache.org/jira/projects/FLINK/summary
>> 
>> 
>> 
>> 
>> --
>> 
>>Best!
>>Xuyang
>> 
>> 
>> 
>> 
>> 
>>> On 2024-05-10 12:26:22, abc15...@163.com wrote:
>>> I've solved it. You need to register the number of connections in the jar 
>>> of gateway. But this is inconvenient, and I still hope to improve it.
>>> Sent from my iPhone
>>> 
> On May 10, 2024, at 11:56, Xuyang wrote:
 
 Hi, can you print the classloader and verify if the jdbc connector exists 
 in it?
 
 
 
 
 --
 
   Best!
   Xuyang
 
 
 
 
 
 At 2024-05-09 17:48:33, "McClone"  wrote:
> I put flink-connector-jdbc into flink\lib. Using the Flink 1.19 JDBC Driver it
> cannot find the jdbc connector, but using sql-client works fine.


Re:Unable to log any data captured from kafka

2024-05-15 Thread Xuyang
Hi, Nida.




I'd like to confirm whether there would be any log output if it's executed 
directly in the IDE. 




If there are logs in the IDE but not when running by submission, you could 
check if the log configuration files in the TM logs are normal. 

If there are no logs in the IDE either, I've encountered this before, and it 
was an issue with jar package conflicts.




--

Best!
Xuyang




At 2024-05-15 15:20:45, "Fidea Lidea"  wrote:

Hi Team,

I've written a flink job & enabled slf4j logging mechanism for it.
Flow of Flink Job: Kafka source => process datastream
elements(Transformations) => kafka sink.

It stops logging while processing datastream. I want to log all data captured 
from kafka either in a log file or on the stdout console.
I've tried a few approaches but am unable to log data.  

Approaches tried : 

1. dataStream.print() or dataStream.print().toString()

2.Iterator myOutput = DataStreamUtils.collect(dataStream);
while (myOutput.hasNext()) {
log.info("myOutput" + myOutput.next());
LOG.debug("LOG" + myOutput.next());
}
3.IterativeStream iteration = dataStream.iterate();

DataStream ge=iteration.map(new MapFunction() {
@Override
public DTO map(DTO obj) throws Exception {
log.info("id  "+obj.getID());
return obj;
}


});
4. Store data in a static map & log it before kafka sink etc.

Thanks
Nida





Re: use flink 1.19 JDBC Driver can find jdbc connector

2024-05-15 Thread abc15606
Can we write in Chinese now? I just edited the Factory file in the gateway.jar under the opt directory and registered the connector there.


> On May 15, 2024, at 15:36, Xuyang wrote:
> 
> Hi, it looks like your earlier problem was that the jdbc driver could not be found. Could you briefly describe how you solved it? "Registering the number of connections" is a bit hard to understand.
> 
> 
> 
> 
> If you did run into such a problem and solved it this way, you could open an improvement JIRA issue [1] to help the community track and improve it. Thanks!
> 
> 
> 
> 
> [1] https://issues.apache.org/jira/projects/FLINK/summary
> 
> 
> 
> 
> --
> 
>Best!
>Xuyang
> 
> 
> 
> 
> 
>> On 2024-05-10 12:26:22, abc15...@163.com wrote:
>> I've solved it. You need to register the number of connections in the jar of 
>> gateway. But this is inconvenient, and I still hope to improve it.
>> Sent from my iPhone
>> 
 On May 10, 2024, at 11:56, Xuyang wrote:
>>> 
>>> Hi, can you print the classloader and verify if the jdbc connector exists 
>>> in it?
>>> 
>>> 
>>> 
>>> 
>>> --
>>> 
>>>   Best!
>>>   Xuyang
>>> 
>>> 
>>> 
>>> 
>>> 
>>> At 2024-05-09 17:48:33, "McClone"  wrote:
 I put flink-connector-jdbc into flink\lib. Using the Flink 1.19 JDBC Driver it
 cannot find the jdbc connector, but using sql-client works fine.



Re: How to contribute a Flink Hologres connector?

2024-05-15 Thread Xuyang
Hi, 

From a contribution point of view, I think supporting a flink hologres
connector is fine. Hologres is a fairly popular database at the moment, so there is certainly plenty of demand, and the official aliyun
github now also provides an open-source flink hologres connector based on it [1].





However, when it comes to the commercial ververica-connector-hologres package from aliyun and related companies, if you want to open-source it directly, from my point of view it is best to confirm the following points first, otherwise there may be hidden legal risks:

1. Whether the provider of the jar (aliyun and related companies) is aware of this and willing to open-source it; otherwise directly publishing a commercial artifact is not great.

2. Whether the license inside the jar is an open-source license rather than a commercial one.




If you really want to open-source it, I recommend contributing based on the flink hologres connector in the open-source github repository [1] (for example, as far as I can tell it currently supports up to Flink
1.17, so you could try contributing support for 1.18, 1.19, and so on).




[1] https://github.com/aliyun/alibabacloud-hologres-connectors




--

Best!
Xuyang





On 2024-05-14 11:24:37, "casel.chen" wrote:
>We use the commercial Alibaba Cloud Hologres database, and we also have a self-developed Flink real-time computing platform. To build a real-time warehouse on Hologres, we developed a hologres connector based on open-source Apache
>Flink 1.17.1, combining the ververica-connector-hologres package [1] from the Alibaba Cloud maven repository with the open-source holo
>client [2], and fixed some jar dependency issues. We have been using it in
>production for a while and have not found problems so far; now we would like to contribute it to the community.
>
>
>My questions:
>1. Is it acceptable to contribute a Flink Hologres connector?
>2. If so, which project code repository should the PR go to?
>3. Or should it just be linked to our own github repository like https://flink-packages.org/categories/connectors ?
>If so, how do we register it on flink-packages.org?
>
>
>[1] 
>https://repo1.maven.org/maven2/com/alibaba/ververica/ververica-connector-hologres/1.17-vvr-8.0.4-1/
>[2] 
>https://github.com/aliyun/alibabacloud-hologres-connectors/tree/master/holo-client

