Agree.
On Fri, Nov 30, 2012 at 10:57 AM, Matthieu Morel <[email protected]> wrote:

> Sorry, I had replied to the user list!
>
> Begin forwarded message:
>
> > From: Matthieu Morel <[email protected]>
> > Subject: Re: Using Helix for cluster management of S4
> > Date: November 30, 2012 7:56:39 PM GMT+01:00
> > To: [email protected]
> >
> > Regarding redistribution of partitions with stateful PEs, another
> > simple solution is to activate checkpointing.
> >
> > Then when you need to repartition:
> > - on the origin node, upon a rebalancing request for partition p, you
> >   simply invalidate the PE instances of partition p in the local cache
> >   (and trigger checkpointing if necessary)
> > - on the destination node, there is nothing to do: state for PEs
> >   belonging to partition p is lazily reloaded (i.e. when necessary).
> >
> > Of course there are other solutions, such as manually snapshotting to
> > a data store, but that may be more complex.
> >
> > Matthieu
> >
> > On Nov 30, 2012, at 7:44 PM, kishore g wrote:
> >
> >> Matt,
> >>
> >> That's right, the key idea is to over-partition the stream (the
> >> number of partitions is higher than the number of nodes).
> >>
> >> Helix supports two modes. In the first, when a new node is added it
> >> automatically moves partitions; we call this AUTO_REBALANCE, and it
> >> is recommended for stateless tasks. The other mode is SEMI_AUTO,
> >> where you change the topology using an admin command. So what one
> >> would do is add a new node and then rebalance the cluster by invoking
> >> a Helix API. Helix will then re-distribute the partitions and, as I
> >> said earlier, will minimize and coordinate the movement. When I say
> >> coordinate, it will first ask the old leader to stop processing the
> >> partition; it can then snapshot its state. Once that's done, Helix
> >> will ask the new node to host the partition, where it can load the
> >> snapshot. I will add this example to the walk-through instructions.
> >>
> >> Daniel,
> >>
> >> Yes, that is possible. I think the createTask and deployApp commands
> >> already take an optional parameter to list a subset of nodes, but I
> >> think I have only implemented it for deployApp. Adding it to
> >> createTask (which assigns stream processing to S4 nodes) is
> >> straightforward.
> >>
> >> Matthieu mentioned that in the Readme instructions the adapter
> >> command is invoking the old code. I will make that change in some
> >> time. If you are trying this now, run GenericEventAdapter from
> >> Eclipse directly (the same options apply).
> >>
> >> Yes, the JIRA is S4-110. I will add the description. I am pretty sure
> >> there will be issues :-)
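
The SEMI_AUTO flow described above boils down to registering the new node
and asking Helix to recompute the partition assignment. A minimal sketch,
assuming the Apache Helix admin API (ZKHelixAdmin) and placeholder names for
the ZooKeeper address, cluster, stream resource, and node id (none of these
are taken from the prototype):

    import org.apache.helix.HelixAdmin;
    import org.apache.helix.manager.zk.ZKHelixAdmin;
    import org.apache.helix.model.InstanceConfig;

    public class AddNodeAndRebalance {
        public static void main(String[] args) {
            // Connect to the cluster metadata store (address is a placeholder).
            HelixAdmin admin = new ZKHelixAdmin("localhost:2181");

            // Register the new S4 node as a Helix participant.
            InstanceConfig newNode = new InstanceConfig("s4node5_12000");
            newNode.setHostName("s4node5");
            newNode.setPort("12000");
            newNode.setInstanceEnabled(true);
            admin.addInstance("s4cluster", newNode);

            // Recompute the assignment of the stream's partitions over the
            // enlarged node set (here 3 replicas per partition). Helix then
            // coordinates the handoff between old and new owners.
            admin.rebalance("s4cluster", "names-stream", 3);
        }
    }
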
> >> On Fri, Nov 30, 2012 at 8:38 AM, Daniel Gómez Ferro
> >> <[email protected]> wrote:
> >>
> >> I agree with Matthieu, that's a really nice integration!
> >>
> >> I particularly like having different partition schemes per stream. I
> >> guess it would be easy (or at least possible) to implement some kind
> >> of isolation where only a subset of nodes handles a specific stream,
> >> for example (related to S4-91).
> >>
> >> It looks really nice, I'm looking forward to trying it. I'll give
> >> more feedback if I run into any issues. I guess the right JIRA for
> >> that would be S4-110, right? (It's missing a description!)
> >>
> >> Good job!
> >>
> >> Daniel
> >>
> >> On Fri Nov 30 16:37:11 2012, Matthieu Morel wrote:
> >>
> >> Thanks Kishore, that's a very interesting contribution!
> >>
> >> It's also very appropriate considering that S4 is completely
> >> decentralized and that there is no driving/scheduling entity: the
> >> logic is within the nodes. So it's nice to have a way to easily
> >> describe and define coordinated behaviors, and to easily automate
> >> them.
> >>
> >> About the partitioning, the key here, as I understand it, is to have
> >> a number of partitions higher than the number of nodes by default,
> >> possibly several times higher, so a given node is assigned multiple
> >> partitions. (In contrast, until now in S4, the number of partitions
> >> <= the number of nodes, including standby nodes.)
> >>
> >> In the canonical example that you provide, how do we proceed if we
> >> want to add another S4 node? That's not clear to me, and it would
> >> help to understand how partitions are reassigned.
> >>
> >> Thanks!
> >>
> >> Matthieu
> >>
> >> In S4 the number of partitions is fixed for all streams and depends
> >> on the number of nodes in the cluster. Adding new nodes to an S4
> >> cluster causes the number of partitions to change, which results in a
> >> lot of data movement. For example, if there are 4 nodes and you add
> >> another node, nearly all keys will be remapped, resulting in huge
> >> data movement, whereas ideally only 20% of the data should move.
> >>
> >> By using Helix, every stream can be partitioned differently and
> >> independently of the number of nodes. Helix distributes the
> >> partitions evenly among the nodes. When new nodes are added,
> >> partitions can be migrated to them without changing the number of
> >> partitions, which minimizes the data movement.
> >>
> >> S4 handles failures by having standby nodes that are idle most of the
> >> time and become active when a node fails. Even though this works, it
> >> is not ideal in terms of efficient hardware usage, since the standby
> >> nodes are idle most of the time. It also increases the failover time,
> >> since the PE state has to be transferred to only one node.
> >>
> >> Helix allows S4 to have Active and Standby roles at the partition
> >> level, so that all nodes can be active while some of their partitions
> >> are Active and some are in Standby mode. When a node fails, the
> >> partitions that were Active on that node are evenly distributed among
> >> the remaining nodes. This provides automatic load balancing and also
> >> improves failover time, since PE state can be transferred to multiple
> >> nodes in parallel.
> >>
> >> I have a prototype implementation here:
> >> https://github.com/kishoreg/incubator-s4
> >>
> >> Instructions to build it and try it out are in the Readme.
> >>
> >> More info on Helix can be found here:
> >> http://helix.incubator.apache.org/
> >>
> >> Helix can provide a lot of other functionality, for example:
> >>
> >> 1. Configure the topology according to the use case. For example,
> >>    co-locate the partitions of different streams to allow efficient
> >>    joins, or configure the number of standbys for each partition
> >>    based on the headroom available.
> >> 2. When new nodes are added, it can throttle the data movement.
> >> 3. Comes with a large set of admin tools (enable/disable a node,
> >>    dynamically change the topology, etc.) and provides a REST
> >>    interface to manage the cluster.
> >> 4. Allows one to schedule custom tasks, like snapshotting the PEs in
> >>    a partition and restoring from the snapshot.
> >>
> >> I would like to get your feedback.
> >>
> >> Thanks,
> >> Kishore G
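
To make the per-partition Active/Standby idea more concrete (and to show
where Matthieu's checkpoint-on-handoff suggestion earlier in the thread would
plug in), here is a rough sketch of a participant-side state model, modeled
on Helix's LeaderStandby state model. The class name and the two private
hooks are hypothetical; only the Helix annotations and transition-callback
signatures are real API:

    import org.apache.helix.NotificationContext;
    import org.apache.helix.model.Message;
    import org.apache.helix.participant.statemachine.StateModel;
    import org.apache.helix.participant.statemachine.StateModelInfo;
    import org.apache.helix.participant.statemachine.Transition;

    @StateModelInfo(initialState = "OFFLINE", states = { "LEADER", "STANDBY", "OFFLINE" })
    public class S4PartitionStateModel extends StateModel {

        @Transition(from = "OFFLINE", to = "STANDBY")
        public void onBecomeStandbyFromOffline(Message msg, NotificationContext ctx) {
            // Nothing to load eagerly: PE state for this partition is only
            // restored from checkpoints if this node later becomes leader.
        }

        @Transition(from = "STANDBY", to = "LEADER")
        public void onBecomeLeaderFromStandby(Message msg, NotificationContext ctx) {
            // Start accepting events for this partition; PE instances are
            // reloaded lazily from the last checkpoint on first access.
            startProcessing(msg.getPartitionName());
        }

        @Transition(from = "LEADER", to = "STANDBY")
        public void onBecomeStandbyFromLeader(Message msg, NotificationContext ctx) {
            // Stop processing, checkpoint PE state, and evict it from the
            // local cache so the next leader can restore it.
            stopAndCheckpoint(msg.getPartitionName());
        }

        // Placeholder hooks: in a real integration these would call into S4's
        // checkpointing framework and stream routing, not shown here.
        private void startProcessing(String partition) { }
        private void stopAndCheckpoint(String partition) { }
    }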

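A quick back-of-the-envelope check of the data-movement claim in the
announcement (the 20-partition count is just an illustrative choice, not
taken from the prototype): with "key mod number-of-nodes" partitioning,
growing a 4-node cluster to 5 nodes remaps about 80% of the keys, whereas a
fixed, over-partitioned stream only needs to move enough partitions to fill
the new node, i.e. about 20% of the data.

    import java.util.stream.IntStream;

    public class MovementEstimate {
        public static void main(String[] args) {
            int keys = 1_000_000;

            // Naive "hash mod N" partitioning: going from 4 to 5 nodes changes
            // the owner of every key whose hash differs mod 4 and mod 5.
            long remapped = IntStream.range(0, keys)
                    .filter(k -> k % 4 != k % 5)
                    .count();
            System.out.printf("mod-N repartitioning: %.0f%% of keys move%n",
                    100.0 * remapped / keys);                 // prints 80%

            // Fixed, over-partitioned stream: 20 partitions over 5 nodes means
            // the new node needs 20 / 5 = 4 partitions, i.e. 20% of the data.
            int partitions = 20, newNodes = 5;
            System.out.printf("fixed partitions: %.0f%% of data moves%n",
                    100.0 * (partitions / newNodes) / partitions);
        }
    }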