Re: (Possible) bug in flink-kafka-connector (metrics rewriting)

2022-08-02 Thread zhanghao.chen
Hi, I suggest creating a ticket for it on
https://issues.apache.org/jira/projects/FLINK/summary.


Best,
Zhanghao Chen

From: Valentina Predtechenskaya 
Sent: Wednesday, August 3, 2022 1:32
To: user@flink.apache.org 
Subject: (Possible) bug in flink-kafka-connector (metrics rewriting)





Flink SQL and tumble window by size (number of rows)

2022-08-02 Thread Marco Villalobos
Is it possible in Flink SQL to tumble a window by row size instead of time?

Let's say that I want a window for every 1 rows, for example, using the Flink
SQL API.

Is that possible?

I can't find any documentation on how to do that, and I don't know if it is 
supported.
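
For reference, the DataStream API does offer count-based tumbling windows
through countWindow(); a minimal sketch follows (data and window size are
illustrative, and whether Flink SQL supports an equivalent is exactly the open
question here):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CountWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "a", "b", "b")
            .keyBy(s -> s)
            .countWindow(2)                  // tumbling count window: fires after 2 rows per key
            .reduce((x, y) -> x + "|" + y)   // any aggregation works here
            .print();
        env.execute("count-window-example");
    }
}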

HybridSource permanently failed after restoring from checkpoint

2022-08-02 Thread Benenson, Michael via user
Hi, folks

I’m running Flink application that use HybridSource, patched with fixes 
FLINK-27479 and FLINK-27529

This application use HybridSource and presto plugin to read from a few 
thousands s3 directories, and then switch to reading from Kafka.
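
For context, a HybridSource of that shape is typically assembled roughly as
follows (a sketch only; the path, topic, bootstrap servers and the exact
text-format class are illustrative and may differ between Flink versions):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HybridSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded file source over the historical s3 data
        // (the URI scheme depends on the s3 plugin in use)
        FileSource<String> fileSource =
            FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://my-bucket/history/"))
                .build();

        // Unbounded Kafka source that takes over once the file splits are exhausted
        KafkaSource<String> kafkaSource =
            KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("my-topic")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        HybridSource<String> hybrid =
            HybridSource.builder(fileSource).addSource(kafkaSource).build();

        env.fromSource(hybrid, WatermarkStrategy.noWatermarks(), "hybrid-source").print();
        env.execute("hybrid-source-sketch");
    }
}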

Reading from s3 can cause intermittent errors that are usually fixed by
retrying, but there is a problem when Flink tries to recover from this failure
and restart from the checkpoint:
java.lang.NullPointerException: Source for index=0 not available
 at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
 at 
org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36)

Complete scenario:


  1.  CheckpointCoordinator - Completed checkpoint 14 for job 

  2.  HybridSource successfully completed processing a few SourceFactories
that read from s3
  3.  The next SourceFactory tries to read the contents of an s3 dir, and it
causes an error: Unable to execute HTTP request: Read timed out
  4.  CheckpointCoordinator - Restoring job  
from Checkpoint 14
  5.  HybridSourceSplitEnumerator - Restoring enumerator for sourceIndex=47
  6.  This restore fails because of a NullPointerException in
HybridSourceSplitEnumerator.close
  7.  Again, CheckpointCoordinator tries to - Restoring job
 from Checkpoint 14
  8.  It causes

2022/08/02 22:26:52.469 ERROR o.a.f.r.s.c.SourceCoordinator - Uncaught 
exception in the SplitEnumerator for Source Source: hybrid-source while 
handling operator event 
SourceEventWrapper[SourceReaderFinishedEvent{sourceIndex=-1}] from subtask 10. 
Triggering job failover.

java.lang.NullPointerException: Source for index=0 not available

 at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)

 at 
org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36)

 at 
org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:149)

 at 
org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:223)

 at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:182)

 at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:344)

 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

 at java.lang.Thread.run(Thread.java:750)

10.  And this pattern continues forever: Flink tries restoring from the
checkpoint, but it fails because of NullPointerException: Source for index=0
not available


Any idea what could be the cause of the problem? Could some experts in
HybridSource look at the issue?

I have attached an extract of the JobManager log that contains the related
information. I can send the complete log, but its size is a few MB.

The problem is reproducible after a few hours of running in my environment.

And I think we need a Jira for this issue; could someone please create it?



bf-29-JM-err-analysis.log
Description: bf-29-JM-err-analysis.log


(Possible) bug in flink-kafka-connector (metrics rewriting)

2022-08-02 Thread Valentina Predtechenskaya

Hello !


I would like to report a bug with metrics registration on KafkaProducer 
initialization.

Firstly, we found the problem on our Flink cluster: the metric
KafkaProducer.outgoing-byte-rate was periodically missing (equal to zero or
near zero) on several subtasks, while at the same time other subtasks were fine
with this metric. The actual outgoing rate was the same on different subtasks;
this was clear from, for example, the KafkaProducer.records-send-rate metric,
which was fine on every subtask, so the problem was 100% with the metric itself.


After a long investigation we found the root cause of this behavior:

  *   KafkaWriter creates an instance of FlinkKafkaInternalProducer and then
initializes metric wrappers over the existing KafkaProducer metrics (gauges) -
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L327-L330
  *   The KafkaProducer itself, in its constructor, creates a Sender to access
the brokers, starts a thread (kafka-producer-network-thread) and runs the
Sender in this separate thread -
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L458-L460
  *   After the Sender starts, the metrics connected with topics and brokers
take some time to register. If they register quickly, KafkaWriter will see them
before the end of initialization and these metrics will be wrapped as Flink
gauges. Otherwise, they will not.
  *   Some KafkaProducer metrics from the producer and from the broker have the
same names - for example, outgoing-byte-rate -
https://docs.confluent.io/platform/current/kafka/monitoring.html#producer-metrics
  *   If two metrics have the same name, the Flink KafkaWriter overwrites the
metric in the wrapper -
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L359-L360

I have debugged these libraries a lot and I am sure about this behavior. If,
for example, we patch flink-kafka-connector with a condition not to initialize
a metric when "producer-node-metrics".equals(metric.metricName().group()), our
metrics are all fine (outgoing-byte-rate is not 0); see the sketch below.
Also, the bug does not reproduce if the cluster is not very fast (for example,
on a local machine) and the data from brokers arrives only after all metrics
have been initialized in KafkaWriter.
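
For illustration, the guard described above would sit in the metric-registration
loop and could look roughly like this (a hedged sketch, not the actual
KafkaWriter code; the class and method names here are illustrative):

import java.util.Map;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

class ProducerMetricRegistrationSketch {
    static void register(MetricGroup kafkaMetricGroup, Map<MetricName, ? extends Metric> producerMetrics) {
        for (Map.Entry<MetricName, ? extends Metric> entry : producerMetrics.entrySet()) {
            // Skip per-broker "producer-node-metrics" entries so they cannot overwrite
            // the producer-level gauge registered under the same metric name.
            if ("producer-node-metrics".equals(entry.getKey().group())) {
                continue;
            }
            Metric metric = entry.getValue();
            kafkaMetricGroup.gauge(entry.getKey().name(), (Gauge<Object>) metric::metricValue);
        }
    }
}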

I suppose this is not the expected behavior, but even in the latest version of
flink-kafka-connector the code is the same. Is there any fix? Maybe some
workarounds? To be honest, I don't really want to use my own patched version of
the connector.

Thank you !



"This message contains confidential information/commercial secret. If you are 
not the intended addressee of this message you may not copy, save, print or 
forward it to any third party and you are kindly requested to destroy this 
message and notify the sender thereof by email.
Данное сообщение содержит конфиденциальную информацию/информацию, являющуюся 
коммерческой тайной. Если Вы не являетесь надлежащим адресатом данного 
сообщения, Вы не вправе копировать, сохранять, печатать или пересылать его 
каким либо иным лицам. Просьба уничтожить данное сообщение и уведомить об этом 
отправителя электронным письмом."


Re: WELCOME to user@flink.apache.org

2022-08-02 Thread Tamir Sagi
Hey Karthikeyan,

Welcome to Flink.

Make sure that:

  1.  The IAM role has enough permissions on the required buckets ( and /*).
  2.  Once you define the IAM role, you need to annotate the SA with the role
ARN and attach the SA to the Flink pods ("kubernetes.service-account" under
flink-conf.yaml), e.g.
annotations = {
  "eks.amazonaws.com/role-arn" =
}
(a fuller sketch is below)

  3.  Add the aws-java-sdk-sts dependency (1.12.+) to the job's jar (so the SA
is read properly).
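
Put together, the moving parts look roughly like this (a sketch only; the
account id, role name, namespace and service-account name are placeholders,
not values from this thread):

# ServiceAccount annotated with the IAM role ARN (IRSA)
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-sa
  namespace: flink
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::<account-id>:role/<flink-role>

# In flink-conf.yaml, make the Flink pods run under that service account:
kubernetes.service-account: flink-sa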



From: Karthikeyan Muthusamy (karmuthu) 
Sent: Tuesday, August 2, 2022 5:25 PM
To: user-h...@flink.apache.org ; 
user@flink.apache.org 
Cc: Mohan S G (mosg) 
Subject: Re: WELCOME to user@flink.apache.org








Failed to stop with savepoint

2022-08-02 Thread hjw
Env:
Flink version: 1.15.0
Deploy mode: K8s application mode. A local mini cluster also has this problem.
Kafka connector: uses the Kafka SourceFunction (not the new API).

Action:
I post a stop-with-savepoint request to the Flink job through the REST API.
An error happens in the Kafka connector close.
The job then enters restarting.
Triggering a savepoint alone (without stopping) succeeds.
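
For reference, the stop-with-savepoint REST request in question is roughly of
this shape (a sketch; host, job id and target directory are placeholders):

curl -X POST http://<jobmanager>:8081/jobs/<job-id>/stop \
  -H 'Content-Type: application/json' \
  -d '{"targetDirectory": "s3://my-bucket/savepoints", "drain": false}'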


Error log:


13:33:42.857 [Kafka Fetcher for Source: nlp-kafka-source - nlp-clean 
(1/1)#0] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer 
clientId=consumer-hjw-3, groupId=hjw] Kafka consumer has been closed
13:33:42.857 [Kafka Fetcher for Source: cpp-kafka-source - cpp-clean 
(1/1)#0] INFO org.apache.kafka.common.utils.AppInfoParser - App info 
kafka.consumer for consumer-hjw-4 unregistered
13:33:42.857 [Kafka Fetcher for Source: cpp-kafka-source - cpp-clean 
(1/1)#0] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer 
clientId=consumer-hjw-4, groupId=hjw] Kafka consumer has been closed
13:33:42.860 [Source: nlp-kafka-source - nlp-clean (1/1)#0] DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask - Cleanup StreamTask 
(operators closed: false, cancelled: false)
13:33:42.860 [jobmanager-io-thread-4] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 
5 by task eeefcb27475446241861ad8db3f33144 of job 
d6fed247feab1c0bcc1b0dcc2cfb4736 at 79edfa88-ccc3-4140-b0e1-7ce8a7f8669f @ 
127.0.0.1 (dataPort=-1).
org.apache.flink.util.SerializedThrowable: Task name with subtask : Source: 
nlp-kafka-source - nlp-clean (1/1)#0 Failure reason: Task has failed.
at 
org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388)
at 
org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331)
at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343)
Caused by: org.apache.flink.util.SerializedThrowable: 
org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
... 3 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: null
at 
org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)
at 
org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
13:34:00.925 [flink-akka.actor.default-dispatcher-21] DEBUG 
org.apache.flink.runtime.jobmaster.JobMaster - Trigger heartbeat request.
13:34:00.926 [flink-akka.actor.default-dispatcher-22] DEBUG 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger 
heartbeat request.
13:34:00.926 [flink-akka.actor.default-dispatcher-21] DEBUG 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat request 
from 

Re: WELCOME to user@flink.apache.org

2022-08-02 Thread Karthikeyan Muthusamy (karmuthu)
Hi Team,

We have deployed our Flink cluster on AWS EKS using the Flink Operator. We have
created the required service accounts with IAM OIDC integration; however, the
flink-main container seems to bypass this service account role and directly
tries to create and delete objects in s3 using the node role. As the Flink
application fails to use the service account, access is denied.

**Please note that we have enabled HA and our storage directory is s3. If we
log into the pod and check, it does have the required permissions at the
container level and we are able to write to and read from s3; however, the
application is not using the service account.

Error:
Caused by: java.nio.file.AccessDeniedException: 
s3a://preint-us-east-1-flink/flink-peak-trunk-utilization/flink-deploy/submittedJobGraph50e30a08e280:
 delete on 
s3a://preint-us-east-1-flink/flink-peak-trunk-utilization/flink-deploy/submittedJobGraph50e30a08e280:
 com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: 
Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 
MCRDGVQN7X6EVJ69; S3 Extended Request ID: 
+MwyWabe4upNDsWmEJeEOxtvRYCJDa840uk5AtLam0c3O4vnZMD3k4tlI+VU+o/o2JgO9GpMDYY=; 
Proxy: null), S3 Extended Request ID: 
+MwyWabe4upNDsWmEJeEOxtvRYCJDa840uk5AtLam0c3O4vnZMD3k4tlI+VU+o/o2JgO9GpMDYY=:AccessDenied

-- Karthikeyan
   Technical Leader Engineering
   Cisco Systems
   Ph: 9019431391



Re: Flink Operator - delete flinkdeployments

2022-08-02 Thread Sigalit Eliazov
Will do, thanks!

On Tue, Aug 2, 2022 at 3:39 PM Gyula Fóra  wrote:

> Before trying to solve any already fixed problems please upgrade to 1.1.0
> :)
>
>
>
> On Tue, Aug 2, 2022 at 2:33 PM Sigalit Eliazov 
> wrote:
>
>> we are working with 1.0.0
>>
>> On Tue, Aug 2, 2022 at 3:24 PM Gyula Fóra  wrote:
>>
>>> Are you running the latest 1.1.0 version of the operator?
>>>
>>> Gyula
>>>
>>> On Tue, Aug 2, 2022 at 2:18 PM Sigalit Eliazov 
>>> wrote:
>>>
 Hi,

 We are deploying a few flink clusters via the flink operator in our CI.

 In each run we first do a clean-up where one of the first steps is to
  run 'kubectl delete flinkdeployments --all -n '
 after that we also delete the flink operator pod and our all namespace.

 Lately we face issues where the deletion of the crd takes a lot of time
 and sometimes it just gets stuck and we need to manually modify finalizers
 so they will be deleted.

 Anyone faced this issue?
 Any suggestions on how to overcome it?

 Thanks
 Sigalit

>>>


Re: Flink Operator - delete flinkdeployments

2022-08-02 Thread Gyula Fóra
Before trying to solve any already fixed problems please upgrade to 1.1.0
:)



On Tue, Aug 2, 2022 at 2:33 PM Sigalit Eliazov  wrote:

> we are working with 1.0.0
>
> On Tue, Aug 2, 2022 at 3:24 PM Gyula Fóra  wrote:
>
>> Are you running the latest 1.1.0 version of the operator?
>>
>> Gyula
>>
>> On Tue, Aug 2, 2022 at 2:18 PM Sigalit Eliazov 
>> wrote:
>>
>>> Hi,
>>>
>>> We are deploying a few flink clusters via the flink operator in our CI.
>>>
>>> In each run we first do a clean-up where one of the first steps is to
>>>  run 'kubectl delete flinkdeployments --all -n '
>>> after that we also delete the flink operator pod and our all namespace.
>>>
>>> Lately we face issues where the deletion of the crd takes a lot of time
>>> and sometimes it just gets stuck and we need to manually modify finalizers
>>> so they will be deleted.
>>>
>>> Anyone faced this issue?
>>> Any suggestions on how to overcome it?
>>>
>>> Thanks
>>> Sigalit
>>>
>>


Re: Flink Operator - delete flinkdeployments

2022-08-02 Thread Sigalit Eliazov
we are working with 1.0.0

On Tue, Aug 2, 2022 at 3:24 PM Gyula Fóra  wrote:

> Are you running the latest 1.1.0 version of the operator?
>
> Gyula
>
> On Tue, Aug 2, 2022 at 2:18 PM Sigalit Eliazov 
> wrote:
>
>> Hi,
>>
>> We are deploying a few flink clusters via the flink operator in our CI.
>>
>> In each run we first do a clean-up where one of the first steps is to
>>  run 'kubectl delete flinkdeployments --all -n '
>> after that we also delete the flink operator pod and our all namespace.
>>
>> Lately we face issues where the deletion of the crd takes a lot of time
>> and sometimes it just gets stuck and we need to manually modify finalizers
>> so they will be deleted.
>>
>> Anyone faced this issue?
>> Any suggestions on how to overcome it?
>>
>> Thanks
>> Sigalit
>>
>


Re: Flink Operator - delete flinkdeployments

2022-08-02 Thread Gyula Fóra
Are you running the latest 1.1.0 version of the operator?

Gyula

On Tue, Aug 2, 2022 at 2:18 PM Sigalit Eliazov  wrote:

> Hi,
>
> We are deploying a few flink clusters via the flink operator in our CI.
>
> In each run we first do a clean-up where one of the first steps is to
>  run 'kubectl delete flinkdeployments --all -n '
> after that we also delete the flink operator pod and our all namespace.
>
> Lately we face issues where the deletion of the crd takes a lot of time
> and sometimes it just gets stuck and we need to manually modify finalizers
> so they will be deleted.
>
> Anyone faced this issue?
> Any suggestions on how to overcome it?
>
> Thanks
> Sigalit
>


Flink Operator - delete flinkdeployments

2022-08-02 Thread Sigalit Eliazov
Hi,

We are deploying a few flink clusters via the flink operator in our CI.

In each run we first do a clean-up where one of the first steps is to
run 'kubectl delete flinkdeployments --all -n '
after that we also delete the flink operator pod and our whole namespace.

Lately we face issues where the deletion of the CRs takes a lot of time and
sometimes it just gets stuck and we need to manually modify the finalizers so
that they get deleted, as shown below.
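
For reference, the manual finalizer workaround mentioned above usually comes
down to something like this (resource name and namespace are placeholders):

kubectl patch flinkdeployment <name> -n <namespace> --type=merge \
  -p '{"metadata":{"finalizers":[]}}'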

Anyone faced this issue?
Any suggestions on how to overcome it?

Thanks
Sigalit


Re: Migration to application mode

2022-08-02 Thread Tamas Kiss
Thanks Lijie/Biao

To put it simply, what we would like to achieve is to replace env.runAsync()
with some code that uses application mode.

@Lijie: When I set deployment mode to application I got the following
exception after submitting the job

Caused by: java.lang.IllegalStateException: No ExecutorFactory found to
execute the application.
at
org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:88)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2036)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2015)

@Biao: So the only way currently to use application mode is to package our
main method and execute it on the JM? Can this be done with Java code? I only
found examples using the Flink CLI.

Thanks
Tamas

On Tue, Aug 2, 2022 at 4:41 AM Biao Geng  wrote:

> Hi there,
> Currently, YARN Application mode only supports running a jar job. And as
> Lijie said, the main method is executed on the JM, so if I understand
> correctly, your previous way of running the execute() method on the client
> side to submit a job to the YARN cluster may not work. A quick workaround is
> to create such a job jar for submitting your SQL jobs, and you can run this
> job jar using Application Mode. Thanks to Gyula's work, you can refer to
> https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/src/main/java/org/apache/flink/examples/SqlRunner.java
> for a minimal example.
>
> Best,
> Biao Geng
>
> Lijie Wang wrote on Tue, Aug 2, 2022 at 09:49:
>
>> Hi,
>> I think the difference between ApplicationMode and PerJob is just where
>> the main method is executed (ApplicationMode executes on JM, PerJob
>> executes on client side). So I think your original job code should work
>> well under ApplicationMode. Did you encounter any problems?
>> You can get more details about ApplicationMode in [1].
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#application-mode
>>
>> Best,
>> Lijie
>>
>> Tamas Kiss wrote on Mon, Aug 1, 2022 at 19:30:
>>
>>> Hi Experts,
>>>
>>> We are planning to migrate our flink jobs from per-job mode to
>>> application mode in our platform. Our jobs are basically SQL scripts so we
>>> have some custom Java code to leverage Flink's SQL and Table API to build
>>> the execution environment and execute the jobs on Yarn. We would like to
>>> keep the current flow in our platform so we are looking for a way to run
>>> Flink SQL in application mode via some Java code. Does this seem to be a
>>> good approach to follow or should we look for a better solution? If this is
>>> a good choice, is there any API to support this?
>>>
>>> Thanks
>>> Tamas
>>>
>>
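
For what it's worth, the SQL-runner jar Biao mentions above boils down to a
main class roughly like the following (a hedged sketch modeled on that example,
not the actual code; the file argument and the naive split on ';' are
illustrative):

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SimpleSqlRunner {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // The SQL script is shipped inside the application jar / image and runs
        // on the JobManager, where the main method executes in application mode.
        String script = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
        for (String statement : script.split(";")) {   // naive split, enough for simple scripts
            if (!statement.trim().isEmpty()) {
                tableEnv.executeSql(statement.trim());
            }
        }
    }
}

Packaged with this class as the entry point, the jar can then be submitted in
application mode, for example with 'flink run-application -t yarn-application ...'.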


Re: Resources configuration on Kubernetes Session Cluster

2022-08-02 Thread Gyula Fóra
Hi Vladislav!

I am afraid there is no way to specify resources independently for jobs
within a session cluster currently in Flink.

For this I suggest using the Application Mode instead where each job can
have its own resources.

In any case you should check out the Flink Kubernetes Operator -
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
It allows you to manage session clusters, session jobs and application
deployments very conveniently :)

Cheers,
Gyula

On Tue, Aug 2, 2022 at 11:17 AM Vladislav Keda <
vladislav.k...@glowbyteconsulting.com> wrote:

> Hi,
>
> I'm trying to specify different TaskManager resources for different Flink
> jobs on Kubernetes Session Cluster. Can you help me to find a way to do
> that?
>
> I use this options, but Flink picks them up only when I start new
> Kubernetes Session deployment:
> Memory:
> jobmanager.memory.process.size
> 
> , taskmanager.memory.process.size
> 
> CPU:
> kubernetes.jobmanager.cpu
> 
> , kubernetes.taskmanager.cpu
> 
>
> *FYI*
> I deploy Flink jobs on cluster and set up specific configuration
> parameters for jobs using
> *org.apache.flink.client.program.rest.RestClusterClient*
>
> Flink version - 1.13.6.
>
> ---
>
> Best Regards,
> Vladislav Keda
>


Resources configuration on Kubernetes Session Cluster

2022-08-02 Thread Vladislav Keda
Hi,

I'm trying to specify different TaskManager resources for different Flink
jobs on a Kubernetes session cluster. Can you help me find a way to do that?

I use these options, but Flink picks them up only when I start a new
Kubernetes session deployment:
Memory: jobmanager.memory.process.size, taskmanager.memory.process.size
CPU: kubernetes.jobmanager.cpu, kubernetes.taskmanager.cpu

*FYI*
I deploy Flink jobs on the cluster and set specific configuration parameters
for jobs using *org.apache.flink.client.program.rest.RestClusterClient*.

Flink version - 1.13.6.

---

Best Regards,
Vladislav Keda


Re: How does Flink guarantee internally that messages are not lost?

2022-08-02 Thread yidan zhao
I have recently also been comparing Storm and Flink. Could anyone explain
whether Storm's ack model recovers faster? My current impression is that under
Storm's architecture the failover of individual nodes is more independent.
In Flink, the failure of any single machine in the cluster currently causes
the whole job to restart, which takes longer.
On the other hand, in terms of overall resources, checkpointing seems to use
fewer resources than the ack model.
I am not sure whether this understanding is correct.

tison wrote on Sat, Jul 30, 2022 at 14:28:
>
> You can take a look at these two references:
>
> *
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/fault-tolerance/checkpointing/
> * https://zhuanlan.zhihu.com/p/102607983
>
> This is essentially how exactly-once is implemented in Flink: in short,
> distributed snapshots are committed in batches, and upstream data can be
> replayed.
>
> Best,
> tison.
>
>
> m18814122325 wrote on Sat, Jul 30, 2022 at 14:22:
>
> > In Storm, an ack mechanism confirms whether a message has been processed
> > by the next operator. Within the Flink framework, when an upstream
> > operator sends messages downstream via Netty, how is it guaranteed that
> > messages are not lost due to network issues or other exceptional
> > situations?
> >
> > Thanks
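
For reference, the checkpointing mechanism discussed above is enabled per job
roughly like this (the interval is illustrative):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Take a distributed snapshot every 60 s; together with replayable sources
        // (e.g. Kafka) this is what gives exactly-once state semantics.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        // ... build the job and call env.execute() as usual
    }
}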


Flink task lifecycle listener/hook/SPI

2022-08-02 Thread Allen Zoo
Hi all,
We want to do some environment init setup before a Flink task runs. We have
read the Task Lifecycle doc (Task Lifecycle | Apache Flink), but we cannot find
a listener/hook/SPI interface to do custom init work before the task starts
running. Does Flink currently have relevant interfaces?
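
For what it's worth, per-task initialization is usually done in the open()
method of a rich function rather than through a separate listener SPI; a
minimal sketch (the setup helper is hypothetical and stands in for the custom
init work):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class InitAwareMapper extends RichMapFunction<String, String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // Runs once per parallel task instance, before any record is processed.
        MyEnvSetup.init();   // hypothetical helper for the env setup described above
    }

    @Override
    public String map(String value) {
        return value;
    }
}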