On Fri, Jan 15, 2010 at 4:36 PM, Andrzej Bialecki <a...@getopt.org> wrote:

> My 0.02 PLN on the subject ...
>

Polish currency seems pretty strong lately.  There are a lot of good ideas
for this small sum.


>
> Terminology
>
> * (global) search index
> * index shard:
> * partitioning:
> * search node:
> * search host:
> * Shard Manager:
>

I think that these terms are excellent.


> The replication and load balancing is a problem with many existing
> solutions, and this one in particular reminds me strongly of the Hadoop
> HDFS. In fact, early on during the development of Hadoop [1] I wondered
> whether we could reuse HDFS to manage Lucene indexes instead of opaque
> blocks of fixed size. It turned out to be infeasible, but the model of
> Namenode/Datanode still looks useful in our case, too.
>

I have seen the analogy with Hadoop in managing a Katta cluster.  The
randomized shard assignment provides many of the same robustness benefits that
a map-reduce architecture provides for parallel computing.


> I believe there are many useful lessons lurking in Hadoop/HBase/Zookeeper
> that we could reuse in our design. The following is just a straightforward
> port of the Namenode/Datanode concept.
>
> Let's imagine a component called ShardManager that is responsible for
> managing the following data:
>
> * list of shard ID-s that together form the complete search index,
> * for each shard ID, list of search nodes that serve this shard.
> * issuing replication requests
> * maintaining the partitioning function (see below), so that updates are
> directed to correct shards
> * maintaining heartbeat to check for dead nodes
> * providing search clients with a list of nodes to query in order to obtain
> all results from the search index.
>

I think that this is close.

I think that the list of search nodes that serve each shard should be
maintained by the nodes themselves.  Moreover, ZK provides the ability to
have this list magically update if the node dies.

This means that the need for heartbeats virtually disappears.

In addition, I think that a substrate like ZK should be used to provide
search clients with the information about which nodes have which shards and
the clients should themselves decide how to cover the set of shards with a
list of nodes.  This means that the ShardManager is *completely* out of the
real-time pathway.
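
To make the ZK side concrete, here is a minimal sketch (plain Java against the
stock ZooKeeper client API) of how a node could register the shards it serves
as ephemeral znodes.  The /shards/<shardId>/<nodeId> layout and the class name
are illustrative assumptions, not an existing implementation, and I assume the
/shards root already exists:

import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// A search node announces each shard it serves under /shards/<shardId>/<nodeId>.
// The children are ephemeral, so when the node's ZK session dies the entries
// vanish by themselves -- no separate heartbeat is required.
public class ShardRegistration {

  private final ZooKeeper zk;
  private final String nodeId;   // e.g. "host1:8983" (illustrative)

  public ShardRegistration(ZooKeeper zk, String nodeId) {
    this.zk = zk;
    this.nodeId = nodeId;
  }

  public void register(List<String> shardIds) throws KeeperException, InterruptedException {
    for (String shardId : shardIds) {
      String shardPath = "/shards/" + shardId;
      try {
        // Persistent parent: the shard exists independently of any single node.
        zk.create(shardPath, new byte[0],
                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
      } catch (KeeperException.NodeExistsException alreadyThere) {
        // another node created it first; that is fine
      }
      // Ephemeral child: removed automatically when this node's session ends.
      zk.create(shardPath + "/" + nodeId, new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }
  }
}

Search clients then just watch the children of each shard znode and rebuild
their cached shard/node map when something changes.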



> ... I believe most of the above functionality could be facilitated by
> Zookeeper, including the election of the node that runs the ShardManager.
>

Absolutely.


> Updates
> -------
> We need a partitioning schema that splits documents more or less evenly
> among shards, and at the same time allows us to split or merge unbalanced
> shards. The simplest function that we could imagine is the following:
>
>        hash(docId) % numShards
>
> though this has the disadvantage that any larger update will affect
> multiple shards, thus creating an avalanche of replication requests ... so a
> sequential model would be probably better, where ranges of docIds are
> assigned to shards.
>

A hybrid is quite possible:

hash(floor(docId / sequence-size)) % numShards

This gives sequential assignment of sequence-size documents at a time.  The
sequence size should be small enough to distribute query and update load
across all nodes, but large enough that a focused update does not trigger
replication of every shard.  Balance is necessary.
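
In code the hybrid is only a couple of lines.  Something like the following
(the class and parameter names are made up, and I am assuming numeric docIds):

// Illustrative partitioner for numeric docIds.  Runs of sequenceSize
// consecutive ids land on the same shard, but the hash spreads the runs
// across all shards.
public final class ShardPartitioner {

  private final int numShards;
  private final long sequenceSize;

  public ShardPartitioner(int numShards, long sequenceSize) {
    this.numShards = numShards;
    this.sequenceSize = sequenceSize;
  }

  // hash(floor(docId / sequence-size)) % numShards
  public int shardFor(long docId) {
    long block = docId / sequenceSize;              // floor for non-negative ids
    int hash = (int) (block ^ (block >>> 32));      // same mixing as Long.hashCode
    return ((hash % numShards) + numShards) % numShards;   // keep the shard index non-negative
  }
}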


> Now, if any particular shard is too unbalanced, e.g. too large, it could be
> further split in two halves, and the ShardManager would have to record this
> exception. This is a very similar process to a region split in HBase, or a
> page split in btree DBs. Conversely, shards that are too small could be
> joined. This is the icing on the cake, so we can leave it for later.
>

Leaving this for later is a great idea.  With relatively small shards, I am able
to parallelize indexing to the point that a terabyte or so of documents
indexes in a few hours.  Combined with a small sequence size in the shard
distribution function so that all shards grow together, it is easy to plan
for 3x growth or more without the need for shard splitting.  With a complete
index being so cheap to build, I can afford to simply reindex from scratch with
a different shard count if I feel like it.



> Search
> ------
> There should be a component sometimes referred to as query integrator (or
> search front-end) that is the entry and exit point for user search requests.
> On receiving a search request this component gets a list of randomly
> selected nodes from SearchManager to contact (the list containing all shards
> that form the global index), sends the query and integrates partial results
> (under a configurable policy for timeouts/early termination), and sends back
> the assembled results to the user.
>

Yes in outline.

A few details:

I think that the shard cover computation should, in fact, be done on the
client side.  One reason is that the node/shard state is relatively static,
so having all clients retrieve the full state is cacheable and simple.

Another detail is that however the cover of all shards is computed, the
result is a distinct list of all shards, grouped by one of the nodes that each
shard resides on.  Each group should represent exactly one query to the
corresponding node.  The node should perform the query on each of the requested
shards (but NOT on all shards on the node) and return the results unmerged.
Additionally, if an exception occurs for a shard, it should be returned in
place of that shard's result list.
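
As a sketch of what I mean by the cover computation (the class is
hypothetical; in practice the shard-to-nodes map is the cached state read
from ZK):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Client-side cover: pick one node per shard at random, then group the shards
// by the chosen node.  Each entry of the result is exactly one query, sent to
// that node and restricted to the listed shards.
public class ShardCover {

  private final Random random = new Random();

  public Map<String, List<String>> cover(Map<String, List<String>> shardToNodes) {
    Map<String, List<String>> nodeToShards = new HashMap<String, List<String>>();
    for (Map.Entry<String, List<String>> entry : shardToNodes.entrySet()) {
      List<String> nodes = entry.getValue();
      if (nodes.isEmpty()) {
        continue;   // no live replica; the integrator records an error for this shard
      }
      String node = nodes.get(random.nextInt(nodes.size()));
      List<String> shards = nodeToShards.get(node);
      if (shards == null) {
        shards = new ArrayList<String>();
        nodeToShards.put(node, shards);
      }
      shards.add(entry.getKey());
    }
    return nodeToShards;
  }
}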

As results are returned, the query integrator needs to do several things:

a) if a node cannot be reached, the shards on that node should be found on
other nodes and the query repeated for those nodes.  If no other nodes can
be found for any of those shards, an error result should be recorded for the
missing shards.

b) if an exception is returned as a result for a shard, the query integrator
can retry the query for the failing shards analogously to (a), or it can
immediately report an error.  My experience is that reporting the error is
generally better because shards tend to be replicated cleanly and errors in one
place will usually just recur elsewhere.  Mileage may vary on this point.

c) as results are recorded per shard the query integrator needs to decide
whether to return partial results or to wait for more results.  I have found
it very useful to have a pluggable policy here and have typically used a
policy with dual deadlines.  The first deadline is how long the query
integrator will wait for full results for every shard to be recorded.  The
second deadline is one after which whatever results are available will be
returned.  I also like a minimum percentage at the second deadline which
determines whether to mark the partial results as success or failure.
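
To illustrate, here is roughly what such a pluggable policy could look like.
The behaviour between the two deadlines (settling for partial results once
coverage is acceptable) is my own interpretation for the sake of the example,
and the names and thresholds are made up:

// Dual-deadline policy for the query integrator.  All values are
// configuration; times are milliseconds since the query was issued.
public class DualDeadlinePolicy {

  public enum Decision { KEEP_WAITING, RETURN_SUCCESS, RETURN_PARTIAL_FAILURE }

  private final long fullResultsDeadlineMs;  // how long to hold out for every shard
  private final long finalDeadlineMs;        // hard cutoff: return whatever is available
  private final double minShardFraction;     // coverage needed to call partial results a success

  public DualDeadlinePolicy(long fullResultsDeadlineMs, long finalDeadlineMs,
                            double minShardFraction) {
    this.fullResultsDeadlineMs = fullResultsDeadlineMs;
    this.finalDeadlineMs = finalDeadlineMs;
    this.minShardFraction = minShardFraction;
  }

  public Decision decide(long elapsedMs, int shardsAnswered, int shardsTotal) {
    if (shardsAnswered == shardsTotal) {
      return Decision.RETURN_SUCCESS;           // full results: return right away
    }
    if (elapsedMs < fullResultsDeadlineMs) {
      return Decision.KEEP_WAITING;             // still hoping for every shard
    }
    double fraction = (double) shardsAnswered / shardsTotal;
    if (elapsedMs < finalDeadlineMs) {
      // Past the first deadline: settle for partial results once coverage is acceptable.
      return fraction >= minShardFraction ? Decision.RETURN_SUCCESS : Decision.KEEP_WAITING;
    }
    // Past the final deadline: return whatever we have, marked by coverage.
    return fraction >= minShardFraction ? Decision.RETURN_SUCCESS
                                        : Decision.RETURN_PARTIAL_FAILURE;
  }
}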

These three points give robustness to the process in a fashion very similar
to the way that map-reduce with task retry gives robustness to a parallel
program.  The result is very nice.

> That's it for now from the top of my head ...
>

One additional concept that I find useful is the idea of different kinds of
broadcast.  For search, we want to do what I call "horizontal broadcast" of
the query to exactly one copy of each shard.  For update, we often want to
do what I call "vertical broadcast" to each copy of a particular shard.

With a little bit of functional programming, these two kinds of broadcast
are nearly the complete repertoire of functions that need to be exposed to
the caller.  In particular, this shard management layer does not need to
know at all what kinds of queries or updates are being done.  Instead, it
just needs to know what the query object is, what arguments it wants and
whether to do a horizontal or vertical broadcast.   This makes the parallel
with map-reduce even stronger.  Map-reduce is a way of taking a few
functions that define a map-reduce program and passing the data from one
function to another in a highly ritualized fashion.  Shard management for
retrieval should be very much the same.  The shard management framework
doesn't need to care what the functions that implement the queries DO, it
just has to invoke them next to the right shards and assemble the results
and failures.  Of course, with retrieval the set of functions being invoked is
far more static than with, say, Hadoop map functions, but the principle of
isolation should be the same.
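
In interface terms, the whole repertoire the caller needs could be as small as
something like this (the names are illustrative, not Katta's or Solr's API):

import java.util.List;
import java.util.Map;

// The shard-management layer exposes two broadcast modes and stays ignorant of
// what the broadcast function actually does on a shard.
public interface ShardBroadcaster {

  // Horizontal broadcast: run the function against exactly one copy of each
  // shard; results (or failures) come back unmerged, keyed by shard id.
  <R> Map<String, R> horizontal(ShardFunction<R> query);

  // Vertical broadcast: run the function against every copy of one shard,
  // e.g. to apply an update to all replicas.
  <R> List<R> vertical(String shardId, ShardFunction<R> update);
}

// The opaque function handed to the broadcaster; it only ever sees one shard
// replica on one node.
interface ShardFunction<R> {
  R apply(String nodeId, String shardId) throws Exception;
}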

All of these shard management concepts are present and well worked out in
Katta.  Katta provides at the least a useful proof of concept for these
ideas even if the specific implementation isn't what is desired for
Solr/Cloud.
