Greetings,

Attached is a patch that adds a 'COMPRESSED' option to COPY. With it, COPY expects a gzip'd file on input and writes a gzip'd file on output. Included are support for backend COPY and psql's \copy, regression tests for both, and documentation.
On top of this, I plan to submit a trivial patch adding support for this option to file_fdw, allowing the creation of FDW tables that operate directly on compressed files (including CSVs, which is what I need this patch for). I've also begun working on a patch to allow this capability to be used through pg_dump/pg_restore, which would reduce the bandwidth used between the client and the server for backups and restores. Ideally, one would also be able to use custom-format dumps with compression even if the client-side pg_dump/pg_restore wasn't compiled with zlib support.

Thanks,
Stephen
colordiff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml new file mode 100644 index 6a0fabc..5c58dd2 *** a/doc/src/sgml/ref/copy.sgml --- b/doc/src/sgml/ref/copy.sgml *************** COPY { <replaceable class="parameter">ta *** 38,43 **** --- 38,44 ---- DELIMITER '<replaceable class="parameter">delimiter_character</replaceable>' NULL '<replaceable class="parameter">null_string</replaceable>' HEADER [ <replaceable class="parameter">boolean</replaceable> ] + COMPRESSED [ <replaceable class="parameter">boolean</replaceable> ] QUOTE '<replaceable class="parameter">quote_character</replaceable>' ESCAPE '<replaceable class="parameter">escape_character</replaceable>' FORCE_QUOTE { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * } *************** COPY { <replaceable class="parameter">ta *** 254,259 **** --- 255,271 ---- </para> </listitem> </varlistentry> + + <varlistentry> + <term><literal>COMPRESSED</literal></term> + <listitem> + <para> + Specifies that the file contents are compressed using zlib. On input, + the data should be compressed with zlib (eg: using gzip). On output, + the resulting data will be the compressed contents of the table. + </para> + </listitem> + </varlistentry> <varlistentry> <term><literal>QUOTE</literal></term> colordiff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c new file mode 100644 index abd82cf..1f394de *** a/src/backend/commands/copy.c --- b/src/backend/commands/copy.c *************** *** 19,24 **** --- 19,27 ---- #include <sys/stat.h> #include <netinet/in.h> #include <arpa/inet.h> + #ifdef HAVE_LIBZ + #include <zlib.h> + #endif #include "access/heapam.h" #include "access/htup_details.h" *************** *** 59,66 **** typedef enum CopyDest { COPY_FILE, /* to/from file */ COPY_OLD_FE, /* to/from frontend (2.0 protocol) */ ! 
COPY_NEW_FE /* to/from frontend (3.0 protocol) */ } CopyDest; /* --- 62,71 ---- typedef enum CopyDest { COPY_FILE, /* to/from file */ + COPY_GZFILE, /* to/from compressed file */ COPY_OLD_FE, /* to/from frontend (2.0 protocol) */ ! COPY_NEW_FE, /* to/from frontend (3.0 protocol) */ ! COPY_NEW_FE_COMPRESSED /* FE 3.0 protocol, compressed */ } CopyDest; /* *************** typedef struct CopyStateData *** 95,100 **** --- 100,110 ---- /* low-level state data */ CopyDest copy_dest; /* type of copy source/destination */ FILE *copy_file; /* used if copy_dest == COPY_FILE */ + #ifdef HAVE_LIBZ + gzFile copy_gzfile; /* used if copy_dest == COPY_GZFILE */ + z_stream *zstrm; /* used if streaming compressed data */ + char *zstrm_buf; /* used if streaming compressed data */ + #endif StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for * dest == COPY_NEW_FE in COPY FROM */ bool fe_eof; /* true if detected end of copy data */ *************** typedef struct CopyStateData *** 109,114 **** --- 119,125 ---- List *attnumlist; /* integer list of attnums to copy */ char *filename; /* filename, or NULL for STDIN/STDOUT */ bool binary; /* binary format? */ + bool compressed; /* compressed file? */ bool oids; /* include OIDs? */ bool freeze; /* freeze rows on loading? */ bool csv_mode; /* Comma Separated Value format? */ *************** static bool CopyGetInt32(CopyState cstat *** 320,325 **** --- 331,437 ---- static void CopySendInt16(CopyState cstate, int16 val); static bool CopyGetInt16(CopyState cstate, int16 *val); + #ifdef HAVE_LIBZ + /* Helper functions for working with zlib */ + static void zstrm_init(CopyState cstate, bool is_from); + void *zstrm_palloc(void *curr_memctx, unsigned int num, unsigned int size); + void zstrm_pfree(void *opaque, void *addr); + + /* + * Define some useful constants. + * We're just using the default windowBits, but we have to combine it + * with 16 to turn on gzip encoding, which we want to use. 
+ */ + #define DEF_WINDOW_BITS 15 + #define GZIP_ENCODING 16 + #define DEF_MEMLEVEL 8 + + /* + * Our wrapper to be like standard palloc(), which we later pass to zlib + * Note that we can't just use palloc() since it's a #define to + * MemoryContextAlloc. + */ + void * + zstrm_palloc(void *curr_memctx, unsigned int num, unsigned int size) + { + return MemoryContextAlloc(curr_memctx, num*size); + } + + /* + * Do-nothing function to pass as free() + * No need to actually pfree() as the memory context will be released. + */ + void + zstrm_pfree(void *opaque, void *addr) + { + opaque = NULL; /* not used */ + addr = NULL; /* will be handled by the context being released */ + + return; + } + + /* Set up zstrm for streaming data */ + static void + zstrm_init(CopyState cstate, bool is_from) + { + /* Allocate storage for our z_stream and buffer */ + cstate->zstrm = palloc(sizeof(z_stream)); + cstate->zstrm_buf = palloc(RAW_BUF_SIZE); + + /* Have zlib use our malloc/free functions */ + cstate->zstrm->zalloc = &zstrm_palloc; + cstate->zstrm->zfree = &zstrm_pfree; + + /* Provide the current memory context, used by MemoryConextAlloc() */ + cstate->zstrm->opaque = CurrentMemoryContext; + + /* Initialize avail_in- there's never data ready right away... 
*/ + cstate->zstrm->avail_in = 0; + + /* + * Initialize our pointers and structures based on if we are going + * doing decompression (from) or compression (to) + */ + if (is_from) + { + /* + * With decompression, we set up the input buffer as where we will + * load our data in to and then ask inflate() to process the + * compressed data and store the uncompressed results in the + * caller's buffer + */ + /* Data will eventually be coming from our buffer */ + cstate->zstrm->next_in = (unsigned char*) cstate->zstrm_buf; + + /* Output space not available yet */ + cstate->zstrm->next_out = Z_NULL; + cstate->zstrm->avail_out = 0; + if (inflateInit2(cstate->zstrm, + DEF_WINDOW_BITS | GZIP_ENCODING) != Z_OK) + elog(ERROR, "Error initializing zlib stream."); + } + else + { + /* + * With compression, we set up the output buffer as where we will + * receive data from the deflate() function, to send to the client; + * the input data comes from the caller sending us uncompressed data. + */ + /* No data available initially */ + cstate->zstrm->next_in = Z_NULL; + + /* Data will eventually be going to our buffer */ + cstate->zstrm->next_out = (unsigned char*) cstate->zstrm_buf; + cstate->zstrm->avail_out = RAW_BUF_SIZE; + if (deflateInit2(cstate->zstrm, Z_DEFAULT_COMPRESSION, + Z_DEFLATED, DEF_WINDOW_BITS | GZIP_ENCODING, + DEF_MEMLEVEL, Z_DEFAULT_STRATEGY) != Z_OK) + elog(ERROR, "Error initializing zlib stream."); + } + + return; + } + #endif /* * Send copy start/stop messages for frontend copies. These have changed *************** SendCopyBegin(CopyState cstate) *** 333,339 **** /* new way */ StringInfoData buf; int natts = list_length(cstate->attnumlist); ! int16 format = (cstate->binary ? 1 : 0); int i; pq_beginmessage(&buf, 'H'); --- 445,451 ---- /* new way */ StringInfoData buf; int natts = list_length(cstate->attnumlist); ! int16 format = (cstate->binary || cstate->compressed ? 
1 : 0); int i; pq_beginmessage(&buf, 'H'); *************** SendCopyBegin(CopyState cstate) *** 342,348 **** for (i = 0; i < natts; i++) pq_sendint(&buf, format, 2); /* per-column formats */ pq_endmessage(&buf); ! cstate->copy_dest = COPY_NEW_FE; } else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2) { --- 454,473 ---- for (i = 0; i < natts; i++) pq_sendint(&buf, format, 2); /* per-column formats */ pq_endmessage(&buf); ! ! if (cstate->compressed) ! { ! #ifdef HAVE_LIBZ ! cstate->copy_dest = COPY_NEW_FE_COMPRESSED; ! ! /* Initialize our zstream */ ! zstrm_init(cstate,false); ! #else ! elog(ERROR, "Not compiled with zlib support."); ! #endif ! } ! else ! cstate->copy_dest = COPY_NEW_FE; } else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2) { *************** ReceiveCopyBegin(CopyState cstate) *** 378,384 **** /* new way */ StringInfoData buf; int natts = list_length(cstate->attnumlist); ! int16 format = (cstate->binary ? 1 : 0); int i; pq_beginmessage(&buf, 'G'); --- 503,509 ---- /* new way */ StringInfoData buf; int natts = list_length(cstate->attnumlist); ! int16 format = (cstate->binary || cstate->compressed ? 1 : 0); int i; pq_beginmessage(&buf, 'G'); *************** ReceiveCopyBegin(CopyState cstate) *** 387,393 **** for (i = 0; i < natts; i++) pq_sendint(&buf, format, 2); /* per-column formats */ pq_endmessage(&buf); ! cstate->copy_dest = COPY_NEW_FE; cstate->fe_msgbuf = makeStringInfo(); } else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2) --- 512,532 ---- for (i = 0; i < natts; i++) pq_sendint(&buf, format, 2); /* per-column formats */ pq_endmessage(&buf); ! ! if (cstate->compressed) ! { ! #ifdef HAVE_LIBZ ! cstate->copy_dest = COPY_NEW_FE_COMPRESSED; ! ! /* Initialize our zstream */ ! zstrm_init(cstate,true); ! #else ! elog(ERROR, "Not compiled with zlib support."); ! #endif ! } ! else ! cstate->copy_dest = COPY_NEW_FE; ! 
cstate->fe_msgbuf = makeStringInfo(); } else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2) *************** SendCopyEnd(CopyState cstate) *** 424,429 **** --- 563,597 ---- /* Send Copy Done message */ pq_putemptymessage('c'); } + else if (cstate->copy_dest == COPY_NEW_FE_COMPRESSED) + { + #ifdef HAVE_LIBZ + int result; + + /* Finish sending data */ + cstate->zstrm->next_in = (unsigned char*) cstate->fe_msgbuf->data; + cstate->zstrm->avail_in = cstate->fe_msgbuf->len; + + do { + result = deflate(cstate->zstrm, Z_FINISH); + if (result != Z_OK && result != Z_STREAM_END) + elog(ERROR, "Error finishing compressed stream."); + + (void) pq_putmessage('d', (char*) cstate->zstrm_buf, RAW_BUF_SIZE - cstate->zstrm->avail_out); + + /* Reset the output buffer so the next deflate() call has room (avail_out == 0 would make it fail with Z_BUF_ERROR) */ + cstate->zstrm->next_out = (unsigned char*) cstate->zstrm_buf; + cstate->zstrm->avail_out = RAW_BUF_SIZE; + + /* ensure we empty the buffer */ + } while (result == Z_OK || cstate->zstrm->avail_in > 0); + + /* Send Copy Done message */ + pq_putemptymessage('c'); + #else + elog(ERROR, "Not compiled with zlib support."); + #endif + } else { CopySendData(cstate, "\\.", 2); *************** CopySendEndOfRow(CopyState cstate) *** 486,491 **** --- 650,676 ---- (errcode_for_file_access(), errmsg("could not write to COPY file: %m"))); break; + case COPY_GZFILE: + if (!cstate->binary) + { + /* Default line termination depends on platform */ + #ifndef WIN32 + CopySendChar(cstate, '\n'); + #else + CopySendString(cstate, "\r\n"); + #endif + } + + #ifdef HAVE_LIBZ + if (gzwrite(cstate->copy_gzfile, fe_msgbuf->data, + fe_msgbuf->len) != fe_msgbuf->len) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to COPY file: %m"))); + #else + elog(ERROR, "Not compiled with zlib support."); + #endif + break; case COPY_OLD_FE: /* The FE/BE protocol uses \n as newline for all platforms */ if (!cstate->binary) *************** CopySendEndOfRow(CopyState cstate) *** 507,512 **** --- 692,731 ---- /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len); break; + case COPY_NEW_FE_COMPRESSED: + #ifdef HAVE_LIBZ +
/* The FE/BE protocol uses \n as newline for all platforms */ + if (!cstate->binary) + CopySendChar(cstate, '\n'); + + /* We should always have some room for more data */ + Assert(cstate->zstrm->avail_out > 0); + + cstate->zstrm->next_in = (unsigned char*) fe_msgbuf->data; + cstate->zstrm->avail_in = fe_msgbuf->len; + + /* Dump the accumulated row as one CopyData message */ + do { + if (deflate(cstate->zstrm, Z_NO_FLUSH) != Z_OK) + elog(ERROR, "Error during compression"); + + /* Once avail_out is full, send it */ + if (cstate->zstrm->avail_out == 0) + { + (void) pq_putmessage('d', (char*) cstate->zstrm_buf, RAW_BUF_SIZE); + + /* Reset the buffer */ + cstate->zstrm->avail_out = RAW_BUF_SIZE; + cstate->zstrm->next_out = (unsigned char*) cstate->zstrm_buf; + } + } while (cstate->zstrm->avail_in > 0); /* ensure we empty buffer */ + #else + elog(ERROR, "Not compiled with zlib support."); + #endif + break; + default: + /* Should never reach here */ + Assert(false); } resetStringInfo(fe_msgbuf); *************** CopyGetData(CopyState cstate, void *data *** 539,544 **** --- 758,774 ---- (errcode_for_file_access(), errmsg("could not read from COPY file: %m"))); break; + case COPY_GZFILE: + #ifdef HAVE_LIBZ + bytesread = gzread(cstate->copy_gzfile, databuf, maxread); + if (bytesread == -1) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from COPY file: %m"))); + #else + elog(ERROR, "Not compiled with zlib support."); + #endif + break; case COPY_OLD_FE: /* *************** CopyGetData(CopyState cstate, void *data *** 557,562 **** --- 787,794 ---- } bytesread = minread; break; + case COPY_NEW_FE_COMPRESSED: + /* Fall through and handle as part of COPY_NEW_FE case */ case COPY_NEW_FE: while (maxread > 0 && bytesread < minread && !cstate->fe_eof) { *************** CopyGetData(CopyState cstate, void *data *** 609,621 **** break; } } ! avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor; ! if (avail > maxread) ! avail = maxread; ! 
pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail); ! databuf = (void *) ((char *) databuf + avail); ! maxread -= avail; ! bytesread += avail; } break; } --- 841,922 ---- break; } } ! if (cstate->copy_dest == COPY_NEW_FE_COMPRESSED) ! { ! #ifdef HAVE_LIBZ ! int result; ! int space; ! ! /* ! * Left-over input always starts at zstrm_buf (see the memmove ! * below), so the free space is whatever follows it. ! */ ! Assert((char *) cstate->zstrm->next_in == cstate->zstrm_buf); ! space = RAW_BUF_SIZE - cstate->zstrm->avail_in; ! ! /* ! * Append as much of the pending message as fits after the ! * left-over input. Anything beyond that stays in fe_msgbuf ! * (its cursor is not advanced) and is picked up on a later ! * pass, so a large CopyData message cannot overrun zstrm_buf. ! */ ! avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor; ! if (avail > space) ! avail = space; ! pq_copymsgbytes(cstate->fe_msgbuf, ! (char *) cstate->zstrm->next_in + cstate->zstrm->avail_in, ! avail); ! cstate->zstrm->avail_in += avail; ! ! /* ! * Tell inflate() to store the uncompressed data into our ! * output buffer ! */ ! cstate->zstrm->next_out = (unsigned char*) databuf; ! cstate->zstrm->avail_out = maxread; ! ! /* decompress input data into our output buffer */ ! result = inflate(cstate->zstrm, Z_NO_FLUSH); ! if (result != Z_OK && result != Z_STREAM_END) ! elog(ERROR, "Error processing compressed input."); ! ! /* ! * Move any left-over data up to the front of our buffer, ! * to make room for the next batch; the amount of left- ! * over data is likely to be pretty small. ! */ ! if (cstate->zstrm->avail_in > 0) ! memmove(cstate->zstrm_buf, ! cstate->zstrm->next_in, ! cstate->zstrm->avail_in); ! ! /* Update our pointer to the next set of input data */ ! cstate->zstrm->next_in = (unsigned char*) cstate->zstrm_buf; ! ! /* Bytes returned from inflate() */ ! avail = maxread - cstate->zstrm->avail_out; ! ! /* ! * Update our databuf pointer, in case we need to provide ! * more data in this call. ! */ ! 
databuf = (void *) ((char *) databuf + avail); ! maxread -= avail; ! bytesread += avail; ! #else ! elog(ERROR,"Not compiled with zlib support."); ! #endif ! } ! else ! { !
avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor; ! if (avail > maxread) ! avail = maxread; ! pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail); ! databuf = (void *) ((char *) databuf + avail); ! maxread -= avail; ! bytesread += avail; ! } } break; } *************** ProcessCopyOptions(CopyState cstate, *** 889,894 **** --- 1186,1205 ---- (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("COPY format \"%s\" not recognized", fmt))); } + else if (strcmp(defel->defname, "compressed") == 0) + { + #ifdef HAVE_LIBZ + if (cstate->compressed) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + cstate->compressed = defGetBoolean(defel); + #else + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("Not compiled with zlib support."))); + #endif + } else if (strcmp(defel->defname, "oids") == 0) { if (cstate->oids) *************** BeginCopy(bool is_from, *** 1379,1385 **** /* See Multibyte encoding comment above */ cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); ! cstate->copy_dest = COPY_FILE; /* default */ MemoryContextSwitchTo(oldcontext); --- 1690,1699 ---- /* See Multibyte encoding comment above */ cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); ! if (cstate->compressed) ! cstate->copy_dest = COPY_GZFILE; ! else ! cstate->copy_dest = COPY_FILE; /* default */ MemoryContextSwitchTo(oldcontext); *************** BeginCopy(bool is_from, *** 1392,1402 **** static void EndCopy(CopyState cstate) { ! if (cstate->filename != NULL && FreeFile(cstate->copy_file)) ! ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not close file \"%s\": %m", ! cstate->filename))); MemoryContextDelete(cstate->copycontext); pfree(cstate); --- 1706,1734 ---- static void EndCopy(CopyState cstate) { ! if (cstate->filename != NULL) ! { ! if (cstate->compressed) ! { ! #ifdef HAVE_LIBZ ! if (FreeFileGz(cstate->copy_gzfile)) ! ereport(ERROR, ! 
(errcode_for_file_access(), ! errmsg("could not close file \"%s\": %m", ! cstate->filename))); ! #else ! elog(ERROR,"Not compiled with zlib support."); ! #endif ! } ! else ! { ! if(FreeFile(cstate->copy_file)) ! ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not close file \"%s\": %m", ! cstate->filename))); ! } ! } MemoryContextDelete(cstate->copycontext); pfree(cstate); *************** BeginCopyTo(Relation rel, *** 1467,1486 **** cstate->filename = pstrdup(filename); oumask = umask(S_IWGRP | S_IWOTH); ! cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W); ! umask(oumask); ! ! if (cstate->copy_file == NULL) ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not open file \"%s\" for writing: %m", ! cstate->filename))); ! fstat(fileno(cstate->copy_file), &st); ! if (S_ISDIR(st.st_mode)) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("\"%s\" is a directory", cstate->filename))); } MemoryContextSwitchTo(oldcontext); --- 1799,1846 ---- cstate->filename = pstrdup(filename); oumask = umask(S_IWGRP | S_IWOTH); ! if (cstate->compressed) ! { ! #ifdef HAVE_LIBZ ! cstate->copy_gzfile = AllocateFileGz(cstate->filename, PG_BINARY_W); ! #else ereport(ERROR, ! (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("Not compiled with zlib support."))); ! #endif ! } ! else ! cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W); ! umask(oumask); ! if (cstate->compressed) ! { ! #ifdef HAVE_LIBZ ! if (cstate->copy_gzfile == NULL) ! ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not open file \"%s\" for writing (compressed): %m", ! cstate->filename))); ! #else ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("Not compiled with zlib support."))); ! #endif ! } ! else ! { ! if (cstate->copy_file == NULL) ! ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not open file \"%s\" for writing: %m", ! cstate->filename))); ! ! fstat(fileno(cstate->copy_file), &st); ! if (S_ISDIR(st.st_mode)) ! 
ereport(ERROR, ! (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("\"%s\" is a directory", cstate->filename))); ! } } MemoryContextSwitchTo(oldcontext); *************** BeginCopyFrom(Relation rel, *** 2419,2437 **** struct stat st; cstate->filename = pstrdup(filename); ! cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); ! ! if (cstate->copy_file == NULL) ! ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not open file \"%s\" for reading: %m", ! cstate->filename))); ! fstat(fileno(cstate->copy_file), &st); ! if (S_ISDIR(st.st_mode)) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("\"%s\" is a directory", cstate->filename))); } if (!cstate->binary) --- 2779,2816 ---- struct stat st; cstate->filename = pstrdup(filename); ! if (cstate->compressed) ! { ! #ifdef HAVE_LIBZ ! cstate->copy_gzfile = AllocateFileGz(cstate->filename, PG_BINARY_R); ! if (cstate->copy_gzfile == NULL) ! ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not open file \"%s\" for reading: %m", ! cstate->filename))); ! #else ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("Not compiled with zlib support."))); ! #endif ! } ! else ! { ! cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); ! ! if (cstate->copy_file == NULL) ! ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not open file \"%s\" for reading: %m", ! cstate->filename))); ! ! fstat(fileno(cstate->copy_file), &st); ! if (S_ISDIR(st.st_mode)) ! ereport(ERROR, ! (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("\"%s\" is a directory", cstate->filename))); ! } } if (!cstate->binary) *************** CopyReadLine(CopyState cstate) *** 2823,2829 **** * after \. up to the protocol end of copy data. (XXX maybe better * not to treat \. as special?) */ ! if (cstate->copy_dest == COPY_NEW_FE) { do { --- 3202,3209 ---- * after \. up to the protocol end of copy data. (XXX maybe better * not to treat \. as special?) */ ! if (cstate->copy_dest == COPY_NEW_FE || ! 
cstate->copy_dest == COPY_NEW_FE_COMPRESSED) { do { colordiff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y new file mode 100644 index 4568876..23ebaa7 *** a/src/backend/parser/gram.y --- b/src/backend/parser/gram.y *************** static void processCASbits(int cas_bits, *** 521,528 **** CACHE CALLED CASCADE CASCADED CASE CAST CATALOG_P CHAIN CHAR_P CHARACTER CHARACTERISTICS CHECK CHECKPOINT CLASS CLOSE CLUSTER COALESCE COLLATE COLLATION COLUMN COMMENT COMMENTS COMMIT ! COMMITTED CONCURRENTLY CONFIGURATION CONNECTION CONSTRAINT CONSTRAINTS ! CONTENT_P CONTINUE_P CONVERSION_P COPY COST CREATE CROSS CSV CURRENT_P CURRENT_CATALOG CURRENT_DATE CURRENT_ROLE CURRENT_SCHEMA CURRENT_TIME CURRENT_TIMESTAMP CURRENT_USER CURSOR CYCLE --- 521,528 ---- CACHE CALLED CASCADE CASCADED CASE CAST CATALOG_P CHAIN CHAR_P CHARACTER CHARACTERISTICS CHECK CHECKPOINT CLASS CLOSE CLUSTER COALESCE COLLATE COLLATION COLUMN COMMENT COMMENTS COMMIT ! COMMITTED COMPRESSED CONCURRENTLY CONFIGURATION CONNECTION CONSTRAINT ! 
CONSTRAINTS CONTENT_P CONTINUE_P CONVERSION_P COPY COST CREATE CROSS CSV CURRENT_P CURRENT_CATALOG CURRENT_DATE CURRENT_ROLE CURRENT_SCHEMA CURRENT_TIME CURRENT_TIMESTAMP CURRENT_USER CURSOR CYCLE *************** copy_opt_item: *** 2403,2408 **** --- 2403,2412 ---- { $$ = makeDefElem("header", (Node *)makeInteger(TRUE)); } + | COMPRESSED + { + $$ = makeDefElem("compressed", (Node *)makeInteger(TRUE)); + } | QUOTE opt_as Sconst { $$ = makeDefElem("quote", (Node *)makeString($3)); *************** unreserved_keyword: *** 12471,12476 **** --- 12475,12481 ---- | COMMENTS | COMMIT | COMMITTED + | COMPRESSED | CONFIGURATION | CONNECTION | CONSTRAINTS colordiff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c new file mode 100644 index 07ee51c..4547285 *** a/src/backend/storage/file/fd.c --- b/src/backend/storage/file/fd.c *************** *** 66,71 **** --- 66,74 ---- #ifdef HAVE_SYS_RESOURCE_H #include <sys/resource.h> /* for getrlimit */ #endif + #ifdef HAVE_LIBZ + #include <zlib.h> + #endif #include "miscadmin.h" #include "access/xact.h" *************** static uint64 temporary_files_size = 0; *** 201,206 **** --- 204,210 ---- typedef enum { AllocateDescFile, + AllocateDescGzFile, AllocateDescDir, AllocateDescRawFD } AllocateDescKind; *************** typedef struct *** 211,216 **** --- 215,223 ---- union { FILE *file; + #ifdef HAVE_LIBZ + gzFile gzfile; + #endif DIR *dir; int fd; } desc; *************** TryAgain: *** 1543,1548 **** --- 1550,1607 ---- return NULL; } + /* + * Like AllocateFile, but uses zlib to open a compressed file. 
+ */ + #ifdef HAVE_LIBZ + gzFile + AllocateFileGz(const char *name, const char *mode) + { + gzFile file; + + DO_DB(elog(LOG, "AllocateFileGz: Allocated %d (%s)", + numAllocatedDescs, name)); + + /* + * The test against MAX_ALLOCATED_DESCS prevents us from overflowing + * allocatedFiles[]; the test against max_safe_fds prevents AllocateFile + * from hogging every one of the available FDs, which'd lead to infinite + * looping. + */ + if (numAllocatedDescs >= MAX_ALLOCATED_DESCS || + numAllocatedDescs >= max_safe_fds - 1) + elog(ERROR, "exceeded MAX_ALLOCATED_DESCS while trying to open file \"%s\"", + name); + + TryAgain: + if ((file = gzopen(name, mode)) != NULL) + { + AllocateDesc *desc = &allocatedDescs[numAllocatedDescs]; + + desc->kind = AllocateDescGzFile; + desc->desc.gzfile = file; + desc->create_subid = GetCurrentSubTransactionId(); + numAllocatedDescs++; + return desc->desc.gzfile; + } + + if (errno == EMFILE || errno == ENFILE) + { + int save_errno = errno; + + ereport(LOG, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("out of file descriptors: %m; release and retry"))); + errno = 0; + if (ReleaseLruFile()) + goto TryAgain; + errno = save_errno; + } + + return NULL; + } + #endif + /* * Like AllocateFile, but returns an unbuffered fd like open(2) *************** FreeDesc(AllocateDesc *desc) *** 1600,1605 **** --- 1659,1672 ---- case AllocateDescFile: result = fclose(desc->desc.file); break; + case AllocateDescGzFile: + #ifdef HAVE_LIBZ + result = gzclose(desc->desc.gzfile); + #else + elog(ERROR,"AllocateDesc not compiled with zlib support"); + result = 0; /* keep compiler quiet */ + #endif + break; case AllocateDescDir: result = closedir(desc->desc.dir); break; *************** FreeFile(FILE *file) *** 1648,1653 **** --- 1715,1750 ---- } /* + * Close a file returned by AllocateFileGz. + * + * Note we do not check gzclose's return value --- it is up to the caller + * to handle close errors. 
+ */ + #ifdef HAVE_LIBZ + int + FreeFileGz(gzFile file) + { + int i; + + DO_DB(elog(LOG, "FreeFileGz: Allocated %d", numAllocatedDescs)); + + /* Remove file from list of allocated files, if it's present */ + for (i = numAllocatedDescs; --i >= 0;) + { + AllocateDesc *desc = &allocatedDescs[i]; + + if (desc->kind == AllocateDescGzFile && desc->desc.gzfile == file) + return FreeDesc(desc); + } + + /* Only get here if someone passes us a file not in allocatedDescs */ + elog(WARNING, "file passed to FreeFileGz was not obtained from AllocateFile"); + + return gzclose(file); + } + #endif + + /* * Close a file returned by OpenTransientFile. * * Note we do not check close's return value --- it is up to the caller colordiff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h new file mode 100644 index af60dac..2aecd74 *** a/src/include/parser/kwlist.h --- b/src/include/parser/kwlist.h *************** PG_KEYWORD("comment", COMMENT, UNRESERVE *** 85,90 **** --- 85,91 ---- PG_KEYWORD("comments", COMMENTS, UNRESERVED_KEYWORD) PG_KEYWORD("commit", COMMIT, UNRESERVED_KEYWORD) PG_KEYWORD("committed", COMMITTED, UNRESERVED_KEYWORD) + PG_KEYWORD("compressed", COMPRESSED, UNRESERVED_KEYWORD) PG_KEYWORD("concurrently", CONCURRENTLY, TYPE_FUNC_NAME_KEYWORD) PG_KEYWORD("configuration", CONFIGURATION, UNRESERVED_KEYWORD) PG_KEYWORD("connection", CONNECTION, UNRESERVED_KEYWORD) colordiff --git a/src/include/storage/fd.h b/src/include/storage/fd.h new file mode 100644 index 940d9d4..6cd1ae4 *** a/src/include/storage/fd.h --- b/src/include/storage/fd.h *************** *** 41,46 **** --- 41,49 ---- #include <dirent.h> + #ifdef HAVE_LIBZ + #include <zlib.h> + #endif /* * FileSeek uses the standard UNIX lseek(2) flags. 
*************** extern char *FilePathName(File file); *** 80,85 **** --- 83,94 ---- extern FILE *AllocateFile(const char *name, const char *mode); extern int FreeFile(FILE *file); + /* Operations for compressed files */ + #ifdef HAVE_LIBZ + extern gzFile AllocateFileGz(const char *name, const char *mode); + extern int FreeFileGz(gzFile file); + #endif + /* Operations to allow use of the <dirent.h> library routines */ extern DIR *AllocateDir(const char *dirname); extern struct dirent *ReadDir(DIR *dir, const char *dirname); colordiff --git a/src/test/regress/input/copy.source b/src/test/regress/input/copy.source new file mode 100644 index ab3f508..5f83d98 *** a/src/test/regress/input/copy.source --- b/src/test/regress/input/copy.source *************** copy copytest2 from '@abs_builddir@/resu *** 91,96 **** --- 91,116 ---- select * from copytest except select * from copytest2; + truncate copytest2; + + --- same test but with compression + + copy copytest to '@abs_builddir@/results/copytest.csv.gz' csv compressed; + + copy copytest2 from '@abs_builddir@/results/copytest.csv.gz' csv compressed; + + select * from copytest except select * from copytest2; + + truncate copytest2; + + --- same test but with compression through psql + + \copy copytest to '@abs_builddir@/results/copytest.csv.gz' csv compressed; + + \copy copytest2 from '@abs_builddir@/results/copytest.csv.gz' csv compressed; + + select * from copytest except select * from copytest2; + -- test header line feature colordiff --git a/src/test/regress/output/copy.source b/src/test/regress/output/copy.source new file mode 100644 index febca71..a6730fe *** a/src/test/regress/output/copy.source --- b/src/test/regress/output/copy.source *************** select * from copytest except select * f *** 61,66 **** --- 61,84 ---- -------+------+-------- (0 rows) + truncate copytest2; + --- same test but with compression + copy copytest to '@abs_builddir@/results/copytest.csv.gz' csv compressed; + copy copytest2 from 
'@abs_builddir@/results/copytest.csv.gz' csv compressed; + select * from copytest except select * from copytest2; + style | test | filler + -------+------+-------- + (0 rows) + + truncate copytest2; + --- same test but with compression through psql + \copy copytest to '@abs_builddir@/results/copytest.csv.gz' csv compressed; + \copy copytest2 from '@abs_builddir@/results/copytest.csv.gz' csv compressed; + select * from copytest except select * from copytest2; + style | test | filler + -------+------+-------- + (0 rows) + -- test header line feature create temp table copytest3 ( c1 int,
signature.asc
Description: Digital signature