Revision: 14337
Author: adrian.chadd
Date: Mon Oct 19 21:29:45 2009
Log: Flesh out some more of the loghelper instance handling and begin fleshing out
the code to write data to it.


http://code.google.com/p/lusca-cache/source/detail?r=14337

Modified:
  /branches/LUSCA_HEAD/libloghelper/loghelper.c
  /branches/LUSCA_HEAD/libloghelper/loghelper.h

=======================================
--- /branches/LUSCA_HEAD/libloghelper/loghelper.c       Mon Oct 19 02:18:59 2009
+++ /branches/LUSCA_HEAD/libloghelper/loghelper.c       Mon Oct 19 21:29:45 2009
@@ -4,6 +4,7 @@
  #include <stdlib.h>
  #include <unistd.h>
  #include <sys/socket.h>
+#include <sys/types.h>
  #include <netinet/in.h>

  #include "include/util.h"
@@ -34,6 +35,8 @@
  #include "loghelper_commands.h"


+/* Loghelper buffer management functions */
+
  static void
  loghelper_buffer_free(loghelper_buffer_t *lb)
  {
@@ -54,7 +57,8 @@
        return lb;
  }

-/* *** */
+/* ********** */
+/* Loghelper instance buffer management functions */

  /*
   * Free all pending buffers
@@ -71,6 +75,44 @@
        }
        lh->nbufs = 0;
  }
+
+static void
+loghelper_append_buffer(loghelper_instance_t *lh, loghelper_buffer_t *lb)
+{
+       dlinkAddTail(lb, &lb->node, &lh->bufs);
+       lh->nbufs++;
+}
+
+static int
+loghelper_has_bufs(loghelper_instance_t *lh)
+{
+       return (lh->nbufs > 0);
+}
+
+static loghelper_buffer_t *
+loghelper_dequeue_buffer(loghelper_instance_t *lh)
+{
+       loghelper_buffer_t *lb;
+
+       if (lh->bufs.head == NULL)
+               return NULL;
+       lb = lh->bufs.head->data;
+       dlinkDelete(&lb->node, &lh->bufs);
+       lh->nbufs--;
+       assert(lh->nbufs >= 0);
+       return lb;
+}
+
+static void
+loghelper_requeue_buffer(loghelper_instance_t *lh, loghelper_buffer_t *lb)
+{
+       dlinkAdd(lb, &lb->node, &lh->bufs);
+       lh->nbufs++;
+}
+
+
+/* ********* */
+/* Loghelper instance related functions */

  /*
   * destroy the loghelper.
@@ -93,10 +135,57 @@
        safe_free(lh);
  }

-static int
-loghelper_has_bufs(loghelper_instance_t *lh)
-{
-       return (lh->nbufs > 0);
+static void
+loghelper_write_handle(int fd, void *data)
+{
+       loghelper_instance_t *lh = data;
+       loghelper_buffer_t *lb;
+       int r;
+
+       debug(87, 5) ("loghelper_write_handle: %p: write ready; writing\n", lh);
+       lh->flags.writing = 0;
+
+       /* Dequeue a buffer */
+       lb = loghelper_dequeue_buffer(lh);
+
+       /* Write it to the socket */
+       r = FD_WRITE_METHOD(fd, lb->buf + lb->written_len, lb->len - lb->written_len);
+
+       /* EOF? Error? Shut down the IPC sockets for now and flag it */
+       /* And requeue the dequeued buffer so it is eventually retried when appropriate */
+       assert(r > 0);
+
+       /* Was it partially written? Requeue the buffer for another write */
+       if (r < lb->len) {
+               lb->written_len += r;
+               loghelper_requeue_buffer(lh, lb);
+       } else {
+               loghelper_buffer_free(lb);
+       }
+
+       /* Is there any further data? schedule another write */
+       if (loghelper_has_bufs(lh)) {
+               lh->flags.writing = 1;
+               commSetSelect(lh->wfd, COMM_SELECT_WRITE, loghelper_write_handle, lh, 0);
+       }
+
+       /* Out of buffers and closing? Destroy the instance */
+       if (lh->flags.closing && (! loghelper_has_bufs(lh))) {
+               loghelper_destroy(lh);
+               return;
+       }
+}
+
+static void
+loghelper_kick_write(loghelper_instance_t *lh)
+{
+       /* Don't queue a write if we already have */
+       if (lh->flags.writing)
+               return;
+
+       /* Queue write and set flag to make sure we don't queue another */
+       commSetSelect(lh->wfd, COMM_SELECT_WRITE, loghelper_write_handle, lh, 0);
+       lh->flags.writing = 1;
  }

  /*
@@ -141,10 +230,51 @@

        /* Are there pending messages? If so, leave it for now and queue a write if needed */
        if (loghelper_has_bufs(lh)) {
-               /* XXX check if an explicit flush needs to be scheduled here */
+               loghelper_kick_write(lh);
                return;
        }

        /* No pending messages? Wrap up. */
        loghelper_destroy(lh);
  }
+
+/*
+ * Queue a command to be sent to the loghelper.
+ *
+ * The command will be queued to be sent straight away. Subsequently queued requests
+ * will wait until the socket write buffer is ready for more data.
+ *
+ * For now there's an artificial 32768 - 3 byte limit on the payload size.
+ */
+int
+loghelper_queue_command(loghelper_instance_t *lh, loghelper_command_t cmd, short payload_len, const char payload[])
+{
+       u_int8_t u_cmd;
+       u_int16_t u_len;
+       loghelper_buffer_t *lb;
+
+       /* make sure the data will fit for now */
+       if (payload_len + 3 > 32768)
+               return 0;
+
+       u_cmd = cmd;
+       u_len = payload_len + 3;                /* 3 == 1 byte cmd, 2 byte length */
+
+       /* Create a new buffer */
+       lb = loghelper_buffer_create();
+
+
+       /* Copy in the command */
+
+       /* Copy in the packet length */
+
+       /* Copy in the payload, if any */
+
+       /* Add it to the instance buffer list tail */
+       loghelper_append_buffer(lh, lb);
+
+       /* Do we have a write scheduled? If not, schedule a write */
+       loghelper_kick_write(lh);
+
+       return 1;
+}
=======================================
--- /branches/LUSCA_HEAD/libloghelper/loghelper.h       Mon Oct 19 02:10:23 2009
+++ /branches/LUSCA_HEAD/libloghelper/loghelper.h       Mon Oct 19 21:29:45 2009
@@ -20,6 +20,7 @@
        struct {
                int closing:1;          /* whether we are closing - and further
                                           queued messages should be rejected */
+               int writing:1;          /* whether we're writing anything */
        } flags;
  };
  typedef struct _loghelper_instance loghelper_instance_t;

--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups 
"lusca-commit" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to 
[email protected]
For more options, visit this group at 
http://groups.google.com/group/lusca-commit?hl=en
-~----------~----~----~----~------~----~------~--~---

Reply via email to