2015-09-07 11:55 GMT+02:00 Shulgin, Oleksandr <oleksandr.shul...@zalando.de>
:

> On Fri, Sep 4, 2015 at 6:11 AM, Pavel Stehule <pavel.steh...@gmail.com>
> wrote:
>
>> Sorry, but I still don't see how the slots help this issue - could you
>>> please elaborate?
>>>
>> with slot (or some similiar) there is not global locked resource. If I'll
>> have a time at weekend I'll try to write some prototype.
>>
>
> But you will still lock on the slots list to find an unused one.  How is
> that substantially different from what I'm doing?
>

It is not necessary - you can use similar technique to what it does PGPROC.
I am sending "lock free" demo.

I don't afraid about locks - short locks, when the range and time are
limited. But there are lot of bugs, and fixes with the name "do
interruptible some", and it is reason, why I prefer typical design for work
with shared memory.


> >> Other smaller issues:
>>> >>
>>> >> * probably sending line by line is useless - shm_mq_send can pass
>>> bigger data when nowait = false
>>>
>>> I'm not sending it like that because of the message size - I just find
>>> it more convenient. If you think it can be problematic, its easy to do this
>>> as before, by splitting lines on the receiving side.
>>>
>> Yes, shm queue sending data immediately - so slicing on sender generates
>> more interprocess communication
>>
>
> Well, we are talking about hundreds to thousands bytes per plan in total.
> And if my reading of shm_mq implementation is correct, if the message fits
> into the shared memory buffer, the receiver gets the direct pointer to the
> shared memory, no extra allocation/copy to process-local memory.  So this
> can be actually a win.
>

you have to calculate with signals and interprocess communication. the cost
of memory allocation is not all.

> >> * pg_usleep(1000L); - it is related to single point resource
>>>
>>> But not a highly concurrent one.
>>>
>> I believe so it is not becessary - waiting (sleeping) can be deeper in
>> reading from queue - the code will be cleaner
>>
>
> The only way I expect this line to be reached is when a concurrent
> pg_cmdstatus() call is in progress: the receiving backend has set the
> target_pid and has created the queue, released the lock and now waits to
> read something from shm_mq.  So the backend that's trying to also use this
> communication channel can obtain the lwlock, checks if the channel is not
> used at the time, fails and then it needs to check again, but that's going
> to put a load on the CPU, so there's a small sleep.
>
> The real problem could be if the process that was signaled to connect to
> the message queue never handles the interrupt, and we keep waiting forever
> in shm_mq_receive().  We could add a timeout parameter or just let the user
> cancel the call: send a cancellation request, use pg_cancel_backend() or
> set statement_timeout before running this.
>

This is valid question - for begin we can use a statement_timeout and we
don't need to design some special (if you don't hold some important lock).

My example (the code has prototype quality) is little bit longer, but it
work without global lock - the requester doesn't block any other

Pavel


>
> --
> Alex
>
>
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
new file mode 100644
index 32ac58f..f5db55a
*** a/src/backend/storage/ipc/ipci.c
--- b/src/backend/storage/ipc/ipci.c
***************
*** 43,48 ****
--- 43,49 ----
  #include "storage/procsignal.h"
  #include "storage/sinvaladt.h"
  #include "storage/spin.h"
+ #include "utils/signal_demo.h"
  
  
  shmem_startup_hook_type shmem_startup_hook = NULL;
*************** CreateSharedMemoryAndSemaphores(bool mak
*** 139,144 ****
--- 140,146 ----
  		size = add_size(size, BTreeShmemSize());
  		size = add_size(size, SyncScanShmemSize());
  		size = add_size(size, AsyncShmemSize());
+ 		size = add_size(size, ProcPipeShmemSize());
  #ifdef EXEC_BACKEND
  		size = add_size(size, ShmemBackendArraySize());
  #endif
*************** CreateSharedMemoryAndSemaphores(bool mak
*** 243,248 ****
--- 245,251 ----
  	ReplicationOriginShmemInit();
  	WalSndShmemInit();
  	WalRcvShmemInit();
+ 	ProcPipeShmemInit();
  
  	/*
  	 * Set up other modules that need some shared memory space
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
new file mode 100644
index 0abde43..c28b44f
*** a/src/backend/storage/ipc/procsignal.c
--- b/src/backend/storage/ipc/procsignal.c
***************
*** 27,32 ****
--- 27,33 ----
  #include "storage/sinval.h"
  #include "tcop/tcopprot.h"
  
+ #include "utils/signal_demo.h"
  
  /*
   * The SIGUSR1 signal is multiplexed to support signalling multiple event
*************** bool		set_latch_on_sigusr1;
*** 69,75 ****
  
  static ProcSignalSlot *ProcSignalSlots = NULL;
  static volatile ProcSignalSlot *MyProcSignalSlot = NULL;
! 
  static bool CheckProcSignal(ProcSignalReason reason);
  static void CleanupProcSignalState(int status, Datum arg);
  
--- 70,76 ----
  
  static ProcSignalSlot *ProcSignalSlots = NULL;
  static volatile ProcSignalSlot *MyProcSignalSlot = NULL;
!  
  static bool CheckProcSignal(ProcSignalReason reason);
  static void CleanupProcSignalState(int status, Datum arg);
  
*************** procsignal_sigusr1_handler(SIGNAL_ARGS)
*** 296,301 ****
--- 297,305 ----
  	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
  		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
  
+ 	if (CheckProcSignal(PROCSIG_SEND_INFO))
+ 		SendProcessInfoInterrupt();
+ 
  	if (set_latch_on_sigusr1)
  		SetLatch(MyLatch);
  
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
new file mode 100644
index d917af3..51701b8
*** a/src/backend/tcop/postgres.c
--- b/src/backend/tcop/postgres.c
*************** ProcessInterrupts(void)
*** 2991,2996 ****
--- 2991,2999 ----
  
  	if (ParallelMessagePending)
  		HandleParallelMessages();
+ 
+ 	if (SendProcessInfoPending)
+ 		HandleSendProcessInfo();
  }
  
  
diff --git a/src/backend/utils/adt/Makefile b/src/backend/utils/adt/Makefile
new file mode 100644
index 3ed0b44..864970c
*** a/src/backend/utils/adt/Makefile
--- b/src/backend/utils/adt/Makefile
*************** OBJS = acl.o arrayfuncs.o array_expanded
*** 36,42 ****
  	tsquery_op.o tsquery_rewrite.o tsquery_util.o tsrank.o \
  	tsvector.o tsvector_op.o tsvector_parser.o \
  	txid.o uuid.o varbit.o varchar.o varlena.o version.o \
! 	windowfuncs.o xid.o xml.o
  
  like.o: like.c like_match.c
  
--- 36,42 ----
  	tsquery_op.o tsquery_rewrite.o tsquery_util.o tsrank.o \
  	tsvector.o tsvector_op.o tsvector_parser.o \
  	txid.o uuid.o varbit.o varchar.o varlena.o version.o \
! 	windowfuncs.o xid.o xml.o signal_demo.o
  
  like.o: like.c like_match.c
  
diff --git a/src/backend/utils/adt/signal_demo.c b/src/backend/utils/adt/signal_demo.c
new file mode 100644
index ...5de50b5
*** a/src/backend/utils/adt/signal_demo.c
--- b/src/backend/utils/adt/signal_demo.c
***************
*** 0 ****
--- 1,411 ----
+ #include "postgres.h"
+ 
+ #include "miscadmin.h"
+ 
+ #include "storage/backendid.h"
+ #include "storage/ipc.h"
+ #include "storage/procarray.h"
+ #include "storage/shm_mq.h"
+ #include "storage/shm_toc.h"
+ #include "utils/builtins.h"
+ #include "utils/resowner.h"
+ #include "utils/signal_demo.h"
+ 
+ #include "storage/dsm.h"
+ 
+ 
+ typedef struct
+ {
+ 	pid_t			pss_pid;
+ 	pid_t			sender_pss_pid;
+ 	BackendId		sender_backendId;
+ 	dsm_handle		dsm_seg_handle;
+ 	int			data_tag;
+ 	bool		is_valid;
+ 	bool		is_in_process;
+ 	bool		is_processed;
+ } ProcPipeSlot;
+ 
+ 
+ #define NumProcPipeSlots	(MaxBackends + NUM_AUXPROCTYPES)
+ 
+ #define PG_SEND_PROCESS_INFO_MAGIC 0x79fb2449
+ #define SEND_INFO_DEMO_TAG		1
+ 
+ static ProcPipeSlot *ProcPipeSlots = NULL;
+ static volatile ProcPipeSlot *MyProcPipeSlot = NULL;
+ 
+ volatile bool	SendProcessInfoPending = false;
+ 
+ static char *prepare_demo_data(int length, bool is_active_stmt);
+ 
+ 
+ Size
+ ProcPipeShmemSize(void)
+ {
+ 	return NumProcPipeSlots * sizeof(ProcPipeSlot);
+ }
+ 
+ void
+ ProcPipeShmemInit(void)
+ {
+ 	Size		size = ProcPipeShmemSize();
+ 	bool		found;
+ 
+ 	ProcPipeSlots = (ProcPipeSlot *)
+ 		ShmemInitStruct("ProcPipeSlots", size, &found);
+ 
+ 	if (!found)
+ 		MemSet(ProcPipeSlots, 0, size);
+ }
+ 
+ static void
+ CleanupProcPipeSlot(int status, Datum arg)
+ {
+ 	int		pss_idx = DatumGetInt32(arg);
+ 	volatile ProcPipeSlot *slot;
+ 
+ 	slot = &ProcPipeSlots[pss_idx - 1];
+ 	Assert(slot == MyProcPipeSlot);
+ 
+ 	MyProcPipeSlot = NULL;
+ 
+ 	if (slot->pss_pid != MyProcPid)
+ 	{
+ 		elog(LOG, "process %d releasing ProcSignal slot %d, but it contains %d",
+ 			 MyProcPid, pss_idx, (int) slot->pss_pid);
+ 		return;					/* XXX better to zero the slot anyway? */
+ 	}
+ 
+ 	slot->pss_pid = 0;
+ 	slot->is_valid = false;
+ }
+ 
+ void
+ ProcPipeInit(int pss_idx)
+ {
+ 	volatile ProcPipeSlot *slot;
+ 
+ 	slot = &ProcPipeSlots[pss_idx - 1];
+ 
+ 	if (slot->pss_pid != 0)
+ 		elog(LOG, "process %d taking over ProcPipe slot %d, but it's not empty",
+ 				 MyProcPid, pss_idx);
+ 
+ 	slot->pss_pid = MyProcPid;
+ 	slot->is_valid = false;
+ 
+ 	MyProcPipeSlot = slot;
+ 
+ 	on_shmem_exit(CleanupProcPipeSlot, Int32GetDatum(pss_idx));
+ }
+ 
+ /*
+  * Send request to data to targetted process. Result is taken from SendProcSignal
+  */
+ int
+ SendProcessInfoSignal(pid_t data_sender_pid, BackendId data_sender_backendId,
+ 		    dsm_handle dsm_seg_handle, int data_tag)
+ {
+ 	MyProcPipeSlot->sender_pss_pid = data_sender_pid;
+ 	MyProcPipeSlot->sender_backendId = data_sender_backendId;
+ 	MyProcPipeSlot->dsm_seg_handle = dsm_seg_handle;
+ 	MyProcPipeSlot->data_tag = data_tag;
+ 	MyProcPipeSlot->is_valid = true;
+ 	MyProcPipeSlot->is_in_process = false;
+ 	MyProcPipeSlot->is_processed = false;
+ 
+ 	return SendProcSignal(data_sender_pid, PROCSIG_SEND_INFO, data_sender_backendId);
+ }
+ 
+ void
+ InvalidateProcPipeSlot(void)
+ {
+ 	MyProcPipeSlot->is_valid = false;
+ }
+ 
+ bool
+ is_valid_result(void)
+ {
+ 	return MyProcPipeSlot->is_valid;
+ }
+ 
+ 
+ /*
+  * signal is handled quickly - don't do heavy work here
+  */
+ void
+ SendProcessInfoInterrupt(void)
+ {
+ 	volatile ProcPipeSlot *slot;
+ 	int		i;
+ 
+ 	for (i = NumProcPipeSlots - 1; i >= 0; i--)
+ 	{
+ 		slot = &ProcPipeSlots[i];
+ 		if (slot->sender_pss_pid == MyProcPid && slot->is_valid
+ 				 && !(slot->is_processed || slot->is_in_process))
+ 		{
+ 			SendProcessInfoPending = true;
+ 			InterruptPending = true;
+ 
+ 			SetLatch(MyLatch);
+ 
+ 			return;
+ 		}
+ 	}
+ }
+ 
+ static void
+ RestoreResourceOwner(ResourceOwner prevResourceOwner)
+ {
+ 	if (prevResourceOwner != CurrentResourceOwner)
+ 	{
+ 		ResourceOwner deleted_ro = CurrentResourceOwner;
+ 
+ 		CurrentResourceOwner = prevResourceOwner;
+ 		ResourceOwnerDelete(deleted_ro);
+ 	}
+ }
+ 
+ void
+ HandleSendProcessInfo(void)
+ {
+ 	volatile ProcPipeSlot *slot;
+ 
+ 	while (SendProcessInfoPending)
+ 	{
+ 		int		i;
+ 
+ 		SendProcessInfoPending = false;
+ 
+ 		for (i = NumProcPipeSlots - 1; i >= 0; i--)
+ 		{
+ 			slot = &ProcPipeSlots[i];
+ 
+ 			if (slot->sender_pss_pid == MyProcPid && slot->is_valid && !slot->is_processed)
+ 			{
+ 				ResourceOwner prevRO = CurrentResourceOwner;
+ 
+ 				dsm_segment	*seg = NULL;
+ 				shm_toc		*toc = NULL;
+ 				shm_mq_handle		*output;
+ 				shm_mq			*mq;
+ 				int		length, *length_ptr;
+ 
+ 				/* fast leave when tag is unsupported */
+ 				if (slot->data_tag != SEND_INFO_DEMO_TAG)
+ 				{
+ 					slot->is_processed = true;
+ 					slot->is_valid = false;
+ 					continue;
+ 				}
+ 
+ 				/* Ensure valid resource owner for access to dsm */
+ 				if (CurrentResourceOwner == NULL)
+ 					CurrentResourceOwner = ResourceOwnerCreate(NULL, "HandlerSendProcessInfo");
+ 
+ 				seg = dsm_attach(slot->dsm_seg_handle);
+ 				if (seg == NULL)
+ 				{
+ 					elog(LOG, "unable map dynamic memory segment");
+ 					slot->is_processed = true;
+ 					slot->is_valid = false;
+ 
+ 					RestoreResourceOwner(prevRO);
+ 
+ 					continue;
+ 				}
+ 
+ 				toc = shm_toc_attach(PG_SEND_PROCESS_INFO_MAGIC, dsm_segment_address(seg));
+ 				if (toc == NULL)
+ 				{
+ 					elog(LOG, "bad magic number in passed dynamic memory segment");
+ 
+ 					dsm_detach(seg);
+ 					slot->is_processed = true;
+ 					slot->is_valid = false;
+ 
+ 					RestoreResourceOwner(prevRO);
+ 
+ 					continue;
+ 				}
+ 
+ 				mq = shm_toc_lookup(toc, 0);
+ 				output = shm_mq_attach(mq, seg, NULL);
+ 
+ 				length_ptr = shm_toc_lookup(toc, 1);
+ 				length = *length_ptr;
+ 
+ 				/* do potentialy heavy and slow work */
+ 				PG_TRY();
+ 				{
+ 					char *data;
+ 					shm_mq_result		res;
+ 
+ 					data = prepare_demo_data(length, prevRO == CurrentResourceOwner);
+ 					res = shm_mq_send(output, length + 1, data, false);
+ 					if (res != SHM_MQ_SUCCESS)
+ 					{
+ 						elog(LOG, "unable to send data to recipient");
+ 
+ 						dsm_detach(seg);
+ 						slot->is_processed = true;
+ 						slot->is_valid = false;
+ 					}
+ 
+ 					slot->is_processed = true;
+ 					shm_mq_detach(mq);
+ 					dsm_detach(seg);
+ 
+ 					RestoreResourceOwner(prevRO);
+ 				}
+ 				PG_CATCH();
+ 				{
+ 					slot->is_processed = true;
+ 					slot->is_valid = false;
+ 
+ 					if (seg != NULL)
+ 						dsm_detach(seg);
+ 
+ 					RestoreResourceOwner(prevRO);
+ 
+ 					PG_RE_THROW();
+ 				}
+ 				PG_END_TRY();
+ 
+ 			}
+ 		}
+ 	}
+ }
+ 
+ /*
+  * generate some content that will be sended to recipient
+  */
+ static char *
+ prepare_demo_data(int length, bool is_active_stmt)
+ {
+ 	char *buffer, *ptr;
+ 	char *chars = "0123456789";
+ 	int padding = 0;
+ 
+ 	length = length < 100 ? 100 : length;
+ 
+ 	buffer = palloc(length + 1);
+ 
+ 	if (!is_active_stmt)
+ 	{
+ 		sprintf(buffer, "idle sender pid: %d", MyProcPid);
+ 		return buffer;
+ 	}
+ 
+ 	sprintf(buffer, "sender pid: %d\n", MyProcPid);
+ 
+ 	ptr = buffer + strlen(buffer);
+ 	length -= strlen(buffer);
+ 
+ 	while (length > 1)
+ 	{
+ 		*ptr++ = chars[padding++ % 10];
+ 
+ 		length--;
+ 		if (padding % 30 == 0)
+ 		{
+ 			*ptr++ = '\n';
+ 			length--;
+ 		}
+ 	}
+ 
+ 	*ptr++ = '#';
+ 	*ptr = '\0';
+ 
+ 	return buffer;
+ }
+ 
+ /*
+  * Send custom signal and returns string generated by signaled process.
+  */
+ Datum
+ signal_process(PG_FUNCTION_ARGS)
+ {
+ 	int		pid = PG_GETARG_INT32(0);
+ 	int		length = PG_GETARG_INT32(1);
+ 	dsm_segment		*seg;
+ 	shm_toc_estimator	e;
+ 	shm_toc			*toc;
+ 	shm_mq			*mq;
+ 	shm_mq_handle		*input;
+ 	shm_mq_result		res;
+ 	Size		segsize;
+ 	int		*param_ptr;
+ 	text		*result;
+ 	PGPROC	*proc;
+ 	Size		received_data_length;
+ 	char		*data;
+ 
+ 	if (pid == MyProcPid)
+ 		elog(ERROR, "cannot to send signal to self");
+ 
+ 	proc = BackendPidGetProc(pid);
+ 	if (proc == NULL)
+ 		elog(ERROR, "invalid pid");
+ 
+ 	if (length < 30)
+ 		elog(ERROR, "second parameter (length: %d) should be >= 30", length);
+ 
+ 
+ 	/* prepare shared dsm segment */
+ 	shm_toc_initialize_estimator(&e);
+ 	shm_toc_estimate_chunk(&e, 1024);
+ 	shm_toc_estimate_chunk(&e, sizeof(int));
+ 	shm_toc_estimate_keys(&e, 2);
+ 	segsize = shm_toc_estimate(&e);
+ 
+ 	seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
+ 	if (seg == NULL)
+ 		elog(ERROR, "cannot to create dsm segment");
+ 
+ 	PG_TRY();
+ 	{
+ 		toc = shm_toc_create(PG_SEND_PROCESS_INFO_MAGIC, dsm_segment_address(seg), segsize);
+ 		if (toc == NULL)
+ 			elog(ERROR, "cannot to create toc");
+ 
+ 		/* prepare basic structures */
+ 		mq = shm_mq_create(shm_toc_allocate(toc, 1024), 1024);
+ 		shm_mq_set_receiver(mq, MyProc);
+ 		shm_mq_set_sender(mq, proc);
+ 		shm_toc_insert(toc, 0, mq);
+ 
+ 		param_ptr = (int *) shm_toc_allocate(toc, sizeof(int));
+ 		*param_ptr = length;
+ 		shm_toc_insert(toc, 1, param_ptr);
+ 
+ 		if (SendProcessInfoSignal(pid, proc->backendId , dsm_segment_handle(seg), SEND_INFO_DEMO_TAG))
+ 			elog(ERROR, "could not send signal to process");
+ 
+ 		/* try to read from queue */
+ 		input = shm_mq_attach(mq, seg, NULL);
+ 		res = shm_mq_receive(input, &received_data_length, (void **) &data, false);
+ 		if (res != SHM_MQ_SUCCESS)
+ 			elog(ERROR, "could not receive message");
+ 
+ 		if (!is_valid_result())
+ 			elog(ERROR, "sender doesn't send valid data");
+ 
+ 		result = cstring_to_text_with_len(data, received_data_length - 1);
+ 
+ 		InvalidateProcPipeSlot();
+ 		shm_mq_detach(mq);
+ 		dsm_detach(seg);
+ 	}
+ 	PG_CATCH();
+ 	{
+ 		InvalidateProcPipeSlot();
+ 		dsm_detach(seg);
+ 		PG_RE_THROW();
+ 	}
+ 	PG_END_TRY();
+ 
+ 	PG_RETURN_TEXT_P(result);
+ }
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
new file mode 100644
index 7b19714..5a36936
*** a/src/backend/utils/init/postinit.c
--- b/src/backend/utils/init/postinit.c
***************
*** 60,66 ****
  #include "utils/syscache.h"
  #include "utils/timeout.h"
  #include "utils/tqual.h"
! 
  
  static HeapTuple GetDatabaseTuple(const char *dbname);
  static HeapTuple GetDatabaseTupleByOid(Oid dboid);
--- 60,66 ----
  #include "utils/syscache.h"
  #include "utils/timeout.h"
  #include "utils/tqual.h"
! #include "utils/signal_demo.h"
  
  static HeapTuple GetDatabaseTuple(const char *dbname);
  static HeapTuple GetDatabaseTupleByOid(Oid dboid);
*************** InitPostgres(const char *in_dbname, Oid
*** 588,593 ****
--- 588,596 ----
  	/* Now that we have a BackendId, we can participate in ProcSignal */
  	ProcSignalInit(MyBackendId);
  
+ 	/* And same for ProcPipe */
+ 	ProcPipeInit(MyBackendId);
+ 
  	/*
  	 * Also set up timeout handlers needed for backend operation.  We need
  	 * these in every case except bootstrap.
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
new file mode 100644
index ddf7c67..ae25559
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
*************** DATA(insert OID = 2171 ( pg_cancel_backe
*** 3134,3139 ****
--- 3134,3141 ----
  DESCR("cancel a server process' current query");
  DATA(insert OID = 2096 ( pg_terminate_backend		PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ _null_ pg_terminate_backend _null_ _null_ _null_ ));
  DESCR("terminate a server process");
+ DATA(insert OID = 4066 ( signal_process		PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 25 "23 23" _null_ _null_ _null_ _null_ _null_ signal_process _null_ _null_ _null_ ));
+ DESCR("cancel a server process' current query");
  DATA(insert OID = 2172 ( pg_start_backup		PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 3220 "25 16" _null_ _null_ _null_ _null_ _null_ pg_start_backup _null_ _null_ _null_ ));
  DESCR("prepare for taking an online backup");
  DATA(insert OID = 2173 ( pg_stop_backup			PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 3220 "" _null_ _null_ _null_ _null_ _null_ pg_stop_backup _null_ _null_ _null_ ));
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
new file mode 100644
index e0cc69f..3b9a7bf
*** a/src/include/miscadmin.h
--- b/src/include/miscadmin.h
***************
*** 80,85 ****
--- 80,86 ----
  extern PGDLLIMPORT volatile bool InterruptPending;
  extern PGDLLIMPORT volatile bool QueryCancelPending;
  extern PGDLLIMPORT volatile bool ProcDiePending;
+ extern PGDLLIMPORT volatile bool SendProcessInfoPending;
  
  extern volatile bool ClientConnectionLost;
  
*************** extern void SwitchBackToLocalLatch(void)
*** 324,329 ****
--- 325,333 ----
  extern bool superuser(void);	/* current user is superuser */
  extern bool superuser_arg(Oid roleid);	/* given user is superuser */
  
+ /* utils/misc/signal_demo.c */
+ extern void HandleSendProcessInfo(void);
+ 
  
  /*****************************************************************************
   *	  pmod.h --																 *
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
new file mode 100644
index af1a0cd..0d692a7
*** a/src/include/storage/procsignal.h
--- b/src/include/storage/procsignal.h
*************** typedef enum
*** 32,37 ****
--- 32,38 ----
  	PROCSIG_CATCHUP_INTERRUPT,	/* sinval catchup interrupt */
  	PROCSIG_NOTIFY_INTERRUPT,	/* listen/notify interrupt */
  	PROCSIG_PARALLEL_MESSAGE,	/* message from cooperating parallel backend */
+ 	PROCSIG_SEND_INFO,		/* internal info request */
  
  	/* Recovery conflict reasons */
  	PROCSIG_RECOVERY_CONFLICT_DATABASE,
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
new file mode 100644
index fc1679e..7310dc0
*** a/src/include/utils/builtins.h
--- b/src/include/utils/builtins.h
*************** extern Datum pg_prepared_statement(PG_FU
*** 1264,1267 ****
--- 1264,1270 ----
  /* utils/mmgr/portalmem.c */
  extern Datum pg_cursor(PG_FUNCTION_ARGS);
  
+ /* utils/signal_demo.c */
+ extern Datum signal_process(PG_FUNCTION_ARGS);
+ 
  #endif   /* BUILTINS_H */
diff --git a/src/include/utils/signal_demo.h b/src/include/utils/signal_demo.h
new file mode 100644
index ...0c951c4
*** a/src/include/utils/signal_demo.h
--- b/src/include/utils/signal_demo.h
***************
*** 0 ****
--- 1,14 ----
+ #include "storage/dsm.h"
+ 
+ extern Size ProcPipeShmemSize(void);
+ extern void ProcPipeShmemInit(void);
+ extern void ProcPipeInit(int pss_idx);
+ 
+ extern int SendProcessInfoSignal(pid_t data_sender_pid, BackendId data_sender_backendId,
+ 		    dsm_handle dsm_seg_handle, int data_tag);
+ 
+ extern void SendProcessInfoInterrupt(void);
+ extern void HandleSendProcessInfo(void);
+ 
+ extern void InvalidateProcPipeSlot(void);
+ extern bool is_valid_result(void);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to