Re: Customize file assignments logic in flink application

2019-08-15 Thread Lu Niu
ple *FileInputSplit*s. And you may need to > define in your *InputFormat* how to process the new *InputSplit*. > > Thanks, > Zhu Zhu > > On Thu, Aug 15, 2019 at 12:26 AM, Lu Niu wrote: > >> Hi, >> >> I have a data set backed by a directory of files in which file nam

Customize file assignments logic in flink application

2019-08-14 Thread Lu Niu
Hi, I have a data set backed by a directory of files in which the file names are meaningful: folder1 +-file01 +-file02 +-file03 +-file04. I want to control the file assignment in my Flink application. For example, when parallelism is 2, worker 1 gets file01 and file02 to
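The assignment described in this message can be sketched in plain Python. This is only an illustration of the partitioning logic, not Flink API: the function name `assign_files` is hypothetical, workers are numbered from 0 (the message counts from 1), and because the message is cut off, handing the remaining files to the second worker in a contiguous block is an assumption.

```python
from typing import Dict, List


def assign_files(files: List[str], parallelism: int) -> Dict[int, List[str]]:
    """Assign sorted file names to workers in contiguous blocks.

    Hypothetical helper: worker indices are 0-based, and when the files do
    not divide evenly the first workers each receive one extra file.
    """
    if parallelism <= 0:
        raise ValueError("parallelism must be positive")
    ordered = sorted(files)
    base, extra = divmod(len(ordered), parallelism)
    assignment: Dict[int, List[str]] = {}
    start = 0
    for worker in range(parallelism):
        size = base + (1 if worker < extra else 0)
        assignment[worker] = ordered[start:start + size]
        start += size
    return assignment


if __name__ == "__main__":
    print(assign_files(["file01", "file02", "file03", "file04"], 2))
```

In a real job this mapping would have to live inside the input split assignment, which is what the replies in the thread discuss.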

Re: Customize file assignments logic in flink application

2019-08-26 Thread Lu Niu
in your case. > The underlying format is ParallelIteratorInputFormat and its processing is not > matched to a certain subtask index. > > Thanks, > Zhu Zhu > > On Fri, Aug 16, 2019 at 12:48 AM, Lu Niu wrote: > >> Hi, Zhu >> >> Thanks for the reply! I found using SplittableIterator i

Implementing CheckpointableInputFormat

2019-09-05 Thread Lu Niu
Hi, Team I am implementing a custom InputFormat. Should I implement the CheckpointableInputFormat interface? If I don't, does that mean the whole job has to restart when only one task fails? I ask because I found that all InputFormats implement CheckpointableInputFormat, which confuses me. Thank

Limit max cpu usage per TaskManager

2019-11-05 Thread Lu Niu
Hi, When running a Flink application in YARN mode, is there a way to limit the maximum CPU usage per TaskManager? I tried this with an application that has just a source and a sink operator. The parallelism of the source is 60 and the parallelism of the sink is 1. When running with the default config, there are 60 TaskManagers assigned. I

Re: Limit max cpu usage per TaskManager

2019-11-06 Thread Lu Niu
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups >> >> >> >> Best, >> >> Victor >> >> >> >> *From: *Vino Yang >> *Date: *Wednesday, 6 November 2019 at 4:26 PM >> *To: *L

Monitor rocksDB memory usage

2019-11-07 Thread Lu Niu
Hi, I read that RocksDB memory is managed off-heap. Is there a way to monitor its memory usage, then? Best Lu

Re: Monitor rocksDB memory usage

2019-11-11 Thread Lu Niu
rics > > > > Best > > Yun Tang > > > > *From: *Lu Niu > *Date: *Friday, November 8, 2019 at 8:18 AM > *To: *user > *Subject: *Monitor rocksDB memory usage > > > > Hi, > > > > I read that rocksDB memory is managed off heap. Is there a way to monitor > the memory usage there then? > > > > Best > > Lu >

Help to Understand cutoff memory

2019-12-10 Thread Lu Niu
Hi, Flink users I have some questions regarding memory allocation. According to the docs, containerized.heap-cutoff-ratio means: ``` Percentage of heap space to remove from containers (YARN / Mesos), to compensate for other JVM memory usage ``` However, I find that cutoff memory is actually treated as
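For readers unfamiliar with the setting, the arithmetic behind the cutoff can be sketched as below. The formula (the larger of ratio times container size and a minimum cutoff) reflects my understanding of the legacy pre-1.10 behaviour together with `containerized.heap-cutoff-min`, and should be treated as an assumption; the function names and the numbers are purely illustrative. The sketch does not answer what the cutoff memory is used for, which is the question raised in the message.

```python
def heap_cutoff_mb(container_mb: int, cutoff_ratio: float, cutoff_min_mb: int) -> int:
    """Memory (MB) removed from the container before sizing the JVM heap.

    Assumed formula: max(container size * ratio, minimum cutoff).
    """
    if not 0.0 <= cutoff_ratio < 1.0:
        raise ValueError("cutoff_ratio must be in [0, 1)")
    cutoff = max(int(container_mb * cutoff_ratio), cutoff_min_mb)
    if cutoff >= container_mb:
        raise ValueError("cutoff leaves no memory for the JVM")
    return cutoff


def remaining_after_cutoff_mb(container_mb: int, cutoff_ratio: float, cutoff_min_mb: int) -> int:
    """Memory (MB) left in the container once the cutoff is subtracted."""
    return container_mb - heap_cutoff_mb(container_mb, cutoff_ratio, cutoff_min_mb)


if __name__ == "__main__":
    # Illustrative values only: a 4096 MB container, ratio 0.25, minimum 600 MB.
    print(heap_cutoff_mb(4096, 0.25, 600))
    print(remaining_after_cutoff_mb(4096, 0.25, 600))
```

For small containers the minimum dominates, so the effective fraction removed can be much larger than the configured ratio.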

Re: Implementing CheckpointableInputFormat

2019-10-02 Thread Lu Niu
> Best, Fabian > > On Thu, Sep 5, 2019 at 19:01, Lu Niu wrote: > >> Hi, Team >> >> I am implementing a custom InputFormat. Shall I >> implement CheckpointableInputFormat interface? If I don't, does that mean >> the whole job has to restart

Re: End to End Latency Tracking in flink

2020-04-09 Thread Lu Niu
time - $event_time - ($source_current_time - > $event_time) = $sink_current_time - $source_current_time as the latency of > end to end. > > On Mon, Mar 30, 2020 at 5:15 PM, Oscar Westra van Holthe - Kind wrote: > >> On Mon, 30 Mar 2020 at 05:08, Lu Niu wrote: >> >>> $current_
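The subtraction in this reply can be checked with a small worked example. The function name and the timestamps are hypothetical, and the sketch assumes the source and sink timestamps come from comparable (synchronized) clocks, which the thread does not state.

```python
def end_to_end_latency_ms(event_time_ms: int, source_time_ms: int, sink_time_ms: int) -> int:
    """Latency inside the pipeline, following the formula in the reply.

    (sink - event) - (source - event) simplifies to sink - source, so the
    event time cancels out.
    """
    sink_lag = sink_time_ms - event_time_ms      # delay observed at the sink
    source_lag = source_time_ms - event_time_ms  # delay already present at the source
    return sink_lag - source_lag


if __name__ == "__main__":
    # Hypothetical timestamps in milliseconds.
    print(end_to_end_latency_ms(event_time_ms=1000, source_time_ms=1500, sink_time_ms=1800))
```

Because the event time cancels, this measure excludes any delay between event creation and ingestion, which matches the caveat raised elsewhere in the thread that event time may not be the ingestion time.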

Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-07 Thread Lu Niu
Hi, Flink users Has anyone encountered this error? The error comes from S3AFileSystem, but there is no capacity issue on any disk. We are using Hadoop 2.7.1. ``` Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not open output stream for state backend at

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-07 Thread Lu Niu
nd there is a related page [1], could you please make sure > there is enough space on the local disk. > > [1] > https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out > Best, > Congxian

Debug Slowness in Async Checkpointing

2020-04-13 Thread Lu Niu
Hi, Flink users We notice that async checkpointing can sometimes be extremely slow, leading to checkpoint timeouts. For example, for a state size of around 2.5 MB, the async phase of a checkpoint can take 7 to 12 minutes: [image: Screen Shot 2020-04-09 at 5.04.30 PM.png] Notice all the slowness comes from async

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-13 Thread Lu Niu
tom Hadoop > version. > > Best, > Robert > > > On Thu, Apr 9, 2020 at 6:39 AM Congxian Qiu > wrote: > >> Hi LU >> >> I'm not familiar with S3 file system, maybe others in Flink community can >> help you in this case, or maybe you can also reach out to

End to End Latency Tracking in flink

2020-03-27 Thread Lu Niu
Hi, I am looking for end-to-end latency monitoring of a Flink job. Based on my study, I have two options: 1. Flink provides a latency tracking feature. However, the documentation says it cannot show the actual latency of the business logic, as it will bypass all operators.

Re: End to End Latency Tracking in flink

2020-03-29 Thread Lu Niu
$current_processing - $event_time is something ok, but > keep one thing in mind: the event time may not be the time ingested in > Flink. > > Best, > Congxian > > > On Sat, Mar 28, 2020 at 6:25 AM, Lu Niu wrote: > Hi, > > I am looking for end to end latency monitoring of l

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-23 Thread Lu Niu
On Tue, Apr 21, 2020 at 1:46 PM Lu Niu wrote: > Cool, thanks! > > On Tue, Apr 21, 2020 at 4:51 AM Robert Metzger > wrote: > >> I'm not aware of anything. I think the presto s3 file system is generally >> the recommended S3 FS implementation. >> >> On Mon,

Re: Debug Slowness in Async Checkpointing

2020-04-23 Thread Lu Niu
> > Best, > Robert > > > On Tue, Apr 21, 2020 at 10:50 PM Lu Niu wrote: > >> Hi, Robert >> >> Thanks for replying. To improve observability , do you think we should >> expose more metrics in checkpointing? for example, in incremental >> checkpoint

Re: Debug Slowness in Async Checkpointing

2020-04-21 Thread Lu Niu
em during checkpointing? > > I'm not aware of any metrics in Flink that could be helpful in this > situation. > > Best, > Robert > > On Tue, Apr 14, 2020 at 12:02 AM Lu Niu wrote: > >> Hi, Flink users >> >> We notice sometimes async checkpointing c

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-21 Thread Lu Niu
Cool, thanks! On Tue, Apr 21, 2020 at 4:51 AM Robert Metzger wrote: > I'm not aware of anything. I think the presto s3 file system is generally > the recommended S3 FS implementation. > > On Mon, Apr 13, 2020 at 11:46 PM Lu Niu wrote: > >> Thank you both. Given the debug o

Re: Are files in savepoint still needed after restoring if turning on incremental checkpointing

2020-07-20 Thread Lu Niu
n other words, new incremental checkpoint has no relationship with older > savepoint from which restored. > > Best > Yun Tang > ------ > *From:* Lu Niu > *Sent:* Saturday, July 18, 2020 5:48 > *To:* user > *Subject:* Are files in savepoint still neede

Kafka transaction error lead to data loss under end to end exact-once

2020-08-03 Thread Lu Niu
Hi, We are using end-to-end exactly-once with Flink + Kafka and encountered the exception below, which usually comes after checkpoint failures: ``` *Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer

Re: Kafka transaction error lead to data loss under end to end exact-once

2020-08-04 Thread Lu Niu
were no records in the > transactions). > > Regards, > Roman > > > On Mon, Aug 3, 2020 at 9:14 PM Lu Niu wrote: > >> Hi, >> >> We are using end to end exact-once flink + kafka and encountered belowing >> exception which usually came after checkpoint failur

Are files in savepoint still needed after restoring if turning on incremental checkpointing

2020-07-17 Thread Lu Niu
Hi, Flink Users Assume a Flink job turns on incremental checkpointing and restores from a savepoint. It runs fine for a while and commits one checkpoint, and then it fully restarts because of an error. At this time, is it possible that the job still needs files in the original savepoint for

Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2020-12-17 Thread Lu Niu
RM for a > certain period of time, it will revoke the leadership and notify > other components. You can look into the ZooKeeper logs checking why RM's > leadership is revoked. > > Thank you~ > > Xintong Song > > > > On Thu, Dec 17, 2020 at 8:42 AM Lu Niu wro

Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2020-12-16 Thread Lu Niu
Hi, Flink users We recently migrated to Flink 1.11 and are seeing exceptions like: ``` 2020-12-15 12:41:01,199 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: USER_MATERIALIZED_EVENT_SIGNAL-user_context-event ->

Flink 1.11 checkpoint compatibility issue

2021-01-21 Thread Lu Niu
Hi, We recently migrated from Flink 1.9.1 to Flink 1.11 and noticed that the new job cannot consume from a savepoint taken in 1.9.1. Here is the list of operator IDs and max parallelism of the savepoints taken in both versions. The only code change is the version upgrade. savepoint 1.9.1: ``` Id:

Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2021-01-28 Thread Lu Niu
> Thank you~ > > Xintong Song > > > > On Fri, Dec 18, 2020 at 8:53 AM Lu Niu wrote: > >> Hi, Xintong >> >> Thanks for replying and your suggestion. I did check the ZK side but >> there is nothing interesting. The error message actually shows that only >

Flink job restart when one ZK node is down

2021-06-14 Thread Lu Niu
Hi, Flink Users We use a ZK cluster of 5 nodes for JM HA. When we terminate one node for maintenance, we notice that lots of Flink jobs fully restart. The error looks like: ``` org.apache.flink.util.FlinkException: ResourceManager leader changed to new address null at

Re: Job Recovery Time on TM Lost

2021-06-29 Thread Lu Niu
a) is timeout before it can > mark the TM failed and continue the failover procedure. > > We suppose that the phase from CANCELING to CANCELED takes > min(akka.ask.timeout, heartbeat.timeout), though not confirmed yet. > > Hope it helps. Please let me know if there's anything wro

Re: Job Recovery Time on TM Lost

2021-06-30 Thread Lu Niu
Thanks Gen! cc flink-dev to collect more inputs. Best Lu On Wed, Jun 30, 2021 at 12:55 AM Gen Luo wrote: > I'm also wondering here. > > In my opinion, it's because the JM can not confirm whether the TM is lost > or it's a temporary network trouble and will recover soon, since I can see > in

Re: Job Recovery Time on TM Lost

2021-07-08 Thread Lu Niu
ent. On the upside, if you mark the TaskExecutor dead on >> the first >> >>>>> connection loss (assuming you have a stable network environment), >> then it >> >>>>> can now detect lost TaskExecutors as fast as the heartbeat interval. >

Re: Job Recovery Time on TM Lost

2021-07-01 Thread Lu Niu
g the cancellation operation: Flink currently does not listen >>> to the dead letters of Akka. This means that the `akka.ask.timeout` is the >>> primary means to fail the future result of a rpc which could not be sent. >>> This is also an improvement we should add to Flink's

Re: Job Recovery Time on TM Lost

2021-07-01 Thread Lu Niu
ntime.executiongraph.ExecutionGraph(time when all tasks switch from CREATED to RUNNING) ``` Best Lu On Thu, Jul 1, 2021 at 12:06 PM Lu Niu wrote: > Thanks Till and Yang for help! Also Thanks Till for a quick fix! > > I did another test yesterday. In this test, I intentionall

Re: Job Recovery Time on TM Lost

2021-07-01 Thread Lu Niu
Another side question: shall we add a metric to cover the complete restarting time (phase 1 + phase 2)? The current metric jm.restartingTime only covers phase 1. Thanks! Best Lu On Thu, Jul 1, 2021 at 12:09 PM Lu Niu wrote: > Thanks Till and Yang for help! Also Thanks Till for a quick fix! >

Re: Flink 1.11 checkpoint compatibility issue

2021-01-24 Thread Lu Niu
might be >>> some problems with setting the maxParallelism in the Table API. >>> >>> Keep in mind that you could use the State Processor API [1] to adjust >>> the maxParallelism per Operator in a Savepoint. >>> >>> Best, >>> Matthias >

Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2021-03-31 Thread Lu Niu
Thank you~ > > Xintong Song > > > > > > On Sat, Jan 30, 2021 at 8:27 AM Xintong Song > wrote: > > There's indeed a ZK version upgrading during 1.9 and 1.11, but I'm not > aware of any similar issue reported since the upgrading. > > I would suggest the following:

Re: Zigzag shape in TM JVM used memory

2021-04-07 Thread Lu Niu
> > Piotrek > > > On Mon, Apr 5, 2021 at 22:54, Lu Niu wrote: > > Hi, > > > > we need to update our email system then :) . Here are the links: > > > > > > > https://drive.google.com/file/d/1lZ5_P8_NqsN1JeLzutGj4DxkyWJN75mR/view?usp=sharin

Iterate Operator Checkpoint Failure

2021-04-15 Thread Lu Niu
Hi, Flink Users After migrating from Flink 1.9.1 to Flink 1.11, we notice that a job always fails on checkpoint if it uses the Iterate operator, whether or not we use unaligned checkpoints. Those jobs don't have checkpoint issues in 1.9. Is this a known issue? Thank you! Best Lu

Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2021-04-13 Thread Lu Niu
cause of the problem.* On Wed, Mar 31, 2021 at 2:01 PM Lu Niu wrote: > Hi, Colletta > > Thanks for sharing! Do you mind share one stacktrace for that error as > well? Thanks! > > Best > Lu > > On Sat, Mar 27, 2021 at 5:36 AM Colletta, Edward > wrote: > >>

Re: Iterate Operator Checkpoint Failure

2021-04-16 Thread Lu Niu
> CC user-list > > On 15. Apr 2021, at 22:34, Lu Niu wrote: > > Hi, Flink Users > > When we migrate from flink 1.9.1 to flink 1.11, we notice job will always > fail on checkpoint if job uses Iterator Operator, no matter we use > unaligned checkpoint or not. Those jobs

Re: REST service for flinkSQL

2021-11-29 Thread Lu Niu
> Martijn > > On Wed, 24 Nov 2021 at 00:57, Lu Niu wrote: > >> Hi, Flink Users >> >> I am wondering whether there is a REST service for submitting flinkSQL, >> similar like Livy to SparkSQL? I found >> https://github.com/ververica/flink-sql-gateway/ but I am not sure >> whether it's still active. >> >> Best >> Lu >> >

REST service for flinkSQL

2021-11-23 Thread Lu Niu
Hi, Flink Users I am wondering whether there is a REST service for submitting Flink SQL, similar to what Livy provides for Spark SQL? I found https://github.com/ververica/flink-sql-gateway/ but I am not sure whether it's still active. Best Lu

window join in flink sql

2021-10-28 Thread Lu Niu
Hi, Flink users How do I express a window join of multiple streams in Flink SQL? In the DataStream API, that's stream.join(otherStream) .where() .equalTo() .window() .apply() ( https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/joining/ ) For example, in

Re: Uneven TM Distribution of Flink on YARN

2023-09-06 Thread Lu Niu
These are our YARN-related settings: yarn.scheduler.fair.assignmultiple: "true" yarn.scheduler.fair.dynamic.max.assign: "false" yarn.scheduler.fair.max.assign: 1 Any suggestions? Best Lu On Wed, Sep 6, 2023 at 9:16 AM Lu Niu wrote: > Hi, Thanks for all your help. Are t

Uneven TM Distribution of Flink on YARN

2023-08-28 Thread Lu Niu
Hi, Flink users We have recently observed that the allocation of Flink TaskManagers in our YARN cluster is not evenly distributed. We would like to hear your thoughts on this matter. 1. Our setup includes Flink version 1.15.1 and Hadoop 2.10.0. 2. The uneven distribution is that out of a

Re: Uneven TM Distribution of Flink on YARN

2023-08-30 Thread Lu Niu
<https://aka.ms/o0ukef> > -- > *From:* Chen Zhanghao > *Sent:* Tuesday, August 29, 2023 12:14:53 PM > *To:* Lu Niu ; Weihua Hu > *Cc:* Kenan Kılıçtepe ; user > *Subject:* Re: Uneven TM Distribution of Flink on YARN > > CCing @Weihua Hu , who is an expert on this. Do

Re: Uneven TM Distribution of Flink on YARN

2023-09-06 Thread Lu Niu
Hi, Thanks for all your help. Are there any other insights? Best Lu On Wed, Aug 30, 2023 at 11:29 AM Lu Niu wrote: > No. we don't use yarn.taskmanager.node-label > > Best > Lu > > On Tue, Aug 29, 2023 at 12:17 AM Geng Biao wrote: > >> Maybe you can check if you hav

Re: Uneven TM Distribution of Flink on YARN

2023-09-06 Thread Lu Niu
between flink > and spark is that most flink jobs are unbounded while spark jobs are > bounded. It is possible that under same YARN scheduling strategy, the final > distribution of apps after some time is different. > > > > Best, > > Biao Geng > > > > *From

Re: Uneven TM Distribution of Flink on YARN

2023-08-28 Thread Lu Niu
Thanks for the reply. We've already set cluster.evenly-spread-out-slots = true Best Lu On Mon, Aug 28, 2023 at 1:23 PM Kenan Kılıçtepe wrote: > Have you checked config param cluster.evenly-spread-out-slots ? > > > On Mon, Aug 28, 2023 at 10:31 PM Lu Niu wrote: > >> Hi,

Re: Uneven TM Distribution of Flink on YARN

2023-08-28 Thread Lu Niu
Thanks for your reply. The interesting fact is that we also manage Spark on YARN; however, only the Flink cluster has the issue. I am wondering whether there is a difference in the implementation on the Flink side. Best Lu On Mon, Aug 28, 2023 at 8:38 PM Chen Zhanghao wrote: > Hi Lu

Best Practice for Querying Flink State

2022-08-29 Thread Lu Niu
Hi, Flink Users We have a use case that requires running ad hoc queries against Flink state. There are several options: 1. Dump Flink state to external data systems, like Kafka, S3, etc.; from there we can query the data. This is a very straightforward approach, but adds system complexity and

Re: flink batch execution mode

2023-04-26 Thread Lu Niu
> Best, > Shammon FY > > On Thu, Apr 27, 2023 at 1:33 AM Lu Niu wrote: > >> Hi, Flink users >> >> I am trying to understand the internals of flink batch mode. some >> questions: >> >> 1. Does flink batch mode use columnar in-memory format

flink batch execution mode

2023-04-26 Thread Lu Niu
Hi, Flink users I am trying to understand the internals of Flink batch mode. Some questions: 1. Does Flink batch mode use a columnar in-memory format? 2. Does Flink batch mode use vectorization techniques? 3. Are any performance benchmarks available comparing it with batch engines like Spark or Presto?

AsyncFunction vs Async Sink

2023-06-14 Thread Lu Niu
Hi, Flink dev and users If I want to write asynchronously to an external service, which API should I use, AsyncFunction or Async Sink? My understanding after checking the code is: 1. Both APIs guarantee at-least-once writes to the external service, as both APIs internally store in-flight requests in

Re: AsyncFunction vs Async Sink

2023-06-14 Thread Lu Niu
ap -> AsyncFunction (Updates DynamoDB) -> Kinesis Sink > > We can be sure that the updates to DynamoDB for a particular record > happens before the record is written to the Kinesis Sink. > > > Hope the above clarifies your question! > > Regards, > Hong > > > On 14 Ju

When does backpressure matter

2023-06-22 Thread Lu Niu
For example, suppose a Flink job reads from Kafka, does some processing, and writes to Kafka. Do we need to take any action when the job's Kafka consumer lag is low or 0 but some tasks have constant backpressure? Do we need to increase the parallelism or do some network tuning so that backpressure stays at 0?