Rebased and added some extra logs for testing.
Thanks
From ef3d0b4237e4a0eb10e1394d22119488ca1c54d0 Mon Sep 17 00:00:00 2001
From: Nitin Motiani <[email protected]>
Date: Thu, 22 May 2025 10:20:15 +0000
Subject: [PATCH v9 5/5] [POC] Add tests to pg_dump.pl
* POC test in pg_dump.pl. More tests will be added, and this patch
will be merged with the other test patch.
* Add extra logging for POC.
---
src/bin/pg_dump/compress_none.c | 10 +++++++++-
src/bin/pg_dump/t/002_pg_dump.pl | 19 +++++++++++++++++++
2 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/src/bin/pg_dump/compress_none.c b/src/bin/pg_dump/compress_none.c
index bc63ccabdb6..465ff861688 100644
--- a/src/bin/pg_dump/compress_none.c
+++ b/src/bin/pg_dump/compress_none.c
@@ -212,9 +212,15 @@ close_none(CompressFileHandle *CFH)
{
errno = 0;
if (CFH->path_is_pipe_command)
- ret = pclose(fp);
+ {
+ pg_log_debug("closing pipe %p.", (void*) fp);
+ ret = pclose_check(fp);
+ }
else
+ {
+ pg_log_debug("closing normal file.");
ret = fclose(fp);
+ }
if (ret != 0)
pg_log_error("could not close file: %m");
}
@@ -246,6 +252,7 @@ open_none(const char *path, int fd, const char *mode, CompressFileHandle *CFH)
if (CFH->private_data == NULL)
return false;
+ pg_log_debug("After successful open, the filep is %p", (void*) CFH->private_data);
return true;
}
@@ -265,6 +272,7 @@ open_write_none(const char *path, const char *mode, CompressFileHandle *CFH)
if (CFH->private_data == NULL)
return false;
+ pg_log_debug("After successful open_write, the filep is %p", (void*) CFH->private_data);
return true;
}
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index f15bd06adcc..e1a843b59f2 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -223,6 +223,25 @@ my %pgdump_runs = (
],
},
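+ # POC for --pipe-command (this test kept failing). Pipe mode does not create the output directory; NOTE(review): presumably relies on the defaults_dir_format run having created it.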
+ defaults_dir_format_pipe => {
+ test_key => 'defaults',
+ dump_cmd => [
+ 'pg_dump',
+ '--format' => 'directory',
+ '--pipe-command' => "cat > $tempdir/defaults_dir_format/%f",
+ '--statistics',
+ 'postgres',
+ ],
+ restore_cmd => [
+ 'pg_restore',
+ '--format' => 'directory',
+ '--file' => "$tempdir/defaults_dir_format_pipe.sql",
+ '--statistics',
+ "$tempdir/defaults_dir_format",
+ ],
+ },
+
# Do not use --no-sync to give test coverage for data sync.
defaults_parallel => {
test_key => 'defaults',
--
2.53.0.473.g4a7958ca14-goog
From d46348304e1c110e2ccedf6d83ef7e5c5d176590 Mon Sep 17 00:00:00 2001
From: Nitin Motiani <[email protected]>
Date: Fri, 4 Apr 2025 14:34:48 +0000
Subject: [PATCH v9 4/5] Add documentation for pipe-command in pg_dump and
pg_restore
* Add the descriptions of the new flags and constraints
regarding which mode and other flags they can't be used with.
* Explain the purpose of the flags.
* Add a few examples of the usage of the flags.
---
doc/src/sgml/ref/pg_dump.sgml | 56 ++++++++++++++++++++++++++
doc/src/sgml/ref/pg_restore.sgml | 68 +++++++++++++++++++++++++++++++-
2 files changed, 123 insertions(+), 1 deletion(-)
diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index 7f538e90194..51ce73f390f 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -297,6 +297,7 @@ PostgreSQL documentation
specifies the target directory instead of a file. In this case the
directory is created by <command>pg_dump</command> unless the directory
exists and is empty.
+ This option cannot be used together with <option>--pipe-command</option>.
</para>
</listitem>
</varlistentry>
@@ -1224,6 +1225,32 @@ PostgreSQL documentation
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><option>--pipe-command=<replaceable class="parameter">command</replaceable></option></term>
+ <listitem>
+ <para>
+ This option is only supported with the directory output
+ format. Instead of writing each output stream of the archive
+ directly to a file, <application>pg_dump</application> starts a
+ process for each stream that runs the specified shell command and
+ pipes the stream's data to that process's standard input.
+ <application>pg_dump</application> does not create an output
+ directory in this mode.
+ This option cannot be used together with <option>--file</option>.
+ </para>
+ <para>
+ The command can be used to perform operations such as compressing
+ with a custom algorithm, filtering, or writing the output to cloud
+ storage. Because the command usually needs to know which file each
+ stream belongs to, every occurrence of <literal>%f</literal> in the
+ command string is replaced with the name of the file that would
+ have been written to the output directory had <option>--file</option>
+ been used instead.
+ See <xref linkend="pg-dump-examples"/> below.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><option>--quote-all-identifiers</option></term>
<listitem>
@@ -1791,6 +1818,35 @@ CREATE DATABASE foo WITH TEMPLATE template0;
</screen>
</para>
+ <para>
+ To dump a database into a directory-format archive using <option>--pipe-command</option>
+ (the directory <literal>dumpdir</literal> must already exist):
+
+<screen>
+<prompt>$</prompt> <userinput>pg_dump -Fd mydb --pipe-command="cat > dumpdir/%f"</userinput>
+</screen>
+ </para>
+
+ <para>
+ To dump a database into a directory-format archive using
+ <option>--pipe-command</option>, in parallel with 5 worker jobs (the
+ directory <literal>dumpdir</literal> must already exist):
+
+<screen>
+<prompt>$</prompt> <userinput>pg_dump -Fd mydb -j 5 --pipe-command="cat > dumpdir/%f"</userinput>
+</screen>
+ </para>
+
+ <para>
+ To dump a database into a directory-format archive, compressing each
+ file with <application>gzip</application> through <option>--pipe-command</option>
+ (the directory <literal>dumpdir</literal> must already exist):
+
+<screen>
+<prompt>$</prompt> <userinput>pg_dump -Fd mydb --pipe-command="gzip > dumpdir/%f.gz"</userinput>
+</screen>
+ </para>
+
<para>
To reload an archive file into a (freshly created) database named
<literal>newdb</literal>:
diff --git a/doc/src/sgml/ref/pg_restore.sgml b/doc/src/sgml/ref/pg_restore.sgml
index 4a21a089840..3d88dfd0626 100644
--- a/doc/src/sgml/ref/pg_restore.sgml
+++ b/doc/src/sgml/ref/pg_restore.sgml
@@ -118,7 +118,10 @@ PostgreSQL documentation
<para>
Specifies the location of the archive file (or directory, for a
directory-format archive) to be restored.
- If not specified, the standard input is used.
+ A <replaceable class="parameter">filename</replaceable> cannot be
+ specified together with <option>--pipe-command</option>.
+ If neither this option nor <option>--pipe-command</option> is specified,
+ the standard input is used.
</para>
</listitem>
</varlistentry>
@@ -896,6 +899,32 @@ PostgreSQL documentation
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><option>--pipe-command=<replaceable class="parameter">command</replaceable></option></term>
+ <listitem>
+ <para>
+ This option is only supported with the directory archive format.
+ Instead of reading each stream of the archive directly from a file,
+ <application>pg_restore</application> starts a process for each
+ stream that runs the specified shell command and reads the stream's
+ data from that process's standard output. This option cannot be used
+ together with a <replaceable class="parameter">filename</replaceable> argument.
+ </para>
+ <para>
+ The command can be used to perform operations such as decompressing
+ with a custom algorithm, filtering, or reading the archive from cloud
+ storage. Because the command needs to know which file of the archive
+ each stream corresponds to, every occurrence of <literal>%f</literal>
+ in the command string is replaced with the name of the file that
+ would have been read from the archive directory had a
+ <replaceable class="parameter">filename</replaceable> been given instead.
+ This works the same way as the <option>--pipe-command</option> option
+ of <application>pg_dump</application>.
+ See <xref linkend="app-pgrestore-examples"/> below.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><option>--section=<replaceable class="parameter">sectionname</replaceable></option></term>
<listitem>
@@ -1331,6 +1360,43 @@ CREATE DATABASE foo WITH TEMPLATE template0;
<prompt>$</prompt> <userinput>pg_restore -L db.list db.dump</userinput>
</screen></para>
+ <para>
+ To recreate a database from a directory-format archive using
+ <option>--pipe-command</option>; the database should not exist beforehand.
+ In this example, the directory-format archive is assumed to be
+ stored in <literal>dumpdir</literal>:
+
+<screen>
+<prompt>$</prompt> <userinput>pg_restore -C -Fd -d postgres --pipe-command="cat dumpdir/%f"</userinput>
+</screen>
+ </para>
+
+ <para>
+ To recreate a database from a directory-format archive whose files
+ are all <application>gzip</application>-compressed, decompressing them
+ through <option>--pipe-command</option>; the database should not exist
+ beforehand.
+ In this example, the directory-format archive is assumed to be
+ stored in <literal>dumpdir</literal>:
+
+<screen>
+<prompt>$</prompt> <userinput>pg_restore -C -Fd -d postgres --pipe-command="cat dumpdir/%f.gz | gunzip"</userinput>
+</screen>
+ </para>
+
+ <para>
+ To recreate only selected items from a directory-format archive by
+ combining <option>--pipe-command</option> with <option>-L</option>;
+ the database should not exist beforehand.
+ In this example, the directory-format archive is assumed to be
+ stored in <literal>dumpdir</literal>, and <literal>db.list</literal> is the
+ same file as the one used in the previous example with <option>-L</option>:
+
+<screen>
+<prompt>$</prompt> <userinput>pg_restore -C -Fd -d postgres --pipe-command="cat dumpdir/%f" -L db.list</userinput>
+</screen>
+ </para>
+
</refsect1>
<refsect1>
--
2.53.0.473.g4a7958ca14-goog
From 4f6cc47f3c5dfbfb950b6373582200ed7e3ca722 Mon Sep 17 00:00:00 2001
From: Nitin Motiani <[email protected]>
Date: Tue, 11 Feb 2025 08:31:02 +0000
Subject: [PATCH v9 1/5] Add pipe-command support for directory mode of pg_dump
* We add a new flag --pipe-command which can be used in directory
mode. This allows us to support multiple streams and we can
do post processing like compression, filtering etc. This is
currently not possible with directory-archive format.
* Currently this flag is only supported with compression none
and archive format directory.
* This flag can't be used with the flag --file. Only one of the
two flags can be used at a time.
* We reuse the filename field for the --pipe-command also. And add a
bool to specify that the field will be used as a pipe command.
* Most of the code remains as it is. The core change is that
in case of --pipe-command, instead of fopen we do popen.
* The user would need a way to store the post-processing output
in files. For that we support the same format as the directory
mode currently does with the flag --file. We allow the user
to add a format specifier %f to the --pipe-command. And for each
stream, the format specifier is replaced with the corresponding
file name. This file name is the same as it would have been if
the flag --file had been used.
* To enable the above, there are a few places in the code where
we change the file name creation logic. Currently the file name
is appended to the directory name which is provided with --file flag.
In case of --pipe-command, we instead replace %f with the file name.
This change is made for the common use case and separately for
blob files.
* There is an open question on what mode to use in case of large objects
TOC file. Currently the code uses "ab" but that won't work for popen.
We have proposed a few options in the comments regarding this. For the
time being we are using mode PG_BINARY_W for the pipe use case.
---
src/bin/pg_dump/compress_gzip.c | 9 ++-
src/bin/pg_dump/compress_gzip.h | 3 +-
src/bin/pg_dump/compress_io.c | 26 +++++--
src/bin/pg_dump/compress_io.h | 11 ++-
src/bin/pg_dump/compress_lz4.c | 11 ++-
src/bin/pg_dump/compress_lz4.h | 3 +-
src/bin/pg_dump/compress_none.c | 26 ++++++-
src/bin/pg_dump/compress_none.h | 3 +-
src/bin/pg_dump/compress_zstd.c | 10 ++-
src/bin/pg_dump/compress_zstd.h | 3 +-
src/bin/pg_dump/pg_backup.h | 5 +-
src/bin/pg_dump/pg_backup_archiver.c | 22 +++---
src/bin/pg_dump/pg_backup_archiver.h | 2 +
src/bin/pg_dump/pg_backup_directory.c | 103 +++++++++++++++++++++-----
src/bin/pg_dump/pg_dump.c | 37 ++++++++-
src/bin/pg_dump/pg_dumpall.c | 2 +-
src/bin/pg_dump/pg_restore.c | 6 +-
17 files changed, 223 insertions(+), 59 deletions(-)
diff --git a/src/bin/pg_dump/compress_gzip.c b/src/bin/pg_dump/compress_gzip.c
index c9ce8a53aaa..46ac5bddcd5 100644
--- a/src/bin/pg_dump/compress_gzip.c
+++ b/src/bin/pg_dump/compress_gzip.c
@@ -417,8 +417,12 @@ Gzip_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
void
InitCompressFileHandleGzip(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec)
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
+ if (path_is_pipe_command)
+ pg_fatal("cPipe command not supported for Gzip");
+
CFH->open_func = Gzip_open;
CFH->open_write_func = Gzip_open_write;
CFH->read_func = Gzip_read;
@@ -443,7 +447,8 @@ InitCompressorGzip(CompressorState *cs,
void
InitCompressFileHandleGzip(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec)
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
pg_fatal("this build does not support compression with %s", "gzip");
}
diff --git a/src/bin/pg_dump/compress_gzip.h b/src/bin/pg_dump/compress_gzip.h
index af1a2a3445e..f77c5c86c56 100644
--- a/src/bin/pg_dump/compress_gzip.h
+++ b/src/bin/pg_dump/compress_gzip.h
@@ -19,6 +19,7 @@
extern void InitCompressorGzip(CompressorState *cs,
const pg_compress_specification compression_spec);
extern void InitCompressFileHandleGzip(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec);
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command);
#endif /* _COMPRESS_GZIP_H_ */
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 52652b0d979..bc521dd274b 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -191,20 +191,29 @@ free_keep_errno(void *p)
* Initialize a compress file handle for the specified compression algorithm.
*/
CompressFileHandle *
-InitCompressFileHandle(const pg_compress_specification compression_spec)
+InitCompressFileHandle(const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
CompressFileHandle *CFH;
CFH = pg_malloc0_object(CompressFileHandle);
- if (compression_spec.algorithm == PG_COMPRESSION_NONE)
- InitCompressFileHandleNone(CFH, compression_spec);
+ /*
+ * When the path is a pipe command, always use the uncompressed handler,
+ * on the assumption that compressing with an external program in the
+ * pipe is more efficient. This may be revisited in the future.
+ */
+ if (path_is_pipe_command)
+ InitCompressFileHandleNone(CFH, compression_spec, path_is_pipe_command);
+
+ else if (compression_spec.algorithm == PG_COMPRESSION_NONE)
+ InitCompressFileHandleNone(CFH, compression_spec, path_is_pipe_command);
else if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
- InitCompressFileHandleGzip(CFH, compression_spec);
+ InitCompressFileHandleGzip(CFH, compression_spec, path_is_pipe_command);
else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
- InitCompressFileHandleLZ4(CFH, compression_spec);
+ InitCompressFileHandleLZ4(CFH, compression_spec, path_is_pipe_command);
else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD)
- InitCompressFileHandleZstd(CFH, compression_spec);
+ InitCompressFileHandleZstd(CFH, compression_spec, path_is_pipe_command);
return CFH;
}
@@ -237,7 +246,8 @@ check_compressed_file(const char *path, char **fname, char *ext)
* On failure, return NULL with an error code in errno.
*/
CompressFileHandle *
-InitDiscoverCompressFileHandle(const char *path, const char *mode)
+InitDiscoverCompressFileHandle(const char *path, const char *mode,
+ bool path_is_pipe_command)
{
CompressFileHandle *CFH = NULL;
struct stat st;
@@ -268,7 +278,7 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode)
compression_spec.algorithm = PG_COMPRESSION_ZSTD;
}
- CFH = InitCompressFileHandle(compression_spec);
+ CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command);
errno = 0;
if (!CFH->open_func(fname, -1, mode, CFH))
{
diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index ed7b14f0963..bd0fc2634dc 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -186,6 +186,11 @@ struct CompressFileHandle
*/
pg_compress_specification compression_spec;
+ /*
+ * True if paths given to the open functions are commands run via popen().
+ */
+ bool path_is_pipe_command;
+
/*
* Private data to be used by the compressor.
*/
@@ -195,7 +200,8 @@ struct CompressFileHandle
/*
* Initialize a compress file handle with the requested compression.
*/
-extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specification compression_spec);
+extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specification compression_spec,
+ bool path_is_pipe_command);
/*
* Initialize a compress file stream. Infer the compression algorithm
@@ -203,6 +209,7 @@ extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specificatio
* suffixes in 'path'.
*/
extern CompressFileHandle *InitDiscoverCompressFileHandle(const char *path,
- const char *mode);
+ const char *mode,
+ bool path_is_pipe_command);
extern bool EndCompressFileHandle(CompressFileHandle *CFH);
#endif
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index b72bad130ad..f016627ceab 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -739,10 +739,14 @@ LZ4Stream_open_write(const char *path, const char *mode, CompressFileHandle *CFH
*/
void
InitCompressFileHandleLZ4(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec)
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
LZ4State *state;
+ if (path_is_pipe_command)
+ pg_fatal("Pipe command not supported for LZ4");
+
CFH->open_func = LZ4Stream_open;
CFH->open_write_func = LZ4Stream_open_write;
CFH->read_func = LZ4Stream_read;
@@ -758,6 +762,8 @@ InitCompressFileHandleLZ4(CompressFileHandle *CFH,
if (CFH->compression_spec.level >= 0)
state->prefs.compressionLevel = CFH->compression_spec.level;
+ CFH->path_is_pipe_command = path_is_pipe_command;
+
CFH->private_data = state;
}
#else /* USE_LZ4 */
@@ -770,7 +776,8 @@ InitCompressorLZ4(CompressorState *cs,
void
InitCompressFileHandleLZ4(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec)
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
pg_fatal("this build does not support compression with %s", "LZ4");
}
diff --git a/src/bin/pg_dump/compress_lz4.h b/src/bin/pg_dump/compress_lz4.h
index 7360a469fc0..490141ee8a1 100644
--- a/src/bin/pg_dump/compress_lz4.h
+++ b/src/bin/pg_dump/compress_lz4.h
@@ -19,6 +19,7 @@
extern void InitCompressorLZ4(CompressorState *cs,
const pg_compress_specification compression_spec);
extern void InitCompressFileHandleLZ4(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec);
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command);
#endif /* _COMPRESS_LZ4_H_ */
diff --git a/src/bin/pg_dump/compress_none.c b/src/bin/pg_dump/compress_none.c
index d862d8ca6e9..bc63ccabdb6 100644
--- a/src/bin/pg_dump/compress_none.c
+++ b/src/bin/pg_dump/compress_none.c
@@ -211,7 +211,10 @@ close_none(CompressFileHandle *CFH)
if (fp)
{
errno = 0;
- ret = fclose(fp);
+ if (CFH->path_is_pipe_command)
+ ret = pclose(fp);
+ else
+ ret = fclose(fp);
if (ret != 0)
pg_log_error("could not close file: %m");
}
@@ -233,7 +236,12 @@ open_none(const char *path, int fd, const char *mode, CompressFileHandle *CFH)
if (fd >= 0)
CFH->private_data = fdopen(dup(fd), mode);
else
- CFH->private_data = fopen(path, mode);
+ {
+ if (CFH->path_is_pipe_command)
+ CFH->private_data = popen(path, mode);
+ else
+ CFH->private_data = fopen(path, mode);
+ }
if (CFH->private_data == NULL)
return false;
@@ -246,7 +254,14 @@ open_write_none(const char *path, const char *mode, CompressFileHandle *CFH)
{
Assert(CFH->private_data == NULL);
- CFH->private_data = fopen(path, mode);
+ pg_log_debug("Opening %s, pipe is %s",
+ path, CFH->path_is_pipe_command ? "true" : "false");
+
+ if (CFH->path_is_pipe_command)
+ CFH->private_data = popen(path, mode);
+ else
+ CFH->private_data = fopen(path, mode);
+
if (CFH->private_data == NULL)
return false;
@@ -259,7 +274,8 @@ open_write_none(const char *path, const char *mode, CompressFileHandle *CFH)
void
InitCompressFileHandleNone(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec)
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
CFH->open_func = open_none;
CFH->open_write_func = open_write_none;
@@ -271,5 +287,7 @@ InitCompressFileHandleNone(CompressFileHandle *CFH,
CFH->eof_func = eof_none;
CFH->get_error_func = get_error_none;
+ CFH->path_is_pipe_command = path_is_pipe_command;
+
CFH->private_data = NULL;
}
diff --git a/src/bin/pg_dump/compress_none.h b/src/bin/pg_dump/compress_none.h
index 5134f012ee9..d898a2d411c 100644
--- a/src/bin/pg_dump/compress_none.h
+++ b/src/bin/pg_dump/compress_none.h
@@ -19,6 +19,7 @@
extern void InitCompressorNone(CompressorState *cs,
const pg_compress_specification compression_spec);
extern void InitCompressFileHandleNone(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec);
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command);
#endif /* _COMPRESS_NONE_H_ */
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
index cf2db2649ac..a2c50822566 100644
--- a/src/bin/pg_dump/compress_zstd.c
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -27,7 +27,8 @@ InitCompressorZstd(CompressorState *cs, const pg_compress_specification compress
}
void
-InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
+InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
pg_fatal("this build does not support compression with %s", "ZSTD");
}
@@ -558,8 +559,12 @@ Zstd_get_error(CompressFileHandle *CFH)
void
InitCompressFileHandleZstd(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec)
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
+ if (path_is_pipe_command)
+ pg_fatal("Pipe command not supported for Zstd");
+
CFH->open_func = Zstd_open;
CFH->open_write_func = Zstd_open_write;
CFH->read_func = Zstd_read;
@@ -571,6 +576,7 @@ InitCompressFileHandleZstd(CompressFileHandle *CFH,
CFH->get_error_func = Zstd_get_error;
CFH->compression_spec = compression_spec;
+ CFH->path_is_pipe_command = path_is_pipe_command;
CFH->private_data = NULL;
}
diff --git a/src/bin/pg_dump/compress_zstd.h b/src/bin/pg_dump/compress_zstd.h
index 1222d7107d9..1f23e7266bf 100644
--- a/src/bin/pg_dump/compress_zstd.h
+++ b/src/bin/pg_dump/compress_zstd.h
@@ -20,6 +20,7 @@
extern void InitCompressorZstd(CompressorState *cs,
const pg_compress_specification compression_spec);
extern void InitCompressFileHandleZstd(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec);
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command);
#endif /* COMPRESS_ZSTD_H */
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index fda912ba0a9..6466bd4bded 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -316,14 +316,15 @@ extern void ProcessArchiveRestoreOptions(Archive *AHX);
extern void RestoreArchive(Archive *AHX, bool append_data);
/* Open an existing archive */
-extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt);
+extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt, bool FileSpecIsPipe);
/* Create a new archive */
extern Archive *CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
const pg_compress_specification compression_spec,
bool dosync, ArchiveMode mode,
SetupWorkerPtrType setupDumpWorker,
- DataDirSyncMethod sync_method);
+ DataDirSyncMethod sync_method,
+ bool FileSpecIsPipe);
/* The --list option */
extern void PrintTOCSummary(Archive *AHX);
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index df8a69d3b79..e6c3bc24dbc 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -56,7 +56,7 @@ static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
const pg_compress_specification compression_spec,
bool dosync, ArchiveMode mode,
SetupWorkerPtrType setupWorkerPtr,
- DataDirSyncMethod sync_method);
+ DataDirSyncMethod sync_method, bool FileSpecIsPipe);
static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx);
static void _doSetFixedOutputState(ArchiveHandle *AH);
@@ -232,11 +232,12 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
const pg_compress_specification compression_spec,
bool dosync, ArchiveMode mode,
SetupWorkerPtrType setupDumpWorker,
- DataDirSyncMethod sync_method)
+ DataDirSyncMethod sync_method,
+ bool FileSpecIsPipe)
{
ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
- dosync, mode, setupDumpWorker, sync_method);
+ dosync, mode, setupDumpWorker, sync_method, FileSpecIsPipe);
return (Archive *) AH;
}
@@ -244,7 +245,7 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
/* Open an existing archive */
/* Public */
Archive *
-OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
+OpenArchive(const char *FileSpec, const ArchiveFormat fmt, bool FileSpecIsPipe)
{
ArchiveHandle *AH;
pg_compress_specification compression_spec = {0};
@@ -252,7 +253,7 @@ OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
compression_spec.algorithm = PG_COMPRESSION_NONE;
AH = _allocAH(FileSpec, fmt, compression_spec, true,
archModeRead, setupRestoreWorker,
- DATA_DIR_SYNC_METHOD_FSYNC);
+ DATA_DIR_SYNC_METHOD_FSYNC, FileSpecIsPipe);
return (Archive *) AH;
}
@@ -1742,7 +1743,7 @@ SetOutput(ArchiveHandle *AH, const char *filename,
else
mode = PG_BINARY_W;
- CFH = InitCompressFileHandle(compression_spec);
+ CFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
if (!CFH->open_func(filename, fn, mode, CFH))
{
@@ -2398,7 +2399,8 @@ static ArchiveHandle *
_allocAH(const char *FileSpec, const ArchiveFormat fmt,
const pg_compress_specification compression_spec,
bool dosync, ArchiveMode mode,
- SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method)
+ SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method,
+ bool FileSpecIsPipe)
{
ArchiveHandle *AH;
CompressFileHandle *CFH;
@@ -2439,6 +2441,8 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
else
AH->fSpec = NULL;
+ AH->fSpecIsPipe = FileSpecIsPipe;
+
AH->currUser = NULL; /* unknown */
AH->currSchema = NULL; /* ditto */
AH->currTablespace = NULL; /* ditto */
@@ -2451,14 +2455,14 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
AH->mode = mode;
AH->compression_spec = compression_spec;
- AH->dosync = dosync;
+ AH->dosync = FileSpecIsPipe ? false : dosync;
AH->sync_method = sync_method;
memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
/* Open stdout with no compression for AH output handle */
out_compress_spec.algorithm = PG_COMPRESSION_NONE;
- CFH = InitCompressFileHandle(out_compress_spec);
+ CFH = InitCompressFileHandle(out_compress_spec, AH->fSpecIsPipe);
if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
pg_fatal("could not open stdout for appending: %m");
AH->OF = CFH;
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 365073b3eae..16b7f6fa3b4 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -301,6 +301,8 @@ struct _archiveHandle
int loCount; /* # of LOs restored */
char *fSpec; /* Archive File Spec */
+ bool fSpecIsPipe; /* fSpec is a pipe command template in which
+ * %f is replaced with a file name */
FILE *FH; /* General purpose file handle */
void *OF; /* Output file */
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index d6a1428c67a..74fc651f6f4 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -39,7 +39,8 @@
#include <dirent.h>
#include <sys/stat.h>
-#include "common/file_utils.h"
+/* #include "common/file_utils.h" */
+#include "common/percentrepl.h"
#include "compress_io.h"
#include "dumputils.h"
#include "parallel.h"
@@ -157,8 +158,11 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
if (AH->mode == archModeWrite)
{
- /* we accept an empty existing directory */
- create_or_open_dir(ctx->directory);
+ if (!AH->fSpecIsPipe) /* no checks for pipe */
+ {
+ /* we accept an empty existing directory */
+ create_or_open_dir(ctx->directory);
+ }
}
else
{ /* Read Mode */
@@ -167,7 +171,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
setFilePath(AH, fname, "toc.dat");
- tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R);
+ tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R, AH->fSpecIsPipe);
if (tocFH == NULL)
pg_fatal("could not open input file \"%s\": %m", fname);
@@ -295,7 +299,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te)
setFilePath(AH, fname, tctx->filename);
- ctx->dataFH = InitCompressFileHandle(AH->compression_spec);
+ ctx->dataFH = InitCompressFileHandle(AH->compression_spec, AH->fSpecIsPipe);
if (!ctx->dataFH->open_write_func(fname, PG_BINARY_W, ctx->dataFH))
pg_fatal("could not open output file \"%s\": %m", fname);
@@ -353,7 +357,7 @@ _PrintFileData(ArchiveHandle *AH, char *filename)
if (!filename)
return;
- CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R);
+ CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R, AH->fSpecIsPipe);
if (!CFH)
pg_fatal("could not open input file \"%s\": %m", filename);
@@ -416,7 +420,7 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
else
setFilePath(AH, tocfname, tctx->filename);
- CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R);
+ CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R, AH->fSpecIsPipe);
if (ctx->LOsTocFH == NULL)
pg_fatal("could not open large object TOC file \"%s\" for input: %m",
@@ -427,6 +431,7 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
{
char lofname[MAXPGPATH + 1];
char path[MAXPGPATH];
+ char *pipe;
/* Can't overflow because line and lofname are the same length */
if (sscanf(line, "%u %" CppAsString2(MAXPGPATH) "s\n", &oid, lofname) != 2)
@@ -545,7 +550,7 @@ _CloseArchive(ArchiveHandle *AH)
/* The TOC is always created uncompressed */
compression_spec.algorithm = PG_COMPRESSION_NONE;
- tocFH = InitCompressFileHandle(compression_spec);
+ tocFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
if (!tocFH->open_write_func(fname, PG_BINARY_W, tocFH))
pg_fatal("could not open output file \"%s\": %m", fname);
ctx->dataFH = tocFH;
@@ -606,13 +611,46 @@ _StartLOs(ArchiveHandle *AH, TocEntry *te)
lclTocEntry *tctx = (lclTocEntry *) te->formatData;
pg_compress_specification compression_spec = {0};
char fname[MAXPGPATH];
+ const char *mode;
setFilePath(AH, fname, tctx->filename);
/* The LO TOC file is never compressed */
compression_spec.algorithm = PG_COMPRESSION_NONE;
- ctx->LOsTocFH = InitCompressFileHandle(compression_spec);
- if (!ctx->LOsTocFH->open_write_func(fname, "ab", ctx->LOsTocFH))
+ ctx->LOsTocFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
+
+ /*
+ * XXX: We can probably simplify this code by using the mode 'w' for all
+ * cases. The current implementation is due to the historical reason that
+ * the mode for the LOs TOC file has been "ab" from the start. That mode
+ * can't be used with --pipe-command, since popen() only supports "r" and
+ * "w". So here a different mode is used for pipes.
+ *
+ * But in future we can evaluate using 'w' for everything: there is only
+ * one ToCEntry per blob group, and it is written by
+ * WriteDataChunksForToCEntry(). That function calls _StartLOs once
+ * before the dumper function and _EndLOs once after the dumper, and
+ * the dumper dumps all the LOs in the group. So a blob_NNN.toc is only
+ * opened once and closed after all the entries are written. Therefore the
+ * mode can be made 'w' for all the cases. We tested changing the mode to
+ * PG_BINARY_W and the tests passed. But in case there are some missing
+ * scenarios, we have not made that change here. Instead, for now it is
+ * only done for the pipe command.
+ *
+ * Another alternative is to keep the 'ab' mode for regular files and use
+ * 'w' mode for pipe files, but also cache the pipe handle to keep it
+ * open till all the LOs in the dump group are done. This is not needed,
+ * for the same reason given above: a file handle is only opened once.
+ * In short, there are 3 solutions: 1. Change the mode for everything
+ * (preferred); 2. Change it only for pipe-command (current); 3. Change it
+ * for pipe-command and then cache those handles and close them at the
+ * end (not needed).
+ */
+ if (AH->fSpecIsPipe)
+ mode = PG_BINARY_W;
+ else
+ mode = "ab";
+ if (!ctx->LOsTocFH->open_write_func(fname, mode, ctx->LOsTocFH))
pg_fatal("could not open output file \"%s\": %m", fname);
}
@@ -626,10 +664,22 @@ _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
{
lclContext *ctx = (lclContext *) AH->formatData;
char fname[MAXPGPATH];
+ char *pipe;
+ char blob_name[MAXPGPATH];
- snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
+ if (AH->fSpecIsPipe)
+ {
+ snprintf(blob_name, MAXPGPATH, "blob_%u.dat", oid);
+ pipe = replace_percent_placeholders(ctx->directory, "pipe-command", "f", blob_name);
+ strcpy(fname, pipe);
+ pfree(pipe);
+ }
+ else
+ {
+ snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
+ }
- ctx->dataFH = InitCompressFileHandle(AH->compression_spec);
+ ctx->dataFH = InitCompressFileHandle(AH->compression_spec, AH->fSpecIsPipe);
if (!ctx->dataFH->open_write_func(fname, PG_BINARY_W, ctx->dataFH))
pg_fatal("could not open output file \"%s\": %m", fname);
}
@@ -683,15 +733,27 @@ setFilePath(ArchiveHandle *AH, char *buf, const char *relativeFilename)
{
lclContext *ctx = (lclContext *) AH->formatData;
char *dname;
+ char *pipe;
dname = ctx->directory;
- if (strlen(dname) + 1 + strlen(relativeFilename) + 1 > MAXPGPATH)
- pg_fatal("file name too long: \"%s\"", dname);
- strcpy(buf, dname);
- strcat(buf, "/");
- strcat(buf, relativeFilename);
+ if (AH->fSpecIsPipe)
+ {
+ pipe = replace_percent_placeholders(dname, "pipe-command", "f", relativeFilename);
+ strcpy(buf, pipe);
+ pfree(pipe);
+ }
+ else /* plain directory: build "dname/relativeFilename"; %f is
+ * substituted into dname only in the pipe-command branch */
+ {
+ if (strlen(dname) + 1 + strlen(relativeFilename) + 1 > MAXPGPATH)
+ pg_fatal("file name too long: \"%s\"", dname);
+
+ strcpy(buf, dname);
+ strcat(buf, "/");
+ strcat(buf, relativeFilename);
+ }
}
/*
@@ -733,17 +795,24 @@ _PrepParallelRestore(ArchiveHandle *AH)
* only need an approximate indicator of that.
*/
setFilePath(AH, fname, tctx->filename);
+ pg_log_error("filename: %s", fname);
if (stat(fname, &st) == 0)
te->dataLength = st.st_size;
else if (AH->compression_spec.algorithm != PG_COMPRESSION_NONE)
{
+ if (AH->fSpecIsPipe)
+ pg_log_error("pipe and compressed");
if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP)
strlcat(fname, ".gz", sizeof(fname));
else if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4)
strlcat(fname, ".lz4", sizeof(fname));
else if (AH->compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+ {
+ pg_log_error("filename: %s", fname);
strlcat(fname, ".zst", sizeof(fname));
+ pg_log_error("filename: %s", fname);
+ }
if (stat(fname, &st) == 0)
te->dataLength = st.st_size;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 6df79067db5..b7b5de695a8 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -419,6 +419,7 @@ main(int argc, char **argv)
{
int c;
const char *filename = NULL;
+ bool filename_is_pipe = false;
const char *format = "p";
TableInfo *tblinfo;
int numTables;
@@ -535,6 +536,7 @@ main(int argc, char **argv)
{"exclude-extension", required_argument, NULL, 17},
{"sequence-data", no_argument, &dopt.sequence_data, 1},
{"restrict-key", required_argument, NULL, 25},
+ {"pipe-command", required_argument, NULL, 26},
{NULL, 0, NULL, 0}
};
@@ -606,7 +608,14 @@ main(int argc, char **argv)
break;
case 'f':
+ if (filename != NULL)
+ {
+ pg_log_error_hint("Only one of [--file, --pipe-command] allowed");
+ exit_nicely(1);
+ }
filename = pg_strdup(optarg);
+ filename_is_pipe = false; /* it already is, setting again
+ * here just for clarity */
break;
case 'F':
@@ -799,6 +808,16 @@ main(int argc, char **argv)
dopt.restrict_key = pg_strdup(optarg);
break;
+ case 26: /* pipe command */
+ if (filename != NULL)
+ {
+ pg_log_error_hint("Only one of [--file, --pipe-command] allowed");
+ exit_nicely(1);
+ }
+ filename = pg_strdup(optarg);
+ filename_is_pipe = true;
+ break;
+
default:
/* getopt_long already emitted a complaint */
pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -920,14 +939,26 @@ main(int argc, char **argv)
else if (dopt.restrict_key)
pg_fatal("option %s can only be used with %s",
"--restrict-key", "--format=plain");
+ if (filename_is_pipe && archiveFormat != archDirectory)
+ {
+ pg_log_error_hint("Option --pipe-command is only supported with directory format.");
+ exit_nicely(1);
+ }
+
+ if (filename_is_pipe && strcmp(compression_algorithm_str, "none") != 0)
+ {
+ pg_log_error_hint("Option --pipe-command is not supported with any compression type.");
+ exit_nicely(1);
+ }
/*
* Custom and directory formats are compressed by default with gzip when
* available, not the others. If gzip is not available, no compression is
- * done by default.
+ * done by default. If directory format is being used with pipe-command,
+ * no compression is done.
*/
if ((archiveFormat == archCustom || archiveFormat == archDirectory) &&
- !user_compression_defined)
+ !filename_is_pipe && !user_compression_defined)
{
#ifdef HAVE_LIBZ
compression_algorithm_str = "gzip";
@@ -977,7 +1008,7 @@ main(int argc, char **argv)
/* Open the output file */
fout = CreateArchive(filename, archiveFormat, compression_spec,
- dosync, archiveMode, setupDumpWorker, sync_method);
+ dosync, archiveMode, setupDumpWorker, sync_method, filename_is_pipe);
/* Make dump options accessible right away */
SetArchiveOptions(fout, &dopt, NULL);
diff --git a/src/bin/pg_dump/pg_dumpall.c b/src/bin/pg_dump/pg_dumpall.c
index 1165a0f4afe..ebf05120e1b 100644
--- a/src/bin/pg_dump/pg_dumpall.c
+++ b/src/bin/pg_dump/pg_dumpall.c
@@ -672,7 +672,7 @@ main(int argc, char *argv[])
/* Open the output file */
fout = CreateArchive(global_path, archCustom, compression_spec,
- dosync, archModeWrite, NULL, DATA_DIR_SYNC_METHOD_FSYNC);
+ dosync, archModeWrite, NULL, DATA_DIR_SYNC_METHOD_FSYNC, false);
/* Make dump options accessible right away */
SetArchiveOptions(fout, &dopt, NULL);
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index 14d886fc86e..a064249086f 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -1,5 +1,5 @@
/*-------------------------------------------------------------------------
- *
+ *
* pg_restore.c
* pg_restore is an utility extracting postgres database definitions
* from a backup archive created by pg_dump/pg_dumpall using the archiver
@@ -689,7 +689,7 @@ restore_global_objects(const char *inputFileSpec, RestoreOptions *opts)
opts->format = archCustom;
opts->txn_size = 0;
- AH = OpenArchive(inputFileSpec, opts->format);
+ AH = OpenArchive(inputFileSpec, opts->format, false);
SetArchiveOptions(AH, NULL, opts);
@@ -731,7 +731,7 @@ restore_one_database(const char *inputFileSpec, RestoreOptions *opts,
Archive *AH;
int n_errors;
- AH = OpenArchive(inputFileSpec, opts->format);
+ AH = OpenArchive(inputFileSpec, opts->format, false);
SetArchiveOptions(AH, NULL, opts);
--
2.53.0.473.g4a7958ca14-goog
From 7e50c1cb9259bbe3ae00862620139ed2348b9510 Mon Sep 17 00:00:00 2001
From: Nitin Motiani <[email protected]>
Date: Sat, 15 Feb 2025 08:05:25 +0000
Subject: [PATCH v9 2/5] Add pipe-command support in pg_restore
* This is the same as the pg_dump change. We add support
for --pipe-command in the directory archive format. This can be used
to read from multiple streams and do pre-processing (decompression
with a custom algorithm, filtering, etc.) before restore.
Currently that is not possible because the pg_dump output of
the directory format can't simply be piped.
* Like pg_dump, here too either a filename or --pipe-command can be
set. If neither is set, standard input is used as before.
* This is only supported with compression none and archive format
directory.
* We reuse the inputFileSpec field for the pipe-command and add
a bool to specify whether it is a pipe.
* The changes made for pg_dump to handle the pipe case with popen
and pclose also work here.
* The logic of the %f format specifier for reading the pg_dump output
is the same too. Most of the code from the pg_dump commit works.
We add similar logic to the function that reads large objects.
* The --pipe-command option works with the -l and -L options.
---
src/bin/pg_dump/compress_io.c | 30 +++++++++------
src/bin/pg_dump/pg_backup_directory.c | 16 +++++++-
src/bin/pg_dump/pg_restore.c | 53 ++++++++++++++++++++-------
3 files changed, 72 insertions(+), 27 deletions(-)
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index bc521dd274b..88488186b34 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -260,22 +260,28 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode,
fname = pg_strdup(path);
- if (hasSuffix(fname, ".gz"))
- compression_spec.algorithm = PG_COMPRESSION_GZIP;
- else if (hasSuffix(fname, ".lz4"))
- compression_spec.algorithm = PG_COMPRESSION_LZ4;
- else if (hasSuffix(fname, ".zst"))
- compression_spec.algorithm = PG_COMPRESSION_ZSTD;
- else
+ /*
+ * If the path is a pipe command, the compression algorithm is none.
+ */
+ if (!path_is_pipe_command)
{
- if (stat(path, &st) == 0)
- compression_spec.algorithm = PG_COMPRESSION_NONE;
- else if (check_compressed_file(path, &fname, "gz"))
+ if (hasSuffix(fname, ".gz"))
compression_spec.algorithm = PG_COMPRESSION_GZIP;
- else if (check_compressed_file(path, &fname, "lz4"))
+ else if (hasSuffix(fname, ".lz4"))
compression_spec.algorithm = PG_COMPRESSION_LZ4;
- else if (check_compressed_file(path, &fname, "zst"))
+ else if (hasSuffix(fname, ".zst"))
compression_spec.algorithm = PG_COMPRESSION_ZSTD;
+ else
+ {
+ if (stat(path, &st) == 0)
+ compression_spec.algorithm = PG_COMPRESSION_NONE;
+ else if (check_compressed_file(path, &fname, "gz"))
+ compression_spec.algorithm = PG_COMPRESSION_GZIP;
+ else if (check_compressed_file(path, &fname, "lz4"))
+ compression_spec.algorithm = PG_COMPRESSION_LZ4;
+ else if (check_compressed_file(path, &fname, "zst"))
+ compression_spec.algorithm = PG_COMPRESSION_ZSTD;
+ }
}
CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command);
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 74fc651f6f4..2b18c3c8270 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -439,7 +439,21 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
tocfname, line);
StartRestoreLO(AH, oid, AH->public.ropt->dropSchema);
- snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, lofname);
+
+ /*
+ * XXX: Create a helper function for blob file naming common to
+ * _LoadLOs and _StartLO.
+ */
+ if (AH->fSpecIsPipe)
+ {
+ pipe = replace_percent_placeholders(ctx->directory, "pipe-command", "f", lofname);
+ strcpy(path, pipe);
+ pfree(pipe);
+ }
+ else
+ {
+ snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, lofname);
+ }
_PrintFileData(AH, path);
EndRestoreLO(AH, oid);
}
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index a064249086f..1356a9398b4 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -60,11 +60,11 @@ static void usage(const char *progname);
static void read_restore_filters(const char *filename, RestoreOptions *opts);
static bool file_exists_in_directory(const char *dir, const char *filename);
static int restore_one_database(const char *inputFileSpec, RestoreOptions *opts,
- int numWorkers, bool append_data);
-static int restore_global_objects(const char *inputFileSpec, RestoreOptions *opts);
+ int numWorkers, bool append_data, bool filespec_is_pipe);
+static int restore_global_objects(const char *inputFileSpec, RestoreOptions *opts, bool filespec_is_pipe);
static int restore_all_databases(const char *inputFileSpec,
- SimpleStringList db_exclude_patterns, RestoreOptions *opts, int numWorkers);
+ SimpleStringList db_exclude_patterns, RestoreOptions *opts, int numWorkers, bool filespec_is_pipe);
static int get_dbnames_list_to_restore(PGconn *conn,
SimplePtrList *dbname_oid_list,
SimpleStringList db_exclude_patterns);
@@ -93,6 +93,7 @@ main(int argc, char **argv)
int n_errors = 0;
bool globals_only = false;
SimpleStringList db_exclude_patterns = {NULL, NULL};
+ bool filespec_is_pipe = false;
static int disable_triggers = 0;
static int enable_row_security = 0;
static int if_exists = 0;
@@ -171,6 +172,7 @@ main(int argc, char **argv)
{"filter", required_argument, NULL, 4},
{"restrict-key", required_argument, NULL, 6},
{"exclude-database", required_argument, NULL, 7},
+ {"pipe-command", required_argument, NULL, 8},
{NULL, 0, NULL, 0}
};
@@ -357,6 +359,11 @@ main(int argc, char **argv)
simple_string_list_append(&db_exclude_patterns, optarg);
break;
+ case 8: /* pipe-command */
+ inputFileSpec = pg_strdup(optarg);
+ filespec_is_pipe = true;
+ break;
+
default:
/* getopt_long already emitted a complaint */
pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -364,11 +371,29 @@ main(int argc, char **argv)
}
}
- /* Get file name from command line */
+ /*
+ * Get file name from command line. Note that filename argument and
+ * pipe-command can't both be set.
+ */
if (optind < argc)
+ {
+ if (filespec_is_pipe)
+ {
+ pg_log_error_hint("Only one of [filespec, --pipe-command] allowed");
+ exit_nicely(1);
+ }
inputFileSpec = argv[optind++];
- else
+ }
+
+ /*
+ * Even if the file argument is not provided, if the pipe-command is
+ * specified, we need to use that as the file arg and not fallback to
+ * stdio.
+ */
+ else if (!filespec_is_pipe)
+ {
inputFileSpec = NULL;
+ }
/* Complain if any arguments remain */
if (optind < argc)
@@ -632,7 +657,7 @@ main(int argc, char **argv)
*/
snprintf(global_path, MAXPGPATH, "%s/toc.glo", inputFileSpec);
- n_errors = restore_global_objects(global_path, tmpopts);
+ n_errors = restore_global_objects(global_path, tmpopts, filespec_is_pipe);
if (globals_only)
pg_log_info("database restoring skipped because option %s was specified",
@@ -641,7 +666,7 @@ main(int argc, char **argv)
{
/* Now restore all the databases from map.dat */
n_errors = n_errors + restore_all_databases(inputFileSpec, db_exclude_patterns,
- opts, numWorkers);
+ opts, numWorkers, filespec_is_pipe);
}
/* Free db pattern list. */
@@ -661,7 +686,7 @@ main(int argc, char **argv)
"-g/--globals-only");
/* Process if toc.glo file does not exist. */
- n_errors = restore_one_database(inputFileSpec, opts, numWorkers, false);
+ n_errors = restore_one_database(inputFileSpec, opts, numWorkers, false, filespec_is_pipe);
}
/* Done, print a summary of ignored errors during restore. */
@@ -680,7 +705,7 @@ main(int argc, char **argv)
* This restore all global objects.
*/
static int
-restore_global_objects(const char *inputFileSpec, RestoreOptions *opts)
+restore_global_objects(const char *inputFileSpec, RestoreOptions *opts, bool filespec_is_pipe)
{
Archive *AH;
int nerror = 0;
@@ -689,7 +714,7 @@ restore_global_objects(const char *inputFileSpec, RestoreOptions *opts)
opts->format = archCustom;
opts->txn_size = 0;
- AH = OpenArchive(inputFileSpec, opts->format, false);
+ AH = OpenArchive(inputFileSpec, opts->format, filespec_is_pipe);
SetArchiveOptions(AH, NULL, opts);
@@ -726,12 +751,12 @@ restore_global_objects(const char *inputFileSpec, RestoreOptions *opts)
*/
static int
restore_one_database(const char *inputFileSpec, RestoreOptions *opts,
- int numWorkers, bool append_data)
+ int numWorkers, bool append_data, bool filespec_is_pipe)
{
Archive *AH;
int n_errors;
- AH = OpenArchive(inputFileSpec, opts->format, false);
+ AH = OpenArchive(inputFileSpec, opts->format, filespec_is_pipe);
SetArchiveOptions(AH, NULL, opts);
@@ -1189,7 +1214,7 @@ get_dbname_oid_list_from_mfile(char *dumpdirpath, SimplePtrList *dbname_oid_list
static int
restore_all_databases(const char *inputFileSpec,
SimpleStringList db_exclude_patterns, RestoreOptions *opts,
- int numWorkers)
+ int numWorkers, bool filespec_is_pipe)
{
SimplePtrList dbname_oid_list = {NULL, NULL};
int num_db_restore = 0;
@@ -1356,7 +1381,7 @@ restore_all_databases(const char *inputFileSpec,
}
/* Restore the single database. */
- n_errors = restore_one_database(subdirpath, tmpopts, numWorkers, true);
+ n_errors = restore_one_database(subdirpath, tmpopts, numWorkers, true, filespec_is_pipe);
n_errors_total += n_errors;
--
2.53.0.473.g4a7958ca14-goog
From 22b9376f8784bb3fce6a14a0b25433249a1b9373 Mon Sep 17 00:00:00 2001
From: Nitin Motiani <[email protected]>
Date: Sat, 15 Feb 2025 04:29:17 +0000
Subject: [PATCH v9 3/5] Add basic tests for pipe-command
* This currently only adds a few basic tests for pg_dump with --pipe-command.
* These tests cover invalid usages of --pipe-command with other flags.
* We are still working on adding other tests in 002_pg_dump.pl, but
we ran into some issues which might be related to the test setup.
---
src/bin/pg_dump/t/001_basic.pl | 36 ++++++++++++++++++++++++++++++++++
1 file changed, 36 insertions(+)
diff --git a/src/bin/pg_dump/t/001_basic.pl b/src/bin/pg_dump/t/001_basic.pl
index a895bc314b0..f97d08ff6d8 100644
--- a/src/bin/pg_dump/t/001_basic.pl
+++ b/src/bin/pg_dump/t/001_basic.pl
@@ -74,6 +74,42 @@ command_fails_like(
'pg_dump: options --statistics-only and --no-statistics cannot be used together'
);
+command_fails_like(
+ [ 'pg_dump', '-Fd', '--pipe-command="cat"', '-f', 'testdir', 'test'],
+ qr/\Qpg_dump: hint: Only one of [--file, --pipe-command] allowed\E/,
+ 'pg_dump: hint: Only one of [--file, --pipe-command] allowed'
+);
+
+command_fails_like(
+ [ 'pg_dump', '-Fd', '--pipe-command="cat"', '-Z', 'gzip', 'test'],
+ qr/\Qpg_dump: hint: Option --pipe-command is not supported with any compression type\E/,
+ 'pg_dump: hint: Option --pipe-command is not supported with any compression type'
+);
+
+command_fails_like(
+ [ 'pg_dump', '-Fd', '--pipe-command="cat"', '--compress=lz4', 'test'],
+ qr/\Qpg_dump: hint: Option --pipe-command is not supported with any compression type\E/,
+ 'pg_dump: hint: Option --pipe-command is not supported with any compression type'
+);
+
+command_fails_like(
+ [ 'pg_dump', '-Fd', '--pipe-command="cat"', '-Z', '1', 'test'],
+ qr/\Qpg_dump: hint: Option --pipe-command is not supported with any compression type\E/,
+ 'pg_dump: hint: Option --pipe-command is not supported with any compression type'
+);
+
+command_fails_like(
+ [ 'pg_dump', '-Fc', '--pipe-command="cat"', 'test'],
+ qr/\Qpg_dump: hint: Option --pipe-command is only supported with directory format.\E/,
+ 'pg_dump: hint: Option --pipe-command is only supported with directory format.'
+);
+
+command_fails_like(
+ [ 'pg_dump', '--format=tar', '--pipe-command="cat"', 'test'],
+ qr/\Qpg_dump: hint: Option --pipe-command is only supported with directory format.\E/,
+ 'pg_dump: hint: Option --pipe-command is only supported with directory format.'
+);
+
command_fails_like(
[ 'pg_dump', '-j2', '--include-foreign-data=xxx' ],
qr/\Qpg_dump: error: option --include-foreign-data is not supported with parallel backup\E/,
--
2.53.0.473.g4a7958ca14-goog