#!/usr/bin/python
# The problem: You have several sorted lists, each containing many
# URLs, but only a few URLs are common to all of them.  Find the
# common URLs, i.e. the intersection.  Worse, the lists are on
# different machines, so you want to minimize the number of round
# trips between the machines.

# Dave Long suggested an approach to this problem some time ago on
# kragen-discuss.  This program tries it out.

# Each machine participating does the following:
# when it receives the token, it looks at the datum attached.
#
# If the datum is after the last item in its input list, the machine
# terminates the algorithm.
#
# If it has already sent the token out with that datum, it outputs
# that datum, then sends the token to the next machine, but with the
# next item in its input list attached.
# 
# If it has not sent the token out with that datum, it checks to see
# whether that datum is in its input list.  If it is, it sends the
# token to the next machine with the same datum attached.  Otherwise,
# it searches in its input list for the place where that datum would
# be, moves on to the next item in that input list, then sends the
# token to the next machine with that next item attached.
#
# The machines are connected (logically) in a ring, such that the
# transitive closure of the "next machine" relation from any machine
# is all the machines, including itself.
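#
# For example (a small hand trace, using the same lists as one of the
# tests below): suppose just two machines, A and B, form the ring, A
# holding [1, 3, 4] and B holding [1, 2, 4].
#
#   A gets the token with no datum, starts with 1, and sends 1 to B.
#   B has 1 in its list, so it passes 1 along to A.
#   A has already sent 1, so it outputs 1 and sends its next item, 3, to B.
#   B lacks 3, skips forward to 4, and sends 4 to A.
#   A has 4 in its list, so it passes 4 along to B.
#   B has already sent 4, so it outputs 4; having nothing after 4, it
#   terminates the algorithm.
#
# The common items output are 1 and 4, after five token passes.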
#
# A little crude experimentation shows that with four machines, for
# more than a few items, with density below about 25%, and with a
# uniform random distribution, this scheme handles three or four items
# per token pass.  As density increases, it eventually handles fewer,
# of course, approaching one item per token pass.
#
# Perhaps counterintuitively, it handles more items per token pass as
# the number of machines increases.  For example, with almost 800
# items each chosen from a range up to 5000, and with eight machines,
# we finish the algorithm with only about 880 token passes, finding
# the intersection of eight lists containing 5900 or so items
# altogether.
#
# If, on each machine, you chop your input lists in half at some fixed
# number, such that about half the items in each list are above and
# half are below, then you can run this algorithm on the two list
# halves in parallel.  You can even piggyback on the existing data
# transmission; your token just needs to contain two data.
#
# You can continue this chopping-up process further; instead of
# chopping into two pieces, you could chop into four, or eight, or
# ten, or a hundred.  If you take this to the extreme, each data
# transmission merely contains an inefficiently-encoded bitmap, and
# each machine need only hold the token once.  For any particular
# ratio between latency and throughput, we can probably find an
# optimal amount of chopping-up; if latency is zero, no chopping-up is
# best, and if bandwidth is infinite, the bitmaps are best.  However,
# I have a hunch there's a midpoint that is close to optimal for a
# wide range of such ratios, because I have a hunch that increasing
# the number of token-passings beyond 10 or 30 or so doesn't reduce
# the bandwidth needed much further.
#
# My standard AltaVista numbers were 31420 occurrences of 'autism',
# 602213 occurrences of 'criteria', and 10000 occurrences of 'eye
# contact', with 32 occurrences of all three.
#
# This algorithm's worst case would be three token-passings per item
# from the shortest list, or 30000 token-passings.
# 
# The avtest() routine below simulates this.  It took about 80 seconds
# to run on my Pentium-II/300, which (with constant factors from clock
# speed and implementation technology) seems quite sufficient; but it
# took 22600 token passings, or 27 items per token-passing.  Note that
# this comes close to the worst case.  If we implemented this on a
# network where it took 5ms for a packet to get from any one machine
# to any other, it would take 113 seconds to run this.

# If we were to chop the URL space up into 750 segments containing
# equal numbers of URLs, this would probably take only 30 or 60
# token-passings, and each token-passing would include 750 URLs
# (presumably compressed somehow; God only knows how big or small that
# would be.  75KB without compression.)

# Note that real-life URL lists will be a lot clumpier than these
# random-number lists --- e.g. every page from www.creativecommons.org
# will contain the word "commons" --- and clumps that occur only in
# some of the lists being intersected will be skipped over in a single
# round-trip of the token, as if they were single URLs.  So the input
# provided by this test should be strictly worse than real data.

# I still need to simulate the chopping-up to see how much it reduces
# the number of token-passings, but I think that will depend
# sensitively on how clumpy and intercorrelated the lists are.
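#
# As a crude illustration of the two-way chop (though not of the
# piggybacked-token version, and not a measurement of token passes),
# you can split every sorted list at a single pivot value and
# intersect the two halves with two independent runs of the same
# merger.  The split_merge() sketch below does that; the name and its
# pivot argument are just illustrative.

def split_merge(merger, lists, pivot):
    "Intersect sorted lists by intersecting their halves below and above pivot."
    results = []
    for half in ([[x for x in ll if x < pivot] for ll in lists],
                 [[x for x in ll if x >= pivot] for ll in lists]):
        if [] in half:
            continue  # an empty half can't contribute to the intersection
        # Every item falls on the same side of the pivot in every list, so
        # the two partial intersections concatenate into the full answer.
        results.extend(merger().merge(half))
    return results
# e.g. split_merge(lambda: simple(smart_machine), uniqlists, 2500),
# with uniqlists as in test() below.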

import random, operator

def test(merger):
    assert merger().merge([[1, 2, 3]] * 4) == [1, 2, 3]
    assert merger().merge([[1, 2, 3]]) == [1, 2, 3]
    assert merger().merge([[1, 3], [2, 4]]) == []
    assert merger().merge([[1, 3, 4], [1, 2, 4]]) == [1, 4]
    assert merger().merge([[1, 2, 3], [2, 3, 4]]) == [2, 3]
    
    foo = merger()
    results = foo.merge([[0, 1, 4, 10, 20, 22, 30],
                         [0, 2, 5, 16, 20, 23, 31],
                         [0, 2, 5, 11, 20, 22, 30]])
    assert results == [0, 20], results
    print foo.stats()
    print

    lists = [[random.randrange(5000) for x in xrange(1600)] + [44, 193]
             for y in range(8)]
    uniqlists = map(uniq, lists)
    for xx in uniqlists: xx.sort()
    foo = merger()
    results = foo.merge(uniqlists)
    print "got", results
    assert 44 in results
    assert 193 in results
    goodresults = dict_merge(uniqlists)
    assert results == goodresults, (results, goodresults, lists)
    print foo.stats()
    #avtest(merger)

def avtest(merger):
    matches = uniq([random.randrange(100000) for xx in xrange(32)])
    matches.sort()
    lists = [[random.randrange(10000000) for xx in xrange(31420)] + matches,
             [random.randrange(10000000) for xx in xrange(602213)] + matches,
             [random.randrange(10000000) for xx in xrange(10000)] + matches]
    uniqlists = map(uniq, lists)
    for xx in uniqlists: xx.sort()
    foo = merger()
    results = foo.merge(uniqlists)
    print "got", results
    print foo.stats()
    

class simple_machine:
    "Implementation directly from the spec, above."
    def __init__(self, inputlist):
        self.inputlist = inputlist
        self.lastsent = None
    def handle_token(self, datum):
        """Returns list of items to output and new datum, None to terminate."""
        if datum is None: datum = self.inputlist[0]
        if datum > self.inputlist[-1]: return [], None
        if self.lastsent == datum:
            nextidx = self.inputlist.index(datum) + 1
            if nextidx == len(self.inputlist): return [datum], None
            self.lastsent = self.inputlist[nextidx]
            return [datum], self.lastsent
        if datum in self.inputlist:
            self.lastsent = datum
            return [], self.lastsent
        nextidx = 0
        while (nextidx < len(self.inputlist) and
               self.inputlist[nextidx] < datum):
            nextidx = nextidx + 1
        if nextidx == len(self.inputlist):
            return [], None
        assert self.inputlist[nextidx] > datum
        self.lastsent = self.inputlist[nextidx]
        return [], self.lastsent
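# For instance (illustrative values, following the spec above):
#
#   >>> m = simple_machine([1, 3, 4])
#   >>> m.handle_token(None)   # no datum yet: offer the first item
#   ([], 1)
#   >>> m.handle_token(1)      # it already sent 1, so 1 is common: output it
#   ([1], 3)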

class smart_machine:
    """A more optimized implementation, about 4x faster.

    You'd expect it to be linear instead of O(N^2) like the above one,
    but they both act pretty linear at the small problem sizes I've
    tried.

    """
    def __init__(self, inputlist):
        self.inputlist = inputlist
        self.lastsent = None
        self.nextidx = 0
    def advanceto(self, datum):
        nextidx = self.nextidx
        while (nextidx < len(self.inputlist) and
               self.inputlist[nextidx] < datum):
            nextidx = nextidx + 1
        self.nextidx = nextidx
    def handle_token(self, datum):
        if datum is None: datum = self.inputlist[self.nextidx]
        self.advanceto(datum)
        if self.nextidx == len(self.inputlist):
            return [], None
        if self.lastsent == datum:
            self.nextidx = self.nextidx + 1
            if self.nextidx == len(self.inputlist): return [datum], None
            self.lastsent = self.inputlist[self.nextidx]
            return [datum], self.lastsent
        if datum == self.inputlist[self.nextidx]:
            self.lastsent = datum
            return [], self.lastsent
        assert self.inputlist[self.nextidx] > datum
        self.lastsent = self.inputlist[self.nextidx]
        return [], self.lastsent
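# The linear scan in advanceto() is what keeps smart_machine from being
# truly linear over a whole run; here is a sketch of a variant that skips
# forward with a binary search via the standard bisect module instead.
# ("bisect_machine" is just an illustrative name, not part of the scheme
# described above.)
import bisect

class bisect_machine(smart_machine):
    "Like smart_machine, but advances with a binary search instead of a scan."
    def advanceto(self, datum):
        # Find the first index at or after nextidx whose item is >= datum,
        # which is exactly where the linear scan above stops.
        self.nextidx = bisect.bisect_left(self.inputlist, datum, self.nextidx)
# To try it: test(lambda: simple(bisect_machine))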

class simple:
    "A merger suitable for testing."
    def __init__(self, machinetype):
        self.token_passes = 0
        self.total_items = None
        self.machinetype = machinetype
    def merge(self, lists):
        outputlist = []
        machines = [self.machinetype(ll) for ll in lists]
        self.total_items = reduce(operator.add, map(len, lists))
        token_datum = None
        while 1:
            for mm in machines:
                newitems, token_datum = mm.handle_token(token_datum)
                outputlist.extend(newitems)
                if token_datum is None: return outputlist
                self.token_passes = self.token_passes + 1
    def stats(self):
        return ("%d token passes for %d total items "
                "(%.2f items per token pass)" %
                (self.token_passes, self.total_items,
                 float(self.total_items) / self.token_passes))
    
def dict_merge(lists):
    """A hash-based intersection algorithm, simple enough to be correct.

    (It counts occurrences, so it assumes each input list is free of
    duplicates, as the uniq'd lists handed to it in test() are.)"""
    rvdict = {}
    for each_list in lists:
        each_list.sort()
        for item in each_list:
            try:
                rvdict[item] = rvdict[item] + 1
            except KeyError:
                rvdict[item] = 1
    rvlist = []
    for item in lists[0]:
        if rvdict[item] == len(lists):
            rvlist.append(item)
    return rvlist
    
def uniq(alist):
    "Return the distinct items of alist, in no particular order."
    rvdict = {}
    for item in alist:
        rvdict[item] = 1
    return rvdict.keys()

if __name__ == "__main__": test(lambda: simple(smart_machine))


-- 
<[EMAIL PROTECTED]>       Kragen Sitaker     <http://www.pobox.com/~kragen/>
Edsger Wybe Dijkstra died in August of 2002.  The world has lost a great
man.  See http://advogato.org/person/raph/diary.html?start=252 and
http://www.kode-fu.com/geek/2002_08_04_archive.shtml for details.
