Attached is a patch that implements copy_srf().

The function signature reflects cstate more than it reflects the COPY
options (filename+is_program instead of  FILENAME or PROGRAM, etc)

CREATE OR REPLACE FUNCTION copy_srf(
   filename    text DEFAULT null,
   is_program  boolean DEFAULT false,
   format      text DEFAULT 'text',  /* accepts text, csv, binary */
   delimiter   text DEFAULT null,
   null_string text DEFAULT E'\\N',
   header      boolean DEFAULT false,
   quote       text DEFAULT null,
   escape      text DEFAULT null,
   encoding    text DEFAULT null)
RETURNS SETOF RECORD

The major utility for this (at least for me) will be in ETLs that currently
make a lot of use of temp tables, and wish to either reduce I/O or avoid
pg_attribute bloat.

I have not yet implemented STDIN mode, but it's a start.

$ psql test
psql (10devel)
Type "help" for help.

# select * from    copy_srf('echo 1,2; echo 4,5',true,'csv') as t(x text, y
text);
 x | y
---+---
 1 | 2
 4 | 5
(2 rows)

Time: 1.456 ms
# select * from    copy_srf('/tmp/small_file.txt'::text,false,'text') as
t(x text, y text);
  x  |  y
-----+-----
 1   | 2
 a   | b
 cde | fgh
(3 rows)


On Mon, Oct 17, 2016 at 2:33 PM, Merlin Moncure <mmonc...@gmail.com> wrote:

> On Fri, Sep 30, 2016 at 9:56 PM, Tom Lane <t...@sss.pgh.pa.us> wrote:
> > Craig Ringer <craig.rin...@2ndquadrant.com> writes:
> >> On 1 Oct. 2016 05:20, "Tom Lane" <t...@sss.pgh.pa.us> wrote:
> >>> I think the last of those suggestions has come up before.  It has the
> >>> large advantage that you don't have to remember a different syntax for
> >>> copy-as-a-function.
> >
> >> That sounds fantastic. It'd help this copy variant retain festure parity
> >> with normal copy. And it'd bring us closer to being able to FETCH in non
> >> queries.
> >
> > On second thought, though, this couldn't exactly duplicate the existing
> > COPY syntax, because COPY relies heavily on the rowtype of the named
> > target table to tell it what it's copying.  You'd need some new syntax
> > to provide the list of column names and types, which puts a bit of
> > a hole in the "syntax we already know" argument.  A SRF-returning-record
> > would have a leg up on that, because we do have existing syntax for
> > defining the concrete rowtype that any particular call returns.
>
> One big disadvantage of SRF-returning-record syntax is that functions
> are basically unwrappable with generic wrappers sans major gymnastics
> such as dynamically generating the query and executing it.  This is a
> major disadvantage relative to the null::type hack we use in the
> populate_record style functions and perhaps ought to make this
> (SRF-returning-record syntax) style of use discouraged for useful
> library functions.  If there were a way to handle wrapping I'd
> withdraw this minor objection -- this has come up in dblink
> discussions a few times).
>
> merlin
>
diff --git a/src/backend/catalog/system_views.sql 
b/src/backend/catalog/system_views.sql
index ada2142..0876ee1 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1006,6 +1006,21 @@ LANGUAGE INTERNAL
 STRICT IMMUTABLE PARALLEL SAFE
 AS 'jsonb_insert';
 
+CREATE OR REPLACE FUNCTION copy_srf(
+       IN filename text DEFAULT null, 
+       IN is_program boolean DEFAULT false,
+       IN format text DEFAULT 'text',
+       IN delimiter text DEFAULT null,
+       IN null_string text DEFAULT E'\\N',
+       IN header boolean DEFAULT false,
+       IN quote text DEFAULT null,
+       IN escape text DEFAULT null,
+       IN encoding text DEFAULT null)
+RETURNS SETOF RECORD
+LANGUAGE INTERNAL
+VOLATILE ROWS 1000 COST 1000 CALLED ON NULL INPUT
+AS 'copy_srf';
+
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
 -- than use explicit 'superuser()' checks in those functions, we use the GRANT
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index b4140eb..90ed2c5 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -30,6 +30,7 @@
 #include "commands/defrem.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
+#include "funcapi.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
@@ -555,7 +556,6 @@ CopyGetData(CopyState cstate, void *databuf, int minread, 
int maxread)
                                                 errmsg("could not read from 
COPY file: %m")));
                        break;
                case COPY_OLD_FE:
-
                        /*
                         * We cannot read more than minread bytes (which in 
practice is 1)
                         * because old protocol doesn't have any clear way of 
separating
@@ -4555,3 +4555,377 @@ CreateCopyDestReceiver(void)
 
        return (DestReceiver *) self;
 }
+
+Datum
+copy_srf(PG_FUNCTION_ARGS)
+{
+       ReturnSetInfo   *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+       TupleDesc               tupdesc;
+       Tuplestorestate *tupstore = NULL;
+       MemoryContext   per_query_ctx;
+       MemoryContext   oldcontext;
+       FmgrInfo        *in_functions;
+       Oid             *typioparams;
+       Oid             in_func_oid;
+
+       CopyStateData   copy_state;
+       int                     col;
+
+       Datum       *values;
+       bool            *nulls;
+
+       /* check to see if caller supports us returning a tuplestore */
+       if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+       {
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("set-valued function called in context 
that cannot accept a set")));
+       }
+
+       if (!(rsinfo->allowedModes & SFRM_Materialize) || rsinfo->expectedDesc 
== NULL)
+       {
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("materialize mode required, but it is 
not allowed in this context")));
+       }
+
+       tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
+       values = (Datum *) palloc(tupdesc->natts * sizeof(Datum));
+       nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
+       in_functions = (FmgrInfo *) palloc(tupdesc->natts * sizeof(FmgrInfo));
+       typioparams = (Oid *) palloc(tupdesc->natts * sizeof(Oid));
+
+       for (col = 0; col < tupdesc->natts; col++)
+       {
+               
getTypeInputInfo(tupdesc->attrs[col]->atttypid,&in_func_oid,&typioparams[col]);
+               fmgr_info(in_func_oid,&in_functions[col]);
+       }
+
+       /*
+        * Function signature is:
+        * copy_srf( filename text default null,
+        *           is_program boolean default false,
+        *           format text default 'text',
+        *           delimiter text default E'\t' in text mode, ',' in csv mode,
+        *           null_string text default '\N',
+        *           header boolean default false,
+        *           quote text default '"' in csv mode only,
+        *           escape text default null, -- defaults to whatever quote is
+        *           encoding text default null
+        */
+
+       /* Mock up a copy state */
+       initStringInfo(&copy_state.line_buf);
+       initStringInfo(&copy_state.attribute_buf);
+       copy_state.fe_msgbuf = makeStringInfo();
+       copy_state.oids = false;
+       copy_state.freeze = false;
+
+       copy_state.need_transcoding = false;
+       copy_state.encoding_embeds_ascii = false;
+       copy_state.rel = NULL;
+       copy_state.queryDesc = NULL;
+
+       /* param 0: filename */
+       if (PG_ARGISNULL(0))
+       {
+               copy_state.copy_dest = COPY_NEW_FE;
+               copy_state.filename = NULL;
+       }
+       else
+       {
+               copy_state.copy_dest = COPY_FILE;
+               copy_state.filename = TextDatumGetCString(PG_GETARG_TEXT_P(0));
+       }
+
+       /* param 1: is_program */
+       if (PG_ARGISNULL(1))
+       {
+               copy_state.is_program = false;
+       }
+       else
+       {
+               copy_state.is_program = PG_GETARG_BOOL(1);
+       }
+
+       /* param 2: format - text, csv, binary */
+       if (PG_ARGISNULL(2))
+       {
+               copy_state.binary = false;
+               copy_state.csv_mode = false;
+       }
+       else
+       {
+               char* format_str = TextDatumGetCString(PG_GETARG_TEXT_P(2));
+               if (strcmp(format_str,"text") == 0)
+               {
+                       copy_state.binary = false;
+                       copy_state.csv_mode = false;
+               }
+               else if (strcmp(format_str,"csv") == 0)
+               {
+                       copy_state.binary = false;
+                       copy_state.csv_mode = true;
+               }
+               else if (strcmp(format_str,"binary") == 0)
+               {
+                       copy_state.binary = true;
+                       copy_state.csv_mode = false;
+               }
+               else
+               {
+                       ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("Format must be one of: text csv 
binary")));
+               }
+       }
+
+       /* param 3: delimiter text default E'\t', */
+       if (PG_ARGISNULL(3))
+       {
+               copy_state.delim = copy_state.csv_mode ? "," : "\t";
+       }
+       else
+       {
+               if (copy_state.binary)
+               {
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_SYNTAX_ERROR),
+                                        errmsg("cannot specify DELIMITER in 
BINARY mode")));
+               }
+               copy_state.delim = TextDatumGetCString(PG_GETARG_TEXT_P(3));
+
+               if (strlen(copy_state.delim) != 1)
+               {
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                 errmsg("COPY delimiter must be a single 
one-byte character")));
+               }
+
+               /* Disallow end-of-line characters */
+               if (strchr(copy_state.delim, '\r') != NULL ||
+                       strchr(copy_state.delim, '\n') != NULL)
+               {
+                       ereport(ERROR,
+                                       
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                errmsg("COPY delimiter cannot be newline or 
carriage return")));
+               }
+       }
+
+       /* param 4: null_string text default '\N', */
+       if (PG_ARGISNULL(4))
+       {
+               copy_state.null_print = copy_state.csv_mode ? "" : "\\N";
+       }
+       else
+       {
+               copy_state.null_print = 
TextDatumGetCString(PG_GETARG_TEXT_P(4));
+       }
+       copy_state.null_print_len = strlen(copy_state.null_print);
+       /* NOT SET char    *null_print_client; */
+
+       /* param 5: header boolean default false, */
+       if (PG_ARGISNULL(5))
+       {
+               copy_state.header_line = false;
+       }
+       else
+       {
+               copy_state.header_line = PG_GETARG_BOOL(5);
+       }
+
+       /* param 6: quote text default '"', */
+       if (PG_ARGISNULL(6))
+       {
+               copy_state.quote = "\"";
+       }
+       else
+       {
+               if (copy_state.csv_mode)
+               {
+                       if (strlen(copy_state.quote) != 1)
+                       {
+                               ereport(ERROR,
+                                               
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                errmsg("COPY quote must be a 
single one-byte character")));
+                       }
+
+                       if (copy_state.delim[0] == copy_state.quote[0])
+                       {
+                               ereport(ERROR,
+                                               
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                                errmsg("COPY delimiter and 
quote must be different")));
+                       }
+               }
+               else
+               {
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("COPY quote available only in 
CSV mode")));
+               }
+               copy_state.quote = TextDatumGetCString(PG_GETARG_TEXT_P(6));
+       }
+
+       /* param 7: escape text default null, -- defaults to whatever quote is 
*/
+       if (PG_ARGISNULL(7))
+       {
+               copy_state.escape = copy_state.quote;
+       }
+       else
+       {
+               if (copy_state.csv_mode)
+               {
+                       copy_state.escape = 
TextDatumGetCString(PG_GETARG_TEXT_P(7));
+                       if (strlen(copy_state.escape) != 1)
+                       {
+                               ereport(ERROR,
+                                               
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                errmsg("COPY escape must be a 
single one-byte character")));
+                       }
+               }
+               else
+               {
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("COPY escape available only in 
CSV mode")));
+               }
+       }
+
+       /* param 8: encoding text default null */
+       if (PG_ARGISNULL(8))
+       {
+               copy_state.file_encoding = pg_get_client_encoding();
+       }
+       else
+       {
+               char* encoding = TextDatumGetCString(PG_GETARG_TEXT_P(8));
+               copy_state.file_encoding = pg_char_to_encoding(encoding);
+               if (copy_state.file_encoding < 0)
+               {
+                       ereport(ERROR,
+                                       
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                        errmsg("argument to option \"%s\" must 
be a valid encoding name",
+                                                       encoding)));
+               }
+       }
+
+       copy_state.max_fields = tupdesc->natts;
+       copy_state.raw_fields = (char **) palloc(tupdesc->natts * sizeof(char 
*));
+
+       /* let the caller know we're sending back a tuplestore */
+       rsinfo->returnMode = SFRM_Materialize;
+       per_query_ctx = fcinfo->flinfo->fn_mcxt;
+       oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+       tupstore = tuplestore_begin_heap(true,false,work_mem);
+
+       /* open "file" */
+       if (copy_state.is_program)
+       {
+               copy_state.copy_file = OpenPipeStream(copy_state.filename, 
PG_BINARY_R);
+
+               if (copy_state.copy_file == NULL)
+               {
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not execute command 
\"%s\": %m",
+                                                       copy_state.filename)));
+               }
+       }
+       else
+       {
+               struct stat st;
+
+               copy_state.copy_file = AllocateFile(copy_state.filename, 
PG_BINARY_R);
+               if (copy_state.copy_file == NULL)
+               {
+                       /* copy errno because ereport subfunctions might change 
it */
+                       int         save_errno = errno;
+
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not open file \"%s\" for 
reading: %m",
+                                                       copy_state.filename),
+                                        (save_errno == ENOENT || save_errno == 
EACCES) ?
+                                        errhint("copy_srf instructs the 
PostgreSQL server process to read a file. "
+                                                        "You may want a 
client-side facility such as psql's \\copy.") : 0));
+               }
+
+               if (fstat(fileno(copy_state.copy_file), &st))
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not stat file \"%s\": 
%m",
+                                                       copy_state.filename)));
+
+               if (S_ISDIR(st.st_mode))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                        errmsg("\"%s\" is a directory", 
copy_state.filename)));
+       }
+
+       while(1)
+       {
+               char    **field_strings;
+               int     field_strings_count;
+               int     col;
+               HeapTuple tuple;
+
+               if (! 
NextCopyFromRawFields(&copy_state,&field_strings,&field_strings_count))
+               {
+                       break;
+               }
+               if (field_strings_count != tupdesc->natts)
+               {
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                                        errmsg("found %d fields but expected 
%d on line %d",
+                                                       field_strings_count, 
tupdesc->natts, copy_state.cur_lineno)));
+               }
+
+               for (col = 0; col < tupdesc->natts; col++)
+               {
+                       values[col] = InputFunctionCall(&in_functions[col],
+                                                                               
        field_strings[col],
+                                                                               
        typioparams[col],
+                                                                               
        tupdesc->attrs[col]->atttypmod);
+                       nulls[col] = (field_strings[col] == NULL);
+               }
+
+               tuple = heap_form_tuple(tupdesc,values,nulls);
+               //tuple = BuildTupleFromCStrings(attinmeta, field_strings);
+               tuplestore_puttuple(tupstore, tuple);
+       }
+
+       /* close "file" */
+       if (copy_state.is_program)
+       {
+               int         pclose_rc;
+
+               pclose_rc = ClosePipeStream(copy_state.copy_file);
+               if (pclose_rc == -1)
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not close pipe to 
external command: %m")));
+               else if (pclose_rc != 0)
+                       ereport(ERROR,
+                                       
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
+                                        errmsg("program \"%s\" failed",
+                                                       copy_state.filename),
+                                        errdetail_internal("%s", 
wait_result_to_str(pclose_rc))));
+       }
+       else
+       {
+               if (copy_state.filename != NULL && 
FreeFile(copy_state.copy_file))
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not close file \"%s\": 
%m",
+                                                       copy_state.filename)));
+       }
+
+       tuplestore_donestoring(tupstore);
+       rsinfo->setResult = tupstore;
+       rsinfo->setDesc = tupdesc;
+       MemoryContextSwitchTo(oldcontext);
+
+       return (Datum) 0;
+}
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 17ec71d..d8076ee 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5341,6 +5341,11 @@ DESCR("pg_controldata recovery state information as a 
function");
 DATA(insert OID = 3444 ( pg_control_init PGNSP PGUID 12 1 0 0 0 f f f f t f v 
s 0 0 2249 "" "{23,23,23,23,23,23,23,23,23,16,16,16,23}" 
"{o,o,o,o,o,o,o,o,o,o,o,o,o}" 
"{max_data_alignment,database_block_size,blocks_per_segment,wal_block_size,bytes_per_wal_segment,max_identifier_length,max_index_columns,max_toast_chunk_size,large_object_chunk_size,bigint_timestamps,float4_pass_by_value,float8_pass_by_value,data_page_checksum_version}"
 _null_ _null_ pg_control_init _null_ _null_ _null_ ));
 DESCR("pg_controldata init state information as a function");
 
+
+DATA(insert OID = 3445 (  copy_srf PGNSP PGUID 12 1 0 0 0 f f f f f t v u 9 0 
2249 "25 16 25 25 25 16 25 25 25" _null_ _null_ _null_ _null_ _null_ copy_srf 
_null_ _null_ _null_ ));
+DESCR("set-returning COPY proof of concept");
+
+
 /*
  * Symbolic values for provolatile column: these indicate whether the result
  * of a function is dependent *only* on the values of its explicit arguments,
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 65eb347..c27baac 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -37,4 +37,7 @@ extern void CopyFromErrorCallback(void *arg);
 
 extern DestReceiver *CreateCopyDestReceiver(void);
 
+extern Datum copy_srf(PG_FUNCTION_ARGS);
+
+
 #endif   /* COPY_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to