The good news is that on an Athlon 850 with half a gigabyte of RAM, I
can index my 989-megabyte mailbox into 491 megabytes of index files in
9 minutes, 27 seconds, which is relatively fast as full-text indexing
goes, and the indexer's working set is configurable and pretty small
(64MB for this test).  The bad news is:

- all the disk I/O this process needs adds up to only about 60
  seconds' worth (hdparm -t says 37 megabytes per second), so it's not
  limited by disk interface bandwidth;
- it doesn't run reliably because it's trying to mmap a file that's
  nearly a gigabyte (I think that's why?)
- out of those 9:27, only 4:35 was CPU time, and I'm not sure what 
  happened to the rest of it --- was it spent waiting for disk seeks 
  or running other user processes or what?
- it's way too much code, partly because the searcher is still in
  Python (here it's used only to create the headwords files), and it
  contains a lot of dead code too
- it's written in C99, so you need a recent compiler to compile it (although
  it might compile OK as C++ --- I haven't tried)
- it's kludgy

Like everything posted to kragen-hacks without an explicit notice to
the contrary, I relinquish copyright on this work and place it in the
public domain.

Here's the "reindexmail" shell script:
#!/bin/sh
time rm tmp.mboxtail.idx/*
time ~/devel/maildex-parts tmp.mboxtail
time ~/devel/mergedex tmp.mboxtail.idx/partindex* > tmp.mboxtail.idx/contents
time PYTHONPATH=~/devel python -c \
  '__import__("maildex").lindb("tmp.mboxtail.idx").headwords("contents")'

This invokes three other programs: maildex-parts, mergedex, and
maildex.py.  maildex.py imports merge.py, although it doesn't use it
unless it's doing the merging itself.  Which it isn't, here.
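
All three stages traffic in the same plain-text index format: one line
per term, "term: offset offset offset", where each offset is the byte
position at which a message starts in the mailbox.  maildex-parts
writes one such file per chunk, mergedex merges them into "contents",
and the headwords step writes "contents.hw" in the same key-colon-value
shape (there the value is a byte offset into "contents" rather than
into the mailbox).  Here's a hedged sketch, in the same Python 2 as
maildex.py and not part of the pipeline, of reading the merged index:

for line in file("tmp.mboxtail.idx/contents"):
    term, postings = line[:-1].split(": ", 1)
    offsets = [int(p) for p in postings.split()]  # message start offsets
    print term, len(offsets)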

maildex-parts and mergedex are in C.  Here's maildex-parts.c:

/*
maildex-parts.c: find words, index by mail message byte offset

This program computes an inverted index for a mailbox file, in the
same format produced by maildex.py, which I posted to kragen-hacks
in March 2004.  But it does it much faster.

I'm compiling with this command line with gcc 3.3.3:
cc -Wall -ftracer -O3   -std=c99    maildex-parts.c   -o maildex-parts

Using -fprofile-arcs/-fbranch-probabilities might make it faster.

This program should now be able to index mailboxes up to nearly the
size of virtual memory.

TODO: It'd probably be good to automatically expand the allocated
memory instead of allocating all of it up front.  It's fine as long as
you don't have more than 512MB of RAM though.  (And as long as you
have at least that much swap.)

Detailed Notes on Optimizations
-------------------------------

TODO: I still can't figure out why disk bandwidth * total I/O is so
much less than total run-time for this program.  This is the program's
biggest performance problem right now.  It spends 30%-75% of its time
waiting for the disk for no good reason.

TODO: Maybe using a trie instead of a hash table (since we have to
produce sorted output anyhow) would improve things.

TODO: to compare the words, it's using repz cmpsb, which is pretty
cool as far as it goes, but don't all of our words start on 32-bit
boundaries?  Maybe we could zero out the remainder and compare them a
32-bit cell at a time.  It's also uselessly checking for a zero
wordlen.

For a while, I thought it might be better to case-fold at output time,
so I'd only have to downcase 3 million characters or so instead of 50
million or so.  Then I changed the program so it fetches the lowercase
versions of characters as a side effect of testing whether or not the
character is a word character, so there's no extra cost.  (If I
changed to a faster way of distinguishing word from nonword
characters, it might not have this side effect and I might have to
reconsider.  I haven't found one.  I tried a switch statement, and gcc
turned it into a series of conditional branches on the theory that it
would be faster than an indirect jump.  Maybe it would have been, but
it was still a lot slower than indexing into the wordchar[] array.
(gcc also didn't realize that it could perform common subexpression
elimination on the call to the function containing the switch
statement, so I did the common subexpression elimination myself by
hand, but it was still slower.))

Much of the time and space this program spends on my mailbox, it
spends indexing binary attachments.  If I skipped the binary
attachments it would work so much better and faster.

The hash function I started with was Michael Neumann's version of
Peter J. Weinberger's well-known hash function (usually known as
"hashpjw".)  Briefly, this function rotates the hash left by four bits
each iteration, then adds in the current character.  This version had
a couple of strange features: if it discovered that the leftmost four
bits, before rotation, were zero, it avoided merging them in at the
low end, and it took pains to clear the upper four bits after the
rotation.  The first seemed like a misguided attempt at optimization
to me, and the second just seemed like a waste of time.  Simplifying
the hash function saved 0.4 CPU seconds on 50 megabytes of input text.

I also changed the hash function to count downward from the end of the
word instead of up from the beginning, which seems to have saved about
0.1 CPU seconds on the same amount of input text, but that's about a
1% improvement --- within my measurement error.

On Memory Usage, with Special Attention to the Worst Case
---------------------------------------------------------

This program, at present, allocates memory only when it's adding
postings to its postings hash.  A posting is 8 bytes, and most of the
time we only add one of those, or nothing, when we encounter a new
posting.  But some of the time we add a wordentry, which is presently
20 bytes.  The program allocates a bunch of these structures, then it
twiddles them around for a while to sort lists for output, and then it
outputs stuff and frees it all.  When I was using malloc, the malloc
overhead on all these 8-byte objects was killing me.  (mallocing 8
bytes 4 million times consumes 64 megs of RAM on my machine.  So the
best case for malloc overhead is one posting per word, with 16 bytes
of malloc overhead and 28 bytes of user data.  Ick.)

Typical empirical numbers, back when I was using malloc, follow:
50_000_000 bytes of email
5560 messages
4_416_563 words in them (according to wc)
411_168 different terms
2_826_904 postings + search terms, thus
2_415_736 postings
48M peak memory use (+ 48MB mmapped file)
22 seconds run-time
We'd expect 2_415_736*16 + 411_168*28 = 50.1 megabytes
22.6MB of this was malloc overhead.

So now this program allocates a single large area to start with, and
allocates stuff inside of it stack-wise by nudging a pointer.  It does
all the work inside there, and then deallocates it all at once when it
moves on to the next mailbox segment.  This halved the amount of dirty
data.

Once we excluded terms longer than 20 characters, storing our strings
in our wordentry structure only cost us an extra 4-8 bytes, on
average.  We stored them at the end, with a count, obviating the char
*word inside the structure.  This allowed us to eliminate the actual
document from our working set when we're sorting and writing out the
wordlist, and made it possible to shrink the posting lists
significantly by making things case-insensitive up front instead of,
say, at output time.

I just built this indexer and ran it on a 1GHz machine with a 40MB/sec
disk on 989 megabytes of email, using the 128MB index-chunk-size
mentioned below, and got these numbers:

        real    6m6.135s   (to run this indexer and write out eight index chunks)
        user    2m41.040s
        sys     0m5.520s

        real    1m23.873s  (to run the merge process on the chunks --- another C file)
        user    0m18.900s
        sys     0m5.360s

        real    1m57.151s  (to run the Python program to create headwords for the merged index)
        user    1m19.700s
        sys     0m4.030s

        real    9m27.427s  (total)
        user    4m19.650s
        sys     0m14.910s

This read in 989 megabytes of mailbox and wrote out 491 megabytes of
index, with 502 megabytes of temporary files in the middle that got
both written and read, for a total of 2484 megabytes of disk I/O ---
60 seconds or so of solid disk I/O.


 */
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <unistd.h>
#include <ctype.h>
#include <string.h>

  /* what's the longest word we'll bother to index?  Mostly for
   * efficiency. */
#define MAX_WORD_LEN 20
  /* How much mail do we index at a time, in bytes?  Normally the
   * index in RAM is about half this size. Note that we index slightly
   * *more* than this amount, not slightly *less*.  */
#define INDEX_CHUNK_SIZE (32 * 1024 * 1024)
  /* How big of an area of address space do we work in?  Needs to be
   * smaller than physical memory + swap, and also smaller than
   * virtual_memory - mailbox_size, but large enough to contain the
   * largest index chunk safely. */
#define ARENA_SIZE (512 * 1024 * 1024)

  /* for panacea */
/* #define INDEX_CHUNK_SIZE (128 * 1024 * 1024) */
/* #define ARENA_SIZE (256 * 1024 * 1024) */

#define min(a, b) ((a) < (b) ? (a) : (b))
#define align(val, size) (((int)(val) + ((size) - 1)) & ~((size)-1))


/* file stuff */

int open_or_die(char *fname, int *length) {
  int rv = open(fname, O_RDONLY);
  struct stat st;
  if (rv < 0) {
    perror(fname);
    exit(1);
  }
  fstat(rv, &st);
  *length = st.st_size;
  return rv;
}

/* word indexing stuff */

typedef struct _posting {
  struct _posting *next;
  int doc_id;
} posting;

typedef struct _wordentry {
  struct _wordentry *next;
  int wordlen;
  int last_doc_found;
  posting *postings;
  char word[0];
} wordentry;

char *arena = 0;
char *arena_ptr = 0;
void *allocate(size_t size) {
  char *rv = arena_ptr;
  arena_ptr += align(size, 4);
  return rv;
}

void add_posting_to_wordentry(wordentry *wep, int doc_id) {
  if (doc_id != wep->last_doc_found) {
    posting *new = allocate(sizeof(*new));
    new->doc_id = doc_id;
    new->next = wep->postings;
    wep->postings = new;
    wep->last_doc_found = doc_id;
  }
}

/* hot spot: takes 24% of program's run time */
void add_posting_to_wordentry_list(wordentry **list, char *word, int wordlen, 
                                   int doc_id) {
  wordentry **wepp;
  wordentry *new;

  for (wepp = list; *wepp; wepp = &((*wepp)->next)) {
    if ((*wepp)->wordlen == wordlen && !memcmp((*wepp)->word, word, wordlen)) 
      break;
  }

  new = *wepp;
  if (!new) {
    new = allocate(sizeof(*new)+wordlen);
    memcpy(new->word, word, wordlen);
    new->wordlen = wordlen;
    new->last_doc_found = -1; /* invalid doc_id */
    new->postings = NULL;
  }

  add_posting_to_wordentry(new, doc_id);
  
  /* delete from old position in list */
  if (*wepp) *wepp = (*wepp)->next;
  /* move to front */
  new->next = *list;
  *list = new;
}

void dump_wordentry_list(wordentry *list, FILE *out) {
  posting *p;
  for (; list; list = list->next) {
    fwrite(list->word, list->wordlen, 1, out);
    fprintf(out, ": ");
    for (p = list->postings; p; p = p->next) {
      fprintf(out, "%d", p->doc_id);
      if (p->next) fprintf(out, " ");
    }
    fprintf(out, "\n");
  }
}

void test_word_indexing(void) {
  wordentry *list = NULL;
  char *words[] = { "fuzzy", "bunny", "bin", "packing", "bunny", "bunny", "bin", 0 };
  char **wp;
  for (wp = words; *wp; wp++) {
    add_posting_to_wordentry_list(&list, *wp, strlen(*wp), wp-words);
    add_posting_to_wordentry_list(&list, *wp, strlen(*wp), wp-words);
  }
  dump_wordentry_list(list, stdout);
}

/* hash table */

/* Since we have move-to-front, this size is only about 10%-15% better
 * than 1021 for a sample corpus with 14 million words, 112K distinct
 * words, and 6M or so postings.  */
#define HASHSIZE 32749
wordentry *index_hash[HASHSIZE];

/* thanks P. J. Weinberger and Michael Neumann. ick, this takes 12% of
   the program's run time, at 120 ns per call (according to gprof). */
int hash_word(char *s, int len) {
  int i;
  unsigned h = 0;
  for (i = len; i;) {
    i--;
    h = ((h << 4) ^ ((h&0xf0000000) >> 24)) + s[i];
  }
  return h % HASHSIZE;
}

/* dumb approach, used to be 10% of whole indexing process, unless my
 * profiler lies, which it might.  Dead code; see below for current
 * version.  */
char *memstr(char *haystack, char *needle, int haystack_size) {
  char *t;
  char *lim = haystack + haystack_size - strlen(needle);
  int i;
  for (t = haystack; t < lim; t++) {
    for (i = 0; ; i++) {
      if (needle[i] == '\0') return t;
      if (needle[i] != t[i]) break;
    }
  }
  return NULL;
}

/* really "smart" approach.  Boyer-Moore string search for "\nFrom ",
 * hand-compiled to C.  About twice as fast as "dumb approach" above.
 * This makes the whole program about 5% faster, according to
 * gprof.  Maybe that wasn't so smart after all. */
char *findfrom(char *haystack, int haystack_size) {
  char *s;
  char *t;
  if (haystack_size < 6) return 0;  /* too short to contain "\nFrom " */
  t = s = haystack + 5;
  for (;;) {
    if (*t != ' ') goto advance; t--;
    if (*t != 'm') goto advance; t--;
    if (*t != 'o') goto advance; t--;
    if (*t != 'r') goto advance; t--;
    if (*t != 'F') goto advance; t--;
    if (*t == '\n') return t;
  advance:
    /* we know there's no match ending at or before 's', and the
     * characters after 't' look plausible.  Given this particular search
     * string, there are two possibilities: either t<s, so the next
     * possible match starts at s+1 and runs to s+6, or (the usual case)
     * t==s, and we need to be alert to the opportunity that we're in the
     * middle of a match now.
     */
    /* post switch: 12.12, 12.28, 12.24.
     * pre switch (if/elsif/else): 12.24, 12.33, 12.25.  No noticeable
     * speed difference, but this is clearer. */
    if (t==s) {
      switch(*t) {
      case 'm': s += 1; break;
      case 'o': s += 2; break;
      case 'r': s += 3; break;
      case 'F': s += 4; break;
      case '\n': s += 5; break;
      default: s += 6; break;
      }
    }
    else s += 6;
    t = s;
    if (t >= haystack + haystack_size) return 0;
  }
}

/* a substitute for isalnum() and tolower(); using this for isalnum()
 * reduced user CPU time from 13.6 to 12.3 user seconds on the old
 * 50MB test data set, and using it for tolower() reduced total CPU
 * time from 11.3 to 10.77 seconds on the new 50MB data set. */
char wordchar[] = {
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0"
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" /* control chars */
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" /* space through / */
  "0123"     "4567"     "89\0\0"   "\0\0\0\0" /* digits, :;<=>? */

  "\0abc"    "defg"     "hijk"     "lmno"     /* capitals */
  "pqrs"     "tuvw"     "xyz\0"    "\0\0\0\0"
  "\0abc"    "defg"     "hijk"     "lmno"     /* lower case */
  "pqrs"     "tuvw"     "xyz\0"    "\0\0\0\0"

  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0"
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" 
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0"
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" 

  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0"
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" 
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0"
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" 
};

/* this is the offset of the message start from the beginning of the mailbox */
int current_doc_id;

/* hot spot: takes up 18% of program's CPU time */
void find_words(char *s, int len) {
  char *t = s;
  static char word[MAX_WORD_LEN];
  int i;

  for (;;) {
  skip_nonword_characters:
    for (;;) {
      if (t >= s + len) return;
      if (wordchar[(unsigned char)*t]) break;
      t++;
    }

    /* scan word */
    i = 0;
    while (t < s + len && wordchar[(unsigned char)*t]) {
      if (i == MAX_WORD_LEN) {
        while (t < s + len && wordchar[(unsigned char)*t]) t++;
        goto skip_nonword_characters;
      } 
      word[i] = wordchar[(unsigned char)*t];
      i++;
      t++;
    }

    add_posting_to_wordentry_list(index_hash + hash_word(word, i),
                                  word, i, current_doc_id);
  }
}

/* index some of a mailbox */
void index_by_mail_messages(char *s, int len) {
  char *new_msg_start;
  char *where_to_stop = min(s + current_doc_id + INDEX_CHUNK_SIZE, s + len);

  for (;;) {
    new_msg_start = findfrom(s + current_doc_id, len - current_doc_id);
    if (!new_msg_start) new_msg_start = s + len;

    find_words(s + current_doc_id, new_msg_start - s - current_doc_id);
    current_doc_id = new_msg_start - s + 1;

    if (new_msg_start >= where_to_stop) break;
  }
}

/* sorting the output by term */

void push(wordentry **stack, wordentry *item) {
  item->next = *stack;
  *stack = item;
}

wordentry *pop(wordentry **stack) {
  wordentry *rv = *stack;
  *stack = rv->next;
  /* should probably rv->next = NULL; */
  return rv;
}

wordentry *nconc(wordentry *shortlist, wordentry *longlist) {
  wordentry **join;
  if (!shortlist) return longlist;
  join = &(shortlist->next);
  while (*join) join = &((*join)->next);
  *join = longlist;
  return shortlist;
}

wordentry *all_entries() {
  int i;
  wordentry *rv = NULL;
  for (i = 0; i < HASHSIZE; i++) {
    rv = nconc(index_hash[i], rv);
    index_hash[i] = 0;
  }
  return rv;
}

/* sort linked lists by moving entries to different lists.  No
 * robustness against quicksort's O(N^2) worst case, because the input
 * data is sorted by hash value, then by recentness of occurrence, so
 * is very unlikely to be nearly sorted already.  Hotspot; 14% of run
 * time is here. */
wordentry *quicksort_wordentries(wordentry *list) {
  wordentry *first, *before = NULL, *after = NULL, *wep;
  int cmp;
  if (!list) return list;
  first = pop(&list);
  while (list) {
    wep = pop(&list);
    cmp = memcmp(wep->word, first->word, min(wep->wordlen, first->wordlen));
    if (!cmp && wep->wordlen < first->wordlen) cmp = -1;
    push((cmp < 0 ? &before : &after), wep);
  }
  first->next = quicksort_wordentries(after);
  return nconc(quicksort_wordentries(before), first);
}

/* forget everything in preparation for indexing the next mailbox segment */
void reset() {
  memset(index_hash, '\0', sizeof(index_hash));
  arena_ptr = arena;  /* deallocate all memory */
}

/* compute the filename for a partial index */
char *part_filename(char *base, int filenum) {
  char *template = "%s.idx/partindex%04d";  /* zero-pad for sortability */
  static char *buf = NULL;
  if (!buf) buf = malloc(strlen(base) + strlen(template) + 10);  /* 10 should be enough! */
  if (!buf) abort();
  sprintf(buf, template, base, filenum);
  return buf;
}

/* main */

int main(int argc, char **argv) {
  int length;
  int fh = open_or_die(argv[1], &length);
  char *mm = mmap(0, length, PROT_READ, MAP_SHARED, fh, 0);

  if (mm == MAP_FAILED) {
    perror("mmap");
    return 2;
  }

  arena_ptr = arena = malloc(ARENA_SIZE);

  if (!arena) {
    perror("malloc");
    return 3;
  }

  current_doc_id = 0;
  int filenum = 0;
  while (current_doc_id < length) {
    index_by_mail_messages(mm, length);

    char *indexname = part_filename(argv[1], filenum);
    FILE *out = fopen(indexname, "w");
    if (!out) {
      perror(indexname);
      return 1;
    }
    dump_wordentry_list(quicksort_wordentries(all_entries()), out);
    fclose(out);
    reset();
    filenum++;
  }
  munmap(mm, length);
  return 0;
}
/* end maildex-parts.c */


So here's mergedex.c:
/*
 * Merge maildex index files in C: mergedex.c
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>

/* 
 * BUG HISTORY
 *
 * This was the first time I'd written anything in C in a while, and
 * apparently I'd forgotten a lot about how to be careful.  I was sort
 * of coding in a rather happy-go-lucky fashion, without tests or a
 * lot of mental double-checking, since this is sort of throwaway
 * code.  As you might expect, this resulted in a lot of bugs.
 *
 * 1. forgot to include a check for end of input file in readline() (fixed)
 * 2. accidentally tripped newly inserted check for end of input file unless
 *    initial buffer size was too small (introduced by fix for #1, fixed)
 * 3. stopped noticing when I'd read a full line, due to fix for #2 (fixed badly)
 * (Since I was clearly just banging on shit rather than actually
 * debugging at this point, I slept.)
 * 4. pop_from_bucket popped too much from the bucket because I was confused 
 *    about whether I had a pointer to the last item I wanted to return or a 
 *    pointer to the item after it. (fixed.)
 * 5. pop_from_bucket crashed in the case where it was returning only one line,
 *    because it was trying to set (*last)->next to NULL, but *last
 *    had changed to point to something else since it had decided that
 *    was the thing to do. (fixed)
 * 6. Trying to simplify the fix for #3, I tried having readline() look 
 *    directly at strlen(rv->s) instead of current_length, rather than
 *    setting current_length to strlen(rv->s) just before exiting the
 *    loop.  But the loop has two exits, one of which often leaves
 *    rv->s uninitialized.
 */

/*
 * PERFORMANCE
 * 
 * My benchmark is a set of 15 partial index files I have lying
 * around, totaling almost 1.6 million lines and 98 megabytes, which
 * produce an output file of about a million lines and 87 megabytes.
 * This takes longer than I'd like:
 * 
 * [EMAIL PROTECTED]:~/devel$ time ./mergedex ~/sdc1/mail/tmp.mboxtail.idx/newindex* > tmp.newidx
 *
 * real 0m37.771s
 * user 0m9.740s
 * sys  0m3.170s
 * 
 * This large discrepancy between CPU and wallclock time on an
 * otherwise idle machine suggests that the program is I/O-bound,
 * which is OK, except that it's only succeeding in writing output at
 * about 2 megabytes per (wallclock) second.  hdparm -t says my disk
 * can read at almost 10 megabytes per second, so presumably it should
 * be able to write roughly as fast.  Even if I direct output to
 * /dev/null, it still takes 27 wallclock seconds (which suggests,
 * indeed, that the data is being written at nearly ten megabytes per
 * second.)
 * 
 * I concluded that probably it was doing too many seeks per second,
 * so I tried increasing input buffer sizes to compensate:
 *    setvbuf(ic->infiles[i], realloc_or_die(NULL, 512 * 1024), _IOFBF, 512 * 1024);
 * But that didn't help, although strace showed that it did result in
 * input file read system calls happening in half-mebibyte chunks
 * rather than tiny ones.
 *
 * Catting the same files to /dev/null, which involves reading the
 * same amount of data, takes only 13 seconds.  What explains this
 * 14-second difference?  It's approximately equal to the amount of
 * CPU time involved, but the CPU time shouldn't make a difference
 * with DMA!
 *
 * So I don't know what's wrong, but I think the program should run
 * twice as fast as it does, independent of its CPU time.
 *
 * I profiled a mostly-unoptimized version of it and got these results:
 *  %   cumulative   self              self     total           
 * time   seconds   seconds    calls  us/call  us/call  name    
 * 54.71      2.02     2.02  1593782     1.27     1.27  readline
 * 28.44      3.08     1.05 10078850     0.10     0.10  key_lt
 *  8.13      3.38     0.30  1593767     0.19     0.75  add_to_bucket
 *  4.06      3.53     0.15   940530     0.16     3.90  output_line
 *  2.17      3.61     0.08  1593782     0.05     2.07  read_numbered_infile
 *  1.63      3.67     0.06   940530     0.06     0.24  pop_from_bucket
 *  1.08      3.71     0.04                             main
 *  0.00      3.71     0.00   940531     0.00     0.00  all_exhausted
 *  0.00      3.71     0.00        1     0.00    30.98  open_infiles
 *
 * This suggests that a better "bucket" data structure could reduce
 * the CPU usage by up to a third.
 *
 */

/**********************************************************************/
/* multiple-input-file merging stuff. */

/* Our approach is that we have N input files, and we maintain the
 * next input line from each input file in a "bucket".  From time to
 * time we pull the lexically first input line out of the "bucket" and
 * read the next line from that file into the bucket.  Since our
 * desired output is one output line for each group of input lines
 * with the same key, we have to hang on to those input lines until
 * we've seen all of them. */

/*
 * input_line represents a line that has been read from some input
 * file. 
 */

typedef struct input_line {
  /* keylen contains the number of characters in the key. */
  int keylen;
  /* next is used to chain together input lines in a linked list. */
  struct input_line *next;
  /* filenum specifies which file this line came from, so we can read in
   *   another line from that file when we pull this one out of the
   *   bucket. */
  int filenum;
  /* s is a NUL-terminated string, which I'm not fond of, but fgets's
   *   interface doesn't let me make the program 8-bit clean anyway,
   *   so I might as well take advantage of that bug to simplify the
   *   program.  Doesn't contain the newline. */
  char s[0];
} input_line;

/* look Ma, a whole dynamic allocation library in one function! */
inline void *realloc_or_die(void *s, size_t newsize) {
  s = realloc(s, newsize);
  if (!s) {
    fprintf(stderr, "Couldn't realloc(%d)\n", sizeof(input_line) + newsize);
    abort();
  }
  return s;
}

/* create an input_line from the next line from a given file.  Leaves
 * filenum uninitialized! */
input_line *readline(FILE *infile) {
  /* just use stdio for input files; otherwise we'd have to roll our
   * own buffering, ick. */
  int current_length = 0;
  int new_length = 28;  /* a guess at a usually-sufficient value */
  char *nl;
  /* since we don't need to initialize any of the fields, we can let
   * the realloc in the loop do the initial allocation */
  input_line *rv = NULL;
  for (;;) {
    rv = realloc_or_die(rv, sizeof(input_line) + new_length);
    if (!fgets(rv->s + current_length, new_length - current_length, infile)) 
      break;  /* EOF or error */
    if ((nl = strchr(rv->s + current_length, '\n'))) {
      *nl = '\0';
      current_length = strlen(rv->s);
      break;
    }
    current_length = strlen(rv->s);
    new_length = current_length * 2;
  }
  if (!current_length) {
    free(rv);
    return NULL;
  }
  rv->keylen = strchr(rv->s, ':') - rv->s;
  rv->next = NULL;
  return rv;
}

inline char keychar(input_line *line, int offset) {
  if (offset < line->keylen) return line->s[offset];
  return '\0';
}

/* return true if a's key is less than b's key */
inline int key_lt(input_line *a, input_line *b) {
  int minlen = a->keylen;
  if (b->keylen < minlen) minlen = b->keylen;
  for (int i = 0; i < minlen; i++) {
    if (a->s[i] != b->s[i]) return a->s[i] < b->s[i];
  }
  return a->keylen < b->keylen;
}

/* Our bucket at present is just a sorted linked list!  We even abuse
 * the 'next' link to build it.  Merging 15 indices totaling about 87
 * megabytes, which you'd expect would involve 8 or so comparisons per
 * insertion, results in 1.5 million lines of input, almost a million
 * lines of output, and ten million comparisons --- pretty close to
 * the 8.  These comparisons actually consume a substantial fraction
 * of the program's CPU time, but as mentioned in the comment about
 * PERFORMANCE, I think reducing its CPU time wouldn't actually speed
 * it up on my machine. */

inline int bucket_lt(input_line **bucket, input_line *line) {
  /* we compare filenums so the last file comes first, in order to
   * exactly reproduce the behavior of the old code */
  int rv = key_lt(*bucket, line);
  if (rv) return rv;
  rv = key_lt(line, *bucket);
  if (rv) return !rv;
  return (*bucket)->filenum > line->filenum;
}

/* add a new input line to the bucket. */
void add_to_bucket(input_line **bucket, input_line *line) {
  while (*bucket && bucket_lt(bucket, line)) bucket = &((*bucket)->next);
  line->next = *bucket;
  *bucket = line;
}

/* remove the lexically-first key equivalence class from the bucket
 * and return it, all linked together nicely */
input_line *pop_from_bucket(input_line **bucket) {
  assert(*bucket);
  input_line *rv = *bucket;
  input_line **last = bucket;
  /* find equivalence class */
  while ((*last)->next && !key_lt(*last, (*last)->next)) 
    last = &((*last)->next);
  input_line **end = &((*last)->next);

  /* remove it from bucket */
  *bucket = *end;
  /* unlink it from the rest of the bucket */
  *end = NULL;
  return rv;
}

typedef struct {
  FILE **infiles;
  input_line *bucket;
} infile_collection;

void read_numbered_infile(infile_collection *infiles, int n) {
  input_line *new_line = readline(infiles->infiles[n]);
  if (!new_line) return;
  new_line->filenum = n;
  add_to_bucket(&(infiles->bucket), new_line);
}

void output_line(infile_collection *infiles, FILE *output) {
  input_line *lines = pop_from_bucket(&(infiles->bucket));
  fputs(lines->s, output);
  for (;;) {
    read_numbered_infile(infiles, lines->filenum);
    input_line *next = lines->next;
    free(lines);
    lines = next;
    if (!lines) break;
    fprintf(output, " %s", lines->s + lines->keylen + 2);  /* 2: ": " */
  }
  fprintf(output, "\n");
}

int all_exhausted(infile_collection *ic) {
  return !ic->bucket;
}

infile_collection *open_infiles(int n, char **names) {
  infile_collection *ic = realloc_or_die(NULL, sizeof(*ic));
  ic->bucket = NULL;
  ic->infiles = realloc_or_die(NULL, sizeof(ic->infiles[0]) * n);
  for (int i = 0; i < n; i++) {
    ic->infiles[i] = fopen(names[i], "r");
    if (!ic->infiles[i]) {
      fprintf(stderr, "fopen: ");
      perror(names[i]);
      exit(1);
    }
    read_numbered_infile(ic, i);
  }
  return ic;
}

int main(int argc, char **argv) {
  int n = argc - 1;
  if (!n) {
    fprintf(stderr, "Usage: %s file1 [file2 [file3 ...]] > outfile\n", argv[0]);
    return 1;
  }
  infile_collection *ic = open_infiles(n, argv + 1);
  while (!all_exhausted(ic)) {
    output_line(ic, stdout);
  }
  return 0;
}

/* end mergedex.c */
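
The PERFORMANCE note above guesses that a better "bucket" data
structure could reduce mergedex's CPU usage by up to a third.  Here's
a hedged sketch (Python 2, not a drop-in replacement, and it doesn't
reproduce mergedex's last-file-first ordering for equal keys) of the
same N-way merge using a binary heap, so that finding the lexically
first line costs O(log N) instead of a linear scan of a sorted linked
list:

import heapq

def refill(heap, files, i):
    "Push (key, filenum, values) for the next line of file i, if any."
    line = files[i].readline()
    if line:
        key, values = line.rstrip("\n").split(": ", 1)
        heapq.heappush(heap, (key, i, values))

def heap_merge(paths, out):
    files = [open(p) for p in paths]
    heap = []
    for i in range(len(files)):
        refill(heap, files, i)
    while heap:
        key, i, values = heapq.heappop(heap)
        merged = [values]
        refill(heap, files, i)
        # each input file lists a term at most once, so any remaining
        # equal keys on the heap came from other files
        while heap and heap[0][0] == key:
            _, j, more = heapq.heappop(heap)
            merged.append(more)
            refill(heap, files, j)
        out.write("%s: %s\n" % (key, " ".join(merged)))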



The Python program follows:

#!/usr/bin/env python
"""maildex.py: Index my mailbox.

This utility generates a term index of an mbox file and allows you to
search it.  If you run it with just the mbox file as an argument, it
assumes you want to reindex it; if you run it with the mbox file and
some other terms, it does a search.  Like Google, it's default-and,
and you can precede a term with a - to exclude messages containing it.

Requires recent Python, probably 2.3.  Won't work in 2.1.3.

I have only 4.5M postings out of 7K messages in 50MB, so I can make a
list of all postings in RAM.  It takes about a minute or two and
another 50MB of RAM.  Writing it to disk with Berkeley DB used to take
another 10 minutes or so!  The index files are about 27MB (down from
45MB with Berkeley DB), but gzip to <11MB.

Unimplemented feature: handle mbox files bigger than the address
space.

"""
# - Python space costs:
#   Looks like a dict is about 154 bytes, an empty list about 48,
#   and a one-item list about 96 bytes.  There are about 340K terms
#   if we use the naive [a-zA-Z0-9]+ term definition, so about 340K *
#   96 + 4M * 4 bytes = ~48MB.

# "Well, I've read about inverted indices and stuff, but I've never
# built a full-text search engine." - me on Thursday
# "Dude, it's, like, cake." - DMH

import mmap, re, sys, os, merge, time

def sorted(alist):
    "Returns a list after sorting it."
    alist.sort()
    return alist

class lindb:
    """Linear database.

    I was storing data in a Berkeley DB file, but it was really slow ---
    it took 10 minutes to write out 45 megabytes of data with 340K keys.
    So I thought I'd take the Lucene approach; just write a sorted list
    of key-value pairs, write a "headwords" file to make it easy to find
    the right part of the file, then linearly search.

    This module lets it take much less time, by just sorting the data
    and storing it in a text file.  It's very slightly slower than the
    Berkeley DB version, but uses about half the space (for my data.)

    The big compromises are that this module doesn't allow the same
    flexibility in access as Berkeley DB, and it doesn't allow the storage
    of arbitrary data --- data with newlines or ": " in it can screw
    things up.

    The basic operations:
    - db = lindb(filename) --- open database in 'filename', creating if nec.
    - db.set(somedict) --- set contents of database to contents of 'somedict'
    - db[key] --- return value for 'key' or raise KeyError
    - db.get(key, default) --- return value for 'key' or default

    """

    ### basic primitives

    # I tried various powers of 2 here: 131072, 32768, 1024, 4096, and
    # now 8192; the smaller it was, the better the performance was,
    # but the difference between 4096 and 1024 wasn't significant.

    threshold_size = 4096

    def open(self, filename, mode='r'):
        return file("%s/%s" % (self.dirname, filename), mode)

    def readline(self, f):
        """Read the next key-value pair from open file f.

        Returns (key, value) on success, or (None, None) on EOF.

        """
        line = f.readline()
        if not line: return None, None
        # XXX no robustness against incomplete writes
        return line[:-1].split(': ', 1)

    def writeline(self, fh, key, value):
        fh.write("%s: %s\n" % (key, value))

    ### internal routines

    def headwords(self, filename):
        """Generate a headwords file for 'filename'.

        The headwords file lists a few words from the file, along with
        the positions of their entries in the file.  This allows the
        lookup process to find a particular entry relatively quickly,
        while retaining high locality of access.

        """
        f = self.open(filename)
        hwdict = {}
        blockno = None
        while 1:
            pos = f.tell()
            name, value = self.readline(f)
            if name is None: break
            nblockno = pos // self.threshold_size
            if nblockno != blockno:
                hwdict[name] = pos
                blockno = nblockno
        self.writefile_with_headwords(filename + '.hw', hwdict)

    def writefile_with_headwords(self, filename, contents):
        size = self.writefile(filename, contents)
        if size > self.threshold_size: self.headwords(filename)

    def writefile(self, filename, contents):
        """Write file named 'filename' with contents of dict 'contents'.

        Removes any stale headwords file for 'filename' and returns
        the number of bytes written, so the caller can decide whether
        to build a new headwords file.

        """
        # obviously this technique won't work at all for stuff that doesn't
        # fit in RAM
        f = self.open(filename, 'w')
        for key in sorted(contents.keys()):
            self.writeline(f, key, contents[key])
        size = f.tell()
        f.close()
        try: os.unlink('%s/%s.hw' % (self.dirname, filename))
        except OSError: pass
        return size
        

    def lookup(self, filename, term):
        """Return greatest (key, value) pair not greater than 'term'.

        This returns the key-value pair where the key is term if there
        is one, otherwise the one before where 'term' would be.

        Uses headwords files if they exist to speed up access.

        """
        start = 0
        if os.path.exists('%s/%s.hw' % (self.dirname, filename)):
            name, value = self.lookup(filename + '.hw', term)
            if name is not None:
                assert type(name) is type(term)
                assert name <= term
                start = int(value)

        f = self.open(filename)
        f.seek(start, 0)

        name, value = None, None
        while 1:
            nname, nvalue = self.readline(f)
            if nname is None or nname > term: return name, value
            name, value = nname, nvalue

    def readlines(self, filename):
        """Yield each name-value pair from the specified file.

        Note that this interface is inconsistent with readline, which
        takes a file, not a filename.

        """
        f = self.open(filename)
        while 1:
            n, v = self.readline(f)
            if n is None: return
            yield n, v

    def merge_indices(self, outfilename, filenames):
        outfile = self.open(outfilename, 'w')
        files = [self.readlines(filename) for filename in filenames]
        lastkey, values = None, []
        for (key, value), filenum in merge.merge(files):
            if key != lastkey:
                if lastkey is not None:
                    values.reverse()
                    self.writeline(outfile, lastkey, ' '.join(values))
                lastkey, values = key, []
            values.extend(value.split())
        if lastkey is not None:
            values.reverse()
            self.writeline(outfile, lastkey, ' '.join(values))
        size = outfile.tell()
        outfile.close()
        if size > self.threshold_size: self.headwords(outfilename)

    # XXX maybe mmap?

    ### external interfaces

    def __init__(self, dirname):
        self.dirname = dirname
        if not os.path.exists(dirname): os.mkdir(dirname)
    def __getitem__(self, name):
        lname, lvalue = self.lookup('contents', name)
        if lname == name: return lvalue
        raise KeyError, name
    def get(self, name, value=None):
        try: return self[name]
        except KeyError: return value
    def set(self, contents):
        self.writefile_with_headwords('contents', contents)

def ok(a, b):
    "Regression test function."
    assert a == b, (a, b)

def test_lindb():
    "Very basic regression test for linear db class lindb."
    os.system('rm -rf tmp.db')
    x = lindb("tmp.db")
    x.set({'roar': 'lion'})
    ok(x['roar'], 'lion')
    ok(x.get('roar', 'boo'), 'lion')
    ok(x.get('foar', 'boo'), 'boo')
    os.system('rm -rf tmp.db')

#test_lindb()


def mkindex(mm, start, maxpostings):
    """Create an in-memory index of part of an mbox file.

    This function takes a string-like object, such as an mmap object,
    and a starting offset, and returns a list of byte offsets in it
    where mail messages start, a postings dictionary that maps words
    to lists of message starting byte offsets, and the offset at which
    indexing stopped (so the caller can resume there).

    """
    poses = [start]
    wordpat = re.compile("[a-zA-Z0-9]+")
    fi = wordpat.finditer(mm, start)
    allpostings = {}
    totalpostings = 0
    while 1:
        # This won't match the beginning of the first message.
        nps = mm.find("\nFrom ", poses[-1])
        if nps == -1: nps = len(mm)
        nps += 1
        this_guys_postings = {}
        for mo in fi:
            if mo.start() >= nps:
                # I wish I could push back the item we just got, because
                # it belongs to the next message and won't get indexed,
                # but it's the "From" at the beginning of the message.
                # So it doesn't really matter.
                break
            this_guys_postings[mo.group(0).lower()] = 1
        for word in this_guys_postings.iterkeys():
            if not allpostings.has_key(word): allpostings[word] = []
            allpostings[word].append(poses[-1])
        totalpostings += len(this_guys_postings)
        if nps > len(mm) or totalpostings >= maxpostings: break
        poses.append(nps)
        if len(poses) % 250 == 0: print "%d msgs" % len(poses)
    print "indexed %d total postings" % totalpostings
    return poses, allpostings, nps

class index:
    """Index of an mbox.

    Stores a lindb under mboxname.idx which contains posting lists for
    the mbox's messages, and allows you to search it.

    """
    def postings(self, term):
        "Returns the posting list for a term."
        return [int(id) for id in self.db.get(term, '').split()]

    def msg(self, pos):
        """Returns the message at a particular byte offset in the mbox.

        The items in posting lists are just such byte offsets.

        """
        npos = self.mm.find("\nFrom ", pos + 1)
        if npos == -1: npos = self.size
        rv = self.mm[pos:npos]
        assert "\nFrom " not in rv
        return rv

    def __init__(self, fname):
        "init.  fname is the path to the mbox."
        self.f = file(fname)
        self.f.seek(0, 2) # EOF
        self.size = self.f.tell()
        self.mm = mmap.mmap(self.f.fileno(), self.size,
                            access=mmap.ACCESS_READ)
        self.db = lindb(fname + '.idx')

    def write(self):
        "Reindex an mbox."
        print "starting at", time.time()
        pos = 0
        filenames = []
        while pos < len(self.mm):
            # 1/2 million fit in RAM easily
            poses, allpostings, pos = mkindex(self.mm, pos, 500 * 1000) 
            print "indexed to %d;" % pos
            for term in allpostings.keys():
                allpostings[term] = ' '.join([str(x) for x in allpostings[term]])
            filename = 'newindex%d' % len(filenames)
            self.db.writefile(filename, allpostings)
            filenames.append(filename)
            print "wrote index;"
        print "merging indices at", time.time()
        self.db.merge_indices('contents', filenames)
        print "done at", time.time()

    def search(self, terms, exclusions=[]):
        """Returns a posting list for some search.

        'terms' is a list of terms to require the presence of;
        'exclusions' is an optional list of terms to require the
        absence of.

        """
        lists = [self.postings(term) for term in terms]
        excludelists = [self.postings(term) for term in exclusions]

        # intersect lists.
        # sort by length.  ii is in the tuples to prevent comparing the lists
        # themselves.
        sorted_lists = sorted([(len(lists[ii]), ii, lists[ii])
                               for ii in range(len(lists))])
        # start with smallest 
        rvdict = dict([(key, 1) for key in sorted_lists[0][2]])

        # it might be better to, I don't know, do this last?
        for list in excludelists:
            for message in list:
                if rvdict.has_key(message): del rvdict[message]

        # now remove items not in all the other lists
        for _, _, list in sorted_lists[1:]:
            newdict = {}
            for message in list:
                if rvdict.has_key(message): newdict[message] = 1
            rvdict = newdict

        rv = sorted(rvdict.keys())
        rv.reverse()
        return rv
    
    def cmdlinesearch(self, terms):
        """Parses a search, runs it, and returns a posting list.

        You have to break it up into terms first, as the Unix command
        line does for you, but then this parses it to see what has a
        '-' before it and what doesn't.

        """
        plusterms = []
        minusterms = []
        for term in terms:
            if term.startswith('-'): minusterms.append(term[1:].lower())
            else: plusterms.append(term.lower())
        return self.search(plusterms, minusterms)
            
    
def main(argv, stdout):
    """Command-line search/reindex interface."""
    if len(argv) > 2:
        idx = index(argv[1])
        for pos in idx.cmdlinesearch(argv[2:]):
            stdout.write(idx.msg(pos))
    elif len(argv) == 2:
        index(argv[1]).write()
    else:
        print ("Usage:\n\t%s mailboxfilename -- to reindex\n"
          "\t%s mailboxfilename term1 term2 -unwanted1 -unwanted2 -- to look"
               % (argv[0], argv[0]))

if __name__ == '__main__': main(sys.argv, sys.stdout)
# end maildex.py
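
For completeness, here's a hedged sketch of driving the searcher from
Python 2 once reindexmail has built the index; the search terms
("linux", "-spam") are made-up examples, and this is roughly
equivalent to running "python maildex.py tmp.mboxtail linux -spam"
from the shell:

import maildex
idx = maildex.index("tmp.mboxtail")
for pos in idx.cmdlinesearch(["linux", "-spam"]):
    print idx.msg(pos)   # each matching message, roughly newest first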

And here's merge.py, which maildex.py imports:
#!/usr/bin/env python
'''Generic lazy merge algorithm.

There are lots of "merge" algorithms; generally they make one pass
over each of several sorted lists.  They work well in environments
where locality of reference is important, such as when your data is
much larger than your physical RAM.  Some of them work on arbitrary
numbers of lists of items of the same type, while others work on
specific numbers of lists of items of different types.

The thing that binds them all together is that they all traverse their
input lists in the same way.  This way is not, in general, trivial to
implement.  This module implements it so I won't have to implement it
again, at least not for small numbers of lists.

Most problems that have merge-algorithm solutions also have hashtable
solutions which run faster when you have sufficient random-access
storage to hold all your data.  Accordingly, merge algorithms have
fallen very much out of favor over the last thirty or forty years as
main memory sizes have increased from megabits to terabits.

Some examples of merge algorithms:
- produce a single sorted list from multiple sorted lists.  (This is
  the kind of merge used in mergesort.)
- produce the set union, set intersection, or set difference of two
  sorted lists.
- given a master file and an update file sorted in the same order,
  produce a new master file with all the updates applied.

It turns out that you can implement all the other algorithms on top of
the first one, as long as you remember which input list each output
item came from.

My particular problem is that I have inverted index files rather too
large for my RAM, so I have to index my corpus in chunks, then merge
the individual chunks to form a master index.

'''

def merge2(seq1, seq2):
    """Generator for the merged output of two sorted sequences."""
    it1, it2 = iter(seq1), iter(seq2)
    try: thing1 = it1.next()
    except StopIteration:
        for item in it2: yield item
        return
    try: thing2 = it2.next()
    except StopIteration:
        yield thing1
        for item in it1: yield item
        return
    while 1:
        if thing1 > thing2:
            it1, it2, thing1, thing2 = it2, it1, thing2, thing1
        yield thing1
        try: thing1 = it1.next()
        except StopIteration:
            yield thing2
            for item in it2: yield item
            return

def taggediter(iter, tag):
    """Generator that tags each item in a sequence with its source."""
    for item in iter: yield (item, tag)

def _mergen(seqs):
    """Generate the merged output of N sorted sequences, N >= 0.

    Creates a balanced binary tree of merge2 iterators that do the
    actual work, then returns the root of that tree.
    """
    if len(seqs) == 0: return []
    if len(seqs) == 1: return seqs[0]
    elif len(seqs) == 2: return merge2(seqs[0], seqs[1])
    else:
        part = len(seqs)/2  # 1 -> 0, 2 -> 1, 3 -> 1, 4 -> 2, 5 -> 2
        return merge2(_mergen(seqs[:part]), _mergen(seqs[part:]))

def merge(iters):
    """Generate the tagged, merged output of N sorted sequences, N >= 0.

    Items from the first sequence will be returned as (item, 0), items
    from the second as (item, 1), etc.

    """
    return _mergen([taggediter(iters[i], i) for i in range(len(iters))])

def ok(a, b): assert a==b, (a,b)
def test():
    ok(list(merge([])), [])
    ok(list(merge([['a', 'b', 'c']])), [('a', 0), ('b', 0), ('c', 0)])
    ok(list(merge([[1], []])), [(1, 0)])
    ok(list(merge([[], [1]])), [(1, 1)])
    ok(list(merge([[1], [2]])), [(1, 0), (2, 1)])
    ok(list(merge([[2], [1]])), [(1, 1), (2, 0)])
    
    ok(list(merge([[1, 2, 3], [1.5, 3.25, 3.5]])),
       [(1, 0), (1.5, 1), (2, 0), (3, 0), (3.25, 1), (3.5, 1)])
    ok(list(merge([[1], [5], [4], [6]])),
       [(1, 0), (4, 2), (5, 1), (6, 3)])

test()
# end merge.py
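
The docstring's claim that the other merge algorithms fall out of the
tagged merge is easy to check.  Here's a hedged sketch (Python 2;
assumes each input list is sorted and duplicate-free) of set
intersection built on merge.merge, using the source tags to notice
when the same item has arrived from both lists:

import merge

def intersect(a, b):
    out = []
    prev_item, prev_tag = None, None
    for item, tag in merge.merge([a, b]):
        if item == prev_item and tag != prev_tag:
            out.append(item)   # seen from both inputs
        prev_item, prev_tag = item, tag
    return out

print intersect([1, 2, 4, 7], [2, 3, 7, 9])   # prints [2, 7]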
