Sorry, I had replied to the user list!
Begin forwarded message:
From: Matthieu Morel <[email protected]>
Subject: Re: Using Helix for cluster management of S4
Date: November 30, 2012 7:56:39 PM GMT+01:00
To: [email protected]
Regarding redistribution of partitions with stateful PEs, another
simple solution is to activate checkpointing.
Then, when you need to repartition:
- on the origin node, upon a rebalancing request for partition p, you
simply invalidate the PE instances for partition p in the local cache (and
trigger checkpointing if necessary)
- on the destination node, there is nothing to do: state for PEs
belonging to partition p is lazily reloaded (i.e. when necessary). See the
sketch below.
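Here is a rough sketch of that flow in Java. The PE cache and the
checkpoint hook are hypothetical placeholders, not S4's actual
checkpointing API; they are only there to make the two steps concrete:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RepartitionHandler {

    // Hypothetical view of a locally cached PE instance.
    interface CachedPe {
        int partition();            // partition this PE instance belongs to
        void checkpointIfDirty();   // persist state if it changed since the last checkpoint
    }

    private final Map<String, CachedPe> localPeCache = new ConcurrentHashMap<>();

    // Origin node: on a rebalancing request for partition p, checkpoint and evict.
    public void onPartitionRevoked(int p) {
        localPeCache.entrySet().removeIf(e -> {
            if (e.getValue().partition() == p) {
                e.getValue().checkpointIfDirty();
                return true;   // drop the instance; its state lives in the checkpoint
            }
            return false;
        });
    }

    // Destination node: nothing to do. The first event keyed to partition p
    // re-instantiates the PE, which lazily reloads its state from the checkpoint.
}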
Of course there are other solutions, such as manually snapshotting to a
data store, but that may be more complex.
Matthieu
On Nov 30, 2012, at 7:44 PM, kishore g wrote:
Matt,
That's right, the key idea is to over-partition the stream (the number of
partitions is higher than the number of nodes).
Helix supports two modes. In the first, when a new node is added Helix
automatically moves partitions; we call this AUTO_REBALANCE, and it is
recommended for stateless tasks. The other mode is SEMI_AUTO, where you
change the topology using an admin command. So what one would do is add a
new node and then rebalance the cluster by invoking a Helix API. Helix will
then re-distribute partitions and, as I said earlier, will minimize and
coordinate the movement. When I say coordinate, it will first ask the old
leader to stop processing the partition, which can then snapshot its state.
Once that's done, Helix will ask the new node to host the partition, where
it can load the snapshot. I will add this example to the walkthrough
instructions.
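To make that concrete, here is a minimal sketch of the SEMI_AUTO flow using
the generic Helix admin API; the ZooKeeper address, cluster, resource and
instance names are only illustrative:

import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.InstanceConfig;

public class RebalanceAfterAddingNode {
    public static void main(String[] args) {
        HelixAdmin admin = new ZKHelixAdmin("localhost:2181");

        // 1. Register the newly started node with the cluster.
        admin.addInstance("s4-cluster", new InstanceConfig("localhost_12002"));

        // 2. Ask Helix to recompute the partition assignment for the stream's resource.
        //    Helix moves only the partitions needed to even out the load and drives the
        //    state transitions (stop on the old owner, then start on the new one).
        admin.rebalance("s4-cluster", "names-stream", 1 /* replicas per partition */);
    }
}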
Daniel,
Yes, that is possible. I think the createTask and deployApp commands
already take an optional parameter to list a subset of nodes, but I think I
have only implemented it for deployApp. Adding it for createTask (which
assigns stream processing to S4 nodes) is straightforward.
Matthieu mentioned that in the Readme instructions the adapter command
is invoking the old code. I will make that change in some time. If you are
trying this now, then run GenericEventAdapter from Eclipse directly (the
same options still apply).
Yes, the JIRA is S4-110. I will add the description. I am pretty sure there
will be issues :-)
On Fri, Nov 30, 2012 at 8:38 AM, Daniel Gómez Ferro <
[email protected]> wrote:
I agree with Matthieu, that's a really nice integration!
I particularly like having different partition schemes per stream. I
guess it would be easy (or at least possible) to implement some kind of
isolation where only a subset of nodes handles a specific stream, for
example (related to S4-91).
It looks really nice, I'm looking forward to trying it. I'll give more
feedback if I run into any issues. I guess the right JIRA for that would be
S4-110, right? (It's missing a description!)
Good job!
Daniel
On Fri Nov 30 16:37:11 2012, Matthieu Morel wrote:
Thanks Kishore, that's a very interesting contribution!
It's also very appropriate considering that S4 is completely
decentralized and that there is no driving/scheduling entity: the logic is
within the nodes. So it's nice to have a way to easily describe and define
coordinated behaviors, and to easily automate them.
About the partitioning, the key here, as I understand it, is to have a
number of partitions higher than the number of nodes by default, possibly
several times higher, so a given node is assigned multiple partitions. (In
contrast, until now in S4, nb partitions <= nb nodes, including standby
nodes.)
In the canonical example that you provide, how do we proceed if we
want to add another S4 node? That's not clear to me, and it would help me
understand how partitions are reassigned.
Thanks!
Matthieu
In S4 the number of partitions is fixed for all streams and depends on
the number of nodes in the cluster. Adding new nodes to an S4 cluster
changes the number of partitions, which results in a lot of data movement.
For example, if there are 4 nodes and you add another node, nearly all keys
will be remapped, which results in huge data movement, whereas ideally
only 20% of the data should move.
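As a quick illustration of that claim (a back-of-the-envelope check, not S4
code): with hash-modulo routing, going from 4 to 5 nodes remaps every key
that lands on a different node under mod 5 than under mod 4, which is about
80% of them, while an ideal rebalancing would move only the ~20% needed to
populate the new node.

public class RemappingCost {
    public static void main(String[] args) {
        int keys = 1_000_000, moved = 0;
        for (int k = 0; k < keys; k++) {
            if (k % 4 != k % 5) {   // key routed to a different node after adding the 5th node
                moved++;
            }
        }
        System.out.printf("moved: %.1f%% of keys (ideal: 20.0%%)%n", 100.0 * moved / keys);
    }
}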
By using Helix, every stream can be partitioned differently, independently
of the number of nodes. Helix distributes the partitions evenly among the
nodes. When new nodes are added, partitions can be migrated to them without
changing the number of partitions, which minimizes the data movement.
S4 handles failures by having standby nodes that are idle most of the time
and become active when a node fails. Even though this works, it's not ideal
in terms of hardware usage, since the standby nodes sit idle most of the
time. It also increases failover time, since the PE state has to be
transferred to a single node.
Helix allows S4 to have Active and Standby roles at the partition level, so
that all nodes can be active, with some partitions Active and some in
standby mode on each node. When a node fails, the partitions that were
Active on that node are evenly distributed among the remaining nodes. This
provides automatic load balancing and also improves failover time, since PE
state can be transferred to multiple nodes in parallel.
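Here is a sketch of how this could be expressed with Helix's built-in
LeaderStandby state model; the cluster, resource, partition and replica
counts are illustrative, not taken from the prototype:

import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;

public class PartitionLevelFailover {
    public static void main(String[] args) {
        HelixAdmin admin = new ZKHelixAdmin("localhost:2181");

        // 16 partitions, each with one LEADER (active) replica and one STANDBY replica,
        // spread over all nodes so that no node sits completely idle.
        admin.addResource("s4-cluster", "names-stream", 16, "LeaderStandby", "AUTO_REBALANCE");
        admin.rebalance("s4-cluster", "names-stream", 2 /* 1 leader + 1 standby */);
    }
}

When a node fails, Helix promotes the standby replicas of its partitions on
the surviving nodes, so the failover load is spread instead of landing on a
single hot-standby machine.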
I have a prototype implementation here:
https://github.com/kishoreg/incubator-s4
Instructions to build it and try it out are in the Readme.
More info on Helix can be found here: http://helix.incubator.apache.org/
Helix provides a lot of other functionality, for example:
1. Configure the topology according to the use case. For example, co-locate
the partitions of different streams to allow efficient joins, or configure
the number of standbys for each partition based on the headroom available.
2. When new nodes are added, it can throttle the data movement.
3. It comes with a large set of admin tools, such as enabling/disabling a
node or dynamically changing the topology, and provides a REST interface to
manage the cluster (a small example is shown after this list).
4. It allows one to schedule custom tasks, like snapshotting the PEs in a
partition and restoring from the snapshot.
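For item 3, a sketch of what the node enable/disable operation could look
like via the Java admin API (the cluster and instance names are made up):

import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;

public class AdminOps {
    public static void main(String[] args) {
        HelixAdmin admin = new ZKHelixAdmin("localhost:2181");

        // Take a node out of rotation: Helix moves its Active partitions to the
        // remaining nodes, and moves them back when the node is re-enabled.
        admin.enableInstance("s4-cluster", "localhost_12001", false);
        // ... perform maintenance ...
        admin.enableInstance("s4-cluster", "localhost_12001", true);
    }
}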
I would like to get your feedback.
Thanks,
Kishore G