Thursday afternoon, after work, I hacked the full-text mail indexing program from last weekend so it could handle arbitrarily large mailboxes. It generates N small indices, each small enough to fit easily into memory, then does an N-way merge on them. On Gentle, which is a 500MHz AMD K6 with 64kB of cache and 64MB of RAM, it indexed my 559MB mailbox in 90 minutes: an hour to generate the indices, then half an hour to merge them. That's a pathetic 100kB/s.
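As an aside, the chunk-then-merge pipeline can be sketched in a few lines of modern Python with the standard library's heapq.merge (which postdates this code; the function name and toy data below are my own illustration, not part of maildex.py):

```python
import heapq

def merge_chunk_indices(chunks):
    """N-way merge of small sorted indices into one master index.

    Each chunk is a sorted list of (term, postings) pairs; duplicate
    terms from different chunks get their posting lists concatenated.
    heapq.merge consumes the chunks lazily, one item at a time, so
    only the merge frontier needs to be in memory.
    """
    merged = []
    for term, postings in heapq.merge(*chunks):
        if merged and merged[-1][0] == term:
            merged[-1][1].extend(postings)   # same term, another chunk
        else:
            merged.append((term, list(postings)))
    return merged

# Two tiny chunk indices mapping terms to message byte offsets:
chunk1 = [("apple", [0, 512]), ("mail", [0])]
chunk2 = [("apple", [4096]), ("zebra", [4096])]
print(merge_chunk_indices([chunk1, chunk2]))
# -> [('apple', [0, 512, 4096]), ('mail', [0]), ('zebra', [4096])]
```

heapq.merge keeps a single heap internally rather than a tree of pairwise generators like merge.py below, but it visits distinct items in the same sorted order.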
On Thursday, generating larger index chunks (2 million postings instead of 1/2 million) on a machine with an 800MHz CPU and 256MB of RAM, it took only about 20 minutes to index the same corpus, which is a more reasonable rate.

The code here is in two files: merge.py, which merges arbitrary collections of sequences, and maildex.py. Maildex.py is currently pretty messy and needs to be restructured; I haven't gotten around to doing that yet, but if I wait until I do before I send it out, it may never happen.

When I said "arbitrarily large," I didn't really mean it. This code still has several important scaling limitations:

- it mmaps the whole mbox, so it can't deal with mboxes larger than a
  few gigabytes on 32-bit platforms (subject to your OS's virtual
  memory map; the default on Linux puts the limit at a little under
  2GB)
- search times scale at least linearly with the length of the longest
  posting list involved, and posting list lengths scale linearly with
  the corpus size
- maximum memory use scales with the size of individual messages; a
  single large message can cause excessive memory use
- the results aren't ranked, so really large result sets are useless
- there's no incremental indexing or incremental search

Here's merge.py:

#!/usr/bin/env python
'''Generic lazy merge algorithm.

There are lots of "merge" algorithms; generally they make one pass
over each of several sorted lists.  They work well in environments
where locality of reference is important, such as when your data is
much larger than your physical RAM.  Some of them work on arbitrary
numbers of lists of items of the same type, while others work on
specific numbers of lists of items of different types.

The thing that binds them all together is that they all traverse
their input lists in the same way.  This way is not, in general,
trivial to implement.  This module implements it so I won't have to
implement it again, at least not for small numbers of lists.
Most problems that have merge-algorithm solutions also have hashtable
solutions which run faster when you have sufficient random-access
storage to hold all your data.  Accordingly, merge algorithms have
fallen very much out of favor over the last thirty or forty years as
main memory sizes have increased from megabits to terabits.

Some examples of merge algorithms:

- produce a single sorted list from multiple sorted lists.  (This is
  the kind of merge used in mergesort.)
- produce the set union, set intersection, or set difference of two
  sorted lists.
- given a master file and an update file sorted in the same order,
  produce a new master file with all the updates applied.

It turns out that you can implement all the other algorithms on top
of the first one, as long as you remember which input list each
output item came from.

My particular problem is that I have inverted index files rather too
large for my RAM, so I have to index my corpus in chunks, then merge
the individual chunks to form a master index.

'''

def merge2(seq1, seq2):
    """Generator for the merged output of two sorted sequences."""
    it1, it2 = iter(seq1), iter(seq2)
    try:
        thing1 = it1.next()
    except StopIteration:
        for item in it2: yield item
        return
    try:
        thing2 = it2.next()
    except StopIteration:
        yield thing1
        for item in it1: yield item
        return
    while 1:
        # Keep the iterator whose current item is smaller in it1.
        if thing1 > thing2:
            it1, it2, thing1, thing2 = it2, it1, thing2, thing1
        yield thing1
        try:
            thing1 = it1.next()
        except StopIteration:
            yield thing2
            for item in it2: yield item
            return

def taggediter(iter, tag):
    """Generator that tags each item in a sequence with its source."""
    for item in iter:
        yield (item, tag)

def _mergen(seqs):
    """Generate the merged output of N sorted sequences, N >= 0.

    Creates a heap of merge2 iterators that do the actual work, then
    returns the root of that heap.
""" if len(seqs) == 0: return [] if len(seqs) == 1: return seqs[0] elif len(seqs) == 2: return merge2(seqs[0], seqs[1]) else: part = len(seqs)/2 # 1 -> 0, 2 -> 1, 3 -> 1, 4 -> 2, 5 -> 2 return merge2(_mergen(seqs[:part]), _mergen(seqs[part:])) def merge(iters): """Generate the tagged, merged output of N sorted sequences, N >= 0. Items from the first sequence will be returned as (item, 0), items from the second as (item, 1), etc. """ return _mergen([taggediter(iters[i], i) for i in range(len(iters))]) def ok(a, b): assert a==b, (a,b) def test(): ok(list(merge([])), []) ok(list(merge([['a', 'b', 'c']])), [('a', 0), ('b', 0), ('c', 0)]) ok(list(merge([[1], []])), [(1, 0)]) ok(list(merge([[], [1]])), [(1, 1)]) ok(list(merge([[1], [2]])), [(1, 0), (2, 1)]) ok(list(merge([[2], [1]])), [(1, 1), (2, 0)]) ok(list(merge([[1, 2, 3], [1.5, 3.25, 3.5]])), [(1, 0), (1.5, 1), (2, 0), (3, 0), (3.25, 1), (3.5, 1)]) ok(list(merge([[1], [5], [4], [6]])), [(1, 0), (4, 2), (5, 1), (6, 3)]) test() And here's maildex.py: #!/usr/bin/env python """Index my mailbox. This utility generates a term index of an mbox file and allows you to search it. If you run it with just the mbox file as an argument, it assumes you want to reindex it; if you run it with the mbox file and some other terms, it does a search. Like Google, it's default-and, and you can precede a term with a - to exclude messages containing it. Requires recent Python, probably 2.3. Won't work in 2.1.3. I have only 4.5M postings out of 7K messages in 50MB, so I can make a list of all postings in RAM. It takes about a minute or two and another 50MB of RAM. Writing it to disk with Berkeley DB used to take another 10 minutes or so! The index files are about 27MB (down from 45MB with Berkeley DB), but gzip to <11MB. Unimplemented feature 1: merge multiple index files into a single one, so you can index files whose index won't fit in RAM. 
(The index seems to be about the same size as the file itself, so
this probably means: to index files bigger than RAM.)  If you had a
really big corpus, you might need to do this more than once, but who
has a corpus that big?

Unimplemented feature 2: handle mbox files bigger than the address
space.
"""

# - Python space costs:
#   Looks like a dict is about 154 bytes, an empty list about 48,
#   and a one-item list about 96 bytes.  There are about 340K terms
#   if we use the naive [a-zA-Z0-9]+ term definition, so about
#   340K * 96 + 4M * 4 bytes = ~48MB.

# "Well, I've read about inverted indices and stuff, but I've never
# built a full-text search engine." - me on Thursday
# "Dude, it's, like, cake." - DMH

import mmap, re, sys, os, merge

def sorted(alist):
    "Returns a list after sorting it.  (Python 2.3 has no builtin sorted.)"
    alist.sort()
    return alist

class lindb:
    """Linear database.

    I was storing data in a Berkeley DB file, but it was really slow
    --- it took 10 minutes to write out 45 megabytes of data with
    340K keys.  So I thought I'd take the Lucene approach: just
    write a sorted list of key-value pairs, write a "headwords" file
    to make it easy to find the right part of the file, then search
    linearly from the nearest headword.  This module takes much less
    time, by just sorting the data and storing it in a text file.
    Lookups are very slightly slower than with the Berkeley DB
    version, but the files use about half the space (for my data).

    The big compromises are that this module doesn't allow the same
    flexibility in access as Berkeley DB, and it doesn't allow the
    storage of arbitrary data --- data with newlines or ": " in it
    can screw things up.

    The basic operations:
    - db = lindb(filename) --- open database in 'filename', creating if nec.
    - db.set(somedict) --- set contents of database to contents of 'somedict'
    - db[key] --- return value for 'key' or raise KeyError
    - db.get(key, default) --- return value for 'key' or default
    """

    ### basic primitives

    # I tried various powers of 2 here: 131072, 32768, 1024, 4096,
    # and now 8192; the smaller it was, the better the performance
    # was, but the difference between 4096 and 1024 wasn't
    # significant.
    threshold_size = 4096

    def open(self, filename, mode='r'):
        return file("%s/%s" % (self.dirname, filename), mode)

    def readline(self, f):
        """Read the next key-value pair from open file f.

        Returns (key, value) on success, or (None, None) on EOF.
        """
        line = f.readline()
        if not line: return None, None
        # XXX no robustness against incomplete writes
        # maxsplit=1: keys never contain ': ', but values might
        return line[:-1].split(': ', 1)

    def writeline(self, fh, key, value):
        fh.write("%s: %s\n" % (key, value))

    ### internal routines

    def headwords(self, filename):
        """Generate a headwords file for 'filename'.

        The headwords file lists a few words from the file, along
        with the positions of their entries in the file.  This
        allows the lookup process to find a particular entry
        relatively quickly, while retaining high locality of access.
        """
        f = self.open(filename)
        hwdict = {}
        blockno = None
        while 1:
            pos = f.tell()
            name, value = self.readline(f)
            if name is None: break
            nblockno = pos // self.threshold_size
            if nblockno != blockno:
                hwdict[name] = pos
                blockno = nblockno
        self.writefile_with_headwords(filename + '.hw', hwdict)

    def writefile_with_headwords(self, filename, contents):
        size = self.writefile(filename, contents)
        if size > self.threshold_size:
            self.headwords(filename)

    def writefile(self, filename, contents):
        """Write file named 'filename' with contents of dict 'contents'.

        Also removes any stale headwords file for 'filename';
        writefile_with_headwords regenerates it if the new file is
        big enough to need one.
""" # obviously this technique won't work at all for stuff that doesn't # fit in RAM f = self.open(filename, 'w') for key in sorted(contents.keys()): self.writeline(f, key, contents[key]) size = f.tell() f.close() try: os.unlink('%s/%s.hw' % (self.dirname, filename)) except OSError: pass return size def lookup(self, filename, term): """Return greatest (key, value) pair not greater than 'term'. This returns the key-value pair where the key is term if there is one, otherwise the one before where 'term' would be. Uses headwords files if they exist to speed up access. """ start = 0 if os.path.exists('%s/%s.hw' % (self.dirname, filename)): name, value = self.lookup(filename + '.hw', term) if name is not None: assert type(name) is type(term) assert name <= term start = int(value) f = self.open(filename) f.seek(start, 0) name, value = None, None while 1: nname, nvalue = self.readline(f) if nname is None or nname > term: return name, value name, value = nname, nvalue def readlines(self, filename): """Yield each name-value pair from the specified file. Note that this interface is inconsistent with readline, which takes a file, not a filename. """ f = self.open(filename) while 1: n, v = self.readline(f) if n is None: return yield n, v def merge_indices(self, outfilename, filenames): outfile = self.open(outfilename, 'w') files = [self.readlines(filename) for filename in filenames] lastkey, values = None, [] for (key, value), filenum in merge.merge(files): if key != lastkey: if lastkey is not None: values.reverse() self.writeline(outfile, lastkey, ' '.join(values)) lastkey, values = key, [] values.extend(value.split()) if lastkey is not None: values.reverse() self.writeline(outfile, lastkey, ' '.join(values)) size = outfile.tell() outfile.close() if size > self.threshold_size: self.headwords(outfilename) # XXX maybe mmap? 
    ### external interfaces

    def __init__(self, dirname):
        self.dirname = dirname
        if not os.path.exists(dirname):
            os.mkdir(dirname)

    def __getitem__(self, name):
        lname, lvalue = self.lookup('contents', name)
        if lname == name:
            return lvalue
        raise KeyError, name

    def get(self, name, value=None):
        try:
            return self[name]
        except KeyError:
            return value

    def set(self, contents):
        self.writefile_with_headwords('contents', contents)

def ok(a, b):
    "Regression test function."
    assert a == b, (a, b)

def test_lindb():
    "Very basic regression test for linear db class lindb."
    os.system('rm -rf tmp.db')
    x = lindb("tmp.db")
    x.set({'roar': 'lion'})
    ok(x['roar'], 'lion')
    ok(x.get('roar', 'boo'), 'lion')
    ok(x.get('foar', 'boo'), 'boo')
    os.system('rm -rf tmp.db')

test_lindb()

def mkindex(mm, start, maxpostings):
    """Create an in-memory index of part of an mbox file.

    This function takes a string-like object, such as an mmap
    object, and returns a list of byte offsets in it where mail
    messages start, a postings dictionary that maps words to lists
    of message starting byte offsets, and the byte offset at which
    to resume indexing the next chunk.  It stops once it has
    accumulated at least 'maxpostings' postings.
    """
    poses = [start]
    wordpat = re.compile("[a-zA-Z0-9]+")
    fi = wordpat.finditer(mm, start)
    allpostings = {}
    totalpostings = 0
    while 1:
        # This won't match the beginning of the first message.
        nps = mm.find("\nFrom ", poses[-1])
        if nps == -1: nps = len(mm)
        nps += 1
        this_guys_postings = {}
        for mo in fi:
            if mo.start() >= nps:
                # I wish I could push back the item we just got,
                # because it belongs to the next message and won't
                # get indexed, but it's the "From" at the beginning
                # of the message.  So it doesn't really matter.
                break
            this_guys_postings[mo.group(0).lower()] = 1
        for word in this_guys_postings.iterkeys():
            if not allpostings.has_key(word):
                allpostings[word] = []
            allpostings[word].append(poses[-1])
        totalpostings += len(this_guys_postings)
        if nps > len(mm) or totalpostings >= maxpostings:
            break
        poses.append(nps)
        if len(poses) % 250 == 0:
            print "%d msgs" % len(poses)
    print "indexed %d total postings" % totalpostings
    return poses, allpostings, nps

class index:
    """Index of an mbox.

    Stores a lindb under mboxname.idx which contains posting lists
    for the mbox's messages, and allows you to search it.
    """

    def postings(self, term):
        "Returns the posting list for a term."
        return [int(id) for id in self.db.get(term, '').split()]

    def msg(self, pos):
        """Returns the message at a particular byte offset in the mbox.

        The items in posting lists are just such byte offsets.
        """
        npos = self.mm.find("\nFrom ", pos + 1)
        if npos == -1: npos = self.size
        rv = self.mm[pos:npos]
        assert "\nFrom " not in rv
        return rv

    def __init__(self, fname):
        "init.  fname is the path to the mbox."
        self.f = file(fname)
        self.f.seek(0, 2)  # EOF
        self.size = self.f.tell()
        self.mm = mmap.mmap(self.f.fileno(), self.size,
                            access=mmap.ACCESS_READ)
        self.db = lindb(fname + '.idx')

    def write(self):
        "Reindex an mbox."
        pos = 0
        filenames = []
        while pos < len(self.mm):
            # 1/2 million fit in RAM easily
            poses, allpostings, pos = mkindex(self.mm, pos, 500 * 1000)
            print "indexed to %d;" % pos
            for term in allpostings.keys():
                allpostings[term] = ' '.join([str(x) for x in
                                              allpostings[term]])
            filename = 'newindex%d' % len(filenames)
            self.db.writefile(filename, allpostings)
            filenames.append(filename)
            print "wrote index;"
        print "merging indices;"
        self.db.merge_indices('contents', filenames)
        print "done."

    def search(self, terms, exclusions=[]):
        """Returns a posting list for some search.

        'terms' is a list of terms to require the presence of;
        'exclusions' is an optional list of terms to require the
        absence of.
""" lists = [self.postings(term) for term in terms] excludelists = [self.postings(term) for term in exclusions] # intersect lists. # sort by length. ii is in the tuples to prevent comparing the lists # themselves. sorted_lists = sorted([(len(lists[ii]), ii, lists[ii]) for ii in range(len(lists))]) # start with smallest rvdict = dict([(key, 1) for key in sorted_lists[0][2]]) # it might be better to, I don't know, do this last? for list in excludelists: for message in list: if rvdict.has_key(message): del rvdict[message] # now remove items not in all the other lists for _, _, list in sorted_lists[1:]: newdict = {} for message in list: if rvdict.has_key(message): newdict[message] = 1 rvdict = newdict return sorted(rvdict.keys()) def cmdlinesearch(self, terms): """Parses a search, runs it, and returns a posting list. You have to break it up into terms first, as the Unix command line does for you, but then this parses it to see what has a '-' before it and what doesn't. """ plusterms = [] minusterms = [] for term in terms: if term.startswith('-'): minusterms.append(term[1:].lower()) else: plusterms.append(term.lower()) return self.search(plusterms, minusterms) def main(argv, stdout): """Command-line search/reindex interface.""" if len(argv) > 2: idx = index(argv[1]) for pos in idx.cmdlinesearch(argv[2:]): stdout.write(idx.msg(pos)) elif len(argv) == 2: index(argv[1]).write() else: print ("Usage:\n\t%s mailboxfilename -- to reindex\n" "\t%s mailboxfilename term1 term2 -unwanted1 -unwanted2 -- to look" % (argv[0], argv[0])) if __name__ == '__main__': main(sys.argv, sys.stdout)