Daniel, did you get a chance to try it out? I have fixed some issues in the emitter code to use per-stream partitioning. Would love to hear your comments.
I will create a branch S4-110 with my changes.

On Fri, Nov 30, 2012 at 2:06 PM, kishore g <[email protected]> wrote:

> Agree.
>
> On Fri, Nov 30, 2012 at 10:57 AM, Matthieu Morel <[email protected]> wrote:
>
> Sorry, I had replied to the user list!
>
> Begin forwarded message:
>
> From: Matthieu Morel <[email protected]>
> Subject: Re: Using Helix for cluster management of S4
> Date: November 30, 2012 7:56:39 PM GMT+01:00
> To: [email protected]
>
> Regarding redistribution of partitions with stateful PEs, another simple solution is to activate checkpointing.
>
> Then, when you need to repartition:
> - on the origin node, upon a rebalancing request for partition p, simply invalidate the PE instances of partition p from the local cache (and trigger checkpointing if necessary)
> - on the destination node, there is nothing to do: state for the PEs belonging to partition p is lazily reloaded (i.e. when necessary).
>
> Of course there are other solutions, such as manually snapshotting to a data store, but that may be more complex.
>
> Matthieu
>
> On Nov 30, 2012, at 7:44 PM, kishore g wrote:
>
> Matt,
>
> That's right, the key idea is to over-partition the stream (a number of partitions higher than the number of nodes).
>
> Helix supports two modes. When a new node is added, it can automatically move partitions; we call this AUTO_REBALANCE, and it is recommended for stateless tasks. The other mode is SEMI_AUTO, where you change the topology using an admin command. So one would add a new node and then rebalance the cluster by invoking a Helix API. Helix will then redistribute the partitions and, as I said earlier, will minimize and coordinate the movement. By coordinate, I mean it will first ask the old leader to stop processing the partition. It can then snapshot its state.
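[Editorial note: the coordinated handoff described above — stop the old leader, snapshot its state, only then activate the new host — could be sketched roughly as below. All names (`Node`, `stop_processing`, `snapshot`, `load_snapshot`, `move_partition`) are hypothetical stand-ins for illustration, not actual Helix or S4 APIs; in practice Helix drives these steps through state transitions.]

```python
# Toy sketch of a coordinated partition handoff. Hypothetical names;
# not Helix's or S4's actual API.

class Node:
    def __init__(self, name):
        self.name = name
        self.active = set()      # partitions this node is processing
        self.snapshots = {}      # partition -> restored PE state

    def stop_processing(self, partition):
        self.active.discard(partition)

    def snapshot(self, partition, store):
        store[partition] = f"state-of-{partition}-from-{self.name}"

    def load_snapshot(self, partition, store):
        self.snapshots[partition] = store[partition]
        self.active.add(partition)

def move_partition(partition, old_leader, new_leader, store):
    # 1. The old leader stops processing first, so no two nodes ever
    #    process the same partition at the same time.
    old_leader.stop_processing(partition)
    # 2. The old leader snapshots the PE state for that partition.
    old_leader.snapshot(partition, store)
    # 3. Only then is the new node asked to host the partition; it
    #    restores state from the snapshot before going active.
    new_leader.load_snapshot(partition, store)

store = {}
a, b = Node("node-a"), Node("node-b")
a.active.add("stream1_p3")
move_partition("stream1_p3", a, b, store)
print(sorted(b.active))  # ['stream1_p3']
```

The ordering is the whole point: stopping before snapshotting guarantees the snapshot is consistent, and activating the new host last guarantees single ownership throughout.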
> Once that's done, Helix will ask the new node to host the partition, where it can load the snapshot. I will add this example to the walkthrough instructions.
>
> Daniel,
>
> Yes, that is possible. I think the createTask and deployApp commands already take an optional parameter listing a subset of nodes, but I think I have only implemented it for deployApp. Adding it to createTask (which assigns stream processing to S4 nodes) is straightforward.
>
> Matthieu mentioned that in the Readme instructions the adapter command invokes the old code. I will make that change in some time. If you are trying this now, run GenericEventAdapter from Eclipse directly (the same options hold good).
>
> Yes, the JIRA is S4-110. I will add the description. I am pretty sure there will be issues :-)
>
> On Fri, Nov 30, 2012 at 8:38 AM, Daniel Gómez Ferro <[email protected]> wrote:
>
> I agree with Matthieu, that's a really nice integration!
>
> I particularly like having different partitioning schemes per stream. I guess it would be easy (or at least possible) to implement some kind of isolation where only a subset of nodes handles a specific stream, for example (related to S4-91).
>
> It looks really nice, and I'm looking forward to trying it. I'll give more feedback if I run into any issues. I guess the right JIRA for that would be S4-110, right? (It's missing a description!)
>
> Good job!
>
> Daniel
>
> On Fri Nov 30 16:37:11 2012, Matthieu Morel wrote:
>
> Thanks Kishore, that's a very interesting contribution!
>
> It's also very appropriate considering that S4 is completely decentralized, with no driving/scheduling entity: the logic is within the nodes. So it's nice to have a way to easily describe, define, and automate coordinated behaviors.
> About the partitioning, the key here, as I understand it, is to have a number of partitions higher than the number of nodes by default, possibly several times higher, so that a given node is assigned multiple partitions. (In contrast, until now in S4, nb partitions <= nb nodes, including standby nodes.)
>
> In the canonical example that you provide, how do we proceed if we want to add another S4 node? That's not clear to me, and it would help understand how partitions are reassigned.
>
> Thanks!
>
> Matthieu
>
> In S4 the number of partitions is fixed for all streams and depends on the number of nodes in the cluster. Adding new nodes to an S4 cluster changes the number of partitions, which results in a lot of data movement. For example, if there are 4 nodes and you add another, nearly all keys will be remapped, causing huge data movement, whereas ideally only 20% of the data should move.
>
> By using Helix, every stream can be partitioned differently, independent of the number of nodes. Helix distributes the partitions evenly among the nodes. When new nodes are added, partitions can be migrated to the new nodes without changing the number of partitions, which minimizes the data movement.
>
> S4 handles failures by having standby nodes that are idle most of the time and become active when a node fails. Even though this works, it is not ideal in terms of efficient hardware usage, since the standby nodes are idle most of the time. It also increases the failover time, since the PE state has to be transferred to only one node.
>
> Helix allows S4 to have Active and Standby roles at the partition level, so that all nodes can be active, with some partitions Active and some in Standby mode on each node.
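[Editorial note: the data-movement arithmetic above (nearly all keys remapped with hash-mod-nodes, vs. ~20% when going from 4 to 5 nodes with a fixed partition count) can be checked with a small simulation. The `rebalance` helper below is a toy minimal-movement assignment written for illustration, not Helix's actual algorithm.]

```python
from collections import defaultdict

NUM_PARTITIONS = 20

def rebalance(assignment, nodes):
    """Toy minimal-movement rebalance (a stand-in for what Helix
    computes): a partition stays where it is unless its node exceeds
    its fair share; only the excess moves to under-loaded nodes."""
    per_node = defaultdict(list)
    for part, node in sorted(assignment.items()):
        per_node[node].append(part)
    quota = -(-len(assignment) // len(nodes))  # ceiling of fair share
    excess = []
    for node in nodes:
        excess.extend(per_node[node][quota:])
        per_node[node] = per_node[node][:quota]
    new_assignment = dict(assignment)
    for node in nodes:
        while len(per_node[node]) < quota and excess:
            part = excess.pop()
            per_node[node].append(part)
            new_assignment[part] = node
    return new_assignment

# Old S4 scheme: one partition per node, keys hashed straight to nodes.
# Going from 4 to 5 nodes remaps hash % 4 -> hash % 5: 80% of keys move.
moved = sum(1 for h in range(10000) if h % 4 != h % 5)
print(f"hash mod nodes: {moved / 10000:.0%} of keys move")

# Helix-style scheme: 20 fixed partitions, minimally rebalanced.
# Only the partitions handed to the new node move: 4 of 20, i.e. 20%.
old = {p: f"node{p % 4}" for p in range(NUM_PARTITIONS)}
new = rebalance(old, [f"node{i}" for i in range(5)])
moved = sum(1 for p in old if old[p] != new[p])
print(f"fixed partitions: {moved / NUM_PARTITIONS:.0%} of data moves")
```

Note that fixing the partition count is not enough by itself: a naive `partition % num_nodes` map would still reshuffle almost everything. The movement-minimizing assignment (keep partitions in place, move only the excess) is what brings the figure down to 20%.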
> When a node fails, the partitions that were Active on that node are evenly distributed among the remaining nodes. This provides automatic load balancing and also improves failover time, since PE state can be transferred to multiple nodes in parallel.
>
> I have a prototype implementation here: https://github.com/kishoreg/incubator-s4
>
> Instructions to build it and try it out are in the Readme.
>
> More info on Helix can be found here: http://helix.incubator.apache.org/
>
> Helix can provide a lot of other functionality:
>
> 1. Configure the topology according to the use case. For example, co-locate the partitions of different streams to allow efficient joins, or configure the number of standbys for each partition based on the headroom available.
> 2. When new nodes are added, it can throttle the data movement.
> 3. Comes with a large set of admin tools (enable/disable a node, dynamically change the topology, etc.) and provides a REST interface to manage the cluster.
> 4. Allows one to schedule custom tasks, such as snapshotting the PEs in a partition and restoring from the snapshot.
>
> Would like to get your feedback.
>
> Thanks,
> Kishore G
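[Editorial note: the partition-level Active/Standby failover described above can be illustrated with a toy placement. The staggered layout below is an assumption made for the sketch, not Helix's actual placement algorithm; the point is only that each failed partition's Standby lives on a different surviving node, so the extra load and the state transfer are spread out in parallel.]

```python
from collections import Counter

# Toy partition-level Active/Standby placement (assumed layout).
nodes = ["n0", "n1", "n2", "n3"]
partitions = list(range(8))

# Active replicas round-robin; Standby replicas staggered so that one
# node's Standbys do not all land on the same peer.
active = {p: nodes[p % len(nodes)] for p in partitions}
standby = {p: nodes[(p + 1 + p // len(nodes)) % len(nodes)]
           for p in partitions}

def fail(node):
    # On failure, each Active partition of the dead node is taken over
    # by its Standby -- a different surviving node per partition, so no
    # single node absorbs all of the load or all of the state transfer.
    for p, n in active.items():
        if n == node:
            active[p] = standby[p]

fail("n0")
print(Counter(active.values()))  # n0's two partitions split across peers
```

Contrast this with whole-node standby: a single idle machine would have to absorb all of the failed node's partitions and receive all of its state serially.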
