(2018/11/12 20:41), Etsuro Fujita wrote:
(2018/11/12 18:52), Kyotaro HORIGUCHI wrote:
I read the Thomas's messages as "TTIO/TTOU are not
needed to our busines and we don't have a reason to restore them
before calling external programs other than just plaster
seemingly consistency." And I agree to the analysis and I agree
to you on the point that it doens't need consideration just now.

OK

Attached is an updated version of Tom's POC patch.  Here are changes:

* I modified his patch so that the called program inherits SIG_DFL for SIGUSR2 as well, as discussed upthread.

* I think it's better to ignore the SIGPIPE failure in ClosePipeToProgram if we were in a COPY FROM PROGRAM that was allowed to terminate early and keep the behavior as-is otherwise. If we ignore that failure unconditionally in that function, eg, COPY TO PROGRAM would fail to get a (better) error message in CopySendEndOfRow or EndCopy when the invoked program was terminated on SIGPIPE, as discussed before [1]. And to do so, I added a new argument to BeginCopyFrom to specify whether COPY FROM PROGRAM can terminate early or not.

Best regards,
Etsuro Fujita

[1] https://www.postgresql.org/message-id/5BE18409.2070004%40lab.ntt.co.jp
*** a/contrib/file_fdw/file_fdw.c
--- b/contrib/file_fdw/file_fdw.c
***************
*** 678,683 **** fileBeginForeignScan(ForeignScanState *node, int eflags)
--- 678,684 ----
  						   filename,
  						   is_program,
  						   NULL,
+ 						   true,
  						   NIL,
  						   options);
  
***************
*** 754,759 **** fileReScanForeignScan(ForeignScanState *node)
--- 755,761 ----
  									festate->filename,
  									festate->is_program,
  									NULL,
+ 									true,
  									NIL,
  									festate->options);
  }
***************
*** 1117,1124 **** file_acquire_sample_rows(Relation onerel, int elevel,
  	/*
  	 * Create CopyState from FDW options.
  	 */
! 	cstate = BeginCopyFrom(NULL, onerel, filename, is_program, NULL, NIL,
! 						   options);
  
  	/*
  	 * Use per-tuple memory context to prevent leak of memory used to read
--- 1119,1126 ----
  	/*
  	 * Create CopyState from FDW options.
  	 */
! 	cstate = BeginCopyFrom(NULL, onerel, filename, is_program, NULL, false,
! 						   NIL, options);
  
  	/*
  	 * Use per-tuple memory context to prevent leak of memory used to read
*** a/src/backend/commands/copy.c
--- b/src/backend/commands/copy.c
***************
*** 119,124 **** typedef struct CopyStateData
--- 119,125 ----
  	int			file_encoding;	/* file or remote side's character encoding */
  	bool		need_transcoding;	/* file encoding diff from server? */
  	bool		encoding_embeds_ascii;	/* ASCII can be non-first byte? */
+ 	bool		allow_partial;	/* true if partial execution is allowed */
  
  	/* parameters from the COPY command */
  	Relation	rel;			/* relation to copy to or from */
***************
*** 998,1004 **** DoCopy(ParseState *pstate, const CopyStmt *stmt,
  		PreventCommandIfParallelMode("COPY FROM");
  
  		cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
! 							   NULL, stmt->attlist, stmt->options);
  		*processed = CopyFrom(cstate);	/* copy from file to database */
  		EndCopyFrom(cstate);
  	}
--- 999,1005 ----
  		PreventCommandIfParallelMode("COPY FROM");
  
  		cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
! 							   NULL, false, stmt->attlist, stmt->options);
  		*processed = CopyFrom(cstate);	/* copy from file to database */
  		EndCopyFrom(cstate);
  	}
***************
*** 1727,1737 **** ClosePipeToProgram(CopyState cstate)
--- 1728,1749 ----
  				(errcode_for_file_access(),
  				 errmsg("could not close pipe to external command: %m")));
  	else if (pclose_rc != 0)
+ 	{
+ 		/*
+ 		 * If we were in a COPY FROM PROGRAM that was allowed to terminate
+ 		 * early and the called program terminated on SIGPIPE, assume it's OK;
+ 		 * we must have chosen to stop reading its output early.
+ 		 */
+ 		if (cstate->allow_partial &&
+ 			WIFSIGNALED(pclose_rc) && WTERMSIG(pclose_rc) == SIGPIPE)
+ 			return;
+ 
  		ereport(ERROR,
  				(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
  				 errmsg("program \"%s\" failed",
  						cstate->filename),
  				 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
+ 	}
  }
  
  /*
***************
*** 3184,3189 **** BeginCopyFrom(ParseState *pstate,
--- 3196,3202 ----
  			  const char *filename,
  			  bool is_program,
  			  copy_data_source_cb data_source_cb,
+ 			  bool allow_partial,
  			  List *attnamelist,
  			  List *options)
  {
***************
*** 3301,3306 **** BeginCopyFrom(ParseState *pstate,
--- 3314,3320 ----
  	cstate->volatile_defexprs = volatile_defexprs;
  	cstate->num_defaults = num_defaults;
  	cstate->is_program = is_program;
+ 	cstate->allow_partial = allow_partial;
  
  	if (data_source_cb)
  	{
*** a/src/backend/replication/logical/tablesync.c
--- b/src/backend/replication/logical/tablesync.c
***************
*** 799,805 **** copy_table(Relation rel)
  								  NULL, false, false);
  
  	attnamelist = make_copy_attnamelist(relmapentry);
! 	cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL);
  
  	/* Do the copy */
  	(void) CopyFrom(cstate);
--- 799,805 ----
  								  NULL, false, false);
  
  	attnamelist = make_copy_attnamelist(relmapentry);
! 	cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, false, attnamelist, NIL);
  
  	/* Do the copy */
  	(void) CopyFrom(cstate);
*** a/src/backend/storage/file/fd.c
--- b/src/backend/storage/file/fd.c
***************
*** 2271,2281 **** OpenTransientFilePerm(const char *fileName, int fileFlags, mode_t fileMode)
--- 2271,2287 ----
   * Routines that want to initiate a pipe stream should use OpenPipeStream
   * rather than plain popen().  This lets fd.c deal with freeing FDs if
   * necessary.  When done, call ClosePipeStream rather than pclose.
+  *
+  * This function also ensures that the popen'd program is run with default
+  * SIGPIPE/SIGUSR2 processing, rather than the SIG_IGN setting the backend
+  * normally uses.  This ensures desirable response to, eg, closing a read pipe
+  * early.
   */
  FILE *
  OpenPipeStream(const char *command, const char *mode)
  {
  	FILE	   *file;
+ 	int			save_errno;
  
  	DO_DB(elog(LOG, "OpenPipeStream: Allocated %d (%s)",
  			   numAllocatedDescs, command));
***************
*** 2293,2300 **** OpenPipeStream(const char *command, const char *mode)
  TryAgain:
  	fflush(stdout);
  	fflush(stderr);
  	errno = 0;
! 	if ((file = popen(command, mode)) != NULL)
  	{
  		AllocateDesc *desc = &allocatedDescs[numAllocatedDescs];
  
--- 2299,2313 ----
  TryAgain:
  	fflush(stdout);
  	fflush(stderr);
+ 	pqsignal(SIGPIPE, SIG_DFL);
+ 	pqsignal(SIGUSR2, SIG_DFL);
  	errno = 0;
! 	file = popen(command, mode);
! 	save_errno = errno;
! 	pqsignal(SIGPIPE, SIG_IGN);
! 	pqsignal(SIGUSR2, SIG_IGN);
! 	errno = save_errno;
! 	if (file != NULL)
  	{
  		AllocateDesc *desc = &allocatedDescs[numAllocatedDescs];
  
***************
*** 2307,2314 **** TryAgain:
  
  	if (errno == EMFILE || errno == ENFILE)
  	{
- 		int			save_errno = errno;
- 
  		ereport(LOG,
  				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
  				 errmsg("out of file descriptors: %m; release and retry")));
--- 2320,2325 ----
*** a/src/include/commands/copy.h
--- b/src/include/commands/copy.h
***************
*** 29,35 **** extern void DoCopy(ParseState *state, const CopyStmt *stmt,
  
  extern void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options);
  extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename,
! 			  bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options);
  extern void EndCopyFrom(CopyState cstate);
  extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext,
  			 Datum *values, bool *nulls, Oid *tupleOid);
--- 29,35 ----
  
  extern void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options);
  extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename,
! 			  bool is_program, copy_data_source_cb data_source_cb, bool allow_partial, List *attnamelist, List *options);
  extern void EndCopyFrom(CopyState cstate);
  extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext,
  			 Datum *values, bool *nulls, Oid *tupleOid);

Reply via email to