#!/usr/bin/python # The problem: You have several sorted lists containing only many # URLs, but only a few URLs are common to all of them. Find the # common URLs, i.e. the intersection. Worse, the lists are on # different machines, so you want to minimize the number of round-trip # times between the lists.
# Dave Long suggested an approach to this problem some time ago on # kragen-discuss. This program tries it out. # Each machine participating does the following: # when it receives the token, it looks at the datum attached. # # If it's after the last item in its input list, it terminates the # algorithm. # # If it has sent the token out with that datum, it outputs that # datum, then sends the token to the next machine, but with the next # item in its input list attached. # # If it has not sent the token out with that datum, it checks to see # whether that datum is in its input list. If it is, it sends the # token to the next machine with the same datum attached. Otherwise, # it searches in its input list for the place where that datum would # be, moves on to the next item in that input list, then sends the # token to the next machine with that next item attached. # # The machines are connected (logically) in a ring, such that the # transitive closure of the "next machine" relation from any machine # is all the machines, including itself. # # A little crude experimentation shows that with four machines, for # more than a few items, with density below about 25%, and with a # uniform random distribution, this scheme handles three or four items # per token pass. As density increases, it eventually handles fewer, # of course, approaching one item per token pass. # # Perhaps counterintuitively, it handles more items per token pass as # the number of machines increases. For example, with almost 800 # items each chosen from a range up to 5000, and with eight machines, # we finish the algorithm with only about 880 token passes, finding # the intersection of eight lists containing 5900 or so items # altogether. # # If, on each machine, you chop your input lists in half at some fixed # number, such that about half the items in each list are above and # half are below, then you can run this algorithm on the two list # halves in parallel. You can even piggyback on the existing data # transmission; your token just needs to contain two data. # # You can continue this chopping-up process further; instead of # chopping into two pieces, you could chop into four, or eight, or # ten, or a hundred. If you take this to the extreme, each data # transmission merely contains an inefficiently-encoded bitmap, and # each machine need only hold the token once. For any particular # ratio between latency and throughput, we can probably find an # optimal amount of chopping-up; if latency is zero, no chopping-up is # best, and if bandwidth is infinite, the bitmaps are best. However, # I have a hunch there's a midpoint that is close to optimal for a # wide range of such ratios, because I have a hunch that increasing # the number of token-passings beyond 10 or 30 or so doesn't reduce # the bandwidth needed much further. # # My standard AltaVista numbers were 31420 occurrences of 'autism', # 602213 occurrences of 'criteria', and 10000 occurrences of 'eye # contact', with 32 occurrences of all three. # # This algorithm's worst case would be three token-passings per item # from the shortest list, or 30000 token-passings. # # The avtest() routine below simulates this. It took about 80 seconds # to run on my Pentium-II/300, which (with constant factors from clock # speed and implementation techcnology) seems quite sufficient; but it # took 22600 token passings, or 27 items per token-passing. Note that # this comes close to the worst case. If we implemented this on a # network where it took 5ms for a packet to get from any one machine # to any other, it would take 113 seconds to run this. # If we were to chop the URL space up into 750 segments containing # equal numbers of URLs, this would probably take only 30 or 60 # token-passings, and each token-passing would include 750 URLs # (presumably compressed somehow; God only knows how big or small that # would be. 75KB without compression.) # Note that real-life URL lists will be a lot clumpier than these # random-number lists --- e.g. every page from www.creativecommons.org # will contain the word "commons" --- and clumps that occur only in # some of the lists being intersected will be skipped over in a single # round-trip of the token, as if they were single URLs. So the input # provided by this test should be strictly worse than real data. # I still need to simulate the chopping-up to see how much it reduces # the number of token-passings, but I think that will depend # sensitively on how clumpy and intercorrelated the lists are. import random, operator def test(merger): assert merger().merge([[1, 2, 3]] * 4) == [1, 2, 3] assert merger().merge([[1, 2, 3]]) == [1, 2, 3] assert merger().merge([[1, 3], [2, 4]]) == [] assert merger().merge([[1, 3, 4], [1, 2, 4]]) == [1, 4] assert merger().merge([[1, 2, 3], [2, 3, 4]]) == [2, 3] foo = merger() results = foo.merge([[0, 1, 4, 10, 20, 22, 30], [0, 2, 5, 16, 20, 23, 31], [0, 2, 5, 11, 20, 22, 30]]) assert results == [0, 20], results print foo.stats() print lists = [[random.randrange(5000) for x in xrange(1600)] + [44, 193] for y in range(8)] uniqlists = map(uniq, lists) for xx in uniqlists: xx.sort() foo = merger() results = foo.merge(uniqlists) print "got", results assert 44 in results assert 193 in results goodresults = dict_merge(uniqlists) assert results == goodresults, (results, goodresults, lists) print foo.stats() #avtest(merger) def avtest(merger): matches = uniq([random.randrange(100000) for xx in xrange(32)]) matches.sort() lists = [[random.randrange(10000000) for xx in xrange(31420)] + matches, [random.randrange(10000000) for xx in xrange(602213)] + matches, [random.randrange(10000000) for xx in xrange(10000)] + matches] uniqlists = map(uniq, lists) for xx in uniqlists: xx.sort() foo = merger() results = foo.merge(uniqlists) print "got", results print foo.stats() class simple_machine: "Implementation directly from the spec, above." def __init__(self, inputlist): self.inputlist = inputlist self.lastsent = None def handle_token(self, datum): """Returns list of items to output and new datum, None to terminate.""" if datum is None: datum = self.inputlist[0] if datum > self.inputlist[-1]: return [], None if self.lastsent == datum: nextidx = self.inputlist.index(datum) + 1 if nextidx == len(self.inputlist): return [datum], None self.lastsent = self.inputlist[nextidx] return [datum], self.lastsent if datum in self.inputlist: self.lastsent = datum return [], self.lastsent nextidx = 0 while (nextidx < len(self.inputlist) and self.inputlist[nextidx] < datum): nextidx = nextidx + 1 if nextidx == len(self.inputlist): return [], None assert self.inputlist[nextidx] > datum self.lastsent = self.inputlist[nextidx] return [], self.lastsent class smart_machine: """A more optimized implementation, about 4x faster. You'd expect it to be linear instead of O(N^2) like the above one, but they both act pretty linear at the small problem sizes I've tried. """ def __init__(self, inputlist): self.inputlist = inputlist self.lastsent = None self.nextidx = 0 def advanceto(self, datum): nextidx = self.nextidx while (nextidx < len(self.inputlist) and self.inputlist[nextidx] < datum): nextidx = nextidx + 1 self.nextidx = nextidx def handle_token(self, datum): if datum is None: datum = self.inputlist[self.nextidx] self.advanceto(datum) if self.nextidx == len(self.inputlist): return [], None if self.lastsent == datum: self.nextidx = self.nextidx + 1 if self.nextidx == len(self.inputlist): return [datum], None self.lastsent = self.inputlist[self.nextidx] return [datum], self.lastsent if datum == self.inputlist[self.nextidx]: self.lastsent = datum return [], self.lastsent assert self.inputlist[self.nextidx] > datum self.lastsent = self.inputlist[self.nextidx] return [], self.lastsent class simple: "A merger suitable for testing." def __init__(self, machinetype): self.token_passes = 0 self.total_items = None self.machinetype = machinetype def merge(self, lists): outputlist = [] machines = [self.machinetype(ll) for ll in lists] self.total_items = reduce(operator.add, map(len, lists)) token_datum = None while 1: for mm in machines: newitems, token_datum = mm.handle_token(token_datum) outputlist.extend(newitems) if token_datum is None: return outputlist self.token_passes = self.token_passes + 1 def stats(self): return ("%d token passes for %d total items " "(%.2f items per token pass)" % (self.token_passes, self.total_items, float(self.total_items) / self.token_passes)) def dict_merge(lists): "A hash-based intersection algorithm, simple enough to be correct." rvdict = {} for each_list in lists: each_list.sort() for item in each_list: try: rvdict[item] = rvdict[item] + 1 except KeyError: rvdict[item] = 1 rvlist = [] for item in lists[0]: if rvdict[item] == len(lists): rvlist.append(item) return rvlist def uniq(alist): rvdict = {} for item in alist: rvdict[item] = 1 return rvdict.keys() if __name__ == "__main__": test(lambda: simple(smart_machine)) -- <[EMAIL PROTECTED]> Kragen Sitaker <http://www.pobox.com/~kragen/> Edsger Wybe Dijkstra died in August of 2002. The world has lost a great man. See http://advogato.org/person/raph/diary.html?start=252 and http://www.kode-fu.com/geek/2002_08_04_archive.shtml for details.