Re: Flink AutoScaling EMR

2020-11-16 Thread Rex Fenley
Thanks for all the input!


Re: Flink AutoScaling EMR

2020-11-15 Thread Xintong Song
>
> Is there a way to run the new YARN job only on the new hardware?

I think you can simply decommission the nodes from Yarn, so that new
containers will not be allocated on those nodes. You might also need a
large decommission timeout, upon expiry of which any containers still running
on the decommissioning nodes will be killed.
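
For illustration, a minimal sketch of what that could look like with YARN's
graceful decommissioning (the hostname and exclude-file path below are
placeholders; the real file is whatever yarn.resourcemanager.nodes.exclude-path
points to on your cluster):

    # add the NodeManager host(s) you want to drain to the YARN exclude file
    echo "ip-10-0-1-23.ec2.internal" >> /etc/hadoop/conf/yarn.exclude

    # graceful (-g) refresh with a decommission timeout in seconds; containers
    # still running when the timeout expires are killed
    yarn rmadmin -refreshNodes -g 3600 -client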

Thank you~

Xintong Song

Re: Flink AutoScaling EMR

2020-11-12 Thread Robert Metzger
Hi,
it seems that YARN has a feature for targeting specific hardware:
https://hadoop.apache.org/docs/r3.1.0/hadoop-yarn/hadoop-yarn-site/PlacementConstraints.html
In any case, you'll need enough spare resources for some time to be able to
run your job twice for this kind of "zero downtime handover".
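
A rough sketch of what the related node-label route could look like (node
labels are an alternative to the placement constraints linked above and assume
labels are enabled on the cluster; the label name "fresh-hw" and the hostname
are made up, and yarn.application.node-label assumes a reasonably recent Flink
release, so check the docs for your version):

    # tag the newly added nodes with a label
    yarn rmadmin -addToClusterNodeLabels "fresh-hw"
    yarn rmadmin -replaceLabelsOnNode "ip-10-0-1-42.ec2.internal=fresh-hw"

    # in the new job's flink-conf.yaml, restrict its YARN containers to those nodes
    yarn.application.node-label: fresh-hw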


Re: Flink AutoScaling EMR

2020-11-12 Thread Rex Fenley
Awesome, thanks! Is there a way to run the new YARN job only on the new
hardware? Or would the two jobs have to run on overlapping hardware and then
be switched on/off, which means we'll need a buffer of resources for our
orchestration?

Also, good point on recovery. I'll spend some time looking into this.

Thanks

Re: Flink AutoScaling EMR

2020-11-11 Thread Robert Metzger
Hey Rex,

the second approach (spinning up a standby job and then doing a handover)
sounds more promising to implement without rewriting half of the Flink
codebase ;)
What you need is a tool that orchestrates creating a savepoint, starting a
second job from the savepoint and then communicating with a custom sink
implementation that can be switched on/off in the two jobs.
With that approach, you should have almost no downtime, just increased
resource requirements during such a handover.
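
Not prescribing a particular design, but a minimal sketch of such a switchable
sink wrapper could look roughly like the following (the ActiveFlag interface is
a made-up stand-in for however your orchestrator signals which job is live,
e.g. a value polled from S3 or ZooKeeper; buffering while passive, exactly-once
concerns, and lifecycle forwarding for rich delegate sinks are all omitted):

    import java.io.Serializable;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;

    /**
     * Sketch: forwards records to the real sink only while an external
     * "handover" flag marks this job as the active one; otherwise records
     * are dropped (or could be buffered) until the switch is flipped.
     */
    public class SwitchableSink<T> implements SinkFunction<T> {

        /** Hypothetical flag lookup; Serializable so Flink can ship it to tasks. */
        public interface ActiveFlag extends Serializable {
            boolean isActive();
        }

        private final SinkFunction<T> delegate; // the real sink, e.g. JDBC/Elasticsearch
        private final ActiveFlag activeFlag;

        public SwitchableSink(SinkFunction<T> delegate, ActiveFlag activeFlag) {
            this.delegate = delegate;
            this.activeFlag = activeFlag;
        }

        @Override
        public void invoke(T value, Context context) throws Exception {
            if (activeFlag.isActive()) {
                delegate.invoke(value, context);
            }
            // while passive: drop (or buffer) records until this job becomes active
        }
    }

The orchestration around it can then be little more than "flink savepoint
<jobId>" against the old job, "flink run -s <savepointPath> ..." to start the
new one, and flipping the flag once the new job has caught up.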

What you need to consider as well is that this handover process only works
for scheduled maintenance. If you have a system failure, you'll have
downtime until the last checkpoint is restored.
If you are trying to reduce the potential downtime overall, I would rather
recommend optimizing the checkpoint restore time, as this can cover both
scheduled maintenance and system failures.

Best,
Robert


Re: Flink AutoScaling EMR

2020-11-11 Thread Rex Fenley
Another thought: would it be possible to
* Spin up new core or task nodes.
* Run a new copy of the same job on these new nodes from a savepoint.
* Have the new job *not* write to the sink until the other job is torn down?

This would allow us to be eventually consistent and maintain writes going
through without downtime. As long as whatever is buffering the sink doesn't
run out of space it should work just fine.

We're hoping to achieve consistency in less than 30s ideally.

Again though, if we could get savepoints to restore in less than 30s that
would likely be sufficient for our purposes.

Thanks!


Flink AutoScaling EMR

2020-11-11 Thread Rex Fenley
Hello,

I'm trying to find a solution for auto-scaling our Flink EMR cluster with
zero downtime, using RocksDB as state storage and S3 as the backing store.

My current thoughts are like so:
* Scaling an operator dynamically would require all keyed state to be
available to the set of subtasks for that operator, therefore a set of
subtasks must be reading from and writing to the same RocksDB. I.e., to
scale the subtasks in that set in and out, they need to read from the same
RocksDB.
* Since subtasks can run on different core nodes, is it possible to have
different core nodes read/write to the same RocksDB?
* When is it safe to scale an operator in or out? Possibly only right after a
checkpoint?

If the above is not possible, then we'll have to use savepoints, which means
some downtime, therefore:
* Scaling out during high traffic is arguably more urgent to react to quickly
than scaling in during low traffic. Is it possible to add more core nodes to
EMR without disturbing a job? If so, then maybe we can orchestrate running a
new job on new nodes and then loading a savepoint from a currently running
job.

Lastly:
* Savepoints for ~70GiB of data take on the order of minutes to tens of
minutes for us to restore from; is there any way to speed up restoration?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend
Remind.com