Hi Guys, happy new year! I pushed the changes to the S4-110 branch. I have added instructions for testing the addition of nodes, and added support for tagging nodes with a group name. This allows one to specify the node group name at task creation/app deployment. Does it make sense?
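For illustration, tagging a node with a group name could look roughly like this through the Helix Java admin API (assuming a Helix version with instance-tag support; the cluster and node names below are made-up examples, and the actual wiring in the branch may differ):

    import java.util.List;
    import org.apache.helix.HelixAdmin;
    import org.apache.helix.manager.zk.ZKHelixAdmin;

    public class NodeGroupTagExample {
        public static void main(String[] args) {
            // Example ZooKeeper address; Helix keeps the cluster metadata there.
            HelixAdmin admin = new ZKHelixAdmin("localhost:2181");

            // Tag a node with a group name ("group1" is an example) so that
            // task creation / app deployment can target only nodes in that group.
            admin.addInstanceTag("s4-cluster", "localhost_12000", "group1");

            // A createTask/deployApp implementation could then resolve the
            // group name to the matching set of nodes.
            List<String> nodes =
                admin.getInstancesInClusterWithTag("s4-cluster", "group1");
            System.out.println("Nodes in group1: " + nodes);
        }
    }

One nice side effect of using Helix tags for this is that the group membership lives in the cluster metadata, alongside the instance configs.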
thanks, Kishore G

On Wed, Dec 5, 2012 at 10:20 AM, kishore g <[email protected]> wrote:

> You were brave enough to try them. You did the right thing for adding
> nodes, disabling a node and swapping nodes. For rebalancing the App, we
> need to change the DeployApp code to add the new node. The reason
> rebalance did not work for the App is that it is set up as a CUSTOM
> assignment, which means S4 owns the mapping of Apps to nodes. One way to
> work around this is to run DeployApp again and it will deploy the App to
> the new nodes.
>
> Using the Helix commands directly will work, but it's better to provide
> S4-specific commands for the operations we intend to support.
>
> Thanks again for trying it out.
>
> thanks,
> Kishore G
>
>
> On Wed, Dec 5, 2012 at 7:42 AM, Daniel Gómez Ferro <[email protected]> wrote:
>
>> Hi Kishore,
>>
>> I just tried it and I think it is very promising!
>>
>> I followed the readme and everything worked (apart from the adapter,
>> which you mentioned). Then I wanted to make a few changes to the
>> partitioning, but I couldn't get it to work, probably because I don't
>> understand Helix. I'm sure I wasn't supposed to do most of the things I
>> did, but maybe it's valuable to you! :)
>>
>> First I added a new node using the helix-admin command and started the
>> node. One of the partitions got assigned to the new node (and unassigned
>> from the old one), but the new node didn't deploy the application. I
>> tried doing a rebalance of myApp, but that somehow messed up the S4
>> configuration.
>>
>> If I configure the cluster for 3 nodes and start only 2, it works great,
>> even after starting the 3rd one (it rebalances the partitions correctly).
>>
>> I also tried swapping two instances and that also worked (I think!). The
>> only comment is that at first I had no idea how to disable an instance
>> (it's done with --enable <cluster> <instance> false).
>>
>> My guess is that I shouldn't be using the helix-admin command directly.
>> Do we need to provide custom "s4 <command>" commands for rebalancing,
>> bringing down nodes, etc.?
>>
>> I think the work is great, sorry I couldn't resist playing with it! It
>> would be great to have a short guide on how to do a basic rebalancing
>> operation and so on.
>>
>> Regards,
>>
>> Daniel
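For reference, the node addition and instance enable/disable operations mentioned above map onto the Helix Java admin API roughly as follows (a sketch only; the cluster and instance names are examples, and an "s4 <command>" wrapper would presumably do something similar internally):

    import org.apache.helix.HelixAdmin;
    import org.apache.helix.manager.zk.ZKHelixAdmin;
    import org.apache.helix.model.InstanceConfig;

    public class ClusterAdminExample {
        public static void main(String[] args) {
            HelixAdmin admin = new ZKHelixAdmin("localhost:2181");

            // Add a new node to the cluster (the instance id encodes host_port).
            InstanceConfig node = new InstanceConfig("localhost_12001");
            node.setHostName("localhost");
            node.setPort("12001");
            admin.addInstance("s4-cluster", node);

            // Disable an instance -- the Java equivalent of the
            // "--enable <cluster> <instance> false" command mentioned above.
            admin.enableInstance("s4-cluster", "localhost_12000", false);

            // ... and re-enable it once it should take traffic again.
            admin.enableInstance("s4-cluster", "localhost_12000", true);
        }
    }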
>> On Tue Dec 4 20:00:04 2012, kishore g wrote:
>>
>>> Daniel, did you get a chance to try it out? I have fixed some issues
>>> in the emitter code to use the partitioning per stream. Would love to
>>> hear your comments.
>>>
>>> I will create a branch S4-110 with my changes.
>>>
>>>
>>> On Fri, Nov 30, 2012 at 2:06 PM, kishore g <[email protected]> wrote:
>>>
>>>> Agree.
>>>>
>>>>
>>>> On Fri, Nov 30, 2012 at 10:57 AM, Matthieu Morel <[email protected]> wrote:
>>>>
>>>>> Sorry, I had replied to the user list!
>>>>>
>>>>> Begin forwarded message:
>>>>>
>>>>>> From: Matthieu Morel <[email protected]>
>>>>>> Subject: Re: Using Helix for cluster management of S4
>>>>>> Date: November 30, 2012 7:56:39 PM GMT+01:00
>>>>>> To: [email protected]
>>>>>>
>>>>>> Regarding redistribution of partitions with stateful PEs, another
>>>>>> simple solution is to activate checkpointing.
>>>>>>
>>>>>> Then, when you need to repartition:
>>>>>> - on the origin node, upon a rebalancing request for partition p,
>>>>>>   you simply invalidate the PE instances of partition p in the local
>>>>>>   cache (and trigger checkpointing if necessary)
>>>>>> - on the destination node, there is nothing to do: state for PEs
>>>>>>   belonging to partition p is lazily reloaded (i.e. when necessary).
>>>>>>
>>>>>> Of course there are other solutions, such as manually snapshotting
>>>>>> to a data store, but that may be more complex.
>>>>>>
>>>>>> Matthieu
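To illustrate that handoff, the origin-node side could be sketched roughly as below. None of these types are actual S4 or Helix APIs; they are hypothetical placeholders for the local PE cache and the checkpointing hook:

    // Hypothetical sketch of the origin-node side of a checkpoint-based handoff.
    public class PartitionHandoffSketch {

        /** Placeholder for whatever holds PE instances keyed by partition. */
        interface PeCache {
            Iterable<CheckpointablePe> instancesForPartition(int partition);
            void invalidatePartition(int partition);
        }

        /** Placeholder for a PE that can persist its state. */
        interface CheckpointablePe {
            void checkpoint();
        }

        private final PeCache localCache;

        PartitionHandoffSketch(PeCache localCache) {
            this.localCache = localCache;
        }

        /** Called when the cluster manager asks this node to give up partition p. */
        void onPartitionMovedAway(int p) {
            // 1. Make sure the latest state of the affected PEs is persisted.
            for (CheckpointablePe pe : localCache.instancesForPartition(p)) {
                pe.checkpoint();
            }
            // 2. Drop them locally; the destination node reloads state lazily,
            //    so it has nothing to do at handoff time.
            localCache.invalidatePartition(p);
        }
    }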
>>>>>> On Nov 30, 2012, at 7:44 PM, kishore g wrote:
>>>>>>
>>>>>>> Matt,
>>>>>>>
>>>>>>> That's right, the key idea is to over-partition the stream (number
>>>>>>> of partitions higher than the number of nodes).
>>>>>>>
>>>>>>> Helix supports two modes. In the first, when a new node is added it
>>>>>>> automatically moves partitions; we call this AUTO_REBALANCE. This is
>>>>>>> recommended for stateless tasks. The other mode is SEMI_AUTO, where
>>>>>>> you change the topology using an admin command. So what one would do
>>>>>>> is add a new node and then rebalance the cluster by invoking a Helix
>>>>>>> API. Helix will then redistribute the partitions and, as I said
>>>>>>> earlier, will minimize and coordinate the movement. When I say
>>>>>>> coordinate, it will first ask the old leader to stop processing the
>>>>>>> partition, which can then snapshot its state. Once that's done,
>>>>>>> Helix will ask the new node to host the partition, where it can load
>>>>>>> the snapshot. I will add this example to the walkthrough
>>>>>>> instructions.
>>>>>>>
>>>>>>> Daniel,
>>>>>>>
>>>>>>> Yes, that is possible. I think the createTask and deployApp commands
>>>>>>> already take an optional parameter to list a subset of nodes, but I
>>>>>>> think I have only implemented it for deployApp. Adding it for
>>>>>>> createTask (which assigns stream processing to S4 nodes) is
>>>>>>> straightforward.
>>>>>>>
>>>>>>> Matthieu mentioned that in the Readme instructions the adapter
>>>>>>> command is invoking the old code. I will make that change in some
>>>>>>> time. If you are trying this now, run GenericEventAdapter from
>>>>>>> Eclipse directly (the same options hold good).
>>>>>>>
>>>>>>> Yes, the JIRA is S4-110. I will add the description. I am pretty
>>>>>>> sure there will be issues :-)
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Nov 30, 2012 at 8:38 AM, Daniel Gómez Ferro <[email protected]> wrote:
>>>>>>>
>>>>>>> I agree with Matthieu, that's a really nice integration!
>>>>>>>
>>>>>>> I particularly like having different partition schemes per stream.
>>>>>>> I guess it would be easy (or at least possible) to implement some
>>>>>>> kind of isolation where only a subset of nodes handles a specific
>>>>>>> stream, for example (related to S4-91).
>>>>>>>
>>>>>>> It looks really nice, I'm looking forward to trying it. I'll give
>>>>>>> more feedback if I run into any issues. I guess the right JIRA for
>>>>>>> that would be S4-110, right? (It's missing a description!)
>>>>>>>
>>>>>>> Good job!
>>>>>>>
>>>>>>> Daniel
>>>>>>>
>>>>>>>
>>>>>>> On Fri Nov 30 16:37:11 2012, Matthieu Morel wrote:
>>>>>>>
>>>>>>> Thanks Kishore, that's a very interesting contribution!
>>>>>>>
>>>>>>> It's also very appropriate considering that S4 is completely
>>>>>>> decentralized and that there is no driving/scheduling entity: the
>>>>>>> logic is within the nodes. So it's nice to have a way to easily
>>>>>>> describe and define coordinated behaviors, and to easily automate
>>>>>>> them.
>>>>>>>
>>>>>>> About the partitioning, the key here, as I understand it, is to have
>>>>>>> a number of partitions higher than the number of nodes by default,
>>>>>>> possibly several times higher. So a given node is assigned multiple
>>>>>>> partitions. (In contrast, until now in S4, nb partitions <= nb
>>>>>>> nodes, including standby nodes.)
>>>>>>>
>>>>>>> In the canonical example that you provide, how do we proceed if we
>>>>>>> want to add another S4 node? That's not clear to me, and it would
>>>>>>> help to understand how partitions are reassigned.
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> Matthieu
>>>>>>>
>>>>>>>
>>>>>>> In S4 the number of partitions is fixed for all streams and depends
>>>>>>> on the number of nodes in the cluster. Adding new nodes to an S4
>>>>>>> cluster changes the number of partitions, which results in a lot of
>>>>>>> data movement. For example, if there are 4 nodes and you add another
>>>>>>> node, nearly all keys will be remapped, which causes huge data
>>>>>>> movement, whereas ideally only 20% of the data should move.
>>>>>>>
>>>>>>> By using Helix, every stream can be partitioned differently,
>>>>>>> independent of the number of nodes. Helix distributes the partitions
>>>>>>> evenly among the nodes. When new nodes are added, partitions can be
>>>>>>> migrated to the new nodes without changing the number of partitions,
>>>>>>> which minimizes the data movement.
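To make the 20% figure concrete, here is a small standalone simulation (plain Java, not S4 or Helix code) comparing direct key-to-node hashing with a fixed pool of 20 partitions that is rebalanced with minimal movement when a fifth node joins:

    public class RemappingExample {
        public static void main(String[] args) {
            int keys = 1_000_000;

            // Direct key -> node mapping: the owner changes for most keys
            // when the node count goes from 4 to 5.
            int movedDirect = 0;
            for (int k = 0; k < keys; k++) {
                if (k % 4 != k % 5) {
                    movedDirect++;
                }
            }

            // Fixed pool of 20 partitions spread over the nodes: only the
            // partitions handed to the new node move (one from each old node).
            int partitions = 20;
            int movedPartitioned = 0;
            for (int k = 0; k < keys; k++) {
                int p = k % partitions;
                int oldOwner = p % 4;                    // 4 nodes, 5 partitions each
                int newOwner = (p >= 16) ? 4 : oldOwner; // partitions 16..19 move to node 4
                if (oldOwner != newOwner) {
                    movedPartitioned++;
                }
            }

            System.out.printf("direct hashing: %.0f%% of keys move%n",
                100.0 * movedDirect / keys);
            System.out.printf("fixed partitions: %.0f%% of keys move%n",
                100.0 * movedPartitioned / keys);
        }
    }

Direct hashing remaps roughly 80% of the keys when going from 4 to 5 nodes, while the fixed partition pool only moves the 4 partitions handed to the new node, i.e. about 20% of the data.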
>>>>>>> S4 handles failures by having standby nodes that are idle most of
>>>>>>> the time and become active when a node fails. Even though this
>>>>>>> works, it is not ideal in terms of efficient hardware usage, since
>>>>>>> the standby nodes are idle most of the time. It also increases the
>>>>>>> failover time, since the PE state has to be transferred to a single
>>>>>>> node.
>>>>>>>
>>>>>>> Helix allows S4 to have Active and Standby roles at the partition
>>>>>>> level, so that all nodes can be active, with some partitions Active
>>>>>>> and some in Standby mode. When a node fails, the partitions that
>>>>>>> were Active on that node are evenly distributed among the remaining
>>>>>>> nodes. This provides automatic load balancing and also improves
>>>>>>> failover time, since PE state can be transferred to multiple nodes
>>>>>>> in parallel.
>>>>>>>
>>>>>>> I have a prototype implementation here:
>>>>>>> https://github.com/kishoreg/incubator-s4
>>>>>>>
>>>>>>> Instructions to build it and try it out are in the Readme.
>>>>>>>
>>>>>>> More info on Helix can be found here:
>>>>>>> http://helix.incubator.apache.org/
>>>>>>>
>>>>>>> Helix can provide a lot of other functionality, for example:
>>>>>>>
>>>>>>> 1. Configure the topology according to the use case. For example,
>>>>>>>    co-locate the partitions of different streams to allow efficient
>>>>>>>    joins, or configure the number of standbys for each partition
>>>>>>>    based on the headroom available.
>>>>>>> 2. When new nodes are added, it can throttle the data movement.
>>>>>>> 3. It comes with a large set of admin tools (enable/disable a node,
>>>>>>>    dynamically change the topology, etc.) and provides a REST
>>>>>>>    interface to manage the cluster.
>>>>>>> 4. It allows one to schedule custom tasks, like snapshotting the PEs
>>>>>>>    in a partition and restoring from the snapshot.
>>>>>>>
>>>>>>> Would like to get your feedback.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Kishore G
