Revision: 14337 Author: adrian.chadd Date: Mon Oct 19 21:29:45 2009 Log: Flesh out some more of the loghelper instance handling and begin fleshing out the code to write data to it.
http://code.google.com/p/lusca-cache/source/detail?r=14337 Modified: /branches/LUSCA_HEAD/libloghelper/loghelper.c /branches/LUSCA_HEAD/libloghelper/loghelper.h ======================================= --- /branches/LUSCA_HEAD/libloghelper/loghelper.c Mon Oct 19 02:18:59 2009 +++ /branches/LUSCA_HEAD/libloghelper/loghelper.c Mon Oct 19 21:29:45 2009 @@ -4,6 +4,7 @@ #include <stdlib.h> #include <unistd.h> #include <sys/socket.h> +#include <sys/types.h> #include <netinet/in.h> #include "include/util.h" @@ -34,6 +35,8 @@ #include "loghelper_commands.h" +/* Loghelper buffer management functions */ + static void loghelper_buffer_free(loghelper_buffer_t *lb) { @@ -54,7 +57,8 @@ return lb; } -/* *** */ +/* ********** */ +/* Loghelper instance buffer management functions */ /* * Free all pending buffers @@ -71,6 +75,44 @@ } lh->nbufs = 0; } + +static void +loghelper_append_buffer(loghelper_instance_t *lh, loghelper_buffer_t *lb) +{ + dlinkAddTail(lb, &lb->node, &lh->bufs); + lh->nbufs++; +} + +static int +loghelper_has_bufs(loghelper_instance_t *lh) +{ + return (lh->nbufs > 0); +} + +static loghelper_buffer_t * +loghelper_dequeue_buffer(loghelper_instance_t *lh) +{ + loghelper_buffer_t *lb; + + if (lh->bufs.head == NULL) + return NULL; + lb = lh->bufs.head->data; + dlinkDelete(&lb->node, &lh->bufs); + lh->nbufs--; + assert(lh->nbufs >= 0); + return lb; +} + +static void +loghelper_requeue_buffer(loghelper_instance_t *lh, loghelper_buffer_t *lb) +{ + dlinkAdd(lb, &lb->node, &lh->bufs); + lh->nbufs++; +} + + +/* ********* */ +/* Loghelper instance related functions */ /* * destroy the loghelper. @@ -93,10 +135,57 @@ safe_free(lh); } -static int -loghelper_has_bufs(loghelper_instance_t *lh) -{ - return (lh->nbufs > 0); +static void +loghelper_write_handle(int fd, void *data) +{ + loghelper_instance_t *lh = data; + loghelper_buffer_t *lb; + int r; + + debug(87, 5) ("loghelper_write_handle: %p: write ready; writing\n", lh); + lh->flags.writing = 0; + + /* Dequeue a buffer */ + lb = loghelper_dequeue_buffer(lh); + + /* Write it to the socket */ + r = FD_WRITE_METHOD(fd, lb->buf + lb->written_len, lb->len - lb->written_len); + + /* EOF? Error? Shut down the IPC sockets for now and flag it */ + /* And requeue the dequeued buffer so it is eventually retried when appropriate */ + assert(r > 0); + + /* Was it partially written? Requeue the buffer for another write */ + if (r < lb->len) { + lb->written_len += r; + loghelper_requeue_buffer(lh, lb); + } else { + loghelper_buffer_free(lb); + } + + /* Is there any further data? schedule another write */ + if (loghelper_has_bufs(lh)) { + lh->flags.writing = 1; + commSetSelect(lh->wfd, COMM_SELECT_WRITE, loghelper_write_handle, lh, 0); + } + + /* Out of buffers and closing? Destroy the instance */ + if (lh->flags.closing && (! loghelper_has_bufs(lh))) { + loghelper_destroy(lh); + return; + } +} + +static void +loghelper_kick_write(loghelper_instance_t *lh) +{ + /* Don't queue a write if we already have */ + if (lh->flags.writing) + return; + + /* Queue write and set flag to make sure we don't queue another */ + commSetSelect(lh->wfd, COMM_SELECT_WRITE, loghelper_write_handle, lh, 0); + lh->flags.writing = 1; } /* @@ -141,10 +230,51 @@ /* Are there pending messages? If so, leave it for now and queue a write if needed */ if (loghelper_has_bufs(lh)) { - /* XXX check if an explicit flush needs to be scheduled here */ + loghelper_kick_write(lh); return; } /* No pending messages? Wrap up. */ loghelper_destroy(lh); } + +/* + * Queue a command to be sent to the loghelper. + * + * The command will be queued to be sent straight away. Subsequently queued requests + * will wait until the socket write buffer is ready for more data. + * + * For now there's an artificial 32768 - 3 byte limit on the payload size. + */ +int +loghelper_queue_command(loghelper_instance_t *lh, loghelper_command_t cmd, short payload_len, const char payload[]) +{ + u_int8_t u_cmd; + u_int16_t u_len; + loghelper_buffer_t *lb; + + /* make sure the data will fit for now */ + if (payload_len + 3 > 32768) + return 0; + + u_cmd = cmd; + u_len = payload_len + 3; /* 3 == 1 byte cmd, 2 byte length */ + + /* Create a new buffer */ + lb = loghelper_buffer_create(); + + + /* Copy in the command */ + + /* Copy in the packet length */ + + /* Copy in the payload, if any */ + + /* Add it to the instance buffer list tail */ + loghelper_append_buffer(lh, lb); + + /* Do we have a write scheduled? If not, schedule a write */ + loghelper_kick_write(lh); + + return 1; +} ======================================= --- /branches/LUSCA_HEAD/libloghelper/loghelper.h Mon Oct 19 02:10:23 2009 +++ /branches/LUSCA_HEAD/libloghelper/loghelper.h Mon Oct 19 21:29:45 2009 @@ -20,6 +20,7 @@ struct { int closing:1; /* whether we are closing - and further queued messages should be rejected */ + int writing:1; /* whether we're writing anything */ } flags; }; typedef struct _loghelper_instance loghelper_instance_t; --~--~---------~--~----~------------~-------~--~----~ You received this message because you are subscribed to the Google Groups "lusca-commit" group. To post to this group, send email to [email protected] To unsubscribe from this group, send email to [email protected] For more options, visit this group at http://groups.google.com/group/lusca-commit?hl=en -~----------~----~----~----~------~----~------~--~---
