Persuant to the discussions currently on -hackers, here's a patch that uses zlib to compress the tapes as they go to disk. I default to the compression level 3 (think gzip -3).
Please speed test all you like, I *think* it's bug free, but you never know. Outstanding questions: - I use zlib because the builtin pg_lzcompress can't do what zlib does. Here we setup input and output buffers and zlib will process as much as it can (input empty or output full). This means no marshalling is required. We can compress the whole file without having it in memory. - zlib allocates memory for compression and decompression, I don't know how much. However, it allocates via the postgres mcxt system so it shouldn't too hard to find out. Simon pointed out that we'll need to track this because we might allow hundreds of tapes. - Each tape is compressed as one long compressed stream. Currently no seeking is allowed, so only sorts, no joins! (As tom said, quick and dirty numbers). This should show this possibility in its best light but if we want to support seeking we're going to need to change that. Maybe no compression on the last pass? - It's probable that the benefits are strongly correlated to the speed of your disk subsystem. We need to measure this effect. I can't accuratly measure this because my compiler doesn't inline any of the functions in tuplesort.c. In my test of a compression ratio around 100-to-1, on 160MB of data with tiny work_mem on my 5 year old laptop, it speeds it up by 60% so it's obviously not a complete waste of time. Ofcourse, YMMV :) Have a nice day, -- Martijn van Oosterhout <kleptog@svana.org> http://svana.org/kleptog/ > From each according to his ability. To each according to his ability to > litigate.
Index: src/backend/Makefile =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/Makefile,v retrieving revision 1.114 diff -c -r1.114 Makefile *** src/backend/Makefile 5 Jan 2006 01:56:29 -0000 1.114 --- src/backend/Makefile 17 May 2006 16:16:20 -0000 *************** *** 25,31 **** LIBS := $(filter-out -lpgport, $(LIBS)) # The backend doesn't need everything that's in LIBS, however ! LIBS := $(filter-out -lz -lreadline -ledit -ltermcap -lncurses -lcurses, $(LIBS)) ########################################################################## --- 25,31 ---- LIBS := $(filter-out -lpgport, $(LIBS)) # The backend doesn't need everything that's in LIBS, however ! LIBS := $(filter-out -lreadline -ledit -ltermcap -lncurses -lcurses, $(LIBS)) ########################################################################## Index: src/backend/utils/sort/logtape.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/utils/sort/logtape.c,v retrieving revision 1.21 diff -c -r1.21 logtape.c *** src/backend/utils/sort/logtape.c 7 Mar 2006 23:46:24 -0000 1.21 --- src/backend/utils/sort/logtape.c 17 May 2006 16:16:21 -0000 *************** *** 80,91 **** --- 80,96 ---- #include "storage/buffile.h" #include "utils/logtape.h" + #include "zlib.h" + /* * Block indexes are "long"s, so we can fit this many per indirect block. * NB: we assume this is an exact fit! */ #define BLOCKS_PER_INDIR_BLOCK ((int) (BLCKSZ / sizeof(long))) + /* compression level to use for zlib */ + #define COMPRESSION_LEVEL 3 + /* * We use a struct like this for each active indirection level of each * logical tape. If the indirect block is not the highest level of its *************** *** 131,136 **** --- 136,142 ---- * reading. */ char *buffer; /* physical buffer (separately palloc'd) */ + z_stream zstream; /* zlib stream */ long curBlockNumber; /* this block's logical blk# within tape */ int pos; /* next read/write position in buffer */ int nbytes; /* total # of valid bytes in buffer */ *************** *** 509,514 **** --- 515,532 ---- } + static void * + lts_zalloc(void *cxt, unsigned int items, unsigned int size) + { + return MemoryContextAlloc( cxt, items*size ); + } + + static void + lts_zfree(void *cxt, void *ptr) + { + return pfree( ptr ); + } + /* * Create a set of logical tapes in a temporary underlying file. * *************** *** 556,561 **** --- 574,586 ---- lt->curBlockNumber = 0L; lt->pos = 0; lt->nbytes = 0; + + memset( <->zstream, 0, sizeof(lt->zstream) ); + lt->zstream.zalloc = lts_zalloc; + lt->zstream.zfree = lts_zfree; + lt->zstream.opaque = CurrentMemoryContext; + + deflateInit( <->zstream, COMPRESSION_LEVEL ); /* Fast compression */ } return lts; } *************** *** 627,633 **** void *ptr, size_t size) { LogicalTape *lt; - size_t nthistime; Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = <s->tapes[tapenum]; --- 652,657 ---- *************** *** 643,650 **** lt->indirect->nextup = NULL; } ! while (size > 0) { if (lt->pos >= BLCKSZ) { /* Buffer full, dump it out */ --- 667,678 ---- lt->indirect->nextup = NULL; } ! lt->zstream.next_in = ptr; ! lt->zstream.avail_in = size; ! ! while (lt->zstream.avail_in > 0) { + int err; if (lt->pos >= BLCKSZ) { /* Buffer full, dump it out */ *************** *** 661,680 **** lt->nbytes = 0; } ! nthistime = BLCKSZ - lt->pos; ! if (nthistime > size) ! nthistime = size; ! Assert(nthistime > 0); ! memcpy(lt->buffer + lt->pos, ptr, nthistime); lt->dirty = true; ! lt->pos += nthistime; ! if (lt->nbytes < lt->pos) ! lt->nbytes = lt->pos; ! ptr = (void *) ((char *) ptr + nthistime); ! size -= nthistime; } } /* --- 689,741 ---- lt->nbytes = 0; } ! lt->zstream.next_out = lt->buffer + lt->pos; ! lt->zstream.avail_out = BLCKSZ - lt->pos; ! err = deflate( <->zstream, Z_NO_FLUSH ); ! lt->pos = BLCKSZ - lt->zstream.avail_out; ! lt->nbytes = lt->pos; ! lt->dirty = true; ! ! if( err != Z_OK ) ! elog(ERROR, "deflate returned error: %d\n", err ); ! } ! } ! /* Flushes any data still present in the buffers of the compression module. ! * Only necessary for writing (deflating), for reading we don't particularly ! * care about unflished data */ + static void + ltsFlush( LogicalTapeSet *lts, LogicalTape *lt) + { + int err; + + if( !lt->dirty ) + return; + + lt->zstream.next_in = NULL; + lt->zstream.avail_in = 0; + + for(;;) + { + lt->zstream.next_out = lt->buffer + lt->pos; + lt->zstream.avail_out = BLCKSZ - lt->pos; + err = deflate( <->zstream, Z_FINISH ); lt->dirty = true; ! ! if( err == Z_STREAM_END ) ! break; ! Assert(err == Z_OK); ! Assert(lt->zstream.avail_out == 0); ! ltsDumpBuffer(lts, lt); ! lt->numFullBlocks++; ! lt->curBlockNumber++; ! lt->nbytes = 0; ! lt->pos = 0; } + lt->pos = BLCKSZ - lt->zstream.avail_out; + lt->nbytes = lt->pos; + return; } /* *************** *** 692,697 **** --- 753,761 ---- Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = <s->tapes[tapenum]; + lt->zstream.next_in = NULL; + lt->zstream.avail_in = 0; + if (!forWrite) { if (lt->writing) *************** *** 702,711 **** --- 766,781 ---- * (destructive) read. */ if (lt->dirty) + { + ltsFlush(lts, lt); ltsDumpBuffer(lts, lt); + } lt->lastBlockBytes = lt->nbytes; lt->writing = false; datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false); + + deflateEnd( <->zstream ); + inflateInit( <->zstream ); } else { *************** *** 715,720 **** --- 785,791 ---- */ Assert(lt->frozen); datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect); + inflateReset( <->zstream ); } /* Read the first block, or reset if tape is empty */ lt->curBlockNumber = 0L; *************** *** 761,766 **** --- 832,840 ---- lt->curBlockNumber = 0L; lt->pos = 0; lt->nbytes = 0; + + inflateEnd( <->zstream ); + deflateInit( <->zstream, COMPRESSION_LEVEL ); } } *************** *** 774,788 **** void *ptr, size_t size) { LogicalTape *lt; ! size_t nread = 0; ! size_t nthistime; Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = <s->tapes[tapenum]; Assert(!lt->writing); ! while (size > 0) { if (lt->pos >= lt->nbytes) { /* Try to load more data into buffer. */ --- 848,865 ---- void *ptr, size_t size) { LogicalTape *lt; ! // size_t nread = 0; Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = <s->tapes[tapenum]; Assert(!lt->writing); ! lt->zstream.next_out = ptr; ! lt->zstream.avail_out = size; ! ! while (lt->zstream.avail_out > 0) { + int err; if (lt->pos >= lt->nbytes) { /* Try to load more data into buffer. */ *************** *** 801,821 **** if (lt->nbytes <= 0) break; /* EOF (possible here?) */ } ! ! nthistime = lt->nbytes - lt->pos; ! if (nthistime > size) ! nthistime = size; ! Assert(nthistime > 0); ! ! memcpy(ptr, lt->buffer + lt->pos, nthistime); ! ! lt->pos += nthistime; ! ptr = (void *) ((char *) ptr + nthistime); ! size -= nthistime; ! nread += nthistime; } ! return nread; } /* --- 878,895 ---- if (lt->nbytes <= 0) break; /* EOF (possible here?) */ } ! ! lt->zstream.next_in = lt->buffer + lt->pos; ! lt->zstream.avail_in = lt->nbytes - lt->pos; ! err = inflate( <->zstream, Z_NO_FLUSH ); ! lt->pos = lt->nbytes - lt->zstream.avail_in; ! /* Here we can detect the end of the compressed stream, ! * can't do anything with that information though */ ! if( err != Z_OK && err != Z_STREAM_END) ! elog(ERROR, "Inflate returned error: %d\n", err); } ! return size - lt->zstream.avail_out; } /* *************** *** 844,850 **** --- 918,927 ---- * partial indirect blocks, rewind for nondestructive read. */ if (lt->dirty) + { + ltsFlush(lts, lt); ltsDumpBuffer(lts, lt); + } lt->lastBlockBytes = lt->nbytes; lt->writing = false; lt->frozen = true; *************** *** 853,858 **** --- 930,939 ---- lt->curBlockNumber = 0L; lt->pos = 0; lt->nbytes = 0; + + deflateEnd( <->zstream ); + inflateInit( <->zstream ); + if (datablocknum != -1L) { ltsReadBlock(lts, datablocknum, (void *) lt->buffer); *************** *** 879,884 **** --- 960,967 ---- long nblocks; int newpos; + elog(PANIC,"LogicalTapeBackspace on compressed file not yet implemented"); + Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = <s->tapes[tapenum]; Assert(lt->frozen); *************** *** 944,949 **** --- 1027,1034 ---- { LogicalTape *lt; + elog(PANIC,"LogicalTapeSeek on compressed file not yet implemented"); + Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = <s->tapes[tapenum]; Assert(lt->frozen); *************** *** 1007,1012 **** --- 1092,1099 ---- { LogicalTape *lt; + elog(PANIC,"LogicalTapeTell on compressed file not yet implemented"); + Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = <s->tapes[tapenum]; *blocknum = lt->curBlockNumber;
signature.asc
Description: Digital signature