Thursday afternoon, after work, I hacked the full-text mail indexing
program from last weekend so it could handle arbitrarily large
mailboxes.  It generates N small indices, each small enough to easily
fit into memory, then does an N-way merge on them.  On Gentle, which
is a 500MHz AMD K6 with 64kB of cache and 64MB of RAM, it indexed my
559MB mailbox in 90 minutes this afternoon: an hour to generate the
indices, then half an hour to merge them.  That's a pathetic 100kB/s.

On Thursday, generating larger index chunks (2 million postings
instead of 1/2 million), on a machine with a 800MHz CPU and 256MB of
RAM, it took only about 20 minutes to index the same corpus, which is
a more reasonable rate.

The code here is in two files: merge.py, which merges arbitrary
collections of sequences, and maildex.py.  Maildex.py is currently
pretty messy and needs to be restructured.  I haven't gotten around to
doing it yet, but if I wait until I do before I send it out, then it
may never happen.

When I said, "arbitrarily large," I didn't really mean it.  This code
still has several important scaling limitations:
- it mmaps the whole mbox, so it can't deal with mboxes larger than a
  few gigabytes on 32-bit platforms (subject to your OS's virtual memory map;
  default on Linux puts the limit at a little under 2GB)
- search times scale at least linearly with the length of the longest posting
  list involved; posting list lengths scale linearly with the corpus size.
- maximum memory use scales with the size of individual messages; a single 
  large message can cause excessive memory use
- the results aren't ranked, so really large result sets are useless
- there's no incremental indexing or incremental search


Here's merge.py:

#!/usr/bin/env python
'''Generic lazy merge algorithm.

There are lots of "merge" algorithms; generally they make one pass
over each of several sorted lists.  They work well in environments
where locality of reference is important, such as when your data is
much larger than your physical RAM.  Some of them work on arbitrary
numbers of lists of items of the same type, while others work on
specific numbers of lists of items of different types.

The thing that binds them all together is that they all traverse their
input lists in the same way.  This way is not, in general, trivial to
implement.  This module implements it so I won't have to implement it
again, at least not for small numbers of lists.

Most problems that have merge-algorithm solutions also have hashtable
solutions which run faster when you have sufficient random-access
storage to hold all your data.  Accordingly, merge algorithms have
fallen very much out of favor over the last thirty or forty years as
main memory sizes have increased from megabits to terabits.

Some examples of merge algorithms:
- produce a single sorted list from multiple sorted lists.  (This is
  the kind of merge used in mergesort.)
- produce the set union, set intersection, or set difference of two
  sorted lists.
- given a master file and an update file sorted in the same order,
  produce a new master file with all the updates applied.

It turns out that you can implement all the other algorithms on top of
the first one, as long as you remember which input list each output
item came from.

My particular problem is that I have inverted index files rather too
large for my RAM, so I have to index my corpus in chunks, then merge
the individual chunks to form a master index.

'''

def merge2(seq1, seq2):
    """Generator for the merged output of two sorted sequences."""
    it1, it2 = iter(seq1), iter(seq2)
    try: thing1 = it1.next()
    except StopIteration:
        for item in it2: yield item
        return
    try: thing2 = it2.next()
    except StopIteration:
        yield thing1
        for item in it1: yield item
        return
    while 1:
        if thing1 > thing2:
            it1, it2, thing1, thing2 = it2, it1, thing2, thing1
        yield thing1
        try: thing1 = it1.next()
        except StopIteration:
            yield thing2
            for item in it2: yield item
            return

def taggediter(iter, tag):
    """Generator that tags each item in a sequence with its source."""
    for item in iter: yield (item, tag)

def _mergen(seqs):
    """Generate the merged output of N sorted sequences, N >= 0.

    Creates a heap of merge2 iterators that do the actual work, then
    returns the root of that heap.
    """
    if len(seqs) == 0: return []
    if len(seqs) == 1: return seqs[0]
    elif len(seqs) == 2: return merge2(seqs[0], seqs[1])
    else:
        part = len(seqs)/2  # 1 -> 0, 2 -> 1, 3 -> 1, 4 -> 2, 5 -> 2
        return merge2(_mergen(seqs[:part]), _mergen(seqs[part:]))

def merge(iters):
    """Generate the tagged, merged output of N sorted sequences, N >= 0.

    Items from the first sequence will be returned as (item, 0), items
    from the second as (item, 1), etc.

    """
    return _mergen([taggediter(iters[i], i) for i in range(len(iters))])

def ok(a, b): assert a==b, (a,b)
def test():
    ok(list(merge([])), [])
    ok(list(merge([['a', 'b', 'c']])), [('a', 0), ('b', 0), ('c', 0)])
    ok(list(merge([[1], []])), [(1, 0)])
    ok(list(merge([[], [1]])), [(1, 1)])
    ok(list(merge([[1], [2]])), [(1, 0), (2, 1)])
    ok(list(merge([[2], [1]])), [(1, 1), (2, 0)])
    
    ok(list(merge([[1, 2, 3], [1.5, 3.25, 3.5]])),
       [(1, 0), (1.5, 1), (2, 0), (3, 0), (3.25, 1), (3.5, 1)])
    ok(list(merge([[1], [5], [4], [6]])),
       [(1, 0), (4, 2), (5, 1), (6, 3)])

test()





And here's maildex.py:

#!/usr/bin/env python
"""Index my mailbox.

This utility generates a term index of an mbox file and allows you to
search it.  If you run it with just the mbox file as an argument, it
assumes you want to reindex it; if you run it with the mbox file and
some other terms, it does a search.  Like Google, it's default-and,
and you can precede a term with a - to exclude messages containing it.

Requires recent Python, probably 2.3.  Won't work in 2.1.3.

I have only 4.5M postings out of 7K messages in 50MB, so I can make a
list of all postings in RAM.  It takes about a minute or two and
another 50MB of RAM.  Writing it to disk with Berkeley DB used to take
another 10 minutes or so!  The index files are about 27MB (down from
45MB with Berkeley DB), but gzip to <11MB.

Unimplemented feature 1: merge multiple index files into a single one,
so you can index files whose index won't fit in RAM.  (The index seems
to be about the same size as the file itself, so this probably means,
to index files bigger than RAM.)  If you had a really big corpus, you
might need to do this more than once, but who has a corpus that big?

Unimplemented feature 2: handle mbox files bigger than the address
space.

"""
# - Python space costs:
#   Looks like a dict is about 154 bytes, an empty list about 48,
#   and a one-item list about 96 bytes.  There are about 340K terms
#   if we use the naive [a-zA-Z0-9]+ term definition, so about 340K *
#   96 + 4M * 4 bytes = ~48MB.

# "Well, I've read about inverted indices and stuff, but I've never
# built a full-text search engine." - me on Thursday
# "Dude, it's, like, cake." - DMH

import mmap, re, sys, os, merge

def sorted(alist):
    "Returns a list after sorting it."
    alist.sort()
    return alist

class lindb:
    """Linear database.

    I was storing data in a Berkeley DB file, but it was really slow ---
    it took 10 minutes to write out 45 megabytes of data with 340K keys.
    So I thought I'd take the Lucene approach; just write a sorted list
    of key-value pairs, write a "headwords" file to make it easy to find
    the right part of the file, then linearly search.

    This module lets it take much less time, by just sorting the data
    and storing it in a text file.  It's very slightly slower than the
    Berkeley DB version, but uses about half the space (for my data.)

    The big compromises are that this module doesn't allow the same
    flexibility in access as Berkeley DB, and it doesn't allow the storage
    of arbitrary data --- data with newlines or ": " in it can screw
    things up.

    The basic operations:
    - db = lindb(filename) --- open database in 'filename', creating if nec.
    - db.set(somedict) --- set contents of database to contents of 'somedict'
    - db[key] --- return value for 'key' or raise KeyError
    - db.get(key, default) --- return value for 'key' or default

    """

    ### basic primitives

    # I tried various powers of 2 here: 131072, 32768, 1024, 4096, and
    # now 8192; the smaller it was, the better the performance was,
    # but the difference between 4096 and 1024 wasn't significant.

    threshold_size = 4096

    def open(self, filename, mode='r'):
        return file("%s/%s" % (self.dirname, filename), mode)

    def readline(self, f):
        """Read the next key-value pair from open file f.

        Returns (key, value) on success, or (None, None) on EOF.

        """
        line = f.readline()
        if not line: return None, None
        # XXX no robustness against incomplete writes
        return line[:-1].split(': ', 2)

    def writeline(self, fh, key, value):
        fh.write("%s: %s\n" % (key, value))

    ### internal routines

    def headwords(self, filename):
        """Generate a headwords file for 'filename'.

        The headwords file lists a few words from the file, along with
        the positions of their entries in the file.  This allows the
        lookup process to find a particular entry relatively quickly,
        while retaining high locality of access.

        """
        f = self.open(filename)
        hwdict = {}
        blockno = None
        while 1:
            pos = f.tell()
            name, value = self.readline(f)
            if name is None: break
            nblockno = pos // self.threshold_size
            if nblockno != blockno:
                hwdict[name] = pos
                blockno = nblockno
        self.writefile_with_headwords(filename + '.hw', hwdict)

    def writefile_with_headwords(self, filename, contents):
        size = self.writefile(filename, contents)
        if size > self.threshold_size: self.headwords(filename)

    def writefile(self, filename, contents):
        """Write file named 'filename' with contents of dict 'contents'.

        If necessary, this writes a headwords file for 'filename'.

        """
        # obviously this technique won't work at all for stuff that doesn't
        # fit in RAM
        f = self.open(filename, 'w')
        for key in sorted(contents.keys()):
            self.writeline(f, key, contents[key])
        size = f.tell()
        f.close()
        try: os.unlink('%s/%s.hw' % (self.dirname, filename))
        except OSError: pass
        return size
        

    def lookup(self, filename, term):
        """Return greatest (key, value) pair not greater than 'term'.

        This returns the key-value pair where the key is term if there
        is one, otherwise the one before where 'term' would be.

        Uses headwords files if they exist to speed up access.

        """
        start = 0
        if os.path.exists('%s/%s.hw' % (self.dirname, filename)):
            name, value = self.lookup(filename + '.hw', term)
            if name is not None:
                assert type(name) is type(term)
                assert name <= term
                start = int(value)

        f = self.open(filename)
        f.seek(start, 0)

        name, value = None, None
        while 1:
            nname, nvalue = self.readline(f)
            if nname is None or nname > term: return name, value
            name, value = nname, nvalue

    def readlines(self, filename):
        """Yield each name-value pair from the specified file.

        Note that this interface is inconsistent with readline, which
        takes a file, not a filename.

        """
        f = self.open(filename)
        while 1:
            n, v = self.readline(f)
            if n is None: return
            yield n, v

    def merge_indices(self, outfilename, filenames):
        outfile = self.open(outfilename, 'w')
        files = [self.readlines(filename) for filename in filenames]
        lastkey, values = None, []
        for (key, value), filenum in merge.merge(files):
            if key != lastkey:
                if lastkey is not None:
                    values.reverse()
                    self.writeline(outfile, lastkey, ' '.join(values))
                lastkey, values = key, []
            values.extend(value.split())
        if lastkey is not None:
            values.reverse()
            self.writeline(outfile, lastkey, ' '.join(values))
        size = outfile.tell()
        outfile.close()
        if size > self.threshold_size: self.headwords(outfilename)

    # XXX maybe mmap?

    ### external interfaces

    def __init__(self, dirname):
        self.dirname = dirname
        if not os.path.exists(dirname): os.mkdir(dirname)
    def __getitem__(self, name):
        lname, lvalue = self.lookup('contents', name)
        if lname == name: return lvalue
        raise KeyError, name
    def get(self, name, value=None):
        try: return self[name]
        except KeyError: return value
    def set(self, contents):
        self.writefile_with_headwords('contents', contents)

def ok(a, b):
    "Regression test function."
    assert a == b, (a, b)

def test_lindb():
    "Very basic regression test for linear db class lindb."
    os.system('rm -rf tmp.db')
    x = lindb("tmp.db")
    x.set({'roar': 'lion'})
    ok(x['roar'], 'lion')
    ok(x.get('roar', 'boo'), 'lion')
    ok(x.get('foar', 'boo'), 'boo')
    os.system('rm -rf tmp.db')

test_lindb()


def mkindex(mm, start, maxpostings):
    """Create an in-memory index of part of an mbox file.

    This function takes a string-like object, such as an mmap object,
    and returns a list of byte offsets in it where mail messages
    start, and a postings dictionary that maps words to lists of
    message starting byte offsets.

    """
    poses = [start]
    wordpat = re.compile("[a-zA-Z0-9]+")
    fi = wordpat.finditer(mm, start)
    allpostings = {}
    totalpostings = 0
    while 1:
        # This won't match the beginning of the first message.
        nps = mm.find("\nFrom ", poses[-1])
        if nps == -1: nps = len(mm)
        nps += 1
        this_guys_postings = {}
        for mo in fi:
            if mo.start() >= nps:
                # I wish I could push back the item we just got, because
                # it belongs to the next message and won't get indexed,
                # but it's the "From" at the beginning of the message.
                # So it doesn't really matter.
                break
            this_guys_postings[mo.group(0).lower()] = 1
        for word in this_guys_postings.iterkeys():
            if not allpostings.has_key(word): allpostings[word] = []
            allpostings[word].append(poses[-1])
        totalpostings += len(this_guys_postings)
        if nps > len(mm) or totalpostings >= maxpostings: break
        poses.append(nps)
        if len(poses) % 250 == 0: print "%d msgs" % len(poses)
    print "indexed %d total postings" % totalpostings
    return poses, allpostings, nps

class index:
    """Index of an mbox.

    Stores a lindb under mboxname.idx which contains posting lists for
    the mbox's messages, and allows you to search it.

    """
    def postings(self, term):
        "Returns the posting list for a term."
        return [int(id) for id in self.db.get(term, '').split()]

    def msg(self, pos):
        """Returns the message at a particular byte offset in the mbox.

        The items in posting lists are just such byte offsets.

        """
        npos = self.mm.find("\nFrom ", pos + 1)
        if npos == -1: npos = self.size
        rv = self.mm[pos:npos]
        assert "\nFrom " not in rv
        return rv

    def __init__(self, fname):
        "init.  fname is the path to the mbox."
        self.f = file(fname)
        self.f.seek(0, 2) # EOF
        self.size = self.f.tell()
        self.mm = mmap.mmap(self.f.fileno(), self.size,
                            access=mmap.ACCESS_READ)
        self.db = lindb(fname + '.idx')

    def write(self):
        "Reindex an mbox."
        pos = 0
        filenames = []
        while pos < len(self.mm):
            # 1/2 million fit in RAM easily
            poses, allpostings, pos = mkindex(self.mm, pos, 500 * 1000) 
            print "indexed to %d;" % pos
            for term in allpostings.keys():
                allpostings[term] = ' '.join([str(x) for x in allpostings[term]])
            filename = 'newindex%d' % len(filenames)
            self.db.writefile(filename, allpostings)
            filenames.append(filename)
            print "wrote index;"
        print "merging indices;"
        self.db.merge_indices('contents', filenames)
        print "done."

    def search(self, terms, exclusions=[]):
        """Returns a posting list for some search.

        'terms' is a list of terms to require the presence of;
        'exclusions' is an optional list of terms to require the
        absence of.

        """
        lists = [self.postings(term) for term in terms]
        excludelists = [self.postings(term) for term in exclusions]

        # intersect lists.
        # sort by length.  ii is in the tuples to prevent comparing the lists
        # themselves.
        sorted_lists = sorted([(len(lists[ii]), ii, lists[ii])
                               for ii in range(len(lists))])
        # start with smallest 
        rvdict = dict([(key, 1) for key in sorted_lists[0][2]])

        # it might be better to, I don't know, do this last?
        for list in excludelists:
            for message in list:
                if rvdict.has_key(message): del rvdict[message]

        # now remove items not in all the other lists
        for _, _, list in sorted_lists[1:]:
            newdict = {}
            for message in list:
                if rvdict.has_key(message): newdict[message] = 1
            rvdict = newdict

        return sorted(rvdict.keys())
    
    def cmdlinesearch(self, terms):
        """Parses a search, runs it, and returns a posting list.

        You have to break it up into terms first, as the Unix command
        line does for you, but then this parses it to see what has a
        '-' before it and what doesn't.

        """
        plusterms = []
        minusterms = []
        for term in terms:
            if term.startswith('-'): minusterms.append(term[1:].lower())
            else: plusterms.append(term.lower())
        return self.search(plusterms, minusterms)
            
    
def main(argv, stdout):
    """Command-line search/reindex interface."""
    if len(argv) > 2:
        idx = index(argv[1])
        for pos in idx.cmdlinesearch(argv[2:]):
            stdout.write(idx.msg(pos))
    elif len(argv) == 2:
        index(argv[1]).write()
    else:
        print ("Usage:\n\t%s mailboxfilename -- to reindex\n"
          "\t%s mailboxfilename term1 term2 -unwanted1 -unwanted2 -- to look"
               % (argv[0], argv[0]))

if __name__ == '__main__': main(sys.argv, sys.stdout)



Reply via email to