On Wed, Jan 25, 2017 at 06:16:16AM -0800, David Fetter wrote:
> On Mon, Oct 31, 2016 at 04:45:40PM -0400, Corey Huinker wrote:
> > Attached is a patch that implements copy_srf().
> >
> > The function signature reflects cstate more than it reflects the COPY
> > options (filename+is_program instead of FILENAME or PROGRAM, etc)
>
> The patch as it stands needs a rebase. I'll see what I can do today.
Please find attached a rebase, which fixes an OID collision that crept in.
- The patch builds against master and passes "make check".
- The patch does not contain user-visible documentation or examples.
- The patch does not contain tests.
I got the following when I did a brief test.
SELECT * FROM copy_srf(filename => 'ls', is_program => true) AS t(l text);
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
The connection to the server was lost. Attempting reset: Failed.
Best,
David.
--
David Fetter <david(at)fetter(dot)org> http://fetter.org/
Phone: +1 415 235 3778 AIM: dfetter666 Yahoo!: dfetter
Skype: davidfetter XMPP: david(dot)fetter(at)gmail(dot)com
Remember to vote!
Consider donating to Postgres: http://www.postgresql.org/about/donate
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 4dfedf8..ae07cfb 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1065,6 +1065,21 @@ LANGUAGE INTERNAL
STRICT IMMUTABLE PARALLEL SAFE
AS 'jsonb_insert';
+--
+-- copy_srf: SQL-level declaration of the internal function 'copy_srf'.
+--
+-- Every parameter has a default, so callers can pass only what they need
+-- using named notation.  The result type is SETOF RECORD, so the caller
+-- supplies a column definition list, for example:
+--     SELECT * FROM copy_srf(filename => 'ls', is_program => true) AS t(l text);
+--
+-- The function is CALLED ON NULL INPUT: the C implementation tests each
+-- argument for NULL itself and substitutes a default.
+--
+-- NOTE(review): no REVOKE accompanies this declaration, so the default
+-- function permissions apply -- confirm that is intended for a function
+-- that reads server-side files and runs programs.
+--
+CREATE OR REPLACE FUNCTION copy_srf(
+ IN filename text DEFAULT null,
+ IN is_program boolean DEFAULT false,
+ IN format text DEFAULT 'text',
+ IN delimiter text DEFAULT null,
+ IN null_string text DEFAULT E'\\N',
+ IN header boolean DEFAULT false,
+ IN quote text DEFAULT null,
+ IN escape text DEFAULT null,
+ IN encoding text DEFAULT null)
+RETURNS SETOF RECORD
+LANGUAGE INTERNAL
+VOLATILE ROWS 1000 COST 1000 CALLED ON NULL INPUT
+AS 'copy_srf';
+
-- The default permissions for functions mean that anyone can execute them.
-- A number of functions shouldn't be executable by just anyone, but rather
-- than use explicit 'superuser()' checks in those functions, we use the GRANT
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index f9362be..8e1bd39 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -30,6 +30,7 @@
#include "commands/defrem.h"
#include "commands/trigger.h"
#include "executor/executor.h"
+#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
@@ -562,7 +563,6 @@ CopyGetData(CopyState cstate, void *databuf, int minread,
int maxread)
errmsg("could not read from
COPY file: %m")));
break;
case COPY_OLD_FE:
-
/*
* We cannot read more than minread bytes (which in
practice is 1)
* because old protocol doesn't have any clear way of
separating
@@ -4740,3 +4740,377 @@ CreateCopyDestReceiver(void)
return (DestReceiver *) self;
}
+
+/*
+ * copy_srf_text_arg
+ *		Return argument "argno" as a palloc'd C string, or NULL if the
+ *		argument is SQL NULL.
+ */
+static char *
+copy_srf_text_arg(FunctionCallInfo fcinfo, int argno)
+{
+	if (PG_ARGISNULL(argno))
+		return NULL;
+	return TextDatumGetCString(PG_GETARG_DATUM(argno));
+}
+
+/*
+ * copy_srf_bool_arg
+ *		Return boolean argument "argno", treating SQL NULL as false.
+ */
+static bool
+copy_srf_bool_arg(FunctionCallInfo fcinfo, int argno)
+{
+	return !PG_ARGISNULL(argno) && PG_GETARG_BOOL(argno);
+}
+
+/*
+ * copy_srf
+ *		Set-returning function that reads a server-side file, or the output
+ *		of a program, as COPY-formatted input and returns one row per line.
+ *
+ * SQL-level arguments (each is checked for NULL here):
+ *		0  filename     text     file path, or command line if is_program
+ *		1  is_program   boolean  NULL means false
+ *		2  format       text     'text', 'csv' or 'binary'; NULL means 'text'
+ *		3  delimiter    text     NULL means tab, or comma in csv mode
+ *		4  null_string  text     NULL means \N, or empty string in csv mode
+ *		5  header       boolean  NULL means false
+ *		6  quote        text     csv mode only; NULL means double quote
+ *		7  escape       text     csv mode only; NULL means same as quote
+ *		8  encoding     text     NULL means the client encoding
+ *
+ * The result row type is taken from rsinfo->expectedDesc (the caller's
+ * column definition list).  Each input field is converted with the input
+ * function of the corresponding column type, and the rows are handed back
+ * in a tuplestore (materialize mode).
+ *
+ * Errors are raised for an unusable calling context, a non-superuser
+ * caller, a NULL filename, invalid option values, I/O failures, a program
+ * that exits with a nonzero status, and any input line whose field count
+ * differs from the number of result columns.
+ */
+Datum
+copy_srf(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	MemoryContext per_query_ctx;
+	MemoryContext row_ctx;
+	MemoryContext oldcontext;
+	FmgrInfo   *in_functions;
+	Oid		   *typioparams;
+	CopyStateData copy_state;
+	Datum	   *values;
+	bool	   *nulls;
+	char	   *arg;
+	int			col;
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+
+	if (!(rsinfo->allowedModes & SFRM_Materialize) ||
+		rsinfo->expectedDesc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+
+	/*
+	 * This function opens arbitrary server-side files and runs arbitrary
+	 * commands as the server's OS user, so do not let ordinary users call it.
+	 */
+	if (!superuser())
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 errmsg("must be superuser to use copy_srf")));
+
+	/* Look up the input function of every result column. */
+	tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
+	values = (Datum *) palloc(tupdesc->natts * sizeof(Datum));
+	nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
+	in_functions = (FmgrInfo *) palloc(tupdesc->natts * sizeof(FmgrInfo));
+	typioparams = (Oid *) palloc(tupdesc->natts * sizeof(Oid));
+
+	for (col = 0; col < tupdesc->natts; col++)
+	{
+		Oid			in_func_oid;
+
+		getTypeInputInfo(tupdesc->attrs[col]->atttypid,
+						 &in_func_oid, &typioparams[col]);
+		fmgr_info(in_func_oid, &in_functions[col]);
+	}
+
+	/*
+	 * Mock up a copy state.  Zero the whole struct first so that every field
+	 * not set explicitly below (line counter, end-of-line state, raw buffer
+	 * indexes, ...) starts out in a known state instead of holding stack
+	 * garbage.
+	 */
+	memset(&copy_state, 0, sizeof(copy_state));
+	initStringInfo(&copy_state.line_buf);
+	initStringInfo(&copy_state.attribute_buf);
+	copy_state.fe_msgbuf = makeStringInfo();
+	copy_state.raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
+	copy_state.raw_buf_index = copy_state.raw_buf_len = 0;
+	copy_state.oids = false;
+	copy_state.freeze = false;
+	copy_state.rel = NULL;
+	copy_state.queryDesc = NULL;
+
+	/*
+	 * param 0: filename.  There is no way to read from the client connection
+	 * from inside a function call, so a source must be named.
+	 */
+	copy_state.filename = copy_srf_text_arg(fcinfo, 0);
+	if (copy_state.filename == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+				 errmsg("filename must not be null"),
+				 errhint("copy_srf can only read a server-side file or the output of a program.")));
+	copy_state.copy_dest = COPY_FILE;
+
+	/* param 1: is_program */
+	copy_state.is_program = copy_srf_bool_arg(fcinfo, 1);
+
+	/*
+	 * param 2: format - text, csv, binary
+	 *
+	 * NOTE(review): 'binary' is accepted, but the read loop below obtains
+	 * text fields from NextCopyFromRawFields -- confirm whether binary input
+	 * can work here or should be rejected.
+	 */
+	arg = copy_srf_text_arg(fcinfo, 2);
+	if (arg == NULL || strcmp(arg, "text") == 0)
+	{
+		copy_state.binary = false;
+		copy_state.csv_mode = false;
+	}
+	else if (strcmp(arg, "csv") == 0)
+	{
+		copy_state.binary = false;
+		copy_state.csv_mode = true;
+	}
+	else if (strcmp(arg, "binary") == 0)
+	{
+		copy_state.binary = true;
+		copy_state.csv_mode = false;
+	}
+	else
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("Format must be one of: text csv binary")));
+
+	/* param 3: delimiter, default tab in text mode, comma in csv mode */
+	arg = copy_srf_text_arg(fcinfo, 3);
+	if (arg == NULL)
+		copy_state.delim = copy_state.csv_mode ? "," : "\t";
+	else
+	{
+		if (copy_state.binary)
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("cannot specify DELIMITER in BINARY mode")));
+
+		if (strlen(arg) != 1)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("COPY delimiter must be a single one-byte character")));
+
+		/* Disallow end-of-line characters */
+		if (strchr(arg, '\r') != NULL || strchr(arg, '\n') != NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("COPY delimiter cannot be newline or carriage return")));
+
+		copy_state.delim = arg;
+	}
+
+	/* param 4: null_string, default \N in text mode, empty in csv mode */
+	arg = copy_srf_text_arg(fcinfo, 4);
+	if (arg == NULL)
+		copy_state.null_print = copy_state.csv_mode ? "" : "\\N";
+	else
+		copy_state.null_print = arg;
+	copy_state.null_print_len = strlen(copy_state.null_print);
+
+	/* param 5: header */
+	copy_state.header_line = copy_srf_bool_arg(fcinfo, 5);
+
+	/*
+	 * param 6: quote, default double quote.  Validate the supplied value
+	 * itself; copy_state.quote must not be examined before it is assigned.
+	 */
+	arg = copy_srf_text_arg(fcinfo, 6);
+	if (arg == NULL)
+		copy_state.quote = "\"";
+	else
+	{
+		if (!copy_state.csv_mode)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("COPY quote available only in CSV mode")));
+
+		if (strlen(arg) != 1)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("COPY quote must be a single one-byte character")));
+
+		if (copy_state.delim[0] == arg[0])
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("COPY delimiter and quote must be different")));
+
+		copy_state.quote = arg;
+	}
+
+	/* param 7: escape, defaults to whatever quote is */
+	arg = copy_srf_text_arg(fcinfo, 7);
+	if (arg == NULL)
+		copy_state.escape = copy_state.quote;
+	else
+	{
+		if (!copy_state.csv_mode)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("COPY escape available only in CSV mode")));
+
+		if (strlen(arg) != 1)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("COPY escape must be a single one-byte character")));
+
+		copy_state.escape = arg;
+	}
+
+	/* param 8: encoding, default is the client encoding */
+	arg = copy_srf_text_arg(fcinfo, 8);
+	if (arg == NULL)
+		copy_state.file_encoding = pg_get_client_encoding();
+	else
+	{
+		copy_state.file_encoding = pg_char_to_encoding(arg);
+		if (copy_state.file_encoding < 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("\"%s\" is not a valid encoding name", arg)));
+	}
+
+	/*
+	 * Derive the conversion flags from the chosen encoding; leaving them
+	 * hard-wired to false would make the encoding argument a no-op.
+	 */
+	copy_state.need_transcoding =
+		(copy_state.file_encoding != GetDatabaseEncoding() ||
+		 pg_database_encoding_max_length() > 1);
+	copy_state.encoding_embeds_ascii =
+		PG_ENCODING_IS_CLIENT_ONLY(copy_state.file_encoding);
+
+	copy_state.max_fields = tupdesc->natts;
+	copy_state.raw_fields = (char **) palloc(tupdesc->natts * sizeof(char *));
+
+	/*
+	 * Let the caller know we're sending back a tuplestore.  Only the
+	 * tuplestore has to live in per-query memory; everything else can stay
+	 * in the shorter-lived calling context.
+	 */
+	rsinfo->returnMode = SFRM_Materialize;
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	MemoryContextSwitchTo(oldcontext);
+
+	/* open "file" */
+	if (copy_state.is_program)
+	{
+		copy_state.copy_file = OpenPipeStream(copy_state.filename,
+											  PG_BINARY_R);
+		if (copy_state.copy_file == NULL)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not execute command \"%s\": %m",
+							copy_state.filename)));
+	}
+	else
+	{
+		struct stat st;
+
+		copy_state.copy_file = AllocateFile(copy_state.filename,
+											PG_BINARY_R);
+		if (copy_state.copy_file == NULL)
+		{
+			/* copy errno because ereport subfunctions might change it */
+			int			save_errno = errno;
+
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open file \"%s\" for reading: %m",
+							copy_state.filename),
+					 (save_errno == ENOENT || save_errno == EACCES) ?
+					 errhint("copy_srf instructs the PostgreSQL server process to read a file. "
+							 "You may want a client-side facility such as psql's \\copy.") : 0));
+		}
+
+		if (fstat(fileno(copy_state.copy_file), &st))
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m",
+							copy_state.filename)));
+
+		if (S_ISDIR(st.st_mode))
+			ereport(ERROR,
+					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+					 errmsg("\"%s\" is a directory", copy_state.filename)));
+	}
+
+	/*
+	 * Convert each input line in a scratch context that is reset per row, so
+	 * that datums and tuples built for one row do not pile up for the whole
+	 * file.  tuplestore_puttuple() keeps its own copy of the tuple.
+	 */
+	row_ctx = AllocSetContextCreate(CurrentMemoryContext,
+									"copy_srf row context",
+									ALLOCSET_DEFAULT_SIZES);
+
+	for (;;)
+	{
+		char	  **field_strings;
+		int			field_count;
+		HeapTuple	tuple;
+
+		CHECK_FOR_INTERRUPTS();
+
+		MemoryContextReset(row_ctx);
+		oldcontext = MemoryContextSwitchTo(row_ctx);
+
+		if (!NextCopyFromRawFields(&copy_state, &field_strings, &field_count))
+		{
+			MemoryContextSwitchTo(oldcontext);
+			break;
+		}
+
+		if (field_count != tupdesc->natts)
+			ereport(ERROR,
+					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+					 errmsg("found %d fields but expected %d on line %d",
+							field_count, tupdesc->natts,
+							copy_state.cur_lineno)));
+
+		/* A NULL field string marks a SQL NULL for that column. */
+		for (col = 0; col < tupdesc->natts; col++)
+		{
+			values[col] = InputFunctionCall(&in_functions[col],
+											field_strings[col],
+											typioparams[col],
+											tupdesc->attrs[col]->atttypmod);
+			nulls[col] = (field_strings[col] == NULL);
+		}
+
+		tuple = heap_form_tuple(tupdesc, values, nulls);
+		tuplestore_puttuple(tupstore, tuple);
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	MemoryContextDelete(row_ctx);
+
+	/* close "file" */
+	if (copy_state.is_program)
+	{
+		int			pclose_rc;
+
+		pclose_rc = ClosePipeStream(copy_state.copy_file);
+		if (pclose_rc == -1)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not close pipe to external command: %m")));
+		else if (pclose_rc != 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
+					 errmsg("program \"%s\" failed",
+							copy_state.filename),
+					 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
+	}
+	else
+	{
+		if (FreeFile(copy_state.copy_file))
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not close file \"%s\": %m",
+							copy_state.filename)));
+	}
+
+	tuplestore_donestoring(tupstore);
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	return (Datum) 0;
+}
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index c1f492b..9fb2ff7 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5359,6 +5359,10 @@ DESCR("pg_controldata init state information as a function");
DATA(insert OID = 3445 ( pg_import_system_collations PGNSP PGUID 12 100 0 0 0
f f f f t f v r 2 0 2278 "16 4089" _null_ _null_ "{if_not_exists,schema}"
_null_ _null_ pg_import_system_collations _null_ _null_ _null_ ));
DESCR("import collations from operating system");
+DATA(insert OID = 3353 ( copy_srf PGNSP PGUID 12 1 0 0 0 f f f f f t v u 9 0
2249 "25 16 25 25 25 16 25 25 25" _null_ _null_ _null_ _null_ _null_ copy_srf
_null_ _null_ _null_ ));
+DESCR("set-returning COPY proof of concept");
+
+
/*
* Symbolic values for provolatile column: these indicate whether the result
* of a function is dependent *only* on the values of its explicit arguments,
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index d63ca0f..de09841 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -38,4 +38,7 @@ extern void CopyFromErrorCallback(void *arg);
extern DestReceiver *CreateCopyDestReceiver(void);
+extern Datum copy_srf(PG_FUNCTION_ARGS);
+
+
#endif /* COPY_H */
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers