Robert,

* Robert Haas (robertmh...@gmail.com) wrote:
> Attached is a contrib module that lets you launch arbitrary command in
> a background worker, and supporting infrastructure patches for core.

Very cool!  Started looking into this while waiting on a few
CLOBBER_CACHE_ALWAYS runs to finish (ugh...).

Perhaps I'm just being a bit over the top, but all this per-character
work feels a bit ridiculous.  When we're using MAXIMUM_ALIGNOF, I
suppose it's not so bad, but is there no hope of increasing that and
making this whole process more efficient?  Just a thought.

After reading through the code for 0001, I decided to actually take it
out for a spin -- see attached.  I then passed a few megabytes of data
through it and it seemed to work just fine.

In general, I'm quite excited about this capability and will be looking
over the later patches as well.  I also prefer the function-pointer-based
approach taken in later versions to the hook-based approach in the
initial patches, so I'm glad to see things going in that direction.
Lastly, I will say that I feel it'd be good to support bi-directional
communication as I think it'll be needed eventually, but I'm not sure
that has to happen now.

        Thanks!

                Stephen
#include <stdio.h>
#include <limits.h>
#include <stdint.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <time.h>

/* Short aliases for the fixed-width integer types from <stdint.h>. */
typedef uint64_t uint64;
typedef uint8_t uint8;

/* Alignment boundary, in bytes, applied by MAXALIGN_DOWN below. */
#define MAXIMUM_ALIGNOF 8
/* Size is spelled as a macro alias for size_t in this file. */
#define Size size_t

/*
 * Round LEN down to a multiple of ALIGNVAL by masking off the low bits.
 * The mask yields a true multiple only when ALIGNVAL is a power of two.
 * The result has type uintptr_t.
 */
#define TYPEALIGN_DOWN(ALIGNVAL,LEN)  \
    (((uintptr_t) (LEN)) & ~((uintptr_t) ((ALIGNVAL) - 1)))

/* Round LEN down to a multiple of MAXIMUM_ALIGNOF. */
#define MAXALIGN_DOWN(LEN)      TYPEALIGN_DOWN(MAXIMUM_ALIGNOF, (LEN))


/*
 * Queue header structure.
 *
 * Nothing in this file reads or writes these fields; the type is present so
 * that shm_mq_handle can hold a pointer to it.  The per-field notes below
 * are inferred from the field names only -- TODO confirm against the real
 * queue implementation.
 */
typedef struct shm_mq
{
    uint64      mq_bytes_read;      /* presumably a running count of bytes consumed */
    uint64      mq_bytes_written;   /* presumably a running count of bytes produced */
    Size        mq_ring_size;       /* presumably the capacity of mq_ring in bytes */
    bool        mq_detached;        /* presumably set once a party has detached */
    uint8       mq_ring_offset;     /* NOTE(review): meaning not visible here */
    char        mq_ring[];          /* flexible array member: storage follows the struct */
} shm_mq;

/*
 * Sender-side state passed to shm_mq_send() / shm_mq_sendv().
 *
 * In this file only three fields are touched:
 *   - output:                   stream the message bytes are written to
 *   - mqh_partial_bytes:        bytes of the current message written so far
 *   - mqh_length_word_complete: cleared whenever a message completes
 * The remaining fields are never referenced here.
 *
 * The struct definition already introduces the typedef name, so no separate
 * "typedef struct shm_mq_handle shm_mq_handle;" is needed afterwards;
 * repeating it is a typedef redefinition, which compilers older than C11
 * reject.
 */
typedef struct shm_mq_handle
{
    shm_mq     *mqh_queue;                  /* unused in this file */
    char       *mqh_buffer;                 /* unused in this file */
    Size        mqh_buflen;                 /* unused in this file */
    Size        mqh_consume_pending;        /* unused in this file */
    Size        mqh_partial_bytes;          /* progress within the current message */
    Size        mqh_expected_bytes;         /* unused in this file */
    bool        mqh_length_word_complete;   /* reset to false after each message */
    bool        mqh_counterparty_attached;  /* unused in this file */
    FILE       *output;                     /* destination stream for fwrite() */
} shm_mq_handle;

/* One element of the buffer list passed to shm_mq_sendv(). */
typedef struct
{
	const char  *data;	/* first byte of this buffer */
	Size    len;		/* number of bytes available at data */
} shm_mq_iovec;

/* Status codes returned by shm_mq_send() and shm_mq_sendv(). */
typedef enum
{
    SHM_MQ_SUCCESS,             /* Sent or received a message. */
    SHM_MQ_WOULD_BLOCK,         /* Not completed; retry later. */
    SHM_MQ_DETACHED             /* Other process has detached queue. */
} shm_mq_result;

/* Send one contiguous buffer of nbytes bytes; defined below as a one-element shm_mq_sendv() call. */
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait);
/* Send one message whose bytes are spread across the iovcnt buffers in iov. */
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait);

/* Exclusive upper bound on the size of each random read in main() (rand() % BUFSIZE). */
#define BUFSIZE 1024
/* Capacity of main()'s vector array; the random vector count is 1..VECSIZE. */
#define VECSIZE 16

/*
 * Test driver for shm_mq_sendv().
 *
 * Picks a random vector count (1..VECSIZE) once, then repeatedly fills each
 * vector with a randomly sized read (0..BUFSIZE-1 bytes) from stdin and hands
 * the batch to shm_mq_sendv(), which writes it to stdout.  The chosen sizes
 * are logged to stderr.
 *
 * Returns 0 on success, or 1 if sending, reading, flushing, or memory
 * allocation fails.
 */
int main(void)
{
	shm_mq_handle mqh = {0};
	shm_mq_iovec  vectors[VECSIZE] = {{0}};
	int           randvec;
	int           status = 0;

	srand((unsigned int) time(NULL));

	mqh.output = stdout;
	mqh.mqh_partial_bytes = 0;

	randvec = (rand() % VECSIZE) + 1;

	fprintf(stderr, "vectors: %d\n", randvec);

	/*
	 * feof() alone is not a safe loop condition: after a read error fread()
	 * keeps returning 0 while feof() stays false, which would spin forever.
	 * Stop on ferror() as well.
	 */
	while (status == 0 && !feof(stdin) && !ferror(stdin))
	{
		for (int i = 0; i < randvec; i++)
		{
			int     randread = rand() % BUFSIZE;
			char   *buf;

			fprintf(stderr, "randread: %d\n", randread);

			/* Release the buffer this slot held during the previous round. */
			free((char *) vectors[i].data);
			vectors[i].data = NULL;
			vectors[i].len = 0;

			/*
			 * malloc(0) is allowed to return NULL, so always ask for at
			 * least one byte; that keeps NULL an unambiguous failure sign.
			 */
			buf = malloc(randread > 0 ? (size_t) randread : 1);
			if (buf == NULL)
			{
				fprintf(stderr, "out of memory\n");
				status = 1;
				break;
			}

			vectors[i].data = buf;
			vectors[i].len = fread(buf, 1, (size_t) randread, stdin);
		}

		if (status == 0 &&
			shm_mq_sendv(&mqh, vectors, randvec, false) != SHM_MQ_SUCCESS)
			status = 1;
	}

	if (ferror(stdin))
	{
		fprintf(stderr, "error reading standard input\n");
		status = 1;
	}

	/* Errors on buffered output may only surface when the stream is flushed. */
	if (fflush(stdout) != 0)
		status = 1;

	for (int i = 0; i < randvec; i++)
		free((char *) vectors[i].data);

	return status;
}

/*
 * Send a single contiguous buffer.
 *
 * Wraps the nbytes bytes at data in a one-element buffer list and forwards
 * it, together with nowait, to shm_mq_sendv(); that call's result is
 * returned unchanged.
 */
shm_mq_result
shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
{
	shm_mq_iovec    single = { .data = data, .len = nbytes };

	return shm_mq_sendv(mqh, &single, 1, nowait);
}

/*
 * Write one message, supplied as a list of buffers, to mqh->output.
 *
 * Every write except the final one is issued in multiples of
 * MAXIMUM_ALIGNOF bytes.  When fewer than MAXIMUM_ALIGNOF bytes remain in a
 * buffer that is not the last one, bytes are gathered one at a time across
 * buffer boundaries into a small temporary so that the write still has the
 * full aligned size (or covers everything that is left).
 *
 * mqh->mqh_partial_bytes counts the bytes of the current message that have
 * actually been written.  It is the starting offset on entry, so a call that
 * failed part-way can be repeated with the same buffer list to resume.
 *
 * Parameters:
 *   mqh    - handle providing the output stream and progress counter
 *   iov    - array of buffers making up the message
 *   iovcnt - number of entries in iov; zero or less sends nothing
 *   nowait - accepted for interface compatibility, not used here
 *
 * Returns SHM_MQ_SUCCESS once the whole message has been written, after
 * resetting mqh_partial_bytes and mqh_length_word_complete.  Returns
 * SHM_MQ_WOULD_BLOCK (the same value the old "!SHM_MQ_SUCCESS" produced) if
 * fwrite() makes no progress, or writes only part of a gathered chunk; the
 * progress counter is left at the number of bytes really written.
 */
shm_mq_result
shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
{
	Size        nbytes = 0;
	Size        offset;
	int         which_iov = 0;
	int         i;

	(void) nowait;

	/* Compute total size of write. */
	for (i = 0; i < iovcnt; ++i)
		nbytes += iov[i].len;

	/* Resume wherever an earlier, incomplete call stopped. */
	offset = mqh->mqh_partial_bytes;

	/*
	 * The which_iov bound keeps us from ever indexing past the end of iov,
	 * including when iovcnt is zero.
	 */
	while (which_iov < iovcnt && mqh->mqh_partial_bytes < nbytes)
	{
		Size    remaining;
		Size    chunksize;
		Size    bytes_written;

		/* Skip buffers that are empty or already fully sent. */
		if (offset >= iov[which_iov].len)
		{
			offset -= iov[which_iov].len;
			++which_iov;
			continue;
		}

		/* offset < len here, so this subtraction cannot wrap. */
		remaining = iov[which_iov].len - offset;

		if (which_iov + 1 < iovcnt && remaining < MAXIMUM_ALIGNOF)
		{
			/*
			 * Less than one aligned unit is left in a non-final buffer:
			 * gather bytes across buffers until the unit is full or the
			 * list is exhausted.
			 */
			char    tmpbuf[MAXIMUM_ALIGNOF];
			Size    j = 0;

			while (j < MAXIMUM_ALIGNOF && which_iov < iovcnt)
			{
				if (offset < iov[which_iov].len)
					tmpbuf[j++] = iov[which_iov].data[offset++];
				else
				{
					offset -= iov[which_iov].len;
					++which_iov;
				}
			}

			bytes_written = fwrite(tmpbuf, 1, j, mqh->output);
			mqh->mqh_partial_bytes += bytes_written;

			/*
			 * offset and which_iov have already moved past all j gathered
			 * bytes, so continuing after a short write would silently drop
			 * the unwritten ones.  Report failure instead; the progress
			 * counter reflects what was really written.
			 */
			if (bytes_written < j)
				return SHM_MQ_WOULD_BLOCK;
			continue;
		}

		/*
		 * If this is the last chunk, we can write all the data, even if it
		 * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
		 * MAXALIGN_DOWN the write size.
		 */
		chunksize = remaining;
		if (which_iov + 1 < iovcnt)
			chunksize = MAXALIGN_DOWN(chunksize);

		bytes_written = fwrite(&iov[which_iov].data[offset], 1, chunksize,
							   mqh->output);
		mqh->mqh_partial_bytes += bytes_written;
		offset += bytes_written;
		if (bytes_written == 0)
			return SHM_MQ_WOULD_BLOCK;
	}

	/* Reset for next message. */
	mqh->mqh_partial_bytes = 0;
	mqh->mqh_length_word_complete = false;

	return SHM_MQ_SUCCESS;
}

Attachment: signature.asc
Description: Digital signature

Reply via email to