Sorry, I had replied to the user list!

Begin forwarded message:

> From: Matthieu Morel <[email protected]>
> Subject: Re: Using Helix for cluster management of S4
> Date: November 30, 2012 7:56:39 PM GMT+01:00
> To: [email protected]
> 
> 
> Regarding redistribution of partitions with stateful PEs, another simple 
> solution is to activate checkpointing.
> 
> Then when you need to repartition:
> - on the origin node, upon a rebalancing request for partition p, you simply 
> invalidate the PE instances of partition p in the local cache (and trigger 
> checkpointing if necessary)
> - on the destination node, there is nothing to do: state for PEs belonging to 
> partition p is lazily reloaded (i.e. when necessary).
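> 
> A rough sketch of the origin-node side of this, with entirely hypothetical 
> names (PE, localCache, onPartitionRevoked are not actual S4 or Helix APIs):
> 
>     import java.util.List;
>     import java.util.Map;
>     import java.util.concurrent.ConcurrentHashMap;
> 
>     public class PartitionHandoff {
> 
>         interface PE {                 // stand-in for an S4 ProcessingElement
>             boolean isDirty();         // has state not yet checkpointed
>             void checkpoint();         // persist state to the checkpoint store
>         }
> 
>         // local cache of PE instances, keyed by partition id
>         private final Map<Integer, List<PE>> localCache =
>                 new ConcurrentHashMap<Integer, List<PE>>();
> 
>         // origin node: called when asked to give up partition p
>         public void onPartitionRevoked(int p) {
>             List<PE> pes = localCache.remove(p);   // invalidate the partition's PEs
>             if (pes == null) {
>                 return;
>             }
>             for (PE pe : pes) {
>                 if (pe.isDirty()) {
>                     pe.checkpoint();               // trigger checkpointing if necessary
>                 }
>             }
>             // destination node: nothing to do, PE state is lazily reloaded from
>             // the checkpoint store when events for partition p arrive there.
>         }
>     }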
> 
> Of course there are other solutions, such as manually snapshotting to a data 
> store, but that may be more complex.
> 
> Matthieu
> 
> On Nov 30, 2012, at 7:44 PM, kishore g wrote:
> 
>> 
>> Matt, 
>> 
>> That's right, the key idea is to over-partition the stream (the number of 
>> partitions is higher than the number of nodes).
>> 
>> Helix supports two modes. In the first, when a new node is added Helix 
>> automatically moves partitions; we call this AUTO_REBALANCE, and it is 
>> recommended for stateless tasks. The other mode is SEMI_AUTO, where you change 
>> the topology using an admin command. So what one would do is add a new node 
>> and then rebalance the cluster by invoking a Helix API. Helix will then 
>> redistribute partitions and, as I said earlier, will minimize and coordinate 
>> the movement. When I say coordinate, it will first ask the old leader to stop 
>> processing the partition; it can then snapshot its state. Once that's done, 
>> Helix will ask the new node to host the partition, where it can load the 
>> snapshot. I will add this example to the walkthrough instructions.
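>> 
>> To make that concrete, here is a rough sketch of what "add a node, then 
>> rebalance" could look like through the Helix admin API. The cluster, stream 
>> and instance names below are made up, and the exact classes used by the S4 
>> prototype may differ:
>> 
>>     import org.apache.helix.HelixAdmin;
>>     import org.apache.helix.manager.zk.ZKHelixAdmin;
>>     import org.apache.helix.model.InstanceConfig;
>> 
>>     public class AddNodeAndRebalance {
>>         public static void main(String[] args) {
>>             // connect to the ZooKeeper ensemble that Helix uses
>>             HelixAdmin admin = new ZKHelixAdmin("localhost:2181");
>> 
>>             // register the new S4 node with the cluster
>>             InstanceConfig newNode = new InstanceConfig("s4node_12001");
>>             newNode.setHostName("localhost");
>>             newNode.setPort("12001");
>>             admin.addInstance("s4-cluster", newNode);
>> 
>>             // ask Helix to recompute the assignment of the stream's partitions;
>>             // Helix minimizes and coordinates the resulting partition movement
>>             admin.rebalance("s4-cluster", "myStream", 1 /* replicas */);
>>         }
>>     }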
>> 
>>  Daniel,
>> 
>> Yes, that is possible. I think the createTask and deployApp commands already 
>> take an optional parameter to list a subset of nodes, but I think I have only 
>> implemented it for deployApp. Adding it for createTask (which assigns stream 
>> processing to S4 nodes) is straightforward.
>> 
>> Matthieu mentioned that in the Readme instructions the adapter command is 
>> invoking the old code. I will make that change in some time. If you are 
>> trying this now, run GenericEventAdapter from Eclipse directly. (The same 
>> options hold good.)
>> 
>> Yes, the JIRA is S4-110. I will add the description. I am pretty sure there 
>> will be issues :-)
>> 
>> 
>> 
>> On Fri, Nov 30, 2012 at 8:38 AM, Daniel Gómez Ferro <[email protected]> 
>> wrote:
>> I agree with Matthieu, that's a really nice integration!
>> 
>> I particularly like having different partition schemes per stream. I guess 
>> it would be easy (or at least possible) to implement some kind of isolation 
>> where only a subset of nodes handles a specific stream, for example (related 
>> to S4-91).
>> 
>> It looks really nice, I'm looking forward to trying it. I'll give more 
>> feedback if I run into any issues. I guess the right JIRA for that would be 
>> S4-110, right? (It's missing a description!)
>> 
>> Good job!
>> 
>> Daniel
>> 
>> 
>> On Fri Nov 30 16:37:11 2012, Matthieu Morel wrote:
>> Thanks Kishore, that's a very interesting contribution!
>> 
>> It's also very appropriate considering that S4 is completely decentralized 
>> and that there is no driving/scheduling entity: the logic is within the 
>> nodes. So it's nice to have a way to easily describe and define coordinated 
>> behaviors, and to easily automate them.
>> 
>> About the partitioning, the key here, as I understand it, is to have a number 
>> of partitions higher than the number of nodes by default, possibly several 
>> times higher, so that a given node is assigned multiple partitions. (In 
>> contrast, until now in S4, nb partitions <= nb nodes, including standby nodes.)
>> 
>> In the canonical example that you provide, how do we proceed if we want to 
>> add another S4 node? That's not clear to me, and it would help to understand 
>> how partitions are reassigned.
>> 
>> Thanks!
>> 
>> Matthieu
>> 
>> 
>> 
>> 
>> In S4 the number of partitions is fixed for all streams and depends on the
>> number of nodes in the cluster. Adding new nodes to an S4 cluster causes the
>> number of partitions to change, which results in a lot of data movement. For
>> example, if there are 4 nodes and you add another node, nearly all keys will
>> be remapped, which means a huge amount of data movement, whereas ideally only
>> 20% of the data should move.
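>> 
>> As a back-of-the-envelope check (not S4 code, just the arithmetic): with a
>> key -> hash(key) % numNodes scheme, going from 4 to 5 nodes remaps about 80%
>> of the keys, whereas with a fixed number of partitions only about 20% of the
>> data needs to move.
>> 
>>     public class RemappingCost {
>>         public static void main(String[] args) {
>>             int keys = 100000;
>>             int moved = 0;
>>             for (int k = 0; k < keys; k++) {
>>                 if (k % 4 != k % 5) {   // key lands on a different node after the resize
>>                     moved++;
>>                 }
>>             }
>>             System.out.printf("moved %.0f%% of keys%n", 100.0 * moved / keys); // ~80%
>>             // With, say, 20 fixed partitions spread over 4 nodes (5 per node),
>>             // adding a 5th node only requires moving 4 of the 20 partitions,
>>             // i.e. 20% of the data.
>>         }
>>     }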
>> 
>> By using Helix, every stream can be partitioned differently and independently
>> of the number of nodes. Helix distributes the partitions evenly among the
>> nodes. When new nodes are added, partitions can be migrated to them without
>> changing the number of partitions, which minimizes the data movement.
>> 
>> S4 handles failures by having standby nodes that are idle most of the time
>> and become active when a node fails. Even though this works, it's not ideal in
>> terms of hardware usage, since the standby nodes are idle most of the time. It
>> also increases the failover time, since the PE state has to be transferred to
>> a single node.
>> 
>> Helix allows S4 to have Active and Standby roles at the partition level, so
>> that all nodes can be active, with some partitions Active and some in Standby
>> mode on each node. When a node fails, the partitions that were Active on that
>> node are evenly redistributed among the remaining nodes. This provides
>> automatic load balancing and also improves failover time, since PE state can
>> be transferred to multiple nodes in parallel.
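>> 
>> As an illustration, this per-partition Active/Standby behavior maps naturally
>> onto a Helix state model. A rough sketch of a participant-side state model
>> (modeled on Helix's LeaderStandby state model; the actual callbacks in the
>> prototype may look different) could be:
>> 
>>     import org.apache.helix.NotificationContext;
>>     import org.apache.helix.model.Message;
>>     import org.apache.helix.participant.statemachine.StateModel;
>> 
>>     // Registered with Helix through a StateModelFactory; Helix invokes the
>>     // onBecome<To>From<From> callbacks as it moves each partition between states.
>>     public class S4PartitionStateModel extends StateModel {
>> 
>>         public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
>>             // prepare to host the partition, e.g. start tracking its checkpoints
>>         }
>> 
>>         public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
>>             // take over processing: load the latest PE checkpoints, start consuming events
>>         }
>> 
>>         public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
>>             // stop processing the partition and checkpoint its PE state
>>         }
>> 
>>         public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
>>             // release any resources held for the partition
>>         }
>>     }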
>> 
>> I have a prototype implementation here
>> https://github.com/kishoreg/incubator-s4
>> 
>> Instructions to build it and try it out are in the Readme.
>> 
>> More info on Helix can be found here, http://helix.incubator.apache.org/
>> 
>> Helix provides a lot of other functionality, such as:
>> 
>>    1. Configuring the topology according to the use case: for example,
>>    co-locating the partitions of different streams to allow efficient joins,
>>    or configuring the number of standbys for each partition based on the
>>    headroom available.
>>    2. Throttling the data movement when new nodes are added.
>>    3. A large set of admin tools, such as enabling/disabling nodes and
>>    dynamically changing the topology, plus a REST interface to manage the
>>    cluster.
>>    4. Scheduling custom tasks, such as snapshotting the PEs in a partition
>>    and restoring from the snapshot.
>> 
>> 
>> I would like to get your feedback.
>> 
>> Thanks,
>> Kishore G
>> 
>> 
> 
