Hello,

On Sat, 2006-04-08 at 13:45 -0400, Jonah H. Harris wrote:
> > I'd like to help teaching PostgreSQL the art of parallel query
> > execution.
> 
> Cool, we're not at the communication-level yet, but your help would be
> appreciated.
> 
> > In case you're interested I'll compile a patch for review.
> 
> Surely!

[ This patch is not meant to be included immediately. But it might be
one step towards supporting parallel query execution, IMHO. ]

This is my internal message passing implementation. I'd probably also
need to document its use...

It's not tested except for 'it works for me'. And it still misses the
'wrap around' functionality to recycle the buffer. Also, I should
probably rename buffer.h and buffer.c since there are a lot of other
buffers in PostgreSQL.

Markus

*** /dev/null	2006-04-03 19:40:38.000000000 +0200
--- trunk/src/backend/storage/ipc/imsg.c	2006-04-08 20:39:32.000000000 +0200
***************
*** 0 ****
--- 1,277 ----
+ /*-------------------------------------------------------------------------
+  *
+  * imsg.c
+  *    internal messages from process to process sent via shared memory.
+  *
+  *
+  * Copyright (c) 2006, Markus Schiltknecht <[EMAIL PROTECTED]>
+  *
+  *-------------------------------------------------------------------------
+  */
+ 
+ #include <unistd.h>
+ #include <signal.h>
+ 
+ #ifdef HAVE_SYS_FILIO_H
+ #include <sys/filio.h>
+ #endif
+ 
+ #include <sys/ioctl.h>
+ 
+ #include "postgres.h"
+ #include "miscadmin.h"
+ #include "storage/proc.h"
+ #include "storage/imsg.h"
+ #include "storage/ipc.h"
+ #include "storage/buffer.h"
+ #include "storage/spin.h"
+ #include "utils/elog.h"
+ 
+ /* global variable pointing to the shmem area */
+ IMessageCtlData *IMessageCtl = NULL;
+ 
+ #define IMSG_DEBUG
+ 
+ /*
+  * Initialization of shared memory for internal messages.
+  */
+ int
+ IMessageShmemSize(void)
+ {
+ 	return MAXALIGN(IMessageBufferSize);
+ }
+ 
+ void
+ IMessageShmemInit(void)
+ {
+ 	bool		foundIMessageCtl;
+ 
+ #ifdef IMSG_DEBUG
+ 	elog(DEBUG3, "IMessageShmemInit(): initializing shared memory");
+ #endif
+ 
+ 	IMessageCtl = (IMessageCtlData *)
+ 		ShmemInitStruct("IMsgCtl",
+ 						MAXALIGN(IMessageBufferSize),
+ 						&foundIMessageCtl);
+ 
+ 	if (foundIMessageCtl)
+ 		return;
+ 
+ 	/* empty the control structure and all message descriptors */
+ 	memset(IMessageCtl, 0, MAXALIGN(IMessageBufferSize));
+ 
+ 	/* initialize start and end pointers */
+ 	IMessageCtl->queue_start = (IMessage*) IMSG_BUFFER_START(IMessageCtl);
+ 	IMessageCtl->queue_end = (IMessage*) IMSG_BUFFER_START(IMessageCtl);
+ 
+ 	SpinLockInit(&IMessageCtl->msgs_lck);
+ }
+ 
+ /*
+  *   IMessageCreate
+  *
+  * creates a new but deactivated message within the queue, returning the
+  * message header of the newly created message.
+  */
+ IMessage*
+ IMessageCreate(int recipient, int msg_size)
+ {
+ 	IMessage	   *msg;
+ 
+ #ifdef IMSG_DEBUG
+ 	elog(DEBUG3, "IMessageCreate(): recipient: %d, size: %d",
+ 		recipient, msg_size);
+ #endif
+ 
+ 	/* assert a reasonable maximum message size */
+ 	Assert(msg_size < MAXALIGN(IMessageShmemSize) / 4);
+ 
+ 	START_CRIT_SECTION();
+ 	{
+ 		/* use volatile pointer to prevent code rearrangement */
+ 		volatile IMessageCtlData *imsgctl = IMessageCtl;
+ 
+ 		SpinLockAcquire(&imsgctl->msgs_lck);
+ 
+ 		/* check if there is enough space for the message plus the
+ 		 * terminating header */
+ 		if ((int) imsgctl->queue_end + msg_size + sizeof(IMessage) <
+ 			 IMSG_BUFFER_END(imsgctl))
+ 		{
+ 			msg = (IMessage*) imsgctl->queue_end;
+ 			imsgctl->queue_end += IMSG_ALIGN(msg_size);
+ 
+ 			imsgctl->queue_end->sender = 0;
+ 			imsgctl->queue_end->recipient = 0;
+ 		}
+ 		else
+ 		{
+ 			/* most probably we can wrap around and recycle the space
+ 			 * at the start of the queue. */
+ 
+ 			/* TODO: implement wrap-around functionality */
+ 			Assert(0);
+ 			msg = NULL; /* to avoid compiler warning */
+ 		}
+ 
+ 		/* initialize the message as inactive */
+ 		msg->sender = 0;
+ 		msg->recipient = recipient;
+ 		msg->size = msg_size;
+ 
+ 		/* queue editing finished */
+ 		SpinLockRelease(&imsgctl->msgs_lck);
+ 
+ #ifdef IMSG_DEBUG
+ 	elog(DEBUG3, "IMessageCreate(): created at %08X size: %d", (int) msg,
+ 		msg->size);
+ #endif
+ 	}
+ 	END_CRIT_SECTION();
+ 
+ 	return msg;
+ }
+ 
+ void
+ IMessageActivate(IMessage *msg)
+ {
+ 	msg->sender = MyProcPid;
+ 
+ 	/* send a signal to the recipient */
+ 	kill(msg->recipient, SIGUSR1);
+ }
+ 
+ /*
+  *   IMessageRemove
+  *
+  * Marks a message as removable by setting the recipient to null. The message
+  * will eventually be removed during creation of new messages, see
+  * IMessageCreate().
+  */
+ void
+ IMessageRemove(IMessage *msg)
+ {
+ 	msg->recipient = 0;
+ }
+ 
+ /*
+  *   IMessageCheck
+  *
+  * Checks if there is a message in the queue for this process. Returns null
+  * if there is no message for this process, the message header otherwise. The
+  * message remains in the queue and should be removed by IMessageRemove().
+  */
+ IMessage*
+ IMessageCheck(void)
+ {
+ 	IMessage	   *msg,
+ 					   *res;
+ 
+ 	res = NULL;
+ 	START_CRIT_SECTION();
+ 	{
+ 		/* use volatile pointer to prevent code rearrangement */
+ 		volatile IMessageCtlData *imsgctl = IMessageCtl;
+ 
+ 		SpinLockAcquire(&imsgctl->msgs_lck);
+ 
+ 		/* Loop through the queue from the start. Wraping might be
+ 		 * required */
+ 		msg = imsgctl->queue_start;
+ 		while ((msg->sender != 0) || (msg->recipient != 0))
+ 		{
+ 			if ((msg->recipient == MyProcPid) && (msg->sender != 0))
+ 			{
+ 				res = msg;
+ 				break;
+ 			}
+ 
+ 			msg += IMSG_ALIGN(msg->size);
+ 
+ 			/* possibly wrap around */
+ 			if ((msg->sender == 0) && (msg->recipient == 0) &&
+ 				((int) imsgctl->queue_end < (int) imsgctl->queue_start))
+ 			{
+ 				msg = (IMessage*) IMSG_BUFFER_START(imsgctl);
+ 			}
+ 		}
+ 
+ 		SpinLockRelease(&imsgctl->msgs_lck);
+ 	}
+ 	END_CRIT_SECTION();
+ 
+ #ifdef IMSG_DEBUG
+ 	if (res == NULL)
+ 		elog(DEBUG3, "IMessageCheck(): no new message for %d.", MyProcPid);
+ 	else
+ 		elog(DEBUG3, "IMessageCheck(): new message of size %d for %d.",
+ 				msg->size, MyProcPid);
+ #endif
+ 
+ 	return res;
+ }
+ 
+ /*
+  *   IMessageAwait
+  *
+  * Waits for a message but leaves the message in the queue.
+  */
+ IMessage*
+ IMessageAwait(void)
+ {
+ 	IMessage	   *msg;
+ //	struct timeval		tv;
+ 
+ 	msg = IMessageCheck();
+ 	if (!msg)
+ 	{
+ 		/*
+ 		 * TODO: we want to wait for signals here. Check if select() is
+ 		 * appropriate. Maybe pause() is better, but how about portability?
+ 		 */
+ 		//tv.tv_sec = 99999;
+ 		//select(1, NULL, NULL, NULL, &tv);
+ 		pause();
+ 		msg = IMessageCheck();
+ 	}
+ 
+ 	return msg;
+ }
+ 
+ /*
+  *   IMessageGetReadBuffer
+  *
+  * gets a readable buffer for the given message
+  */
+ buffer *
+ IMessageGetReadBuffer(IMessage *msg)
+ {
+ 	buffer *b = palloc(sizeof(buffer));
+ 
+ 	init_buffer(b, IMSG_DATA(msg), msg->size, NULL, NULL, NULL);
+ 	b->fill_size = msg->size;
+ 
+ 	return b;
+ }
+ 
+ /*
+  *   IMessageGetWriteBuffer
+  *
+  * gets a writeable buffer for the given message
+  */
+ buffer *
+ IMessageGetWriteBuffer(IMessage *msg)
+ {
+ 	buffer *b = palloc(sizeof(buffer));
+ 
+ 	init_buffer(b, IMSG_DATA(msg), msg->size, NULL, NULL, NULL);
+ 
+ 	return b;
+ }
+ 
+ void
+ IMessageFreeBuffer(buffer *b)
+ {
+ 	pfree(b);
+ }
*** /dev/null	2006-04-03 19:40:38.000000000 +0200
--- trunk/src/backend/storage/ipc/buffer.c	2006-04-08 20:37:16.000000000 +0200
***************
*** 0 ****
--- 1,250 ----
+ /*-------------------------------------------------------------------------
+  *
+  * buffer.c
+  *    byte order and maximum message size aware buffer handling
+  *    functions for communication.
+  *
+  *
+  * Copyright (c) 2006, Markus Schiltknecht <[EMAIL PROTECTED]>
+  * 
+  *-------------------------------------------------------------------------
+  */
+ 
+ #include <endian.h>
+ #include <string.h>
+ #include <netinet/in.h>
+ 
+ 
+ #include "postgres.h"
+ #include "miscadmin.h"
+ #include "storage/buffer.h"
+ 
+ void
+ init_buffer(buffer *b, void *data, int size, alloc_func *ma,
+ 						buffer_fill_func *bf, void *obj)
+ {
+ 	b->data = data;
+ 	b->ptr = 0;
+ 	b->max_size = size;
+ 	b->fill_size = 0;
+ 	b->obj = obj;
+ 
+ 	b->mem_alloc = ma;
+ 	b->fill_func = bf;
+ }
+ 
+ uint8_t
+ get_int8(buffer *b)
+ {
+ 	int8_t res;
+ 
+ 	Assert(b->ptr + sizeof(uint8_t) <= b->fill_size);
+ 	res = *((uint8_t*) ((int) b->data + b->ptr));
+ 	b->ptr += sizeof(uint8_t);
+ 	return res;
+ }
+ 
+ uint32_t
+ get_int32(buffer *b)
+ {
+ 	uint32_t res;
+ 
+ 	Assert(b->ptr + sizeof(uint32_t) <= b->fill_size);
+ 	res = *((uint32_t*) ((int) b->data + b->ptr));
+ 	b->ptr += sizeof(uint32_t);
+ 
+ 	
+ #if __BYTE_ORDER == __LITTLE_ENDIAN
+ 	return __bswap_32(res);
+ #elif __BYTE_ORDER == __BIG_ENDIAN
+ 	return res;
+ #else
+ #error __BYTE_ORDER not specified!
+ #endif
+ }
+ 
+ char *
+ get_pstring(buffer *b)
+ {
+ 	int size;
+ 	char *res;
+ 
+ 	Assert(b->ptr + sizeof(uint8_t) <= b->fill_size);
+ 	size = *((char*) ((int) b->data + b->ptr));
+ 	Assert(b->ptr + sizeof(uint8_t) + size <= b->fill_size);
+ 	Assert(b->mem_alloc);
+ 	res = b->mem_alloc(size + 1);
+ 	b->ptr += sizeof(char);
+ 	memcpy(res, (void*) ((int) b->data + b->ptr), size);
+ 	res[size] = 0;
+ 	b->ptr += size;
+ 	return res;
+ }
+ 
+ void
+ get_pstring_into(buffer *b, char *dest, int max_size)
+ {
+ 	int size;
+ 
+ 	Assert(b->ptr + sizeof(uint8_t) <= b->fill_size);
+ 	size = *((char*) ((int) b->data + b->ptr));
+ 	Assert(b->ptr + sizeof(uint8_t) + size <= b->fill_size);
+ 	Assert(size + 1 <= max_size);
+ 	b->ptr += sizeof(char);
+ 	memcpy(dest, (void*) ((int) b->data + b->ptr), size);
+ 	dest[size] = 0;
+ 	b->ptr += size;
+ }
+ 
+ char *
+ get_p32string(buffer *b)
+ {
+ 	int size;
+ 	char *res;
+ 
+ 	size = get_int32(b);
+ 	Assert(b->ptr + size <= b->fill_size);
+ 	Assert(b->mem_alloc);
+ 	res = b->mem_alloc(size + 1);
+ 	memcpy(res, (void*) ((int) b->data + b->ptr), size);
+ 	res[size] = 0;
+ 	b->ptr += size;
+ 	return res;
+ }
+ 
+ void
+ get_p32string_into(buffer *b, char *dest, int max_size)
+ {
+ 	int size;
+ 
+ 	size = get_int32(b);
+ 	Assert(b->ptr + sizeof(uint32_t) + size <= b->fill_size);
+ 	Assert(size + 1 <= max_size);
+ 	memcpy(dest, (void*) ((int) b->data + b->ptr), size);
+ 	dest[size] = 0;
+ 	b->ptr += size;
+ }
+ 
+ char *
+ get_cstring(buffer *b)
+ {
+ 	int size;
+ 	char *res;
+ 
+ 	size = strlen( (char *) ((int) b->data + b->ptr) );
+ 	Assert(b->ptr + size <= b->fill_size);
+ 	res = b->mem_alloc(size + 1);
+ 	memcpy(res, (void*) ((int) b->data + b->ptr), size);
+ 	res[size] = 0;
+ 	b->ptr += size + 1;
+ 	return res;
+ }
+ 
+ void
+ get_cstring_into(buffer *b, char *dest, int max_size)
+ {
+ 	int size;
+ 
+ 	size = strlen( (char *) ((int) b->data + b->ptr) );
+ 	Assert(size <= max_size);
+ 	Assert(b->ptr + size <= b->fill_size);
+ 	memcpy(dest, (void*) ((int) b->data + b->ptr), size);
+ 	dest[size] = 0;
+ 	b->ptr += size + 1;
+ }
+ 
+ void
+ get_data(buffer *b, void *dest, int size)
+ {
+ 	Assert(b->ptr + size <= b->fill_size);
+ 	memcpy(dest, (void*) ((int) b->data + b->ptr), size);
+ 	b->ptr += size;
+ }
+ 
+ void
+ put_int8(buffer *b, uint8_t val)
+ {
+ 	Assert(b->ptr + sizeof(uint8_t) <= b->max_size);
+ 	*((uint8_t*) ((int) b->data + b->ptr)) = val;
+ 	b->ptr += sizeof(uint8_t);
+ 	b->fill_size += sizeof(uint8_t);
+ }
+ 
+ void
+ put_int32(buffer *b, uint32_t val)
+ {
+ 	Assert(b->ptr + sizeof(uint32_t) <= b->max_size);
+ 
+ #if __BYTE_ORDER == __LITTLE_ENDIAN
+ 	*((uint32_t*) ((int) b->data + b->ptr)) = __bswap_32(val);
+ #elif __BYTE_ORDER == __BIG_ENDIAN
+ 	*((uint32_t*) ((int) b->data + b->ptr)) = val;
+ #else
+ #error __BYTE_ORDER not specified!
+ #endif
+ 	b->ptr += sizeof(uint32_t);
+ 	b->fill_size += sizeof(uint32_t);
+ }
+ 
+ void
+ put_pstring(buffer *b, char *str)
+ {
+ 	int size = strlen(str);
+ 
+ 	Assert(b->ptr + size + 1 <= b->max_size);
+ 	*((uint8_t*) ((int) b->data + b->ptr)) = size;
+ 	b->ptr += sizeof(uint8_t);
+ 	b->fill_size += sizeof(uint8_t);
+ 	memcpy((void*) ((int)b->data + b->ptr), str, size);
+ 	b->ptr += size;
+ 	b->fill_size += size;
+ }
+ 
+ void
+ put_p32string(buffer *b, char *str)
+ {
+ 	int size = strlen(str);
+ 
+ 	put_int32(b, size);
+ 	Assert(b->ptr + size + 1 <= b->max_size);
+ 	memcpy((void*) ((int)b->data + b->ptr), str, size);
+ 	b->ptr += size;
+ 	b->fill_size += size;
+ }
+ 
+ void
+ put_cstring(buffer *b, char *str)
+ {
+ 	int size = strlen(str);
+ 
+ 	Assert(b->ptr + size + 1 <= b->max_size);
+ 	memcpy((void*) ((int)b->data + b->ptr), str, size);
+ 	b->ptr += size;
+ 	b->fill_size += size;
+ 
+ 	*((uint8_t*) ((int) b->data + b->ptr)) = 0;
+ 	b->ptr += 1;
+ 	b->fill_size += 1;
+ }
+ 
+ void
+ put_data(buffer *b, void *data, int size)
+ {
+ 	Assert(b->ptr + size <= b->max_size);
+ 	memcpy((void*) ((int) b->data + b->ptr), data, size);
+ 	b->ptr += size;
+ 	b->fill_size += size;
+ }
+ 
+ void
+ put_data_from_buffer(buffer *dest, buffer *src)
+ {
+ 	int size;
+ 
+ 	size = src->fill_size - src->ptr;
+ 	Assert(dest->ptr + size <= dest->max_size);
+ 	memcpy((void*) ((int) dest->data + dest->ptr), 
+ 		(void*) ((int) src->data + src->ptr), size);
+ 	dest->ptr += size;
+ 	dest->fill_size += size;
+ }
*** /dev/null	2006-04-03 19:40:38.000000000 +0200
--- trunk/src/include/storage/imsg.h	2006-04-08 20:39:26.000000000 +0200
***************
*** 0 ****
--- 1,84 ----
+ /*-------------------------------------------------------------------------
+  *
+  * imsg.c
+  *    internal messages from process to process sent via shared memory.
+  *
+  *
+  * Copyright (c) 2006, Markus Schiltknecht <[EMAIL PROTECTED]>
+  *
+  *-------------------------------------------------------------------------
+  */
+ 
+ #ifndef IMSG_H
+ #define IMSG_H
+ 
+ #include <sys/types.h>
+ #include "storage/spin.h"
+ #include "storage/buffer.h"
+ 
+ /* TODO: replace with GUC variable to be configurable */
+ #define IMessageBufferSize 1024000
+ 
+ /* for convinience to buffer access */
+ #define IMSG_BUFFER_START(imsgctl) ((int) \
+ 			(((int) imsgctl) + sizeof(IMessageCtlData)))
+ 
+ #define IMSG_BUFFER_END(imsgctl) ((int) \
+ 			(((int) imsgctl) + MAXALIGN(IMessageShmemSize)))
+ 
+ /* get a data pointer from the header */
+ #define IMSG_DATA(imsg) ((void*) ((int) imsg + sizeof(IMessage)))
+ 
+ /* correct alignment for messages (8 bytes) */
+ #define IMSG_ALIGN(size) (((size) + 7) & 0xFFFFFFF8)
+ 
+ /*
+  * Message descriptor in front of the message
+  */
+ typedef struct
+ {
+ 	/* pid of the sender, null means not yet activated message */
+ 	pid_t		sender;
+ 
+ 	/* pid of the recipient, null meaning has already been received */
+ 	pid_t		recipient;
+ 
+ 	/* message size following, but not including this header */
+ 	int			size;
+ } IMessage;
+ 
+ /*
+  * shared-memory pool for internal messages.
+  */
+ typedef struct
+ {
+ 	/* currently active messages */
+ 	unsigned int		count_messages;
+ 
+ 	/* start of messages within the cycling queue */
+ 	IMessage		   *queue_start;
+ 
+ 	/* next free place, just after the last message */
+ 	IMessage		   *queue_end;
+ 
+ 	/* lock for editing the message queue */
+ 	slock_t				msgs_lck;
+ } IMessageCtlData;
+ 
+ /* the global variable storing pointer to the shared memory area */
+ extern IMessageCtlData *RmgrCtl;
+ 
+ /* routines to send and receive internal messages */
+ extern int IMessageShmemSize(void);
+ extern void IMessageShmemInit(void);
+ extern IMessage* IMessageCreate(int recipient, int msg_size);
+ extern void IMessageActivate(IMessage *msg);
+ extern void IMessageRemove(IMessage *msg);
+ extern IMessage* IMessageCheck(void);
+ extern IMessage* IMessageAwait(void);
+ 
+ extern buffer *IMessageGetReadBuffer(IMessage *msg);
+ extern buffer *IMessageGetWriteBuffer(IMessage *msg);
+ extern void IMessageFreeBuffer(buffer *b);
+ 
+ #endif   /* IMSG_H */
*** /dev/null	2006-04-03 19:40:38.000000000 +0200
--- trunk/src/include/storage/buffer.h	2006-04-08 20:37:10.000000000 +0200
***************
*** 0 ****
--- 1,54 ----
+ /*-------------------------------------------------------------------------
+  *
+  * buffer.c
+  *    byte order and maximum message size aware buffer handling
+  *    functions for communication.
+  *
+  *
+  * Copyright (c) 2006, Markus Schiltknecht <[EMAIL PROTECTED]>
+  * 
+  *-------------------------------------------------------------------------
+  */
+ 
+ #ifndef _BUFFSOCK_H_
+ #define _BUFFSOCK_H_
+ 
+ #include <string.h>
+ #include <netinet/in.h>
+ 
+ typedef void* (alloc_func) (size_t size);
+ typedef void (buffer_fill_func) (void *buf, size_t min, void *obj);
+ 
+ typedef struct
+ {
+ 	void *data;
+ 	int ptr;
+ 	int max_size;		/* memory allocated for the buffer */
+ 	int fill_size;		/* fill status of buffer, read until there */
+ 
+ 	void *obj;			/* custom owning object, passed to buffer_fill_func */
+ 
+ 	alloc_func *mem_alloc;
+ 	buffer_fill_func *fill_func;
+ } buffer;
+ 
+ extern void init_buffer(buffer *b, void *data, int size, alloc_func *ma,
+ 						buffer_fill_func *bf, void *obj);
+ extern uint8_t get_int8(buffer *b);
+ extern uint32_t get_int32(buffer *b);
+ extern char *get_pstring(buffer *b);
+ extern char *get_p32string(buffer *b);
+ extern char *get_cstring(buffer *b);
+ extern void get_pstring_into(buffer *b, char *dest, int max_size);
+ extern void get_p32string_into(buffer *b, char *dest, int max_size);
+ extern void get_cstring_into(buffer *b, char *dest, int max_size);
+ extern void get_data(buffer *b, void *dest, int size);
+ extern void put_int8(buffer *b, uint8_t val);
+ extern void put_int32(buffer *b, uint32_t val);
+ extern void put_pstring(buffer *b, char *str);
+ extern void put_p32string(buffer *b, char *str);
+ extern void put_cstring(buffer *b, char *str);
+ extern void put_data(buffer *b, void *data, int size);
+ extern void put_data_from_buffer(buffer *dest, buffer *src);
+ 
+ #endif		// _BUFFSOCK_H_
*** trunk_cvs/src/backend/storage/ipc/ipci.c	2006-04-08 20:09:08.000000000 +0200
--- trunk/src/backend/storage/ipc/ipci.c	2006-04-08 20:43:09.000000000 +0200
***************
*** 24,29 ****
--- 24,30 ----
  #include "postmaster/postmaster.h"
  #include "storage/bufmgr.h"
  #include "storage/freespace.h"
+ #include "storage/imsg.h"
  #include "storage/ipc.h"
  #include "storage/lock.h"
  #include "storage/lwlock.h"
***************
*** 88,93 ****
--- 89,95 ----
  		size = add_size(size, SInvalShmemSize());
  		size = add_size(size, FreeSpaceShmemSize());
  		size = add_size(size, BgWriterShmemSize());
+ 		size = add_size(size, IMessageShmemSize());
  #ifdef EXEC_BACKEND
  		size = add_size(size, ShmemBackendArraySize());
  #endif
***************
*** 152,157 ****
--- 154,160 ----
  	SUBTRANSShmemInit();
  	TwoPhaseShmemInit();
  	MultiXactShmemInit();
+ 	IMessageShmemInit();
  	InitBufferPool();
  
  	/*
*** trunk_cvs/src/backend/storage/ipc/Makefile	2006-04-08 20:09:08.000000000 +0200
--- trunk/src/backend/storage/ipc/Makefile	2006-04-08 20:25:23.000000000 +0200
***************
*** 16,22 ****
  endif
  
  OBJS = ipc.o ipci.o pmsignal.o procarray.o shmem.o shmqueue.o \
! 	sinval.o sinvaladt.o
  
  all: SUBSYS.o
  
--- 16,22 ----
  endif
  
  OBJS = ipc.o ipci.o pmsignal.o procarray.o shmem.o shmqueue.o \
! 	sinval.o sinvaladt.o imsg.o buffer.o
  
  all: SUBSYS.o
  
---------------------------(end of broadcast)---------------------------
TIP 5: don't forget to increase your free space map settings

Reply via email to