ple *FileInputSplit*s. And you may need to
> define in your *InputFormat* how to process the new *InputSplit*.
>
> Thanks,
> Zhu Zhu
>
> Lu Niu wrote on Thursday, Aug 15, 2019 at 12:26 AM:
>
>> Hi,
>>
>> I have a data set backed by a directory of files in which file nam
Hi,
I have a data set backed by a directory of files in which file names are
meaningful.
folder1
+-file01
+-file02
+-file03
+-file04
I want to control the file assignments in my flink application. For
example, when parallelism is 2, worker 1 gets file01 and file02 to
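A minimal sketch of the contiguous-range assignment described above (plain Java, not Flink API; the class and method names are made up for illustration):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, NOT Flink API: assign a sorted file list to subtasks
// in contiguous ranges, so with parallelism 2 and 4 files, subtask 0 reads
// file01 and file02 while subtask 1 reads file03 and file04.
public class FileAssignment {

    // Subtask that should read the file at position fileIndex (0-based)
    // in the sorted file list.
    static int subtaskFor(int fileIndex, int parallelism, int totalFiles) {
        return fileIndex * parallelism / totalFiles;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("file01", "file02", "file03", "file04");
        int parallelism = 2;
        for (int i = 0; i < files.size(); i++) {
            System.out.println(files.get(i) + " -> subtask "
                    + subtaskFor(i, parallelism, files.size()));
        }
    }
}
```

In a custom InputFormat, the same arithmetic could be applied inside the split assigner so each subtask deterministically receives its range of files.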
in your case.
> The underlying format is ParallelIteratorInputFormat, and its processing is
> not tied to a specific subtask index.
>
> Thanks,
> Zhu Zhu
>
> Lu Niu wrote on Friday, Aug 16, 2019 at 12:48 AM:
>
>> Hi, Zhu
>>
>> Thanks for the reply! I found using SplittableIterator i
Hi, Team
I am implementing a custom InputFormat. Shall I implement the
CheckpointableInputFormat interface? If I don't, does that mean the whole
job has to restart if only one task fails? I ask because I found all
InputFormats implement CheckpointableInputFormat, which confuses me. Thank
Hi,
When running a Flink application in YARN mode, is there a way to limit the
maximum CPU usage per TaskManager?
I tried an application with just a source and a sink operator. The
parallelism of the source is 60 and the parallelism of the sink is 1. When
running with the default config, 60 TaskManagers are assigned. I
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
>>
>>
>>
>> Best,
>>
>> Victor
>>
>>
>>
>> *From: *Vino Yang
>> *Date: *Wednesday, 6 November 2019 at 4:26 PM
>> *To: *L
Hi,
I read that rocksDB memory is managed off heap. Is there a way to monitor
the memory usage there then?
Best
Lu
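For what it's worth, newer Flink versions can expose RocksDB's native metrics through the Flink metric system once enabled in flink-conf.yaml. A sketch (option keys taken from the RocksDB state backend docs; verify against your release, and note each enabled metric adds some overhead):

```yaml
# Enable selected RocksDB native metrics (all off by default).
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.cur-size-all-mem-tables: true
state.backend.rocksdb.metrics.estimate-table-readers-mem: true
state.backend.rocksdb.metrics.estimate-num-keys: true
```

The resulting gauges appear under each operator's metric group and can be scraped by whatever metrics reporter the cluster uses.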
rics
>
>
>
> Best
>
> Yun Tang
>
>
>
> *From: *Lu Niu
> *Date: *Friday, November 8, 2019 at 8:18 AM
> *To: *user
> *Subject: *Monitor rocksDB memory usage
>
>
>
> Hi,
>
>
>
> I read that rocksDB memory is managed off heap. Is there a way to monitor
> the memory usage there then?
>
>
>
> Best
>
> Lu
>
Hi, flink users
I have a question regarding memory allocation. According to the docs,
containerized.heap-cutoff-ratio means:
```
Percentage of heap space to remove from containers (YARN / Mesos), to
compensate for other JVM memory usage
```
However, I find the cutoff memory is actually treated as
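For reference, a sketch of the cutoff arithmetic, assuming the Flink 1.x defaults containerized.heap-cutoff-ratio = 0.25 and containerized.heap-cutoff-min = 600 MB (the class and method names here are illustrative only):

```java
// Sketch: how much memory is removed ("cut off") from a YARN container
// before sizing the JVM heap, under assumed Flink 1.x defaults.
public class HeapCutoff {

    // Cutoff is the larger of the configured minimum and ratio * container size.
    static long cutoffMb(long containerMb, double ratio, long minCutoffMb) {
        return Math.max(minCutoffMb, (long) (containerMb * ratio));
    }

    public static void main(String[] args) {
        long container = 4096; // requested TaskManager container, in MB
        long cutoff = cutoffMb(container, 0.25, 600);
        // 4096 * 0.25 = 1024 MB cutoff, leaving 3072 MB
        System.out.println("cutoff=" + cutoff + "MB remaining="
                + (container - cutoff) + "MB");
    }
}
```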
>
> Best, Fabian
>
> On Thu, Sep 5, 2019 at 7:01 PM, Lu Niu wrote:
>
>> Hi, Team
>>
>> I am implementing a custom InputFormat. Shall I
>> implement CheckpointableInputFormat interface? If I don't, does that mean
>> the whole job has to restart
> $sink_current_time - $event_time - ($source_current_time -
> $event_time) = $sink_current_time - $source_current_time as the
> end-to-end latency.
>
> Oscar Westra van Holthe - Kind wrote on Monday, Mar 30, 2020
> at 5:15 PM:
>
>> On Mon, 30 Mar 2020 at 05:08, Lu Niu wrote:
>>
>>> $current_
Hi, flink users
Did anyone encounter such an error? The error comes from S3AFileSystem, but
there is no capacity issue on any disk. We are using Hadoop 2.7.1.
```
Caused by: java.util.concurrent.ExecutionException:
java.io.IOException: Could not open output stream for state backend
at
nd there is some related page[1]; could you please make sure
> there is enough space on the local disk.
>
> [1]
> https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
> Best,
> Congxian
Hi, Flink users
We notice that sometimes async checkpointing can be extremely slow, leading
to checkpoint timeouts. For example, for a state size around 2.5 MB, it
could take 7-12 min in async checkpointing:
[image: Screen Shot 2020-04-09 at 5.04.30 PM.png]
Notice all the slowness comes from async
tom Hadoop
> version.
>
> Best,
> Robert
>
>
> On Thu, Apr 9, 2020 at 6:39 AM Congxian Qiu
> wrote:
>
>> Hi Lu
>>
>> I'm not familiar with S3 file system, maybe others in Flink community can
>> help you in this case, or maybe you can also reach out to
Hi,
I am looking for end-to-end latency monitoring of a Flink job. Based on my
study, I have two options:
1. Flink provides a latency-tracking feature. However, the documentation
says it cannot show the actual latency of the business logic, as it
bypasses all operators.
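The per-record computations discussed in this thread can be sketched in plain Java (hypothetical helpers, not Flink API):

```java
// Hypothetical sketch: per-record latency measured at the sink.
public class E2ELatency {

    // Naive end-to-end latency: sink wall-clock time minus record event time.
    static long sinkLatencyMs(long sinkTimeMs, long eventTimeMs) {
        return sinkTimeMs - eventTimeMs;
    }

    // Variant that subtracts the pre-ingestion delay observed at the source,
    // leaving only time spent inside the pipeline:
    // (sink - event) - (source - event) = sink - source.
    static long pipelineLatencyMs(long sinkTimeMs, long sourceTimeMs) {
        return sinkTimeMs - sourceTimeMs;
    }

    public static void main(String[] args) {
        long event = 10_000L, source = 10_100L, sink = 10_250L;
        System.out.println(sinkLatencyMs(sink, event));      // includes ingestion delay
        System.out.println(pipelineLatencyMs(sink, source)); // pipeline time only
    }
}
```

In practice these values would be emitted as a histogram metric from a sink or process function rather than printed.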
$current_processing - $event_time is something OK, but
> keep this in mind: the event time may not be the time the record was
> ingested into Flink.
>
> Best,
> Congxian
>
>
> Lu Niu wrote on Saturday, Mar 28, 2020 at 6:25 AM:
> Hi,
>
> I am looking for end to end latency monitoring of l
On Tue, Apr 21, 2020 at 1:46 PM Lu Niu wrote:
> Cool, thanks!
>
> On Tue, Apr 21, 2020 at 4:51 AM Robert Metzger
> wrote:
>
>> I'm not aware of anything. I think the presto s3 file system is generally
>> the recommended S3 FS implementation.
>>
>> On Mon,
>
> Best,
> Robert
>
>
> On Tue, Apr 21, 2020 at 10:50 PM Lu Niu wrote:
>
>> Hi, Robert
>>
>> Thanks for replying. To improve observability, do you think we should
>> expose more metrics in checkpointing? For example, in incremental
>> checkpoint
em during checkpointing?
>
> I'm not aware of any metrics in Flink that could be helpful in this
> situation.
>
> Best,
> Robert
>
> On Tue, Apr 14, 2020 at 12:02 AM Lu Niu wrote:
>
>> Hi, Flink users
>>
>> We notice sometimes async checkpointing c
Cool, thanks!
On Tue, Apr 21, 2020 at 4:51 AM Robert Metzger wrote:
> I'm not aware of anything. I think the presto s3 file system is generally
> the recommended S3 FS implementation.
>
> On Mon, Apr 13, 2020 at 11:46 PM Lu Niu wrote:
>
>> Thank you both. Given the debug o
n other words, the new incremental checkpoint has no relationship with the
> older savepoint from which it was restored.
>
> Best
> Yun Tang
> ------
> *From:* Lu Niu
> *Sent:* Saturday, July 18, 2020 5:48
> *To:* user
> *Subject:* Are files in savepoint still neede
Hi,
We are using end-to-end exactly-once Flink + Kafka and encountered the
below exception, which usually comes after checkpoint failures:
```
*Caused by: org.apache.kafka.common.errors.ProducerFencedException:
Producer attempted an operation with an old epoch. Either there is a newer
were no records in the
> transactions).
>
> Regards,
> Roman
>
>
> On Mon, Aug 3, 2020 at 9:14 PM Lu Niu wrote:
>
>> Hi,
>>
>> We are using end-to-end exactly-once Flink + Kafka and encountered the
>> below exception which usually came after checkpoint failur
Hi, Flink Users
Assuming a Flink job turns on incremental checkpointing and restores from a
savepoint. It runs fine for a while, commits one checkpoint, and then it
fully restarts because of an error. At this time, is it possible that the
job still needs files in the original savepoint for
RM for a
> certain period of time, it will revoke the leadership and notify
> other components. You can look into the ZooKeeper logs to check why the
> RM's leadership was revoked.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Dec 17, 2020 at 8:42 AM Lu Niu wro
Hi, Flink users
Recently we migrated to flink 1.11 and see exceptions like:
```
2020-12-15 12:41:01,199 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
USER_MATERIALIZED_EVENT_SIGNAL-user_context-event ->
Hi,
We recently migrated from 1.9.1 to Flink 1.11 and noticed the new job
cannot restore from a savepoint taken in 1.9.1. Here is the list of
operator IDs and max parallelism of savepoints taken in both versions. The
only code change is the version upgrade.
savepoint 1.9.1:
```
Id:
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Dec 18, 2020 at 8:53 AM Lu Niu wrote:
>
>> Hi, Xintong
>>
>> Thanks for replying and your suggestion. I did check the ZK side but
>> there is nothing interesting. The error message actually shows that only
>
Hi, Flink Users
We use a ZooKeeper cluster of 5 nodes for JM HA. When we terminate one node
for maintenance, we notice lots of Flink jobs fully restart. The error
looks like:
```
org.apache.flink.util.FlinkException: ResourceManager leader changed to new
address null
at
a) times out before it can
> mark the TM failed and continue the failover procedure.
>
> We suppose that the phase from CANCELING to CANCELED takes
> min(akka.ask.timeout, heartbeat.timeout), though not confirmed yet.
>
> Hope it helps. Please let me know if there's anything wro
Thanks Gen! cc flink-dev to collect more inputs.
Best
Lu
On Wed, Jun 30, 2021 at 12:55 AM Gen Luo wrote:
> I'm also wondering here.
>
> In my opinion, it's because the JM can not confirm whether the TM is lost
> or it's a temporary network trouble and will recover soon, since I can see
> in
ent. On the upside, if you mark the TaskExecutor dead on
>> the first
>> >>>>> connection loss (assuming you have a stable network environment),
>> then it
>> >>>>> can now detect lost TaskExecutors as fast as the heartbeat interval.
>>
g the cancellation operation: Flink currently does not listen
>>> to the dead letters of Akka. This means that the `akka.ask.timeout` is the
>>> primary means to fail the future result of a rpc which could not be sent.
>>> This is also an improvement we should add to Flink's
ntime.executiongraph.ExecutionGraph(time when
all tasks switch from CREATED to RUNNING)
```
Best
Lu
On Thu, Jul 1, 2021 at 12:06 PM Lu Niu wrote:
> Thanks Till and Yang for the help! Also thanks Till for a quick fix!
>
> I did another test yesterday. In this test, I intentionall
Another side question: shall we add a metric to cover the complete
restarting time (phase 1 + phase 2)? The current metric jm.restartingTime
only covers phase 1. Thanks!
Best
Lu
On Thu, Jul 1, 2021 at 12:09 PM Lu Niu wrote:
> Thanks Till and Yang for the help! Also thanks Till for a quick fix!
>
might be
>>> some problems with setting the maxParallelism in the TableAPI.
>>>
>>> Keep in mind that you could use the State Processor API [1] to adjust
>>> the maxParallelism per Operator in a Savepoint.
>>>
>>> Best,
>>> Matthias
>>
Thank you~
>
> Xintong Song
>
>
>
>
>
> On Sat, Jan 30, 2021 at 8:27 AM Xintong Song
> wrote:
>
> There was indeed a ZK version upgrade between 1.9 and 1.11, but I'm not
> aware of any similar issue reported since the upgrade.
>
> I would suggest the following:
>
> Piotrek
>
>
> On Mon, Apr 5, 2021 at 10:54 PM, Lu Niu wrote:
>
> > Hi,
> >
> > we need to update our email system then :) . Here are the links:
> >
> >
> >
> https://drive.google.com/file/d/1lZ5_P8_NqsN1JeLzutGj4DxkyWJN75mR/view?usp=sharin
Hi, Flink Users
When we migrate from Flink 1.9.1 to Flink 1.11, we notice the job will
always fail on checkpoints if it uses the Iterator Operator, no matter
whether we use unaligned checkpoints or not. Those jobs don't have
checkpoint issues in 1.9. Is this a known issue? Thank you!
Best
Lu
cause of the problem.*
On Wed, Mar 31, 2021 at 2:01 PM Lu Niu wrote:
> Hi, Colletta
>
> Thanks for sharing! Do you mind sharing a stacktrace for that error as
> well? Thanks!
>
> Best
> Lu
>
> On Sat, Mar 27, 2021 at 5:36 AM Colletta, Edward
> wrote:
>
>>
> CC user-list
>
> On 15. Apr 2021, at 22:34, Lu Niu wrote:
>
> Hi, Flink Users
>
> When we migrate from Flink 1.9.1 to Flink 1.11, we notice the job will
> always fail on checkpoints if it uses the Iterator Operator, no matter
> whether we use unaligned checkpoints or not. Those jobs
>
> Martijn
>
> On Wed, 24 Nov 2021 at 00:57, Lu Niu wrote:
>
>> Hi, Flink Users
>>
>> I am wondering whether there is a REST service for submitting Flink SQL,
>> similar to Livy for Spark SQL? I found
>> https://github.com/ververica/flink-sql-gateway/ but I am not sure
>> whether it's still active.
>>
>> Best
>> Lu
>>
>
Hi, Flink Users
I am wondering whether there is a REST service for submitting Flink SQL,
similar to Livy for Spark SQL? I found
https://github.com/ververica/flink-sql-gateway/ but I am not sure
whether it's still active.
Best
Lu
Hi, Flink users
How do I express a multi-stream window join in Flink SQL? In the DataStream
API, that's
stream.join(otherStream)
.where()
.equalTo()
.window()
.apply()
(
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/joining/
)
For example, in
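Assuming Flink 1.14+'s windowing TVFs, a two-stream tumbling window join could be sketched like this (table and column names are made up for illustration):

```sql
-- Join records from two streams that fall into the same 5-minute window.
SELECT L.id, L.val AS left_val, R.val AS right_val
FROM (
    SELECT * FROM TABLE(
        TUMBLE(TABLE LeftStream, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) L
JOIN (
    SELECT * FROM TABLE(
        TUMBLE(TABLE RightStream, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R
ON L.window_start = R.window_start
   AND L.window_end = R.window_end
   AND L.id = R.id;
```

Joining a third stream would repeat the pattern: window the third table with the same TVF and add its window_start/window_end to the join condition.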
This is our yarn related settings:
yarn.scheduler.fair.assignmultiple: "true"
yarn.scheduler.fair.dynamic.max.assign: "false"
yarn.scheduler.fair.max.assign: 1
any suggestions?
Best
Lu
On Wed, Sep 6, 2023 at 9:16 AM Lu Niu wrote:
> Hi, Thanks for all your help. Are t
Hi, Flink users
We have recently observed that the allocation of Flink TaskManagers in our
YARN cluster is not evenly distributed. We would like to hear your thoughts
on this matter.
1. Our setup includes Flink version 1.15.1 and Hadoop 2.10.0.
2. The uneven distribution is that out of a
<https://aka.ms/o0ukef>
> --
> *From:* Chen Zhanghao
> *Sent:* Tuesday, August 29, 2023 12:14:53 PM
> *To:* Lu Niu ; Weihua Hu
> *Cc:* Kenan Kılıçtepe ; user
> *Subject:* Re: Uneven TM Distribution of Flink on YARN
>
> CCing @Weihua Hu , who is an expert on this. Do
>
Hi, Thanks for all your help. Are there any other insights?
Best
Lu
On Wed, Aug 30, 2023 at 11:29 AM Lu Niu wrote:
> No, we don't use yarn.taskmanager.node-label.
>
> Best
> Lu
>
> On Tue, Aug 29, 2023 at 12:17 AM Geng Biao wrote:
>
>> Maybe you can check if you hav
between flink
> and spark is that most flink jobs are unbounded while spark jobs are
> bounded. It is possible that under same YARN scheduling strategy, the final
> distribution of apps after some time is different.
>
>
>
> Best,
>
> Biao Geng
>
>
>
> *From
Thanks for the reply. We've already set cluster.evenly-spread-out-slots =
true
Best
Lu
On Mon, Aug 28, 2023 at 1:23 PM Kenan Kılıçtepe
wrote:
> Have you checked config param cluster.evenly-spread-out-slots ?
>
>
> On Mon, Aug 28, 2023 at 10:31 PM Lu Niu wrote:
>
>> Hi,
Thanks for your reply.
The interesting fact is that we also manage Spark on YARN. However, only
the Flink cluster is having the issue. I am wondering whether there is a
difference in the implementation on the Flink side.
Best
Lu
On Mon, Aug 28, 2023 at 8:38 PM Chen Zhanghao
wrote:
> Hi Lu
Hi, Flink Users
We have a use case that requires running ad hoc queries against Flink
state. There are several options:
1. Dump Flink state to external data systems, like Kafka, S3 etc., and
query the data from there. This is a very straightforward approach, but it
adds system complexity and
t;
> Best,
> Shammon FY
>
> On Thu, Apr 27, 2023 at 1:33 AM Lu Niu wrote:
>
>> Hi, Flink users
>>
>> I am trying to understand the internals of flink batch mode. some
>> questions:
>>
>> 1. Does flink batch mode use columnar in-memory format
Hi, Flink users
I am trying to understand the internals of Flink batch mode. Some questions:
1. Does flink batch mode use columnar in-memory format?
2. Does flink batch mode use vectorization technique?
3. any performance benchmark available compared with batch engines like
spark or presto?
Hi, Flink dev and users
If I want to asynchronously write to an external service, which API shall I
use, AsyncFunction or Async Sink?
My understanding after checking the code is:
1. Both APIs guarantee at-least-once writes to the external service, as
both APIs internally store in-flight requests in
ap -> AsyncFunction (Updates DynamoDB) -> Kinesis Sink
>
> We can be sure that the updates to DynamoDB for a particular record
> happens before the record is written to the Kinesis Sink.
>
>
> Hope the above clarifies your question!
>
> Regards,
> Hong
>
>
> On 14 Ju
For example, a Flink job reads from Kafka, does something, and writes to
Kafka. Do we need to take any action when the job's Kafka consumer lag is
low or 0 but some tasks have constant backpressure? Do we need to increase
the parallelism or do some network tuning so that backpressure is
constantly 0?