Robert, * Robert Haas ([email protected]) wrote: > Attached is a contrib module that lets you launch arbitrary command in > a background worker, and supporting infrastructure patches for core.
Very cool! Started looking into this while waiting on a few
CLOBBER_CACHE_ALWAYS runs to finish (ugh...).
Perhaps I'm just being a bit over the top, but all this per-character
work feels a bit ridiculous.. When we're using MAXIMUM_ALIGNOF, I
suppose it's not so bad, but is there no hope to increase that and make
this whole process more efficient? Just a thought.
After reading through the code for 0001, I decided to actually take it
out for a spin- see attached. I then passed a few megabytes of data
through it and it seemed to work just fine.
In general, I'm quite excited about this capability and will be looking
over the later patches also. I also prefer the function-pointer based
approach which was taken up in later versions to the hook-based approach
in the initial patches, so glad to see things going in that direction.
Lastly, I will say that I feel it'd be good to support bi-directional
communication as I think it'll be needed eventually, but I'm not sure
that has to happen now.
Thanks!
Stephen
#include <stdio.h>
#include <limits.h>
#include <stdint.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <time.h>
typedef uint64_t uint64;
typedef uint8_t uint8;
#define MAXIMUM_ALIGNOF 8
#define Size size_t
#define TYPEALIGN_DOWN(ALIGNVAL,LEN) \
(((uintptr_t) (LEN)) & ~((uintptr_t) ((ALIGNVAL) - 1)))
#define MAXALIGN_DOWN(LEN) TYPEALIGN_DOWN(MAXIMUM_ALIGNOF, (LEN))
typedef struct shm_mq
{
uint64 mq_bytes_read;
uint64 mq_bytes_written;
Size mq_ring_size;
bool mq_detached;
uint8 mq_ring_offset;
char mq_ring[];
} shm_mq;
typedef struct shm_mq_handle
{
shm_mq *mqh_queue;
char *mqh_buffer;
Size mqh_buflen;
Size mqh_consume_pending;
Size mqh_partial_bytes;
Size mqh_expected_bytes;
bool mqh_length_word_complete;
bool mqh_counterparty_attached;
FILE *output;
} shm_mq_handle;
typedef struct shm_mq_handle shm_mq_handle;
typedef struct
{
const char *data;
Size len;
} shm_mq_iovec;
typedef enum
{
SHM_MQ_SUCCESS, /* Sent or received a message. */
SHM_MQ_WOULD_BLOCK, /* Not completed; retry later. */
SHM_MQ_DETACHED /* Other process has detached queue. */
} shm_mq_result;
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait);
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait);
#define BUFSIZE 1024
#define VECSIZE 16
int main()
{
shm_mq_handle mqh;
shm_mq_result result;
int randvec;
shm_mq_iovec vectors[VECSIZE];
srand(time(NULL));
mqh.output = stdout;
mqh.mqh_partial_bytes = 0;
randvec = (rand() % VECSIZE) + 1;
fprintf(stderr, "vectors: %d\n", randvec);
for (int i = 0; i < randvec; i++)
vectors[i].data = NULL;
while (!feof(stdin))
{
for (int i = 0; i < randvec; i++)
if (vectors[i].data != NULL)
free((char*) vectors[i].data);
for (int i = 0 ; i < randvec ; i++)
{
Size count;
int randread = rand() % BUFSIZE;
fprintf(stderr, "randread: %d\n", randread);
vectors[i].data = malloc(randread);
count = fread((char*) vectors[i].data, 1, randread, stdin);
vectors[i].len = count;
}
result = shm_mq_sendv(&mqh, vectors, randvec, false);
if (result != SHM_MQ_SUCCESS)
return 1;
}
#if 0
while ((count = fread(buffer, 1, randread, stdin)))
{
result = shm_mq_send(&mqh, count, buffer, false);
}
#endif
return 0;
}
shm_mq_result
shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
{
shm_mq_iovec iov;
iov.data = data;
iov.len = nbytes;
return shm_mq_sendv(mqh, &iov, 1, nowait);
}
shm_mq_result
shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
{
shm_mq_result res;
Size nbytes = 0;
Size bytes_written;
int i;
int which_iov = 0;
Size offset;
/* Compute total size of write. */
for (i = 0; i < iovcnt; ++i)
nbytes += iov[i].len;
offset = mqh->mqh_partial_bytes;
do
{
Size chunksize;
/* Figure out which bytes need to be sent next. */
if (offset >= iov[which_iov].len)
{
offset -= iov[which_iov].len;
++which_iov;
if (which_iov >= iovcnt)
break;
continue;
}
if (which_iov + 1 < iovcnt &&
offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
{
char tmpbuf[MAXIMUM_ALIGNOF];
int j = 0;
for (;;)
{
if (offset < iov[which_iov].len)
{
tmpbuf[j] = iov[which_iov].data[offset];
j++;
offset++;
if (j == MAXIMUM_ALIGNOF)
break;
}
else
{
offset -= iov[which_iov].len;
which_iov++;
if (which_iov >= iovcnt)
break;
}
}
bytes_written = fwrite(tmpbuf, 1, j, mqh->output);
mqh->mqh_partial_bytes += bytes_written;
res = bytes_written > 0 ? SHM_MQ_SUCCESS : !SHM_MQ_SUCCESS;
if (res != SHM_MQ_SUCCESS)
return res;
continue;
}
/*
* If this is the last chunk, we can write all the data, even if it
* isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
* MAXALIGN_DOWN the write size.
*/
chunksize = iov[which_iov].len - offset;
if (which_iov + 1 < iovcnt)
chunksize = MAXALIGN_DOWN(chunksize);
bytes_written = fwrite(&iov[which_iov].data[offset], 1, chunksize, mqh->output);
mqh->mqh_partial_bytes += bytes_written;
offset += bytes_written;
res = bytes_written > 0 ? SHM_MQ_SUCCESS : !SHM_MQ_SUCCESS;
if (res != SHM_MQ_SUCCESS)
return res;
} while (mqh->mqh_partial_bytes < nbytes);
/* Reset for next message. */
mqh->mqh_partial_bytes = 0;
mqh->mqh_length_word_complete = false;
/* Notify receiver of the newly-written data, and return. */
return SHM_MQ_SUCCESS;
}
signature.asc
Description: Digital signature
