Re: [Flink Kubernetes Operator] The "last-state" upgrade mode is only supported in FlinkDeployments

2024-04-29 Thread Gyula Fóra
Hi Alan!

I think it should be possible to address this gap for most cases. We don't
have the same robust way of getting the last-state information for session
jobs as we do for applications, so it will be slightly less reliable
overall.
For session jobs the last checkpoint info has to be queried from the JM
REST API, so as long as that is available it should work fine.
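
For illustration, here is a minimal sketch of that lookup done from outside
the operator, assuming the session cluster's JobManager REST endpoint is
reachable. The /jobs/<jobId>/checkpoints response contains the latest
completed checkpoint's external path; the class and argument handling below
are purely illustrative and not operator code.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class LastCheckpointProbe {
    public static void main(String[] args) throws Exception {
        String jmRestUrl = args[0]; // e.g. http://my-session-cluster-rest:8081 (illustrative)
        String jobId = args[1];     // the FlinkSessionJob's job id

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(jmRestUrl + "/jobs/" + jobId + "/checkpoints"))
                .GET()
                .build();

        // The JSON body includes latest.completed.external_path, i.e. the
        // location a last-state style upgrade would need to restore from.
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}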

I am not aware of anyone working on this at the moment; it would be great
if you could open a JIRA ticket to track this. If you are interested in
working on it, we can also support you, but this is a fairly complex
feature that involves many layers of operator logic.

Cheers,
Gyula

On Tue, Apr 30, 2024 at 1:08 AM Alan Zhang  wrote:

> Hi,
>
> We wanted to use the Apache Flink Kubernetes operator to manage the
> lifecycle of our Flink jobs in Flink session clusters. And we wanted to
> have the "last-state" upgrade feature for our use cases.
>
> However, the latest official doc states that the "last-state" upgrade mode
> is currently not supported in session mode (aka FlinkSessionJob):
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>
> Last state upgrade mode is currently only supported for FlinkDeployments.
>
> Why isn't this upgrade mode supported in session mode? Is there a plan
> to address this gap? Any suggestions for us if we want to stick with
> session mode?
>
> --
> Thanks,
> Alan
>


Flink sql retract to append

2024-04-29 Thread 焦童
Hello,
I am using Flink 1.11 SQL for data deduplication (via GROUP BY), but this
produces a retract stream. Our downstream storage does not support retract
messages and only supports append. In the DataStream API I can deduplicate
using state, but how can I deduplicate in SQL without producing a retract
stream? Thanks, everyone.

[Flink Kubernetes Operator] The "last-state" upgrade mode is only supported in FlinkDeployments

2024-04-29 Thread Alan Zhang
Hi,

We wanted to use the Apache Flink Kubernetes operator to manage the
lifecycle of our Flink jobs in Flink session clusters. And we wanted to
have the "last-state" upgrade feature for our use cases.

However, the latest official doc states that the "last-state" upgrade mode
is currently not supported in session mode (aka FlinkSessionJob):
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades

Last state upgrade mode is currently only supported for FlinkDeployments.

Why isn't this upgrade mode supported in session mode? Is there a plan
to address this gap? Any suggestions for us if we want to stick with
session mode?

-- 
Thanks,
Alan


Re: Suggestions for aggregating records to a Kinesis Sink, (or generic Async Sink)?

2024-04-29 Thread Michael Marino
Hi Ahmed, hi Hong,

Thanks for your responses.

It sounds like the most promising option would be to focus initially on the
global window with a custom trigger.

We don't need to be compatible with the aggregation used by the KPL
(in fact, we would likely combine records in protobuf, and my impression is
that the KPL only supports combining records such as line-delimited JSON).
Regarding introducing a stateful operator, this is perhaps simply something
we have to accept, though it would be great if we could guarantee a flush
on snapshot generation.
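
For anyone following along, here is a minimal sketch of the approach Hong
described below, using the DataStream API. Record, getPartitionKey and
toKinesisBlob are hypothetical placeholders for our own types, and I use
Flink's ProcessingTimeoutTrigger to combine the count trigger with a
processing-time timeout (a hand-written composite Trigger would work too):

import java.time.Duration;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeoutTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

public class KinesisRecordBatcher {

    // Record is a placeholder for our own POJO; toKinesisBlob(...) would e.g.
    // protobuf-encode a batch of rows into a single Kinesis record payload.
    public static DataStream<byte[]> batchRecords(DataStream<Record> records) {
        return records
                .keyBy(Record::getPartitionKey)
                .window(GlobalWindows.create())
                // fire after 100 elements or after 5 s of processing time, whichever
                // comes first; PurgingTrigger clears the window contents on each fire
                .trigger(PurgingTrigger.of(ProcessingTimeoutTrigger.of(
                        CountTrigger.<GlobalWindow>of(100), Duration.ofSeconds(5))))
                .process(new ProcessWindowFunction<Record, byte[], String, GlobalWindow>() {
                    @Override
                    public void process(String key, Context ctx, Iterable<Record> elements,
                                        Collector<byte[]> out) {
                        out.collect(Record.toKinesisBlob(elements));
                    }
                });
    }
}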

Cheers,
Mike

On Mon, Apr 29, 2024 at 1:12 PM Hong Liang  wrote:

> Hi Michael, thanks for the question!
>
> Maybe you can consider using a global window with custom trigger
> (CountTrigger + ProcessingTimeTrigger/EventTimeTrigger)? CountTrigger
> should allow you to configure window closure after X elements.
> ProcessingTimeTrigger/EventTimeTrigger should allow you to flush the window
> even if the count is not hit after a specified amount of time.
>
> A possible issue with the above implementation is that some elements might
> be stored in snapshot state. I'm not sure of an easy way to "flush" items
> out of a custom window on snapshot action.
>
> Hope the above helps.
>
> Regards,
> Hong
>
> On Mon, Apr 29, 2024 at 11:15 AM Michael Marino 
> wrote:
>
>> Hi all,
>>
>> We are currently using Flink 1.18.1 (AWS Managed Flink) and are writing
>> to Kinesis streams in several of our applications using the Table API.
>>
>> In our use case, we would like to be able to aggregate multiple records
>> (rows) together and emit them in a single Kinesis record.
>>
>> As far as I understand, with the usage of the Async Writer it is not
>> possible to aggregate records (Table rows) together into a single record as
>> was possible previously with the Kinesis Producer Library.
>>
>> I wanted to ask if anyone here has any suggestions of how to do this, or
>> perhaps if I missed it somewhere in the documentation? I was thinking about
>> moving the logic to use window functions (either in the Table or DataStream
>> API), but here I think I'd need to "close" the window based not only on
>> time, but also on record number. Anyways, any thoughts are appreciated!
>>
>> Thanks,
>> Mike
>>
>> --
>>
>> Michael Marino
>>
>> Principal Data Science & Analytics
>>
>> Phone:  +49 89 7167786 - 14
>>
>> linkedin.com/company/tadogmbh | facebook.com/tado | twitter.com/tado | youtube.com/tado
>>
>> www.tado.com | tado GmbH | Sapporobogen 6-8 | 80637 Munich | Germany
>>
>>  Managing Directors: Dr. Philip Beckmann | Christian Deilmann | Johannes
>> Schwarz | Josef Wenzl
>>
>> Registered with the Commercial Register Munich as HRB 194769 | VAT-No: DE
>> 280012558
>>
>

-- 

Michael Marino

Principal Data Science & Analytics

Phone:  +49 89 7167786 - 14

linkedin.com/company/tadogmbh | facebook.com/tado | twitter.com/tado | youtube.com/tado

www.tado.com | tado GmbH | Sapporobogen 6-8 | 80637 Munich | Germany

 Managing Directors: Dr. Philip Beckmann | Christian Deilmann | Johannes
Schwarz | Josef Wenzl

Registered with the Commercial Register Munich as HRB 194769 | VAT-No: DE
280012558


Re: Suggestions for aggregating records to a Kinesis Sink, (or generic Async Sink)?

2024-04-29 Thread Ahmed Hamdy
Hi Michael,
Unfortunately the new `KinesisDataStreamsSink` doesn't support aggregation
yet.
If you want to use native Kinesis aggregation, my suggestion is to use the
latest connector version that still supports the KPL as a Table API sink,
which would be 1.14.x; you could package the connector from that version.

 > was thinking about moving the logic to use window functions (either in
the Table or DataStream API), but here I think I'd need to "close" the
window based not only on time, but also on record number

Regarding this approach, I believe a better way might be to implement a
custom process function that holds batches of records in state and emits an
aggregated record. However, this would not be consistent with KPL
aggregation, and such records could not be de-aggregated downstream, so I
would advise against this approach.
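
For completeness, here is a rough sketch of what such a stateful batching
function could look like (illustrative only, and as noted above it would not
be compatible with KPL aggregation; Record and toKinesisBlob are placeholders
for the user's own types):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class BatchingFunction extends KeyedProcessFunction<String, Record, byte[]> {

    private static final int MAX_BATCH_SIZE = 100;         // flush after this many records
    private static final long FLUSH_INTERVAL_MS = 5_000L;  // ...or after this much time

    private transient ListState<Record> buffer;
    private transient ValueState<Integer> count;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", Record.class));
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Integer.class));
    }

    @Override
    public void processElement(Record value, Context ctx, Collector<byte[]> out) throws Exception {
        buffer.add(value);
        int n = count.value() == null ? 1 : count.value() + 1;
        if (n == 1) {
            // first element of a new batch: schedule a time-based flush
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + FLUSH_INTERVAL_MS);
        }
        if (n >= MAX_BATCH_SIZE) {
            flush(out);
        } else {
            count.update(n);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<byte[]> out) throws Exception {
        // a stale timer left over from a count-based flush simply flushes the next batch early
        flush(out);
    }

    private void flush(Collector<byte[]> out) throws Exception {
        Iterable<Record> batch = buffer.get();
        if (batch != null && batch.iterator().hasNext()) {
            out.collect(Record.toKinesisBlob(batch)); // e.g. protobuf-encode the batch
        }
        buffer.clear();
        count.clear();
    }
}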


Best Regards
Ahmed Hamdy


On Mon, 29 Apr 2024 at 11:14, Michael Marino 
wrote:

> Hi all,
>
> We are currently using Flink 1.18.1 (AWS Managed Flink) and are writing to
> Kinesis streams in several of our applications using the Table API.
>
> In our use case, we would like to be able to aggregate multiple records
> (rows) together and emit them in a single Kinesis record.
>
> As far as I understand, with the usage of the Async Writer it is not
> possible to aggregate records (Table rows) together into a single record as
> was possible previously with the Kinesis Producer Library.
>
> I wanted to ask if anyone here has any suggestions of how to do this, or
> perhaps if I missed it somewhere in the documentation? I was thinking about
> moving the logic to use window functions (either in the Table or DataStream
> API), but here I think I'd need to "close" the window based not only on
> time, but also on record number. Anyways, any thoughts are appreciated!
>
> Thanks,
> Mike
>
> --
>
> Michael Marino
>
> Principal Data Science & Analytics
>
> Phone:  +49 89 7167786 - 14
>
> linkedin.com/company/tadogmbh | facebook.com/tado | twitter.com/tado | youtube.com/tado
>
> www.tado.com | tado GmbH | Sapporobogen 6-8 | 80637 Munich | Germany
>
>  Managing Directors: Dr. Philip Beckmann | Christian Deilmann | Johannes
> Schwarz | Josef Wenzl
>
> Registered with the Commercial Register Munich as HRB 194769 | VAT-No: DE
> 280012558
>


Re: Flink SQL checkpoint failed when running on yarn

2024-04-29 Thread Biao Geng
Hi there,
Would you mind sharing the whole JM/TM log? It looks like the error log in
the previous email is not the root cause.

Best,
Biao Geng


ou...@139.com wrote on Mon, Apr 29, 2024 at 16:07:

> Hi all:
> When I ran a Flink SQL job with a datagen source writing to JDBC, checkpointing
> kept failing with the following error log.
>
> 2024-04-29 15:46:25,270 ERROR
> org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler
> [] - Unhandled exception.
> org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException:
> Unable to get JobMasterGateway for initializing job. The requested
> operation is not available while the JobManager is initializing.
>  at
> org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:1465)
> ~[flink-dist-1.18.1.jar:1.18.1]
>  at
> org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:1475)
> ~[flink-dist-1.18.1.jar:1.18.1]
>  at
> org.apache.flink.runtime.dispatcher.Dispatcher.requestCheckpointStats(Dispatcher.java:927)
> ~[flink-dist-1.18.1.jar:1.18.1]
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_152]
>  at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_152]
>  at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_152]
>  at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_152]
>  at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
> ~[?:?]
>  at
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> ~[flink-dist-1.18.1.jar:1.18.1]
>  at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
> ~[?:?]
>  at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
> ~[?:?]
>  at
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
> ~[?:?]
>  at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
> ~[?:?]
>  at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
> ~[?:?]
>  at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
> ~[?:?]
>  at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
> ~[flink-scala_2.12-1.18.1.jar:1.18.1]
>  at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
> ~[flink-scala_2.12-1.18.1.jar:1.18.1]
>  at
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
> ~[?:?]
>  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
> ~[flink-scala_2.12-1.18.1.jar:1.18.1]
>  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> ~[flink-scala_2.12-1.
>
>
>


Re: Flink SQL Client does not start job with savepoint

2024-04-29 Thread Lee, Keith
Thanks, Biao Geng, for your response. Indeed, the 1.19 documentation uses
execution.savepoint.path, and restoration works with that configuration name.

https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sqlclient/#execute-sql-files

Regards
Keith


From: Biao Geng 
Date: Friday, 26 April 2024 at 11:37
To: "Lee, Keith" 
Cc: "user@flink.apache.org" 
Subject: RE: [EXTERNAL] Flink SQL Client does not start job with savepoint


Hi Lee,

A quick question: what version of Flink are you using when testing
execution.state-recovery.path?
It looks like this config is only supported in Flink 1.20, which has not been
released yet.


Best,
Biao Geng


Lee, Keith <lee...@amazon.co.uk> wrote on Friday, 26 April 2024 at 04:51:
Apologies, I have included the jobmanager log for
6969725a69ecc967aac2ce3eedcc274a instead of 7881d53d28751f9bbbd3581976d9fe3d;
however, they looked exactly the same.

Can include if necessary.

Thanks
Keith

From: "Lee, Keith" mailto:lee...@amazon.co.uk>>
Date: Thursday, 25 April 2024 at 21:41
To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Subject: Flink SQL Client does not start job with savepoint

Hi,

Referring to 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint

I’ve followed the instructions; however, I do not see evidence of the job being
started from the savepoint. See the SQL statement excerpt below:


Flink SQL> STOP JOB '14de8cc898d56653b96872fc0ba03c91' WITH SAVEPOINT;

+--+

|   savepoint path |

+--+

| file:/tmp/flink-savepoints/savepoint-14de8c-f744e50d3ecc |

+--+

…

Flink SQL> CREATE TABLE Orders (order_number BIGINT,price DECIMAL(32,2),buyer 
ROW,order_time TIMESTAMP(3)) WITH 
('connector' = 'datagen');

[INFO] Execute statement succeed.



Flink SQL> CREATE TABLE OrdersBlackhole (order_number BIGINT,price 
DECIMAL(32,2),buyer ROW,order_time 
TIMESTAMP(3)) WITH ('connector' = 'blackhole');

[INFO] Execute statement succeed.



Flink SQL> INSERT INTO OrdersBlackhole SELECT * FROM Orders;

[INFO] Submitting SQL update statement to the cluster...

[INFO] SQL update statement has been successfully submitted to the cluster:

Job ID: 6969725a69ecc967aac2ce3eedcc274a





Flink SQL> STOP JOB '6969725a69ecc967aac2ce3eedcc274a';

[INFO] Execute statement succeed.



Flink SQL> SET 'execution.state-recovery.path' = 
'file:///tmp/flink-savepoints/savepoint-14de8c-f744e50d3ecc';

[INFO] Execute statement succeed.



Flink SQL> INSERT INTO OrdersBlackhole SELECT * FROM Orders;

[INFO] Submitting SQL update statement to the cluster...

[INFO] SQL update statement has been successfully submitted to the cluster:

Job ID: 7881d53d28751f9bbbd3581976d9fe3d



I have attempted with and without the prefix file:// and file:/. Additionally,
I have attempted the following in config.yml:
state.savepoints.dir: file:///tmp/flink-savepoints/
state.checkpoints.dir: file:///tmp/flink-checkpoints/

Am I missing something? The jobmanager log did not indicate a start from 
savepoint.

Received JobGraph submission 
'insert-into_default_catalog.default_database.OrdersBlackhole' 
(6969725a69ecc967aac2ce3eedcc274a).
Submitting job 'insert-into_default_catalog.default_database.OrdersBlackhole' 
(6969725a69ecc967aac2ce3eedcc274a).
JobMasterServiceLeadershipRunner for job 6969725a69ecc967aac2ce3eedcc274a was 
granted leadership with leader id ----. 
Creating new JobMasterServiceProcess.
Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at 
pekko://flink/user/rpc/jobmanager_4 .
Initializing job 'insert-into_default_catalog.default_database.OrdersBlackhole' 
(6969725a69ecc967aac2ce3eedcc274a).
Using restart back off time strategy NoRestartBackoffTimeStrategy for 
insert-into_default_catalog.default_database.OrdersBlackhole 
(6969725a69ecc967aac2ce3eedcc274a).
Created execution graph 9905f321e9958b6c36b71e0601a85a59 for job 
6969725a69ecc967aac2ce3eedcc274a.
Running initialization on master for job 
insert-into_default_catalog.default_database.OrdersBlackhole 
(6969725a69ecc967aac2ce3eedcc274a).
Successfully ran initialization on master in 0 ms.
Built 1 new pipelined regions in 0 ms, total 1 pipelined regions currently.
State backend is set to heap memory 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend@78e93599
State backend loader loads the state backend as HashMapStateBackend
Using job/cluster config to configure application-defined checkpoint storage: 
org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@acb26a25
No 

Flink SQL checkpoint failed when running on yarn

2024-04-29 Thread ou...@139.com

Hi all:

   When I ran a Flink SQL job with a datagen source writing to JDBC, checkpointing
kept failing with the following error log.



2024-04-29 15:46:25,270 ERROR 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler
 [] - Unhandled exception.

org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException: 
Unable to get JobMasterGateway for initializing job. The requested operation is 
not available while the JobManager is initializing.

 at 
org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:1465)
 ~[flink-dist-1.18.1.jar:1.18.1]

 at 
org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:1475)
 ~[flink-dist-1.18.1.jar:1.18.1]

 at 
org.apache.flink.runtime.dispatcher.Dispatcher.requestCheckpointStats(Dispatcher.java:927)
 ~[flink-dist-1.18.1.jar:1.18.1]

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_152]

 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_152]

 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_152]

 at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_152]

 at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
 ~[?:?]

 at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
 ~[flink-dist-1.18.1.jar:1.18.1]

 at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
 ~[?:?]

 at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
 ~[?:?]

 at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
 ~[?:?]

 at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
 ~[?:?]

 at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) 
~[?:?]

 at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) 
~[?:?]

 at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 
~[flink-scala_2.12-1.18.1.jar:1.18.1]

 at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) 
~[flink-scala_2.12-1.18.1.jar:1.18.1]

 at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) 
~[?:?]

 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) 
~[flink-scala_2.12-1.18.1.jar:1.18.1]

 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
~[flink-scala_2.12-1.





Suggestions for aggregating records to a Kinesis Sink, (or generic Async Sink)?

2024-04-29 Thread Michael Marino
Hi all,

We are currently using Flink 1.18.1 (AWS Managed Flink) and are writing to
Kinesis streams in several of our applications using the Table API.

In our use case, we would like to be able to aggregate multiple records
(rows) together and emit them in a single Kinesis record.

As far as I understand, with the usage of the Async Writer it is not
possible to aggregate records (Table rows) together into a single record as
was possible previously with the Kinesis Producer Library.

I wanted to ask if anyone here has any suggestions of how to do this, or
perhaps if I missed it somewhere in the documentation? I was thinking about
moving the logic to use window functions (either in the Table or DataStream
API), but here I think I'd need to "close" the window based not only on
time, but also on record number. Anyways, any thoughts are appreciated!

Thanks,
Mike

-- 

Michael Marino

Principal Data Science & Analytics

Phone:  +49 89 7167786 - 14

linkedin.com/company/tadogmbh | facebook.com/tado | twitter.com/tado | youtube.com/tado

www.tado.com | tado GmbH | Sapporobogen 6-8 | 80637 Munich | Germany

 Managing Directors: Dr. Philip Beckmann | Christian Deilmann | Johannes
Schwarz | Josef Wenzl

Registered with the Commercial Register Munich as HRB 194769 | VAT-No: DE
280012558