Re: are there any ways to test the performance of rocksdb state backend?
Hi makeyang,

There are some test cases under org.apache.flink.contrib.streaming.state.benchmark.* that you can refer to. But I'm not sure whether it's possible to upgrade RocksDB to any higher version, because of the regression of the merge operator; the comments in this PR may also give you some help: https://github.com/apache/flink/pull/5937

Best,
Sihua

On 05/18/2018 11:05, makeyang wrote:
> I'd like to integrate a newer version of RocksDB with Flink. I'd like to know if there are existing tools/ways to benchmark the performance of the RocksDB state backend, to see if there is a performance improvement or drop?
>
> MaKeyang
> TIG.JD.COM
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
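The classes Sihua points to exercise the state backend from inside Flink. For a quick external comparison of two RocksDB builds, the same idea can be sketched outside Flink as well. The harness below is an illustration only, not part of Flink's benchmark suite; `benchmark_puts` and the dict baseline are hypothetical, and the put callable would be swapped for a real RocksDB binding:

```python
import time

def benchmark_puts(store_put, n=10_000, value=b"x" * 100):
    """Time n sequential puts against any put(key, value) callable
    and return the measured throughput in puts per second."""
    start = time.perf_counter()
    for i in range(n):
        store_put(str(i).encode(), value)
    elapsed = time.perf_counter() - start
    return n / elapsed

# Baseline run: an in-memory dict stands in for the backend under test.
# Pointing store_put at two different RocksDB builds, with the same keys
# and values, gives directly comparable numbers.
backend = {}
rate = benchmark_puts(backend.__setitem__)
print(f"{rate:,.0f} puts/sec")
```

The point of the sketch is the methodology (same workload, same value sizes, measure wall-clock throughput), not the absolute numbers, which are only meaningful relative to each other.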
are there any ways to test the performance of rocksdb state backend?
I'd like to integrate a newer version of RocksDB with Flink. I'd like to know if there are existing tools/ways to benchmark the performance of the RocksDB state backend, to see if there is a performance improvement or drop?

MaKeyang
TIG.JD.COM

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: flink list -r shows CANCELED jobs - Flink 1.5
Hi Edward,

I dug a little into the CliFrontend code and it seems there are some discrepancies between the description and the result returned by `flink list -r`. I have documented the issue in [1]. Please feel free to comment if I missed anything.

Thanks,
Rong

Reference:
[1] https://issues.apache.org/jira/browse/FLINK-9398

On Thu, May 17, 2018 at 4:48 PM, Rong Rong wrote:
> This sounds like a bug to me. I can reproduce it with the latest RC#3 of Flink 1.5 with:
>
> bin/start-cluster.sh
> bin/flink run examples/streaming/WordCount.jar
> bin/flink list -r
>
> Would you please file a JIRA bug report? I will look into it.
>
> --
> Rong
>
> On Thu, May 17, 2018 at 10:50 AM, Edward Rojas wrote:
>> I forgot to add an example of the execution:
>>
>> $ ./bin/flink list -r
>> Waiting for response...
>> -- Running/Restarting Jobs ---
>> 17.05.2018 19:34:31 : edec969d6f9609455f9c42443b26d688 : FlinkAvgJob (CANCELED)
>> 17.05.2018 19:36:01 : bd87ffc35e1521806928d6251990d715 : FlinkAvgJob (RUNNING)
>>
>> Note that from the title it's supposed to return only Running or Restarting jobs.
>>
>> I'm using this response in a script that updates a job by cancelling with a savepoint... I just want to know if I have to update my script :)
>>
>> Thanks in advance
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: flink list -r shows CANCELED jobs - Flink 1.5
This sounds like a bug to me. I can reproduce it with the latest RC#3 of Flink 1.5 with:

bin/start-cluster.sh
bin/flink run examples/streaming/WordCount.jar
bin/flink list -r

Would you please file a JIRA bug report? I will look into it.

--
Rong

On Thu, May 17, 2018 at 10:50 AM, Edward Rojas wrote:
> I forgot to add an example of the execution:
>
> $ ./bin/flink list -r
> Waiting for response...
> -- Running/Restarting Jobs ---
> 17.05.2018 19:34:31 : edec969d6f9609455f9c42443b26d688 : FlinkAvgJob (CANCELED)
> 17.05.2018 19:36:01 : bd87ffc35e1521806928d6251990d715 : FlinkAvgJob (RUNNING)
>
> Note that from the title it's supposed to return only Running or Restarting jobs.
>
> I'm using this response in a script that updates a job by cancelling with a savepoint... I just want to know if I have to update my script :)
>
> Thanks in advance
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Message guarantees with S3 Sink
Hi Amit,

The BucketingSink doesn't have well-defined semantics when used with S3. Data loss is possible, but I am not sure whether it is the only problem. There are plans to rewrite the BucketingSink in Flink 1.6 to support eventually consistent file systems [1][2].

Best,
Gary

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/sink-with-BucketingSink-to-S3-files-override-td18433.html
[2] https://issues.apache.org/jira/browse/FLINK-6306

On Thu, May 17, 2018 at 11:57 AM, Amit Jain wrote:
> Hi,
>
> We are using Flink to process clickstream data from Kafka and pushing the same into 128 MB files in S3.
>
> What are the message processing guarantees with the S3 sink? In my understanding, the S3A client buffers the data in memory/on disk. In a failure scenario on a particular node, the TM would not trigger Writer#close, hence the buffered data can be lost entirely, assuming this buffer contains data from the last successful checkpoint.
>
> --
> Thanks,
> Amit
Re: flink list -r shows CANCELED jobs - Flink 1.5
I forgot to add an example of the execution:

$ ./bin/flink list -r
Waiting for response...
-- Running/Restarting Jobs ---
17.05.2018 19:34:31 : edec969d6f9609455f9c42443b26d688 : FlinkAvgJob (CANCELED)
17.05.2018 19:36:01 : bd87ffc35e1521806928d6251990d715 : FlinkAvgJob (RUNNING)

Note that from the title it's supposed to return only Running or Restarting jobs.

I'm using this response in a script that updates a job by cancelling with a savepoint... I just want to know if I have to update my script :)

Thanks in advance

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
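Until the `-r` filter behaves as documented, a script consuming this output can filter on the status column itself. A minimal sketch of that workaround (an illustration only; the parsing assumes the line format shown above, and `running_jobs` is a hypothetical helper, not part of the Flink CLI):

```python
def running_jobs(cli_output):
    """Keep only the job lines whose trailing status column is (RUNNING)."""
    return [line.strip() for line in cli_output.splitlines()
            if line.strip().endswith("(RUNNING)")]

# Sample output taken from this thread.
sample = """Waiting for response...
-- Running/Restarting Jobs ---
17.05.2018 19:34:31 : edec969d6f9609455f9c42443b26d688 : FlinkAvgJob (CANCELED)
17.05.2018 19:36:01 : bd87ffc35e1521806928d6251990d715 : FlinkAvgJob (RUNNING)"""

for job in running_jobs(sample):
    print(job)   # only the RUNNING job survives the filter
```

Matching on "(RESTARTING)" as well would cover the other state the flag is documented to return.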
Re: Message guarantees with S3 Sink
Hi Rong,

We are using the BucketingSink only. I'm looking at the case where the TM does not get the chance to call Writer#flush, e.g. YARN killed the TM because of OOM.

We have configured fs.s3.impl to com.amazon.ws.emr.hadoop.fs.EmrFileSystem in core-site.xml, so the BucketingSink is using an S3 client internally. When we write data using the S3A client, it buffers up the data in memory or on disk until it hits the multipart file size or close is called on the OutputStream.

Now suppose the S3A client has buffered up 40 MB of data on the TM's local disk, and at the same time a checkpoint barrier arrives at the sink and completes successfully. The write process in the sink resumes, the buffered data grows to 60 MB, and now YARN kills the TM. What would happen to the original 40 MB of data?

--
Thanks,
Amit

On Thu, May 17, 2018 at 10:28 PM, Rong Rong wrote:
> Hi Amit,
>
> Can you elaborate on how you write using the "S3 sink" and which version of Flink you are using?
>
> If you are using the BucketingSink [1], you can check out the API doc and configure it to flush before closing your sink. This way your sink is "integrated with the checkpointing mechanism to provide exactly once semantics" [2].
>
> Thanks,
> Rong
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html
> [2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>
> On Thu, May 17, 2018 at 2:57 AM, Amit Jain wrote:
>> Hi,
>>
>> We are using Flink to process clickstream data from Kafka and pushing the same into 128 MB files in S3.
>>
>> What are the message processing guarantees with the S3 sink? In my understanding, the S3A client buffers the data in memory/on disk. In a failure scenario on a particular node, the TM would not trigger Writer#close, hence the buffered data can be lost entirely, assuming this buffer contains data from the last successful checkpoint.
>>
>> --
>> Thanks,
>> Amit
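The failure mode being described can be made concrete with a toy model (an illustration only; `BufferedSink` is a hypothetical stand-in, not Flink's BucketingSink): the checkpoint records the source offset, but bytes staged in the S3A buffer live only on the TM's local disk, so on restore the source replays only from the checkpointed offset and anything buffered before the checkpoint but never uploaded is gone.

```python
# Toy model of the scenario: checkpoint completes while data sits only
# in the sink's volatile local staging buffer, then the TM is killed.
class BufferedSink:
    def __init__(self):
        self.local_buffer = []   # S3A staging area (lost if the TM dies)
        self.uploaded = []       # what actually reached S3

    def write(self, record):
        self.local_buffer.append(record)

records = [f"r{i}" for i in range(6)]
sink = BufferedSink()

for rec in records[:4]:
    sink.write(rec)              # the "40 MB" staged locally
checkpointed_offset = 4          # checkpoint succeeds; nothing uploaded

for rec in records[4:]:
    sink.write(rec)              # buffer grows to "60 MB"

sink.local_buffer = []           # YARN kills the TM: staged data is gone

# Restore: the source replays only from the checkpointed offset.
for rec in records[checkpointed_offset:]:
    sink.write(rec)

lost = [r for r in records[:checkpointed_offset]
        if r not in sink.local_buffer + sink.uploaded]
print("lost:", lost)             # lost: ['r0', 'r1', 'r2', 'r3']
```

In this model the pre-checkpoint records are unrecoverable, which is exactly why the sink would need to persist (or at least be able to reconstruct) its staged data as part of the checkpoint for exactly-once delivery to hold.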
flink list -r shows CANCELED jobs - Flink 1.5
Hello all,

On Flink 1.5, the CLI returns CANCELED jobs when requesting only the running jobs with the -r flag... is this intended behavior? On 1.4, CANCELED jobs do not appear when running this command.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Message guarantees with S3 Sink
Hi Amit,

Can you elaborate on how you write using the "S3 sink" and which version of Flink you are using?

If you are using the BucketingSink [1], you can check out the API doc and configure it to flush before closing your sink. This way your sink is "integrated with the checkpointing mechanism to provide exactly once semantics" [2].

Thanks,
Rong

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html
[2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html

On Thu, May 17, 2018 at 2:57 AM, Amit Jain wrote:
> Hi,
>
> We are using Flink to process clickstream data from Kafka and pushing the same into 128 MB files in S3.
>
> What are the message processing guarantees with the S3 sink? In my understanding, the S3A client buffers the data in memory/on disk. In a failure scenario on a particular node, the TM would not trigger Writer#close, hence the buffered data can be lost entirely, assuming this buffer contains data from the last successful checkpoint.
>
> --
> Thanks,
> Amit
LocatableInputSplit flink and not Assigning properly
So I have a cluster with one job manager and two task managers, with the IPs 192.168.112.74 and 192.168.112.75. Each task manager has specific data I want to read, and for that I created one LocatableInputSplit per task manager. I am using the LocatableInputSplitAssigner; however, when I run the job it doesn't run on the host that I set in the LocatableInputSplit, and sometimes both splits run on the same task manager. Should each split run on the host I specified in the LocatableInputSplit?

Thanks in advance

--
Daniel Tavares
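For what it's worth, my understanding is that locality in this kind of assigner is best-effort, not a guarantee: a requesting host that has no local split left simply receives some remaining split, and the host strings must actually match (an IP in the split vs. a hostname reported by the TM may never match). A toy model of the best-effort behavior (an illustration only, not Flink's actual LocatableInputSplitAssigner):

```python
# Toy model: prefer a split local to the requesting host, otherwise
# hand out any remaining split. Locality is best-effort only.
def assign(splits, host):
    for s in splits:
        if host in s["hosts"]:
            splits.remove(s)
            return s
    return splits.pop(0) if splits else None  # remote fallback

splits = [{"id": 1, "hosts": ["192.168.112.74"]},
          {"id": 2, "hosts": ["192.168.112.75"]}]

# If TM .74 requests twice before .75 asks at all, .74 receives both
# splits, including the one "located" on .75.
first = assign(splits, "192.168.112.74")
second = assign(splits, "192.168.112.74")
print(first["id"], second["id"])
```

This would explain both symptoms: a hostname/IP mismatch makes every split look remote, and a faster TM requesting splits first can end up running both of them.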
Re: Async Source Function in Flink
I see, thank you very much for your answer! I'll look into pooled connection handling. Alternatively, I suppose that since it is a SourceFunction, even synchronous calls may be used without side effects in Flink?

Thank you,
Federico

Il giorno mar 15 mag 2018 alle ore 16:16 Timo Walther ha scritto:
> Hi Federico,
>
> Flink's AsyncFunction is meant for enriching a record with information that needs to be queried externally. So I guess you can't use it for your use case, because an async call is initiated by the input. However, your custom SourceFunction could implement a similar asynchronous logic: by having a pool of open connections that request asynchronously and emit the responses to the stream once available, you can improve your throughput (see [0]).
>
> Depending on your use case, maybe the SourceFunction can only be responsible for determining e.g. ids, and the AsyncFunction requests these ids via REST. This way you could leverage the available async capabilities.
>
> I hope this helps.
>
> Regards,
> Timo
>
> [0] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/asyncio.html#the-need-for-asynchronous-io-operations
>
> Am 14.05.18 um 14:51 schrieb Federico D'Ambrosio:
>> Hello everyone,
>>
>> Just wanted to ask a quick question: I have to retrieve data from 2 web services via REST calls, use them as sources, and push these data to Kafka. So far, I implemented a SourceFunction which deals with making the calls with the respective clients.
>>
>> Now, the function does use, for each REST call, Await.result(). Do I need to use Flink's AsyncFunction instead? What are the best practices when it comes to async sources?
>>
>> Thank you,
>> Federico D'Ambrosio

--
Federico D'Ambrosio
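The pooled pattern Timo describes can be sketched in a few lines (an illustration only, in Python rather than a Flink SourceFunction; `fetch` and the endpoint names are hypothetical stand-ins for the real REST clients): keep several blocking calls in flight on a worker pool and emit each response as it completes, instead of awaiting one call at a time.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(endpoint):
    """Stand-in for a blocking REST call via the real client."""
    return f"payload from {endpoint}"

def run_source(endpoints, emit, pool_size=4):
    """Keep up to pool_size requests in flight; emit responses as they
    complete, not in request order."""
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        futures = [pool.submit(fetch, e) for e in endpoints]
        for fut in as_completed(futures):
            emit(fut.result())  # in a SourceFunction: ctx.collect(...)

out = []
run_source([f"/api/{i}" for i in range(3)], out.append)
print(sorted(out))
```

In an actual SourceFunction the emit call would have to happen under the checkpoint lock so that emitted records and checkpointed state stay consistent; synchronous calls are also fine there, just slower, since the source loop blocks on each request.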
Re: Missing MapState when Timer fires after restored state
Hi,

> This raises a couple of questions:
> - Is it a bug, though, that the state restore goes wrong like it does for my job? Based on my experience it seems like rescaling sometimes works, but then you can have these random errors.

If there is a problem, I would still consider it a bug, because it should work correctly.

> - If it's not supported properly, why not refuse to restore a checkpoint if it would require rescaling?

It should work properly, but I would prefer to keep this at the level of a "hidden feature" until it gets some more exposure, and until some open questions about the future differences between savepoints and checkpoints are resolved.

> - We have sometimes had Flink jobs where the state has become so heavy that cancelling with a savepoint times out and fails. Incremental checkpoints still work because they don't time out as long as the state grows linearly. In that case, if we want to scale up (for example, to enable successful savepoint creation ;) ), the only thing we can do is restore from the latest checkpoint. But then we have no way to scale up by increasing the cluster size, because we can't create a savepoint with the smaller cluster, and on the other hand we can't restore a checkpoint to a bigger cluster if rescaling from a checkpoint is not supposed to be relied on. So in this case we're stuck and forced to start from an empty state?

IMO there is a very good chance that this will simply become a normal feature in the near future.

Best,
Stefan
Message guarantees with S3 Sink
Hi,

We are using Flink to process clickstream data from Kafka and pushing the same into 128 MB files in S3.

What are the message processing guarantees with the S3 sink? In my understanding, the S3A client buffers the data in memory/on disk. In a failure scenario on a particular node, the TM would not trigger Writer#close, hence the buffered data can be lost entirely, assuming this buffer contains data from the last successful checkpoint.

--
Thanks,
Amit