(2018/11/12 20:41), Etsuro Fujita wrote:
(2018/11/12 18:52), Kyotaro HORIGUCHI wrote:
I read the Thomas's messages as "TTIO/TTOU are not
needed to our busines and we don't have a reason to restore them
before calling external programs other than just plaster
seemingly consistency." And I agree to the analysis and I agree
to you on the point that it doens't need consideration just now.
OK
Attached is an updated version of Tom's POC patch. Here are changes:
* I modified his patch so that the called program inherits SIG_DFL for
SIGUSR2 as well, as discussed upthread.
* I think it's better to ignore the SIGPIPE failure in
ClosePipeToProgram if we were in a COPY FROM PROGRAM that was allowed to
terminate early and keep the behavior as-is otherwise. If we ignore
that failure unconditionally in that function, eg, COPY TO PROGRAM would
fail to get a (better) error message in CopySendEndOfRow or EndCopy when
the invoked program was terminated on SIGPIPE, as discussed before [1].
And to do so, I added a new argument to BeginCopyFrom to specify
whether COPY FROM PROGRAM can terminate early or not.
Best regards,
Etsuro Fujita
[1] https://www.postgresql.org/message-id/5BE18409.2070004%40lab.ntt.co.jp
*** a/contrib/file_fdw/file_fdw.c
--- b/contrib/file_fdw/file_fdw.c
***************
*** 678,683 **** fileBeginForeignScan(ForeignScanState *node, int eflags)
--- 678,684 ----
filename,
is_program,
NULL,
+ true,
NIL,
options);
***************
*** 754,759 **** fileReScanForeignScan(ForeignScanState *node)
--- 755,761 ----
festate->filename,
festate->is_program,
NULL,
+ true,
NIL,
festate->options);
}
***************
*** 1117,1124 **** file_acquire_sample_rows(Relation onerel, int elevel,
/*
* Create CopyState from FDW options.
*/
! cstate = BeginCopyFrom(NULL, onerel, filename, is_program, NULL, NIL,
! options);
/*
* Use per-tuple memory context to prevent leak of memory used to read
--- 1119,1126 ----
/*
* Create CopyState from FDW options.
*/
! cstate = BeginCopyFrom(NULL, onerel, filename, is_program, NULL, false,
! NIL, options);
/*
* Use per-tuple memory context to prevent leak of memory used to read
*** a/src/backend/commands/copy.c
--- b/src/backend/commands/copy.c
***************
*** 119,124 **** typedef struct CopyStateData
--- 119,125 ----
int file_encoding; /* file or remote side's character encoding */
bool need_transcoding; /* file encoding diff from server? */
bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
+ bool allow_partial; /* true if partial execution is allowed */
/* parameters from the COPY command */
Relation rel; /* relation to copy to or from */
***************
*** 998,1004 **** DoCopy(ParseState *pstate, const CopyStmt *stmt,
PreventCommandIfParallelMode("COPY FROM");
cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
! NULL, stmt->attlist, stmt->options);
*processed = CopyFrom(cstate); /* copy from file to database */
EndCopyFrom(cstate);
}
--- 999,1005 ----
PreventCommandIfParallelMode("COPY FROM");
cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
! NULL, false, stmt->attlist, stmt->options);
*processed = CopyFrom(cstate); /* copy from file to database */
EndCopyFrom(cstate);
}
***************
*** 1727,1737 **** ClosePipeToProgram(CopyState cstate)
--- 1728,1749 ----
(errcode_for_file_access(),
errmsg("could not close pipe to external command: %m")));
else if (pclose_rc != 0)
+ {
+ /*
+ * If we were in a COPY FROM PROGRAM that was allowed to terminate
+ * early and the called program terminated on SIGPIPE, assume it's OK;
+ * we must have chosen to stop reading its output early.
+ */
+ if (cstate->allow_partial &&
+ WIFSIGNALED(pclose_rc) && WTERMSIG(pclose_rc) == SIGPIPE)
+ return;
+
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("program \"%s\" failed",
cstate->filename),
errdetail_internal("%s", wait_result_to_str(pclose_rc))));
+ }
}
/*
***************
*** 3184,3189 **** BeginCopyFrom(ParseState *pstate,
--- 3196,3202 ----
const char *filename,
bool is_program,
copy_data_source_cb data_source_cb,
+ bool allow_partial,
List *attnamelist,
List *options)
{
***************
*** 3301,3306 **** BeginCopyFrom(ParseState *pstate,
--- 3314,3320 ----
cstate->volatile_defexprs = volatile_defexprs;
cstate->num_defaults = num_defaults;
cstate->is_program = is_program;
+ cstate->allow_partial = allow_partial;
if (data_source_cb)
{
*** a/src/backend/replication/logical/tablesync.c
--- b/src/backend/replication/logical/tablesync.c
***************
*** 799,805 **** copy_table(Relation rel)
NULL, false, false);
attnamelist = make_copy_attnamelist(relmapentry);
! cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL);
/* Do the copy */
(void) CopyFrom(cstate);
--- 799,805 ----
NULL, false, false);
attnamelist = make_copy_attnamelist(relmapentry);
! cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, false, attnamelist, NIL);
/* Do the copy */
(void) CopyFrom(cstate);
*** a/src/backend/storage/file/fd.c
--- b/src/backend/storage/file/fd.c
***************
*** 2271,2281 **** OpenTransientFilePerm(const char *fileName, int fileFlags, mode_t fileMode)
--- 2271,2287 ----
* Routines that want to initiate a pipe stream should use OpenPipeStream
* rather than plain popen(). This lets fd.c deal with freeing FDs if
* necessary. When done, call ClosePipeStream rather than pclose.
+ *
+ * This function also ensures that the popen'd program is run with default
+ * SIGPIPE/SIGUSR2 processing, rather than the SIG_IGN setting the backend
+ * normally uses. This ensures desirable response to, eg, closing a read pipe
+ * early.
*/
FILE *
OpenPipeStream(const char *command, const char *mode)
{
FILE *file;
+ int save_errno;
DO_DB(elog(LOG, "OpenPipeStream: Allocated %d (%s)",
numAllocatedDescs, command));
***************
*** 2293,2300 **** OpenPipeStream(const char *command, const char *mode)
TryAgain:
fflush(stdout);
fflush(stderr);
errno = 0;
! if ((file = popen(command, mode)) != NULL)
{
AllocateDesc *desc = &allocatedDescs[numAllocatedDescs];
--- 2299,2313 ----
TryAgain:
fflush(stdout);
fflush(stderr);
+ pqsignal(SIGPIPE, SIG_DFL);
+ pqsignal(SIGUSR2, SIG_DFL);
errno = 0;
! file = popen(command, mode);
! save_errno = errno;
! pqsignal(SIGPIPE, SIG_IGN);
! pqsignal(SIGUSR2, SIG_IGN);
! errno = save_errno;
! if (file != NULL)
{
AllocateDesc *desc = &allocatedDescs[numAllocatedDescs];
***************
*** 2307,2314 **** TryAgain:
if (errno == EMFILE || errno == ENFILE)
{
- int save_errno = errno;
-
ereport(LOG,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("out of file descriptors: %m; release and retry")));
--- 2320,2325 ----
*** a/src/include/commands/copy.h
--- b/src/include/commands/copy.h
***************
*** 29,35 **** extern void DoCopy(ParseState *state, const CopyStmt *stmt,
extern void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options);
extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename,
! bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options);
extern void EndCopyFrom(CopyState cstate);
extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext,
Datum *values, bool *nulls, Oid *tupleOid);
--- 29,35 ----
extern void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options);
extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename,
! bool is_program, copy_data_source_cb data_source_cb, bool allow_partial, List *attnamelist, List *options);
extern void EndCopyFrom(CopyState cstate);
extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext,
Datum *values, bool *nulls, Oid *tupleOid);