Cheers,
> Till
>
> On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu <stevenz...@gmail.com> wrote:
>
>>
>> We have set akka.ask.timeout to 60 s in yaml file. I also confirmed the
>> setting in Flink UI. But I saw akka timeout of 10 s for metric query
>> service.
We have set akka.ask.timeout to 60 s in yaml file. I also confirmed the
setting in Flink UI. But I saw akka timeout of 10 s for metric query
service. Two questions:
1) Why doesn't the metric query service use the 60 s value configured in the
yaml file? Does it always use the default 10 s value?
2) could this cause
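For reference, a minimal flink-conf.yaml sketch of the timeout setting being discussed (60 s is the value from this thread; whether the metric query service honors it is exactly the open question):

```yaml
# flink-conf.yaml
akka.ask.timeout: 60 s
```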
Hi,
I was using Chaos Monkey to test Flink's behavior against frequent killing
of task manager nodes. I found that stopped/disposed StreamTasks got
retained by the Java finalizer. It is kind of like a memory leak, since each
StreamTask retains 2.6 MB of memory. With 20 kills (and job restarts) for
8-CPU
Just to close the thread: the akka death watch was triggered by a high GC pause,
which was caused by a memory leak in our code during Flink job restart.
Noted that akka.ask.timeout wasn't actually related to the akka death watch,
even though Flink's documentation had linked them.
On Sat, Aug 26, 2017 at 10:58 AM, Steven Wu
>>
>> Cheers,
>> Till
>>
>>
>>
>> On Thu, Sep 14, 2017 at 12:26 PM, Fabian Hueske <fhue...@gmail.com>
>> wrote:
>>
>>> Hi Steven,
>>>
>>> thanks for reporting this issue.
>>> Looping in Till who's more fam
to hear that you could figure things out Steven.
>>
>> You are right. The death watch is no longer linked to the akka ask
>> timeout, because of FLINK-6495. Thanks for the feedback. I will correct the
>> documentation.
>>
>> Cheers,
>> Till
>>
>> On
cently and is already committed
> in the master. So with the next release 1.4 it will properly pick up the
> right timeout settings.
>
> Just out of curiosity, what's the instability issue you're observing?
>
> Cheers,
> Till
>
> On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu <ste
restart cause the timeouts?
>
>
> On Fri, Aug 25, 2017 at 6:17 PM, Steven Wu <stevenz...@gmail.com> wrote:
>
>> Bowen,
>>
>> Heap size is ~50G. CPU was actually pretty low (like <20%) when the high GC
>> pause and akka timeout were happening. So maybe memory allocatio
ch memory did you assign to TaskManager? How much was your CPU
> utilization when your taskmanager was considered 'killed'?
>
> Bowen
>
>
>
> On Wed, Aug 23, 2017 at 10:01 AM, Steven Wu <stevenz...@gmail.com> wrote:
>
>> Till,
>>
>> Once our job was restarted
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 9353
expired before completing
I might know why this happened in the first place. Our sink operator does
synchronous HTTP posts, which had a 15-min latency spike when this all
started. This could block flink threads and
re light onto it, but maybe you
> can help us identifying the problem by providing logs at DEBUG level
> (did akka report any connection loss and gated actors? or maybe some
> other error in there?) or even a minimal program to reproduce
ou can for example set the checkpoint interval to
> 2 mins and set the pause to 1.5 mins. That way, if a checkpoint takes
> longer than usual, the next one will still wait for 1.5 mins after the
> previous one completed or expired, giving the job time to catch up.
>
> Best,
> S
ints still in progress.
Or is the min pause just allowing Flink to catch up on in-flight msgs in
various queues/buffers? Is that the cascading impact?
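The interval/pause combination suggested above can be sketched in flink-conf.yaml (these keys assume Flink 1.11+; on older versions the same values go through env.enableCheckpointing(...) and CheckpointConfig#setMinPauseBetweenCheckpoints):

```yaml
# flink-conf.yaml -- checkpoint every 2 min, but always wait
# at least 1.5 min after the previous one completed or expired
execution.checkpointing.interval: 2 min
execution.checkpointing.min-pause: 90 s
```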
On Sat, Dec 2, 2017 at 2:10 PM, Steven Wu <stevenz...@gmail.com> wrote:
> Stephan, thanks a lot for the explanation. Now everything makes sens
Here is my understanding of how job submission works in Flink. When
submitting a job to the job manager via the REST API, we provide an entry
class. The job manager then evaluates the job graph and ships serialized
operators to the task managers. The task managers then open the operators and
run the tasks.
My app would typically
ed to only
> restart the JM container.
>
> Cheers,
> Till
>
> On Wed, Apr 25, 2018 at 10:39 PM, Steven Wu <stevenz...@gmail.com> wrote:
>
>> Till,
>>
>> We ran into the same issue. It started with high GC pause that caused
>> jobmanager to lose
ined. This is also not a
> desired behaviour.
>
> With Flink 1.5 the problem with quarantining should be gone since we don't
> rely anymore on Akka's death watch and instead use our own heartbeats.
>
> Cheers,
> Till
>
> On Mon, May 14, 2018 at 1:07 AM, Steven Wu <stev
I have a process function defined with these interfaces
public class MyProcessFunction extends ProcessFunction
implements CheckpointedFunction, ProcessingTimeCallback {...}
In snapshotState() method, I want to close files and emit the metadata
about the closed files to downstream operator.
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
>     // …
> }
>
> Piotrek
>
> On 11 Jun 2018, at 01:07, Steven Wu wrote:
>
> I have a process function defined with these interfaces
>
> public cla
’ed. From the provided stats, I cannot say if maybe the JM is also
>>> holding references to too many ExecutionVertexes, but that would be a
>>> different story.
>>>
>>> Best,
>>> Stefan
>>>
>>> Am 29.06.2018 um 01:29 schrieb St
> but soon later, no results produced, and flink seems busy doing
something forever.
Jinhua, I don't know if you have checked these things. If not, maybe worth a
look.
Have you tried to do a thread dump?
How is the GC pause?
Do you see Flink restart? Check the exception tab in the Flink web UI for
Aljoscha/Stefan,
if incremental checkpoint is enabled, I assume the "checkpoint size" is
only the delta/incremental size (not the full state size), right?
Thanks,
Steven
On Thu, Jan 4, 2018 at 5:18 AM, Aljoscha Krettek
wrote:
> Hi,
>
> I'm afraid there is currently no
There is also a discussion of side input
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
I would load the smaller data set as a static reference data set. Then you
can just do single-source streaming of the larger data set.
On Wed, Jan 31, 2018 at 1:09 AM,
cle
> of static context can be tricky (when to close it and release it’s
> resources).
>
> The questions is, whether you really need a static context?
>
> Thanks,
> Piotrek
>
>
> > On 21 Dec 2017, at 07:53, Steven Wu <stevenz...@gmail.com> wrote:
> >
Xiaochuan,
We are doing exactly as you described. We keep the injector as a global
static var.
But we extend FlinkJobManager and FlinkTaskManager to override the main
method and initialize the injector (and other things) during JVM startup,
which does cause tight code coupling. It is a little
Hi,
After this error/exception, it seems that the taskmanager never connects to
the jobmanager anymore. The job is stuck in a failed state because there are
not enough slots to recover the job.
Let's assume there was a temporary glitch between the jobmanager and ZK. Would
it cause such a permanent failure in Flink?
I
From the Flink UI on the jobmanager, sometimes I saw the taskmanager connected
and the heartbeat time got updated.
But then sometimes the taskmanager page becomes blank. Maybe it disconnected.
On Wed, Apr 11, 2018 at 1:31 PM, Steven Wu <stevenz...@gmail.com> wrote:
> Hi,
>
> After this error/exce
Syed, I am very curious about the motivation if you can share.
On Wed, Apr 11, 2018 at 1:35 AM, Chesnay Schepler
wrote:
> Hello,
>
> there is no way to manually trigger checkpoints or configure irregular
> intervals.
>
> You will have to modify the CheckpointCoordinator
>
; something like this?
>
>
> On Tue, Mar 13, 2018 at 7:56 PM, Steven Wu <stevenz...@gmail.com> wrote:
>
>> Xiaochuan,
>>
>> We are doing exactly as you described. We keep the injector as a global
>> static var.
>>
>> But we extend from FlinkJobManag
hould call static synchronized method `initializeGuiceContext`.
>> This method could search the classpath for classes with some specific
>> annotations, for example `@MyInitializationHook` and install/add all of
>> such hooks before actually using Guice?
>>
>> Piotrek
>
Till,
We ran into the same issue. It started with a high GC pause that caused the
jobmanager to lose its ZK connection and leadership, which caused the
jobmanager to quarantine the taskmanager in akka. Once quarantined, the akka
association between the jobmanager and taskmanager is locked forever.
Your suggestion of "
for rate limiting, would quota at Kafka brokers help?
On Tue, Nov 6, 2018 at 10:29 AM Ning Shi wrote:
> On Tue, Nov 06, 2018 at 07:44:50PM +0200, Nicos Maris wrote:
> > Ning can you provide another example except for rate limiting?
>
> Our main use case and concern is rate limiting because
FYI, here is the jira to support timeout in savepoint REST api
https://issues.apache.org/jira/browse/FLINK-10360
On Fri, Nov 2, 2018 at 6:37 PM Gagan Agrawal wrote:
> Great, thanks for sharing that info.
>
> Gagan
>
> On Thu, Nov 1, 2018 at 1:50 PM Yun Tang wrote:
>
>> Haha, actually
Vipul,
It sounds like you don't want to checkpoint timers at all. Since 1.7, we can
configure the timer state backend (HEAP/ROCKSDB). I guess a new option (NONE)
could be added to support such a requirement.
But it is interesting to see your reasons. Can you elaborate?
thanks,
Steven
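For context, a sketch of the timer state backend option that exists since 1.7; note that the NONE value floated above is hypothetical, not a real option:

```yaml
# flink-conf.yaml -- where timers are stored when using the RocksDB backend
# valid values are HEAP and ROCKSDB; NONE (suggested above) does not exist
state.backend.rocksdb.timer-service.factory: HEAP
```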
On Thu, Jan 17,
Flink has two backpressure related metrics: “lastCheckpointAlignmentBuffered”
and “checkpointAlignmentTime”. But they seem to always report zero.
Similarly, in the web UI, “Buffered During Alignment” always shows zero,
even when backpressure testing shows high backpressure for some operators. Has
s is not final, failure is not fatal: it is the courage to continue
> that counts. *
> *- Winston Churchill - *
>
>
> On Wed, Nov 21, 2018 at 1:50 PM Steven Wu wrote:
>
>>
>> Flink has two backpressure related metrics: “
>> lastCheckpointAlignmentBuffered” and “checkpoi
We are trying out Flink 1.7.0. We always get this exception when submitting
a job with an external checkpoint via REST. Job parallelism is 1,600. State
size is probably in the range of 1-5 TB. The job is actually started; just the
REST api returns this failure.
If we submit the job without an external
t;> submitting the job programmatically). I'd love to see a solution to this if
>> one exists!
>>
>> Best,
>>
>> Aaron Levin
>>
>> On Thu, Jan 10, 2019 at 2:58 PM Steven Wu wrote:
>>
>>> We are trying out Flink 1.7.0. We always get this
Otherwise, could you provide the JM log for further investigation?
Best,
Andrey
On Wed, Jan 23, 2019 at 10:06 PM Steven Wu wrote:
> When we start a high-parallelism (1,600) job without any
> checkpoint/savepoint, the job struggled to be deployed. After a few
> restarts, it eventually got
it might be related to this issue
https://issues.apache.org/jira/browse/FLINK-10774
On Tue, Mar 26, 2019 at 4:35 PM Fritz Budiyanto wrote:
> Hi All,
>
> We're using Flink-1.4.2 and noticed many dangling connections to Kafka
> after job deletion/recreation. The trigger here is Job
We have observed that sometimes a job gets stuck in a suspended state, and no
job restart/recovery is attempted once the job is suspended.
* it is a high-parallelism job (like close to 2,000)
* there were a few job restarts before this
* there were high GC pause during the period
* zookeeper timeout. probably
hould be published in the next days.
>
> Cheers,
> Till
>
> On Sat, Mar 2, 2019 at 12:47 AM Steven Wu wrote:
>
>> We have observe that sometimes job stuck in suspended state, and no job
>> restart/recover were attempted once job is suspended.
>> * it is a high
When we start a high-parallelism (1,600) job without any
checkpoint/savepoint, the job struggled to be deployed. After a few
restarts, it eventually got deployed and was running fine after the initial
struggle. jobmanager was very busy. Web UI was very slow. I saw these two
exceptions/failures
sion we make this time, I'd
>> suggest to make it final and document in our release note explicitly.
>> Checking the 1.5.0 release note [1] [2] it seems we didn't mention about
>> the change on default restart delay and we'd better learn from it this
>> time. Thanks.
>&
+1 on what Zhu Zhu said.
We also override the default to 10 s.
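A sketch of that override in flink-conf.yaml, assuming the fixed-delay strategy (the attempt count is an arbitrary illustrative value, not from this thread):

```yaml
# flink-conf.yaml -- override the default restart delay to 10 s
restart-strategy: fixed-delay
restart-strategy.fixed-delay.delay: 10 s
restart-strategy.fixed-delay.attempts: 2147483647
```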
On Fri, Aug 30, 2019 at 8:58 PM Zhu Zhu wrote:
> In our production, we usually override the restart delay to be 10 s.
> We once encountered cases that external services are overwhelmed by
> reconnections from frequent restarted
etrics you add in your
> customized restart strategy?
>
> Thanks,
> Zhu Zhu
>
> Steven Wu 于2019年9月20日周五 上午7:11写道:
>
>> We do use config like "restart-strategy:
>> org.foobar.MyRestartStrategyFactoryFactory". Mainly to add additional
>> metrics than the
We do use config like "restart-strategy:
org.foobar.MyRestartStrategyFactoryFactory". Mainly to add additional
metrics than the Flink provided ones.
On Thu, Sep 19, 2019 at 4:50 AM Zhu Zhu wrote:
> Thanks everyone for the input.
>
> The RestartStrategy customization is not recognized as a
d. If fine grained recovery (a feature added in 1.9.0) is enabled, the graph
> would not be restarted when task failures happen and the "fullRestart"
> value will not increment in such cases.
>
> I'd appreciate if you can help with these questions and we can make better
> decisi
pects
> fine grained recovery.
>
> [1] https://issues.apache.org/jira/browse/FLINK-14164
>
> Thanks,
> Zhu Zhu
>
> Steven Wu 于2019年9月24日周二 上午6:41写道:
>
>>
>> When we set up an alert like "fullRestarts > 1" for some rolling window, we
>> want to use
Gary, FLIP-27 seems to have been omitted in the 2nd update. Below is the info
from update #1.
- FLIP-27: Refactor Source Interface [20]
- FLIP accepted. Implementation is in progress.
On Fri, Nov 1, 2019 at 7:01 AM Gary Yao wrote:
> Hi community,
>
> Because we have approximately one month
I have this DAG screenshot from Flink UI.
[image: image.png]
I am wondering why the middle "icebergsink-writer" operator is not chained
with the first operator chain?
Or, an equivalent question: why is the forward partitioner used here?
The first operator chain is all map functions after the source.
by the user.
>
> Best,
> Guowei
>
>
> Steven Wu 于2019年11月23日周六 上午5:17写道:
>
>>
>> I have this DAG screenshot from Flink UI.
>> [image: image.png]
>> I am wondering why is the middle "icebergsink-writer" operator not
>> chained with th
ra/browse/FLINK-14164
>
> Thanks,
> Zhu Zhu
>
> Steven Wu 于2019年9月25日周三 上午2:30写道:
>
>> Zhu Zhu,
>>
>> Sorry, I was using different terminology. yes, Flink meter is what I was
>> talking about regarding "fullRestarts" for threshold based aler
We build and publish flink-dist locally. But the source jar turns out to be
empty. Other source jars (like flink-core) are good. Has anyone else
experienced a similar problem?
Thanks,
Steven
n 14/04/2020 06:42, Steven Wu wrote:
> >
> > We build and publish flink-dist locally. But the source jar turns out
> > empty. Other source jars (like flink-core) are good. Anyone else
> > experienced similar problem?
> >
> > Thanks,
> > Steven
>
>
>
This is a stateful stream join application using RocksDB state backend with
incremental checkpoint enabled.
- JVM heap usage is pretty similar. The main difference is in non-heap usage,
probably related to RocksDB state.
- Also observed the cgroup memory failure count showing up in the
Jiahui,
Based on my reading of the doc, for containerized environments, it is
probably better to set `taskmanager.memory.process.size` to the container
memory limit.
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-process-size
Then I typically set
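A minimal sketch of that setup, assuming a container with a 4 GB memory limit (the value is illustrative):

```yaml
# flink-conf.yaml -- size the whole TaskManager process to the container limit
taskmanager.memory.process.size: 4g
```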
Manish, might be related to this bug, which is fixed in 1.10.1.
https://issues.apache.org/jira/browse/FLINK-14316?focusedCommentId=16946580=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16946580
On Mon, May 4, 2020 at 5:52 AM Manish G
wrote:
> Hi,
>
> I have set up
ources of any bundled modules.
>
> How were you building the jars, and were you making any modifications to
> the Flink source?
>
> On 14/04/2020 15:07, Steven Wu wrote:
>
> flink-dist is an uber/shaded jar. Before 1.10, its source jar contained the
> source files for the
Guowei,
Thanks a lot for the proposal and starting the discussion thread. Very
excited.
For the big question of "Is the sink an operator or a topology?", I have a
few related sub-questions.
* Where should we run the committers?
* Is the committer parallel or single parallelism?
* Can a single
Right now, we use UnionState to store the `nextCheckpointId` in the Iceberg
sink use case, because we can't retrieve the checkpointId from
the FunctionInitializationContext during the restore case. But we can move
away from it if the restore context provides the checkpointId.
On Sat, Sep 12, 2020
Yun, you mentioned that checkpoint also supports rescale. I thought the
recommendation [1] is to use savepoint for rescale.
[1]
https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink
On Tue, May 26, 2020 at 6:46 AM Joey Pereira wrote:
> Following up: I've put
In a slightly different variation of sequence (checkpoint x, savepoint y,
redeploy/restart job from savepoint y, checkpoint x+1), checkpoint x+1
builds the incremental diff on savepoint y, right?
On Sun, Jul 5, 2020 at 8:08 PM Steven Wu wrote:
>
> In this sequence of (checkpoint x, savep
In this sequence of (checkpoint x, savepoint y, checkpoint x+1), does
checkpoint x+1 build the incremental diff based on checkpoint x or
savepoint y?
Thanks,
Steven
Internally, we have our own ConfigurableCredentialsProvider. Based on the
config in core-site.xml, it does assume-role with the proper IAM
credentials using STSAssumeRoleSessionCredentialsProvider. We just need to
grant permission for the instance credentials to be able to assume the IAM
role for
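A hedged sketch of how such a provider might be wired up in core-site.xml for the s3a filesystem; the provider class name here is hypothetical, and the exact key depends on the Hadoop/S3 filesystem in use:

```xml
<!-- core-site.xml: plug a custom credentials provider into s3a -->
<property>
  <name>fs.s3a.aws.credentials.provider</name>
  <value>com.example.ConfigurableCredentialsProvider</value>
</property>
```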
The configuration page has this "backup" section. Can I assume that they
are public interfaces? The name "backup" is a little confusing to me. There
are some important pipeline and execution checkpointing configs here.
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#backup
Just a data point: we actually enabled all RocksDB metrics by default
(including for very large jobs in terms of parallelism and state size). We
didn't see any significant performance impact. There is probably a small
impact; at least, it didn't jump out for our workload.
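For illustration, a few of the per-metric toggles involved; each RocksDB native metric has its own config key, and these are examples rather than a complete list:

```yaml
# flink-conf.yaml -- enable selected RocksDB native metrics
state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.estimate-live-data-size: true
```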
On Tue, Dec 8, 2020 at 9:00
This is a performance optimization in JVM when the same exception is
thrown too frequently. You can set `-XX:-OmitStackTraceInFastThrow` to
disable the feature. You can typically find the full stack trace in the log
before the optimization kicks in.
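In a Flink deployment, the flag can be passed to the JVM via flink-conf.yaml, e.g.:

```yaml
# flink-conf.yaml -- disable the JVM's fast-throw stack trace elision
env.java.opts: -XX:-OmitStackTraceInFastThrow
```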
On Sat, Dec 12, 2020 at 2:05 AM Till Rohrmann
> things are actually moving pretty smoothly
Do you mean the job is otherwise healthy, like there is no lag etc.?
Do you see any bottleneck at the system level, like CPU, network, or disk I/O?
On Sat, Dec 12, 2020 at 10:54 AM Rex Fenley wrote:
> Hi,
>
> We're running a job with on the order of
If you are running out of direct buffer memory, you will see
"java.lang.OutOfMemoryError: Direct buffer memory".
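If user code legitimately needs more direct memory, a sketch of the relevant knob, assuming the Flink 1.10+ memory model (the value is illustrative):

```yaml
# flink-conf.yaml -- reserve off-heap (direct) memory for user code
taskmanager.memory.task.off-heap.size: 512m
```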
On Wed, Dec 16, 2020 at 9:47 AM Rex Fenley wrote:
> Thanks for the reply. If what I'm understanding is correct there's no
> chance of an OOM, but since direct memory is for I/O, it being
Not a solution, but a potential workaround: maybe rename the operator uid
so that you can continue to leverage allowNonRestoredState?
On Thu, Oct 29, 2020 at 7:58 AM Peter Westermann
wrote:
> Does that actually allow removing a state completely (vs. just modifying
> the values stored in state)?
;are checkpointed.
>
>
> Steven Wu [via Apache Flink User Mailing List archive.] <
> ml+s2336050n44278...@n4.nabble.com> 于2021年6月8日周二 下午2:15写道:
>
> >
> > I can see the benefits of control flow. E.g., it might help the old (and
> > inactive) FLIP-17 side
>>>> * Iteration: When a certain condition is met, we might want to
>>>>>> signal downstream operators with an event
>>>>>> * Mini-batch assembling: Flink currently uses special watermarks
>>>>>> for indicating the end of each mini-bat
mainstream, it would be helpful to have an event
>>>>> signaling the finishing of the bootstrap.
>>>>>
>>>>> ## Dynamic REST controlling
>>>>> Back to the specific feature that Jiangang proposed, I personally
>>>>> think it's
I am not sure if we should solve this problem in Flink. This is more like a
dynamic config problem that should probably be solved by some configuration
framework. Here is one post from a google search:
Qingsheng,
For the scenario described by Mason in the original email, I think it is
safe to remove a split/topic upon recovery without worrying about data loss,
since it is a conscious choice by the user to switch to a different set of
topics.
I thought the problem is that KafkaSourceReader just