Re: FS StateBackend to RocksDB StateBackend state recovery question

2021-04-01 Thread LakeShen
Hi fanrui,

thank you so much!

Best,
LakeShen


范瑞 <836961...@qq.com> wrote on Thu, Apr 1, 2021 at 7:36 PM:

> Hi Lake:
>
>
> The current Flink versions probably do not support this; that is, after switching the StateBackend the job cannot be restored normally. Flink 1.13
> addresses this; for details, see FLIP-41 and FLINK-20976:
>
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State
>
>
> https://issues.apache.org/jira/browse/FLINK-20976
>
>
> Best,
> fanrui
>
> --- Original Message ---
> From: "LakeShen" Sent: Thursday, April 1, 2021, 7:16 PM
> To: "user-zh" Subject: FS StateBackend to RocksDB StateBackend state recovery question
>
>
> Hi community,
>  If a real-time job previously used the FS StateBackend,
> and after the job is stopped it is restarted with the RocksDB StateBackend,
> can the job state be recovered?
>
> Best,
> LakeShen
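
For reference, on Flink 1.13+ (where FLIP-41's unified keyed-state binary format applies) the switch discussed above reduces to configuring the new backend and resubmitting the job from the old savepoint. A minimal sketch, not taken from the thread, with placeholder paths and a placeholder pipeline:

// Sketch of restoring a job with RocksDB after it previously ran on the
// FS/HashMap state backend, assuming Flink 1.13+; paths are placeholders.
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SwitchToRocksDBJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Previously: env.setStateBackend(new HashMapStateBackend()); (or the legacy FsStateBackend)
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        // Checkpoint storage stays on a durable filesystem (placeholder URI).
        env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");

        // Placeholder pipeline; the real job graph must be the same as before.
        env.fromElements("a", "b", "c").print();
        env.execute("job-with-rocksdb-backend");
    }
}

The job would then be resubmitted with `flink run -s <savepointPath> ...`. On releases before 1.13, restoring a savepoint with a different state backend is not supported, as the reply above notes.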


Re: FS StateBackend to RocksDB StateBackend state recovery question

2021-04-01 Thread LakeShen
Confirmed: it is not possible.

LakeShen wrote on Thu, Apr 1, 2021 at 7:15 PM:

> Hi community,
>    If a real-time job previously used the FS StateBackend, and after the job is stopped it is restarted with the RocksDB StateBackend,
> can the job state be recovered?
>
> Best,
> LakeShen
>


Re: FS StateBackend to RocksDB StateBackend state recovery question

2021-04-01 Thread 范瑞
Hi Lake:


The current Flink versions probably do not support this; that is, after switching the StateBackend the job cannot be restored normally. Flink 1.13 addresses this; for details, see FLIP-41 and
FLINK-20976:


https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State


https://issues.apache.org/jira/browse/FLINK-20976


Best,
fanrui

--- Original Message ---
From: "LakeShen"

FS StateBackend to RocksDB StateBackend state recovery question

2021-04-01 Thread LakeShen
Hi community,
   If a real-time job previously used the FS StateBackend, and after the job is stopped it is restarted with the RocksDB StateBackend,
can the job state be recovered?

Best,
LakeShen


Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-05-04 Thread Tony Wei
ementation that might
>> lead to some problems or about this issue, please advise me.
>> Thank you very much for taking your time to pay attention to this issue!!
>> = )
>>
>> p.s. the attachment is about the experiment I mentioned above. I didn't
>> record the stack trace because the only difference was that only the Time Trigger
>> threads were runnable and the operator threads were blocked.
>>
>> Best Regards,
>> Tony Wei
>>
>>
>> 2018-03-06 17:00 GMT+08:00 周思华 <summerle...@163.com>:
>>
>>> Hi Tony,
>>>
>>> I agree with you.
>>>
>>> Best Regards,
>>>
>>> Sihua Zhou
>>>
>>>
>>> Sent from NetEase Mail Master
>>>
>>> On 03/6/2018 15:34, Tony Wei <tony19920...@gmail.com> wrote:
>>>
>>> Hi Sihua,
>>>
>>> You are right. The incremental checkpoint might relieve the machines from
>>> high CPU load and make the bad machines recover quickly, but I was
>>> wondering why the first checkpoint failed with a timeout. You can see that when the
>>> bad machine recovered, the CPU load for each checkpoint was not so high,
>>> although there were still peaks during each checkpoint. I think the
>>> high CPU load that might be caused by those timed-out checkpointing
>>> threads is not the root cause. I will use the incremental checkpoint
>>> eventually, but I will decide whether to change my persistence filesystem after we
>>> find out the root cause or stop the investigation and make the
>>> conclusion in this mailing thread. What do you think?
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2018-03-06 15:13 GMT+08:00 周思华 <summerle...@163.com>:
>>>
>>>> Hi Tony,
>>>>
>>>> Sorry for missing the factor of CPU. I found that the "bad TM"'s CPU
>>>> load is much higher than the "good TM"'s, so I think it may also be a
>>>> reason that could lead to the timeout. Since you were using "full checkpoint",
>>>> it needs to iterate over all the records in RocksDB with some `if` checks,
>>>> and when the state is huge this is CPU-costly. Let me try to explain the full
>>>> checkpoint a bit more; it contains two parts.
>>>>
>>>> Part 1. Take a snapshot of RocksDB. (This maps to the "Checkpoint
>>>> Duration (sync)" on the checkpoint detail page.)
>>>>
>>>> Part 2. Loop over the records of the snapshot, along with some `if` checks to
>>>> ensure
>>>> that the data is sent to S3 in the order of the key groups. (This maps to
>>>> the "Checkpoint Duration (async)".)
>>>>
>>>> So part 2 can be CPU-costly and network-costly; if the CPU load is too
>>>> high, then sending data will slow down, because they are in a single loop.
>>>> If CPU is the reason, this phenomenon will disappear if you use incremental
>>>> checkpoints, because they almost only send data to S3. All in all, for now,
>>>> trying out the incremental checkpoint is the best thing to do, I think.
>>>>
>>>> Best Regards,
>>>> Sihua Zhou
>>>>
>>>>
>>>> Sent from NetEase Mail Master
>>>>
>>>> On 03/6/2018 14:45, Tony Wei <tony19920...@gmail.com> wrote:
>>>>
>>>> Sent to the wrong mailing list. Forward it to the correct one.
>>>>
>>>> -- Forwarded message --
>>>> From: Tony Wei <tony19920...@gmail.com>
>>>> Date: 2018-03-06 14:43 GMT+08:00
>>>> Subject: Re: checkpoint stuck with rocksdb statebackend and s3
>>>> filesystem
>>>> To: 周思华 <summerle...@163.com>, Stefan Richter <
>>>> s.rich...@data-artisans.com>
>>>> Cc: "user-subscr...@flink.apache.org" <user-subscr...@flink.apache.org>
>>>>
>>>>
>>>> Hi Sihua,
>>>>
>>>> Thanks a lot. I will try to find out the problem from machines'
>>>> environment. If you and Stefan have any new suggestions or thoughts, please
>>>> advise me. Thank you !
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>> 2018-03-06 14:34 GMT+08:00 周思华 <summerle...@163.com>:
>>>>
>>>>> Hi Tony,
>>>>>
>>>>> I think the two things you mentioned can both lead to a bad network.
>>>>> But from my side, I think it 

Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-09 Thread Tony Wei
t checkpointing
>> threads is not the root cause. I will use the incremental checkpoint
>> eventually, but I will decide whether to change my persistence filesystem after we
>> find out the root cause or stop the investigation and make the
>> conclusion in this mailing thread. What do you think?
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-03-06 15:13 GMT+08:00 周思华 <summerle...@163.com>:
>>
>>> Hi Tony,
>>>
>>> Sorry for missing the factor of CPU. I found that the "bad TM"'s CPU
>>> load is much higher than the "good TM"'s, so I think it may also be a
>>> reason that could lead to the timeout. Since you were using "full checkpoint",
>>> it needs to iterate over all the records in RocksDB with some `if` checks,
>>> and when the state is huge this is CPU-costly. Let me try to explain the full
>>> checkpoint a bit more; it contains two parts.
>>>
>>> Part 1. Take a snapshot of RocksDB. (This maps to the "Checkpoint
>>> Duration (sync)" on the checkpoint detail page.)
>>>
>>> Part 2. Loop over the records of the snapshot, along with some `if` checks to
>>> ensure
>>> that the data is sent to S3 in the order of the key groups. (This maps to
>>> the "Checkpoint Duration (async)".)
>>>
>>> So part 2 can be CPU-costly and network-costly; if the CPU load is too
>>> high, then sending data will slow down, because they are in a single loop.
>>> If CPU is the reason, this phenomenon will disappear if you use incremental
>>> checkpoints, because they almost only send data to S3. All in all, for now,
>>> trying out the incremental checkpoint is the best thing to do, I think.
>>>
>>> Best Regards,
>>> Sihua Zhou
>>>
>>>
>>> Sent from NetEase Mail Master
>>>
>>> On 03/6/2018 14:45, Tony Wei <tony19920...@gmail.com> wrote:
>>>
>>> Sent to the wrong mailing list. Forward it to the correct one.
>>>
>>> -- Forwarded message --
>>> From: Tony Wei <tony19920...@gmail.com>
>>> Date: 2018-03-06 14:43 GMT+08:00
>>> Subject: Re: checkpoint stuck with rocksdb statebackend and s3 filesystem
>>> To: 周思华 <summerle...@163.com>, Stefan Richter <
>>> s.rich...@data-artisans.com>
>>> Cc: "user-subscr...@flink.apache.org" <user-subscr...@flink.apache.org>
>>>
>>>
>>> Hi Sihua,
>>>
>>> Thanks a lot. I will try to find out the problem from machines'
>>> environment. If you and Stefan have any new suggestions or thoughts, please
>>> advise me. Thank you !
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2018-03-06 14:34 GMT+08:00 周思华 <summerle...@163.com>:
>>>
>>>> Hi Tony,
>>>>
>>>> I think the two things you mentioned can both lead to a bad network.
>>>> But from my side, I think it is more likely that the unstable network
>>>> env itself causes the poor network performance, because as far as
>>>> I know I can't find a reason why Flink would slow down the
>>>> network so much (and even if it does, the effect should not be that large).
>>>>
>>>> Maybe Stefan could tell us more about that. ;)
>>>>
>>>> Best Regards,
>>>> Sihua Zhou
>>>>
>>>> Sent from NetEase Mail Master
>>>>
>>>> On 03/6/2018 14:04, Tony Wei <tony19920...@gmail.com> wrote:
>>>>
>>>> Hi Sihua,
>>>>
>>>>
>>>>> Hi Tony,
>>>>>
>>>>> Regarding your question: an average end-to-end checkpoint latency of
>>>>> less than 1.5 mins doesn't mean that a checkpoint won't time out. Indeed,
>>>>> it is determined by the max end-to-end latency (the slowest one); a checkpoint
>>>>> is truly complete only after all tasks' checkpoints have completed.
>>>>>
>>>>
>>>> Sorry for my poor expression. What I meant is the average duration of
>>>> "completed" checkpoints, so I guess there are some problems that make some
>>>> checkpoint subtasks take very long, even more than 10 mins.
>>>>
>>>>
>>>>>
>>>>> Regarding the problem: after a second look at the info you provided, we
>>>>> can see from the checkpoint detail picture that there is one task which
>>>&

Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-09 Thread Stefan Richter
erle...@163.com>>:
> Hi Tony,
> 
> Sorry for missing the factor of cpu, I found that the "bad tm"'s cpu load is 
> so much higher that the 'good tm', so I think maybe it also a reason that 
> could lead to timeout. Since you were using "full checkpoint", it need to 
> iterate all the records in the RocksDB with some `if` check, when the state 
> is huge this is cpu costly. Let me try to explain the full checkpoint a bit 
> more, it contains two parts.
> 
> Part 1. Take snapshot of the RocksDB. (This can map to the "Checkpoint 
> Duration (sync) " on the checkpoint detail page)
> 
> Part2. Loop the records of the snapshot, along with some `if` check to ensure 
> that the data is sent to s3 in the order of the key group. (This can map to 
> the "Checkpoint Duration(Async)").
> 
> So part2 could be cpu costly and network costly, if the CPU load is too high, 
> then sending data will slow down, because there are in a single loop. If cpu 
> is the reason, this phenomenon will disappear if you use increment 
> checkpoint, because it almost only send data to s3. In the all, for now 
> trying out the incremental checkpoint is the best thing to do I think.
> 
> Best Regards,
> Sihua Zhou
> 
> 
> Sent from NetEase Mail Master
> 
> On 03/6/2018 14:45, Tony Wei <tony19920...@gmail.com> wrote:
> Sent to the wrong mailing list. Forward it to the correct one.
> 
> -- Forwarded message --
> From: Tony Wei <tony19920...@gmail.com>
> Date: 2018-03-06 14:43 GMT+08:00
> Subject: Re: checkpoint stuck with rocksdb statebackend and s3 filesystem
> To: 周思华 <summerle...@163.com>, Stefan Richter <s.rich...@data-artisans.com>
> Cc: "user-subscr...@flink.apache.org" <user-subscr...@flink.apache.org>
> 
> 
> Hi Sihua,
> 
> Thanks a lot. I will try to find out the problem from machines' environment. 
> If you and Stefan have any new suggestions or thoughts, please advise me. 
> Thank you !
> 
> Best Regards,
> Tony Wei
> 
> 2018-03-06 14:34 GMT+08:00 周思华 <summerle...@163.com>:
> Hi Tony,
> 
> I think the two things you mentioned can both lead to a bad network. But from 
> my side, I think it is more likely that the unstable network env itself 
> causes the poor network performance, because as far as I know I can't find 
> a reason why Flink would slow down the network so much (and even if it does, 
> the effect should not be that large). 
> 
> Maybe Stefan could tell us more about that. ;)
> 
> Best Regards,
> Sihua Zhou
> 
> Sent from NetEase Mail Master
> 
> On 03/6/2018 14:04, Tony Wei <tony19920...@gmail.com> wrote:
> Hi Sihua,
> 
> 
> Hi Tony,
> 
> Regarding your question: an average end-to-end checkpoint latency of less than 
> 1.5 mins doesn't mean that a checkpoint won't time out. Indeed, it is determined 
> by the max end-to-end latency (the slowest one); a checkpoint is truly 
> complete only after all tasks' checkpoints have completed.
> 
> Sorry for my poor expression. What I meant is the average duration of 
> "completed" checkpoints, so I guess there are some problems that make some 
> checkpoint subtasks take very long, even more than 10 mins.
>  
> 
> Regarding the problem: after a second look at the info you provided, we can 
> see from the checkpoint detail picture that there is one task which took 
> 4m20s to transfer its snapshot (about 482M) to S3, and there are 4 other tasks 
> that didn't complete the checkpoint yet. And from bad_tm_pic.png vs 
> good_tm_pic.png, we can see that on the "bad TM" the network performance is far 
> lower than on the "good TM" (-15 MB vs -50MB). So I guess the network is a 
> problem; sometimes it fails to send 500M of data to S3 in 10 minutes. (Maybe 
> you need to check whether the network env is stable.)
>  
> That is what I am concerned about, because I can't determine whether a stuck 
> checkpoint makes the network performance worse or worse network performance 
> makes the checkpoint stuck.
> Although I provided one "bad machine" and one "good machine", that doesn't 
> mean bad machine is always bad and good machine is always good. See the 
> attachments.
> All of my TMs met this problem at least once from last weekend until now. 
> Some machines recovered by themselves and some recovered after I restarted 
> them.
> 
> Best Regards,
> Tony 

Re: Fwd: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread Tony Wei
Hi Sihua,

You are right. The incremental checkpoint might relieve the machines from high
CPU load and make the bad machines recover quickly, but I was wondering
why the first checkpoint failed with a timeout. You can see that when the bad
machine recovered, the CPU load for each checkpoint was not so high,
although there were still peaks during each checkpoint. I think the
high CPU load that might be caused by those timed-out checkpointing
threads is not the root cause. I will use the incremental checkpoint
eventually, but I will decide whether to change my persistence filesystem after we
find out the root cause or stop the investigation and make the
conclusion in this mailing thread. What do you think?

Best Regards,
Tony Wei

2018-03-06 15:13 GMT+08:00 周思华 <summerle...@163.com>:

> Hi Tony,
>
> Sorry for missing the factor of cpu, I found that the "bad tm"'s cpu load
> is so much higher that the 'good tm', so I think maybe it also a reason
> that could lead to timeout. Since you were using "full checkpoint", it need
> to iterate all the records in the RocksDB with some `if` check, when the
> state is huge this is cpu costly. Let me try to explain the full checkpoint
> a bit more, it contains two parts.
>
> Part 1. Take snapshot of the RocksDB. (This can map to the "Checkpoint
> Duration (sync) " on the checkpoint detail page)
>
> Part2. Loop the records of the snapshot, along with some `if` check to ensure
> that the data is sent to s3 in the order of the key group. (This can map to
> the "Checkpoint Duration(Async)").
>
> So part2 could be cpu costly and network costly, if the CPU load is too
> high, then sending data will slow down, because there are in a single loop.
> If cpu is the reason, this phenomenon will disappear if you use increment
> checkpoint, because it almost only send data to s3. In the all, for now
> trying out the incremental checkpoint is the best thing to do I think.
>
> Best Regards,
> Sihua Zhou
>
>
> Sent from NetEase Mail Master
>
> On 03/6/2018 14:45, Tony Wei <tony19920...@gmail.com> wrote:
>
> Sent to the wrong mailing list. Forward it to the correct one.
>
> -- Forwarded message --
> From: Tony Wei <tony19920...@gmail.com>
> Date: 2018-03-06 14:43 GMT+08:00
> Subject: Re: checkpoint stuck with rocksdb statebackend and s3 filesystem
> To: 周思华 <summerle...@163.com>, Stefan Richter <s.rich...@data-artisans.com
> >
> Cc: "user-subscr...@flink.apache.org" <user-subscr...@flink.apache.org>
>
>
> Hi Sihua,
>
> Thanks a lot. I will try to find out the problem from machines'
> environment. If you and Stefan have any new suggestions or thoughts, please
> advise me. Thank you !
>
> Best Regards,
> Tony Wei
>
> 2018-03-06 14:34 GMT+08:00 周思华 <summerle...@163.com>:
>
>> Hi Tony,
>>
>> I think the two things you mentioned can both lead to a bad network. But
>> from my side, I think it is more likely that the unstable network env itself
>> causes the poor network performance, because as far as I know
>> I can't find a reason why Flink would slow down the network so
>> much (and even if it does, the effect should not be that large).
>>
>> Maybe Stefan could tell us more about that. ;)
>>
>> Best Regards,
>> Sihua Zhou
>>
>> Sent from NetEase Mail Master
>>
>> On 03/6/2018 14:04, Tony Wei <tony19920...@gmail.com> wrote:
>>
>> Hi Sihua,
>>
>>
>>> Hi Tony,
>>>
>>> Regarding your question: an average end-to-end checkpoint latency of less
>>> than 1.5 mins doesn't mean that a checkpoint won't time out. Indeed, it is
>>> determined by the max end-to-end latency (the slowest one); a checkpoint is
>>> truly complete only after all tasks' checkpoints have completed.
>>>
>>
>> Sorry for my poor expression. What I meant is the average duration of
>> "completed" checkpoints, so I guess there are some problems that make some
>> checkpoint subtasks take very long, even more than 10 mins.
>>
>>
>>>
>>> Regarding the problem: after a second look at the info you provided, we
>>> can see from the checkpoint detail picture that there is one task which
>>> took 4m20s to transfer its snapshot (about 482M) to S3 and there are 4
>>> other tasks that didn't complete the checkpoint yet. And from the
>>> bad_tm_pic.png vs good_tm_pic.png, we can see that on the "bad TM" the
>>> network performance is far lower than on the "good TM" (-15 MB vs -50MB). So I
>>> guess the network is a problem; sometimes it fails to send 50

Fwd: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread Tony Wei
Sent to the wrong mailing list. Forward it to the correct one.

-- Forwarded message --
From: Tony Wei <tony19920...@gmail.com>
Date: 2018-03-06 14:43 GMT+08:00
Subject: Re: checkpoint stuck with rocksdb statebackend and s3 filesystem
To: 周思华 <summerle...@163.com>, Stefan Richter <s.rich...@data-artisans.com>
Cc: "user-subscr...@flink.apache.org" <user-subscr...@flink.apache.org>


Hi Sihua,

Thanks a lot. I will try to find out the problem from machines'
environment. If you and Stefan have any new suggestions or thoughts, please
advise me. Thank you !

Best Regards,
Tony Wei

2018-03-06 14:34 GMT+08:00 周思华 <summerle...@163.com>:

> Hi Tony,
>
> I think the two things you mentioned can both lead to a bad network. But
> from my side, I think it is more likely that the unstable network env itself
> causes the poor network performance, because as far as I know
> I can't find a reason why Flink would slow down the network so
> much (and even if it does, the effect should not be that large).
>
> Maybe Stefan could tell us more about that. ;)
>
> Best Regards,
> Sihua Zhou
>
> Sent from NetEase Mail Master
>
> On 03/6/2018 14:04, Tony Wei <tony19920...@gmail.com> wrote:
>
> Hi Sihua,
>
>
>> Hi Tony,
>>
>> Regarding your question: an average end-to-end checkpoint latency of less
>> than 1.5 mins doesn't mean that a checkpoint won't time out. Indeed, it is
>> determined by the max end-to-end latency (the slowest one); a checkpoint
>> is truly complete only after all tasks' checkpoints have completed.
>>
>
> Sorry for my poor expression. What I meant is the average duration of
> "completed" checkpoints, so I guess there are some problems that make some
> checkpoint subtasks take very long, even more than 10 mins.
>
>
>>
>> Regarding the problem: after a second look at the info you provided, we can
>> see from the checkpoint detail picture that there is one task which took
>> 4m20s to transfer its snapshot (about 482M) to S3 and there are 4 other
>> tasks that didn't complete the checkpoint yet. And from the bad_tm_pic.png vs
>> good_tm_pic.png, we can see that on the "bad TM" the network performance is
>> far lower than on the "good TM" (-15 MB vs -50MB). So I guess the network is a
>> problem; sometimes it fails to send 500M of data to S3 in 10 minutes. (Maybe
>> you need to check whether the network env is stable.)
>>
>
> That is what I am concerned about, because I can't determine whether a stuck
> checkpoint makes the network performance worse or worse network performance
> makes the checkpoint stuck.
> Although I provided one "bad machine" and one "good machine", that doesn't
> mean bad machine is always bad and good machine is always good. See the
> attachments.
> All of my TMs met this problem at least once from last weekend until now.
> Some machines recovered by themselves and some recovered after I restarted
> them.
>
> Best Regards,
> Tony Wei
>
> 2018-03-06 13:41 GMT+08:00 周思华 <summerle...@163.com>:
>
>>
>> Hi Tony,
>>
>> Regarding your question: an average end-to-end checkpoint latency of less
>> than 1.5 mins doesn't mean that a checkpoint won't time out. Indeed, it is
>> determined by the max end-to-end latency (the slowest one); a checkpoint
>> is truly complete only after all tasks' checkpoints have completed.
>>
>> Regarding the problem: after a second look at the info you provided, we can
>> see from the checkpoint detail picture that there is one task which took
>> 4m20s to transfer its snapshot (about 482M) to S3 and there are 4 other
>> tasks that didn't complete the checkpoint yet. And from the bad_tm_pic.png vs
>> good_tm_pic.png, we can see that on the "bad TM" the network performance is
>> far lower than on the "good TM" (-15 MB vs -50MB). So I guess the network is a
>> problem; sometimes it fails to send 500M of data to S3 in 10 minutes. (Maybe
>> you need to check whether the network env is stable.)
>>
>> Regarding the solution: I think incremental checkpoints can help you a lot; they
>> will only send the new data for each checkpoint. But you are right: if the
>> incremental state size is larger than 500M, it might cause the timeout problem
>> again (because of the bad network performance).
>>
>> Best Regards,
>> Sihua Zhou
>>
>> Sent from NetEase Mail Master
>>
>> On 03/6/2018 13:02, Tony Wei <tony19920...@gmail.com> wrote:
>>
>> Hi Sihua,
>>
>> Thanks for your suggestion. "incremental checkpoint" is what I will try
>> out next and I know it will giv

Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread 周思华


Hi Tony,


Regarding your question: an average end-to-end checkpoint latency of less than
1.5 mins doesn't mean that a checkpoint won't time out. Indeed, it is determined
by the max end-to-end latency (the slowest one); a checkpoint is truly complete
only after all tasks' checkpoints have completed.


Regarding the problem: after a second look at the info you provided, we can see
from the checkpoint detail picture that there is one task which took 4m20s to
transfer its snapshot (about 482M) to S3, and there are 4 other tasks that didn't
complete the checkpoint yet. And from bad_tm_pic.png vs good_tm_pic.png, we
can see that on the "bad TM" the network performance is far lower than on the "good
TM" (-15 MB vs -50MB). So I guess the network is a problem; sometimes it fails
to send 500M of data to S3 in 10 minutes. (Maybe you need to check whether the
network env is stable.)


Regarding the solution: I think incremental checkpoints can help you a lot; they will
only send the new data for each checkpoint. But you are right: if the incremental
state size is larger than 500M, it might cause the timeout problem again
(because of the bad network performance).


Best Regards,
Sihua Zhou


Sent from NetEase Mail Master


On 03/6/2018 13:02, Tony Wei wrote:
Hi Sihua,


Thanks for your suggestion. "incremental checkpoint" is what I will try out 
next and I know it will give a better performance. However, it might not solve 
this issue completely, because as I said, the average end to end latency of 
checkpointing is less than 1.5 mins currently, and it is far from my timeout 
configuration. I believe "incremental checkpoint" will reduce the latency and 
make this issue might occur seldom, but I can't promise it won't happen again 
if I have bigger states growth in the future. Am I right?


Best Regards,
Tony Wei 


2018-03-06 10:55 GMT+08:00 周思华 :

Hi Tony,


Sorry for jumping in. One thing I want to point out is that, from the log you
provided, it looks like you are using "full checkpoint". This means that the
state data that needs to be checkpointed and transferred to S3 will grow over
time, and even for the first checkpoint its performance is slower than an
incremental checkpoint (because it needs to iterate over all the records in
RocksDB using the RocksDBMergeIterator). Maybe you can try out "incremental
checkpoint"; it could help you get better performance.


Best Regards,
Sihua Zhou


Sent from NetEase Mail Master


On 03/6/2018 10:34, Tony Wei wrote:
Hi Stefan,


I see. That explains why the load on the machines went up. However, I think it
is not the root cause that led to these consecutive checkpoint timeouts. As I
said in my first mail, the checkpointing process usually took 1.5 mins to
upload states, and this operator and the Kafka consumer are the only two operators that
have state in my pipeline. In the best case, I should never encounter a
timeout problem caused only by lots of pending checkpointing threads that
have already timed out. Am I right?


Since this logging and the stack trace were taken nearly 3 hours after the
first checkpoint timeout, I'm afraid that we couldn't actually find out the
root cause of the first checkpoint timeout. Because we are preparing to take
this pipeline into production, I was wondering if you could help me find out
where the root cause lies: bad machines, S3, the flink-s3-presto package,
or the Flink checkpointing thread. It would be great if we can find it out from
the information I provided, and a hypothesis based on your experience is
welcome as well. The most important thing is that I have to decide whether I
need to change my persistence filesystem or use another S3 filesystem package,
because the last thing I want to see is the checkpoint timeout happening
very often.


Thank you very much for all your advice.


Best Regards,
Tony Wei


2018-03-06 1:07 GMT+08:00 Stefan Richter :

Hi,


thanks for all the info. I had a look into the problem and opened 
https://issues.apache.org/jira/browse/FLINK-8871 to fix this. From your stack 
trace, you can see many checkpointing threads are running on your TM for 
checkpoints that have already timed out, and I think this cascades and slows 
down everything. Seems like the implementation of some features like checkpoint 
timeouts and not failing tasks from checkpointing problems overlooked that we 
also need to properly communicate the checkpoint cancellation to all tasks, 
which was not needed before.


Best,
Stefan




On 05.03.2018 at 14:42, Tony Wei wrote:


Hi Stefan,


Here is my checkpointing configuration.



| Checkpointing Mode | Exactly Once |
| Interval | 20m 0s |
| Timeout | 10m 0s |
| Minimum Pause Between Checkpoints | 0ms |
| Maximum Concurrent Checkpoints | 1 |
| Persist Checkpoints Externally | Enabled (delete on cancellation) |
Best Regards,

Tony Wei
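
The settings in the table above map directly onto the DataStream API; a minimal sketch (not from the thread), assuming a Flink 1.4-style CheckpointConfig and a placeholder pipeline:

// Sketch of the checkpointing settings listed above, assuming a Flink 1.4-style API.
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSettings {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Interval 20m, mode EXACTLY_ONCE.
        env.enableCheckpointing(20 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
        // Timeout 10m, no minimum pause, at most one concurrent checkpoint.
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Externalized checkpoints, deleted on cancellation.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

        // Placeholder pipeline; the real job goes here.
        env.fromElements(1, 2, 3).print();
        env.execute("checkpoint-settings-example");
    }
}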


2018-03-05 21:30 GMT+08:00 Stefan Richter 

Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread Tony Wei
Hi Sihua,

Thanks for your suggestion. "incremental checkpoint" is what I will try out
next and I know it will give a better performance. However, it might not
solve this issue completely, because as I said, the average end to end
latency of checkpointing is less than 1.5 mins currently, and it is far
from my timeout configuration. I believe "incremental checkpoint" will
reduce the latency and make this issue might occur seldom, but I can't
promise it won't happen again if I have bigger states growth in the future.
Am I right?

Best Regards,
Tony Wei

2018-03-06 10:55 GMT+08:00 周思华 :

> Hi Tony,
>
> Sorry for jumping in. One thing I want to point out is that, from the log you
> provided, it looks like you are using "full checkpoint". This means that the
> state data that needs to be checkpointed and transferred to S3 will grow over
> time, and even for the first checkpoint its performance is slower than an
> incremental checkpoint (because it needs to iterate over all the records in
> RocksDB using the RocksDBMergeIterator). Maybe you can try out "incremental
> checkpoint"; it could help you get better performance.
>
> Best Regards,
> Sihua Zhou
>
> Sent from NetEase Mail Master
>
> On 03/6/2018 10:34, Tony Wei wrote:
>
> Hi Stefan,
>
> I see. That explains why the load on the machines went up. However, I think
> it is not the root cause that led to these consecutive checkpoint timeouts.
> As I said in my first mail, the checkpointing process usually took 1.5
> mins to upload states, and this operator and the Kafka consumer are the only two
> operators that have state in my pipeline. In the best case, I should never
> encounter a timeout problem caused only by lots of pending
> checkpointing threads that have already timed out. Am I right?
>
> Since this logging and the stack trace were taken nearly 3 hours after
> the first checkpoint timeout, I'm afraid that we couldn't actually find out
> the root cause of the first checkpoint timeout. Because we are preparing
> to take this pipeline into production, I was wondering if you could help
> me find out where the root cause lies: bad machines, S3,
> the flink-s3-presto package, or the Flink checkpointing thread. It will be great if
> we can find it out from the information I provided, and a
> hypothesis based on your experience is welcome as well. The most important
> thing is that I have to decide whether I need to change my persistence
> filesystem or use another S3 filesystem package, because the last
> thing I want to see is the checkpoint timeout happening very often.
>
> Thank you very much for all your advice.
>
> Best Regards,
> Tony Wei
>
> 2018-03-06 1:07 GMT+08:00 Stefan Richter :
>
>> Hi,
>>
>> thanks for all the info. I had a look into the problem and opened
>> https://issues.apache.org/jira/browse/FLINK-8871 to fix this. From your
>> stack trace, you can see many checkpointing threads are running on your TM
>> for checkpoints that have already timed out, and I think this cascades and
>> slows down everything. Seems like the implementation of some features like
>> checkpoint timeouts and not failing tasks from checkpointing problems
>> overlooked that we also need to properly communicate the checkpoint
>> cancellation to all tasks, which was not needed before.
>>
>> Best,
>> Stefan
>>
>>
>> On 05.03.2018 at 14:42, Tony Wei wrote:
>>
>> Hi Stefan,
>>
>> Here is my checkpointing configuration.
>>
>> Checkpointing Mode Exactly Once
>> Interval 20m 0s
>> Timeout 10m 0s
>> Minimum Pause Between Checkpoints 0ms
>> Maximum Concurrent Checkpoints 1
>> Persist Checkpoints Externally Enabled (delete on cancellation)
>> Best Regards,
>> Tony Wei
>>
>> 2018-03-05 21:30 GMT+08:00 Stefan Richter :
>>
>>> Hi,
>>>
>>> quick question: what is your exact checkpointing configuration? In
>>> particular, what is your value for the maximum parallel checkpoints and the
>>> minimum time interval to wait between two checkpoints?
>>>
>>> Best,
>>> Stefan
>>>
>>> > On 05.03.2018 at 06:34, Tony Wei wrote:
>>> >
>>> > Hi all,
>>> >
>>> > Last weekend, my flink job's checkpoints started failing because of
>>> timeout. I have no idea what happened, but I collected some information
>>> about my cluster and job. I hope someone can give me advice or hints about
>>> the problem that I encountered.
>>> >
>>> > My cluster version is flink-release-1.4.0. The cluster has 10 TMs, each
>>> has 4 cores. These machines are EC2 spot instances. The job's parallelism
>>> is set as 32, using RocksDB as state backend and S3 Presto as checkpoint
>>> file system.
>>> > The state's size is nearly 15GB and still grows day by day. Normally,
>>> it takes 1.5 mins to finish the whole checkpoint process. The timeout
>>> configuration is set as 10 mins.
>>> >
>>> > 
>>> >
>>> > As the picture shows, not each subtask of checkpoint broke caused by

Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread Tony Wei
Hi Stefan,

I see. That explains why the load on the machines went up. However, I think
it is not the root cause that led to these consecutive checkpoint timeouts.
As I said in my first mail, the checkpointing process usually took 1.5
mins to upload states, and this operator and the Kafka consumer are the only two
operators that have state in my pipeline. In the best case, I should never
encounter a timeout problem caused only by lots of pending
checkpointing threads that have already timed out. Am I right?

Since this logging and the stack trace were taken nearly 3 hours after the
first checkpoint timeout, I'm afraid that we couldn't actually find out the
root cause of the first checkpoint timeout. Because we are preparing to
take this pipeline into production, I was wondering if you could help me
find out where the root cause lies: bad machines, S3,
the flink-s3-presto package, or the Flink checkpointing thread. It will be great if
we can find it out from the information I provided, and a
hypothesis based on your experience is welcome as well. The most important
thing is that I have to decide whether I need to change my persistence
filesystem or use another S3 filesystem package, because the last
thing I want to see is the checkpoint timeout happening very often.

Thank you very much for all your advice.

Best Regards,
Tony Wei

2018-03-06 1:07 GMT+08:00 Stefan Richter :

> Hi,
>
> thanks for all the info. I had a look into the problem and opened
> https://issues.apache.org/jira/browse/FLINK-8871 to fix this. From your
> stack trace, you can see many checkpointing threads are running on your TM
> for checkpoints that have already timed out, and I think this cascades and
> slows down everything. Seems like the implementation of some features like
> checkpoint timeouts and not failing tasks from checkpointing problems
> overlooked that we also need to properly communicate the checkpoint
> cancellation to all tasks, which was not needed before.
>
> Best,
> Stefan
>
>
> On 05.03.2018 at 14:42, Tony Wei wrote:
>
> Hi Stefan,
>
> Here is my checkpointing configuration.
>
> Checkpointing Mode Exactly Once
> Interval 20m 0s
> Timeout 10m 0s
> Minimum Pause Between Checkpoints 0ms
> Maximum Concurrent Checkpoints 1
> Persist Checkpoints Externally Enabled (delete on cancellation)
> Best Regards,
> Tony Wei
>
> 2018-03-05 21:30 GMT+08:00 Stefan Richter :
>
>> Hi,
>>
>> quick question: what is your exact checkpointing configuration? In
>> particular, what is your value for the maximum parallel checkpoints and the
>> minimum time interval to wait between two checkpoints?
>>
>> Best,
>> Stefan
>>
>> > On 05.03.2018 at 06:34, Tony Wei wrote:
>> >
>> > Hi all,
>> >
>> > Last weekend, my flink job's checkpoints started failing because of
>> timeout. I have no idea what happened, but I collected some information
>> about my cluster and job. I hope someone can give me advice or hints about
>> the problem that I encountered.
>> >
>> > My cluster version is flink-release-1.4.0. The cluster has 10 TMs, each has
>> 4 cores. These machines are EC2 spot instances. The job's parallelism is
>> set as 32, using RocksDB as state backend and S3 Presto as checkpoint file
>> system.
>> > The state's size is nearly 15GB and still grows day by day. Normally,
>> it takes 1.5 mins to finish the whole checkpoint process. The timeout
>> configuration is set as 10 mins.
>> >
>> > 
>> >
>> > As the picture shows, not every checkpoint subtask failed because of a
>> timeout, but every machine had, at some point, failed for all its subtasks during last
>> weekend. Some machines recovered by themselves and some machines recovered
>> after I restarted them.
>> >
>> > I record logs, stack trace and snapshot for machine's status (CPU, IO,
>> Network, etc.) for both good and bad machine. If there is a need for more
>> informations, please let me know. Thanks in advance.
>> >
>> > Best Regards,
>> > Tony Wei.
>> > > log.log>
>>
>>
>
>


Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread Stefan Richter
Hi,

thanks for all the info. I had a look into the problem and opened 
https://issues.apache.org/jira/browse/FLINK-8871 
 to fix this. From your stack 
trace, you can see many checkpointing threads are running on your TM for 
checkpoints that have already timed out, and I think this cascades and slows 
down everything. Seems like the implementation of some features like checkpoint 
timeouts and not failing tasks from checkpointing problems overlooked that we 
also need to properly communicate the checkpoint cancellation to all tasks, 
which was not needed before.

Best,
Stefan

> On 05.03.2018 at 14:42, Tony Wei wrote:
> 
> Hi Stefan,
> 
> Here is my checkpointing configuration.
> 
> Checkpointing Mode: Exactly Once
> Interval: 20m 0s
> Timeout: 10m 0s
> Minimum Pause Between Checkpoints: 0ms
> Maximum Concurrent Checkpoints: 1
> Persist Checkpoints Externally: Enabled (delete on cancellation)
> Best Regards,
> Tony Wei
> 
> 2018-03-05 21:30 GMT+08:00 Stefan Richter:
> Hi,
> 
> quick question: what is your exact checkpointing configuration? In 
> particular, what is your value for the maximum parallel checkpoints and the 
> minimum time interval to wait between two checkpoints?
> 
> Best,
> Stefan
> 
> > On 05.03.2018 at 06:34, Tony Wei wrote:
> >
> > Hi all,
> >
> > Last weekend, my flink job's checkpoints started failing because of timeout. I
> > have no idea what happened, but I collected some information about my
> > cluster and job. I hope someone can give me advice or hints about the
> > problem that I encountered.
> >
> > My cluster version is flink-release-1.4.0. The cluster has 10 TMs, each has 4
> > cores. These machines are EC2 spot instances. The job's parallelism is set
> > as 32, using RocksDB as state backend and S3 Presto as checkpoint file
> > system.
> > The state's size is nearly 15GB and still grows day by day. Normally, it
> > takes 1.5 mins to finish the whole checkpoint process. The timeout
> > configuration is set as 10 mins.
> >
> > 
> >
> > As the picture shows, not every checkpoint subtask failed because of a
> > timeout, but every machine had, at some point, failed for all its subtasks during last
> > weekend. Some machines recovered by themselves and some machines recovered
> > after I restarted them.
> >
> > I recorded logs, stack traces, and snapshots of the machines' status (CPU, IO,
> > Network, etc.) for both a good and a bad machine. If there is a need for more
> > information, please let me know. Thanks in advance.
> >
> > Best Regards,
> > Tony Wei.
> > 
> 
> 



Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread Tony Wei
Hi Stefan,

Here is my checkpointing configuration.

Checkpointing Mode: Exactly Once
Interval: 20m 0s
Timeout: 10m 0s
Minimum Pause Between Checkpoints: 0ms
Maximum Concurrent Checkpoints: 1
Persist Checkpoints Externally: Enabled (delete on cancellation)
Best Regards,
Tony Wei

2018-03-05 21:30 GMT+08:00 Stefan Richter :

> Hi,
>
> quick question: what is your exact checkpointing configuration? In
> particular, what is your value for the maximum parallel checkpoints and the
> minimum time interval to wait between two checkpoints?
>
> Best,
> Stefan
>
> > On 05.03.2018 at 06:34, Tony Wei wrote:
> >
> > Hi all,
> >
> > Last weekend, my flink job's checkpoints started failing because of
> timeout. I have no idea what happened, but I collected some information
> about my cluster and job. I hope someone can give me advice or hints about
> the problem that I encountered.
> >
> > My cluster version is flink-release-1.4.0. The cluster has 10 TMs, each has
> 4 cores. These machines are EC2 spot instances. The job's parallelism is
> set as 32, using RocksDB as state backend and S3 Presto as checkpoint file
> system.
> > The state's size is nearly 15GB and still grows day by day. Normally, it
> takes 1.5 mins to finish the whole checkpoint process. The timeout
> configuration is set as 10 mins.
> >
> > 
> >
> > As the picture shows, not every checkpoint subtask failed because of a
> timeout, but every machine had, at some point, failed for all its subtasks during last
> weekend. Some machines recovered by themselves and some machines recovered
> after I restarted them.
> >
> > I recorded logs, stack traces, and snapshots of the machines' status (CPU, IO,
> Network, etc.) for both a good and a bad machine. If there is a need for more
> information, please let me know. Thanks in advance.
> >
> > Best Regards,
> > Tony Wei.
> >  tm_log.log>
>
>


Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread Stefan Richter
Hi,

quick question: what is your exact checkpointing configuration? In particular, 
what is your value for the maximum parallel checkpoints and the minimum time 
interval to wait between two checkpoints?

Best,
Stefan

> On 05.03.2018 at 06:34, Tony Wei wrote:
> 
> Hi all,
> 
> Last weekend, my flink job's checkpoints started failing because of timeout. I
> have no idea what happened, but I collected some information about my cluster
> and job. I hope someone can give me advice or hints about the problem that I
> encountered.
> 
> My cluster version is flink-release-1.4.0. The cluster has 10 TMs, each has 4
> cores. These machines are EC2 spot instances. The job's parallelism is set as
> 32, using RocksDB as state backend and S3 Presto as checkpoint file system.
> The state's size is nearly 15GB and still grows day by day. Normally, it
> takes 1.5 mins to finish the whole checkpoint process. The timeout
> configuration is set as 10 mins.
> 
> 
> 
> As the picture shows, not every checkpoint subtask failed because of a timeout,
> but every machine had, at some point, failed for all its subtasks during last weekend.
> Some machines recovered by themselves and some machines recovered after I
> restarted them.
> 
> I recorded logs, stack traces, and snapshots of the machines' status (CPU, IO,
> Network, etc.) for both a good and a bad machine. If there is a need for more
> information, please let me know. Thanks in advance.
> 
> Best Regards,
> Tony Wei.
> 



Re: RocksDB Statebackend

2016-04-13 Thread Konstantin Knauf
Hi Aljoscha,

thanks for your answers. I am currently not in the office, so I can not
run any further analysis until Monday. Just some quick answers to your
questions.

We are using the partitioned state abstraction, most of the state should
correspond to buffered events in windows. Parallelism is 9. In terms of
stateful operators we basically just have a KafkaSource, a custom
stateful trigger as well as a RollingSink. Overall in this test scenario
the state is very limited (see size of state using FsStateBackend).

I will get back to you once I have done some more experiments, which
will be in the course of next week.

Cheers,

Konstantin


On 12.04.2016 18:41, Aljoscha Krettek wrote:
> Hi,
> I'm going to try and respond to each point:
> 
> 1. This seems strange, could you give some background on parallelism,
> number of operators with state and so on? Also, I'm assuming you are
> using the partitioned state abstraction, i.e. getState(), correct?
> 
> 2. your observations are pretty much correct. The reason why RocksDB is
> slower is that the FsStateBackend basically stores the state in a Java
> HashMap and writes the contents to HDFS when checkpointing. RocksDB
> stores data in on-disk files and goes to them for every state access (of
> course there are caches, but generally it is like this). I'm actually
> impressed that it is still this fast in comparison.
> 
> 3. see 1. (I think for now)
> 
> 4. The checkpointing time is the time from the JobManager deciding to
> start a checkpoint until all tasks have confirmed that checkpoint. I
> have seen this before and I think it results from back pressure. The
> problem is that the checkpoint messages that we sent through the
> topology are sitting at the sources because they are also back pressured
> by the slow processing of normal records. You should be able to see the
> actual checkpointing times (both synchronous and asynchronous) in the
> log files of the task managers, they should be very much lower.
> 
> I can go into details, I'm just writing this quickly before calling it a
> day. :-)
> 
> Cheers,
> Aljoscha
> 
> On Tue, 12 Apr 2016 at 18:21 Konstantin Knauf
> > wrote:
> 
> Hi everyone,
> 
> my experience with the RocksDBStateBackend has left me a little bit
> confused. Maybe you guys can confirm that my experience is the expected
> behaviour ;):
> 
> I have run a "performance test" twice, once with the FsStateBackend and once
> with the RocksDBStateBackend in comparison. In this particular test the state
> saved is generally not large (in a production scenario it will be
> larger.)
> 
> These are my observations:
> 
> 1. Minimal Checkpoint Size (no records) with RocksDB was 33MB compared
> to <<1MB with the FSStatebackend.
> 
> 2. Throughput dropped from 28k/s -> 18k/s on a small cluster.
> 
> 3. Checkpoint sizes as reported in the Dashboard was ca. 1MB for
> FsStatebackend but >100MB for RocksDbStatebackend. I hope the difference
> gets smaller for very large state. Can you confirm?
> 
> 4. Checkpointing Times as reported in the Dashboard were 26secs for
> RocksDB during the test and <1 second for FsStatebackend. Does the
> reported time correspond to the sync. + asynchronous part of the
> checkpointing in case of RocksDB? Is there any way to tell how long the
> synchronous part takes?
> 
> From these first observations RocksDB does seem to bring a large
> overhead for state < 1GB, I guess? Is this expected?
> 
> Cheers,
> 
> Konstantin
> 

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
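
The comparison described in this thread boils down to swapping the state backend in the job setup; a rough sketch (not from the thread), assuming a Flink 1.x-era API, with placeholder checkpoint and storage paths:

// Sketch of how an FsStateBackend vs. RocksDBStateBackend comparison is typically
// wired up; paths, flags, and the pipeline are placeholders.
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendComparisonJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        boolean useRocksDb = args.length > 0 && args[0].equals("rocksdb");

        if (useRocksDb) {
            // State lives in local RocksDB files; snapshots go to the checkpoint URI.
            RocksDBStateBackend rocksDb = new RocksDBStateBackend("hdfs:///flink/checkpoints");
            rocksDb.setDbStoragePath("/tmp/flink-rocksdb"); // local disk for the DB files
            env.setStateBackend(rocksDb);
        } else {
            // State lives on the JVM heap; snapshots are written to the checkpoint URI.
            env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
        }

        env.enableCheckpointing(60 * 1000L);
        // Placeholder pipeline; the windowed, stateful job under test goes here.
        env.fromElements(1, 2, 3).print();
        env.execute("backend-comparison");
    }
}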


Re: RocksDB Statebackend

2016-04-13 Thread Aljoscha Krettek
That's interesting to hear. If you want we can also collaborate on that
one. Using the Flink managed memory for that purpose would require some
changes to lower layers of Flink.

On Wed, 13 Apr 2016 at 13:11 Shannon Carey <sca...@expedia.com> wrote:

> This is something that my team and I have discussed building, so it's
> great to know that it's already on the radar. If we beat you to it, I'll
> definitely try to make it a contribution.
>
> Shannon
>
> From: Aljoscha Krettek <aljos...@apache.org>
> Date: Wednesday, April 13, 2016 at 1:46 PM
> To: <user@flink.apache.org>
> Subject: Re: RocksDB Statebackend
>
> Hi Maxim,
> yes the plan is to have a cache of hot values that uses the managed memory
> abstraction of Flink so that we can make sure that we stay within memory
> bounds and don't run into OOM exceptions.
>
> On Tue, 12 Apr 2016 at 23:37 Maxim <mfat...@gmail.com> wrote:
>
>> Is it possible to add an option to store the state in the Java HashMap
>> and write its content to RocksDB when checkpointing? For "hot" keys that
>> are updated very frequently such optimization would help with performance.
>>
>> I know that you are also working on incremental checkpoints which would
>> also be big win for jobs with a large number of keys.
>>
>> Thanks,
>>
>> Maxim.
>>
>> On Tue, Apr 12, 2016 at 10:39 AM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Concerning the size of RocksDB snapshots - I am wondering if RocksDB
>>> simply does not compact for a long time, thus having a lot of stale data in
>>> the snapshot.
>>>
>>> That would be especially the case, if you have a lot of changing values
>>> for the same set of keys.
>>>
>>> On Tue, Apr 12, 2016 at 6:41 PM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>> I'm going to try and respond to each point:
>>>>
>>>> 1. This seems strange, could you give some background on parallelism,
>>>> number of operators with state and so on? Also, I'm assuming you are using
>>>> the partitioned state abstraction, i.e. getState(), correct?
>>>>
>>>> 2. your observations are pretty much correct. The reason why RocksDB is
>>>> slower is that the FsStateBackend basically stores the state in a Java
>>>> HashMap and writes the contents to HDFS when checkpointing. RocksDB stores
>>>> data in on-disk files and goes to them for every state access (of course
>>>> there are caches, but generally it is like this). I'm actually impressed
>>>> that it is still this fast in comparison.
>>>>
>>>> 3. see 1. (I think for now)
>>>>
>>>> 4. The checkpointing time is the time from the JobManager deciding to
>>>> start a checkpoint until all tasks have confirmed that checkpoint. I have
>>>> seen this before and I think it results from back pressure. The problem is
>>>> that the checkpoint messages that we sent through the topology are sitting
>>>> at the sources because they are also back pressured by the slow processing
>>>> of normal records. You should be able to see the actual checkpointing times
>>>> (both synchronous and asynchronous) in the log files of the task managers,
>>>> they should be very much lower.
>>>>
>>>> I can go into details, I'm just writing this quickly before calling it
>>>> a day. :-)
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Tue, 12 Apr 2016 at 18:21 Konstantin Knauf <
>>>> konstantin.kn...@tngtech.com> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> my experience with the RocksDBStateBackend has left me a little bit
>>>>> confused. Maybe you guys can confirm that my experience is the expected
>>>>> behaviour ;):
>>>>>
>>>>> I have run a "performance test" twice, once with the FsStateBackend and once
>>>>> with the RocksDBStateBackend in comparison. In this particular test the state
>>>>> saved is generally not large (in a production scenario it will be
>>>>> larger.)
>>>>>
>>>>> These are my observations:
>>>>>
>>>>> 1. Minimal Checkpoint Size (no records) with RocksDB was 33MB compared
>>>>> to <<1MB with the FSStatebackend.
>>>>>
>>>>> 2. Throughput dropped from 28k/s -> 18k/s on a small cluster.
>>>>>
>>>>> 3. Checkpoint sizes as reported in the Dashboard was ca. 1MB for
>>>>> FsStatebackend but >100MB for RocksDbStatebackend. I hope the
>>>>> difference
>>>>> gets smaller for very large state. Can you confirm?
>>>>>
>>>>> 4. Checkpointing Times as reported in the Dashboard were 26secs for
>>>>> RocksDB during the test and <1 second for FsStatebackend. Does the
>>>>> reported time correspond to the sync. + asynchronous part of the
>>>>> checkpointing in case of RocksDB? Is there any way to tell how long the
>>>>> synchronous part takes?
>>>>>
>>>>> From these first observations RocksDB does seem to bring a large
>>>>> overhead for state < 1GB, I guess? Is this expected?
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Konstantin
>>>>>
>>>>
>>>
>>


Re: RocksDB Statebackend

2016-04-13 Thread Shannon Carey
This is something that my team and I have discussed building, so it's great to 
know that it's already on the radar. If we beat you to it, I'll definitely try 
to make it a contribution.

Shannon

From: Aljoscha Krettek <aljos...@apache.org>
Date: Wednesday, April 13, 2016 at 1:46 PM
To: <user@flink.apache.org>
Subject: Re: RocksDB Statebackend

Hi Maxim,
yes the plan is to have a cache of hot values that uses the managed memory 
abstraction of Flink so that we can make sure that we stay within memory bounds 
and don't run into OOM exceptions.

On Tue, 12 Apr 2016 at 23:37 Maxim <mfat...@gmail.com> wrote:
Is it possible to add an option to store the state in the Java HashMap and 
write its content to RocksDB when checkpointing? For "hot" keys that are 
updated very frequently such optimization would help with performance.

I know that you are also working on incremental checkpoints which would also be 
big win for jobs with a large number of keys.

Thanks,

Maxim.

On Tue, Apr 12, 2016 at 10:39 AM, Stephan Ewen <se...@apache.org> wrote:
Concerning the size of RocksDB snapshots - I am wondering if RocksDB simply 
does not compact for a long time, thus having a lot of stale data in the 
snapshot.

That would be especially the case, if you have a lot of changing values for the 
same set of keys.

On Tue, Apr 12, 2016 at 6:41 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
Hi,
I'm going to try and respond to each point:

1. This seems strange, could you give some background on parallelism, number of 
operators with state and so on? Also, I'm assuming you are using the 
partitioned state abstraction, i.e. getState(), correct?

2. your observations are pretty much correct. The reason why RocksDB is slower 
is that the FsStateBackend basically stores the state in a Java HashMap and 
writes the contents to HDFS when checkpointing. RocksDB stores data in on-disk 
files and goes to them for every state access (of course there are caches, but 
generally it is like this). I'm actually impressed that it is still this fast 
in comparison.

3. see 1. (I think for now)

4. The checkpointing time is the time from the JobManager deciding to start a 
checkpoint until all tasks have confirmed that checkpoint. I have seen this 
before and I think it results from back pressure. The problem is that the 
checkpoint messages that we sent through the topology are sitting at the 
sources because they are also back pressured by the slow processing of normal 
records. You should be able to see the actual checkpointing times (both 
synchronous and asynchronous) in the log files of the task managers, they 
should be very much lower.

I can go into details, I'm just writing this quickly before calling it a day. 
:-)

Cheers,
Aljoscha

On Tue, 12 Apr 2016 at 18:21 Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
Hi everyone,

my experience with the RocksDBStateBackend has left me a little bit
confused. Maybe you guys can confirm that my experience is the expected
behaviour ;):

I have run a "performance test" twice, once with the FsStateBackend and once
with the RocksDBStateBackend in comparison. In this particular test the state
saved is generally not large (in a production scenario it will be larger.)

These are my observations:

1. Minimal Checkpoint Size (no records) with RocksDB was 33MB compared
to <<1MB with the FSStatebackend.

2. Throughput dropped from 28k/s -> 18k/s on a small cluster.

3. Checkpoint sizes as reported in the Dashboard was ca. 1MB for
FsStatebackend but >100MB for RocksDbStatebackend. I hope the difference
gets smaller for very large state. Can you confirm?

4. Checkpointing Times as reported in the Dashboard were 26secs for
RocksDB during the test and <1 second for FsStatebackend. Does the
reported time correspond to the sync. + asynchronous part of the
checkpointing in case of RocksDB? Is there any way to tell how long the
synchronous part takes?

From these first observations RocksDB does seem to bring a large
overhead for state < 1GB, I guess? Is this expected?

Cheers,

Konstantin




Re: RocksDB Statebackend

2016-04-13 Thread Aljoscha Krettek
Hi Maxim,
yes the plan is to have a cache of hot values that uses the managed memory
abstraction of Flink so that we can make sure that we stay within memory
bounds and don't run into OOM exceptions.
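
Until such a built-in cache exists, Maxim's idea can be approximated at the user level. A rough sketch follows (not from the thread and not the planned managed-memory cache): an unbounded heap map in front of RocksDB-backed keyed state, written through on every update so checkpoints stay correct. Names are illustrative, it assumes a keyed stream of (key, count) tuples, and the memory-bound concern above still applies.

// User-level approximation of a hot-value cache in front of RocksDB-backed keyed
// state. The HashMap is a plain heap cache (unbounded, lost on failure); every
// update is written through to ValueState so checkpoints remain authoritative.
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CachingCounter extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> countState;   // RocksDB-backed keyed state
    private transient Map<String, Long> hotCache;    // heap cache, rebuilt empty on restore

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
        hotCache = new HashMap<>();
    }

    @Override
    public void flatMap(Tuple2<String, Long> in, Collector<Tuple2<String, Long>> out) throws Exception {
        // Read from the cache first; fall back to the state backend on a miss.
        Long current = hotCache.get(in.f0);
        if (current == null) {
            current = countState.value();
            if (current == null) {
                current = 0L;
            }
        }
        long updated = current + in.f1;

        // Write through to keyed state so snapshots see the latest value.
        countState.update(updated);
        hotCache.put(in.f0, updated);
        out.collect(Tuple2.of(in.f0, updated));
    }
}

The write-through update keeps the state backend authoritative, which is what makes the heap map safe to lose on failure; bounding its size is exactly the part Flink's managed memory would handle.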

On Tue, 12 Apr 2016 at 23:37 Maxim  wrote:

> Is it possible to add an option to store the state in the Java HashMap and
> write its content to RocksDB when checkpointing? For "hot" keys that are
> updated very frequently such optimization would help with performance.
>
> I know that you are also working on incremental checkpoints which would
> also be big win for jobs with a large number of keys.
>
> Thanks,
>
> Maxim.
>
> On Tue, Apr 12, 2016 at 10:39 AM, Stephan Ewen  wrote:
>
>> Concerning the size of RocksDB snapshots - I am wondering if RocksDB
>> simply does not compact for a long time, thus having a lot of stale data in
>> the snapshot.
>>
>> That would be especially the case, if you have a lot of changing values
>> for the same set of keys.
>>
>> On Tue, Apr 12, 2016 at 6:41 PM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>> I'm going to try and respond to each point:
>>>
>>> 1. This seems strange, could you give some background on parallelism,
>>> number of operators with state and so on? Also, I'm assuming you are using
>>> the partitioned state abstraction, i.e. getState(), correct?
>>>
>>> 2. your observations are pretty much correct. The reason why RocksDB is
>>> slower is that the FsStateBackend basically stores the state in a Java
>>> HashMap and writes the contents to HDFS when checkpointing. RocksDB stores
>>> data in on-disk files and goes to them for every state access (of course
>>> there are caches, but generally it is like this). I'm actually impressed
>>> that it is still this fast in comparison.
>>>
>>> 3. see 1. (I think for now)
>>>
>>> 4. The checkpointing time is the time from the JobManager deciding to
>>> start a checkpoint until all tasks have confirmed that checkpoint. I have
>>> seen this before and I think it results from back pressure. The problem is
>>> that the checkpoint messages that we send through the topology are sitting
>>> at the sources because they are also back-pressured by the slow processing
>>> of normal records. You should be able to see the actual checkpointing times
>>> (both synchronous and asynchronous) in the log files of the task managers,
>>> they should be very much lower.
>>>
>>> I can go into details, I'm just writing this quickly before calling it a
>>> day. :-)
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Tue, 12 Apr 2016 at 18:21 Konstantin Knauf <
>>> konstantin.kn...@tngtech.com> wrote:
>>>
 Hi everyone,

 my experience with the RocksDBStateBackend has left me a little bit
 confused. Maybe you guys can confirm that my experience is the expected
 behaviour ;):

 I have run a "performance test" twice, once with the FsStateBackend and once
 with the RocksDBStateBackend for comparison. In this particular test the state
 saved is generally not large (in a production scenario it will be
 larger).

 These are my observations:

 1. Minimal Checkpoint Size (no records) with RocksDB was 33MB compared
 to <<1MB with the FSStatebackend.

 2. Throughput dropped from 28k/s -> 18k/s on a small cluster.

 3. Checkpoint sizes as reported in the Dashboard was ca. 1MB for
 FsStatebackend but >100MB for RocksDbStatebackend. I hope the difference
 gets smaller for very large state. Can you confirm?

 4. Checkpointing Times as reported in the Dashboard were 26secs for
 RocksDB during the test and <1 second for FsStatebackend. Does the
 reported time correspond to the sync. + asynchronous part of the
 checkpointing in case of RocksDB? Is there any way to tell how long the
 synchronous part takes?

 From these first observations, RocksDB does seem to bring a large
 overhead for state < 1GB, I guess? Is this expected?

 Cheers,

 Konstantin

>>>
>>
>


Re: RocksDB Statebackend

2016-04-12 Thread Maxim
Is it possible to add an option to store the state in the Java HashMap and
write its content to RocksDB when checkpointing? For "hot" keys that are
updated very frequently such optimization would help with performance.

I know that you are also working on incremental checkpoints which would
also be a big win for jobs with a large number of keys.

Thanks,

Maxim.
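
For readers coming to this thread later: incremental checkpoints for the RocksDB
backend did ship in Flink 1.3. A minimal sketch of enabling them with that later
API; the checkpoint path is a placeholder:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class IncrementalCheckpointsExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // The boolean flag enables incremental checkpoints (Flink 1.3+): only new or
            // changed RocksDB SST files are uploaded, instead of a full snapshot each time.
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
            env.enableCheckpointing(60_000);
            // ... define the job topology here, then env.execute(...)
        }
    }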

On Tue, Apr 12, 2016 at 10:39 AM, Stephan Ewen  wrote:

> Concerning the size of RocksDB snapshots - I am wondering if RocksDB
> simply does not compact for a long time, thus having a lot of stale data in
> the snapshot.
>
> That would be especially the case, if you have a lot of changing values
> for the same set of keys.
>
> On Tue, Apr 12, 2016 at 6:41 PM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> I'm going to try and respond to each point:
>>
>> 1. This seems strange, could you give some background on parallelism,
>> number of operators with state and so on? Also, I'm assuming you are using
>> the partitioned state abstraction, i.e. getState(), correct?
>>
>> 2. your observations are pretty much correct. The reason why RocksDB is
>> slower is that the FsStateBackend basically stores the state in a Java
>> HashMap and writes the contents to HDFS when checkpointing. RocksDB stores
>> data in on-disk files and goes to them for every state access (of course
>> there are caches, but generally it is like this). I'm actually impressed
>> that it is still this fast in comparison.
>>
>> 3. see 1. (I think for now)
>>
>> 4. The checkpointing time is the time from the JobManager deciding to
>> start a checkpoint until all tasks have confirmed that checkpoint. I have
>> seen this before and I think it results from back pressure. The problem is
>> that the checkpoint messages that we send through the topology are sitting
>> at the sources because they are also back-pressured by the slow processing
>> of normal records. You should be able to see the actual checkpointing times
>> (both synchronous and asynchronous) in the log files of the task managers,
>> they should be very much lower.
>>
>> I can go into details, I'm just writing this quickly before calling it a
>> day. :-)
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 12 Apr 2016 at 18:21 Konstantin Knauf <
>> konstantin.kn...@tngtech.com> wrote:
>>
>>> Hi everyone,
>>>
>>> my experience with the RocksDBStateBackend has left me a little bit
>>> confused. Maybe you guys can confirm that my experience is the expected
>>> behaviour ;):
>>>
>>> I have run a "performance test" twice, once with the FsStateBackend and once
>>> with the RocksDBStateBackend for comparison. In this particular test the state
>>> saved is generally not large (in a production scenario it will be
>>> larger).
>>>
>>> These are my observations:
>>>
>>> 1. Minimal Checkpoint Size (no records) with RocksDB was 33MB compared
>>> to <<1MB with the FSStatebackend.
>>>
>>> 2. Throughput dropped from 28k/s -> 18k/s on a small cluster.
>>>
>>> 3. Checkpoint sizes as reported in the Dashboard was ca. 1MB for
>>> FsStatebackend but >100MB for RocksDbStatebackend. I hope the difference
>>> gets smaller for very large state. Can you confirm?
>>>
>>> 4. Checkpointing Times as reported in the Dashboard were 26secs for
>>> RocksDB during the test and <1 second for FsStatebackend. Does the
>>> reported time correspond to the sync. + asynchronous part of the
>>> checkpointing in case of RocksDB? Is there any way to tell how long the
>>> synchronous part takes?
>>>
>>> From these first observations, RocksDB does seem to bring a large
>>> overhead for state < 1GB, I guess? Is this expected?
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>
>


Re: RocksDB Statebackend

2016-04-12 Thread Stephan Ewen
Concerning the size of RocksDB snapshots - I am wondering if RocksDB simply
does not compact for a long time, thus having a lot of stale data in the
snapshot.

That would be especially the case, if you have a lot of changing values for
the same set of keys.
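
If lazy compaction is indeed the culprit, the knobs for it live on the backend
itself: at least in later 1.x releases, RocksDBStateBackend exposes predefined
option profiles (and a custom OptionsFactory hook) through which write-buffer and
compaction behaviour can be tuned. A rough sketch, with a placeholder checkpoint
path:

    import org.apache.flink.contrib.streaming.state.PredefinedOptions;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TunedRocksDBBackend {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
            // Predefined profiles adjust write buffers, compaction style, etc. for common
            // hardware setups; a custom OptionsFactory allows finer-grained tuning.
            backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
            env.setStateBackend(backend);
            env.enableCheckpointing(60_000);
        }
    }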

On Tue, Apr 12, 2016 at 6:41 PM, Aljoscha Krettek 
wrote:

> Hi,
> I'm going to try and respond to each point:
>
> 1. This seems strange, could you give some background on parallelism,
> number of operators with state and so on? Also, I'm assuming you are using
> the partitioned state abstraction, i.e. getState(), correct?
>
> 2. your observations are pretty much correct. The reason why RocksDB is
> slower is that the FsStateBackend basically stores the state in a Java
> HashMap and writes the contents to HDFS when checkpointing. RocksDB stores
> data in on-disk files and goes to them for every state access (of course
> there are caches, but generally it is like this). I'm actually impressed
> that it is still this fast in comparison.
>
> 3. see 1. (I think for now)
>
> 4. The checkpointing time is the time from the JobManager deciding to
> start a checkpoint until all tasks have confirmed that checkpoint. I have
> seen this before and I think it results from back pressure. The problem is
> that the checkpoint messages that we send through the topology are sitting
> at the sources because they are also back-pressured by the slow processing
> of normal records. You should be able to see the actual checkpointing times
> (both synchronous and asynchronous) in the log files of the task managers,
> they should be very much lower.
>
> I can go into details, I'm just writing this quickly before calling it a
> day. :-)
>
> Cheers,
> Aljoscha
>
> On Tue, 12 Apr 2016 at 18:21 Konstantin Knauf <
> konstantin.kn...@tngtech.com> wrote:
>
>> Hi everyone,
>>
>> my experience with the RocksDBStateBackend has left me a little bit
>> confused. Maybe you guys can confirm that my experience is the expected
>> behaviour ;):
>>
>> I have run a "performance test" twice, once with the FsStateBackend and once
>> with the RocksDBStateBackend for comparison. In this particular test the state
>> saved is generally not large (in a production scenario it will be larger).
>>
>> These are my observations:
>>
>> 1. Minimal Checkpoint Size (no records) with RocksDB was 33MB compared
>> to <<1MB with the FSStatebackend.
>>
>> 2. Throughput dropped from 28k/s -> 18k/s on a small cluster.
>>
>> 3. Checkpoint sizes as reported in the Dashboard was ca. 1MB for
>> FsStatebackend but >100MB for RocksDbStatebackend. I hope the difference
>> gets smaller for very large state. Can you confirm?
>>
>> 4. Checkpointing Times as reported in the Dashboard were 26secs for
>> RocksDB during the test and <1 second for FsStatebackend. Does the
>> reported time correspond to the sync. + asynchronous part of the
>> checkpointing in case of RocksDB? Is there any way to tell how long the
>> synchronous part takes?
>>
>> From these first observations, RocksDB does seem to bring a large
>> overhead for state < 1GB, I guess? Is this expected?
>>
>> Cheers,
>>
>> Konstantin
>>
>


Re: RocksDB Statebackend

2016-04-12 Thread Aljoscha Krettek
Hi,
I'm going to try and respond to each point:

1. This seems strange, could you give some background on parallelism,
number of operators with state and so on? Also, I'm assuming you are using
the partitioned state abstraction, i.e. getState(), correct?

2. your observations are pretty much correct. The reason why RocksDB is
slower is that the FsStateBackend basically stores the state in a Java
HashMap and writes the contents to HDFS when checkpointing. RocksDB stores
data in on-disk files and goes to them for every state access (of course
there are caches, but generally it is like this). I'm actually impressed
that it is still this fast in comparison.

3. see 1. (I think for now)

4. The checkpointing time is the time from the JobManager deciding to start
a checkpoint until all tasks have confirmed that checkpoint. I have seen
this before and I think it results from back pressure. The problem is that
the checkpoint messages that we send through the topology are sitting at
the sources because they are also back-pressured by the slow processing of
normal records. You should be able to see the actual checkpointing times
(both synchronous and asynchronous) in the log files of the task managers,
they should be very much lower.

I can go into details, I'm just writing this quickly before calling it a
day. :-)

Cheers,
Aljoscha
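
To make point 1 concrete, here is a minimal sketch of the partitioned (keyed)
state abstraction being referred to; the class and state names are placeholders,
and the API shown is the Flink 1.x-style ValueState:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Counts events per key; the count lives in the configured state backend
    // (on-heap for FsStateBackend, on-disk for RocksDB).
    public class CountPerKey extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) throws Exception {
            Long current = count.value();   // state access goes through the backend
            long updated = (current == null) ? 1L : current + 1L;
            count.update(updated);          // every update is written back
            out.collect(Tuple2.of(value.f0, updated));
        }
    }

It is used after a keyBy(), e.g. stream.keyBy(0).flatMap(new CountPerKey());
without the keyBy there is no key context and getState() cannot be used.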

On Tue, 12 Apr 2016 at 18:21 Konstantin Knauf 
wrote:

> Hi everyone,
>
> my experience with the RocksDBStateBackend has left me a little bit
> confused. Maybe you guys can confirm that my experience is the expected
> behaviour ;):
>
> I have run a "performance test" twice, once with the FsStateBackend and once
> with the RocksDBStateBackend for comparison. In this particular test the state
> saved is generally not large (in a production scenario it will be larger).
>
> These are my observations:
>
> 1. Minimal Checkpoint Size (no records) with RocksDB was 33MB compared
> to <<1MB with the FSStatebackend.
>
> 2. Throughput dropped from 28k/s -> 18k/s on a small cluster.
>
> 3. Checkpoint sizes as reported in the Dashboard was ca. 1MB for
> FsStatebackend but >100MB for RocksDbStatebackend. I hope the difference
> gets smaller for very large state. Can you confirm?
>
> 4. Checkpointing Times as reported in the Dashboard were 26secs for
> RocksDB during the test and <1 second for FsStatebackend. Does the
> reported time correspond to the sync. + asynchronous part of the
> checkpointing in case of RocksDB? Is there any way to tell how long the
> synchronous part takes?
>
> From these first observations, RocksDB does seem to bring a large
> overhead for state < 1GB, I guess? Is this expected?
>
> Cheers,
>
> Konstantin
>


RocksDB Statebackend

2016-04-12 Thread Konstantin Knauf
Hi everyone,

my experience with the RocksDBStateBackend has left me a little bit
confused. Maybe you guys can confirm that my experience is the expected
behaviour ;):

I have run a "performance test" twice, once with the FsStateBackend and once
with the RocksDBStateBackend for comparison. In this particular test the state
saved is generally not large (in a production scenario it will be larger).

These are my observations:

1. Minimal Checkpoint Size (no records) with RocksDB was 33MB compared
to <<1MB with the FSStatebackend.

2. Throughput dropped from 28k/s -> 18k/s on a small cluster.

3. Checkpoint sizes as reported in the Dashboard was ca. 1MB for
FsStatebackend but >100MB for RocksDbStatebackend. I hope the difference
gets smaller for very large state. Can you confirm?

4. Checkpointing Times as reported in the Dashboard were 26secs for
RocksDB during the test and <1 second for FsStatebackend. Does the
reported time correspond to the sync. + asynchronous part of the
checkpointing in case of RocksDB? Is there any way to tell how long the
synchronous part takes?

From these first observations, RocksDB does seem to bring a large
overhead for state < 1GB, I guess? Is this expected?

Cheers,

Konstantin
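
For reference, the two backends in this comparison are swapped with essentially a
one-line change. A minimal sketch, assuming a Flink 1.x-era API; the HDFS path and
class name are placeholders:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BackendComparisonJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            boolean useRocksDB = args.length > 0 && "rocksdb".equals(args[0]);
            if (useRocksDB) {
                // State lives in local RocksDB instances, copied to the checkpoint
                // directory when a checkpoint is taken.
                env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
            } else {
                // State lives on the heap and is snapshotted to the filesystem.
                env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
            }

            env.enableCheckpointing(60_000); // same checkpoint interval in both runs
            // ... add the (identical) job topology here, then env.execute(...)
        }
    }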