I'm offline in Ecuador at the moment, so I can't look up the research
that has been done on data structures like this.  The basic idea is that
you can make the posting lists smaller by omitting the actual words from
the postings table, at the cost of some false hits, and (as with Bloom
filters) hash each word in more than one way to keep the false-hit rate
down; see the kragen-tol post for details.
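
Here's a rough back-of-the-envelope sketch (added as an illustration; the
document size is made up) of the false-hit rate for two 65536-bucket
tables whose results get intersected, versus a hypothetical single table
indexed by the combined 32-bit hash --- which may be what the "one bigger
hash table" idea in the docstring below has in mind, ignoring that the
current 8-bytes-per-bucket header format wouldn't scale to that many
buckets:

b = 65536       # buckets per table, as in the indexer below
n = 5000        # made-up number of distinct words in one document
p_one = 1 - (1 - 1.0 / b) ** n     # chance a random bucket "contains" the doc
print p_one ** 2                   # two tables intersected: about 0.005
print 1 - (1 - 1.0 / b ** 2) ** n  # one table of b*b buckets: about 1e-6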

This is (another) full-text indexing program I've written, here on
Beatrice's Mac, since my computer stopped booting.

#!/usr/bin/python
# vim:tw=80:sw=4:ai
# Full-text indexing hack to see if my hash inverted indexing idea plays out.
"""Full-text indexer for text files using hash inverted indexing.

The unusual thing about this indexer is that it doesn't store the actual
words the postings are for, only their hashes.  It's somewhat similar to
a Bloom filter; we hash each word with two different hash functions and
stick the corresponding docid into a hash bucket in each of two index
files.  By intersecting the lists of docids, we can achieve a reasonably
small false-positive rate.  (I probably should just use one bigger hash
table instead of two; I think the false-positive rate would go down.  I
think someone else has invented this already.)

I've only tried it on small datasets so far, because that's all I have
handy.

TODO:
- prevent things that aren't supposed to write to the index from creating new
  index directories (e.g. lookup, excerpt, misspelled commands)
- handle files containing more than one document (e.g. mbox files)
- use a single hash table instead of multiple hash tables
- clean up merge code
- clean up term list intersections (between the two indexes and between
  multiple terms) using new merge code so they're less inefficient
- prefer looking at one short posting list (and the documents) over one
  short and one very long
- clean up excerpting so it's not so grossly inefficient; also make it produce
  saner output
- handle non-text files (e.g. PDFs)
- have some way to reindex a file only if it has changed
- have some way to reindex all the files that have changed
- rewrite in Squeak

"""
import os, sys, sets, string, StringIO, struct, cgitb, errno, sha
# On speed, well, it's not that fast at indexing, but the index isn't
# that big, either, and lookups are reasonably fast, answering the
# questions I've posed in less than a second.
# Indexing my Safari cache directory (16232kiB in 1643 files) 500 files at a
# time took only:
# real    4m8.600s
# user    3m53.966s
# sys     0m12.240s
# when I used Python's built-in hash function.
# Switching to SHA-1 makes it take:
# real    4m45.544s
# user    4m27.842s
# sys     0m12.640s
# --- another 15% slower!  Caching the SHA-1 results makes it worse.
# This is all on Beatrice's 1.83 GHz Intel Core Duo Mac.
# This is 285 seconds.  That's 5.8 files per second or 57 kiB per
# second, or perhaps 15kiB per second on my old 600MHz laptop, roughly
# 300 times slower than the text-file mailbox indexer I've been using.
# The resulting index was 2116kiB, 13% of the size of the original data,
# and almost half of that is the fixed overhead (the two 512kiB tables
# of bucket offsets: 65536 buckets times 8 bytes each, in each of the
# two index files).  That's a lot smaller than my text-file index.  Most
# of the hash buckets are still empty.

# It takes about 26 seconds to write an updated index of this size,
# which is slow, since presumably if you index a gigabyte, it will take
# 30 times this long, or about 13 minutes.  One solution is to write the
# index-mangling code in C so as to be able to do it in a few minutes;
# maybe a better solution is to support multiple index segments, like
# Lucene, and merge them when there are several large index segments of
# around the same size.

# A little profiling of the self-test says:
#    ncalls  tottime  percall  cumtime  percall filename:lineno(function)
#    262157   11.220    0.000   32.080    0.000 textindex.py:345(binbucket)
#         5   11.110    2.222   87.680   17.536 textindex.py:382(write_index)
#    524310   10.750    0.000   19.100    0.000 textindex.py:119(read)
#    327701    9.440    0.000   46.890    0.000 textindex.py:367(read_bucket)
#    327713    7.140    0.000   54.030    0.000 textindex.py:223(merge_union)
# but this is of dubious value, because the self-test normally runs in
# 24 seconds total, and the indices in the test are nearly empty.

### Serialization:

def unpacknums(strbuf):
    """Turns a binary string into an iterator of unsigned integers.

    Partial inverse of packnum.

    """
    rv = 0
    for c in strbuf:
        n = ord(c)
        rv = (rv << 7) + (n & 0x7f)
        if n & 0x80:  # high bit set: terminates
            yield rv
            rv = 0
    assert rv == 0

def packnum(n):
    """Turns an unsigned integer into a self-terminating binary string.

    Partial inverse of unpacknums; concatenate packnum results without
    terminators.

    """
    assert n >= 0   # won't terminate otherwise
    rv = []
    while 1:  # at least one byte
        rv.append(n & 0x7f)
        n >>= 7
        if n == 0: break
    rv[0] |= 0x80
    rv.reverse()
    return ''.join(map(chr, rv))

def delta_decode(nums):
    """Turns a list of numbers into a list of partial sums.

    Inverse of delta_encode.
    
    """
    n = 0
    for num in nums:
        n += num
        yield n

def delta_encode(nums):
    """Turns a list of partial sums into a list of numbers.

    Or, to look at it another way, turns a list of numbers into a list of
    differences between them.  Inverse of delta_decode.
    
    """
    last = 0
    for num in nums:
        yield num - last
        last = num

class Struct:
    """A more convenient interface to the struct module."""
    def __init__(self, fmt): self.fmt = fmt
    def calcsize(self): return struct.calcsize(self.fmt)
    def unpack(self, data): return struct.unpack(self.fmt, data)
    def pack(self, *data): return struct.pack(self.fmt, *data)
    # for file-like objects:
    def read(self, fo):
        """Read this data structure from a file-like object and unpack."""
        data = fo.read(self.calcsize())
        if not data: raise EOFError(fo)
        return self.unpack(data)
    def write(self, fo, *data):
        """Write this data to this file-like object."""
        fo.write(self.pack(*data))

### tests for serialization stuff
def ok(a, b):
    """From Perl's Test::Simple IIRC."""
    assert a == b, (a, b)
def testlist(mylist):
    """Small utility function for testing delta_encode, unpacknums, etc."""
    ok(list(delta_decode(delta_encode(mylist))), mylist)
    ok(list(unpacknums(''.join(map(packnum, mylist)))), mylist)
testlist([3, 5, 6, 10, 215])
testlist([0, 1, 2])
testlist([0, 0, 1, 2])
tmp = Struct('>q')
ok(tmp.calcsize(), 8)
ok(tmp.unpack(tmp.pack(1380)), (1380,))
ok(tmp.read(StringIO.StringIO(tmp.pack(1380))), (1380,))
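# A couple of concrete examples of the posting-list encoding, added for
# illustration; the last two mirror the "[0, 1, 2, 3, 5] encodes as
# 80 81 81 81 82" example spelled out in DB.explanation below.
ok(packnum(0), '\x80')
ok(packnum(215), '\x01\xd7')
ok(''.join(map(packnum, delta_encode([0, 1, 2, 3, 5]))),
   '\x80\x81\x81\x81\x82')
ok(list(delta_decode(unpacknums('\x80\x81\x81\x81\x82'))), [0, 1, 2, 3, 5])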

### docid management

class IncompatibleDocIdFileFormat(Exception): pass
class DocIds:
    """Interface to a file containing the list of filenames.
    
    To keep the index small, we don't store filenames in the index; 
    instead, we store indices into this list.  We only ever append to 
    this list.
    """
    def __init__(self, fname):
        """fname is the name of the file to store the list in."""
        # yay this creates the file if need be on macos x
        self.doc_id_file = file(fname, 'a+')
        self.doc_id_file.seek(0)  # 'a' starts at eof by default
        self.filenames = []
        for line in self.doc_id_file:
            while line.endswith('\n'): line = line[:-1]
            # note that the following is a security risk if someone you don't
            # trust created your index
            textfile = 'textfile '
            if line.startswith(textfile):
                self.filenames.append(eval(line[len(textfile):]))
            else:
                raise IncompatibleDocIdFileFormat, line
    def __getitem__(self, fname):
        """Return the doc_id for the given filename, allocating if need be."""
        try: return self.filenames.index(fname)
        except ValueError:
            self.doc_id_file.write('textfile %r\n' % fname)
            self.doc_id_file.flush()
            self.filenames.append(fname)
            return self[fname]  # infinite recursion here would suggest a bug
    def __contains__(self, fname):
        """Test for the existence of fname in the list."""
        return fname in self.filenames
    def filename(self, doc_id):
        """Return the filename for the given doc_id."""
        return self.filenames[doc_id]
        
def test_doc_ids():
    """A test for docid file handling; writes to filesystem."""
    fname = 'tmp.docids.%s' % os.getpid()
    try:
        di = DocIds(fname)
        assert os.path.isfile(fname)
        ok(di['a'], 0)
        ok(di['b'], 1)
        ok(di['a'], 0)
        di = DocIds(fname)
        ok(di['b'], 1)
        ok(di['a'], 0)
    finally:
        os.remove(fname)
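
# For the record, the doc_ids file written by the test above ends up
# containing exactly two lines (a doc_id is just a zero-based line number):
#   textfile 'a'
#   textfile 'b'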

### word extraction

def words(fo):
    """Iterate over the words in a file."""
    word = []
    word_chars = sets.Set(string.digits + string.ascii_letters) # new in 2.3
    while 1:  # until eof actually
        while 1:  # skip nonword chars
            c = fo.read(1)
            if c == '': return
            if c in word_chars:
                word[:] = [c]
                break
        while 1:
            c = fo.read(1)
            if c == '':
                yield ''.join(word)
                return
            if c not in word_chars: break
            word.append(c)
        yield ''.join(word)

ok(list(words(StringIO.StringIO('hi there\n09 words  and 3jane RULZ! there'))),
   ['hi', 'there', '09', 'words', 'and', '3jane', 'RULZ', 'there'])
# This test is needed to test the second return path:
ok(list(words(StringIO.StringIO('So it goes...'))), ['So', 'it', 'goes'])

### Merging sorted lists to get union or intersection.

def merge_union(itera, iterb):
    """Return the union of two sorted lists, as a sorted list."""
    # Wow, this is incredibly ugly the way I wrote it.
    itera = iter(itera)
    iterb = iter(iterb)  # Ytterby!
    try: a = itera.next()
    except StopIteration:
        for item in iterb: yield item
        return
    try: b = iterb.next()
    except StopIteration:
        yield a
        for item in itera: yield item
        return
    while 1:
        if a < b:
            yield a
            try: a = itera.next()
            except StopIteration:
                yield b
                for item in iterb: yield item
                return
        elif b < a:
            yield b
            try: b = iterb.next()
            except StopIteration:
                yield a
                for item in itera: yield item
                return
        else:  # ==
            yield a
            try: a = itera.next()
            except StopIteration:
                for item in iterb: yield item
                return
            try: b = iterb.next()
            except StopIteration:
                yield a
                for item in itera: yield item
                return

# Each of these tests (except the first one) actually tests a different
# code path.  How gross is that?
ok(list(merge_union([], [])), [])
ok(list(merge_union([], [1, 2, 3])), [1, 2, 3])
ok(list(merge_union([1, 2, 3], [])), [1, 2, 3])
ok(list(merge_union([0, 1, 3], [0, 2, 6])), [0, 1, 2, 3, 6])
ok(list(merge_union([0, 2, 6], [0, 1, 3])), [0, 1, 2, 3, 6])
ok(list(merge_union([0, 2, 6], [0, 1, 2])), [0, 1, 2, 6])
ok(list(merge_union([0, 1, 2], [0, 2, 6])), [0, 1, 2, 6])

### Hash inverted index file management.

gethash = Struct(">L").unpack
sha1 = lambda astr: sha.sha(astr).digest()
def hash1(ob): 
    """Need two hash functions to make this work.  Here's the first.
    """
    return gethash(sha1(ob)[0:4])[0]
def hash2(ob): 
    """Need two hash functions to make this work.  Here's the second."""
    return gethash(sha1(ob)[4:8])[0]
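
# Sanity check, added as an example: the two hash functions are different
# 32-bit slices of the same SHA-1 digest, so they should (essentially always)
# disagree; if they always agreed, intersecting the two indexes would buy
# nothing.
ok(hash1('foo') != hash2('foo'), True)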

class Index:
    """Interface to a single index."""
    nbuckets = 65536
    header_offset_struct = Struct('>q')
    def __init__(self, fname, hashfunc):
        """fname is the name of the index file.
        hashfunc is the hash function to use.
        """
        self.hash = hashfunc
        self.fname = fname
        self.fo = None
        self.additions = {}
        self.removed_doc_ids = sets.Set()
        self.header_offset_size = self.header_offset_struct.calcsize()
    def remove_doc_id(self, doc_id):
        """Remove all postings for the specified document ID.

        Does it lazily, and doesn't remove the postings added in this object
        since the last flush --- I don't expect that will be a problem, but 
        I thought I should document the stupidity here.
        """
        self.removed_doc_ids.add(doc_id)
    def add(self, doc_id, word):
        """Add a posting to be written out later."""
        bucket = self.additions.setdefault(self.bucketnum(word), [])
        if doc_id not in bucket: bucket.append(doc_id)
    def flush(self):
        """Write index updates back to the filesystem."""
        # note race condition on two concurrent updates:
        newfile = self.fname + '.new'  
        self.write_index(file(newfile, 'wb'))
        # Atomic rename for atomic index update.
        os.rename(newfile, self.fname)
        self.fo = None
    def sorted_additions(self, bucketnum):
        """The doc_ids currently pending addition to this bucket number.
        
        In order, as implied by the name.
        
        We could be lazier about this if we used a heap, but in the current
        program, we never do anything but write this whole list out to the
        index, so that would be premature optimization.
        """
        try: rv = self.additions[bucketnum]
        except KeyError: return []
        rv.sort()
        return rv
    def doc_ids(self, bucketnum):
        """Return the doc_ids for this bucket number.

        This includes any pending updates as well as those from the file.

        """
        return merge_union(self.read_bucket(bucketnum), 
                           self.sorted_additions(bucketnum))
    def binbucket(self, fo, bucketnum):
        """The binary string containing the doc_ids in the index file.

        This may be of zero length, or it may be quite large.  It won't 
        contain currently pending additions.
        """
        # So here, to find the length of the bucket, I need the starts of two
        # buckets.  So I need a special case either for the first bucket or for
        # the last bucket, or I need nbuckets+1 items in the header.  I chose
        # to have a special case for the first bucket, and store the offsets of
        # the *ends* of the buckets in the header.
        if bucketnum == 0: 
            start_offset = self.header_offset_size * self.nbuckets
        else:
            fo.seek((bucketnum - 1) * self.header_offset_size)
            (start_offset,) = self.header_offset_struct.read(fo)
        fo.seek(bucketnum * self.header_offset_size)
        (end_offset,) = self.header_offset_struct.read(fo)
        fo.seek(start_offset)
        return fo.read(end_offset - start_offset)
    def read_bucket(self, bucketnum):
        """The doc_ids in bucketnum in the file, minus those pending deletion.

        Deletion is processed here (rather than in .doc_ids()) so that it 
        won't affect newly added postings.
        """
        if not self.fo:
            try: 
                self.fo = file(self.fname, 'rb')
            except IOError, e: 
                if e.errno == errno.ENOENT: return  # it doesn't exist yet
                else: raise  # wouldn't want EPERM to wipe out the index
        fo = self.fo
        for doc_id in delta_decode(unpacknums(self.binbucket(fo, bucketnum))):
            if doc_id not in self.removed_doc_ids: yield doc_id
    def write_index(self, fo):
        """Given a filelike object, write the current index to it."""
        header_offsets = []
        fo.seek(self.nbuckets * self.header_offset_size)
        for ii in xrange(self.nbuckets):
            bucketstr = ''.join(map(packnum, delta_encode(self.doc_ids(ii))))
            fo.write(bucketstr)
            header_offsets.append(fo.tell())
        fo.seek(0)
        for item in header_offsets: self.header_offset_struct.write(fo, item)
        fo.flush()
        fo.close()
    def lookup(self, word):
        """Main entry point: return doc_ids that might contain 'word'."""
        return self.doc_ids(self.bucketnum(word))
    def bucketnum(self, word):
        """Return the bucket number in which postings for 'word' would be."""
        # note that x % y returns positive numbers in Python even when x is
        # negative
        return self.hash(word) % self.nbuckets
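
# Worked example of the layout, added for illustration: with 65536 buckets of
# 8-byte offsets, the header occupies the first 512kiB of the index file, so
# bucket 0's postings start at byte 524288, and in a freshly written empty
# index every header slot holds the offset 524288.
ok(Index.nbuckets * Index.header_offset_struct.calcsize(), 524288)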

def test_index():
    """Slow test that writes to the filesystem."""
    fname = 'tmp.index.%d' % os.getpid()
    try:
        idx = Index(fname, hash)
        assert not os.path.isfile(fname)
        ok(list(idx.lookup('foo')), [])
        idx.flush()
        assert os.path.isfile(fname)
        ok(list(idx.lookup('foo')), [])  # No documents added yet!
        idx.add(23, 'foo')
        ok(list(idx.lookup('foo')), [23])  # No chance for hash collision yet
        idx.flush()

        idx = Index(fname, hash)
        ok(list(idx.lookup('foo')), [23])
        idx.remove_doc_id(23)
        ok(list(idx.lookup('foo')), [])
        idx.flush()

        idx = Index(fname, hash)
        ok(list(idx.lookup('foo')), [])
        idx.add(32, 'foo')
        idx.add(31, 'foo')
        idx.add(31, 'bar')
        ok(list(idx.lookup('foo')), [31, 32])
        idx.add(31, 'foo')
        ok(list(idx.lookup('foo')), [31, 32])  # no dups!
        assert 31 in list(idx.lookup('bar'))   # could have 32 if collision
        idx.flush()

        idx = Index(fname, hash)
        ok(list(idx.lookup('foo')), [31, 32])
        idx.remove_doc_id(31)
        ok(list(idx.lookup('foo')), [32])
        idx.add(31, 'bar')
        assert 31 in list(idx.lookup('bar'))   # could have 32 if collision
        idx.flush()

        idx = Index(fname, hash)
        if idx.bucketnum('foo') != idx.bucketnum('bar'):
            ok(list(idx.lookup('foo')), [32])
            ok(list(idx.lookup('bar')), [31])
        else:
            print 'hash collision, skipping some tests'
    finally:
        for name in [fname, fname + '.new']:
            if os.path.exists(name): os.remove(name)

### Main entry point

class DB:
    """Interface to a directory containing an index of some files."""
    explanation = """This directory contains a full-text index of some files.

    The index is stored in the following files:
    doc_ids: a text file containing the names of the files, as Python
        expressions, each preceded by the word 'textfile' and a space.
        Generally this means they have quotes around them and quotes in
        them are backslashed, but other strange characters may also be
        represented with backslash sequences, as in C.  See the Python
        documentation for details.  Note that the indexer will evaluate
        any line you put in this file as a Python expression, so be
        careful.
    index: a hash table of 65536 buckets.  Starts with 65536
        big-endian 64-bit offsets into this file; each points one byte past
        the end of the postings for the corresponding hash bucket.  The
        postings for the 0th hash bucket start after the end of this
        header (after the 524288th byte), and the postings of consecutive
        hash buckets are simply concatenated together with no
        delimiters.  At present, a word's bucket number is the first 32
        bits of its SHA-1 hash, taken modulo the number of buckets.
        Words in the indexed files are hashed into these hash buckets.
        The postings are merely document IDs (zero-based line numbers
        from the doc_ids file) encoded as follows.  First, the document
        IDs for a bucket are sorted; second, each one except the first
        is replaced with its difference from the one before it; third,
        each number in the resulting list is encoded in a variable
        number of bytes, seven bits per byte, with the last byte having
        the high bit set.  Accordingly [0, 1, 2, 3, 5] encodes as
        (hexadecimal) 80 81 81 81 82.  The actual words are not stored
        anywhere, which makes the results somewhat inexact.
    index2: similar to index, but uses a different hash function; at the
        moment, this is the second 32 bits of the word's SHA-1 hash,
        again taken modulo the number of buckets.
    
    The headers (the offsets) are stored in the same files as the
    postings to make index updates atomic.
    """
    def __init__(self, dbdir):
        """dbdir is the path (relative or absolute) to store the index at."""
        if not os.path.isdir(dbdir): os.makedirs(dbdir)
        self.doc_ids = DocIds(os.path.join(dbdir, 'doc_ids'))
        self.index1 = Index(os.path.join(dbdir, 'index'), hash1)
        self.index2 = Index(os.path.join(dbdir, 'index2'), hash2)
        self.dbdir = dbdir
    def add(self, filename):
        """Update the indexes with the current contents of the named file."""
        fo = file(filename)
        self.deindex(filename)
        doc_id = self.doc_ids[filename]
        for word in words(fo): 
            self.index1.add(doc_id, word.lower())
            self.index2.add(doc_id, word.lower())
        fo.close()  # not really necessary in Refcounted Land
    def deindex(self, filename):
        """Remove a file from the indexes if it is indexed.
        
        Does not remove the file from the doc_id list.
        """
        if filename in self.doc_ids:
            self.index1.remove_doc_id(self.doc_ids[filename])
            self.index2.remove_doc_id(self.doc_ids[filename])
    def flush(self):
        """Flush indexes to disk after updates."""
        self.index1.flush()
        self.index2.flush()
        f = file(os.path.join(self.dbdir, 'README'), 'w')
        f.write(self.explanation)
        f.close()
    def ensure(self, terms, fnames):
        """Filter out files missing a term."""
        for fname in fnames:
            try: fo = file(fname)
            except: continue # deleted?
            termset = sets.Set(terms)
            for word in words(fo):
                termset.discard(word.lower())
                if not termset:
                    yield fname
                    break
    def lookup(self, terms):
        """Iterate over some filenames containing terms."""
        terms = [term.lower() for term in terms]
        if len(terms) == 1: 
            return self.ensure(terms, self.lookup1(terms[0]))
        # hash join for now until I have lazier intersections:
        fnames = sets.Set(self.lookup1(terms[0]))
        for term in terms[1:]:
            fnames &= sets.Set(self.lookup1(term))
        return self.ensure(terms, fnames)
    def lookup1(self, term):
        """Iterate over some filenames containing a term."""
        # a little cheat here for now until I have lazier intersections:
        index2_docids = sets.Set(self.index2.lookup(term))
        for doc_id in self.index1.lookup(term):
            if doc_id not in index2_docids: continue
            yield self.doc_ids.filename(doc_id)
    def excerpts(self, terms):
        """Return filenames and excerpts for specified terms."""
        # Fairly gross brute force approach --- we end up reading the 
        # whole file twice; but it does work, more or less.
        terms = [term.lower() for term in terms]
        for fname in self.lookup(terms):
            termset = sets.Set(terms)
            excerpts = []
            contextlen = 71
            try:
                fo = file(fname)
                pos = fo.tell()
                for word in words(fo):
                    if word.lower() in termset:
                        termset.discard(word.lower())
                        saved_pos = fo.tell()
                        fo.seek(max(0, pos - contextlen/2))
                        textexcerpt = fo.read(contextlen)
                        excerpts.append(textexcerpt.replace('\n', ' ')
                                        .replace('\t', ' '))
                        if not termset: break
                        fo.seek(saved_pos)
                    pos = fo.tell()
            except IOError, e:
                if e.errno == errno.ENOENT: continue
                else: raise
            yield fname, excerpts

def test_all():
    """Run tests that might take a while or write to the filesystem."""
    test_doc_ids()
    test_index()

def main(args):
    """Command-line entry point."""
    db = DB(args[0])
    cmd = args[1]
    if cmd == 'add':
        for filename in args[2:]:
            if not os.path.isabs(filename):
                filename = os.path.abspath(filename)
            print 'indexing', filename
            db.add(filename)
        print 'updating index'
        db.flush()
    elif cmd == 'lookup':
        for filename in db.lookup(args[2:]):
            print filename
    elif cmd == 'excerpt':
        for filename, excerpts in db.excerpts(args[2:]):
            print filename
            for excerpt in excerpts:
                print '\t', excerpt
    elif cmd == 'test':
        test_all()
        print 'Tests OK :)'
    elif cmd == 'deindex':
        for filename in args[2:]:
            print 'deindexing', filename
            db.deindex(filename)
        print 'updating index'
        db.flush()
    else:
        print '''usage: 
        textindex.py dbdir add file1 file2..., or 
        textindex.py dbdir lookup term1 term2..., or
        textindex.py dbdir excerpt term1 term2..., or
        textindex.py dbdir deindex file1 file2..., or
        textindex.py dbdir test'''

if __name__ == '__main__':
    cgitb.enable(format='text')
    main(sys.argv[1:])
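
# Example session (hypothetical paths and query terms):
#   python textindex.py ~/.textindex add ~/notes/*.txt
#   python textindex.py ~/.textindex lookup bloom filter
#   python textindex.py ~/.textindex excerpt ecuador
#   python textindex.py ~/.textindex test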
