Hi, I added and edited some comments. > Sorry, the patch contains a silly bug. Please find the > attached one.
regards, -- Kyotaro Horiguchi NTT Open Source Software Center
>From eb91a7c91e1fd3b24bf5bff0eb885f1c3d274637 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp> Date: Fri, 5 Sep 2014 17:21:48 +0900 Subject: [PATCH] Simplly cutting off the socket if signalled during sending to client. --- src/backend/libpq/be-secure.c | 21 ++++++++++-- src/backend/libpq/pqcomm.c | 13 +++++++ src/backend/tcop/postgres.c | 71 ++++++++++++++++++++++------------------- src/include/libpq/libpq.h | 1 + 4 files changed, 70 insertions(+), 36 deletions(-) diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index 41ec1ad..3006697 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -145,11 +145,11 @@ secure_raw_read(Port *port, void *ptr, size_t len) { ssize_t n; - prepare_for_client_read(); + prepare_for_client_comm(); n = recv(port->sock, ptr, len, 0); - client_read_ended(); + client_comm_ended(); return n; } @@ -178,5 +178,20 @@ secure_write(Port *port, void *ptr, size_t len) ssize_t secure_raw_write(Port *port, const void *ptr, size_t len) { - return send(port->sock, ptr, len, 0); + ssize_t n; + + /* + * If we get interrupted during send under execution without blocking, + * processing interrupt immediately actually throws away the chance to + * complete sending the bytes handed, but the chance which we could send + * one more tuple or maybe the final bytes has less not significance than + * the risk that we might can't bail out forever due to blocking send. + */ + prepare_for_client_comm(); + + n = send(port->sock, ptr, len, 0); + + client_comm_ended(); + + return n; } diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 605d891..9b08529 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -1343,6 +1343,19 @@ pq_is_send_pending(void) } /* -------------------------------- + * pq_is_busy - is there any I/O command running? 
+ * + * This function is intended for use within signal handlers to check if + * any pqcomm I/O operation is under execution. + * -------------------------------- + */ +bool +pq_is_busy(void) +{ + return PqCommBusy; +} + +/* -------------------------------- * Message-level I/O routines begin here. * * These routines understand about the old-style COPY OUT protocol. diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 7b5480f..b29b200 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -303,16 +303,16 @@ InteractiveBackend(StringInfo inBuf) * * Even though we are not reading from a "client" process, we still want to * respond to signals, particularly SIGTERM/SIGQUIT. Hence we must use - * prepare_for_client_read and client_read_ended. + * prepare_for_client_comm and client_comm_ended. */ static int interactive_getc(void) { int c; - prepare_for_client_read(); + prepare_for_client_comm(); c = getc(stdin); - client_read_ended(); + client_comm_ended(); return c; } @@ -487,53 +487,47 @@ ReadCommand(StringInfo inBuf) } /* - * prepare_for_client_read -- set up to possibly block on client input + * prepare_for_client_comm -- set up to possibly block on client communication * - * This must be called immediately before any low-level read from the - * client connection. It is necessary to do it at a sufficiently low level - * that there won't be any other operations except the read kernel call - * itself between this call and the subsequent client_read_ended() call. + * This must be called immediately before any low-level read from or write to + * the client connection. It is necessary to do it at a sufficiently low + * level that there won't be any other operations except the read/write kernel + * call itself between this call and the subsequent client_comm_ended() call. * In particular there mustn't be use of malloc() or other potentially - * non-reentrant libc functions. 
This restriction makes it safe for us - * to allow interrupt service routines to execute nontrivial code while - * we are waiting for input. + * non-reentrant libc functions. This restriction makes it safe for us to + * allow interrupt service routines to execute nontrivial code while we are + * waiting for input or blocking of output. */ void -prepare_for_client_read(void) +prepare_for_client_comm(void) { - if (DoingCommandRead) - { - /* Enable immediate processing of asynchronous signals */ - EnableNotifyInterrupt(); - EnableCatchupInterrupt(); + /* Enable immediate processing of asynchronous signals */ + EnableNotifyInterrupt(); + EnableCatchupInterrupt(); - /* Allow cancel/die interrupts to be processed while waiting */ - ImmediateInterruptOK = true; + /* Allow cancel/die interrupts to be processed while waiting */ + ImmediateInterruptOK = true; - /* And don't forget to detect one that already arrived */ - CHECK_FOR_INTERRUPTS(); - } + /* And don't forget to detect one that already arrived */ + CHECK_FOR_INTERRUPTS(); } /* - * client_read_ended -- get out of the client-input state + * client_comm_ended -- get out of the client-communicating state * - * This is called just after low-level reads. It must preserve errno! + * This is called just after low-level reads/writes. It must preserve errno! 
*/ void -client_read_ended(void) +client_comm_ended(void) { - if (DoingCommandRead) - { - int save_errno = errno; + int save_errno = errno; - ImmediateInterruptOK = false; + ImmediateInterruptOK = false; - DisableNotifyInterrupt(); - DisableCatchupInterrupt(); + DisableNotifyInterrupt(); + DisableCatchupInterrupt(); - errno = save_errno; - } + errno = save_errno; } @@ -2594,6 +2588,17 @@ die(SIGNAL_ARGS) if (ImmediateInterruptOK && InterruptHoldoffCount == 0 && CritSectionCount == 0) { + if (pq_is_busy() && !DoingCommandRead) + { + /* + * Getting here indicates that we have interrupted during a + * data block is under sending to the client, so cut off the + * connection immediately not to send any more bytes which + * should cause protocol violation. + */ + close(MyProcPort->sock); + whereToSendOutput = DestNone; + } /* bump holdoff count to make ProcessInterrupts() a no-op */ /* until we are done getting ready for it */ InterruptHoldoffCount++; diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index 5da9d8d..c3fc5f3 100644 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -62,6 +62,7 @@ extern int pq_putbytes(const char *s, size_t len); extern int pq_flush(void); extern int pq_flush_if_writable(void); extern bool pq_is_send_pending(void); +extern bool pq_is_busy(void); extern int pq_putmessage(char msgtype, const char *s, size_t len); extern void pq_putmessage_noblock(char msgtype, const char *s, size_t len); extern void pq_startcopyout(void); -- 1.7.1
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers