Re: Encryption of parameters in flink-conf.yaml

2023-05-09 Thread Anuj Jain
Hi,

Thanks for the reply.

I don't think I can use IAM integration to avoid distributing keys to the
application, because my Flink application runs outside AWS EC2, on native
K8s cluster nodes, from which I am writing to S3 services hosted on
AWS.
If there is a procedure to still integrate with IAM, please point me to
some documentation.

Let me check whether using environment variables passes our security checks.
I am also trying to see if Hadoop credential providers can work with the
Flink S3a file sink.
https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Storing_secrets_with_Hadoop_Credential_Providers
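
For reference, a minimal sketch of the credential-provider approach from the
linked Hadoop docs (the keystore path and the use of -value are illustrative;
omitting -value prompts for the secret interactively):

hadoop credential create fs.s3a.access.key -value <access-key> \
    -provider jceks://file/opt/flink/conf/s3.jceks
hadoop credential create fs.s3a.secret.key -value <secret-key> \
    -provider jceks://file/opt/flink/conf/s3.jceks

and then pointing S3A at the keystore via
hadoop.security.credential.provider.path in the Hadoop configuration.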

Thanks!!

Regards
Anuj

On Tue, May 9, 2023 at 6:23 PM Gabor Somogyi 
wrote:

> hi Anuj,
>
> As Martijn said, IAM is the preferred option, but if you've no other way
> than access keys then environment variables are a better choice.
> In that case the conf doesn't contain plain-text keys.
>
> Just a side note, putting `s3a.access.key` into the Flink conf file is not
> configuring Hadoop S3. The way it works is to set
> `flink.hadoop.s3a.access.key`.
> Practically all configs must be prefixed w/ `flink.hadoop.` to notify
> Flink that these must be forwarded to Hadoop.
>
> G
>
>
> On Tue, May 9, 2023 at 1:50 PM Martijn Visser 
> wrote:
>
>> Hi Anuj,
>>
>> You can't provide the values for S3 in job code, since the S3 filesystems
>> are loaded via plugins. Credentials must be stored in flink-conf.yaml. The
>> recommended method for setting up credentials is by using IAM, not via
>> Access Keys. See
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
>> for more details.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Tue, May 9, 2023 at 1:35 PM Anuj Jain  wrote:
>>
>>> Hi,
>>>
>>> Thanks for the reply.
>>>
>>> Yes, my Flink deployment is on K8s, but I am not using the Flink K8s operator.
>>> If I understood correctly, even with an init-container the flink-conf.yaml
>>> (inside the container) would finally contain unencrypted values for access
>>> tokens. We don't want to persist such sensitive data unencrypted even
>>> inside running containers in files or config maps, due to some security
>>> constraints in my project.
>>> Can you please let me know if I missed something with the suggested
>>> solution.
>>>
>>> Problem with overriding configuration programmatically:
>>> When I removed the S3 properties from flink-conf.yaml and tried to
>>> provide it programmatically from the job code, the connection to S3 failed.
>>> I tried it with Application mode also on a standalone cluster but the
>>> behavior is the same.
>>>
>>> //My job main method (with default flink-conf.yaml):
>>> Configuration conf = new Configuration();
>>> conf.setString("s3a.access.key", );
>>> conf.setString("s3a.secret.key", );
>>> conf.setString("s3a.aws.credentials.provider",
>>> "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
>>> conf.setString("s3a.assumed.role.arn", );
>>> conf.setString("s3a.assumed.role.session.name", );
>>> conf.setString("s3a.assumed.role.session.duration", );
>>> conf.setString("s3a.assumed.role.sts.endpoint", );
>>> conf.setString("s3a.assumed.role.sts.endpoint.region", );
>>> final StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment(conf);
>>>
>>> // flink job program using DataStream
>>>
>>> env.execute("My job");
>>>
>>> With this I got a connection exception:
>>> Caused by: org.apache.flink.util.SerializedThrowable:
>>> com.amazonaws.SdkClientException: Unable to load AWS credentials from
>>> environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and
>>> AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
>>> at
>>> com.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:50)
>>> ~[?:?]
>>> at
>>> org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:177)
>>> ~[?:?]
>>>
>>> When these values are given in flink-conf.yaml instead of the job code, the
>>> connection was successful. Please advise if I am doing something incorrect
>>> w.r.t. the job program.
>>>
>>> Regards
>>> Anuj
>>>
>>> On Mon, May 8, 2023 at 12:36 PM Biao Geng  wrote:
>>>
 Hi Anuj,

 To the best of my knowledge, Flink does not provide encryption support
 for these parameters for now. If you are using Flink on K8s, it is possible
 to achieve the encryption of parameters using an init container. You can
 check this SO answer for more detailed instructions.
 Besides, it should be possible to override the Configuration object in your
 job code. Are you using Application mode to run the job?

 Best regards,
 Biao Geng

 Anuj Jain wrote on Mon, May 8, 2023 at 13:55:

> Hi Community,
> I am trying to create an amazon S3 filesystem distributor using flink
> and for this I am using hadoop S3a connector with Flink filesystem sink.

flink1.16.1 jdbc-3.1.0-1.16  There is a problem trying left join

2023-05-09 Thread yangxueyong
flink1.16.1
mysql8.0.33
jdbc-3.1.0-1.16 




I have a SQL statement:

insert into test_flink_res2(id,name,address)
select a.id,a.name,a.address from test_flink_res1 a left join test_flink_res2 b
on a.id=b.id where a.name='abc0.11317691217472489' and b.id is null;

Why does Flink SQL convert this statement into the following query?

SELECT `address` FROM `test_flink_res1` WHERE ((`name` =
'abc0.11317691217472489')) AND ((`id` IS NULL))

As a result, there is no data in test_flink_res2. Why?

Re: Flink Job Restarts if the metadata already exists for some Checkpoint

2023-05-09 Thread Weihua Hu
Hi,

> if for some reason there exists a checkpoint by same name.

Could you give more details about your scenario here?
From your description, I guess this problem occurred when a job restarted;
was this restart triggered manually?

In common restart processing, the job will retrieve the latest checkpoint
from a high-availability service (ZooKeeper or Kubernetes), and then restore
from it and make a new checkpoint with a new checkpoint id. In your case, the
job did not recover from the old checkpoint, but the old checkpoint path
already exists.

Best,
Weihua


On Wed, May 10, 2023 at 11:07 AM Hang Ruan  wrote:

> Hi, amenreet,
>
> As Hangxiang said, you should use a new checkpoint dir if the new job has
> the same jobId as the old one. Alternatively, do not use a fixed jobId,
> and the checkpoint dirs will not conflict.
>
> Best,
> Hang
>
> Hangxiang Yu wrote on Wed, May 10, 2023 at 10:35:
>
>> Hi,
>> I guess you used a fixed JOB_ID and configured the same checkpoint dir
>> as before?
>> And you may also have started the job without the previous state?
>> The new job cannot know anything about the previous checkpoints; that's why
>> the new job fails when it tries to generate a new checkpoint.
>> I'd suggest using a different JOB_ID for different jobs, or setting
>> a different checkpoint dir for a new job.
>>
>> On Tue, May 9, 2023 at 9:38 PM amenreet sodhi 
>> wrote:
>>
>>> Hi all,
>>>
>>> Is there any way to prevent a restart of the Flink job, or to override the
>>> checkpoint metadata, if for some reason there already exists a checkpoint by
>>> the same name? I get the following exception and my job restarts. I have been
>>> trying to find a solution for a very long time but haven't found anything
>>> useful yet, other than manually cleaning.
>>>
>>> 2023-02-27 10:00:50,360 WARN  
>>> org.apache.flink.runtime.checkpoint.CheckpointFailureManager
>>> [] - Failed to trigger or complete checkpoint 1 for job
>>> 6e6b1332. (0 consecutive failed attempts so far)
>>>
>>> org.apache.flink.runtime.checkpoint.CheckpointException: Failure to
>>> finalize checkpoint.
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>> [?:?]
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>> [?:?]
>>>
>>> at java.lang.Thread.run(Thread.java:834) [?:?]
>>>
>>> Caused by: java.io.IOException: Target file
>>> file:/opt/flink/pm/checkpoint/6e6b1332/chk-1/_metadata
>>> already exists.
>>>
>>> at
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:64)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:332)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1361)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> ... 7 more
>>>
>>> 2023-02-27 10:00:50,374 WARN  org.apache.flink.runtime.jobmaster.JobMaster
>>> [] - Error while processing AcknowledgeCheckpoint
>>> message
>>>
>>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>>> finalize the pending checkpoint 1. Failure reason: Failure to finalize
>>> checkpoint.
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1381)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> 

Re: Flink Kafka Source rebalancing - 1.14.2

2023-05-09 Thread Madan D via user
 Hello Shammon/Team,
We are using the same Flink version, 1.14.2; the only change is that earlier
we were using the Flink Kafka consumer and now we are moving to Kafka Source.
I don't see any difference in the job plan, but I do see Kafka Source
introducing more latency than the Flink Kafka consumer.

Attached are the job plans for both.
Regards,
Madan
On Tuesday, 9 May 2023 at 05:50:49 pm GMT-7, Shammon FY  
wrote:  
 
  Hi Madan,
Could you give the old and new versions of Flink and provide the job plans? I
think it will help the community find the root cause.
Best,
Shammon FY
On Wed, May 10, 2023 at 2:04 AM Madan D via user  wrote:

Hello Team,
 We have been using the Flink Kafka consumer and recently we have been moving
to Kafka Source to get more advanced features, but we have been observing more
rebalances right after data is consumed and moved to the next operator than
with the Flink Kafka consumer.

Can you please let us know what might be causing this, even though we are
using the same parallelism in the old and new jobs? We are on Flink 1.14.2.

Regards,
Madan
  

newJar_JobPlan.json
Description: application/json


old_Jar_plan.json
Description: application/json


Re: Flink Job Restarts if the metadata already exists for some Checkpoint

2023-05-09 Thread Hang Ruan
Hi, amenreet,

As Hangxiang said, you should use a new checkpoint dir if the new job has
the same jobId as the old one. Alternatively, do not use a fixed jobId,
and the checkpoint dirs will not conflict.

Best,
Hang

Hangxiang Yu wrote on Wed, May 10, 2023 at 10:35:

> Hi,
> I guess you used a fixed JOB_ID and configured the same checkpoint dir as
> before?
> And you may also have started the job without the previous state?
> The new job cannot know anything about the previous checkpoints; that's why
> the new job fails when it tries to generate a new checkpoint.
> I'd suggest using a different JOB_ID for different jobs, or setting a
> different checkpoint dir for a new job.
>
> On Tue, May 9, 2023 at 9:38 PM amenreet sodhi  wrote:
>
>> Hi all,
>>
>> Is there any way to prevent a restart of the Flink job, or to override the
>> checkpoint metadata, if for some reason there already exists a checkpoint by
>> the same name? I get the following exception and my job restarts. I have been
>> trying to find a solution for a very long time but haven't found anything
>> useful yet, other than manually cleaning.
>>
>> 2023-02-27 10:00:50,360 WARN  
>> org.apache.flink.runtime.checkpoint.CheckpointFailureManager
>> [] - Failed to trigger or complete checkpoint 1 for job
>> 6e6b1332. (0 consecutive failed attempts so far)
>>
>> org.apache.flink.runtime.checkpoint.CheckpointException: Failure to
>> finalize checkpoint.
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> [?:?]
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> [?:?]
>>
>> at java.lang.Thread.run(Thread.java:834) [?:?]
>>
>> Caused by: java.io.IOException: Target file
>> file:/opt/flink/pm/checkpoint/6e6b1332/chk-1/_metadata
>> already exists.
>>
>> at
>> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:64)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:332)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1361)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> ... 7 more
>>
>> 2023-02-27 10:00:50,374 WARN  org.apache.flink.runtime.jobmaster.JobMaster
>> [] - Error while processing AcknowledgeCheckpoint message
>>
>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>> finalize the pending checkpoint 1. Failure reason: Failure to finalize
>> checkpoint.
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1381)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> [?:?]
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> [?:?]
>>
>> at java.lang.Thread.run(Thread.java:834) [?:?]
>>
>> Caused 

Re: StatsdMetricsReporter is emitting all metric types as gauges

2023-05-09 Thread Hang Ruan
Hi, Iris,

The metrics have already been calculated in Flink, so we only need to report
these metrics as gauges.
For example, say the metric `metricA` is a Flink counter and is increased from
1 to 2. The statsd gauge will now be 2. If we registered it as a statsd
counter, we would send 1 and 2 to the statsd counter, and it would sum to 3,
which is wrong.
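For illustration, a sketch of the wire-format difference (lines follow the
statsd spec referenced in the original mail; metricA is the example above):

metricA:1|g   then   metricA:2|g   -> statsd keeps the last value, 2 (correct)
metricA:1|c   then   metricA:2|c   -> statsd sums the increments, 3 (wrong)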

Best,
Hang

Iris Grace Endozo wrote on Tue, May 9, 2023 at 19:19:

> Hey folks trying to troubleshoot why counter metrics are appearing as
> gauges on my end. Is it expected that the StatsdMetricsReporter is
> reporting gauges for counters as well?
>
> Looking at this one:
> https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java#L207:
> the statsd specifications state that counters need to be reported as
> <metric name>:<value>|c[|@<sample rate>] but it seems it's defaulting to
> "%s:%s|g" in the above. Ref: https://github.com/b/statsd_spec#counters
>
> Wondering if anyone else has hit this issue or there's an existing issue?
>
> Cheers, Iris.
>
> --
>
> *Iris Grace Endozo,* Senior Software Engineer
> *M *+61 435 108 697
> *E* iris.end...@gmail.com
>


Re: Flink Job Restarts if the metadata already exists for some Checkpoint

2023-05-09 Thread Hangxiang Yu
Hi,
I guess you used a fixed JOB_ID and configured the same checkpoint dir as
before?
And you may also have started the job without the previous state?
The new job cannot know anything about the previous checkpoints; that's why
the new job fails when it tries to generate a new checkpoint.
I'd suggest using a different JOB_ID for different jobs, or setting a
different checkpoint dir for a new job.
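
For example, a minimal flink-conf.yaml sketch of the second option (the path is
illustrative, modeled on the checkpoint path in the exception quoted below):

state.checkpoints.dir: file:///opt/flink/pm/checkpoint/run-<unique-suffix>

Each run then writes its chk-N/_metadata under its own directory and cannot
collide with an earlier run.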

On Tue, May 9, 2023 at 9:38 PM amenreet sodhi  wrote:

> Hi all,
>
> Is there any way to prevent a restart of the Flink job, or to override the
> checkpoint metadata, if for some reason there already exists a checkpoint by
> the same name? I get the following exception and my job restarts. I have been
> trying to find a solution for a very long time but haven't found anything
> useful yet, other than manually cleaning.
>
> 2023-02-27 10:00:50,360 WARN  
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager
> [] - Failed to trigger or complete checkpoint 1 for job
> 6e6b1332. (0 consecutive failed attempts so far)
>
> org.apache.flink.runtime.checkpoint.CheckpointException: Failure to
> finalize checkpoint.
>
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> [?:?]
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> [?:?]
>
> at java.lang.Thread.run(Thread.java:834) [?:?]
>
> Caused by: java.io.IOException: Target file
> file:/opt/flink/pm/checkpoint/6e6b1332/chk-1/_metadata
> already exists.
>
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:64)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:332)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1361)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> ... 7 more
>
> 2023-02-27 10:00:50,374 WARN  org.apache.flink.runtime.jobmaster.JobMaster
> [] - Error while processing AcknowledgeCheckpoint message
>
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> finalize the pending checkpoint 1. Failure reason: Failure to finalize
> checkpoint.
>
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1381)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> [?:?]
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> [?:?]
>
> at java.lang.Thread.run(Thread.java:834) [?:?]
>
> Caused by: java.io.IOException: Target file
> file:/opt/flink/pm/checkpoint/6e6b1332/chk-1/_metadata
> already exists.
>
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
>
> Please let me know if anyone knows how to resolve 

Re: Flink Kafka Source rebalancing - 1.14.2

2023-05-09 Thread Shammon FY
Hi Madan,

Could you give the old and new versions of Flink and provide the job plans?
I think it will help the community find the root cause.

Best,
Shammon FY

On Wed, May 10, 2023 at 2:04 AM Madan D via user 
wrote:

> Hello Team,
>
> We have been using the Flink Kafka consumer and recently we have been moving
> to Kafka Source to get more advanced features, but we have been observing
> more rebalances right after data is consumed and moved to the next operator
> than with the Flink Kafka consumer.
>
> Can you please let us know what might be causing this, even though we are
> using the same parallelism in the old and new jobs? We are on Flink 1.14.2.
>
>
> Regards,
> Madan
>


Re: How to implement a payment reconciliation timeout alert with Flink SQL?

2023-05-09 Thread Shammon FY
Hi

If you use CEP, you can union the two streams into one stream and then match
the different event types via subtype when defining the CEP Pattern, for
example:

DataStream<Event> s1 = ...;
DataStream<Event> s2 = ...;
DataStream<Event> s = s1.union(s2)...;
Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
    .subtype(E1.class)
    .where(...)
    .followedBy("second")
    .subtype(E2.class)
    .where(...);

If you use Flink SQL, you can implement this directly with a two-stream join
plus a window.
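
For illustration, a hedged Flink SQL sketch of that two-stream approach, as an
interval left join (table and column names are assumptions):

SELECT p.order_id, p.pay_time
FROM PlatformPaymentStream AS p
LEFT JOIN ThirdPartyPaymentStream AS t
  ON p.order_id = t.order_id
 AND t.pay_time BETWEEN p.pay_time AND p.pay_time + INTERVAL '30' MINUTE
WHERE t.order_id IS NULL;

The surviving rows are platform payments with no third-party record within 30
minutes, i.e. the alert candidates.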

Best,
Shammon FY




On Wed, May 10, 2023 at 2:24 AM casel.chen  wrote:

> Requirement: the business side implements a payment feature and needs to do
> real-time reconciliation in Flink SQL against the third-party payment
> platform's transaction data, alerting on third-party transactions that have
> not arrived within 30 minutes.
> How should this two-stream real-time reconciliation scenario be implemented
> with Flink CEP SQL?
>
> The examples found online are all based on a single stream, while the above
> scenario uses two streams: PlatformPaymentStream and ThirdPartyPaymentStream.


Re: Failed to initialize delegation token receiver s3

2023-05-09 Thread Hangxiang Yu
Hi, this should be the already-confirmed issue FLINK-31839, which has been
fixed in 1.17.1. See:
https://issues.apache.org/jira/browse/FLINK-31839

On Sat, May 6, 2023 at 5:00 PM maker_d...@foxmail.com <
maker_d...@foxmail.com> wrote:

> flink version:flink-1.17.0
> k8s application mode
>
> Delegation tokens have already been disabled in flink-conf:
> security.delegation.tokens.enabled: false
>
> The program was originally developed on Flink 1.13 and worked fine; after
> upgrading Flink to 1.17.0 it fails to start.
> Initially, with delegation tokens not disabled, the JobManager could not
> start. After disabling delegation tokens the JobManager starts normally, but
> the TaskManager reports the following error:
>
> 2023-05-06 16:52:45,720 INFO
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository
> [] - Delegation token receiver s3 loaded and initialized
> 2023-05-06 16:52:45,722 INFO
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository
> [] - Delegation token receiver s3 loaded and initialized
> 2023-05-06 16:52:45,723 ERROR
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository
> [] - Failed to initialize delegation token receiver s3
> java.lang.IllegalStateException: Delegation token receiver with service
> name {} has multiple implementations [s3]
> at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.lambda$loadReceivers$0(DelegationTokenReceiverRepository.java:75)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at java.util.Iterator.forEachRemaining(Unknown Source) ~[?:?]
> at
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.loadReceivers(DelegationTokenReceiverRepository.java:98)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.(DelegationTokenReceiverRepository.java:60)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManagerRunnerServices(TaskManagerRunner.java:245)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.start(TaskManagerRunner.java:293)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:486)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$5(TaskManagerRunner.java:530)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> [flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:530)
> [flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner.runTaskManagerSecurely(KubernetesTaskExecutorRunner.java:66)
> [flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner.main(KubernetesTaskExecutorRunner.java:46)
> [flink-dist-1.17.0.jar:1.17.0]
> 2023-05-06 16:52:45,729 ERROR
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] -
> Terminating TaskManagerRunner with exit code 1.
> org.apache.flink.util.FlinkException: Failed to start the
> TaskManagerRunner.
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:488)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$5(TaskManagerRunner.java:530)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:530)
> [flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner.runTaskManagerSecurely(KubernetesTaskExecutorRunner.java:66)
> [flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner.main(KubernetesTaskExecutorRunner.java:46)
> [flink-dist-1.17.0.jar:1.17.0]
> Caused by: org.apache.flink.util.FlinkRuntimeException:
> java.lang.IllegalStateException: Delegation token receiver with service
> name {} has multiple implementations [s3]
> at
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.lambda$loadReceivers$0(DelegationTokenReceiverRepository.java:93)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at java.util.Iterator.forEachRemaining(Unknown Source) ~[?:?]
> at
> org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.loadReceivers(DelegationTokenReceiverRepository.java:98)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> 

Re: Unsubscribe

2023-05-09 Thread Yuxin Tan
To unsubscribe from the user-zh@flink.apache.org mailing list, please send an
email with any content to user-zh-unsubscr...@flink.apache.org; see [1].

[1] https://flink.apache.org/zh/community/

Best,
Yuxin


胡家发 <15802974...@163.com> wrote on Sun, May 7, 2023 at 22:14:

> Unsubscribe


How to implement a payment reconciliation timeout alert with Flink SQL?

2023-05-09 Thread casel.chen
Requirement: the business side implements a payment feature and needs to do
real-time reconciliation in Flink SQL against the third-party payment
platform's transaction data, alerting on third-party transactions that have
not arrived within 30 minutes.
How should this two-stream real-time reconciliation scenario be implemented
with Flink CEP SQL?
The examples found online are all based on a single stream, while the above
scenario uses two streams: PlatformPaymentStream and ThirdPartyPaymentStream.

Flink Kafka Source rebalancing - 1.14.2

2023-05-09 Thread Madan D via user
Hello Team,
 We have been using the Flink Kafka consumer and recently we have been moving
to Kafka Source to get more advanced features, but we have been observing more
rebalances right after data is consumed and moved to the next operator than
with the Flink Kafka consumer.

Can you please let us know what might be causing this, even though we are
using the same parallelism in the old and new jobs? We are on Flink 1.14.2.

Regards,
Madan

Unsubscribe

2023-05-09 Thread 张胜军



Unsubscribe

The following is the content of the forwarded email
From: "胡家发" <15802974...@163.com>
To: user-zh
Date: 2023-05-07 22:13:55
Subject: Unsubscribe

Unsubscribe





Re: Unsubscribe

2023-05-09 Thread Hongshun Wang
Please send an email with any content to user-zh-unsubscr...@flink.apache.org
if you want to unsubscribe from the user-zh@flink.apache.org mailing list; you
can refer to [1][2] for details on managing your subscription.

Best Hongshun,

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2]https://flink.apache.org/community.html#mailing-lists


On Sun, May 7, 2023 at 10:14 PM 胡家发 <15802974...@163.com> wrote:

> Unsubscribe


Unsubscribe

2023-05-09 Thread Zhanshun Zou
Unsubscribe


Re: flink-kubernetes-operator HA k8s RoleBinding for Leases?

2023-05-09 Thread Andrew Otto
> Could you please open a jira

Done: https://issues.apache.org/jira/browse/FLINK-32041

> PR (in case you fixed this already)
Haven't fixed it yet! But if I find time to do it I will!

Thanks!


On Tue, May 9, 2023 at 4:49 AM Tamir Sagi 
wrote:

> Hey,
>
> I also encountered something similar with a different error. I enabled HA
> with RBAC.
>
> org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure
> executing: GET at: https://172.20.0.1/api/v1/nodes. Message:
> Forbidden!Configured service account doesn't have access. Service account
> may have been revoked. nodes is forbidden: User
> "system:serviceaccount:dev-0-flink-clusters:
> *dev-0-xsight-flink-operator-sa*" cannot list resource "nodes" in API
> group "" at the cluster scope."
>
> I checked the rolebinding between the service account
> `dev-0-flink-clusters:dev-0-xsight-flink-operator-sa`
> and the corresponding role (flink-operator), which has been created by the
> operator using rbac.nodesRule.create=true.
>
> [screenshot: role binding]
>
> [screenshot: role flink-operator]
>
> Am I missing something?
>
>
> --
> *From:* Gyula Fóra 
> *Sent:* Tuesday, May 9, 2023 7:43 AM
> *To:* Andrew Otto 
> *Cc:* User 
> *Subject:* Re: flink-kubernetes-operator HA k8s RoleBinding for Leases?
>
>
> *EXTERNAL EMAIL*
>
>
> Hey!
>
> Sounds like a bug :) Could you please open a jira / PR (in case you fixed
> this already)?
>
> Thanks
> Gyula
>
> On Mon, 8 May 2023 at 22:20, Andrew Otto  wrote:
>
> Hi,
>
> I'm trying to enable HA for flink-kubernetes-operator
> with Helm. We are using namespaced RBAC via watchedNamespaces.
>
> I've followed instructions and set
> kubernetes.operator.leader-election.enabled and
> kubernetes.operator.leader-election.lease-name, and increased replicas to
> 2.  When I deploy, the second replica comes online, but errors with:
>
> Exception occurred while acquiring lock 'LeaseLock: flink-operator -
> flink-operator-lease (flink-kubernetes-operator-86b888d6b6-8cxjs
> Failure executing: GET at:
> https://x.x.x.x/apis/coordination.k8s.io/v1/namespaces/flink-operator/leases/flink-operator-lease.
> Message: Forbidden!Configured service account doesn't have access. Service
> account may have been revoked. leases.coordination.k8s.io
> "flink-operator-lease" is forbidden: User
> "system:serviceaccount:flink-operator:flink-operator" cannot get resource
> "leases" in API group "coordination.k8s.io" in the namespace
> "flink-operator".
>
> Looking at the rbac.yaml helm template,
> it looks like the Role and RoleBindings that grant access to the leases
> resource are created for the configured watchNamespaces, but not for the
> namespace in which the flink-kubernetes-operator is deployed.  I think that
> for HA, the flink-kubernetes-operator is going to be asking k8s for Leases
> in its own namespace, right?
>
> Is this a bug, or am I doing something wrong?  I'd file a JIRA, but I
> betcha I'm just doing something wrong (unless I'm the first person who's
> tried to use HA + namespaced RBAC with the helm charts?).
>
> Thanks!
> -Andrew Otto
>  Wikimedia Foundation


Flink Job Restarts if the metadata already exists for some Checkpoint

2023-05-09 Thread amenreet sodhi
Hi all,

Is there any way to prevent a restart of the Flink job, or to override the
checkpoint metadata, if for some reason there already exists a checkpoint by
the same name? I get the following exception and my job restarts. I have been
trying to find a solution for a very long time but haven't found anything
useful yet, other than manually cleaning.

2023-02-27 10:00:50,360 WARN
org.apache.flink.runtime.checkpoint.CheckpointFailureManager
[] - Failed to trigger or complete checkpoint 1 for job
6e6b1332. (0 consecutive failed attempts so far)

org.apache.flink.runtime.checkpoint.CheckpointException: Failure to
finalize checkpoint.

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[?:?]

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[?:?]

at java.lang.Thread.run(Thread.java:834) [?:?]

Caused by: java.io.IOException: Target file
file:/opt/flink/pm/checkpoint/6e6b1332/chk-1/_metadata
already exists.

at
org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:64)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:332)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1361)
~[event_executor-1.0-SNAPSHOT.jar:?]

... 7 more

2023-02-27 10:00:50,374 WARN  org.apache.flink.runtime.jobmaster.JobMaster
  [] - Error while processing AcknowledgeCheckpoint message

org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize
the pending checkpoint 1. Failure reason: Failure to finalize checkpoint.

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1381)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[?:?]

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[?:?]

at java.lang.Thread.run(Thread.java:834) [?:?]

Caused by: java.io.IOException: Target file
file:/opt/flink/pm/checkpoint/6e6b1332/chk-1/_metadata
already exists.

at
org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
~[event_executor-1.0-SNAPSHOT.jar:?]


Please let me know if anyone knows how to resolve this issue.

Thanks and Regards

Amenreet Singh Sodhi


Failed Kafka Offset Commit

2023-05-09 Thread amenreet sodhi
Hi Team,

I am deploying my job in application mode on Flink 1.16.0, but I have been
constantly receiving this error for a long time:

2023-04-10 13:48:39,366 INFO  org.apache.kafka.clients.NetworkClient
[] - [Consumer clientId=event-executor-client-1,
groupId=event-executor-grp] Node 1 disconnected.
2023-04-10 13:48:39,366 INFO  org.apache.kafka.clients.NetworkClient
[] - [Consumer clientId=event-executor-client-1,
groupId=event-executor-grp] Cancelled in-flight FETCH request with
correlation id 8759 due to node 1 being disconnected (elapsed time
since creation: 0ms, elapsed time since send: 0ms, request timeout:
3ms)
2023-04-10 13:48:39,366 INFO
org.apache.kafka.clients.FetchSessionHandler [] -
[Consumer clientId=event-executor-client-1,
groupId=event-executor-grp] Error sending fetch request
(sessionId=INVALID, epoch=INITIAL) to node 1:
org.apache.kafka.common.errors.DisconnectException: null
2023-04-10 13:48:40,317 INFO  org.apache.kafka.clients.NetworkClient
[] - [Producer clientId=producer-122] Node 19
disconnected.
2023-04-10 13:51:20,059 INFO  org.apache.kafka.clients.NetworkClient
[] - [Consumer clientId=event-executor-client-1,
groupId=event-executor-grp] Node 16 disconnected.
2023-04-10 13:51:20,060 INFO  org.apache.kafka.clients.NetworkClient
[] - [Consumer clientId=event-executor-client-1,
groupId=event-executor-grp] Cancelled in-flight FETCH request with
correlation id 10312 due to node 16 being disconnected (elapsed time
since creation: 0ms, elapsed time since send: 0ms, request timeout:
3ms)
2023-04-10 13:51:20,060 INFO
org.apache.kafka.clients.FetchSessionHandler [] -
[Consumer clientId=event-executor-client-1,
groupId=event-executor-grp] Error sending fetch request
(sessionId=INVALID, epoch=INITIAL) to node 16:
org.apache.kafka.common.errors.DisconnectException: null
2023-04-10 13:51:48,469 WARN
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] -
Failed to commit consumer offsets for checkpoint 4
org.apache.kafka.clients.consumer.RetriableCommitFailedException:
Offset commit failed with a retriable exception. You should retry
committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException:
The coordinator is not available.
2023-04-10 13:51:48,829 WARN
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] -
Failed to commit consumer offsets for checkpoint 4
org.apache.kafka.clients.consumer.RetriableCommitFailedException:
Offset commit failed with a retriable exception. You should retry
committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException:
The coordinator is not available.

Also, I found this ticket for the above issue:
https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel
It seems the issue is still not resolved, but the ticket appears to be closed.
Someone in the Flink community said there is an updated ticket for this issue.
Can someone share the latest ticket to keep track of this issue? Also, if
anyone knows how to resolve this issue, please help. Thanks

Regards
Amenreet Singh Sodhi


Re: Encryption of parameters in flink-conf.yaml

2023-05-09 Thread Gabor Somogyi
hi Anuj,

As Martijn said, IAM is the preferred option, but if you've no other way than
access keys then environment variables are a better choice.
In that case the conf doesn't contain plain-text keys.
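
For example, a minimal K8s pod-spec sketch; the env var names are the ones the
AWS SDK looks for (they appear in the exception quoted below), while the Secret
name is an assumption:

env:
  - name: AWS_ACCESS_KEY_ID
    valueFrom:
      secretKeyRef:
        name: s3-credentials   # hypothetical Secret holding the keys
        key: access-key
  - name: AWS_SECRET_ACCESS_KEY
    valueFrom:
      secretKeyRef:
        name: s3-credentials
        key: secret-key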

Just a side note, putting `s3a.access.key` into the Flink conf file is not
configuring Hadoop S3. The way it works is to set
`flink.hadoop.s3a.access.key`.
Practically all configs must be prefixed w/ `flink.hadoop.` to notify Flink
that these must be forwarded to Hadoop.
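
For example, in flink-conf.yaml (a sketch; note that Hadoop's own S3A option
names are fs.s3a.access.key / fs.s3a.secret.key, so check which form your
Hadoop version expects after the prefix is stripped):

flink.hadoop.s3a.access.key: <access-key>
flink.hadoop.s3a.secret.key: <secret-key>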

G


On Tue, May 9, 2023 at 1:50 PM Martijn Visser 
wrote:

> Hi Anuj,
>
> You can't provide the values for S3 in job code, since the S3 filesystems
> are loaded via plugins. Credentials must be stored in flink-conf.yaml. The
> recommended method for setting up credentials is by using IAM, not via
> Access Keys. See
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
> for more details.
>
> Best regards,
>
> Martijn
>
> On Tue, May 9, 2023 at 1:35 PM Anuj Jain  wrote:
>
>> Hi,
>>
>> Thanks for the reply.
>>
>> Yes, my Flink deployment is on K8s, but I am not using the Flink K8s operator.
>> If I understood correctly, even with an init-container the flink-conf.yaml
>> (inside the container) would finally contain unencrypted values for access
>> tokens. We don't want to persist such sensitive data unencrypted even
>> inside running containers in files or config maps, due to some security
>> constraints in my project.
>> Can you please let me know if I missed something with the suggested
>> solution.
>>
>> Problem with overriding configuration programmatically:
>> When I removed the S3 properties from flink-conf.yaml and tried to
>> provide it programmatically from the job code, the connection to S3 failed.
>> I tried it with Application mode also on a standalone cluster but the
>> behavior is the same.
>>
>> //My job main method (with default flink-conf.yaml):
>> Configuration conf = new Configuration();
>> conf.setString("s3a.access.key", );
>> conf.setString("s3a.secret.key", );
>> conf.setString("s3a.aws.credentials.provider",
>> "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
>> conf.setString("s3a.assumed.role.arn", );
>> conf.setString("s3a.assumed.role.session.name", );
>> conf.setString("s3a.assumed.role.session.duration", );
>> conf.setString("s3a.assumed.role.sts.endpoint", );
>> conf.setString("s3a.assumed.role.sts.endpoint.region", );
>> final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment(conf);
>>
>> // flink job program using DataStream
>>
>> env.execute("My job");
>>
>> With this I got a connection exception:
>> Caused by: org.apache.flink.util.SerializedThrowable:
>> com.amazonaws.SdkClientException: Unable to load AWS credentials from
>> environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and
>> AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
>> at
>> com.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:50)
>> ~[?:?]
>> at
>> org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:177)
>> ~[?:?]
>>
>> When these values are given in flink-conf.yaml instead of the job code, the
>> connection was successful. Please advise if I am doing something incorrect
>> w.r.t. the job program.
>>
>> Regards
>> Anuj
>>
>> On Mon, May 8, 2023 at 12:36 PM Biao Geng  wrote:
>>
>>> Hi Anuj,
>>>
>>> To the best of my knowledge, Flink does not provide encryption support
>>> for these parameters for now. If you are using Flink on K8s, it is possible
>>> to achieve the encryption of parameters using an init container. You can
>>> check this SO answer for more detailed instructions.
>>> Besides, it should be possible to override the Configuration object in your
>>> job code. Are you using Application mode to run the job?
>>>
>>> Best regards,
>>> Biao Geng
>>>
>>> Anuj Jain wrote on Mon, May 8, 2023 at 13:55:
>>>
 Hi Community,
 I am trying to create an amazon S3 filesystem distributor using flink
 and for this I am using hadoop S3a connector with Flink filesystem sink.
 My flink application would run in a non-AWS environment, on native
 cluster; so I need to put my access keys in flink configuration.

 For connecting to S3 storage, i am configuring flink-conf.yaml with the
 access credentials like
 s3.access.key: <access-key>
 s3.secret.key: <secret-key>
 ... and some other parameters required for assuming AWS IAM role with
 s3a AssumedRoleCredentialProvider

 Is there a way to encrypt these parameters rather than putting them
 directly or is there any other way to supply them programmatically.

 I tried to set them programmatically using the Configuration object and
 supplying them with
 StreamExecutionEnvironment.getExecutionEnvironment(Configuration), in my
 job (rather than from flink-conf.yaml), but then the S3 connection failed.

Re: Encryption of parameters in flink-conf.yaml

2023-05-09 Thread Martijn Visser
Hi Anuj,

You can't provide the values for S3 in job code, since the S3 filesystems
are loaded via plugins. Credentials must be stored in flink-conf.yaml. The
recommended method for setting up credentials is by using IAM, not via
Access Keys. See
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
for more details.

Best regards,

Martijn

On Tue, May 9, 2023 at 1:35 PM Anuj Jain  wrote:

> Hi,
>
> Thanks for the reply.
>
> Yes, my Flink deployment is on K8s, but I am not using the Flink K8s operator.
> If I understood correctly, even with an init-container the flink-conf.yaml
> (inside the container) would finally contain unencrypted values for access
> tokens. We don't want to persist such sensitive data unencrypted even
> inside running containers in files or config maps, due to some security
> constraints in my project.
> Can you please let me know if I missed something with the suggested
> solution.
>
> Problem with overriding configuration programmatically:
> When I removed the S3 properties from flink-conf.yaml and tried to provide
> it programmatically from the job code, the connection to S3 failed.
> I tried it with Application mode also on a standalone cluster but the
> behavior is the same.
>
> //My job main method (with default flink-conf.yaml):
> Configuration conf = new Configuration();
> conf.setString("s3a.access.key", );
> conf.setString("s3a.secret.key", );
> conf.setString("s3a.aws.credentials.provider",
> "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
> conf.setString("s3a.assumed.role.arn", );
> conf.setString("s3a.assumed.role.session.name", );
> conf.setString("s3a.assumed.role.session.duration", );
> conf.setString("s3a.assumed.role.sts.endpoint", );
> conf.setString("s3a.assumed.role.sts.endpoint.region", );
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment(conf);
>
> // flink job program using DataStream
>
> env.execute("My job");
>
> With this I got a connection exception:
> Caused by: org.apache.flink.util.SerializedThrowable:
> com.amazonaws.SdkClientException: Unable to load AWS credentials from
> environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and
> AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
> at
> com.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:50)
> ~[?:?]
> at
> org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:177)
> ~[?:?]
>
> When these values are given in flink-conf.yaml instead of the job code, the
> connection was successful. Please advise if I am doing something incorrect
> w.r.t. the job program.
>
> Regards
> Anuj
>
> On Mon, May 8, 2023 at 12:36 PM Biao Geng  wrote:
>
>> Hi Anuj,
>>
>> To the best of my knowledge, Flink does not provide encryption support
>> for these parameters for now. If you are using Flink on K8s, it is possible
>> to achieve the encryption of parameters using an init container. You can
>> check this SO answer for more detailed instructions.
>> Besides, it should be possible to override the Configuration object in your
>> job code. Are you using Application mode to run the job?
>>
>> Best regards,
>> Biao Geng
>>
>> Anuj Jain wrote on Mon, May 8, 2023 at 13:55:
>>
>>> Hi Community,
>>> I am trying to create an amazon S3 filesystem distributor using flink
>>> and for this I am using hadoop S3a connector with Flink filesystem sink.
>>> My flink application would run in a non-AWS environment, on native
>>> cluster; so I need to put my access keys in flink configuration.
>>>
>>> For connecting to S3 storage, i am configuring flink-conf.yaml with the
>>> access credentials like
>>> s3.access.key: <access-key>
>>> s3.secret.key: <secret-key>
>>> ... and some other parameters required for assuming AWS IAM role with
>>> s3a AssumedRoleCredentialProvider
>>>
>>> Is there a way to encrypt these parameters rather than putting them
>>> directly or is there any other way to supply them programmatically.
>>>
>>> I tried to set them programmatically using the Configuration object and
>>> supplying them with
>>> StreamExecutionEnvironment.getExecutionEnvironment(Configuration), in my
>>> job (rather than from flink-conf.yaml) but then the S3 connection failed. I
>>> think flink creates the connection pool at startup even before the job is
>>> started.
>>>
>>> Thanks and Regards
>>> Anuj Jain
>>>
>>


Re: Encryption of parameters in flink-conf.yaml

2023-05-09 Thread Anuj Jain
Hi,

Thanks for the reply.

Yes, my Flink deployment is on K8s, but I am not using the Flink K8s operator.
If I understood correctly, even with an init-container the flink-conf.yaml
(inside the container) would finally contain unencrypted values for access
tokens. We don't want to persist such sensitive data unencrypted even
inside running containers in files or config maps, due to some security
constraints in my project.
Can you please let me know if I missed something with the suggested
solution.

Problem with overriding configuration programmatically:
When I removed the S3 properties from flink-conf.yaml and tried to provide
it programmatically from the job code, the connection to S3 failed.
I tried it with Application mode also on a standalone cluster but the
behavior is the same.

//My job main method (with default flink-conf.yaml):
Configuration conf = new Configuration();
conf.setString("s3a.access.key", );
conf.setString("s3a.secret.key", );
conf.setString("s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
conf.setString("s3a.assumed.role.arn", );
conf.setString("s3a.assumed.role.session.name", );
conf.setString("s3a.assumed.role.session.duration", );
conf.setString("s3a.assumed.role.sts.endpoint", );
conf.setString("s3a.assumed.role.sts.endpoint.region", );
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(conf);

// flink job program using DataStream

env.execute("My job");

With this I got a connection exception:
Caused by: org.apache.flink.util.SerializedThrowable:
com.amazonaws.SdkClientException: Unable to load AWS credentials from
environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and
AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
at
com.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:50)
~[?:?]
at
org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:177)
~[?:?]

When these values are given in flink-conf.yaml instead of the job code, the
connection was successful. Please advise if I am doing something incorrect
w.r.t. the job program.

Regards
Anuj

On Mon, May 8, 2023 at 12:36 PM Biao Geng  wrote:

> Hi Anuj,
>
> To the best of my knowledge, Flink does not provide encryption support
> for these parameters for now. If you are using Flink on K8s, it is possible
> to achieve the encryption of parameters using an init container. You can
> check this SO answer for more detailed instructions.
> Besides, it should be possible to override the Configuration object in your
> job code. Are you using Application mode to run the job?
>
> Best regards,
> Biao Geng
>
> Anuj Jain wrote on Mon, May 8, 2023 at 13:55:
>
>> Hi Community,
>> I am trying to create an amazon S3 filesystem distributor using flink and
>> for this I am using hadoop S3a connector with Flink filesystem sink.
>> My flink application would run in a non-AWS environment, on native
>> cluster; so I need to put my access keys in flink configuration.
>>
>> For connecting to S3 storage, i am configuring flink-conf.yaml with the
>> access credentials like
>> s3.access.key: <access-key>
>> s3.secret.key: <secret-key>
>> ... and some other parameters required for assuming AWS IAM role with s3a
>> AssumedRoleCredentialProvider
>>
>> Is there a way to encrypt these parameters rather than putting them
>> directly or is there any other way to supply them programmatically.
>>
>> I tried to set them programmatically using the Configuration object and
>> supplying them with
>> StreamExecutionEnvironment.getExecutionEnvironment(Configuration), in my
>> job (rather than from flink-conf.yaml) but then the S3 connection failed. I
>> think flink creates the connection pool at startup even before the job is
>> started.
>>
>> Thanks and Regards
>> Anuj Jain
>>
>


StatsdMetricsReporter is emitting all metric types as gauges

2023-05-09 Thread Iris Grace Endozo
Hey folks trying to troubleshoot why counter metrics are appearing as gauges on 
my end. Is it expected that the StatsdMetricsReporter is reporting gauges for 
counters as well?

Looking at this one: 
https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java#L207:
 the statsd specifications state that counters need to be reported as
<metric name>:<value>|c[|@<sample rate>] but it seems it's defaulting to "%s:%s|g" in
the above. Ref: https://github.com/b/statsd_spec#counters

Wondering if anyone else has hit this issue or there's an existing issue?

Cheers, Iris.

--

Iris Grace Endozo, Senior Software Engineer
M +61 435 108 697
E iris.end...@gmail.com


Re: flink-kubernetes-operator HA k8s RoleBinding for Leases?

2023-05-09 Thread Tamir Sagi
Hey,

I also encountered something similar with a different error. I enabled HA with
RBAC.

org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure
 executing: GET at: https://172.20.0.1/api/v1/nodes. Message: 
Forbidden!Configured service account doesn't have access. Service account may 
have been revoked. nodes is forbidden: User 
"system:serviceaccount:dev-0-flink-clusters:dev-0-xsight-flink-operator-sa" 
cannot list resource "nodes" in API group "" at the cluster scope."

I checked the rolebinding between the service account
`dev-0-flink-clusters:dev-0-xsight-flink-operator-sa` and the corresponding
role (flink-operator), which has been created by the operator using
rbac.nodesRule.create=true.

[screenshot: role binding]

[screenshot: role flink-operator]

Am I missing something?



From: Gyula Fóra 
Sent: Tuesday, May 9, 2023 7:43 AM
To: Andrew Otto 
Cc: User 
Subject: Re: flink-kubernetes-operator HA k8s RoleBinding for Leases?


EXTERNAL EMAIL


Hey!

Sounds like a bug :) Could you please open a jira / PR (in case you fixed this 
already)?

Thanks
Gyula

On Mon, 8 May 2023 at 22:20, Andrew Otto 
mailto:o...@wikimedia.org>> wrote:
Hi,

I'm trying to enable HA for flink-kubernetes-operator with Helm. We are using
namespaced RBAC via watchedNamespaces.

I've followed instructions and set kubernetes.operator.leader-election.enabled 
and kubernetes.operator.leader-election.lease-name, and increased replicas to 
2.  When I deploy, the second replica comes online, but errors with:

Exception occurred while acquiring lock 'LeaseLock: flink-operator - 
flink-operator-lease (flink-kubernetes-operator-86b888d6b6-8cxjs
Failure executing: GET at: 
https://x.x.x.x/apis/coordination.k8s.io/v1/namespaces/flink-operator/leases/flink-operator-lease.
 Message: Forbidden!Configured service account doesn't have access. Service 
account may have been revoked. 
leases.coordination.k8s.io 
"flink-operator-lease" is forbidden: User 
"system:serviceaccount:flink-operator:flink-operator" cannot get resource 
"leases" in API group "coordination.k8s.io" in the 
namespace "flink-operator".

Looking at the rbac.yaml helm template, it looks like the Role and
RoleBindings that grant access to the leases
resource are created for the configured watchNamespaces, but not for the 
namespace in which the flink-kubernetes-operator is deployed.  I think that for 
HA, the flink-kubernetes-operator is going to be asking k8s for Leases in its 
own namespace, right?

Is this a bug, or am I doing something wrong?  I'd file a JIRA, but I betcha 
I'm just doing something wrong (unless I'm the first person who's tried to use 
HA + namespaced RBAC with the helm charts?).

Thanks!
-Andrew Otto
 Wikimedia Foundation
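
For reference, a hedged sketch of the grant that seems to be missing; the
namespace and ServiceAccount names are taken from the error message above,
and the verb list is an assumption about what leader election needs:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-operator-leases
  namespace: flink-operator
rules:
  - apiGroups: ["coordination.k8s.io"]
    resources: ["leases"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-operator-leases
  namespace: flink-operator
subjects:
  - kind: ServiceAccount
    name: flink-operator
    namespace: flink-operator
roleRef:
  kind: Role
  name: flink-operator-leases
  apiGroup: rbac.authorization.k8s.io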








Re: OffsetCommitMode.Kafka_periodic with checkpointing enabled

2023-05-09 Thread Hang Ruan
Hi, Pritam,

I see Martijn has responsed the ticket.

The Kafka source (FLIP-27) commits offsets in two places: the Kafka consumer's
periodic auto-commit, and a `consumer.commitAsync` invocation when a checkpoint
is completed.
- If checkpointing is enabled and commit.offsets.on.checkpoint = true, the
Kafka connector commits offsets when a checkpoint is completed.
- If properties.enable.auto.commit = true, the Kafka consumer auto-commits
offsets periodically (the interval depends on
`properties.auto.commit.interval.ms`).

If checkpointing is enabled and commit.offsets.on.checkpoint = true and
properties.enable.auto.commit = true, I think the offsets will be committed
in both places.
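
As a hedged sketch, both knobs sit on the FLIP-27 KafkaSource builder (the
bootstrap address and topic are assumptions; the group id is taken from logs
elsewhere in this digest):

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")           // assumed address
        .setTopics("events")                          // assumed topic
        .setGroupId("event-executor-grp")
        // commit offsets when a checkpoint completes (connector-level option)
        .setProperty("commit.offsets.on.checkpoint", "true")
        // additionally let the Kafka consumer auto-commit periodically
        .setProperty("enable.auto.commit", "true")
        .setProperty("auto.commit.interval.ms", "5000")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();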

Best,
Hang

Pritam Agarwala wrote on Tue, May 9, 2023 at 14:24:

> Hi Team,
>
> I need to get the Kafka lag to prepare a graph, and it depends on the Kafka
> committed offsets. Flink updates the offsets only after checkpointing, to
> keep them consistent.
>
> Default Behaviour as per doc :
> If checkpoint is enabled, but consumer.setCommitOffsetsOnCheckpoints set
> to false, then offset will not be committed at all even if the
> enable.auto.commit is set to true.
>
> So, when consumer.setCommitOffsetsOnCheckpoints is set to false, shouldn't
> it fall back on enable.auto.commit to commit offsets regularly, since in any
> case Flink doesn't use the consumer's committed offsets for recovery?
>
>
> Jira Ticket : https://issues.apache.org/jira/browse/FLINK-32038
>
> Thanks & Regards,
> Pritam Agarwala
> Senior Data Engineer
>


OffsetCommitMode.Kafka_periodic with checkpointing enabled

2023-05-09 Thread Pritam Agarwala
Hi Team,

I need to get the Kafka lag to prepare a graph, and it depends on the Kafka
committed offsets. Flink updates the offsets only after checkpointing, to
keep them consistent.

Default Behaviour as per doc :
If checkpoint is enabled, but consumer.setCommitOffsetsOnCheckpoints set to
false, then offset will not be committed at all even if the
enable.auto.commit is set to true.

So, when consumer.setCommitOffsetsOnCheckpoints is set to false, shouldn't it
fall back on enable.auto.commit to commit offsets regularly, since in any case
Flink doesn't use the consumer's committed offsets for recovery?


Jira Ticket : https://issues.apache.org/jira/browse/FLINK-32038

Thanks & Regards,
Pritam Agarwala
Senior Data Engineer