Pursuant to the discussions currently on -hackers, here's a patch that
uses zlib to compress the tapes as they go to disk. I default to
compression level 3 (think gzip -3).

Please speed test all you like, I *think* it's bug free, but you never
know.

Outstanding questions:

- I use zlib because the builtin pg_lzcompress can't do what zlib does.
Here we set up input and output buffers and zlib will process as much as
it can (until the input is empty or the output is full). This means no
marshalling is required. We can compress the whole file without having it
in memory.

- zlib allocates memory for compression and decompression; I don't know
how much. However, it allocates via the postgres mcxt system so it
shouldn't be too hard to find out. Simon pointed out that we'll need to
track this because we might allow hundreds of tapes.

- Each tape is compressed as one long compressed stream. Currently no
seeking is allowed, so only sorts, no joins! (As Tom said, quick and
dirty numbers). This should show this possibility in its best light
but if we want to support seeking we're going to need to change that.
Maybe no compression on the last pass?

- It's probable that the benefits are strongly correlated to the speed
of your disk subsystem. We need to measure this effect. I can't
accurately measure this because my compiler doesn't inline any of the
functions in tuplesort.c.

In my test of a compression ratio around 100-to-1, on 160MB of data
with tiny work_mem on my 5 year old laptop, it speeds it up by 60% so
it's obviously not a complete waste of time. Of course, YMMV :)

Have a nice day,
-- 
Martijn van Oosterhout   <kleptog@svana.org>   http://svana.org/kleptog/
> From each according to his ability. To each according to his ability to 
> litigate.
Index: src/backend/Makefile
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/Makefile,v
retrieving revision 1.114
diff -c -r1.114 Makefile
*** src/backend/Makefile        5 Jan 2006 01:56:29 -0000       1.114
--- src/backend/Makefile        17 May 2006 16:16:20 -0000
***************
*** 25,31 ****
  LIBS := $(filter-out -lpgport, $(LIBS))
  
  # The backend doesn't need everything that's in LIBS, however
! LIBS := $(filter-out -lz -lreadline -ledit -ltermcap -lncurses -lcurses, 
$(LIBS))
  
  ##########################################################################
  
--- 25,31 ----
  LIBS := $(filter-out -lpgport, $(LIBS))
  
  # The backend doesn't need everything that's in LIBS, however
! LIBS := $(filter-out -lreadline -ledit -ltermcap -lncurses -lcurses, $(LIBS))
  
  ##########################################################################
  
Index: src/backend/utils/sort/logtape.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/utils/sort/logtape.c,v
retrieving revision 1.21
diff -c -r1.21 logtape.c
*** src/backend/utils/sort/logtape.c    7 Mar 2006 23:46:24 -0000       1.21
--- src/backend/utils/sort/logtape.c    17 May 2006 16:16:21 -0000
***************
*** 80,91 ****
--- 80,96 ----
  #include "storage/buffile.h"
  #include "utils/logtape.h"
  
+ #include "zlib.h"
+ 
  /*
   * Block indexes are "long"s, so we can fit this many per indirect block.
   * NB: we assume this is an exact fit!
   */
  #define BLOCKS_PER_INDIR_BLOCK        ((int) (BLCKSZ / sizeof(long)))
  
+ /* compression level to use for zlib */
+ #define COMPRESSION_LEVEL    3
+ 
  /*
   * We use a struct like this for each active indirection level of each
   * logical tape.  If the indirect block is not the highest level of its
***************
*** 131,136 ****
--- 136,142 ----
         * reading.
         */
        char       *buffer;                     /* physical buffer (separately 
palloc'd) */
+       z_stream        zstream;                /* zlib stream */
        long            curBlockNumber; /* this block's logical blk# within 
tape */
        int                     pos;                    /* next read/write 
position in buffer */
        int                     nbytes;                 /* total # of valid 
bytes in buffer */
***************
*** 509,514 ****
--- 515,532 ----
  }
  
  
+ static void *
+ lts_zalloc(void *cxt, unsigned int items, unsigned int size)
+ {
+       return MemoryContextAlloc( cxt, items*size );
+ }
+ 
+ static void 
+ lts_zfree(void *cxt, void *ptr)
+ {
+       return pfree( ptr );
+ }
+ 
  /*
   * Create a set of logical tapes in a temporary underlying file.
   *
***************
*** 556,561 ****
--- 574,586 ----
                lt->curBlockNumber = 0L;
                lt->pos = 0;
                lt->nbytes = 0;
+               
+               memset( &lt->zstream, 0, sizeof(lt->zstream) );
+               lt->zstream.zalloc = lts_zalloc;
+               lt->zstream.zfree = lts_zfree;
+               lt->zstream.opaque = CurrentMemoryContext;
+               
+               deflateInit( &lt->zstream, COMPRESSION_LEVEL );  /* Fast 
compression */
        }
        return lts;
  }
***************
*** 627,633 ****
                                 void *ptr, size_t size)
  {
        LogicalTape *lt;
-       size_t          nthistime;
  
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
        lt = &lts->tapes[tapenum];
--- 652,657 ----
***************
*** 643,650 ****
                lt->indirect->nextup = NULL;
        }
  
!       while (size > 0)
        {
                if (lt->pos >= BLCKSZ)
                {
                        /* Buffer full, dump it out */
--- 667,678 ----
                lt->indirect->nextup = NULL;
        }
  
!       lt->zstream.next_in = ptr;
!       lt->zstream.avail_in = size;
!       
!       while (lt->zstream.avail_in > 0)
        {
+               int err;
                if (lt->pos >= BLCKSZ)
                {
                        /* Buffer full, dump it out */
***************
*** 661,680 ****
                        lt->nbytes = 0;
                }
  
!               nthistime = BLCKSZ - lt->pos;
!               if (nthistime > size)
!                       nthistime = size;
!               Assert(nthistime > 0);
  
!               memcpy(lt->buffer + lt->pos, ptr, nthistime);
  
                lt->dirty = true;
!               lt->pos += nthistime;
!               if (lt->nbytes < lt->pos)
!                       lt->nbytes = lt->pos;
!               ptr = (void *) ((char *) ptr + nthistime);
!               size -= nthistime;
        }
  }
  
  /*
--- 689,741 ----
                        lt->nbytes = 0;
                }
  
!               lt->zstream.next_out = lt->buffer + lt->pos;
!               lt->zstream.avail_out = BLCKSZ - lt->pos;
!               err = deflate( &lt->zstream, Z_NO_FLUSH );
!               lt->pos = BLCKSZ - lt->zstream.avail_out;
!               lt->nbytes = lt->pos;
!               lt->dirty = true;
!               
!               if( err != Z_OK )
!                       elog(ERROR, "deflate returned error: %d\n", err );
!       }
! }
  
! /* Flushes any data still present in the buffers of the compression module. 
!  * Only necessary for writing (deflating), for reading we don't particularly
!  * care about unflushed data */
  
+ static void 
+ ltsFlush( LogicalTapeSet *lts, LogicalTape *lt)
+ {
+       int err;
+       
+       if( !lt->dirty )
+               return;
+               
+       lt->zstream.next_in = NULL;
+       lt->zstream.avail_in = 0;
+ 
+       for(;;)
+       {
+               lt->zstream.next_out = lt->buffer + lt->pos;
+               lt->zstream.avail_out = BLCKSZ - lt->pos;
+               err = deflate( &lt->zstream, Z_FINISH );
                lt->dirty = true;
! 
!               if( err == Z_STREAM_END )
!                       break;
!               Assert(err == Z_OK);
!               Assert(lt->zstream.avail_out == 0);
!               ltsDumpBuffer(lts, lt);
!               lt->numFullBlocks++;
!               lt->curBlockNumber++;
!               lt->nbytes = 0;
!               lt->pos = 0;
        }
+       lt->pos = BLCKSZ - lt->zstream.avail_out;
+       lt->nbytes = lt->pos;
+       return;
  }
  
  /*
***************
*** 692,697 ****
--- 753,761 ----
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
        lt = &lts->tapes[tapenum];
  
+       lt->zstream.next_in = NULL;
+       lt->zstream.avail_in = 0;
+ 
        if (!forWrite)
        {
                if (lt->writing)
***************
*** 702,711 ****
--- 766,781 ----
                         * (destructive) read.
                         */
                        if (lt->dirty)
+                       {
+                               ltsFlush(lts, lt);
                                ltsDumpBuffer(lts, lt);
+                       }
                        lt->lastBlockBytes = lt->nbytes;
                        lt->writing = false;
                        datablocknum = ltsRewindIndirectBlock(lts, 
lt->indirect, false);
+                       
+                       deflateEnd( &lt->zstream );
+                       inflateInit( &lt->zstream );
                }
                else
                {
***************
*** 715,720 ****
--- 785,791 ----
                         */
                        Assert(lt->frozen);
                        datablocknum = ltsRewindFrozenIndirectBlock(lts, 
lt->indirect);
+                       inflateReset( &lt->zstream );
                }
                /* Read the first block, or reset if tape is empty */
                lt->curBlockNumber = 0L;
***************
*** 761,766 ****
--- 832,840 ----
                lt->curBlockNumber = 0L;
                lt->pos = 0;
                lt->nbytes = 0;
+               
+               inflateEnd( &lt->zstream );
+               deflateInit( &lt->zstream, COMPRESSION_LEVEL );
        }
  }
  
***************
*** 774,788 ****
                                void *ptr, size_t size)
  {
        LogicalTape *lt;
!       size_t          nread = 0;
!       size_t          nthistime;
  
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
        lt = &lts->tapes[tapenum];
        Assert(!lt->writing);
  
!       while (size > 0)
        {
                if (lt->pos >= lt->nbytes)
                {
                        /* Try to load more data into buffer. */
--- 848,865 ----
                                void *ptr, size_t size)
  {
        LogicalTape *lt;
! //    size_t          nread = 0;
  
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
        lt = &lts->tapes[tapenum];
        Assert(!lt->writing);
  
!       lt->zstream.next_out = ptr;
!       lt->zstream.avail_out = size;
!       
!       while (lt->zstream.avail_out > 0)
        {
+               int err;
                if (lt->pos >= lt->nbytes)
                {
                        /* Try to load more data into buffer. */
***************
*** 801,821 ****
                        if (lt->nbytes <= 0)
                                break;                  /* EOF (possible here?) 
*/
                }
! 
!               nthistime = lt->nbytes - lt->pos;
!               if (nthistime > size)
!                       nthistime = size;
!               Assert(nthistime > 0);
! 
!               memcpy(ptr, lt->buffer + lt->pos, nthistime);
! 
!               lt->pos += nthistime;
!               ptr = (void *) ((char *) ptr + nthistime);
!               size -= nthistime;
!               nread += nthistime;
        }
  
!       return nread;
  }
  
  /*
--- 878,895 ----
                        if (lt->nbytes <= 0)
                                break;                  /* EOF (possible here?) 
*/
                }
!               
!               lt->zstream.next_in = lt->buffer + lt->pos;
!               lt->zstream.avail_in = lt->nbytes - lt->pos;
!               err = inflate( &lt->zstream, Z_NO_FLUSH );
!               lt->pos = lt->nbytes - lt->zstream.avail_in;
!               /* Here we can detect the end of the compressed stream,
!                * can't do anything with that information though */
!               if( err != Z_OK && err != Z_STREAM_END)
!                       elog(ERROR, "Inflate returned error: %d\n", err);
        }
  
!       return size - lt->zstream.avail_out;
  }
  
  /*
***************
*** 844,850 ****
--- 918,927 ----
         * partial indirect blocks, rewind for nondestructive read.
         */
        if (lt->dirty)
+       {
+               ltsFlush(lts, lt);
                ltsDumpBuffer(lts, lt);
+       }
        lt->lastBlockBytes = lt->nbytes;
        lt->writing = false;
        lt->frozen = true;
***************
*** 853,858 ****
--- 930,939 ----
        lt->curBlockNumber = 0L;
        lt->pos = 0;
        lt->nbytes = 0;
+                       
+       deflateEnd( &lt->zstream );
+       inflateInit( &lt->zstream );
+ 
        if (datablocknum != -1L)
        {
                ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
***************
*** 879,884 ****
--- 960,967 ----
        long            nblocks;
        int                     newpos;
  
+       elog(PANIC,"LogicalTapeBackspace on compressed file not yet 
implemented");
+       
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
        lt = &lts->tapes[tapenum];
        Assert(lt->frozen);
***************
*** 944,949 ****
--- 1027,1034 ----
  {
        LogicalTape *lt;
  
+       elog(PANIC,"LogicalTapeSeek on compressed file not yet implemented");
+       
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
        lt = &lts->tapes[tapenum];
        Assert(lt->frozen);
***************
*** 1007,1012 ****
--- 1092,1099 ----
  {
        LogicalTape *lt;
  
+       elog(PANIC,"LogicalTapeTell on compressed file not yet implemented");
+       
        Assert(tapenum >= 0 && tapenum < lts->nTapes);
        lt = &lts->tapes[tapenum];
        *blocknum = lt->curBlockNumber;

Attachment: signature.asc
Description: Digital signature

Reply via email to