Alvaro Herrera wrote:
Hmm, I remember eyeballing that code.  Would you mind sending me an URL
to that file, or something?  Or maybe send me the files themselves?


Sure, here's a patch against current CVS. Please remove all the functions referencing to "buffer" and "buffer.h" to compile.

Remember that it's a work in progress thing. It has flaws. One issue that currently bugs me is, that processes can deadlock if they keep trying to create a message (IMessagesCreate), but fail because the queue is full of messages for themselves. A process should thus always try to fetch messages (via IMessagesCheck) and remove pending ones before retrying to send one. That's not always practical.

One design limitation is, that you have to know how large your message is as soon as you reserve (shared) memory for it, but that's intended.

At least I've stress tested the wrap-around code and it seems to work. No guarantees, though ;-)

Regards

Markus

# 
# old_revision [9a68fa59cb0ca3246f03880664062abb98f1a61a]
# 
# add_file "src/backend/storage/ipc/imsg.c"
#  content [3e84c6372a47612a2fe233fee6b122808135580e]
# 
# add_file "src/include/storage/imsg.h"
#  content [3cf37b12a00b90f65b8393fc5e27c98d772dc22b]
# 
# patch "src/backend/storage/ipc/Makefile"
#  from [71276ab6483aebbb27f87c988d77ab876611f190]
#    to [9a99101d3e8bbfe52c97763db536804e94371828]
# 
# patch "src/backend/storage/ipc/ipci.c"
#  from [177f266b4668190a6ab1f2902305f7b7e577ef8d]
#    to [1971e2122ba4455c8b9784e70059d917fdf4f4c8]
# 
============================================================
--- src/backend/storage/ipc/imsg.c	3e84c6372a47612a2fe233fee6b122808135580e
+++ src/backend/storage/ipc/imsg.c	3e84c6372a47612a2fe233fee6b122808135580e
@@ -0,0 +1,375 @@
+/*-------------------------------------------------------------------------
+ *
+ * imsg.c
+ *    internal messages from process to process sent via shared memory.
+ *
+ *
+ * Copyright (c) 2006, Markus Schiltknecht <[EMAIL PROTECTED]>
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include <unistd.h>
+#include <signal.h>
+#include <string.h>
+
+#ifdef HAVE_SYS_FILIO_H
+#include <sys/filio.h>
+#endif
+
+#include <sys/ioctl.h>
+
+#include "postgres.h"
+#include "miscadmin.h"
+#include "storage/proc.h"
+#include "storage/imsg.h"
+#include "storage/ipc.h"
+#include "storage/buffer.h"
+#include "storage/spin.h"
+#include "utils/elog.h"
+
+/* global variable pointing to the shmem area */
+IMessageCtlData *IMessageCtl = NULL;
+
+/*
+ * Initialization of shared memory for internal messages.
+ */
+int
+IMessageShmemSize(void)
+{
+	return MAXALIGN(IMessageBufferSize);
+}
+
+void
+IMessageShmemInit(void)
+{
+	bool		foundIMessageCtl;
+
+#ifdef IMSG_DEBUG
+	elog(DEBUG3, "IMessageShmemInit(): initializing shared memory");
+#endif
+
+	IMessageCtl = (IMessageCtlData *)
+		ShmemInitStruct("IMsgCtl",
+						MAXALIGN(IMessageBufferSize),
+						&foundIMessageCtl);
+
+	if (foundIMessageCtl)
+		return;
+
+	/* empty the control structure and all message descriptors */
+	memset(IMessageCtl, 0, MAXALIGN(IMessageBufferSize));
+
+	/* initialize start and end pointers */
+	IMessageCtl->queue_start = (IMessage*) IMSG_BUFFER_START(IMessageCtl);
+	IMessageCtl->queue_end = (IMessage*) IMSG_BUFFER_START(IMessageCtl);
+
+	SpinLockInit(&IMessageCtl->msgs_lck);
+}
+
+/*
+ *   IMessageCreate
+ *
+ * creates a new but deactivated message within the queue, returning the
+ * message header of the newly created message.
+ */
+IMessage*
+IMessageCreate(int recipient, int msg_size)
+{
+	IMessage	   *msg;
+	int				remaining_space;
+
+#ifdef IMSG_DEBUG
+	elog(DEBUG3, "IMessageCreate(): recipient: %d, size: %d",
+		recipient, msg_size);
+#endif
+
+	/* assert a reasonable maximum message size */
+	Assert(msg_size < (MAXALIGN(IMessageBufferSize) / 4));
+
+	START_CRIT_SECTION();
+	{
+		/* use volatile pointer to prevent code rearrangement */
+		volatile IMessageCtlData *imsgctl = IMessageCtl;
+
+		SpinLockAcquire(&imsgctl->msgs_lck);
+
+		/*
+		 * Check if there is enough space for the message plus the
+		 * terminating header
+		 */
+		if (imsgctl->queue_end < imsgctl->queue_start)
+			remaining_space = (int) imsgctl->queue_start -
+							  (int) imsgctl->queue_end;
+		else
+			remaining_space = (int) IMSG_BUFFER_END(imsgctl) -
+							  (int) imsgctl->queue_end;
+
+		if (remaining_space < (MAXALIGN(IMessageBufferSize) / 8))
+		{
+#ifdef IMSG_DEBUG
+			elog(DEBUG3, "IMessageCreate(): cleanup starting");
+#endif
+
+			/* Clean up messages that have been removed. */
+			while (imsgctl->queue_start->recipient == 0)
+			{
+				if (imsgctl->queue_start > imsgctl->queue_end)
+				{
+					if ((imsgctl->queue_start->sender == 0) &&
+						(imsgctl->queue_start->recipient == 0))
+					{
+#ifdef IMSG_DEBUG
+			elog(DEBUG3, "IMessageCreate(): cleanup wrapped");
+#endif
+						imsgctl->queue_start = (IMessage*) IMSG_BUFFER_START(imsgctl);
+						continue;
+					}
+				}
+				else if (imsgctl->queue_start >= imsgctl->queue_end)
+					break;
+
+				imsgctl->queue_start = (IMessage*) (
+					(int) imsgctl->queue_start +
+					IMSG_ALIGN(imsgctl->queue_start->size +
+							   sizeof(IMessage)));
+			}
+
+			/* recalc remainig space */
+			if (imsgctl->queue_end < imsgctl->queue_start)
+				remaining_space = (int) imsgctl->queue_start -
+								  (int) imsgctl->queue_end;
+			else
+				remaining_space = (int) IMSG_BUFFER_END(imsgctl) -
+								  (int) imsgctl->queue_end;
+
+		}
+
+		if (IMSG_ALIGN(msg_size + 2 * sizeof(IMessage)) < remaining_space)
+		{
+			msg = (IMessage*) imsgctl->queue_end;
+			imsgctl->queue_end = (IMessage*) ((int) imsgctl->queue_end + 
+								 IMSG_ALIGN(msg_size + sizeof(IMessage)));
+		}
+		else
+		{
+			remaining_space = (int) imsgctl->queue_start -
+							  (int) IMSG_BUFFER_START(imsgctl);
+#ifdef IMSG_DEBUG
+			elog(DEBUG5, "IMessageCreate:    remaining wrap space: %d",
+				 remaining_space);
+#endif
+
+			/* There is not enough space. But maybe we can wrap around? */
+			if ((imsgctl->queue_end >= imsgctl->queue_start) &&
+				((int) IMSG_BUFFER_START(imsgctl) +
+				IMSG_ALIGN(msg_size + 2 * sizeof(IMessage)) <
+				(int) imsgctl->queue_start))
+			{
+				/* Yes, wrap around */
+#ifdef IMSG_DEBUG
+				elog(DEBUG5, "IMessageCreate: wrapped around.");
+#endif
+				msg = (IMessage*) IMSG_BUFFER_START(imsgctl);
+				imsgctl->queue_end = (IMessage*) ((int) msg +
+									IMSG_ALIGN(msg_size + sizeof(IMessage)));
+			}
+			else
+			{
+				/* TODO: correct error handling here... */
+				elog(ERROR, "Not enough space within IMessages buffer.");
+				SpinLockRelease(&imsgctl->msgs_lck);
+				return NULL;
+			}
+		}
+
+		/* initialize the message as inactive */
+		msg->sender = 0;
+		msg->recipient = recipient;
+		msg->size = msg_size;
+
+		/* clean the following block */
+		imsgctl->queue_end->sender = 0;
+		imsgctl->queue_end->recipient = 0;
+
+		/* queue editing finished */
+		SpinLockRelease(&imsgctl->msgs_lck);
+
+#ifdef IMSG_DEBUG
+	elog(DEBUG3, "IMessageCreate(): created at %08X size: %d (next: %08X)",
+		 (int) msg, msg->size, (unsigned int) imsgctl->queue_end);
+#endif
+	}
+	END_CRIT_SECTION();
+
+	return msg;
+}
+
+void
+IMessageForward(IMessage *msg, int new_recipient)
+{
+	msg->recipient = new_recipient;
+	msg->sender = 0;
+
+	IMessageActivate(msg);
+}
+
+void
+IMessageActivate(IMessage *msg)
+{
+	msg->sender = MyProcPid;
+
+	/* TODO: use PGPROC to determine if the recipient wants to be signaled,
+	 *       probably we can save that signaling step in certain occasions.
+	 */
+
+	/* send a signal to the recipient */
+	kill(msg->recipient, SIGUSR1);
+}
+
+/*
+ *   IMessageRemove
+ *
+ * Marks a message as removable by setting the recipient to null. The message
+ * will eventually be removed during creation of new messages, see
+ * IMessageCreate().
+ */
+void
+IMessageRemove(IMessage *msg)
+{
+	msg->recipient = 0;
+}
+
+/*
+ *   IMessageCheck
+ *
+ * Checks if there is a message in the queue for this process. Returns null
+ * if there is no message for this process, the message header otherwise. The
+ * message remains in the queue and should be removed by IMessageRemove().
+ */
+IMessage*
+IMessageCheck(void)
+{
+	IMessage	   *msg,
+				   *res;
+
+	res = NULL;
+	START_CRIT_SECTION();
+	{
+		/* use volatile pointer to prevent code rearrangement */
+		volatile IMessageCtlData *imsgctl = IMessageCtl;
+
+		SpinLockAcquire(&imsgctl->msgs_lck);
+
+		/* Loop through the queue from the start. Wraping might be
+		 * required */
+		msg = imsgctl->queue_start;
+		while (1)
+		{
+			if (((int) msg >= (int) imsgctl->queue_start) &&
+				((int) imsgctl->queue_start > (int) imsgctl->queue_end))
+			{
+				if ((msg->sender == 0) &&
+					(msg->recipient == 0))
+				{
+					msg = (IMessage*) IMSG_BUFFER_START(imsgctl);
+					continue;
+				}
+			}
+			else if (msg >= imsgctl->queue_end)
+				break;
+
+			if ((msg->sender != 0) && (msg->recipient == MyProcPid))
+			{
+				res = msg;
+				break;
+			}
+ 
+			msg = (IMessage*) ((int) msg +
+					IMSG_ALIGN(msg->size + sizeof(IMessage)));
+		}
+
+		SpinLockRelease(&imsgctl->msgs_lck);
+	}
+	END_CRIT_SECTION();
+
+#ifdef IMSG_DEBUG
+	if (res == NULL)
+		elog(DEBUG3, "IMessageCheck(): no new message for %d.", MyProcPid);
+	else
+		elog(DEBUG3, "IMessageCheck(): new message of size %d for %d.",
+				msg->size, MyProcPid);
+#endif
+
+	return res;
+}
+
+/*
+ *   IMessageAwait
+ *
+ * Waits for a message but leaves the message in the queue.
+ */
+IMessage*
+IMessageAwait(void)
+{
+	IMessage	   *msg;
+	struct timeval	tv;
+
+	msg = IMessageCheck();
+	while (!msg)
+	{
+		/*
+		 * TODO: we want to wait for signals here. Check if select() is
+		 * appropriate. Maybe pause() is better, but how about portability?
+		 * However, make sure we have a timeout here, since we could
+		 * probably miss a signal.
+		 */
+		tv.tv_sec = 2;
+		tv.tv_usec = 0;
+		select(1, NULL, NULL, NULL, &tv);
+		// pause();
+		msg = IMessageCheck();
+	}
+
+	return msg;
+}
+
+/*
+ *   IMessageGetReadBuffer
+ *
+ * gets a readable buffer for the given message
+ */
+buffer *
+IMessageGetReadBuffer(IMessage *msg)
+{
+	buffer *b = palloc(sizeof(buffer));
+
+	Assert(msg);
+	Assert(msg->size > 0);
+
+	init_buffer(b, IMSG_DATA(msg), msg->size, NULL, NULL, NULL);
+	b->fill_size = msg->size;
+
+	return b;
+}
+
+/*
+ *   IMessageGetWriteBuffer
+ *
+ * gets a writeable buffer for the given message
+ */
+buffer *
+IMessageGetWriteBuffer(IMessage *msg)
+{
+	buffer *b = palloc(sizeof(buffer));
+
+	init_buffer(b, IMSG_DATA(msg), msg->size, NULL, NULL, NULL);
+
+	return b;
+}
+
+void
+IMessageFreeBuffer(buffer *b)
+{
+	pfree(b);
+}
============================================================
--- src/include/storage/imsg.h	3cf37b12a00b90f65b8393fc5e27c98d772dc22b
+++ src/include/storage/imsg.h	3cf37b12a00b90f65b8393fc5e27c98d772dc22b
@@ -0,0 +1,85 @@
+/*-------------------------------------------------------------------------
+ *
+ * imsg.c
+ *    internal messages from process to process sent via shared memory.
+ *
+ *
+ * Copyright (c) 2006, Markus Schiltknecht <[EMAIL PROTECTED]>
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef IMSG_H
+#define IMSG_H
+
+#include <sys/types.h>
+#include "storage/spin.h"
+#include "storage/buffer.h"
+
+/* TODO: replace with GUC variable to be configurable */
+#define IMessageBufferSize 8388608		/* 8 MB */
+
+/* alignment for messages (8 bytes) */
+#define IMSG_ALIGN(size) (((size) + 7) & 0xFFFFFFF8)
+
+/* for convinience to buffer access */
+#define IMSG_BUFFER_START(imsgctl) ((int) \
+			(IMSG_ALIGN((int) imsgctl + sizeof(IMessageCtlData))))
+
+#define IMSG_BUFFER_END(imsgctl) ((int) \
+			(IMSG_ALIGN((int) imsgctl + MAXALIGN(IMessageBufferSize))))
+
+/* get a data pointer from the header */
+#define IMSG_DATA(imsg) ((void*) ((int) imsg + sizeof(IMessage)))
+
+/*
+ * Message descriptor in front of the message
+ */
+typedef struct
+{
+	/* pid of the sender, null means not yet activated message */
+	pid_t		sender;
+
+	/* pid of the recipient, null meaning has already been received */
+	pid_t		recipient;
+
+	/* message size following, but not including this header */
+	int			size;
+} IMessage;
+
+/*
+ * shared-memory pool for internal messages.
+ */
+typedef struct
+{
+	/* currently active messages */
+	unsigned int		count_messages;
+
+	/* start of messages within the cycling queue */
+	IMessage		   *queue_start;
+
+	/* next free place, just after the last message */
+	IMessage		   *queue_end;
+
+	/* lock for editing the message queue */
+	slock_t				msgs_lck;
+} IMessageCtlData;
+
+/* the global variable storing pointer to the shared memory area */
+extern IMessageCtlData *RmgrCtl;
+
+/* routines to send and receive internal messages */
+extern int IMessageShmemSize(void);
+extern void IMessageShmemInit(void);
+extern IMessage* IMessageCreate(int recipient, int msg_size);
+extern void IMessageForward(IMessage *msg, int new_recipient);
+extern void IMessageActivate(IMessage *msg);
+extern void IMessageRemove(IMessage *msg);
+extern IMessage* IMessageCheck(void);
+extern IMessage* IMessageAwait(void);
+
+extern buffer *IMessageGetReadBuffer(IMessage *msg);
+extern buffer *IMessageGetWriteBuffer(IMessage *msg);
+extern void IMessageFreeBuffer(buffer *b);
+
+#endif   /* IMSG_H */
============================================================
--- src/backend/storage/ipc/Makefile	71276ab6483aebbb27f87c988d77ab876611f190
+++ src/backend/storage/ipc/Makefile	9a99101d3e8bbfe52c97763db536804e94371828
@@ -16,7 +16,7 @@ OBJS = ipc.o ipci.o pmsignal.o procarray
 endif
 
 OBJS = ipc.o ipci.o pmsignal.o procarray.o shmem.o shmqueue.o \
-	sinval.o sinvaladt.o
+	sinval.o sinvaladt.o imsg.o buffer.o
 
 all: SUBSYS.o
 
============================================================
--- src/backend/storage/ipc/ipci.c	177f266b4668190a6ab1f2902305f7b7e577ef8d
+++ src/backend/storage/ipc/ipci.c	1971e2122ba4455c8b9784e70059d917fdf4f4c8
@@ -24,6 +24,7 @@
 #include "postmaster/bgwriter.h"
 #include "postmaster/postmaster.h"
 #include "storage/freespace.h"
+#include "storage/imsg.h"
 #include "storage/ipc.h"
 #include "storage/pg_shmem.h"
 #include "storage/pmsignal.h"
@@ -110,6 +111,7 @@ CreateSharedMemoryAndSemaphores(bool mak
 		size = add_size(size, FreeSpaceShmemSize());
 		size = add_size(size, BgWriterShmemSize());
 		size = add_size(size, BTreeShmemSize());
+		size = add_size(size, IMessageShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -178,6 +180,7 @@ CreateSharedMemoryAndSemaphores(bool mak
 	SUBTRANSShmemInit();
 	TwoPhaseShmemInit();
 	MultiXactShmemInit();
+	IMessageShmemInit();
 	InitBufferPool();
 
 	/*
---------------------------(end of broadcast)---------------------------
TIP 7: You can help support the PostgreSQL project by donating at

                http://www.postgresql.org/about/donate

Reply via email to