>
>
>> I don't understand why do we have all these checks.  Can't we just pass
>> the values to COPY and let it apply the checks?  That way, when COPY is
>> updated to support multibyte escape chars (for example) we don't need to
>> touch this code.  Together with removing the unneeded braces that would
>> make these stanzas about six lines long instead of fifteen.
>>
>
> If I understand you correctly, COPY (via BeginCopyFrom) itself relies on
> having a relation in pg_class to reference for attributes.
> In this case, there is no such relation. So I'd have to fake a relcache
> entry, or refactor BeginCopyFrom() to extract a ReturnSetInfo from the
> Relation and pass that along to a new function BeginCopyFromReturnSet. I'm
> happy to go that route if you think it's a good idea.
>
>
>>
>>
>> > +             tuple = heap_form_tuple(tupdesc,values,nulls);
>> > +             //tuple = BuildTupleFromCStrings(attinmeta,
>> field_strings);
>> > +             tuplestore_puttuple(tupstore, tuple);
>>
>> No need to form a tuple; use tuplestore_putvalues here.
>>
>
> Good to know!
>
>
>
>>
>> I wonder if these should be an auxiliary function in copy.c to do this.
>> Surely copy.c itself does pretty much the same thing ...
>>
>
> Yes. This got started as a patch to core because not all of the parts of
> COPY are externally callable, and aren't broken down in a way that allowed
> for use in a SRF.
>
> I'll get to work on these suggestions.
>

I've put in some more work on this patch, mostly just taking Alvaro's
suggestions, which resulted in big code savings.

I had to add a TupleDesc parameter to BeginCopy() and BeginCopyFrom(). This
seemed the easiest way to leverage the existing tested code (and indeed, it
worked nearly out-of-the-box). The only drawback is that a minor change
will have to be made to the BeginCopyFrom() call in file_fdw.c, and any
other extensions that leverage COPY. We could make compatibility functions
that take the original signature and pass it along to the corresponding
function with rsTupDesc set to NULL.

Some issues:
- I'm still not sure whether the direction we want to go is a set-returning
function, or a change in grammar that lets us use COPY as a CTE or similar.
- This function will have the same difficulties as adding the program
option did to file_fdw: there's very little we can reference that isn't
OS/environment-specific.
- Inline (STDIN) prompts the user for input, but gives the error: server
sent data ("D" message) without prior row description ("T" message). I
looked for a place where the Relation was consulted for the row
description, but I'm not finding it.

I can continue to flesh this out with documentation and test cases if there
is consensus that this is the way to go.


# select * from copy_srf('echo "x\ty"',true) as t(x text, y text);
 x | y
---+---
 x | y
(1 row)

Time: 1.074 ms
# select * from copy_srf('echo "x\t4"',true) as t(x text, y integer);
 x | y
---+---
 x | 4
(1 row)

Time: 1.095 ms
# select * from copy_srf(null) as t(x text, y integer);
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> a    4
>> b    5
>> \.
server sent data ("D" message) without prior row description ("T" message)
diff --git a/src/backend/catalog/system_views.sql 
b/src/backend/catalog/system_views.sql
index 4dfedf8..26f81f3 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1065,6 +1065,21 @@ LANGUAGE INTERNAL
 STRICT IMMUTABLE PARALLEL SAFE
 AS 'jsonb_insert';
 
+CREATE OR REPLACE FUNCTION copy_srf(  -- SQL-level declaration of the internal function copy_srf
+       IN filename text,  -- source file, or a command when is_program is true; NULL means client ("pipe") input
+       IN is_program boolean DEFAULT false,  -- rejected by the C code when filename is NULL
+       IN format text DEFAULT null,  -- forwarded as COPY option "format" when not NULL
+       IN delimiter text DEFAULT null,  -- forwarded as COPY option "delimiter" when not NULL
+       IN null_string text DEFAULT null,  -- forwarded as COPY option "null" when not NULL
+       IN header boolean DEFAULT null,  -- COPY option "header" is added only when this is true
+       IN quote text DEFAULT null,  -- forwarded as COPY option "quote" when not NULL
+       IN escape text DEFAULT null,  -- intended for COPY option "escape"
+       IN encoding text DEFAULT null)  -- forwarded as COPY option "encoding" when not NULL
+RETURNS SETOF RECORD  -- caller supplies the column list, e.g. AS t(x text, y integer)
+LANGUAGE INTERNAL
+VOLATILE ROWS 1000 COST 1000 CALLED ON NULL INPUT  -- not STRICT: the C code tests each argument for NULL itself
+AS 'copy_srf';
+
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
 -- than use explicit 'superuser()' checks in those functions, we use the GRANT
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index f9362be..4e6a32c 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -30,6 +30,7 @@
 #include "commands/defrem.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
+#include "funcapi.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
@@ -286,7 +287,8 @@ static const char BinarySignature[11] = 
"PGCOPY\n\377\r\n\0";
 
 
 /* non-export function prototypes */
-static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
+static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, 
+                 TupleDesc rsTupDesc,
                  RawStmt *raw_query, Oid queryRelId, List *attnamelist,
                  List *options);
 static void EndCopy(CopyState cstate);
@@ -562,7 +564,6 @@ CopyGetData(CopyState cstate, void *databuf, int minread, 
int maxread)
                                                 errmsg("could not read from 
COPY file: %m")));
                        break;
                case COPY_OLD_FE:
-
                        /*
                         * We cannot read more than minread bytes (which in 
practice is 1)
                         * because old protocol doesn't have any clear way of 
separating
@@ -967,7 +968,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
                        PreventCommandIfReadOnly("COPY FROM");
                PreventCommandIfParallelMode("COPY FROM");
 
-               cstate = BeginCopyFrom(pstate, rel, stmt->filename, 
stmt->is_program,
+               cstate = BeginCopyFrom(pstate, rel, NULL, stmt->filename, 
stmt->is_program,
                                                           stmt->attlist, 
stmt->options);
                cstate->range_table = range_table;
                *processed = CopyFrom(cstate);  /* copy from file to database */
@@ -1370,6 +1371,7 @@ static CopyState
 BeginCopy(ParseState *pstate,
                  bool is_from,
                  Relation rel,
+                 TupleDesc rsTupDesc,
                  RawStmt *raw_query,
                  Oid queryRelId,
                  List *attnamelist,
@@ -1396,6 +1398,9 @@ BeginCopy(ParseState *pstate,
        /* Extract options from the statement node tree */
        ProcessCopyOptions(pstate, cstate, is_from, options);
 
+       /* Cannot have both a relation and a result set */
+       Assert(rel == NULL || rsTupDesc == NULL);
+
        /* Process the source/target relation or query */
        if (rel)
        {
@@ -1436,6 +1441,14 @@ BeginCopy(ParseState *pstate,
                        cstate->partition_tuple_slot = partition_tuple_slot;
                }
        }
+       else if (rsTupDesc)
+       {
+               /* COPY FROM file/program to result set */
+               Assert(!raw_query);
+               Assert(is_from);
+               Assert(attnamelist == NIL);
+               tupDesc = rsTupDesc;
+       }
        else
        {
                List       *rewritten;
@@ -1802,7 +1815,7 @@ BeginCopyTo(ParseState *pstate,
                                                        
RelationGetRelationName(rel))));
        }
 
-       cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
+       cstate = BeginCopy(pstate, false, rel, NULL, query, queryRelId, 
attnamelist,
                                           options);
        oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
@@ -2875,6 +2888,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, 
CommandId mycid,
 CopyState
 BeginCopyFrom(ParseState *pstate,
                          Relation rel,
+                         TupleDesc rsTupDesc,
                          const char *filename,
                          bool is_program,
                          List *attnamelist,
@@ -2895,13 +2909,14 @@ BeginCopyFrom(ParseState *pstate,
        MemoryContext oldcontext;
        bool            volatile_defexprs;
 
-       cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, 
options);
+       cstate = BeginCopy(pstate, true, rel, rsTupDesc, NULL, InvalidOid, 
attnamelist, options);
        oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
        /* Initialize state variables */
        cstate->fe_eof = false;
        cstate->eol_type = EOL_UNKNOWN;
-       cstate->cur_relname = RelationGetRelationName(cstate->rel);
+       if (cstate->rel)
+               cstate->cur_relname = RelationGetRelationName(cstate->rel);
        cstate->cur_lineno = 0;
        cstate->cur_attname = NULL;
        cstate->cur_attval = NULL;
@@ -2913,7 +2928,10 @@ BeginCopyFrom(ParseState *pstate,
        cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
        cstate->raw_buf_index = cstate->raw_buf_len = 0;
 
-       tupDesc = RelationGetDescr(cstate->rel);
+       if (cstate->rel)
+               tupDesc = RelationGetDescr(cstate->rel);
+       else
+               tupDesc = rsTupDesc;
        attr = tupDesc->attrs;
        num_phys_attrs = tupDesc->natts;
        num_defaults = 0;
@@ -2950,8 +2968,10 @@ BeginCopyFrom(ParseState *pstate,
                {
                        /* attribute is NOT to be copied from input */
                        /* use default value if one exists */
-                       Expr       *defexpr = (Expr *) 
build_column_default(cstate->rel,
-                                                                               
                                                attnum);
+                       Expr       *defexpr = NULL;
+
+                       if (rel)
+                               defexpr = (Expr *) 
build_column_default(cstate->rel, attnum);
 
                        if (defexpr != NULL)
                        {
@@ -4740,3 +4760,167 @@ CreateCopyDestReceiver(void)
 
        return (DestReceiver *) self;
 }
+
+/*
+ * copy_srf_text_option
+ *
+ * If SQL argument "argno" of the current call is not NULL, prepend a DefElem
+ * named "optname" holding that argument's text value to "options".  Returns
+ * the (possibly unchanged) option list.
+ *
+ * Taking the argument number exactly once keeps the NULL test and the value
+ * fetch from drifting apart.
+ */
+static List *
+copy_srf_text_option(FunctionCallInfo fcinfo, int argno, char *optname,
+                     List *options)
+{
+       char *value;
+
+       if (PG_ARGISNULL(argno))
+               return options;
+
+       value = TextDatumGetCString(PG_GETARG_DATUM(argno));
+
+       return lcons(makeDefElem(optname, (Node *) makeString(value), -1),
+                    options);
+}
+
+/*
+ * copy_srf
+ *
+ * Set-returning function that reads COPY-formatted input and returns it as
+ * a materialized tuplestore.  The result row type is taken from the caller's
+ * column definition list (rsinfo->expectedDesc); no relation is involved.
+ *
+ * SQL arguments (all may be NULL):
+ *   0 filename text      - file or command to read; NULL means pipe mode
+ *   1 is_program boolean - treat filename as a command
+ *   2 format text        - COPY option "format"
+ *   3 delimiter text     - COPY option "delimiter"
+ *   4 null_string text   - COPY option "null"
+ *   5 header boolean     - COPY option "header" (only added when true)
+ *   6 quote text         - COPY option "quote"
+ *   7 escape text        - COPY option "escape"
+ *   8 encoding text      - COPY option "encoding"
+ *
+ * Option values are not validated here; they are handed to BeginCopyFrom(),
+ * which applies COPY's own checks.
+ *
+ * Each input line must contain exactly as many fields as the result row has
+ * columns; otherwise ERRCODE_BAD_COPY_FILE_FORMAT is raised.  Fields are
+ * converted with the input function of the corresponding column type.
+ *
+ * Always returns (Datum) 0; the rows travel back through rsinfo->setResult.
+ */
+Datum
+copy_srf(PG_FUNCTION_ARGS)
+{
+       ParseState *pstate = NULL;
+       ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+       TupleDesc tupdesc;
+       Tuplestorestate *tupstore = NULL;
+       MemoryContext per_query_ctx;
+       MemoryContext oldcontext;
+       FmgrInfo *in_functions;
+       Oid *typioparams;
+       Oid in_func_oid;
+
+       CopyState cstate;
+       int col;
+
+       Datum *values;
+       bool *nulls;
+
+       char *filename = NULL;
+       bool is_program = false;
+       List *options = NIL;
+
+       /* check to see if caller supports us returning a tuplestore */
+       if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("set-valued function called in context that cannot accept a set")));
+
+       if (!(rsinfo->allowedModes & SFRM_Materialize) ||
+               rsinfo->expectedDesc == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("materialize mode required, but it is not allowed in this context")));
+
+       /* Work arrays are sized by the number of result columns */
+       tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
+       values = (Datum *) palloc(tupdesc->natts * sizeof(Datum));
+       nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
+       in_functions = (FmgrInfo *) palloc(tupdesc->natts * sizeof(FmgrInfo));
+       typioparams = (Oid *) palloc(tupdesc->natts * sizeof(Oid));
+
+       /* Look up the text input function of every result column once */
+       for (col = 0; col < tupdesc->natts; col++)
+       {
+               getTypeInputInfo(tupdesc->attrs[col]->atttypid,
+                                                &in_func_oid,
+                                                &typioparams[col]);
+               fmgr_info(in_func_oid, &in_functions[col]);
+       }
+
+       /* param 0: filename */
+       if (!PG_ARGISNULL(0))
+               filename = TextDatumGetCString(PG_GETARG_DATUM(0));
+
+       /* param 1: is_program */
+       if (!PG_ARGISNULL(1))
+               is_program = PG_GETARG_BOOL(1);
+
+       if (filename == NULL && is_program)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("is_program cannot be true in pipe mode")));
+
+       /*
+        * This function calls BeginCopyFrom() directly, and nothing in this
+        * function or in BeginCopyFrom()'s visible changes checks privileges.
+        * Without a check here, anyone allowed to execute the function could
+        * read server-side files or run programs as the server's OS user.
+        * NOTE(review): this is meant to mirror the restriction the COPY
+        * command applies to file/program access - confirm against DoCopy().
+        */
+       if (filename != NULL && !superuser())
+       {
+               if (is_program)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                        errmsg("must be superuser to COPY to or from an external program")));
+               else
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                        errmsg("must be superuser to COPY to or from a file")));
+       }
+
+       /* params 2-4: format, delimiter, null string */
+       options = copy_srf_text_option(fcinfo, 2, "format", options);
+       options = copy_srf_text_option(fcinfo, 3, "delimiter", options);
+       options = copy_srf_text_option(fcinfo, 4, "null", options);
+
+       /* param 5: header; the option is only emitted when explicitly true */
+       if (!PG_ARGISNULL(5) && PG_GETARG_BOOL(5))
+               options = lcons(makeDefElem("header",
+                                                                       (Node *) makeString("true"), -1),
+                                               options);
+
+       /* params 6-8: quote, escape (argument 7, not 6), encoding */
+       options = copy_srf_text_option(fcinfo, 6, "quote", options);
+       options = copy_srf_text_option(fcinfo, 7, "escape", options);
+       options = copy_srf_text_option(fcinfo, 8, "encoding", options);
+
+       /* let the caller know we're sending back a tuplestore */
+       rsinfo->returnMode = SFRM_Materialize;
+
+       /* the tuplestore and the COPY state are created in fn_mcxt */
+       per_query_ctx = fcinfo->flinfo->fn_mcxt;
+       oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+       tupstore = tuplestore_begin_heap(true, false, work_mem);
+
+       /* No relation: the result tuple descriptor drives the column list */
+       cstate = BeginCopyFrom(pstate, NULL, tupdesc, filename, is_program,
+                                                  NIL, options);
+
+       for (;;)
+       {
+               char **field_strings;
+               int field_strings_count;
+
+               /* false means end of input */
+               if (!NextCopyFromRawFields(cstate, &field_strings,
+                                                                  &field_strings_count))
+                       break;
+
+               if (field_strings_count != tupdesc->natts)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                                        errmsg("found %d fields but expected %d on line %d",
+                                                       field_strings_count, tupdesc->natts,
+                                                       cstate->cur_lineno)));
+
+               for (col = 0; col < tupdesc->natts; col++)
+               {
+                       /* a NULL field string is passed through and marked null */
+                       values[col] = InputFunctionCall(&in_functions[col],
+                                                                                       field_strings[col],
+                                                                                       typioparams[col],
+                                                                                       tupdesc->attrs[col]->atttypmod);
+                       nulls[col] = (field_strings[col] == NULL);
+               }
+
+               tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+       }
+
+       tuplestore_donestoring(tupstore);
+       rsinfo->setResult = tupstore;
+       rsinfo->setDesc = tupdesc;
+
+       EndCopy(cstate);
+
+       MemoryContextSwitchTo(oldcontext);
+
+       return (Datum) 0;
+}
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index c1f492b..9fb2ff7 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5359,6 +5359,10 @@ DESCR("pg_controldata init state information as a 
function");
 DATA(insert OID = 3445 ( pg_import_system_collations PGNSP PGUID 12 100 0 0 0 
f f f f t f v r 2 0 2278 "16 4089" _null_ _null_ "{if_not_exists,schema}" 
_null_ _null_ pg_import_system_collations _null_ _null_ _null_ ));
 DESCR("import collations from operating system");
 
+DATA(insert OID = 3353 (  copy_srf PGNSP PGUID 12 1 0 0 0 f f f f f t v u 9 0 
2249 "25 16 25 25 25 16 25 25 25" _null_ _null_ _null_ _null_ _null_ copy_srf 
_null_ _null_ _null_ ));
+DESCR("set-returning COPY proof of concept");
+
+
 /*
  * Symbolic values for provolatile column: these indicate whether the result
  * of a function is dependent *only* on the values of its explicit arguments,
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index d63ca0f..6364604 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -27,8 +27,8 @@ extern void DoCopy(ParseState *state, const CopyStmt *stmt,
           uint64 *processed);
 
 extern void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool 
is_from, List *options);
-extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char 
*filename,
-                         bool is_program, List *attnamelist, List *options);
+extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, TupleDesc 
rsTupDesc,
+                                       const char *filename, bool is_program, 
List *attnamelist, List *options);
 extern void EndCopyFrom(CopyState cstate);
 extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext,
                         Datum *values, bool *nulls, Oid *tupleOid);
@@ -38,4 +38,7 @@ extern void CopyFromErrorCallback(void *arg);
 
 extern DestReceiver *CreateCopyDestReceiver(void);
 
+extern Datum copy_srf(PG_FUNCTION_ARGS);
+
+
 #endif   /* COPY_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to