The good news is that on an Athlon 850 with half a gigabyte of RAM, I can index my 989-megabyte mailbox into a 491-megabyte index in 9 minutes, 27 seconds, which is relatively fast as full-text indexing goes, and its working set is customizable and pretty small (set to 64MB for this test). The bad news is:
- all the disk I/O I need to do during this process adds up to only
  about 60 seconds' worth of disk I/O (hdparm -t says 37 megabytes per
  second), so it's not limited by the disk interface bandwidth;
- it doesn't run reliably because it's trying to mmap a file that's
  nearly a gigabyte (I think that's why?)
- out of those 9:27, only 4:35 was CPU time, and I'm not sure what
  happened to the rest of it --- was it spent waiting for disk seeks
  or running other user processes or what?
- it's way too much code, partly because the searcher is still in
  Python, and is used to create the headwords files, but contains a
  lot of dead code too
- it's written in C99, so you need a recent compiler to compile it
  (although it might compile OK as C++ --- I haven't tried)
- it's kludgy

Like everything posted to kragen-hacks without an explicit notice to
the contrary, I relinquish copyright on this work and place it in the
public domain.

Here's the "reindexmail" shell script:

#!/bin/sh
time rm tmp.mboxtail.idx/*
time ~/devel/maildex-parts tmp.mboxtail
time ~/devel/mergedex tmp.mboxtail.idx/partindex* > tmp.mboxtail.idx/contents
time PYTHONPATH=~/devel python -c '__import__("maildex").lindb("tmp.mboxtail.idx").headwords("contents")'

This invokes three other programs: maildex-parts, mergedex, and
maildex.py. maildex.py imports merge.py, although it doesn't use it
unless it's doing the merging itself. Which it isn't, here.
maildex-parts and mergedex are in C.

Here's maildex-parts.c:

/* maildex-parts.c: find words, index by mail message byte offset

   This program computes an inverted index for a mailbox file, in the
   same format produced by maildex.py, which I posted to kragen-hacks
   in March 2004. But it does it much faster.

   I'm compiling with this command line with gcc 3.3.3:
       cc -Wall -ftracer -O3 -std=c99 maildex-parts.c -o maildex-parts
   Using -fprofile-arcs/-fbranch-probabilities might make it faster.

   Now this program should be able to index arbitrarily large
   mailboxes, up to nearly the size of virtual memory. TODO: It'd
   probably be good to automatically expand the allocated memory
   instead of allocating all of it up front. It's fine as long as you
   don't have more than 512MB of RAM though. (And as long as you have
   at least that much swap.)

   Detailed Notes on Optimizations
   -------------------------------

   TODO: I still can't figure out why disk bandwidth * total I/O is so
   much less than total run-time for this program. This is the
   program's biggest performance problem right now. It spends 30%-75%
   of its time waiting for the disk for no good reason.

   TODO: Maybe using a trie instead of a hash table (since we have to
   produce sorted output anyhow) would improve things.

   TODO: to compare the words, it's using repz cmpsb, which is pretty
   cool as far as it goes, but don't all of our words start on 32-bit
   boundaries? Maybe we could zero out the remainder and compare them
   a 32-bit cell at a time. It's also uselessly checking for a zero
   wordlen.

   For a while, I thought it might be better to case-fold at output
   time, so I'd only have to downcase 3 million characters or so
   instead of 50 million or so. Then I changed the program so it
   fetches the lowercase versions of characters as a side effect of
   testing whether or not the character is a word character, so
   there's no extra cost. (If I changed to a faster way of
   distinguishing word from nonword characters, it might not have
   this side effect and I might have to reconsider. I haven't found
   one.
   I tried a switch statement, and gcc turned it into a series of
   conditional branches on the theory that it would be faster than an
   indirect jump. Maybe it would have been, but it was still a lot
   slower than indexing into the wordchar[] array. (gcc also didn't
   realize that it could perform common subexpression elimination on
   the call to the function containing the switch statement, so I did
   the common subexpression elimination myself by hand, but it was
   still slower.))

   Much of the time and space this program spends on my mailbox, it
   spends indexing binary attachments. If I skipped the binary
   attachments it would work so much better and faster.

   The hash function I started with was Michael Neumann's version of
   Peter J. Weinberger's well-known hash function (usually known as
   "hashpjw"). Briefly, this function rotates the hash left by four
   bits each iteration, then adds in the current character. This
   version had a couple of strange features: if it discovered that the
   leftmost four bits, before rotation, were zero, it avoided merging
   them in at the low end, and it took pains to clear the upper four
   bits after the rotation. The first seemed like a misguided attempt
   at optimization to me, and the second just seemed like a waste of
   time. Simplifying the hash function saved 0.4 CPU seconds on 50
   megabytes of input text.

   I also changed the hash function to count downward from the end of
   the word instead of up from the beginning, which seems to have
   saved about 0.1 CPU seconds on the same amount of input text, but
   that's about a 1% improvement --- within my measurement error.

   On Memory Usage, with Special Attention to the Worst Case
   ---------------------------------------------------------

   This program, at present, allocates memory only when it's adding
   postings to its postings hash. A posting is 8 bytes, and most of
   the time we only add one of those, or nothing, when we encounter a
   new posting. But some of the time we add a wordentry, which is
   presently 20 bytes. The program allocates a bunch of these
   structures, then it twiddles them around for a while to sort lists
   for output, and then it outputs stuff and frees it all.

   When I was using malloc, the malloc overhead on all these 8-byte
   objects was killing me. (mallocing 8 bytes 4 million times consumes
   64 megs of RAM on my machine. So the best case for malloc overhead
   is one posting per word, with 16 bytes of malloc overhead and 28
   bytes of user data. Ick.) Typical empirical numbers, back when I
   was using malloc, follow:

   50_000_000 bytes of email
        5_560 messages
    4_416_563 words in them (according to wc)
      411_168 different terms
    2_826_904 postings + search terms, thus
    2_415_736 postings
          48M peak memory use (+ 48MB mmapped file)
           22 seconds run-time

   We'd expect 2_415_736 * 16 + 411_168 * 28 = 50.1 megabytes; 22.6MB
   of this was malloc overhead.

   So now this program allocates a single large area to start with,
   and allocates stuff inside of it stack-wise by nudging a pointer.
   It does all the work inside there, and then deallocates it all at
   once when it moves on to the next mailbox segment. This halved the
   amount of dirty data.

   Once we excluded terms of 20 characters or more, storing our
   strings in our wordentry structure only cost us an extra 4-8 bytes,
   on average. We stored them at the end, with a count, obviating the
   char *word inside the structure.
   This allowed us to eliminate the actual document from our working
   set when we're sorting and writing out the wordlist, and made it
   possible to shrink the posting lists significantly by making things
   case-insensitive up front instead of, say, at output time.

   I just built this indexer and ran it on a 1GHz machine with a
   40MB/sec disk on 989 megabytes of email, using the 128MB
   index-chunk-size mentioned below, and got these numbers:

   real    6m6.135s    (to run this indexer and write out eight index chunks)
   user    2m41.040s
   sys     0m5.520s

   real    1m23.873s   (to run the merge process on the chunks --- another C file)
   user    0m18.900s
   sys     0m5.360s

   real    1m57.151s   (to run the Python program to create headwords
                        for the merged index)
   user    1m19.700s
   sys     0m4.030s

   real    9m27.427s   (total)
   user    4m19.650s
   sys     0m14.910s

   This read in 989 megabytes of mailbox and wrote out 491 megabytes
   of index, with 502 megabytes of temporary files in the middle that
   got both written and read, for a total of 2484 megabytes of disk
   I/O --- 60 seconds or so of solid disk I/O. */
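/* For concreteness, a sketch of the index format (the terms and
 * offsets here are made up; the real thing is whatever
 * dump_wordentry_list() below and maildex.py's writeline() emit):
 * each line of an index file is a lowercased term, ": ", and the
 * space-separated byte offsets of the messages containing it, e.g.
 *
 *     bunny: 104857 23 0
 *
 * where each offset is where a message starts in the mailbox. */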
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <unistd.h>
#include <ctype.h>
#include <string.h>

/* what's the longest word we'll bother to index? Mostly for
 * efficiency. */
#define MAX_WORD_LEN 20

/* How much mail do we index at a time, in bytes? Normally the
 * index in RAM is about half this size. Note that we index slightly
 * *more* than this amount, not slightly *less*. */
#define INDEX_CHUNK_SIZE (32 * 1024 * 1024)

/* How big of an area of address space do we work in? Needs to be
 * smaller than physical memory + swap, and also smaller than
 * virtual_memory - mailbox_size, but large enough to contain the
 * largest index chunk safely. */
#define ARENA_SIZE (512 * 1024 * 1024)

/* for panacea */
/* #define INDEX_CHUNK_SIZE (128 * 1024 * 1024) */
/* #define ARENA_SIZE (256 * 1024 * 1024) */

#define min(a, b) ((a) < (b) ? (a) : (b))
#define align(val, size) (((int)(val) + ((size) - 1)) & ~((size)-1))

/* file stuff */

int open_or_die(char *fname, int *length)
{
  int rv = open(fname, O_RDONLY);
  struct stat st;
  if (rv < 0) { perror(fname); exit(1); }
  fstat(rv, &st);
  *length = st.st_size;
  return rv;
}

/* word indexing stuff */

typedef struct _posting {
  struct _posting *next;
  int doc_id;
} posting;

typedef struct _wordentry {
  struct _wordentry *next;
  int wordlen;
  int last_doc_found;
  posting *postings;
  char word[0];
} wordentry;

char *arena = 0;
char *arena_ptr = 0;

void *allocate(size_t size)
{
  char *rv = arena_ptr;
  arena_ptr += align(size, 4);
  return rv;
}

void add_posting_to_wordentry(wordentry *wep, int doc_id)
{
  if (doc_id != wep->last_doc_found) {
    posting *new = allocate(sizeof(*new));
    new->doc_id = doc_id;
    new->next = wep->postings;
    wep->postings = new;
    wep->last_doc_found = doc_id;
  }
}

/* hot spot: takes 24% of program's run time */
void add_posting_to_wordentry_list(wordentry **list, char *word,
                                   int wordlen, int doc_id)
{
  wordentry **wepp;
  wordentry *new;
  for (wepp = list; *wepp; wepp = &((*wepp)->next)) {
    if ((*wepp)->wordlen == wordlen && !memcmp((*wepp)->word, word, wordlen))
      break;
  }
  new = *wepp;
  if (!new) {
    new = allocate(sizeof(*new)+wordlen);
    memcpy(new->word, word, wordlen);
    new->wordlen = wordlen;
    new->last_doc_found = -1; /* invalid doc_id */
    new->postings = NULL;
  }
  add_posting_to_wordentry(new, doc_id);
  /* delete from old position in list */
  if (*wepp) *wepp = (*wepp)->next;
  /* move to front */
  new->next = *list;
  *list = new;
}

void dump_wordentry_list(wordentry *list, FILE *out)
{
  posting *p;
  for (; list; list = list->next) {
    fwrite(list->word, list->wordlen, 1, out);
    fprintf(out, ": ");
    for (p = list->postings; p; p = p->next) {
      fprintf(out, "%d", p->doc_id);
      if (p->next) fprintf(out, " ");
    }
    fprintf(out, "\n");
  }
}

void test_word_indexing(void)
{
  wordentry *list = NULL;
  char *words[] = { "fuzzy", "bunny", "bin", "packing", "bunny", "bunny",
                    "bin", 0 };
  char **wp;
  for (wp = words; *wp; wp++) {
    add_posting_to_wordentry_list(&list, *wp, strlen(*wp), wp-words);
    add_posting_to_wordentry_list(&list, *wp, strlen(*wp), wp-words);
  }
  dump_wordentry_list(list, stdout);
}

/* hash table */

/* Since we have move-to-front, this size is only about 10%-15% better
 * than 1021 for a sample corpus with 14 million words, 112K distinct
 * words, and 6M or so postings. */
#define HASHSIZE 32749
wordentry *index_hash[HASHSIZE];

/* thanks P. J. Weinberger and Michael Neumann. ick, this takes 12% of
 * the program's run time, at 120 ns per call (according to gprof). */
int hash_word(char *s, int len)
{
  int i;
  unsigned h = 0;
  for (i = len; i;) {
    i--;
    h = ((h << 4) ^ ((h&0xf0000000) >> 24)) + s[i];
  }
  return h % HASHSIZE;
}

/* dumb approach, used to be 10% of whole indexing process, unless my
 * profiler lies, which it might. Dead code; see below for current
 * version. */
char *memstr(char *haystack, char *needle, int haystack_size)
{
  char *t;
  char *lim = haystack + haystack_size - strlen(needle);
  int i;
  for (t = haystack; t < lim; t++) {
    for (i = 0; ; i++) {
      if (needle[i] == '\0') return t;
      if (needle[i] != t[i]) break;
    }
  }
  return NULL;
}

/* really "smart" approach. Boyer-Moore string search for "\nFrom ",
 * hand-compiled to C. About twice as fast as "dumb approach" above.
 * This makes the whole program about 5% faster, according to
 * gprof. Maybe that wasn't so smart after all.
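 *
 * (A worked illustration of the skip distances, with a made-up byte:
 * if the character at the end of the six-byte window is 'x', which
 * never occurs in "\nFrom ", no match can overlap the window, so we
 * slide forward 6 bytes; if it's 'r', the only alignment still
 * possible lines it up with the 'r' of "From", so we slide 3.)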
 */
char *findfrom(char *haystack, int haystack_size)
{
  char *s;
  char *t = s = haystack + 5;
  for (;;) {
    if (*t != ' ') goto advance;
    t--;
    if (*t != 'm') goto advance;
    t--;
    if (*t != 'o') goto advance;
    t--;
    if (*t != 'r') goto advance;
    t--;
    if (*t != 'F') goto advance;
    t--;
    if (*t == '\n') return t;
  advance:
    /* we know there's no match ending at or before 's', and the
     * characters after 't' look plausible. Given this particular search
     * string, there are two possibilities: either t<s, so the next
     * possible match starts at s+1 and runs to s+6, or (the usual case)
     * t==s, and we need to be alert to the opportunity that we're in the
     * middle of a match now. */
    /* post switch: 12.12, 12.28, 12.24.
     * pre switch (if/elsif/else): 12.24, 12.33, 12.25. No noticeable
     * speed difference, but this is clearer. */
    if (t==s) {
      switch(*t) {
      case 'm': s += 1; break;
      case 'o': s += 2; break;
      case 'r': s += 3; break;
      case 'F': s += 4; break;
      case '\n': s += 5; break;
      default: s += 6; break;
      }
    } else s += 6;
    t = s;
    if (t > haystack + haystack_size) return 0;
  }
}

/* a substitute for isalnum() and tolower(); using this for isalnum()
 * reduced user CPU time from 13.6 to 12.3 user seconds on the old
 * 50MB test data set, and using it for tolower() reduced total CPU
 * time from 11.3 to 10.77 seconds on the new 50MB data set. */
char wordchar[] = {
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0"
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0"  /* control chars */
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0"  /* space through / */
  "0123" "4567" "89\0\0" "\0\0\0\0"            /* digits, :;<=>? */
  "\0abc" "defg" "hijk" "lmno"                 /* capitals */
  "pqrs" "tuvw" "xyz\0" "\0\0\0\0"
  "\0abc" "defg" "hijk" "lmno"                 /* lower case */
  "pqrs" "tuvw" "xyz\0" "\0\0\0\0"
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0"
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0"
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0"
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0"
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0"
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0"
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0"
  "\0\0\0\0" "\0\0\0\0" "\0\0\0\0" "\0\0\0\0"
};

/* this is the offset of the message start from the beginning of the
 * mailbox */
int current_doc_id;

/* hot spot: takes up 18% of program's CPU time */
void find_words(char *s, int len)
{
  char *t = s;
  static char word[MAX_WORD_LEN];
  int i;
  for (;;) {
  skip_nonword_characters:
    for (;;) {
      if (t >= s + len) return;
      if (wordchar[(unsigned char)*t]) break;
      t++;
    }
    /* scan word */
    i = 0;
    while (t < s + len && wordchar[(unsigned char)*t]) {
      if (i == MAX_WORD_LEN) {
        while (t < s + len && wordchar[(unsigned char)*t]) t++;
        goto skip_nonword_characters;
      }
      word[i] = wordchar[(unsigned char)*t];
      i++;
      t++;
    }
    add_posting_to_wordentry_list(index_hash + hash_word(word, i),
                                  word, i, current_doc_id);
  }
}

/* index some of a mailbox */
void index_by_mail_messages(char *s, int len)
{
  char *new_msg_start;
  char *where_to_stop = min(s + current_doc_id + INDEX_CHUNK_SIZE, s + len);
  for (;;) {
    new_msg_start = findfrom(s + current_doc_id, len - current_doc_id);
    if (!new_msg_start) new_msg_start = s + len;
    find_words(s + current_doc_id, new_msg_start - s - current_doc_id);
    current_doc_id = new_msg_start - s + 1;
    if (new_msg_start >= where_to_stop) break;
  }
}

/* sorting the output by term */

void push(wordentry **stack, wordentry *item)
{
  item->next = *stack;
  *stack = item;
}

wordentry *pop(wordentry **stack)
{
  wordentry *rv = *stack;
  *stack = rv->next;  /* should probably rv->next = NULL; */
  return rv;
}

wordentry *nconc(wordentry *shortlist, wordentry *longlist)
{
  wordentry **join;
  if (!shortlist) return longlist;
  join = &(shortlist->next);
  while (*join) join = &((*join)->next);
  *join = longlist;
  return shortlist;
}

wordentry *all_entries()
{
  int i;
  wordentry *rv = NULL;
  for (i = 0; i < HASHSIZE; i++) {
    rv = nconc(index_hash[i], rv);
    index_hash[i] = 0;
  }
  return rv;
}

/* sort linked lists by moving entries to different lists. No
 * robustness against quicksort's O(N^2) worst case, because the input
 * data is sorted by hash value, then by recentness of occurrence, so
 * is very unlikely to be nearly sorted already. Hotspot; 14% of run
 * time is here. */
wordentry *quicksort_wordentries(wordentry *list)
{
  wordentry *first, *before = NULL, *after = NULL, *wep;
  int cmp;
  if (!list) return list;
  first = pop(&list);
  while (list) {
    wep = pop(&list);
    cmp = memcmp(wep->word, first->word, min(wep->wordlen, first->wordlen));
    if (!cmp && wep->wordlen < first->wordlen) cmp = -1;
    push((cmp < 0 ? &before : &after), wep);
  }
  first->next = quicksort_wordentries(after);
  return nconc(quicksort_wordentries(before), first);
}

/* forget everything in preparation for indexing the next mailbox segment */
void reset()
{
  memset(index_hash, '\0', sizeof(index_hash));
  arena_ptr = arena;  /* deallocate all memory */
}

/* compute the filename for a partial index */
char *part_filename(char *base, int filenum)
{
  char *template = "%s.idx/partindex%04d";  /* zero-pad for sortability */
  static char *buf = NULL;
  if (!buf) buf = malloc(strlen(base) + strlen(template) + 10);  /* 10 should be enough! */
  if (!buf) abort();
  sprintf(buf, template, base, filenum);
  return buf;
}

/* main */

int main(int argc, char **argv)
{
  int length;
  int fh = open_or_die(argv[1], &length);
  char *mm = mmap(0, length, PROT_READ, MAP_SHARED, fh, 0);
  if (mm == MAP_FAILED) { perror("mmap"); return 2; }
  arena_ptr = arena = malloc(ARENA_SIZE);
  if (!arena) { perror("malloc"); return 3; }
  current_doc_id = 0;
  int filenum = 0;
  while (current_doc_id < length) {
    index_by_mail_messages(mm, length);
    char *indexname = part_filename(argv[1], filenum);
    FILE *out = fopen(indexname, "w");
    if (!out) { perror(indexname); return 1; }
    dump_wordentry_list(quicksort_wordentries(all_entries()), out);
    reset();
    filenum++;
  }
  munmap(mm, length);
  return 0;
}
/* end maildex-parts.c */

So here's mergedex.c:

/*
 * Merge maildex index files in C: mergedex.c
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>

/*
 * BUG HISTORY
 *
 * This was the first time I'd written anything in C in a while, and
 * apparently I'd forgotten a lot about how to be careful. I was sort
 * of coding in a rather happy-go-lucky fashion, without tests or a
 * lot of mental double-checking, since this is sort of throwaway
 * code. As you might expect, this resulted in a lot of bugs.
 *
 * 1. forgot to include a check for end of input file in readline() (fixed)
 * 2. accidentally tripped newly inserted check for end of input file unless
 *    initial buffer size was too small (introduced by fix for #1, fixed)
 * 3. stopped noticing when I'd read a full line, due to fix for #2 (fixed badly)
 *    (Since I was clearly just banging on shit rather than actually
 *    debugging at this point, I slept.)
 * 4. pop_from_bucket popped too much from the bucket because I was confused
 *    about whether I had a pointer to the last item I wanted to return or a
 *    pointer to the item after it. (fixed.)
 * 5. pop_from_bucket crashed in the case where it was returning only one line,
 *    because it was trying to set (*last)->next to NULL, but *last
 *    had changed to point to something else since it had decided that
 *    was the thing to do. (fixed)
 * 6. Trying to simplify the fix for #3, I tried having readline() look
 *    directly at strlen(rv->s) instead of current_length, rather than
 *    setting current_length to strlen(rv->s) just before exiting the
 *    loop. But the loop has two exits, one of which often leaves
 *    rv->s uninitialized.
 */

/*
 * PERFORMANCE
 *
 * My benchmark is a set of 15 partial index files I have lying
 * around, totaling almost 1.6 million lines and 98 megabytes, which
 * produce an output file of about a million lines and 87 megabytes.
 * This takes longer than I'd like:
 *
 * [EMAIL PROTECTED]:~/devel$ time ./mergedex ~/sdc1/mail/tmp.mboxtail.idx/newindex* > tmp.newidx
 *
 * real    0m37.771s
 * user    0m9.740s
 * sys     0m3.170s
 *
 * This large discrepancy between CPU and wallclock time on an
 * otherwise idle machine suggests that the program is I/O-bound,
 * which is OK, except that it's only succeeding in writing output at
 * about 2 megabytes per (wallclock) second. hdparm -t says my disk
 * can read at almost 10 megabytes per second, so presumably it should
 * be able to write roughly as fast. Even if I direct output to
 * /dev/null, it still takes 27 wallclock seconds (which suggests,
 * indeed, that the data is being written at nearly ten megabytes per
 * second.)
 *
 * I concluded that probably it was doing too many seeks per second,
 * so I tried increasing input buffer sizes to compensate:
 *     setvbuf(ic->infiles[i], realloc_or_die(NULL, 512 * 1024), _IOFBF, 512 * 1024);
 * But that didn't help, although strace showed that it did result in
 * input file read system calls happening in half-mebibyte chunks
 * rather than tiny ones.
 *
 * Catting the same files to /dev/null, which involves reading the
 * same amount of data, takes only 13 seconds. What explains this
 * 14-second difference? It's approximately equal to the amount of
 * CPU time involved, but the CPU time shouldn't make a difference
 * with DMA!
 *
 * So I don't know what's wrong, but I think the program should run
 * twice as fast as it does, independent of its CPU time.
 *
 * I profiled a mostly-unoptimized version of it and got these results:
 *
 *   %   cumulative   self              self     total
 *  time   seconds   seconds    calls  us/call  us/call  name
 *  54.71      2.02     2.02  1593782     1.27     1.27  readline
 *  28.44      3.08     1.05 10078850     0.10     0.10  key_lt
 *   8.13      3.38     0.30  1593767     0.19     0.75  add_to_bucket
 *   4.06      3.53     0.15   940530     0.16     3.90  output_line
 *   2.17      3.61     0.08  1593782     0.05     2.07  read_numbered_infile
 *   1.63      3.67     0.06   940530     0.06     0.24  pop_from_bucket
 *   1.08      3.71     0.04                             main
 *   0.00      3.71     0.00   940531     0.00     0.00  all_exhausted
 *   0.00      3.71     0.00        1     0.00    30.98  open_infiles
 *
 * This suggests that a better "bucket" data structure could reduce
 * the CPU usage by up to a third.
 *
 */

/**********************************************************************/
/* multiple-input-file merging stuff. */

/* Our approach is that we have N input files, and we maintain the
 * next input line from each input file in a "bucket". From time to
 * time we pull the lexically first input line out of the "bucket" and
 * read the next line from that file into the bucket. Since our
 * desired output is one output line for each group of input lines
 * with the same key, we have to hang on to those input lines until
 * we've seen all of them. */
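/* A worked example of the merge (made-up postings; the format follows
 * from output_line() below): if partindex0003 contains the line
 * "apple: 30 7" and partindex0007 contains "apple: 412", both lines
 * sit in the bucket at once, form one key equivalence class, and come
 * out of output_line() as the single merged line
 *
 *     apple: 412 30 7
 *
 * with the later file's postings first, because of the filenum
 * tie-break in bucket_lt() below. */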
/*
 * input_line represents a line that has been read from some input
 * file.
 */
typedef struct input_line {
  /* keylen contains the number of characters in the key. */
  int keylen;
  /* next is used to chain together input lines in a linked list. */
  struct input_line *next;
  /* filenum specifies which file this line came from, so we can read in
   * another line from that file when we pull this one out of the
   * bucket. */
  int filenum;
  /* s is a NUL-terminated string, which I'm not fond of, but fgets's
   * interface doesn't let me make the program 8-bit clean anyway,
   * so I might as well take advantage of that bug to simplify the
   * program. Doesn't contain the newline. */
  char s[0];
} input_line;

/* look Ma, a whole dynamic allocation library in one function! */
inline void *realloc_or_die(void *s, size_t newsize)
{
  s = realloc(s, newsize);
  if (!s) {
    fprintf(stderr, "Couldn't realloc(%d)\n", sizeof(input_line) + newsize);
    abort();
  }
  return s;
}

/* create an input_line from the next line from a given file. Leaves
 * filenum uninitialized! */
input_line *readline(FILE *infile)
{
  /* just use stdio for input files; otherwise we'd have to roll our
   * own buffering, ick. */
  int current_length = 0;
  int new_length = 28;  /* a guess at a usually-sufficient value */
  char *nl;
  /* since we don't need to initialize any of the fields, we can let
   * the realloc in the loop do the initial allocation */
  input_line *rv = NULL;
  for (;;) {
    rv = realloc_or_die(rv, sizeof(input_line) + new_length);
    if (!fgets(rv->s + current_length, new_length - current_length, infile))
      break;  /* EOF or error */
    if ((nl = strchr(rv->s + current_length, '\n'))) {
      *nl = '\0';
      current_length = strlen(rv->s);
      break;
    }
    current_length = strlen(rv->s);
    new_length = current_length * 2;
  }
  if (!current_length) {
    free(rv);
    return NULL;
  }
  rv->keylen = strchr(rv->s, ':') - rv->s;
  rv->next = NULL;
  return rv;
}

inline char keychar(input_line *line, int offset)
{
  if (offset < line->keylen) return line->s[offset];
  return '\0';
}

/* return true if a's key is less than b's key */
inline int key_lt(input_line *a, input_line *b)
{
  int minlen = a->keylen;
  if (b->keylen < minlen) minlen = b->keylen;
  for (int i = 0; i < minlen; i++) {
    if (a->s[i] != b->s[i]) return a->s[i] < b->s[i];
  }
  return a->keylen < b->keylen;
}

/* Our bucket at present is just a sorted linked list! We even abuse
 * the 'next' link to build it. Merging 15 indices totaling about 87
 * megabytes, which you'd expect would involve 8 or so comparisons per
 * insertion, results in 1.5 million lines of input, almost a million
 * lines of output, and ten million comparisons --- pretty close to
 * the 8. These comparisons actually consume a substantial fraction
 * of the program's CPU time, but as mentioned in the comment about
 * PERFORMANCE, I think reducing its CPU time wouldn't actually speed
 * it up on my machine. */
inline int bucket_lt(input_line **bucket, input_line *line)
{
  /* we compare filenums so the last file comes first, in order to
   * exactly reproduce the behavior of the old code */
  int rv = key_lt(*bucket, line);
  if (rv) return rv;
  rv = key_lt(line, *bucket);
  if (rv) return !rv;
  return (*bucket)->filenum > line->filenum;
}
/* add a new input line to the bucket. */
void add_to_bucket(input_line **bucket, input_line *line)
{
  while (*bucket && bucket_lt(bucket, line)) bucket = &((*bucket)->next);
  line->next = *bucket;
  *bucket = line;
}

/* remove the lexically-first key equivalence class from the bucket
 * and return it, all linked together nicely */
input_line *pop_from_bucket(input_line **bucket)
{
  assert(*bucket);
  input_line *rv = *bucket;
  input_line **last = bucket;
  /* find equivalence class */
  while ((*last)->next && !key_lt(*last, (*last)->next)) last = &((*last)->next);
  input_line **end = &((*last)->next);
  /* remove it from bucket */
  *bucket = *end;
  /* unlink it from the rest of the bucket */
  *end = NULL;
  return rv;
}

typedef struct {
  FILE **infiles;
  input_line *bucket;
} infile_collection;

void read_numbered_infile(infile_collection *infiles, int n)
{
  input_line *new_line = readline(infiles->infiles[n]);
  if (!new_line) return;
  new_line->filenum = n;
  add_to_bucket(&(infiles->bucket), new_line);
}

void output_line(infile_collection *infiles, FILE *output)
{
  input_line *lines = pop_from_bucket(&(infiles->bucket));
  fputs(lines->s, output);
  for (;;) {
    read_numbered_infile(infiles, lines->filenum);
    input_line *next = lines->next;
    free(lines);
    lines = next;
    if (!lines) break;
    fprintf(output, " %s", lines->s + lines->keylen + 2);  /* 2: ": " */
  }
  fprintf(output, "\n");
}

int all_exhausted(infile_collection *ic)
{
  return !ic->bucket;
}

infile_collection *open_infiles(int n, char **names)
{
  infile_collection *ic = realloc_or_die(NULL, sizeof(*ic));
  ic->bucket = NULL;
  ic->infiles = realloc_or_die(NULL, sizeof(ic->infiles[0]) * n);
  for (int i = 0; i < n; i++) {
    ic->infiles[i] = fopen(names[i], "r");
    if (!ic->infiles[i]) {
      fprintf(stderr, "fopen: ");
      perror(names[i]);
      exit(1);
    }
    read_numbered_infile(ic, i);
  }
  return ic;
}

int main(int argc, char **argv)
{
  int n = argc - 1;
  if (!n) {
    fprintf(stderr, "Usage: %s file1 [file2 [file3 ...]] > outfile\n", argv[0]);
    return 1;
  }
  infile_collection *ic = open_infiles(n, argv + 1);
  while (!all_exhausted(ic)) {
    output_line(ic, stdout);
  }
  return 0;
}
/* end mergedex.c */

The Python program follows:

#!/usr/bin/env python
"""maildex.py: Index my mailbox.

This utility generates a term index of an mbox file and allows you to
search it. If you run it with just the mbox file as an argument, it
assumes you want to reindex it; if you run it with the mbox file and
some other terms, it does a search. Like Google, it's default-and,
and you can precede a term with a - to exclude messages containing
it.

Requires recent Python, probably 2.3. Won't work in 2.1.3.

I have only 4.5M postings out of 7K messages in 50MB, so I can make a
list of all postings in RAM. It takes about a minute or two and
another 50MB of RAM. Writing it to disk with Berkeley DB used to take
another 10 minutes or so! The index files are about 27MB (down from
45MB with Berkeley DB), but gzip to <11MB.

Unimplemented feature: handle mbox files bigger than the address
space.

"""
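# For example (an illustration of the interface described above; the
# mailbox path is made up):
#
#     python maildex.py ~/mail/saved               # reindex ~/mail/saved
#     python maildex.py ~/mail/saved budget -spam  # messages containing
#                                                  # "budget" but not "spam"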
# - Python space costs:
#   Looks like a dict is about 154 bytes, an empty list about 48,
#   and a one-item list about 96 bytes. There are about 340K terms
#   if we use the naive [a-zA-Z0-9]+ term definition, so about 340K *
#   96 + 4M * 4 bytes = ~48MB.

# "Well, I've read about inverted indices and stuff, but I've never
# built a full-text search engine." - me on Thursday
# "Dude, it's, like, cake." - DMH

import mmap, re, sys, os, merge, time

def sorted(alist):
    "Returns a list after sorting it."
    alist.sort()
    return alist

class lindb:
    """Linear database.

    I was storing data in a Berkeley DB file, but it was really slow
    --- it took 10 minutes to write out 45 megabytes of data with 340K
    keys. So I thought I'd take the Lucene approach; just write a
    sorted list of key-value pairs, write a "headwords" file to make
    it easy to find the right part of the file, then linearly search.
    This module lets it take much less time, by just sorting the data
    and storing it in a text file. It's very slightly slower than the
    Berkeley DB version, but uses about half the space (for my data.)

    The big compromises are that this module doesn't allow the same
    flexibility in access as Berkeley DB, and it doesn't allow the
    storage of arbitrary data --- data with newlines or ": " in it can
    screw things up.

    The basic operations:
    - db = lindb(filename) --- open database in 'filename', creating if nec.
    - db.set(somedict) --- set contents of database to contents of 'somedict'
    - db[key] --- return value for 'key' or raise KeyError
    - db.get(key, default) --- return value for 'key' or default

    """

    ### basic primitives

    # I tried various powers of 2 here: 131072, 32768, 1024, 4096, and
    # now 8192; the smaller it was, the better the performance was,
    # but the difference between 4096 and 1024 wasn't significant.
    threshold_size = 4096

    def open(self, filename, mode='r'):
        return file("%s/%s" % (self.dirname, filename), mode)

    def readline(self, f):
        """Read the next key-value pair from open file f.

        Returns (key, value) on success, or (None, None) on EOF.

        """
        line = f.readline()
        if not line: return None, None
        # XXX no robustness against incomplete writes
        return line[:-1].split(': ', 2)

    def writeline(self, fh, key, value):
        fh.write("%s: %s\n" % (key, value))

    ### internal routines

    def headwords(self, filename):
        """Generate a headwords file for 'filename'.

        The headwords file lists a few words from the file, along with
        the positions of their entries in the file. This allows the
        lookup process to find a particular entry relatively quickly,
        while retaining high locality of access.

        """
        f = self.open(filename)
        hwdict = {}
        blockno = None
        while 1:
            pos = f.tell()
            name, value = self.readline(f)
            if name is None: break
            nblockno = pos // self.threshold_size
            if nblockno != blockno:
                hwdict[name] = pos
                blockno = nblockno
        self.writefile_with_headwords(filename + '.hw', hwdict)

    def writefile_with_headwords(self, filename, contents):
        size = self.writefile(filename, contents)
        if size > self.threshold_size: self.headwords(filename)

    def writefile(self, filename, contents):
        """Write file named 'filename' with contents of dict 'contents'.

        If necessary, this writes a headwords file for 'filename'.

        """
        # obviously this technique won't work at all for stuff that doesn't
        # fit in RAM
        f = self.open(filename, 'w')
        for key in sorted(contents.keys()):
            self.writeline(f, key, contents[key])
        size = f.tell()
        f.close()
        try: os.unlink('%s/%s.hw' % (self.dirname, filename))
        except OSError: pass
        return size

    def lookup(self, filename, term):
        """Return greatest (key, value) pair not greater than 'term'.

        This returns the key-value pair where the key is term if there
        is one, otherwise the one before where 'term' would be. Uses
        headwords files if they exist to speed up access.
""" start = 0 if os.path.exists('%s/%s.hw' % (self.dirname, filename)): name, value = self.lookup(filename + '.hw', term) if name is not None: assert type(name) is type(term) assert name <= term start = int(value) f = self.open(filename) f.seek(start, 0) name, value = None, None while 1: nname, nvalue = self.readline(f) if nname is None or nname > term: return name, value name, value = nname, nvalue def readlines(self, filename): """Yield each name-value pair from the specified file. Note that this interface is inconsistent with readline, which takes a file, not a filename. """ f = self.open(filename) while 1: n, v = self.readline(f) if n is None: return yield n, v def merge_indices(self, outfilename, filenames): outfile = self.open(outfilename, 'w') files = [self.readlines(filename) for filename in filenames] lastkey, values = None, [] for (key, value), filenum in merge.merge(files): if key != lastkey: if lastkey is not None: values.reverse() self.writeline(outfile, lastkey, ' '.join(values)) lastkey, values = key, [] values.extend(value.split()) if lastkey is not None: values.reverse() self.writeline(outfile, lastkey, ' '.join(values)) size = outfile.tell() outfile.close() if size > self.threshold_size: self.headwords(outfilename) # XXX maybe mmap? ### external interfaces def __init__(self, dirname): self.dirname = dirname if not os.path.exists(dirname): os.mkdir(dirname) def __getitem__(self, name): lname, lvalue = self.lookup('contents', name) if lname == name: return lvalue raise KeyError, name def get(self, name, value=None): try: return self[name] except KeyError: return value def set(self, contents): self.writefile_with_headwords('contents', contents) def ok(a, b): "Regression test function." assert a == b, (a, b) def test_lindb(): "Very basic regression test for linear db class lindb." os.system('rm -rf tmp.db') x = lindb("tmp.db") x.set({'roar': 'lion'}) ok(x['roar'], 'lion') ok(x.get('roar', 'boo'), 'lion') ok(x.get('foar', 'boo'), 'boo') os.system('rm -rf tmp.db') #test_lindb() def mkindex(mm, start, maxpostings): """Create an in-memory index of part of an mbox file. This function takes a string-like object, such as an mmap object, and returns a list of byte offsets in it where mail messages start, and a postings dictionary that maps words to lists of message starting byte offsets. """ poses = [start] wordpat = re.compile("[a-zA-Z0-9]+") fi = wordpat.finditer(mm, start) allpostings = {} totalpostings = 0 while 1: # This won't match the beginning of the first message. nps = mm.find("\nFrom ", poses[-1]) if nps == -1: nps = len(mm) nps += 1 this_guys_postings = {} for mo in fi: if mo.start() >= nps: # I wish I could push back the item we just got, because # it belongs to the next message and won't get indexed, # but it's the "From" at the beginning of the message. # So it doesn't really matter. break this_guys_postings[mo.group(0).lower()] = 1 for word in this_guys_postings.iterkeys(): if not allpostings.has_key(word): allpostings[word] = [] allpostings[word].append(poses[-1]) totalpostings += len(this_guys_postings) if nps > len(mm) or totalpostings >= maxpostings: break poses.append(nps) if len(poses) % 250 == 0: print "%d msgs" % len(poses) print "indexed %d total postings" % totalpostings return poses, allpostings, nps class index: """Index of an mbox. Stores a lindb under mboxname.idx which contains posting lists for the mbox's messages, and allows you to search it. """ def postings(self, term): "Returns the posting list for a term." 
class index:
    """Index of an mbox.

    Stores a lindb under mboxname.idx which contains posting lists for
    the mbox's messages, and allows you to search it.

    """
    def postings(self, term):
        "Returns the posting list for a term."
        return [int(id) for id in self.db.get(term, '').split()]

    def msg(self, pos):
        """Returns the message at a particular byte offset in the mbox.

        The items in posting lists are just such byte offsets.

        """
        npos = self.mm.find("\nFrom ", pos + 1)
        if npos == -1: npos = self.size
        rv = self.mm[pos:npos]
        assert "\nFrom " not in rv
        return rv

    def __init__(self, fname):
        "init. fname is the path to the mbox."
        self.f = file(fname)
        self.f.seek(0, 2)  # EOF
        self.size = self.f.tell()
        self.mm = mmap.mmap(self.f.fileno(), self.size,
                            access=mmap.ACCESS_READ)
        self.db = lindb(fname + '.idx')

    def write(self):
        "Reindex an mbox."
        print "starting at", time.time()
        pos = 0
        filenames = []
        while pos < len(self.mm):
            # 1/2 million fit in RAM easily
            poses, allpostings, pos = mkindex(self.mm, pos, 500 * 1000)
            print "indexed to %d;" % pos
            for term in allpostings.keys():
                allpostings[term] = ' '.join([str(x) for x in allpostings[term]])
            filename = 'newindex%d' % len(filenames)
            self.db.writefile(filename, allpostings)
            filenames.append(filename)
            print "wrote index;"
        print "merging indices at", time.time()
        self.db.merge_indices('contents', filenames)
        print "done at", time.time()

    def search(self, terms, exclusions=[]):
        """Returns a posting list for some search.

        'terms' is a list of terms to require the presence of;
        'exclusions' is an optional list of terms to require the
        absence of.

        """
        lists = [self.postings(term) for term in terms]
        excludelists = [self.postings(term) for term in exclusions]
        # intersect lists.
        # sort by length. ii is in the tuples to prevent comparing the lists
        # themselves.
        sorted_lists = sorted([(len(lists[ii]), ii, lists[ii])
                               for ii in range(len(lists))])
        # start with smallest
        rvdict = dict([(key, 1) for key in sorted_lists[0][2]])
        # it might be better to, I don't know, do this last?
        for list in excludelists:
            for message in list:
                if rvdict.has_key(message): del rvdict[message]
        # now remove items not in all the other lists
        for _, _, list in sorted_lists[1:]:
            newdict = {}
            for message in list:
                if rvdict.has_key(message): newdict[message] = 1
            rvdict = newdict
        rv = sorted(rvdict.keys())
        rv.reverse()
        return rv

    def cmdlinesearch(self, terms):
        """Parses a search, runs it, and returns a posting list.

        You have to break it up into terms first, as the Unix command
        line does for you, but then this parses it to see what has a
        '-' before it and what doesn't.

        """
        plusterms = []
        minusterms = []
        for term in terms:
            if term.startswith('-'):
                minusterms.append(term[1:].lower())
            else:
                plusterms.append(term.lower())
        return self.search(plusterms, minusterms)

def main(argv, stdout):
    """Command-line search/reindex interface."""
    if len(argv) > 2:
        idx = index(argv[1])
        for pos in idx.cmdlinesearch(argv[2:]):
            stdout.write(idx.msg(pos))
    elif len(argv) == 2:
        index(argv[1]).write()
    else:
        print ("Usage:\n\t%s mailboxfilename -- to reindex\n"
               "\t%s mailboxfilename term1 term2 -unwanted1 -unwanted2 -- to look"
               % (argv[0], argv[0]))

if __name__ == '__main__': main(sys.argv, sys.stdout)
# end maildex.py

And here's merge.py, which maildex.py imports:

#!/usr/bin/env python
'''Generic lazy merge algorithm.

There are lots of "merge" algorithms; generally they make one pass
over each of several sorted lists. They work well in environments
where locality of reference is important, such as when your data is
much larger than your physical RAM. Some of them work on arbitrary
numbers of lists of items of the same type, while others work on
specific numbers of lists of items of different types.
The thing that binds them all together is that they all traverse
their input lists in the same way. This way is not, in general,
trivial to implement. This module implements it so I won't have to
implement it again, at least not for small numbers of lists.

Most problems that have merge-algorithm solutions also have hashtable
solutions which run faster when you have sufficient random-access
storage to hold all your data. Accordingly, merge algorithms have
fallen very much out of favor over the last thirty or forty years as
main memory sizes have increased from megabits to terabits.

Some examples of merge algorithms:

- produce a single sorted list from multiple sorted lists. (This is
  the kind of merge used in mergesort.)
- produce the set union, set intersection, or set difference of two
  sorted lists.
- given a master file and an update file sorted in the same order,
  produce a new master file with all the updates applied.

It turns out that you can implement all the other algorithms on top
of the first one, as long as you remember which input list each
output item came from.

My particular problem is that I have inverted index files rather too
large for my RAM, so I have to index my corpus in chunks, then merge
the individual chunks to form a master index.

'''

def merge2(seq1, seq2):
    """Generator for the merged output of two sorted sequences."""
    it1, it2 = iter(seq1), iter(seq2)
    try: thing1 = it1.next()
    except StopIteration:
        for item in it2: yield item
        return
    try: thing2 = it2.next()
    except StopIteration:
        yield thing1
        for item in it1: yield item
        return
    while 1:
        if thing1 > thing2:
            it1, it2, thing1, thing2 = it2, it1, thing2, thing1
        yield thing1
        try: thing1 = it1.next()
        except StopIteration:
            yield thing2
            for item in it2: yield item
            return

def taggediter(iter, tag):
    """Generator that tags each item in a sequence with its source."""
    for item in iter: yield (item, tag)

def _mergen(seqs):
    """Generate the merged output of N sorted sequences, N >= 0.

    Creates a heap of merge2 iterators that do the actual work, then
    returns the root of that heap.

    """
    if len(seqs) == 0: return []
    if len(seqs) == 1: return seqs[0]
    elif len(seqs) == 2: return merge2(seqs[0], seqs[1])
    else:
        part = len(seqs)/2  # 1 -> 0, 2 -> 1, 3 -> 1, 4 -> 2, 5 -> 2
        return merge2(_mergen(seqs[:part]), _mergen(seqs[part:]))

def merge(iters):
    """Generate the tagged, merged output of N sorted sequences, N >= 0.

    Items from the first sequence will be returned as (item, 0), items
    from the second as (item, 1), etc.

    """
    return _mergen([taggediter(iters[i], i) for i in range(len(iters))])

def ok(a, b): assert a==b, (a,b)

def test():
    ok(list(merge([])), [])
    ok(list(merge([['a', 'b', 'c']])), [('a', 0), ('b', 0), ('c', 0)])
    ok(list(merge([[1], []])), [(1, 0)])
    ok(list(merge([[], [1]])), [(1, 1)])
    ok(list(merge([[1], [2]])), [(1, 0), (2, 1)])
    ok(list(merge([[2], [1]])), [(1, 1), (2, 0)])
    ok(list(merge([[1, 2, 3], [1.5, 3.25, 3.5]])),
       [(1, 0), (1.5, 1), (2, 0), (3, 0), (3.25, 1), (3.5, 1)])
    ok(list(merge([[1], [5], [4], [6]])), [(1, 0), (4, 2), (5, 1), (6, 3)])

test()
# end merge.py
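# Postscript: a tiny sketch (not used by maildex; illustration only) of
# the docstring's claim that the other merge algorithms can be built on
# the tagged merge --- here, set intersection of two sorted lists of
# unique items, relying on equal items from the two inputs coming out
# of merge() adjacently:

def intersect_sorted(a, b):
    "Return the items present in both sorted, duplicate-free lists."
    result, prev = [], None
    for item, tag in merge([a, b]):
        if item == prev: result.append(item)  # seen from both inputs
        prev = item
    return result

ok(intersect_sorted([1, 2, 3], [2, 3, 5]), [2, 3])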