# distributed intersections of sorted lists

#!/usr/bin/python
# The problem: You have several sorted lists, each containing many
# URLs, but only a few URLs are common to all of them.  Find the
# common URLs, i.e. the intersection.  Worse, the lists are on
# different machines, so you want to minimize the number of round-trip
# times between the lists.
# Dave Long suggested an approach to this problem some time ago on
# kragen-discuss.  This program tries it out.

# Each machine participating does the following:
# when it receives the token, it looks at the datum attached.
#
# If it's after the last item in its input list, it terminates the
# algorithm.
#
# If it has sent the token out with that datum, it outputs that
# datum, then sends the token to the next machine, but with the next
# item in its input list attached.
#
# If it has not sent the token out with that datum, it checks to see
# whether that datum is in its input list.  If it is, it sends the
# token to the next machine with the same datum attached.  Otherwise,
# it searches in its input list for the place where that datum would
# be, moves on to the next item in that input list, then sends the
# token to the next machine with that next item attached.
#
# The machines are connected (logically) in a ring, such that the
# transitive closure of the "next machine" relation from any machine
# is all the machines, including itself.
#
# A little crude experimentation shows that with four machines, for
# more than a few items, with density below about 25%, and with a
# uniform random distribution, this scheme handles three or four items
# per token pass.  As density increases, it eventually handles fewer,
# of course, approaching one item per token pass.
#
# Perhaps counterintuitively, it handles more items per token pass as
# the number of machines increases.  For example, with almost 800
# items each chosen from a range up to 5000, and with eight machines,
# we finish the algorithm with only about 880 token passes, finding
# the intersection of eight lists containing 5900 or so items
# altogether.
#
# If, on each machine, you chop your input lists in half at some fixed
# number, such that about half the items in each list are above and
# half are below, then you can run this algorithm on the two list
# halves in parallel.  You can even piggyback on the existing data
# transmission; your token just needs to contain two data.
#
# You can continue this chopping-up process further; instead of
# chopping into two pieces, you could chop into four, or eight, or
# ten, or a hundred.  If you take this to the extreme, each data
# transmission merely contains an inefficiently-encoded bitmap, and
# each machine need only hold the token once.  For any particular
# ratio between latency and throughput, we can probably find an
# optimal amount of chopping-up; if latency is zero, no chopping-up is
# best, and if bandwidth is infinite, the bitmaps are best.  However,
# I have a hunch there's a midpoint that is close to optimal for a
# wide range of such ratios, because I have a hunch that increasing
# the number of token-passings beyond 10 or 30 or so doesn't reduce
# the bandwidth needed much further.
#
# My standard AltaVista numbers were 31420 occurrences of 'autism',
# 602213 occurrences of 'criteria', and 10000 occurrences of 'eye
# contact', with 32 occurrences of all three.
#
# This algorithm's worst case would be three token-passings per item
# from the shortest list, or 30000 token-passings.
#
# The avtest() routine below simulates this.  It took about 80 seconds
# to run on my Pentium-II/300, which (with constant factors from clock
# speed and implementation technology) seems quite sufficient; but it
# took 22600 token passings, or 27 items per token-passing.  Note that
# this comes close to the worst case.  If we implemented this on a
# network where it took 5ms for a packet to get from any one machine
# to any other, it would take 113 seconds to run this.

# If we were to chop the URL space up into 750 segments containing
# equal numbers of URLs, this would probably take only 30 or 60
# token-passings, and each token-passing would include 750 URLs
# (presumably compressed somehow; God only knows how big or small that
# would be.  75KB without compression.)

# Note that real-life URL lists will be a lot clumpier than these
# random-number lists --- e.g. every page from www.creativecommons.org
# will contain the word "commons" --- and clumps that occur only in
# some of the lists being intersected will be skipped over in a single
# round-trip of the token, as if they were single URLs.  So the input
# provided by this test should be strictly worse than real data.

# I still need to simulate the chopping-up to see how much it reduces
# the number of token-passings, but I think that will depend
# sensitively on how clumpy and intercorrelated the lists are.

import random, operator

def test(merger):
assert merger().merge([[1, 2, 3]] * 4) == [1, 2, 3]
assert merger().merge([[1, 2, 3]]) == [1, 2, 3]
assert merger().merge([[1, 3], [2, 4]]) == []
assert merger().merge([[1, 3, 4], [1, 2, 4]]) == [1, 4]
assert merger().merge([[1, 2, 3], [2, 3, 4]]) == [2, 3]

foo = merger()
results = foo.merge([[0, 1, 4, 10, 20, 22, 30],
[0, 2, 5, 16, 20, 23, 31],
[0, 2, 5, 11, 20, 22, 30]])
assert results == [0, 20], results
print foo.stats()
print

lists = [[random.randrange(5000) for x in xrange(1600)] + [44, 193]
for y in range(8)]
uniqlists = map(uniq, lists)
for xx in uniqlists: xx.sort()
foo = merger()
results = foo.merge(uniqlists)
print "got", results
assert 44 in results
assert 193 in results
goodresults = dict_merge(uniqlists)
assert results == goodresults, (results, goodresults, lists)
print foo.stats()
#avtest(merger)

def avtest(merger):
matches = uniq([random.randrange(100000) for xx in xrange(32)])
matches.sort()
lists = [[random.randrange(10000000) for xx in xrange(31420)] + matches,
[random.randrange(10000000) for xx in xrange(602213)] + matches,
[random.randrange(10000000) for xx in xrange(10000)] + matches]
uniqlists = map(uniq, lists)
for xx in uniqlists: xx.sort()
foo = merger()
results = foo.merge(uniqlists)
print "got", results
print foo.stats()

class simple_machine:
    "Implementation directly from the spec, above."
    def __init__(self, inputlist):
        self.inputlist = inputlist
        self.lastsent = None   # datum attached to our last token send

    def handle_token(self, datum):
        """Returns list of items to output and new datum, None to terminate."""
        items = self.inputlist
        if datum is None:
            datum = items[0]
        if datum > items[-1]:
            # Past the end of our list; nothing more can match.
            return [], None
        if self.lastsent == datum:
            # Our own datum came all the way around the ring, so every
            # machine has it: output it and propose our next item.
            following = items.index(datum) + 1
            if following == len(items):
                return [datum], None
            self.lastsent = items[following]
            return [datum], self.lastsent
        if datum in items:
            # We have it too; pass it along unchanged.
            self.lastsent = datum
            return [], self.lastsent
        # We don't have it: propose the smallest item greater than it.
        following = 0
        while following < len(items) and items[following] < datum:
            following = following + 1
        if following == len(items):
            return [], None
        assert items[following] > datum
        self.lastsent = items[following]
        return [], self.lastsent

class smart_machine:
    """A more optimized implementation, about 4x faster.

    You'd expect it to be linear instead of O(N^2) like the above one,
    but they both act pretty linear at the small problem sizes I've
    tried.

    Keeps a cursor (self.nextidx) pointing at the first item not yet
    known to be smaller than the current token datum, instead of
    searching the whole input list on every token arrival.

    """
    def __init__(self, inputlist):
        self.inputlist = inputlist
        self.lastsent = None   # datum attached to our last token send
        self.nextidx = 0       # cursor: first item >= last datum seen
    def handle_token(self, datum):
        """Returns list of items to output and new datum, None to terminate."""
        if datum is None: datum = self.inputlist[self.nextidx]
        # Advance the cursor past everything smaller than datum.  (In
        # the original text this loop had been displaced into __init__,
        # where it referenced the then-undefined name 'datum' and
        # raised NameError; it belongs here, before the comparisons,
        # or the assert below fires whenever the token datum has moved
        # past our cursor.)
        nextidx = self.nextidx
        while (nextidx < len(self.inputlist) and
               self.inputlist[nextidx] < datum):
            nextidx = nextidx + 1
        self.nextidx = nextidx
        if self.nextidx == len(self.inputlist):
            # Ran off the end of our list; nothing more can match.
            return [], None
        if self.lastsent == datum:
            # Our own datum came all the way around the ring, so every
            # machine has it: output it and propose our next item.
            self.nextidx = self.nextidx + 1
            if self.nextidx == len(self.inputlist): return [datum], None
            self.lastsent = self.inputlist[self.nextidx]
            return [datum], self.lastsent
        if datum == self.inputlist[self.nextidx]:
            # We have it too; pass it along unchanged.
            self.lastsent = datum
            return [], self.lastsent
        # We don't have it: propose the smallest item greater than it.
        assert self.inputlist[self.nextidx] > datum
        self.lastsent = self.inputlist[self.nextidx]
        return [], self.lastsent

class simple:
    "A merger suitable for testing."
    def __init__(self, machinetype):
        self.token_passes = 0     # number of machine-to-machine sends
        self.total_items = None   # filled in by merge()
        self.machinetype = machinetype
    def merge(self, lists):
        """Intersect the sorted lists by passing a token around a ring
        of machines; returns the common items in ascending order."""
        # Record the input size so stats() has something to report.
        # (Previously total_items was never assigned anywhere, so
        # stats() raised a TypeError on the None.)
        self.total_items = sum([len(ll) for ll in lists])
        outputlist = []
        machines = [self.machinetype(ll) for ll in lists]
        token_datum = None
        while 1:
            for mm in machines:
                newitems, token_datum = mm.handle_token(token_datum)
                outputlist.extend(newitems)
                if token_datum is None: return outputlist
                self.token_passes = self.token_passes + 1
    def stats(self):
        """Summarize how many token passes the merge took."""
        return ("%d token passes for %d total items "
                "(%.2f items per token pass)" %
                (self.token_passes, self.total_items,
                 float(self.total_items) / self.token_passes))

def dict_merge(lists):
    "A hash-based intersection algorithm, simple enough to be correct."
    # Count how many of the input lists each item appears in.  (Each
    # list is sorted in place, matching the ring algorithm's inputs.)
    counts = {}
    for current in lists:
        current.sort()
        for element in current:
            counts[element] = counts.get(element, 0) + 1
    # An item common to all lists appears exactly len(lists) times.
    wanted = len(lists)
    return [element for element in lists[0] if counts[element] == wanted]

def uniq(alist):
    "Return the distinct elements of alist, in no particular order."
    # Hash-based deduplication; the values are irrelevant.
    seen = dict.fromkeys(alist)
    return seen.keys()

if __name__ == "__main__": test(lambda: simple(smart_machine))

--
<[EMAIL PROTECTED]>       Kragen Sitaker     <http://www.pobox.com/~kragen/>
Edsger Wybe Dijkstra died in August of 2002.  The world has lost a great