Alvaro Herrera wrote:
Hmm, I remember eyeballing that code. Would you mind sending me a URL
to that file, or something? Or maybe send me the files themselves?
Sure, here's a patch against current CVS. To get it to compile, please
remove all the functions that reference "buffer", along with the
"buffer.h" include.
Remember that this is a work in progress and that it has flaws. One issue
that currently bugs me is that processes can deadlock if they keep
trying to create a message (IMessageCreate) but fail because the queue
is full of messages addressed to themselves. A process should thus always
try to fetch messages (via IMessagesCheck) and remove pending ones before
retrying to send one. That's not always practical.
One design limitation is that you have to know how large your message
is as soon as you reserve (shared) memory for it, but that's intended.
At least I've stress tested the wrap-around code and it seems to work.
No guarantees, though ;-)
Regards
Markus
#
# old_revision [9a68fa59cb0ca3246f03880664062abb98f1a61a]
#
# add_file "src/backend/storage/ipc/imsg.c"
# content [3e84c6372a47612a2fe233fee6b122808135580e]
#
# add_file "src/include/storage/imsg.h"
# content [3cf37b12a00b90f65b8393fc5e27c98d772dc22b]
#
# patch "src/backend/storage/ipc/Makefile"
# from [71276ab6483aebbb27f87c988d77ab876611f190]
#to [9a99101d3e8bbfe52c97763db536804e94371828]
#
# patch "src/backend/storage/ipc/ipci.c"
# from [177f266b4668190a6ab1f2902305f7b7e577ef8d]
#to [1971e2122ba4455c8b9784e70059d917fdf4f4c8]
#
--- src/backend/storage/ipc/imsg.c 3e84c6372a47612a2fe233fee6b122808135580e
+++ src/backend/storage/ipc/imsg.c 3e84c6372a47612a2fe233fee6b122808135580e
@@ -0,0 +1,375 @@
+/*-
+ *
+ * imsg.c
+ *internal messages from process to process sent via shared memory.
+ *
+ *
+ * Copyright (c) 2006, Markus Schiltknecht <[EMAIL PROTECTED]>
+ *
+ *-
+ */
+
+#include
+#include
+#include
+
+#ifdef HAVE_SYS_FILIO_H
+#include
+#endif
+
+#include
+
+#include "postgres.h"
+#include "miscadmin.h"
+#include "storage/proc.h"
+#include "storage/imsg.h"
+#include "storage/ipc.h"
+#include "storage/buffer.h"
+#include "storage/spin.h"
+#include "utils/elog.h"
+
+/* global variable pointing to the shmem area */
+IMessageCtlData *IMessageCtl = NULL;
+
+/*
+ * Initialization of shared memory for internal messages.
+ */
+int
+IMessageShmemSize(void) /* returns the byte size of the internal-message shared memory area */
+{
+ return MAXALIGN(IMessageBufferSize); /* same size expression IMessageShmemInit passes to ShmemInitStruct */
+}
+
+void
+IMessageShmemInit(void) /* obtains the "IMsgCtl" shmem struct and, unless it was found, zeroes and initializes it */
+{
+ bool foundIMessageCtl; /* out-parameter of ShmemInitStruct; presumably true if the struct already existed */
+
+#ifdef IMSG_DEBUG
+ elog(DEBUG3, "IMessageShmemInit(): initializing shared memory");
+#endif
+
+ IMessageCtl = (IMessageCtlData *)
+ ShmemInitStruct("IMsgCtl",
+ MAXALIGN(IMessageBufferSize),
+ &foundIMessageCtl); /* NOTE(review): result is dereferenced below without a NULL check; confirm ShmemInitStruct cannot return NULL here */
+
+ if (foundIMessageCtl)
+ return; /* nothing more to do: the initialization below is skipped when the struct was found */
+
+ /* empty the control structure and all message descriptors */
+ memset(IMessageCtl, 0, MAXALIGN(IMessageBufferSize)); /* clears the whole area that was requested above */
+
+ /* initialize start and end pointers */
+ IMessageCtl->queue_start = (IMessage*) IMSG_BUFFER_START(IMessageCtl);
+ IMessageCtl->queue_end = (IMessage*) IMSG_BUFFER_START(IMessageCtl); /* start == end initially */
+
+ SpinLockInit(&IMessageCtl->msgs_lck); /* msgs_lck is the spinlock IMessageCreate acquires before touching the queue */
+}
+
+/*
+ * IMessageCreate
+ *
+ * creates a new but deactivated message within the queue, returning the
+ * message header of the newly created message.
+ */
+IMessage*
+IMessageCreate(int recipient, int msg_size)
+{
+ IMessage *msg;
+ intremaining_space;
+
+#ifdef IMSG_DEBUG
+ elog(DEBUG3, "IMessageCreate(): recipient: %d, size: %d",
+ recipient, msg_size);
+#endif
+
+ /* assert a reasonable maximum message size */
+ Assert(msg_size < (MAXALIGN(IMessageBufferSize) / 4));
+
+ START_CRIT_SECTION();
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile IMessageCtlData *imsgctl = IMessageCtl;
+
+ SpinLockAcquire(&imsgctl->msgs_lck);
+
+ /*
+ * Check if there is enough space for the message plus the
+ * terminating header
+ */
+ if (imsgctl->queue_end < imsgctl->queue_start)
+ remaining_space = (int) imsgctl->queue_start -
+ (int) imsgctl->queue_end;
+ else
+ remaining_space = (int) IMSG_BUFFER_END(imsgctl) -
+ (int) imsgctl->queue_end;
+
+ if (remaining_space < (MAXALIGN(IMessageBufferSize) / 8))
+ {
+#ifdef IMSG_DEBUG
+ elog(DEBUG3, "IMessageCreate(): cleanup starting");
+#endif
+
+ /* Clean up messages that have been removed. */
+ while (imsgctl->queue_start->recipient == 0)
+ {
+if (imsgctl->queue_start > imsgctl->queue_end)
+{
+ if ((imsgctl->queue_start->sender == 0) &&
+ (imsgctl->queue_start->recipient == 0))
+ {
+#ifdef IMSG_DEBUG
+ elog(DEBUG3, "IMessageCreate(): cleanup wrapped");
+#endif
+ imsgctl->queue_start = (IMessage*) IMSG_BUFFER_START(imsgctl);
+ continue;
+ }
+}
+else if (imsgctl->queue_start >= imsgctl->queue_end)
+ break;
+
+imsgctl->que