Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-21 Thread Gen Luo
Hi,
Thanks for driving this @Till Rohrmann. I would give +1 on reducing the
heartbeat timeout and interval, though I'm not sure whether 15s and 3s would
be enough either.

IMO, besides the standalone cluster, where Flink relies entirely on its own
heartbeat mechanism, reducing the heartbeat can also help the JM detect more
quickly those TaskExecutors that are in abnormal conditions and cannot respond
to heartbeat requests, e.g. continuous Full GC, where the TaskExecutor process
is still alive and may therefore not be reported by the deployment system.
Since there are cases that can benefit from this change, I think it could be
done as long as it doesn't degrade the experience in other scenarios.

If we can identify what blocks the main threads from processing heartbeats, or
what inflates the GC costs, we can try to eliminate those causes to make the
heartbeat response time more predictable, or give advice to users whose jobs
may run into these issues. For example, as far as I know the JM of a
large-scale job is busier and may not be able to process heartbeats in time,
so we could advise users running jobs larger than 5000 tasks to enlarge their
heartbeat interval to 10s and the timeout to 50s. The numbers are written
casually.
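
As an illustration only (the numbers above are deliberately casual, and, if I
am not mistaken, current releases take these options in milliseconds rather
than as duration strings), such an override would just be two lines in
flink-conf.yaml:

heartbeat.interval: 10000
heartbeat.timeout: 50000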

As for the issue in FLINK-23216, I think it should be fixed and may not be
a main concern for this case.

On Wed, Jul 21, 2021 at 6:26 PM Till Rohrmann  wrote:

> Thanks for sharing these insights.
>
> I think it is no longer true that the ResourceManager notifies the
> JobMaster about lost TaskExecutors. See FLINK-23216 [1] for more details.
>
> Given the GC pauses, would you then be ok with decreasing the heartbeat
> timeout to 20 seconds? This should give enough time to do the GC and then
> still send/receive a heartbeat request.
>
> I also wanted to add that we are about to get rid of one big cause of
> blocking I/O operations from the main thread. With FLINK-22483 [2] we will
> get rid of Filesystem accesses to retrieve completed checkpoints. This
> leaves us with one additional file system access from the main thread which
> is the one completing a pending checkpoint. I think it should be possible
> to get rid of this access because as Stephan said it only writes
> information to disk that is already written before. Maybe solving these two
> issues could ease concerns about long pauses of unresponsiveness of Flink.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23216
> [2] https://issues.apache.org/jira/browse/FLINK-22483
>
> Cheers,
> Till
>
> On Wed, Jul 21, 2021 at 4:58 AM Yang Wang  wrote:
>
>> Thanks @Till Rohrmann   for starting this
>> discussion
>>
>> Firstly, I try to understand the benefit of shorter heartbeat timeout.
>> IIUC, it will make the JobManager aware of
>> TaskManager faster. However, it seems that only the standalone cluster
>> could benefit from this. For Yarn and
>> native Kubernetes deployment, the Flink ResourceManager should get the
>> TaskManager lost event in a very short time.
>>
>> * About 8 seconds, 3s for Yarn NM -> Yarn RM, 5s for Yarn RM -> Flink RM
>> * Less than 1 second, Flink RM has a watch for all the TaskManager pods
>>
>> Secondly, I am not very confident about decreasing the timeout to 15s. I
>> have quickly checked the TaskManager GC logs of our internal Flink workloads
>> over the past week and found more than 100 Full GC pauses of around 10
>> seconds, but none of them was longer than 15s.
>> We are using CMS GC for the old generation.
>>
>>
>> Best,
>> Yang
>>
>> Till Rohrmann wrote on Sat, Jul 17, 2021 at 1:05 AM:
>>
>>> Hi everyone,
>>>
>>> Since Flink 1.5 we have the same heartbeat timeout and interval default
>>> values that are defined as heartbeat.timeout: 50s and heartbeat.interval:
>>> 10s. These values were mainly chosen to compensate for lengthy GC pauses
>>> and blocking operations that were executed in the main threads of Flink's
>>> components. Since then, there were quite some advancements wrt the JVM's
>>> GCs and we also got rid of a lot of blocking calls that were executed in
>>> the main thread. Moreover, a long heartbeat.timeout causes long recovery
>>> times in case of a TaskManager loss because the system can only properly
>>> recover after the dead TaskManager has been removed from the scheduler.
>>> Hence, I wanted to propose to change the timeout and interval to:
>>>
>>> heartbeat.timeout: 15s
>>> heartbeat.interval: 3s
>>>
>>> Since there is no perfect solution that fits all use cases, I would
>>> really
>>> like to hear from you what you think about it and how you configure these
>>> heartbeat options. Based on your experience we might actually come up
>>> with
>>> better default values that allow us to be resilient but also to detect
>>> failed components fast. FLIP-185 can be found here [1].
>>>
>>> [1] https://cwiki.apache.org/confluence/x/GAoBCw
>>>
>>> Cheers,
>>> Till
>>>
>>


Flink TaskManager container got restarted by K8S very frequently

2021-07-21 Thread Fan Xie
Hi Flink Community,

Recently I deployed a Flink cluster (1 JM, 1 TM) with k8s standalone mode. Later 
on I noticed that the pod the TM is running on gets restarted by k8s very 
frequently (3 times within 10 minutes), and I didn't see any error log for 
this pod. I tried to increase the container memory in both the flink-conf.yaml 
file and the k8s yaml file, but that didn't solve the problem either. Are there 
any other issues that may cause this problem? My k8s cluster has 5 nodes, each 
node has 4 vCPUs and 16GB of memory, and the TM is not running any job.
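
A minimal diagnostic sketch, assuming kubectl access to the namespace (the pod
name is a placeholder): these standard commands usually show whether k8s
restarted the container because of an OOM kill, an eviction, or a failed
liveness probe.

kubectl describe pod <taskmanager-pod>          # "Last State" shows the restart reason
kubectl logs <taskmanager-pod> --previous       # logs of the container instance before the restart
kubectl get events --sort-by=.lastTimestamp     # probe failures, OOM kills, evictions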

flink-conf.yaml:

jobmanager.memory.process.size: 1600Mb
jobmanager.rpc.address: flink-test-job-jobmanager-service
blob.server.port: 6124
query.server.port: 6125

taskmanager.memory.process.size: 2048Mb
taskmanager.numberOfTaskSlots: 1

state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
state.savepoints.dir: file:///tmp/flink-savepoints-directory


heartbeat.interval: 1000
heartbeat.timeout: 5000

task manager yaml file:

spec:
  containers:
  - name: taskmanager
    image: ###
    imagePullPolicy: Always
    command: ["taskmanager.sh"]
    args: ["start-foreground"]
    env:
    - name: JOB_MANAGER_RPC_ADDRESS
      value: flink-test-job-jobmanager-service
    resources:
      limits:
        cpu: 4
        memory: "4096Mi"
      requests:
        cpu: 1
        memory: "2048Mi"
    ports:
    - containerPort: 6122
      name: rpc
    livenessProbe:
      tcpSocket:
        port: 6122
      initialDelaySeconds: 30
      periodSeconds: 60
    volumeMounts:
    - name: flink-config-volume
      mountPath: /opt/flink/conf/
    securityContext:
      runAsUser: 
  volumes:
  - name: flink-config-volume
    configMap:
      name: test-job-config
      items:
      - key: flink-conf.yaml
        path: flink-conf.yaml
      - key: log4j-console.properties
        path: log4j-console.properties
      - key: log4j-cli.properties
        path: log4j-cli.properties





Need help of deploying Flink HA on kubernetes cluster

2021-07-21 Thread Dhiru
Hi, I am very new to Flink. I am planning to install a Flink HA setup on an EKS 
cluster with 5 worker nodes. Can someone please point me to the right materials 
or direction on how to install it, as well as any sample job which I can run 
just for testing to confirm that everything is working as expected?
--Dhirendra 


Re: confirm subscribe to user@flink.apache.org

2021-07-21 Thread Dhiru
Need to be part of the Flink mailing list.
On Wednesday, July 21, 2021, 11:22:14 PM AST, user-h...@flink.apache.org 
 wrote:  
 
 Hi! This is the ezmlm program. I'm managing the
user@flink.apache.org mailing list.

To confirm that you would like

  userdh...@yahoo.com

added to the user mailing list, please send
a short reply to this address:

  user-sc.1626924133.plfdinihnlgmmkeoefpi-userdhiru=yahoo@flink.apache.org

Usually, this happens when you just hit the "reply" button.
If this does not work, simply copy the address and paste it into
the "To:" field of a new message.

This confirmation serves two purposes. First, it verifies that I am able
to get mail through to you. Second, it protects you in case someone
forges a subscription request in your name.

Please note that ALL Apache dev- and user- mailing lists are publicly
archived.  Do familiarize yourself with Apache's public archive policy at

    http://www.apache.org/foundation/public-archives.html

prior to subscribing and posting messages to user@flink.apache.org.
If you're not sure whether or not the policy applies to this mailing list,
assume it does unless the list name contains the word "private" in it.

Some mail programs are broken and cannot handle long addresses. If you
cannot reply to this request, instead send a message to
 and put the
entire address listed above into the "Subject:" line.


--- Administrative commands for the user list ---

I can handle administrative requests automatically. Please
do not send them to the list address! Instead, send
your message to the correct command address:

To subscribe to the list, send a message to:
  

To remove your address from the list, send a message to:
  

Send mail to the following for info and FAQ for this list:
  
  

Similar addresses exist for the digest list:
  
  

To get messages 123 through 145 (a maximum of 100 per request), mail:
  

To get an index with subject and author for messages 123-456 , mail:
  

They are always returned as sets of 100, max 2000 per request,
so you'll actually get 100-499.

To receive all messages with the same subject as message 12345,
send a short message to:
  

The messages should contain one line or word of text to avoid being
treated as sp@m, but I will ignore their content.
Only the ADDRESS you send to is important.

You can start a subscription for an alternate address,
for example "john@host.domain", just add a hyphen and your
address (with '=' instead of '@') after the command word:


To stop subscription for this address, mail:


In both cases, I'll send a confirmation message to that address. When
you receive it, simply reply to it to complete your subscription.

If despite following these instructions, you do not get the
desired results, please contact my owner at
user-ow...@flink.apache.org. Please be patient, my owner is a
lot slower than I am ;-)


Questions about keyed streams

2021-07-21 Thread Dan Hill
Hi.

1) If I use the same key in downstream operators (my key is a user id),
will the rows stay on the same TaskManager machine?  I join in more info
based on the user id as the key.  I'd like for these to stay on the same
machine rather than shuffle a bunch of user-specific info to multiple task
manager machines.

2) What are best practices to reduce the number of shuffles when having
multiple Kafka topics with similar keys (user id)? E.g. should I make sure the
same key writes to the same partition number and then manually assign which
Flink tasks get which Kafka partitions?
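
For what it's worth, here is a minimal sketch of the setup I have in mind,
assuming the plain DataStream API; the types, field names and data are made up
for illustration. Keying both inputs by the same user id means both sides of
the connect are routed to the same parallel subtask, which is the behavior I
am asking about:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class SameKeySketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Two inputs that would normally come from two Kafka topics, both carrying a user id in f0.
    DataStream<Tuple2<String, String>> views =
        env.fromElements(Tuple2.of("user-1", "view"), Tuple2.of("user-2", "view"));
    DataStream<Tuple2<String, String>> userInfo =
        env.fromElements(Tuple2.of("user-1", "info"), Tuple2.of("user-2", "info"));

    views.keyBy(t -> t.f0)                    // key the first input by user id
        .connect(userInfo.keyBy(t -> t.f0))   // same key selector on the second input
        .process(new KeyedCoProcessFunction<String, Tuple2<String, String>, Tuple2<String, String>, String>() {
          @Override
          public void processElement1(Tuple2<String, String> view, Context ctx, Collector<String> out) {
            out.collect(ctx.getCurrentKey() + " <- " + view.f1);
          }

          @Override
          public void processElement2(Tuple2<String, String> info, Context ctx, Collector<String> out) {
            out.collect(ctx.getCurrentKey() + " <- " + info.f1);
          }
        })
        .print();

    env.execute("same-key sketch");
  }
}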


How to ensure robustness when using the union operator to union multiple source streams

2021-07-21 Thread Fisher Xiang
When using the union operator to union multiple streams, e.g.
stream1.union(stream2, stream3, … stream n), where streams 1 to n come from
different MQ clusters MQ1, MQ2, … MQ n: when some of those clusters go down,
the whole Flink application restarts. In this scenario, how can we make sure
that when a few streams fail abnormally, the union of the other streams is not
affected and the whole Flink job keeps running?

[image: image.png]

BR
Fisher


Re: Recover from savepoints with Kubernetes HA

2021-07-21 Thread Austin Cawley-Edwards
Hi Thomas,

I've got a few questions that will hopefully help get to find an answer:

What job properties are you trying to change? Something like parallelism?

What mode is your job running in? i.e., Session, Per-Job, or Application?

Can you also describe how you're redeploying the job? Are you using the
Native Kubernetes integration or Standalone (i.e. writing k8s  manifest
files yourself)? It sounds like you are using the Flink CLI as well, is
that correct?

Thanks,
Austin

On Wed, Jul 21, 2021 at 4:05 AM Thms Hmm  wrote:

> Hey,
>
> we have some application clusters running on Kubernetes and explore the HA
> mode which is working as expected. When we try to upgrade a job, e.g.
> trigger a savepoint, cancel the job and redeploy, Flink is not restarting
> from the savepoint we provide using the -s parameter. So all state is lost.
>
> If we just trigger the savepoint without canceling the job and redeploy
> the HA mode picks up from the latest savepoint.
>
> But this way we can not upgrade job properties as they were picked up from
> the savepoint as it seems.
>
> Is there any advice on how to do upgrades with HA enabled?
>
> Flink version is 1.12.2.
>
> Thanks for your help.
>
> Kr thomas
>


Reply: Asking for a solution to collect logs and monitor on-YARN per-job applications

2021-07-21 Thread comsir
Just configure the InfluxDB reporter and display it on a Grafana dashboard; it is very convenient and a single machine is enough.
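
A minimal flink-conf.yaml sketch of the setup described above; the host, port
and database name are placeholders, and if I remember correctly the
flink-metrics-influxdb jar from opt/ has to be placed under plugins/influxdb/
before the reporter can be loaded:

metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
metrics.reporter.influxdb.scheme: http
metrics.reporter.influxdb.host: localhost
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink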




-- Original Message --
From:
"user-zh"
 


Re: Kafka data sources, multiple interval joins and backfilling

2021-07-21 Thread David Morávek
Hi Dan,

unfortunately Flink currently provides no source-level synchronization, except
for Kinesis [1], so it's easy to run into large state when processing
historical data.

There is an ongoing effort to provide a generic watermark-based alignment of
FLIP-27 sources [2], which will most likely help mitigate the issue.

[1] https://issues.apache.org/jira/browse/FLINK-10886
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources

Best,
D.

On Wed, Jul 21, 2021 at 8:43 AM JING ZHANG  wrote:

> Hi Dan,
> > I've tried playing around with parallelism and resources.  It does help.
> Glad to hear your problem is solved.
>
> > Does Flink have special logic with the built in interval join code that
> > impacts how kafka data sources are read?
> No. If you are referring to the approach I mentioned in the last email, I
> meant adding control of the consumption order of each source in a custom
> Kafka connector.
>
> Dan Hill wrote on Wed, Jul 21, 2021 at 2:10 PM:
>
>> Thanks JING and Caizhi!
>>
>> Yea, I've tried playing around with parallelism and resources.  It does
>> help.
>>
>> We have our own join operator that acts like an interval join (with fuzzy
>> matching).  We wrote our own KeyedCoProcessFunction and modeled it closely
>> after the internal interval join code.  Does Flink have special logic with
>> the built in interval join code that impacts how kafka data sources are
>> read?
>>
>>
>>
>> On Tue, Jul 20, 2021 at 8:31 PM JING ZHANG  wrote:
>>
>>> Hi Dan,
>>> You are right. In an interval join, if one of the input streams is far ahead
>>> of the other one, its data will be buffered in state until the watermark of
>>> the other input stream catches up.
>>> This is a known issue of the interval join, and the situation is even worse
>>> in your example for the following reasons:
>>> 1. Running as backfills
>>> 2. There are cascading interval joins in the topology
>>>
>>> There is a hacky workaround, hope it helps. Control the consumption of each
>>> source based on the following sequence:
>>> 1. In the same join, consume the larger data source only after the smaller
>>> source has been fully consumed.
>>> 2. Consume the sources of the following join only after the previous join
>>> has finished.
>>>
>>> BTW: Please double check that you are using an interval join instead of a
>>> regular join; the latter happens if the join condition compares two fields
>>> of a regular timestamp type instead of a time attribute.
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> Dan Hill wrote on Wed, Jul 21, 2021 at 4:25 AM:
>>>
 Hi.  My team's flink job has cascading interval joins.  The problem I'm
 outlining below is fine when streaming normally.  It's an issue with
 backfills.  We've been running into a bunch of backfills to evaluate the
 job over older data.

 When running as backfills, I've noticed that sometimes one of the
 downstream kafka inputs will read in a lot of data from its kafka source
 before the upstream kafka sources make much progress.  The downstream
 kafka source gets far ahead of the interval join window constrained by the
 upstream sources.  This appears to cause the state to grow unnecessarily
 and has caused checkpoint failures.  I assumed there was built in Flink
 code to not get too far ahead for a single downstream kafka source.
 Looking through the code, I don't think this exists.

 Is this a known issue with trying to use Flink to backfill?  Am I
 misunderstanding something?

 Here's an example flow chart for a cascading join job.  One of the
 right kafka data sources has 10x-100x more records than the left data
 sources and causes state to grow.
 [image: Screen Shot 2021-07-20 at 1.02.27 PM.png]

>>>


Re: Stateful Functions Status

2021-07-21 Thread Igal Shilman
Not yet, unfortunately.

But I'd be very happy to work with the community on a JS SDK.


On Tue, Jul 20, 2021 at 4:32 PM Omid Bakhshandeh 
wrote:

> Igal,
>
> Thanks for the answers. Is there any JS SDK available?
>
> Best,
> --Omid
>
> On Tue, Jul 20, 2021 at 10:23 AM Igal Shilman  wrote:
>
>> Hi Omid,
>>
>> I'm glad to hear that you are evaluating StateFun in your company! let me
>> try to answer your questions:
>>
>> 1. In version 2.x, StateFun only supported messages of type
>> com.google.protobuf.Any, and we had a tiny optimization that
>> read type hints and unpacked the real message out of the Any message.
>> Version 3.x removed Protobuf from the API surface (hence no more Any), but
>> while Protobuf is no longer a requirement, you can still use Protobuf to send
>> and receive messages by using [1][2].
>>
>> 2. The benefits of gRPC in StateFun can be a long discussion, since
>> currently StateFun does use Protobuf over HTTP/2 (if the remote function's
>> app server supports that), and with a built-in backpressure mechanism
>> (backpressure comes from Flink).
>> Having said that, we are currently working on making the transport
>> pluggable, and gRPC based transport is a natural candidate.
>>
>> 3. I think this is a very interesting point, and StateFun doesn't promote
>> any specific paradigm here.
>>
>> The basic building block is a function, a specific function is uniquely
>> addressed by providing a namespace, a function type, and an id.
>> a group of functions that implement a specific API can share a namespace
>> prefix, for example "com.foo.api.auth/".
>> You can perhaps (by convention) have a public function per namespace that
>> exposes some sort of an API (list of messages that it supports)
>> And it can dispatch the messages internally to the various functions.
>>
>> Alternatively, a "client" library for your auth API can be a Python class
>> with named methods that accepts a StateFun context, and translates a method
>> invocation to a message sent to the corresponding function. The clients
>> of your functions will basically invoke methods on an object.
>>
>> Perhaps a generated interface described by gRPC, is a good idea to
>> explore further :-)
>>
>> 4. I'm not sure what KNative example you are looking for, as StateFun
>> remote functions do not require any specific type of deployment, they are
>> like regular Flask services.
>>
>> Looking forward to learning what you've built :-)
>> Good luck!
>> Igal.
>>
>> [1]
>> https://github.com/apache/flink-statefun-playground/blob/release-3.0/python/showcase/showcase/showcase_custom_types.py#L28
>> [2]
>> https://github.com/apache/flink-statefun-playground/blob/release-3.0/python/showcase/showcase/__main__.py#L89,L91
>>
>>
>>
>> On Tue, Jul 20, 2021 at 3:07 AM Omid Bakhshandeh <
>> omidbakhshan...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> We are evaluating Flink Stateful Functions in our company and we are
>>> trying to see if it fits our needs. I'm hoping to get some help from the
>>> community as we do this.
>>>
>>> There are a couple of primary questions that can speed up our process:
>>>
>>> 1- It seems in version 2.2.0, in the Python SDK, it was possible to have
>>> messages with a specific type because everything was Protobuf but in 3.0.0
>>> that is not possible and there is always some boilerplate to convert
>>> messages.
>>>
>>> @functions.bind("showcase/messaging")
 def messaging(context: Context, message: Message):
>>>
>>> vs
>>>
 def greet(context, greet_request: GreetRequest):
>>>
>>>
>>> Is that right?
>>>
>>>
>>> 2- Is GRPC and maybe more efficient protocols part of the roadmap in the
>>> near future?
>>>
>>> 3- In all of the examples I found for the Python SDK, all the functions have
>>> been written in a single file with no specific structure (e.g.
>>> implementing an API or ...). Is there a better way to create functions in a
>>> more structured way? How can one share these functions within teams and
>>> other projects? It would be great if something like gRPC services and APIs
>>> existed for functions so other users could get into the dev cycle.
>>>
>>> 4- Is there any KNative example?
>>>
>>> I hope these questions make sense.
>>> Thanks,
>>> --
>>> 
>>> Omid
>>>
>>
>
> --
> ---
> Omid
>


Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-21 Thread Till Rohrmann
Thanks for sharing these insights.

I think it is no longer true that the ResourceManager notifies the
JobMaster about lost TaskExecutors. See FLINK-23216 [1] for more details.

Given the GC pauses, would you then be ok with decreasing the heartbeat
timeout to 20 seconds? This should give enough time to do the GC and then
still send/receive a heartbeat request.

I also wanted to add that we are about to get rid of one big cause of
blocking I/O operations from the main thread. With FLINK-22483 [2] we will
get rid of Filesystem accesses to retrieve completed checkpoints. This
leaves us with one additional file system access from the main thread which
is the one completing a pending checkpoint. I think it should be possible
to get rid of this access because as Stephan said it only writes
information to disk that is already written before. Maybe solving these two
issues could ease concerns about long pauses of unresponsiveness of Flink.

[1] https://issues.apache.org/jira/browse/FLINK-23216
[2] https://issues.apache.org/jira/browse/FLINK-22483

Cheers,
Till

On Wed, Jul 21, 2021 at 4:58 AM Yang Wang  wrote:

> Thanks @Till Rohrmann   for starting this discussion
>
> Firstly, I try to understand the benefit of shorter heartbeat timeout.
> IIUC, it will make the JobManager aware of
> TaskManager faster. However, it seems that only the standalone cluster
> could benefit from this. For Yarn and
> native Kubernetes deployment, the Flink ResourceManager should get the
> TaskManager lost event in a very short time.
>
> * About 8 seconds, 3s for Yarn NM -> Yarn RM, 5s for Yarn RM -> Flink RM
> * Less than 1 second, Flink RM has a watch for all the TaskManager pods
>
> Secondly, I am not very confident about decreasing the timeout to 15s. I have
> quickly checked the TaskManager GC logs of our internal Flink workloads over
> the past week and found more than 100 Full GC pauses of around 10 seconds,
> but none of them was longer than 15s.
> We are using CMS GC for the old generation.
>
>
> Best,
> Yang
>
> Till Rohrmann wrote on Sat, Jul 17, 2021 at 1:05 AM:
>
>> Hi everyone,
>>
>> Since Flink 1.5 we have the same heartbeat timeout and interval default
>> values that are defined as heartbeat.timeout: 50s and heartbeat.interval:
>> 10s. These values were mainly chosen to compensate for lengthy GC pauses
>> and blocking operations that were executed in the main threads of Flink's
>> components. Since then, there were quite some advancements wrt the JVM's
>> GCs and we also got rid of a lot of blocking calls that were executed in
>> the main thread. Moreover, a long heartbeat.timeout causes long recovery
>> times in case of a TaskManager loss because the system can only properly
>> recover after the dead TaskManager has been removed from the scheduler.
>> Hence, I wanted to propose to change the timeout and interval to:
>>
>> heartbeat.timeout: 15s
>> heartbeat.interval: 3s
>>
>> Since there is no perfect solution that fits all use cases, I would really
>> like to hear from you what you think about it and how you configure these
>> heartbeat options. Based on your experience we might actually come up with
>> better default values that allow us to be resilient but also to detect
>> failed components fast. FLIP-185 can be found here [1].
>>
>> [1] https://cwiki.apache.org/confluence/x/GAoBCw
>>
>> Cheers,
>> Till
>>
>


Unsubscribe

2021-07-21 Thread huangxinbao7


Unsubscribe

Recover from savepoints with Kubernetes HA

2021-07-21 Thread Thms Hmm
Hey,

we have some application clusters running on Kubernetes and explore the HA
mode which is working as expected. When we try to upgrade a job, e.g.
trigger a savepoint, cancel the job and redeploy, Flink is not restarting
from the savepoint we provide using the -s parameter. So all state is lost.
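
To make the sequence concrete, here is a sketch with placeholder job id and
paths; the exact flag for passing the savepoint when redeploying depends on the
deployment mode, so treat the last step as an assumption rather than the exact
command we run:

bin/flink savepoint <job-id> s3://bucket/savepoints    # trigger a savepoint
bin/flink cancel <job-id>                              # cancel the job
# redeploy the application cluster and hand the savepoint path to the
# JobManager entrypoint, e.g. via -s / --fromSavepoint s3://bucket/savepoints/savepoint-<id>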

If we just trigger the savepoint without canceling the job and redeploy, the
HA mode picks up from the latest savepoint.

But this way we cannot upgrade job properties, as they seem to be picked up
from the savepoint.

Is there any advice on how to do upgrades with HA enabled?

Flink version is 1.12.2.

Thanks for your help.

Kr thomas


Re: Asking for a solution to collect logs and monitor on-YARN per-job applications

2021-07-21 Thread cyril cui
Monitor input/output QPS fluctuations on the source and sink side; it works reasonably well and the approach is fairly mature.

yihan xu wrote on Wed, Jul 21, 2021 at 12:48 PM:

> Our jobs have basically been running half-blind. After a small incident in
> production recently, we are considering how to collect job logs or metrics in
> real time and then configure alerting.
> A first search online suggests it is basically prometheus+grafana or ELK.
>
> May I ask what approach your projects currently use? At our small company I
> am the only one working on Flink, I came to it halfway and my skills are
> limited. Could you recommend an approach that is easy to maintain and has
> fewer pitfalls? Thanks.
>
> Sent from my iPhone
>
>
> Sent from my iPhone


Re: Kafka data sources, multiple interval joins and backfilling

2021-07-21 Thread JING ZHANG
Hi Dan,
> I've tried playing around with parallelism and resources.  It does help.
Glad to hear your problem is solved.

> Does Flink have special logic with the built in interval join code that
> impacts how kafka data sources are read?
No. If you are referring to the approach I mentioned in the last email, I meant
adding control of the consumption order of each source in a custom Kafka
connector.

Dan Hill wrote on Wed, Jul 21, 2021 at 2:10 PM:

> Thanks JING and Caizhi!
>
> Yea, I've tried playing around with parallelism and resources.  It does
> help.
>
> We have our own join operator that acts like an interval join (with fuzzy
> matching).  We wrote our own KeyedCoProcessFunction and modeled it closely
> after the internal interval join code.  Does Flink have special logic with
> the built in interval join code that impacts how kafka data sources are
> read?
>
>
>
> On Tue, Jul 20, 2021 at 8:31 PM JING ZHANG  wrote:
>
>> Hi Dan,
>> You are right. In an interval join, if one of the input streams is far ahead
>> of the other one, its data will be buffered in state until the watermark of
>> the other input stream catches up.
>> This is a known issue of the interval join, and the situation is even worse
>> in your example for the following reasons:
>> 1. Running as backfills
>> 2. There are cascading interval joins in the topology
>>
>> There is a hacky workaround, hope it helps. Control the consumption of each
>> source based on the following sequence:
>> 1. In the same join, consume the larger data source only after the smaller
>> source has been fully consumed.
>> 2. Consume the sources of the following join only after the previous join
>> has finished.
>>
>> BTW: Please double check that you are using an interval join instead of a
>> regular join; the latter happens if the join condition compares two fields
>> of a regular timestamp type instead of a time attribute.
>>
>> Best,
>> JING ZHANG
>>
>> Dan Hill wrote on Wed, Jul 21, 2021 at 4:25 AM:
>>
>>> Hi.  My team's flink job has cascading interval joins.  The problem I'm
>>> outlining below is fine when streaming normally.  It's an issue with
>>> backfills.  We've been running into a bunch of backfills to evaluate the
>>> job over older data.
>>>
>>> When running as backfills, I've noticed that sometimes one of the downstream
>>> kafka inputs will read in a lot of data from its kafka source before the
>>> upstream kafka sources make much progress.  The downstream kafka source
>>> gets far ahead of the interval join window constrained by the upstream
>>> sources.  This appears to cause the state to grow unnecessarily and has
>>> caused checkpoint failures.  I assumed there was built in Flink code to not
>>> get too far ahead for a single downstream kafka source.  Looking through
>>> the code, I don't think this exists.
>>>
>>> Is this a known issue with trying to use Flink to backfill?  Am I
>>> misunderstanding something?
>>>
>>> Here's an example flow chart for a cascading join job.  One of the right
>>> kafka data sources has 10x-100x more records than the left data sources
>>> and causes state to grow.
>>> [image: Screen Shot 2021-07-20 at 1.02.27 PM.png]
>>>
>>


Re: Kafka data sources, multiple interval joins and backfilling

2021-07-21 Thread Dan Hill
Thanks JING and Caizhi!

Yea, I've tried playing around with parallelism and resources.  It does
help.

We have our own join operator that acts like an interval join (with fuzzy
matching).  We wrote our own KeyedCoProcessFunction and modeled it closely
after the internal interval join code.  Does Flink have special logic with
the built in interval join code that impacts how kafka data sources are
read?
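
For reference, a minimal sketch of the built-in interval join I am comparing
ours against, using the DataStream API; types, bounds and data are made up for
illustration, and this reflects my reading of the API rather than exactly what
our operator does:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class IntervalJoinSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Both inputs carry (userId, eventTimeMillis) and use event-time watermarks.
    WatermarkStrategy<Tuple2<String, Long>> wm =
        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
            .withTimestampAssigner((event, ts) -> event.f1);

    DataStream<Tuple2<String, Long>> left =
        env.fromElements(Tuple2.of("user-1", 1_000L)).assignTimestampsAndWatermarks(wm);
    DataStream<Tuple2<String, Long>> right =
        env.fromElements(Tuple2.of("user-1", 2_000L)).assignTimestampsAndWatermarks(wm);

    left.keyBy(t -> t.f0)
        .intervalJoin(right.keyBy(t -> t.f0))
        // a right element joins a left element if its timestamp lies in [left - 5 min, left + 5 min]
        .between(Time.minutes(-5), Time.minutes(5))
        .process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
          @Override
          public void processElement(Tuple2<String, Long> l, Tuple2<String, Long> r,
                                     Context ctx, Collector<String> out) {
            out.collect(l.f0 + " joined: " + l.f1 + " ~ " + r.f1);
          }
        })
        .print();

    env.execute("interval join sketch");
  }
}

As far as I understand, each side is buffered in keyed state until the other
side's watermark has passed the interval bound, which matches the state growth
we see when one source runs far ahead of the other.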



On Tue, Jul 20, 2021 at 8:31 PM JING ZHANG  wrote:

> Hi Dan,
> You are right. In an interval join, if one of the input streams is far ahead
> of the other one, its data will be buffered in state until the watermark of
> the other input stream catches up.
> This is a known issue of the interval join, and the situation is even worse
> in your example for the following reasons:
> 1. Running as backfills
> 2. There are cascading interval joins in the topology
>
> There is a hacky workaround, hope it helps. Control the consumption of each
> source based on the following sequence:
> 1. In the same join, consume the larger data source only after the smaller
> source has been fully consumed.
> 2. Consume the sources of the following join only after the previous join
> has finished.
>
> BTW: Please double check that you are using an interval join instead of a
> regular join; the latter happens if the join condition compares two fields
> of a regular timestamp type instead of a time attribute.
>
> Best,
> JING ZHANG
>
> Dan Hill wrote on Wed, Jul 21, 2021 at 4:25 AM:
>
>> Hi.  My team's flink job has cascading interval joins.  The problem I'm
>> outlining below is fine when streaming normally.  It's an issue with
>> backfills.  We've been running into a bunch of backfills to evaluate the
>> job over older data.
>>
>> When running as backfills, I've noticed that sometimes one of the downstream
>> kafka inputs will read in a lot of data from its kafka source before the
>> upstream kafka sources make much progress.  The downstream kafka source
>> gets far ahead of the interval join window constrained by the upstream
>> sources.  This appears to cause the state to grow unnecessarily and has
>> caused checkpoint failures.  I assumed there was built in Flink code to not
>> get too far ahead for a single downstream kafka source.  Looking through
>> the code, I don't think this exists.
>>
>> Is this a known issue with trying to use Flink to backfill?  Am I
>> misunderstanding something?
>>
>> Here's an example flow chart for a cascading join job.  One of the right
>> kafka data sources has 10x-100x more records than the left data sources
>> and causes state to grow.
>> [image: Screen Shot 2021-07-20 at 1.02.27 PM.png]
>>
>