On 29.11.2010 07:11, Joachim Wieland wrote:
On Mon, Nov 22, 2010 at 3:44 PM, Heikki Linnakangas
<heikki.linnakan...@enterprisedb.com>  wrote:
* wrap long lines
* use extern in function prototypes in header files
* "inline" some functions like _StartDataCompressor, _EndDataCompressor,
_DoInflate/_DoDeflate, which do nothing but call some other
function.

So here is a new round of patches. It turned out that supporting the
restore of files from a different dump, possibly with a different
compression setting, required some changes to the compressor API. In the
end I also didn't like all the #ifdefs, so I made a version with fewer
#ifdefs that uses function pointers instead.

Ok. The separate InitCompressorState() and AllocateCompressorState() functions seem unnecessary. As the code stands, there's little performance gain from re-using the same CompressorState, just re-initializing it, and I can't see any other justification for them either.

I combined those, and the Free/Flush steps, and did a bunch of other editorializations and cleanups. Here's an updated patch, also available in my git repository at git://git.postgresql.org/git/users/heikki/postgres.git, branch "pg_dump-dir". I'm going to continue reviewing this later, tomorrow hopefully.

The downside is that I have now
created quite a few one-line functions, which Heikki doesn't like all
that much, but I assume they are okay in this case because each one is
just the public compressor interface dispatching to the private
implementation of a specific compressor.

You could avoid the wrapper functions by calling the function pointers directly, but I agree it seems neater the way you did it.

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
*** a/src/bin/pg_dump/Makefile
--- b/src/bin/pg_dump/Makefile
***************
*** 20,26 **** override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
  
  OBJS=	pg_backup_archiver.o pg_backup_db.o pg_backup_custom.o \
  	pg_backup_files.o pg_backup_null.o pg_backup_tar.o \
! 	dumputils.o $(WIN32RES)
  
  KEYWRDOBJS = keywords.o kwlookup.o
  
--- 20,26 ----
  
  OBJS=	pg_backup_archiver.o pg_backup_db.o pg_backup_custom.o \
  	pg_backup_files.o pg_backup_null.o pg_backup_tar.o \
! 	dumputils.o compress_io.o $(WIN32RES)
  
  KEYWRDOBJS = keywords.o kwlookup.o
  
*** /dev/null
--- b/src/bin/pg_dump/compress_io.c
***************
*** 0 ****
--- 1,415 ----
+ /*-------------------------------------------------------------------------
+  *
+  * compress_io.c
+  *   Routines for archivers to write an uncompressed or compressed data
+  *   stream.
+  *
+  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  * IDENTIFICATION
+  *     src/bin/pg_dump/compress_io.c
+  *
+  *-------------------------------------------------------------------------
+  */
+ 
+ #include "compress_io.h"
+ 
+ static const char *modulename = gettext_noop("compress_io");
+ 
+ /* Used by AllocateInflator/Deflator in every build, so declare it here */
+ static CompressorState *AllocateCompressorState(CompressorAction action,
+ 												int compression);
+ 
+ #ifdef HAVE_LIBZ	/* private routines for zlib compressed data I/O */
+ static void InitCompressorZlib(CompressorState *cs, int compression);
+ static void DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs,
+ 								  bool flush);
+ static void ReadDataFromArchiveZlib(ArchiveHandle *AH, CompressorState *cs);
+ static size_t WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
+ 									 const void *data, size_t dLen);
+ static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs);
+ 
+ static CompressorFuncs cfs_zlib = {
+ 	InitCompressorZlib,
+ 	ReadDataFromArchiveZlib,
+ 	WriteDataToArchiveZlib,
+ 	EndCompressorZlib
+ };
+ #endif
+ 
+ /* Private routines that support uncompressed data I/O */
+ static void InitCompressorNone(CompressorState *cs, int compression);
+ static void ReadDataFromArchiveNone(ArchiveHandle *AH, CompressorState *cs);
+ static size_t WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
+ 									 const void *data, size_t dLen);
+ static void EndCompressorNone(ArchiveHandle *AH, CompressorState *cs);
+ 
+ static CompressorFuncs cfs_none = {
+ 	InitCompressorNone,
+ 	ReadDataFromArchiveNone,
+ 	WriteDataToArchiveNone,
+ 	EndCompressorNone
+ };
+ 
+ /*
+  * Create a decompressor whose input is fetched through readF.
+  */
+ CompressorState *
+ AllocateInflator(int compression, ReadFunc readF)
+ {
+ 	CompressorState *inflator = AllocateCompressorState(COMPRESSOR_INFLATE,
+ 														 compression);
+ 	inflator->readF = readF;
+ 	return inflator;
+ }
+ 
+ /*
+  * Create a compressor whose output is handed to writeF.
+  */
+ CompressorState *
+ AllocateDeflator(int compression, WriteFunc writeF)
+ {
+ 	CompressorState *deflator = AllocateCompressorState(COMPRESSOR_DEFLATE,
+ 														 compression);
+ 	deflator->writeF = writeF;
+ 	return deflator;
+ }
+ 
+ static CompressorState *
+ AllocateCompressorState(CompressorAction action, int compression)
+ {
+ 	CompressorState	   *cs;
+ 	CompressorAlgorithm	alg;
+ 
+ 	/* calloc zeroes every field, so pointers not set below stay NULL */
+ 	cs = (CompressorState *) calloc(1, sizeof(CompressorState));
+ 	if (cs == NULL)
+ 		die_horribly(NULL, modulename, "out of memory\n");
+ 
+ 	cs->action = action;
+ 
+ 	/*
+ 	 * The compression is set either on the commandline when creating
+ 	 * an archive or by ReadHead() when restoring an archive. It can also be
+ 	 * set on a per-data item basis in the directory archive format.
+ 	 */
+ 	if (compression == Z_DEFAULT_COMPRESSION ||
+ 		(compression > 0 && compression <= 9))
+ 		alg = COMPR_ALG_LIBZ;
+ 	else if (compression == COMPRESSION_NONE)
+ 		alg = COMPR_ALG_NONE;
+ 	else
+ 	{
+ 		die_horribly(NULL, modulename, "Invalid compression code: %d\n",
+ 					 compression);
+ 		alg = COMPR_ALG_NONE; /* keep compiler quiet */
+ 	}
+ 
+ #ifndef HAVE_LIBZ
+ 	/*
+ 	 * Not built with libz.  When writing (deflating) with a compression
+ 	 * level requested, warn and fall back to an uncompressed archive;
+ 	 * compressed data cannot be read at all, so that case dies below.
+ 	 */
+ 	if (alg == COMPR_ALG_LIBZ && cs->action == COMPRESSOR_DEFLATE)
+ 	{
+ 		write_msg(modulename, "WARNING: requested compression not available in "
+ 				  "this installation -- archive will be uncompressed\n");
+ 		compression = 0;
+ 		alg = COMPR_ALG_NONE;
+ 	}
+ 
+ 	if (alg == COMPR_ALG_LIBZ)
+ 		die_horribly(NULL, modulename, "not built with zlib support\n");
+ #endif
+ 
+ 	/*
+ 	 * Perform compression algorithm specific initialization.
+ 	 */
+ 	cs->comprAlg = alg;
+ 	switch(cs->comprAlg)
+ 	{
+ #ifdef HAVE_LIBZ
+ 		case COMPR_ALG_LIBZ:
+ 			cs->funcs = cfs_zlib;
+ 			break;
+ #endif
+ 		case COMPR_ALG_NONE:
+ 			cs->funcs = cfs_none;
+ 			break;
+ 		default:
+ 			/* should be unreachable; never leave the function table NULL */
+ 			die_horribly(NULL, modulename, "invalid compression algorithm\n");
+ 			break;
+ 	}
+ 	cs->funcs.initCompressor(cs, compression);
+ 
+ 	Assert(compression == 0 ?
+ 			 (cs->comprAlg == COMPR_ALG_NONE) :
+ 			 (cs->comprAlg != COMPR_ALG_NONE));
+ 
+ 	return cs;
+ }
+ 
+ /* Read archived data via cs->readF and emit it (decompressed) via ahwrite */
+ void
+ ReadDataFromArchive(ArchiveHandle *AH, CompressorState *cs)
+ {
+ 	ReadDataFromArchivePtr	reader = cs->funcs.readDataFromArchive;
+ 
+ 	reader(AH, cs);
+ }
+ 
+ /* Send data, compressed if so configured, to the output via cs->writeF */
+ size_t
+ WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
+ 				   const void *data, size_t dLen)
+ {
+ 	WriteDataToArchivePtr	writer = cs->funcs.writeDataToArchive;
+ 
+ 	return writer(AH, cs, data, dLen);
+ }
+ 
+ /*
+  * Finish using cs: the algorithm's end routine flushes (where needed) and
+  * releases its resources, then cs itself is freed and must not be reused.
+  */
+ void
+ EndCompressorState(ArchiveHandle *AH, CompressorState *cs)
+ {
+ 	(*cs->funcs.endCompressor) (AH, cs);
+ 	free(cs);
+ }
+ 
+ #ifdef HAVE_LIBZ
+ /*
+  * Functions for zlib compressed output.
+  */
+ 
+ /* Allocate zlib buffers and start a deflate or inflate stream (cs->action) */
+ static void
+ InitCompressorZlib(CompressorState *cs, int compression)
+ {
+ 	z_streamp	zp;
+ 	int			rc = Z_OK;
+ 
+ 	cs->zp = (z_streamp) malloc(sizeof(z_stream));
+ 	if (cs->zp == NULL)
+ 		die_horribly(NULL, modulename, "out of memory\n");
+ 	zp = cs->zp;
+ 
+ 	/*
+ 	 * zlib is told it may write comprOutInitSize bytes to comprOut; one
+ 	 * extra byte is allocated because some routines append a trailing
+ 	 * zero byte to the zlib output.  comprIn is expansible: comprInSize
+ 	 * always holds its current size, and comprInInitSize is merely the
+ 	 * initial default.
+ 	 */
+ 	cs->comprInSize = comprInInitSize;
+ 	cs->comprOutSize = comprOutInitSize;
+ 	cs->comprIn = (char *) malloc(cs->comprInSize);
+ 	cs->comprOut = (char *) malloc(cs->comprOutSize + 1);
+ 	if (cs->comprIn == NULL || cs->comprOut == NULL)
+ 		die_horribly(NULL, modulename, "out of memory\n");
+ 
+ 	/* Z_NULL allocator hooks select zlib's default memory management */
+ 	zp->zalloc = Z_NULL;
+ 	zp->zfree = Z_NULL;
+ 	zp->opaque = Z_NULL;
+ 
+ 	if (cs->action == COMPRESSOR_DEFLATE)
+ 		rc = deflateInit(zp, compression);
+ 	else if (cs->action == COMPRESSOR_INFLATE)
+ 		rc = inflateInit(zp);
+ 	if (rc != Z_OK)
+ 		die_horribly(NULL, modulename,
+ 					 "could not initialize compression library: %s\n",
+ 					 zp->msg);
+ 
+ 	/* Just be paranoid - maybe End is called after Start, with no Write */
+ 	zp->next_out = (void *) cs->comprOut;
+ 	zp->avail_out = comprOutInitSize;
+ }
+ 
+ static void
+ EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs)
+ {
+ 	z_streamp			zp = cs->zp;
+ 	if (cs->action == COMPRESSOR_DEFLATE)	/* flush data still in zlib */
+ 	{
+ 		zp->next_in = NULL;
+ 		zp->avail_in = 0;
+ 		DeflateCompressorZlib(AH, cs, true);
+ 		if (deflateEnd(zp) != Z_OK)
+ 			die_horribly(AH, modulename,
+ 						 "could not close compression stream: %s\n", zp->msg);
+ 	}
+ 	else	/* inflator: ReadDataFromArchiveZlib may already have ended it */
+ 		(void) inflateEnd(zp);
+ 	free(cs->comprOut);
+ 	free(cs->comprIn);
+ 	free(cs->zp);
+ }
+ 
+ /*
+  * Run deflate() on the input pending in cs->zp, handing output to cs->writeF
+  * whenever the output buffer fills up or input remains.  If flush is true,
+  * finish the stream with Z_FINISH and also hand over any partial output.
+  */
+ static void
+ DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs,
+ 					  bool flush)
+ {
+ 	z_streamp	zp = cs->zp;
+ 	char	   *out = cs->comprOut;
+ 	int			res = Z_OK;
+ 
+ 	while (cs->zp->avail_in != 0 || flush)
+ 	{
+ 		res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH);
+ 		if (res == Z_STREAM_ERROR)
+ 			die_horribly(AH, modulename,
+ 						 "could not compress data: %s\n", zp->msg);
+ 		if ((flush && (zp->avail_out < comprOutInitSize))
+ 			|| (zp->avail_out == 0)
+ 			|| (zp->avail_in != 0)
+ 			)
+ 		{
+ 			/* Extra paranoia: never emit a zero-length chunk, since that is
+ 			 * the EOF marker in the custom format. */
+ 			if (zp->avail_out < comprOutInitSize)
+ 			{
+ 				/* writeF should check for errors itself; double-check anyway */
+ 				size_t len = comprOutInitSize - zp->avail_out;
+ 				if (cs->writeF(AH, out, len) != len)
+ 					die_horribly(AH, modulename,
+ 								 "could not write to output file: %s\n",
+ 								 strerror(errno));
+ 			}
+ 			/* Reset the output buffer for the next deflate() call */
+ 			zp->next_out = (void *) out;
+ 			zp->avail_out = comprOutInitSize;
+ 		}
+ 		/* With Z_FINISH, Z_STREAM_END means all output has been produced */
+ 		if (res == Z_STREAM_END)
+ 			break;
+ 	}
+ }
+ 
+ /* Inflate data from cs->readF until it returns 0, writing it via ahwrite */
+ static void
+ ReadDataFromArchiveZlib(ArchiveHandle *AH, CompressorState *cs)
+ {
+ 	z_streamp	zp = cs->zp;
+ 	char	   *out = cs->comprOut;
+ 	int			res = Z_OK;
+ 	size_t		cnt;
+ 	void	   *in;
+ 
+ 	/* no minimal chunk size for zlib */
+ 	while ((cnt = cs->readF(AH, &in, 0)))
+ 	{
+ 		zp->next_in = (void *) in;
+ 		zp->avail_in = cnt;
+ 		while (zp->avail_in > 0)
+ 		{
+ 			zp->next_out = (void *) out;
+ 			zp->avail_out = comprOutInitSize;
+ 			res = inflate(zp, 0);
+ 			if (res != Z_OK && res != Z_STREAM_END)
+ 				die_horribly(AH, modulename,
+ 							 "could not uncompress data: %s\n", zp->msg);
+ 			out[comprOutInitSize - zp->avail_out] = '\0';
+ 			ahwrite(out, 1, comprOutInitSize - zp->avail_out, AH);
+ 			if (res == Z_STREAM_END)	/* no input is consumed past stream end */
+ 				break;
+ 		}
+ 	}
+ 
+ 	/* No more input: collect zlib's remaining output until stream end */
+ 	zp->next_in = NULL;
+ 	zp->avail_in = 0;
+ 	while (res != Z_STREAM_END)
+ 	{
+ 		zp->next_out = (void *) out;
+ 		zp->avail_out = comprOutInitSize;
+ 		res = inflate(zp, 0);
+ 		if (res != Z_OK && res != Z_STREAM_END)
+ 			die_horribly(AH, modulename,
+ 						 "could not uncompress data: %s\n", zp->msg);
+ 		out[comprOutInitSize - zp->avail_out] = '\0';
+ 		ahwrite(out, 1, comprOutInitSize - zp->avail_out, AH);
+ 	}
+ 
+ 	if (inflateEnd(zp) != Z_OK)
+ 		die_horribly(AH, modulename,
+ 					 "could not close compression library: %s\n", zp->msg);
+ }
+ 
+ static size_t
+ WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
+ 					   const void *data, size_t dLen)
+ {
+ 	z_streamp	zp = cs->zp;
+ 
+ 	zp->next_in = (void *) data;
+ 	zp->avail_in = dLen;
+ 	DeflateCompressorZlib(AH, cs, false);	/* dies on any failure */
+ 	return dLen;
+ }
+ 
+ #endif  /* HAVE_LIBZ */
+ 
+ 
+ /*
+  * Functions for uncompressed output.
+  */
+ static void
+ InitCompressorNone(CompressorState *cs, int compression)
+ {
+ 	/* No library state, only the I/O buffers (compression is unused) */
+ 	cs->comprInSize = comprInInitSize;
+ 	cs->comprOutSize = comprOutInitSize;
+ 	cs->comprIn = (char *) malloc(cs->comprInSize);
+ 	cs->comprOut = (char *) malloc(cs->comprOutSize + 1);
+ 	if (cs->comprIn == NULL || cs->comprOut == NULL)
+ 		die_horribly(NULL, modulename, "out of memory\n");
+ }
+ 
+ /* Pass each chunk from cs->readF unchanged to ahwrite until it returns 0 */
+ static void
+ ReadDataFromArchiveNone(ArchiveHandle *AH, CompressorState *cs)
+ {
+ 	void	   *buf;
+ 	size_t		len;
+ 
+ 	/* no minimal chunk size for uncompressed data */
+ 	for (len = cs->readF(AH, &buf, 0); len != 0;
+ 		 len = cs->readF(AH, &buf, 0))
+ 		ahwrite(buf, 1, len, AH);
+ }
+ 
+ static size_t
+ WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
+ 					   const void *data, size_t dLen)
+ {
+ 	/* Nothing to send; a zero-length chunk is the custom format's EOF mark */
+ 	if (dLen == 0)
+ 		return 0;
+ 	/* writeF should check for errors itself, but double-check here too */
+ 	if (cs->writeF(AH, data, dLen) != dLen)
+ 		die_horribly(AH, modulename,
+ 					 "could not write to output file: %s\n",
+ 					 strerror(errno));
+ 	return dLen;
+ }
+ 
+ static void
+ EndCompressorNone(ArchiveHandle *AH, CompressorState *cs)
+ {
+ 	free(cs->comprIn);		/* only the plain I/O buffers need releasing */
+ 	free(cs->comprOut);
+ }
+ 
+ 
*** /dev/null
--- b/src/bin/pg_dump/compress_io.h
***************
*** 0 ****
--- 1,94 ----
+ /*-------------------------------------------------------------------------
+  *
+  * compress_io.h
+  *   Interface to the routines in compress_io.c, used by archivers to read
+  *   and write an uncompressed or compressed data stream.
+  *
+  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  * IDENTIFICATION
+  *     src/bin/pg_dump/compress_io.h
+  *
+  *-------------------------------------------------------------------------
+  */
+ 
+ #ifndef COMPRESS_IO_H
+ #define COMPRESS_IO_H
+ 
+ #include "pg_backup_archiver.h"
+ 
+ /* Initial sizes of the comprOut and comprIn buffers */
+ #define comprOutInitSize 65536
+ #define comprInInitSize	 65536
+ 
+ struct _CompressorState;
+ 
+ typedef enum
+ {
+ 	COMPRESSOR_INFLATE,
+ 	COMPRESSOR_DEFLATE
+ } CompressorAction;
+ 
+ typedef enum
+ {
+ 	COMPR_ALG_NONE,
+ 	COMPR_ALG_LIBZ
+ } CompressorAlgorithm;
+ 
+ /* Write len bytes from buf; callers treat any other return value as error */
+ typedef size_t (*WriteFunc)(ArchiveHandle *AH, const void *buf, size_t len);
+ 
+ /*
+  * Point *buf at the next chunk of archived data and return its length, or
+  * 0 at end of data.  sizeHint is how much the algorithm would like; a
+  * format that doesn't know better should deliver that many bytes.  A format
+  * written in blocks (such as the custom archive) knows the size of the next
+  * block and may deliver exactly that; the directory archive is a continuous
+  * stream.  Algorithms other than libz may work in blocks themselves and then
+  * tell the format how much data they are ready to consume next.
+  */
+ typedef size_t (*ReadFunc)(ArchiveHandle *AH, void **buf, size_t sizeHint);
+ 
+ typedef void (*InitCompressorPtr)(struct _CompressorState *cs, int compression);
+ typedef void (*ReadDataFromArchivePtr)(ArchiveHandle *AH,
+ 									   struct _CompressorState *cs);
+ typedef size_t (*WriteDataToArchivePtr)(ArchiveHandle *AH,
+ 										struct _CompressorState *cs,
+ 										const void *data, size_t dLen);
+ typedef void (*EndCompressorPtr)(ArchiveHandle *AH,
+ 								 struct _CompressorState *cs);
+ 
+ /* Algorithm-specific implementations, selected when a state is allocated */
+ typedef struct
+ {
+ 	InitCompressorPtr			initCompressor;
+ 	ReadDataFromArchivePtr		readDataFromArchive;
+ 	WriteDataToArchivePtr		writeDataToArchive;
+ 	EndCompressorPtr			endCompressor;
+ } CompressorFuncs;
+ 
+ typedef struct _CompressorState
+ {
+ 	CompressorAlgorithm comprAlg;
+ 	ReadFunc			readF;		/* set by AllocateInflator */
+ 	WriteFunc			writeF;		/* set by AllocateDeflator */
+ #ifdef HAVE_LIBZ
+ 	z_streamp			zp;
+ #endif
+ 	char			   *comprOut;
+ 	char			   *comprIn;
+ 	size_t				comprInSize;
+ 	size_t				comprOutSize;
+ 	CompressorAction	action;
+ 	CompressorFuncs		funcs;
+ } CompressorState;
+ 
+ extern CompressorState *AllocateInflator(int compression, ReadFunc readF);
+ extern CompressorState *AllocateDeflator(int compression, WriteFunc writeF);
+ extern void ReadDataFromArchive(ArchiveHandle *AH, CompressorState *cs);
+ extern size_t WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
+ 								 const void *data, size_t dLen);
+ extern void EndCompressorState(ArchiveHandle *AH, CompressorState *cs);
+ 
+ #endif   /* COMPRESS_IO_H */
*** a/src/bin/pg_dump/pg_backup_archiver.c
--- b/src/bin/pg_dump/pg_backup_archiver.c
***************
*** 22,27 ****
--- 22,28 ----
  
  #include "pg_backup_db.h"
  #include "dumputils.h"
+ #include "compress_io.h"
  
  #include <ctype.h>
  #include <unistd.h>
*** a/src/bin/pg_dump/pg_backup_archiver.h
--- b/src/bin/pg_dump/pg_backup_archiver.h
***************
*** 49,54 ****
--- 49,55 ----
  #define GZCLOSE(fh) fclose(fh)
  #define GZWRITE(p, s, n, fh) (fwrite(p, s, n, fh) * (s))
  #define GZREAD(p, s, n, fh) fread(p, s, n, fh)
+ /* this is just the redefinition of a libz constant */
  #define Z_DEFAULT_COMPRESSION (-1)
  
  typedef struct _z_stream
***************
*** 61,66 **** typedef struct _z_stream
--- 62,76 ----
  typedef z_stream *z_streamp;
  #endif
  
+ /* XXX eventually this should be an enum.  However, if we want pluggable
+  * compressors in the long run, it can be hard for plugins to add values
+  * to a central enum.  See ArchiveHandle.compression for all values. */
+ #define COMPRESSION_UNKNOWN (-2)
+ #define COMPRESSION_NONE 0
+ 
+ /* XXX should we change the archive version for pg_dump with directory support?
+  * XXX We are not actually modifying the existing formats, but on the other hand
+  * XXX a file could now be compressed with liblzf. */
  /* Current archive version number (the format we can output) */
  #define K_VERS_MAJOR 1
  #define K_VERS_MINOR 12
***************
*** 267,272 **** typedef struct _archiveHandle
--- 277,288 ----
  
  	struct _tocEntry *currToc;	/* Used when dumping data */
  	int			compression;	/* Compression requested on open */
+ 								/* Possible values for compression:
+ 								   -2   COMPRESSION_UNKNOWN
+ 								   -1   Z_DEFAULT_COMPRESSION
+ 								    0	COMPRESSION_NONE
+ 								   1-9	levels for gzip compression
+ 								*/
  	ArchiveMode mode;			/* File mode - r or w */
  	void	   *formatData;		/* Header data specific to file format */
  
***************
*** 381,384 **** int			ahprintf(ArchiveHandle *AH, const char *fmt,...) __attribute__((format(pri
--- 397,411 ----
  
  void		ahlog(ArchiveHandle *AH, int level, const char *fmt,...) __attribute__((format(printf, 3, 4)));
  
+ #ifdef USE_ASSERT_CHECKING
+ #define Assert(condition) \
+ 	if (!(condition)) \
+ 	{ \
+ 		write_msg(NULL, "Failed assertion in %s, line %d\n", \
+ 				  __FILE__, __LINE__); \
+ 		abort();\
+ 	}
+ #else
+ #define Assert(condition)
+ #endif
  #endif
*** a/src/bin/pg_dump/pg_backup_custom.c
--- b/src/bin/pg_dump/pg_backup_custom.c
***************
*** 25,30 ****
--- 25,31 ----
   */
  
  #include "pg_backup_archiver.h"
+ #include "compress_io.h"
  
  /*--------
   * Routines in the format interface
***************
*** 58,77 **** static void _LoadBlobs(ArchiveHandle *AH, bool drop);
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);
  
! /*------------
!  * Buffers used in zlib compression and extra data stored in archive and
!  * in TOC entries.
!  *------------
!  */
! #define zlibOutSize 4096
! #define zlibInSize	4096
  
  typedef struct
  {
! 	z_streamp	zp;
! 	char	   *zlibOut;
! 	char	   *zlibIn;
! 	size_t		inSize;
  	int			hasSeek;
  	pgoff_t		filePos;
  	pgoff_t		dataStart;
--- 59,70 ----
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);
  
! static size_t _CustomWriteFunc(ArchiveHandle *AH, const void *buf, size_t len);
! static size_t _CustomReadFunc(ArchiveHandle *AH, void **buf, size_t sizeHint);
  
  typedef struct
  {
! 	CompressorState *cs;
  	int			hasSeek;
  	pgoff_t		filePos;
  	pgoff_t		dataStart;
***************
*** 89,98 **** typedef struct
   *------
   */
  static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
- static void _StartDataCompressor(ArchiveHandle *AH, TocEntry *te);
- static void _EndDataCompressor(ArchiveHandle *AH, TocEntry *te);
  static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
- static int	_DoDeflate(ArchiveHandle *AH, lclContext *ctx, int flush);
  
  static const char *modulename = gettext_noop("custom archiver");
  
--- 82,88 ----
***************
*** 144,174 **** InitArchiveFmt_Custom(ArchiveHandle *AH)
  		die_horribly(AH, modulename, "out of memory\n");
  	AH->formatData = (void *) ctx;
  
- 	ctx->zp = (z_streamp) malloc(sizeof(z_stream));
- 	if (ctx->zp == NULL)
- 		die_horribly(AH, modulename, "out of memory\n");
- 
  	/* Initialize LO buffering */
  	AH->lo_buf_size = LOBBUFSIZE;
  	AH->lo_buf = (void *) malloc(LOBBUFSIZE);
  	if (AH->lo_buf == NULL)
  		die_horribly(AH, modulename, "out of memory\n");
  
- 	/*
- 	 * zlibOutSize is the buffer size we tell zlib it can output to.  We
- 	 * actually allocate one extra byte because some routines want to append a
- 	 * trailing zero byte to the zlib output.  The input buffer is expansible
- 	 * and is always of size ctx->inSize; zlibInSize is just the initial
- 	 * default size for it.
- 	 */
- 	ctx->zlibOut = (char *) malloc(zlibOutSize + 1);
- 	ctx->zlibIn = (char *) malloc(zlibInSize);
- 	ctx->inSize = zlibInSize;
  	ctx->filePos = 0;
  
- 	if (ctx->zlibOut == NULL || ctx->zlibIn == NULL)
- 		die_horribly(AH, modulename, "out of memory\n");
- 
  	/*
  	 * Now open the file
  	 */
--- 134,147 ----
***************
*** 211,216 **** InitArchiveFmt_Custom(ArchiveHandle *AH)
--- 184,191 ----
  		ctx->hasSeek = checkSeek(AH->FH);
  
  		ReadHead(AH);
+ 		ctx->cs = AllocateInflator(AH->compression, _CustomReadFunc);
+ 
  		ReadToc(AH);
  		ctx->dataStart = _getFilePos(AH, ctx);
  	}
***************
*** 324,330 **** _StartData(ArchiveHandle *AH, TocEntry *te)
  	_WriteByte(AH, BLK_DATA);	/* Block type */
  	WriteInt(AH, te->dumpId);	/* For sanity check */
  
! 	_StartDataCompressor(AH, te);
  }
  
  /*
--- 299,305 ----
  	_WriteByte(AH, BLK_DATA);	/* Block type */
  	WriteInt(AH, te->dumpId);	/* For sanity check */
  
! 	ctx->cs = AllocateDeflator(AH->compression, _CustomWriteFunc);
  }
  
  /*
***************
*** 340,356 **** static size_t
  _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
! 	z_streamp	zp = ctx->zp;
! 
! 	zp->next_in = (void *) data;
! 	zp->avail_in = dLen;
  
! 	while (zp->avail_in != 0)
! 	{
! 		/* printf("Deflating %lu bytes\n", (unsigned long) dLen); */
! 		_DoDeflate(AH, ctx, 0);
! 	}
! 	return dLen;
  }
  
  /*
--- 315,323 ----
  _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
! 	CompressorState	   *cs = ctx->cs;
  
! 	return WriteDataToArchive(AH, cs, data, dLen);
  }
  
  /*
***************
*** 363,372 **** _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
  static void
  _EndData(ArchiveHandle *AH, TocEntry *te)
  {
! /*	lclContext *ctx = (lclContext *) AH->formatData; */
! /*	lclTocEntry *tctx = (lclTocEntry *) te->formatData; */
  
! 	_EndDataCompressor(AH, te);
  }
  
  /*
--- 330,340 ----
  static void
  _EndData(ArchiveHandle *AH, TocEntry *te)
  {
! 	lclContext *ctx = (lclContext *) AH->formatData;
  
! 	EndCompressorState(AH, ctx->cs);
! 	/* Send the end marker */
! 	WriteInt(AH, 0);
  }
  
  /*
***************
*** 401,411 **** _StartBlobs(ArchiveHandle *AH, TocEntry *te)
  static void
  _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
  {
  	if (oid == 0)
  		die_horribly(AH, modulename, "invalid OID for large object\n");
  
  	WriteInt(AH, oid);
! 	_StartDataCompressor(AH, te);
  }
  
  /*
--- 369,382 ----
  static void
  _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
  {
+ 	lclContext *ctx = (lclContext *) AH->formatData;
+ 
  	if (oid == 0)
  		die_horribly(AH, modulename, "invalid OID for large object\n");
  
  	WriteInt(AH, oid);
! 
! 	ctx->cs = AllocateDeflator(AH->compression, _CustomWriteFunc);
  }
  
  /*
***************
*** 416,422 **** _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
  static void
  _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
  {
! 	_EndDataCompressor(AH, te);
  }
  
  /*
--- 387,397 ----
  static void
  _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
  {
! 	lclContext *ctx = (lclContext *) AH->formatData;
! 
! 	EndCompressorState(AH, ctx->cs);
! 	/* Send the end marker */
! 	WriteInt(AH, 0);
  }
  
  /*
***************
*** 533,639 **** static void
  _PrintData(ArchiveHandle *AH)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
- 	z_streamp	zp = ctx->zp;
- 	size_t		blkLen;
- 	char	   *in = ctx->zlibIn;
- 	size_t		cnt;
- 
- #ifdef HAVE_LIBZ
- 	int			res;
- 	char	   *out = ctx->zlibOut;
- #endif
  
! #ifdef HAVE_LIBZ
! 
! 	res = Z_OK;
! 
! 	if (AH->compression != 0)
! 	{
! 		zp->zalloc = Z_NULL;
! 		zp->zfree = Z_NULL;
! 		zp->opaque = Z_NULL;
! 
! 		if (inflateInit(zp) != Z_OK)
! 			die_horribly(AH, modulename, "could not initialize compression library: %s\n", zp->msg);
! 	}
! #endif
! 
! 	blkLen = ReadInt(AH);
! 	while (blkLen != 0)
! 	{
! 		if (blkLen + 1 > ctx->inSize)
! 		{
! 			free(ctx->zlibIn);
! 			ctx->zlibIn = NULL;
! 			ctx->zlibIn = (char *) malloc(blkLen + 1);
! 			if (!ctx->zlibIn)
! 				die_horribly(AH, modulename, "out of memory\n");
! 
! 			ctx->inSize = blkLen + 1;
! 			in = ctx->zlibIn;
! 		}
! 
! 		cnt = fread(in, 1, blkLen, AH->FH);
! 		if (cnt != blkLen)
! 		{
! 			if (feof(AH->FH))
! 				die_horribly(AH, modulename,
! 							 "could not read from input file: end of file\n");
! 			else
! 				die_horribly(AH, modulename,
! 					"could not read from input file: %s\n", strerror(errno));
! 		}
! 
! 		ctx->filePos += blkLen;
! 
! 		zp->next_in = (void *) in;
! 		zp->avail_in = blkLen;
! 
! #ifdef HAVE_LIBZ
! 		if (AH->compression != 0)
! 		{
! 			while (zp->avail_in != 0)
! 			{
! 				zp->next_out = (void *) out;
! 				zp->avail_out = zlibOutSize;
! 				res = inflate(zp, 0);
! 				if (res != Z_OK && res != Z_STREAM_END)
! 					die_horribly(AH, modulename, "could not uncompress data: %s\n", zp->msg);
! 
! 				out[zlibOutSize - zp->avail_out] = '\0';
! 				ahwrite(out, 1, zlibOutSize - zp->avail_out, AH);
! 			}
! 		}
! 		else
! #endif
! 		{
! 			in[zp->avail_in] = '\0';
! 			ahwrite(in, 1, zp->avail_in, AH);
! 			zp->avail_in = 0;
! 		}
! 		blkLen = ReadInt(AH);
! 	}
! 
! #ifdef HAVE_LIBZ
! 	if (AH->compression != 0)
! 	{
! 		zp->next_in = NULL;
! 		zp->avail_in = 0;
! 		while (res != Z_STREAM_END)
! 		{
! 			zp->next_out = (void *) out;
! 			zp->avail_out = zlibOutSize;
! 			res = inflate(zp, 0);
! 			if (res != Z_OK && res != Z_STREAM_END)
! 				die_horribly(AH, modulename, "could not uncompress data: %s\n", zp->msg);
! 
! 			out[zlibOutSize - zp->avail_out] = '\0';
! 			ahwrite(out, 1, zlibOutSize - zp->avail_out, AH);
! 		}
! 		if (inflateEnd(zp) != Z_OK)
! 			die_horribly(AH, modulename, "could not close compression library: %s\n", zp->msg);
! 	}
! #endif
  }
  
  static void
--- 508,516 ----
  _PrintData(ArchiveHandle *AH)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
  
! 	ctx->cs = AllocateInflator(AH->compression, _CustomReadFunc);
! 	ReadDataFromArchive(AH, ctx->cs);
  }
  
  static void
***************
*** 683,701 **** static void
  _skipData(ArchiveHandle *AH)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
  	size_t		blkLen;
! 	char	   *in = ctx->zlibIn;
  	size_t		cnt;
  
  	blkLen = ReadInt(AH);
  	while (blkLen != 0)
  	{
! 		if (blkLen > ctx->inSize)
  		{
! 			free(ctx->zlibIn);
! 			ctx->zlibIn = (char *) malloc(blkLen);
! 			ctx->inSize = blkLen;
! 			in = ctx->zlibIn;
  		}
  		cnt = fread(in, 1, blkLen, AH->FH);
  		if (cnt != blkLen)
--- 560,579 ----
  _skipData(ArchiveHandle *AH)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
+ 	CompressorState *cs = ctx->cs;
  	size_t		blkLen;
! 	char	   *in = cs->comprIn;
  	size_t		cnt;
  
  	blkLen = ReadInt(AH);
  	while (blkLen != 0)
  	{
! 		if (blkLen > cs->comprInSize)
  		{
! 			free(cs->comprIn);
! 			cs->comprIn = (char *) malloc(blkLen);
! 			cs->comprInSize = blkLen;
! 			in = cs->comprIn;
  		}
  		cnt = fread(in, 1, blkLen, AH->FH);
  		if (cnt != blkLen)
***************
*** 960,1105 **** _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
  	*id = ReadInt(AH);
  }
  
! /*
!  * If zlib is available, then startit up. This is called from
!  * StartData & StartBlob. The buffers are setup in the Init routine.
!  */
! static void
! _StartDataCompressor(ArchiveHandle *AH, TocEntry *te)
  {
! 	lclContext *ctx = (lclContext *) AH->formatData;
! 	z_streamp	zp = ctx->zp;
  
! #ifdef HAVE_LIBZ
  
! 	if (AH->compression < 0 || AH->compression > 9)
! 		AH->compression = Z_DEFAULT_COMPRESSION;
! 
! 	if (AH->compression != 0)
! 	{
! 		zp->zalloc = Z_NULL;
! 		zp->zfree = Z_NULL;
! 		zp->opaque = Z_NULL;
! 
! 		if (deflateInit(zp, AH->compression) != Z_OK)
! 			die_horribly(AH, modulename, "could not initialize compression library: %s\n", zp->msg);
! 	}
! #else
! 
! 	AH->compression = 0;
! #endif
! 
! 	/* Just be paranoid - maybe End is called after Start, with no Write */
! 	zp->next_out = (void *) ctx->zlibOut;
! 	zp->avail_out = zlibOutSize;
  }
  
! /*
!  * Send data to the output file, one length-prefixed chunk at a time.
!  *
!  * Chunks are written straight to AH->FH with fwrite(), each preceded by
!  * its length (via WriteInt).  When AH->compression is 0, or when built
!  * without zlib, the pending input in ctx->zp is written out raw instead
!  * of being deflated.
!  *
!  * "flush" is passed unchanged to deflate().  Return value: without zlib,
!  * always 1.  With zlib, deflate()'s result when compressing; otherwise
!  * Z_STREAM_END if no input was pending and flush is Z_FINISH, else Z_OK.
!  * ctx->filePos is advanced by the payload bytes written (the length words
!  * are not counted).
!  */
! static int
! _DoDeflate(ArchiveHandle *AH, lclContext *ctx, int flush)
  {
! 	z_streamp	zp = ctx->zp;
! 
! #ifdef HAVE_LIBZ
! 	char	   *out = ctx->zlibOut;
! 	int			res = Z_OK;
! 
! 	if (AH->compression != 0)
! 	{
! 		res = deflate(zp, flush);
! 		if (res == Z_STREAM_ERROR)
! 			die_horribly(AH, modulename, "could not compress data: %s\n", zp->msg);
! 
! 		/*
! 		 * Write out the output buffer when finishing with output pending,
! 		 * when the buffer is full, or when deflate() left input unconsumed.
! 		 */
! 		if (((flush == Z_FINISH) && (zp->avail_out < zlibOutSize))
! 			|| (zp->avail_out == 0)
! 			|| (zp->avail_in != 0)
! 			)
! 		{
! 			/*
! 			 * Extra paranoia: avoid zero-length chunks since a zero length
! 			 * chunk is the EOF marker. This should never happen but...
! 			 */
! 			if (zp->avail_out < zlibOutSize)
! 			{
! 				/*
! 				 * printf("Wrote %lu byte deflated chunk\n", (unsigned long)
! 				 * (zlibOutSize - zp->avail_out));
! 				 */
! 				WriteInt(AH, zlibOutSize - zp->avail_out);
! 				if (fwrite(out, 1, zlibOutSize - zp->avail_out, AH->FH) != (zlibOutSize - zp->avail_out))
! 					die_horribly(AH, modulename, "could not write to output file: %s\n", strerror(errno));
! 				ctx->filePos += zlibOutSize - zp->avail_out;
! 			}
! 			/* hand the whole (now empty) output buffer back to deflate() */
! 			zp->next_out = (void *) out;
! 			zp->avail_out = zlibOutSize;
! 		}
! 	}
! 	else
! #endif
! 	{
! 		/* not compressing: write any pending input verbatim as one chunk */
! 		if (zp->avail_in > 0)
! 		{
! 			WriteInt(AH, zp->avail_in);
! 			if (fwrite(zp->next_in, 1, zp->avail_in, AH->FH) != zp->avail_in)
! 				die_horribly(AH, modulename, "could not write to output file: %s\n", strerror(errno));
! 			ctx->filePos += zp->avail_in;
! 			zp->avail_in = 0;
! 		}
! 		else
! 		{
! 			/* nothing pending; on Z_FINISH report that the stream is done */
! #ifdef HAVE_LIBZ
! 			if (flush == Z_FINISH)
! 				res = Z_STREAM_END;
! #endif
! 		}
! 	}
  
! #ifdef HAVE_LIBZ
! 	return res;
! #else
! 	return 1;
! #endif
! }
  
! /*
!  * End the current data stream.
!  *
!  * When zlib is available and AH->compression != 0, keep calling
!  * _DoDeflate() with Z_FINISH until it reports Z_STREAM_END, then release
!  * the zlib stream with deflateEnd().  In every case (compressed or not,
!  * with or without zlib) the zero-length end-of-data marker is written
!  * last.  "te" is not used here.
!  */
! static void
! _EndDataCompressor(ArchiveHandle *AH, TocEntry *te)
! {
  
! #ifdef HAVE_LIBZ
! 	lclContext *ctx = (lclContext *) AH->formatData;
! 	z_streamp	zp = ctx->zp;
! 	int			res;
  
! 	if (AH->compression != 0)
  	{
! 		/* no further input: the loop below only finishes the stream */
! 		zp->next_in = NULL;
! 		zp->avail_in = 0;
! 
! 		do
! 		{
! 			/* printf("Ending data output\n"); */
! 			res = _DoDeflate(AH, ctx, Z_FINISH);
! 		} while (res != Z_STREAM_END);
  
! 		if (deflateEnd(zp) != Z_OK)
! 			die_horribly(AH, modulename, "could not close compression stream: %s\n", zp->msg);
  	}
! #endif
! 
! 	/* Send the end marker */
! 	WriteInt(AH, 0);
  }
  
- 
  /*
   * Clone format-specific fields during parallel restoration.
   */
--- 838,898 ----
  	*id = ReadInt(AH);
  }
  
! /*
!  * Compressor write callback for the custom format: emit "len" bytes of
!  * "buf" as one block, preceded by its length.  Returns what _WriteBuf()
!  * returns, or 0 without writing anything when len is 0.
!  */
! static size_t
! _CustomWriteFunc(ArchiveHandle *AH, const void *buf, size_t len)
  {
! 	Assert(len != 0);
  
! 	/*
! 	 * Only non-empty blocks go out: a zero length is what _CustomReadFunc
! 	 * treats as end of data.
! 	 */
! 	if (len > 0)
! 	{
! 		WriteInt(AH, len);
! 		return _WriteBuf(AH, buf, len);
! 	}
  
! 	return 0;
  }
  
! /*
!  * Compressor read callback for the custom format.
!  *
!  * Reads the next length-prefixed block into cs->comprIn (enlarging the
!  * buffer if needed), points *buf at it and returns the block length.
!  * Returns 0, leaving *buf untouched, when the zero-length end-of-data
!  * marker is read.  A negative block length, an allocation failure or a
!  * short read is reported through die_horribly().
!  */
! static size_t
! _CustomReadFunc(ArchiveHandle *AH, void **buf, size_t sizeHint)
  {
! 	lclContext *ctx = (lclContext *) AH->formatData;
! 	CompressorState *cs = ctx->cs;
! 	int			rawLen;
! 	size_t		blkLen;
! 	size_t		cnt;
  
! 	/*
! 	 * We deliberately ignore the sizeHint parameter because we know
! 	 * the exact size of the next compressed block (=blkLen).
! 	 */
  
! 	/*
! 	 * Validate the length before converting it to size_t.  A negative value
! 	 * (corrupt archive) would become huge, blkLen + 1 could wrap around to
! 	 * 0, the buffer would not be enlarged, and _ReadBuf would then overrun
! 	 * comprIn.
! 	 */
! 	rawLen = ReadInt(AH);
! 	if (rawLen < 0)
! 		die_horribly(AH, modulename,
! 					 "invalid block length in input file: %d\n", rawLen);
! 	blkLen = (size_t) rawLen;
  
! 	if (blkLen == 0)
! 		return 0;
  
! 	if (blkLen + 1 > cs->comprInSize)
  	{
! 		/* old contents are about to be overwritten, so free + malloc */
! 		free(cs->comprIn);
! 		cs->comprIn = (char *) malloc(blkLen + 1);
! 		if (!cs->comprIn)
! 			die_horribly(AH, modulename, "out of memory\n");
  
! 		cs->comprInSize = blkLen + 1;
  	}
! 	cnt = _ReadBuf(AH, cs->comprIn, blkLen);
! 	if (cnt != blkLen)
! 	{
! 		if (feof(AH->FH))
! 			die_horribly(AH, modulename,
! 						 "could not read from input file: end of file\n");
! 		else
! 			die_horribly(AH, modulename,
! 				"could not read from input file: %s\n", strerror(errno));
! 	}
! 	*buf = cs->comprIn;
! 	return cnt;
  }
  
  /*
   * Clone format-specific fields during parallel restoration.
   */
***************
*** 1107,1112 **** static void
--- 900,906 ----
  _Clone(ArchiveHandle *AH)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
+ 	CompressorAction action = ctx->cs->action;
  
  	AH->formatData = (lclContext *) malloc(sizeof(lclContext));
  	if (AH->formatData == NULL)
***************
*** 1114,1125 **** _Clone(ArchiveHandle *AH)
  	memcpy(AH->formatData, ctx, sizeof(lclContext));
  	ctx = (lclContext *) AH->formatData;
  
! 	ctx->zp = (z_streamp) malloc(sizeof(z_stream));
! 	ctx->zlibOut = (char *) malloc(zlibOutSize + 1);
! 	ctx->zlibIn = (char *) malloc(ctx->inSize);
! 
! 	if (ctx->zp == NULL || ctx->zlibOut == NULL || ctx->zlibIn == NULL)
! 		die_horribly(AH, modulename, "out of memory\n");
  
  	/*
  	 * Note: we do not make a local lo_buf because we expect at most one BLOBS
--- 908,917 ----
  	memcpy(AH->formatData, ctx, sizeof(lclContext));
  	ctx = (lclContext *) AH->formatData;
  
! 	if (action == COMPRESSOR_INFLATE)
! 		ctx->cs = AllocateInflator(AH->compression, _CustomReadFunc);
! 	else
! 		ctx->cs = AllocateDeflator(AH->compression, _CustomWriteFunc);
  
  	/*
  	 * Note: we do not make a local lo_buf because we expect at most one BLOBS
***************
*** 1133,1141 **** static void
  _DeClone(ArchiveHandle *AH)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
  
- 	/* free the per-clone zlib buffers and stream that _Clone malloc'd */
- 	free(ctx->zlibOut);
- 	free(ctx->zlibIn);
- 	free(ctx->zp);
  	free(ctx);
  }
--- 925,934 ----
  _DeClone(ArchiveHandle *AH)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
+ 
+ 	/* release the compressor state _Clone allocated for this clone */
+ 	EndCompressorState(AH, ctx->cs);
  
  	free(ctx);
  }
+ 
*** a/src/bin/pg_dump/pg_dump.c
--- b/src/bin/pg_dump/pg_dump.c
***************
*** 56,61 ****
--- 56,62 ----
  
  #include "pg_backup_archiver.h"
  #include "dumputils.h"
+ #include "compress_io.h"
  
  extern char *optarg;
  extern int	optind,
***************
*** 255,261 **** main(int argc, char **argv)
  	int			numObjs;
  	int			i;
  	enum trivalue prompt_password = TRI_DEFAULT;
! 	int			compressLevel = -1;
  	int			plainText = 0;
  	int			outputClean = 0;
  	int			outputCreateDB = 0;
--- 256,262 ----
  	int			numObjs;
  	int			i;
  	enum trivalue prompt_password = TRI_DEFAULT;
! 	int			compressLevel = COMPRESSION_UNKNOWN;
  	int			plainText = 0;
  	int			outputClean = 0;
  	int			outputCreateDB = 0;
***************
*** 535,540 **** main(int argc, char **argv)
--- 536,547 ----
  		exit(1);
  	}
  
+ 	/* actually we are using a zlib constant here but formats that don't
+ 	 * support compression won't care and if we are not compiled with zlib
+ 	 * compression we will be forced to no compression anyway. */
+ 	if (compressLevel == COMPRESSION_UNKNOWN)
+ 		compressLevel = Z_DEFAULT_COMPRESSION;
+ 
  	/* open the output file */
  	if (pg_strcasecmp(format, "a") == 0 || pg_strcasecmp(format, "append") == 0)
  	{
***************
*** 2174,2180 **** dumpBlobs(Archive *AH, void *arg)
  					exit_nicely();
  				}
  
! 				WriteData(AH, buf, cnt);
  			} while (cnt > 0);
  
  			lo_close(g_conn, loFd);
--- 2181,2189 ----
  					exit_nicely();
  				}
  
! 				/* we try to avoid writing empty chunks */
! 				if (cnt > 0)
! 					WriteData(AH, buf, cnt);
  			} while (cnt > 0);
  
  			lo_close(g_conn, loFd);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to