Hi Vinay,

I think the issue is tracked here:
https://github.com/facebook/rocksdb/issues/1988

Best,
Stefan

> On 14.03.2017 at 15:31, Vishnu Viswanath 
> <vishnu.viswanat...@gmail.com> wrote:
> 
> Hi Stephan,
> 
> Is there a ticket number/link to track this? My job meets all the conditions 
> you mentioned.
> 
> Thanks,
> Vishnu
> 
> On Tue, Mar 14, 2017 at 7:13 AM, Stephan Ewen <se...@apache.org> wrote:
> Hi Vinay!
> 
> We just discovered a bug in RocksDB. The bug affects windows without reduce() 
> or fold(), windows with evictors, and ListState.
> 
> A certain access pattern in RocksDB becomes so slow beyond a certain 
> size-per-key that it basically brings down the streaming program and the 
> snapshots.
> 
> We are reaching out to the RocksDB folks and looking for workarounds in Flink.
> 
> Greetings,
> Stephan
> 
> 
> On Wed, Mar 1, 2017 at 12:10 PM, Stephan Ewen <se...@apache.org> wrote:
> @vinay  Can you try not setting the buffer timeout at all? I am actually not 
> sure what the effect of setting it to a negative value would be; that could be 
> a cause of problems...
> 
> 
> On Mon, Feb 27, 2017 at 7:44 PM, Seth Wiesman <swies...@mediamath.com> wrote:
> Vinay,
> 
>  
> 
> The bucketing sink performs rename operations during the checkpoint, and if it 
> tries to rename a file that is not yet consistent, that would cause a 
> FileNotFound exception, which would fail the checkpoint.
> 
>  
> 
> Stephan,
> 
>  
> 
> Currently my AWS fork contains some very specific assumptions about the 
> pipeline that will in general only hold for my pipeline. This is because 
> there were still some open questions that I had about how to solve 
> consistency issues in the general case. I will comment on the Jira issue with 
> more specifics.
> 
>  
> 
> Seth Wiesman
> 
>  
> 
> From: vinay patil <vinay18.pa...@gmail.com>
> Reply-To: "user@flink.apache.org" <user@flink.apache.org>
> Date: Monday, February 27, 2017 at 1:05 PM
> To: "user@flink.apache.org" <user@flink.apache.org>
> 
> 
> Subject: Re: Checkpointing with RocksDB as statebackend
> 
>  
> 
> Hi Seth,
> 
> Thank you for your suggestion.
> 
> But if the issue is only related to S3, then why does this happen when I 
> replace the S3 sink with HDFS as well? (For checkpointing I am using HDFS only.)
> 
> Stephan,
> 
> Another issue I see is that when I set env.setBufferTimeout(-1) and keep the 
> checkpoint interval at 10 minutes, nothing gets written to the sink (tried 
> with S3 as well as HDFS); at least I was expecting pending files here.
> 
> This issue gets worse when checkpointing is disabled, as nothing is written.
> 
>  
> 
> 
> 
> Regards,
> 
> Vinay Patil
> 
>  
> 
> On Mon, Feb 27, 2017 at 10:55 PM, Stephan Ewen [via Apache Flink User Mailing 
> List archive.] <[hidden email]> wrote:
> 
> Hi Seth!
> 
>  
> 
> Wow, that is an awesome approach.
> 
>  
> 
> We have actually seen these issues as well and we are looking to eventually 
> implement our own S3 file system (and circumvent Hadoop's S3 connector that 
> Flink currently relies on): https://issues.apache.org/jira/browse/FLINK-5706
>  
> 
> Do you think your patch would be a good starting point for that and would you 
> be willing to share it?
> 
>  
> 
> The Amazon AWS SDK for Java is Apache 2 licensed, so it is possible to fork it 
> officially, if necessary...
> 
>  
> 
> Greetings,
> 
> Stephan
> 
>  
> 
>  
> 
>  
> 
> On Mon, Feb 27, 2017 at 5:15 PM, Seth Wiesman <[hidden email]> wrote:
> 
> Just wanted to throw in my 2cts.  
> 
>  
> 
> I’ve been running pipelines with similar state size using RocksDB which 
> externalize to S3 and bucket to S3. I was getting stalls like this and ended 
> up tracing the problem to S3 and the bucketing sink. The solution was 
> twofold:
> 
>  
> 
> 1) I forked hadoop-aws and have it treat Flink as a source of truth. 
> EMR uses a DynamoDB table to determine if S3 is inconsistent. Instead I say 
> that if Flink believes that a file exists on S3 and we don’t see it, then I am 
> going to trust that Flink is in a consistent state and S3 is not. In this 
> case, various operations will perform a back-off and retry up to a certain 
> number of times.
> 
>  
> 
> 2) The bucketing sink performs multiple renames over the lifetime of a 
> file, occurring when a checkpoint starts and then again on notification after 
> it completes. Due to S3’s consistency guarantees, the second rename of a file 
> can never be assured to work and will eventually fail either during or after 
> a checkpoint. Because there is no upper bound on the time it will take for a 
> file on S3 to become consistent, retries cannot solve this specific problem, 
> as it could take many minutes to rename, which would stall the 
> entire pipeline. The only viable solution I could find was to write a custom 
> sink which understands S3. Each writer will write a file locally and then copy 
> it to S3 on checkpoint. By only interacting with S3 once per file it can 
> circumvent consistency issues altogether.
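
The two ideas Seth describes can be sketched roughly as below. This is only an illustration and not the code from his fork: `retry_with_backoff`, `LocalStagingWriter`, and the `upload` callable (which stands in for a single S3 PUT) are hypothetical names, and a real Flink sink would also have to tie `on_checkpoint` into Flink's checkpoint notifications. The retry helper is bounded, in line with the point that retries alone cannot fix the rename problem.

```python
import os
import shutil
import tempfile
import time


def retry_with_backoff(operation, max_attempts=5, base_delay=0.1, sleep=time.sleep):
    """Call operation(); on FileNotFoundError wait, double the delay, and retry.

    Re-raises after max_attempts so that a store which never becomes
    consistent cannot stall the caller forever.
    """
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except FileNotFoundError:
            if attempt == max_attempts:
                raise
            sleep(delay)
            delay *= 2


class LocalStagingWriter:
    """Hypothetical writer: records go to a local file, and the file is handed
    to `upload` exactly once, when a checkpoint is taken (no renames)."""

    def __init__(self, staging_dir, upload):
        self._staging_dir = staging_dir
        self._upload = upload  # callable(local_path, name); stands in for one S3 PUT
        self._counter = 0
        self._open_new_file()

    def _open_new_file(self):
        self._name = f"part-{self._counter}"
        self._path = os.path.join(self._staging_dir, self._name)
        self._file = open(self._path, "w", encoding="utf-8")

    def write(self, record):
        self._file.write(record + "\n")

    def on_checkpoint(self):
        # Close the local file, push it in a single operation, start a new one.
        self._file.close()
        self._upload(self._path, self._name)
        os.remove(self._path)
        self._counter += 1
        self._open_new_file()

    def close(self):
        self._file.close()


if __name__ == "__main__":
    staging, target = tempfile.mkdtemp(), tempfile.mkdtemp()
    writer = LocalStagingWriter(
        staging, lambda path, name: shutil.copy(path, os.path.join(target, name))
    )
    writer.write("record-1")
    writer.on_checkpoint()
    writer.close()
    print(sorted(os.listdir(target)))
```

Because each file touches the remote store only once, nothing in this sketch depends on a previously written object being visible yet.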
> 
>  
> 
> Hope this helps,
> 
>  
> 
> Seth Wiesman
> 
>  
> 
> From: vinay patil <[hidden email]>
> Reply-To: "[hidden email]" <[hidden email]>
> Date: Saturday, February 25, 2017 at 10:50 AM
> To: "[hidden email]" <[hidden email]>
> Subject: Re: Checkpointing with RocksDB as statebackend
> 
>  
> 
> Hi Stephan,
> 
> Just to avoid confusion here: I am using the S3 sink for writing the data, 
> and HDFS for storing checkpoints.
> 
> There are two core nodes (HDFS) and two task nodes on EMR.
> 
> 
> I replaced the S3 sink with HDFS for writing data in my last test.
> 
> Let's say the checkpoint interval is 5 minutes. Within 5 minutes of running, 
> the state size grows to 30GB. After checkpointing, the 30GB state that is 
> maintained in RocksDB has to be copied to HDFS, right? Is this causing the 
> pipeline to stall?
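
As a rough back-of-the-envelope check on that question: if one assumes that every checkpoint copies the full 30GB and should finish before the next 5-minute interval begins (both are assumptions, not something the thread confirms), the sustained copy rate that would be needed works out as follows. The function name is made up for illustration.

```python
def required_throughput_mb_per_s(state_gb, interval_minutes):
    """MB/s needed to copy state_gb within interval_minutes (1 GB = 1024 MB)."""
    return state_gb * 1024 / (interval_minutes * 60)


# 30 GB of state, 5-minute checkpoint interval (figures from the mail above).
print(required_throughput_mb_per_s(30, 5))
```

Under those assumptions the cluster would have to sustain roughly 100 MB/s of snapshot traffic to HDFS on top of its normal processing.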
> 
> 
> 
> Regards,
> 
> Vinay Patil
> 
>  
> 
> On Sat, Feb 25, 2017 at 12:22 AM, Vinay Patil <[hidden email]> wrote:
> 
> Hi Stephan,
> 
> To verify whether S3 is making the pipeline stall, I have replaced the S3 sink 
> with HDFS and kept the minimum pause between checkpoints at 5 minutes; I still 
> see the same issue, with checkpoints failing.
> 
> If I set the pause time to 20 seconds, all checkpoints complete; 
> however, there is a hit to overall throughput.
> 
>  
> 
> 
> 
> Regards,
> 
> Vinay Patil
> 
>  
> 
> On Fri, Feb 24, 2017 at 10:09 PM, Stephan Ewen [via Apache Flink User Mailing 
> List archive.] <[hidden email]> wrote:
> 
> Flink's state backends currently do a good number of "make sure this exists" 
> operations on the file systems. Through Hadoop's S3 filesystem, that 
> translates to S3 bucket list operations, where there is a limit on how many 
> operations may happen per time interval. After that, S3 blocks.
> 
>  
> 
> It seems that operations that are totally cheap on HDFS are hellishly 
> expensive (and limited) on S3. It may be that you are affected by that.
> 
>  
> 
> We are gradually trying to improve the behavior there and be more S3 aware.
> 
>  
> 
> Both 1.3-SNAPSHOT and 1.2-SNAPSHOT already contain improvements there.
> 
>  
> 
> Best,
> 
> Stephan
> 
>  
> 
>  
> 
> On Fri, Feb 24, 2017 at 4:42 PM, vinay patil <[hidden email]> wrote:
> 
> Hi Stephan,
> 
> So do you mean that S3 is causing the stall? As I mentioned in my 
> previous mail, I could not see any progress for 16 minutes, as checkpoints were 
> failing continuously.
> 
>  
> 
> On Feb 24, 2017 8:30 PM, "Stephan Ewen [via Apache Flink User Mailing List 
> archive.]" <[hidden email]> wrote:
> 
> Hi Vinay!
> 
>  
> 
> True, the operator state (like Kafka) is currently not asynchronously 
> checkpointed.
> 
>  
> 
> While it is rather small state, we have seen before that on S3 it can cause 
> trouble, because S3 frequently stalls uploads of even data amounts as low as 
> kilobytes due to its throttling policies.
> 
>  
> 
> That would be a super important fix to add!
> 
>  
> 
> Best,
> 
> Stephan
> 
>  
> 
>  
> 
> On Fri, Feb 24, 2017 at 2:58 PM, vinay patil <[hidden email]> wrote:
> 
> Hi,
> 
> I have attached a snapshot for reference:
> As you can see, all 3 checkpoints failed; checkpoint IDs 2 and 3 are
> stuck at the Kafka source after 50%.
> (The data sent so far by Kafka source 1 is 65GB, and by source 2 is
> 15GB.)
> 
> Within 10 minutes 15M records were processed, and for the next 16 minutes the
> pipeline has been stuck; I don't see any progress beyond 15M because of
> checkpoints failing consistently.
> 
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11882/Checkpointing_Failed.png>
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11882.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.
> 
