Hi Stephan,

Thank you for making me aware of this.

Yes, I am using a window without a reduce function (an apply function). The
discussion on JIRA describes exactly what I am observing: checkpoints fail
consistently after some time and the stream halts.
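
To be concrete about what I mean by a window without a reduce function, here
is a simplified sketch in plain Java. It is not Flink code, and the class and
method names are made up purely for illustration. As far as I understand it,
an apply-style window has to keep every element per key until the window
fires, while a reduce-style window keeps only one running value per key:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; none of these names are Flink APIs.
class WindowStateSketch {

    // Apply-style window: every element is buffered per key until the window fires.
    static Map<String, List<Long>> bufferAll(List<Map.Entry<String, Long>> events) {
        Map<String, List<Long>> state = new HashMap<>();
        for (Map.Entry<String, Long> event : events) {
            state.computeIfAbsent(event.getKey(), k -> new ArrayList<>()).add(event.getValue());
        }
        return state;
    }

    // Reduce-style window: only one running value (here a sum) is kept per key.
    static Map<String, Long> reduceIncrementally(List<Map.Entry<String, Long>> events) {
        Map<String, Long> state = new HashMap<>();
        for (Map.Entry<String, Long> event : events) {
            state.merge(event.getKey(), event.getValue(), Long::sum);
        }
        return state;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> events = List.of(
                Map.entry("a", 1L), Map.entry("a", 2L), Map.entry("b", 5L), Map.entry("a", 3L));
        System.out.println("buffered per key: " + bufferAll(events));
        System.out.println("reduced per key:  " + reduceIncrementally(events));
    }
}
```

With the buffered variant the state per key grows with the number of
elements, which, if I read the JIRA discussion correctly, is the situation my
job is in.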

We want to go live next month, and I am not sure how this will affect
production, as we expect more than 200 million records.

As a workaround, can I take a savepoint while the pipeline is running?
Say I take a savepoint every 30 minutes, will that work?
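
What I have in mind is roughly the sketch below. It assumes the `flink` CLI is
on the PATH and uses its `savepoint <jobId> <targetDirectory>` command; the
class name, the job ID and the target directory are placeholders of mine. I do
not know whether a savepoint goes through the same snapshot path as a
checkpoint and would therefore hit the same slowdown, so this only shows how
the periodic trigger could look, not that it solves the problem:

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Flink: triggers a savepoint every 30 minutes via the CLI.
class PeriodicSavepointSketch {

    // Builds the CLI call "flink savepoint <jobId> <targetDirectory>".
    static List<String> savepointCommand(String jobId, String targetDirectory) {
        return List.of("flink", "savepoint", jobId, targetDirectory);
    }

    // Runs the command once and returns its exit code.
    static int triggerSavepoint(String jobId, String targetDirectory)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(savepointCommand(jobId, targetDirectory))
                .inheritIO()
                .start();
        return process.waitFor();
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.out.println("usage: PeriodicSavepointSketch <jobId> <targetDirectory>");
            return;
        }
        String jobId = args[0];
        String targetDirectory = args[1];
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // First savepoint after 30 minutes, then one every 30 minutes.
        scheduler.scheduleAtFixedRate(() -> {
            try {
                int exitCode = triggerSavepoint(jobId, targetDirectory);
                System.out.println("savepoint command exited with code " + exitCode);
            } catch (IOException e) {
                System.err.println("could not start the flink CLI: " + e.getMessage());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 30, 30, TimeUnit.MINUTES);
    }
}
```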



Regards,
Vinay Patil

On Tue, Mar 14, 2017 at 10:02 PM, Stephan Ewen [via Apache Flink User
Mailing List archive.] <ml-node+s2336050n12209...@n4.nabble.com> wrote:

> The issue in Flink is https://issues.apache.org/jira/browse/FLINK-5756
>
> On Tue, Mar 14, 2017 at 3:40 PM, Stefan Richter <[hidden email]> wrote:
>
>> Hi Vinay,
>>
>> I think the issue is tracked here:
>> https://github.com/facebook/rocksdb/issues/1988.
>>
>> Best,
>> Stefan
>>
>> On 14.03.2017 at 15:31, Vishnu Viswanath <[hidden email]> wrote:
>>
>> Hi Stephan,
>>
>> Is there a ticket number or link to track this? My job meets all the
>> conditions you mentioned.
>>
>> Thanks,
>> Vishnu
>>
>> On Tue, Mar 14, 2017 at 7:13 AM, Stephan Ewen <[hidden email]> wrote:
>>
>>> Hi Vinay!
>>>
>>> We just discovered a bug in RocksDB. The bug affects windows without
>>> reduce() or fold(), windows with evictors, and ListState.
>>>
>>> A certain access pattern in RocksDB becomes so slow beyond a certain
>>> size-per-key that it basically brings down the streaming program and the
>>> snapshots.
>>>
>>> We are reaching out to the RocksDB folks and looking for workarounds in
>>> Flink.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Wed, Mar 1, 2017 at 12:10 PM, Stephan Ewen <[hidden email]> wrote:
>>>
>>>> @vinay Can you try not setting the buffer timeout at all? I am actually
>>>> not sure what the effect of setting it to a negative value would be; that
>>>> could be a cause of problems...
>>>>
>>>>
>>>> On Mon, Feb 27, 2017 at 7:44 PM, Seth Wiesman <[hidden email]> wrote:
>>>>
>>>>> Vinay,
>>>>>
>>>>>
>>>>>
>>>>> The bucketing sink performs rename operations during the checkpoint,
>>>>> and if it tries to rename a file that is not yet consistent, that would
>>>>> cause a FileNotFound exception, which would fail the checkpoint.
>>>>>
>>>>>
>>>>>
>>>>> Stephan,
>>>>>
>>>>>
>>>>>
>>>>> Currently my AWS fork contains some very specific assumptions about
>>>>> the pipeline that will in general only hold for my pipeline. This is
>>>>> because there were still some open questions that I had about how to
>>>>> solve consistency issues in the general case. I will comment on the Jira
>>>>> issue with more specifics.
>>>>>
>>>>>
>>>>>
>>>>> Seth Wiesman
>>>>>
>>>>>
>>>>>
>>>>> *From: *vinay patil <[hidden email]>
>>>>> *Reply-To: *"[hidden email]" <[hidden email]>
>>>>> *Date: *Monday, February 27, 2017 at 1:05 PM
>>>>> *To: *"[hidden email]" <[hidden email]>
>>>>>
>>>>> *Subject: *Re: Checkpointing with RocksDB as statebackend
>>>>>
>>>>>
>>>>>
>>>>> Hi Seth,
>>>>>
>>>>> Thank you for your suggestion.
>>>>>
>>>>> But if the issue is only related to S3, then why does this also happen
>>>>> when I replace the S3 sink with HDFS? (For checkpointing I am using HDFS
>>>>> only.)
>>>>>
>>>>> Stephan,
>>>>>
>>>>> Another issue I see: when I set env.setBufferTimeout(-1) and keep
>>>>> the checkpoint interval at 10 minutes, nothing gets written to the sink
>>>>> (tried with S3 as well as HDFS). I was expecting at least pending files
>>>>> here.
>>>>>
>>>>> This issue gets worse when checkpointing is disabled, as nothing is
>>>>> written.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Regards,
>>>>>
>>>>> Vinay Patil
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Feb 27, 2017 at 10:55 PM, Stephan Ewen [via Apache Flink User
>>>>> Mailing List archive.] <[hidden email]> wrote:
>>>>>
>>>>> Hi Seth!
>>>>>
>>>>>
>>>>>
>>>>> Wow, that is an awesome approach.
>>>>>
>>>>>
>>>>>
>>>>> We have actually seen these issues as well and we are looking to
>>>>> eventually implement our own S3 file system (and circumvent Hadoop's S3
>>>>> connector that Flink currently relies on):
>>>>> https://issues.apache.org/jira/browse/FLINK-5706
>>>>>
>>>>>
>>>>>
>>>>> Do you think your patch would be a good starting point for that and
>>>>> would you be willing to share it?
>>>>>
>>>>>
>>>>>
>>>>> The Amazon AWS SDK for Java is Apache 2 licensed, so it is possible
>>>>> to fork it officially, if necessary...
>>>>>
>>>>>
>>>>>
>>>>> Greetings,
>>>>>
>>>>> Stephan
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Feb 27, 2017 at 5:15 PM, Seth Wiesman <[hidden email]> wrote:
>>>>>
>>>>> Just wanted to throw in my 2cts.
>>>>>
>>>>>
>>>>>
>>>>> I’ve been running pipelines with similar state size using RocksDB,
>>>>> which externalize to S3 and bucket to S3. I was getting stalls like this
>>>>> and ended up tracing the problem to S3 and the bucketing sink. The
>>>>> solution was twofold:
>>>>>
>>>>>
>>>>>
>>>>> 1)       I forked hadoop-aws and have it treat Flink as the source of
>>>>> truth. EMR uses a DynamoDB table to determine if S3 is inconsistent.
>>>>> Instead, I say that if Flink believes a file exists on S3 and we don’t
>>>>> see it, then I trust that Flink is in a consistent state and S3 is not.
>>>>> In this case, various operations back off and retry up to a certain
>>>>> number of times.
>>>>>
>>>>>
>>>>>
>>>>> 2)       The bucketing sink performs multiple renames over the
>>>>> lifetime of a file, occurring when a checkpoint starts and then again on
>>>>> notification after it completes. Due to S3’s consistency guarantees, the
>>>>> second rename of a file can never be assured to work and will eventually
>>>>> fail either during or after a checkpoint. Because there is no upper bound
>>>>> on the time it takes for a file on S3 to become consistent, retries cannot
>>>>> solve this specific problem: a rename could take many minutes, which
>>>>> would stall the entire pipeline. The only viable solution I could find
>>>>> was to write a custom sink that understands S3. Each writer writes a
>>>>> file locally and then copies it to S3 on checkpoint. By interacting with
>>>>> S3 only once per file, it can circumvent the consistency issues
>>>>> altogether.
>>>>>
>>>>>
>>>>>
>>>>> Hope this helps,
>>>>>
>>>>>
>>>>>
>>>>> Seth Wiesman
>>>>>
>>>>>
>>>>>
>>>>> *From: *vinay patil <[hidden email]>
>>>>> *Reply-To: *"[hidden email]" <[hidden email]>
>>>>> *Date: *Saturday, February 25, 2017 at 10:50 AM
>>>>> *To: *"[hidden email]" <[hidden email]>
>>>>> *Subject: *Re: Checkpointing with RocksDB as statebackend
>>>>>
>>>>>
>>>>>
>>>>> Hi Stephan,
>>>>>
>>>>> Just to avoid confusion: I am using an S3 sink for writing the data
>>>>> and HDFS for storing checkpoints.
>>>>>
>>>>> There are two core nodes (HDFS) and two task nodes on EMR.
>>>>>
>>>>>
>>>>> In my last test, I replaced the S3 sink with HDFS for writing data.
>>>>>
>>>>> Let's say the checkpoint interval is 5 minutes and the state grows to
>>>>> 30 GB within those 5 minutes. On checkpointing, the 30 GB of state
>>>>> maintained in RocksDB has to be copied to HDFS, right? Is this what
>>>>> causes the pipeline to stall?
>>>>>
>>>>>
>>>>> Regards,
>>>>>
>>>>> Vinay Patil
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Feb 25, 2017 at 12:22 AM, Vinay Patil <[hidden email]> wrote:
>>>>>
>>>>> Hi Stephan,
>>>>>
>>>>> To verify whether S3 is making the pipeline stall, I replaced the S3
>>>>> sink with HDFS and set the minimum pause between checkpoints to
>>>>> 5 minutes. I still see the same issue, with checkpoints failing.
>>>>>
>>>>> If I set the pause time to 20 seconds, all checkpoints complete;
>>>>> however, there is a hit to overall throughput.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Regards,
>>>>>
>>>>> Vinay Patil
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Feb 24, 2017 at 10:09 PM, Stephan Ewen [via Apache Flink User
>>>>> Mailing List archive.] <[hidden email]> wrote:
>>>>>
>>>>> Flink's state backends currently do a good number of "make sure this
>>>>> exists" operations on the file systems. Through Hadoop's S3 filesystem,
>>>>> that translates to S3 bucket list operations, where there is a limit on
>>>>> how many operations may happen per time interval. After that, S3 blocks.
>>>>>
>>>>>
>>>>>
>>>>> It seems that operations that are totally cheap on HDFS are hellishly
>>>>> expensive (and limited) on S3. It may be that you are affected by that.
>>>>>
>>>>>
>>>>>
>>>>> We are gradually trying to improve the behavior there and be more S3
>>>>> aware.
>>>>>
>>>>>
>>>>>
>>>>> Both 1.3-SNAPSHOT and 1.2-SNAPSHOT already contain improvements there.
>>>>>
>>>>>
>>>>>
>>>>> Best,
>>>>>
>>>>> Stephan
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Feb 24, 2017 at 4:42 PM, vinay patil <[hidden email]> wrote:
>>>>>
>>>>> Hi Stephan,
>>>>>
>>>>> So do you mean that S3 is causing the stall? As I mentioned in my
>>>>> previous mail, I could not see any progress for 16 minutes because
>>>>> checkpoints were failing continuously.
>>>>>
>>>>>
>>>>>
>>>>> On Feb 24, 2017 8:30 PM, "Stephan Ewen [via Apache Flink User Mailing
>>>>> List archive.]" <[hidden email]> wrote:
>>>>>
>>>>> Hi Vinay!
>>>>>
>>>>>
>>>>>
>>>>> True, the operator state (like Kafka) is currently not asynchronously
>>>>> checkpointed.
>>>>>
>>>>>
>>>>>
>>>>> While it is rather small state, we have seen before that on S3 it can
>>>>> cause trouble, because S3 frequently stalls uploads of even data amounts
>>>>> as low as kilobytes due to its throttling policies.
>>>>>
>>>>>
>>>>>
>>>>> That would be a super important fix to add!
>>>>>
>>>>>
>>>>>
>>>>> Best,
>>>>>
>>>>> Stephan
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Feb 24, 2017 at 2:58 PM, vinay patil <[hidden email]> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I have attached a snapshot for reference.
>>>>> As you can see, all three checkpoints failed. For checkpoint IDs 2 and 3,
>>>>> it is stuck at the Kafka source after 50%.
>>>>> (So far, Kafka source 1 has sent 65 GB and source 2 has sent 15 GB.)
>>>>>
>>>>> Within 10 minutes, 15M records were processed, and for the next
>>>>> 16 minutes the pipeline was stuck. I don't see any progress beyond 15M
>>>>> because the checkpoints fail consistently.
>>>>>
>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11882/Checkpointing_Failed.png>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11882.html
>>>>>
>>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>>> archive at Nabble.com.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> ------------------------------
>>>>>
>>>>> *If you reply to this email, your message will be added to the
>>>>> discussion below:*
>>>>>
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11885.html
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
>
>
>




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p12224.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.
