I have pulled the ringbuffer out of logsys and corosync-fplay
into a single file ringbuffer.c. I was intending this to be for
libqb, but it might help simplify logsys.c whilst we are hunting
out our current bug.

This seems to work well with the following testing:
- debug=on/off with cpgbench.
- killall -SIGSEGV corosync
- corosync-fplay

Signed-off-by: Angus Salkeld <[email protected]>
---
 exec/Makefile.am          |    2 +-
 exec/logsys.c             |  189 +++---------------------
 exec/ringbuffer.c         |  369 +++++++++++++++++++++++++++++++++++++++++++++
 include/corosync/cororb.h |  129 ++++++++++++++++
 tools/Makefile.am         |    3 +
 tools/corosync-fplay.c    |  122 ++++++---------
 6 files changed, 568 insertions(+), 246 deletions(-)
 create mode 100644 exec/ringbuffer.c
 create mode 100644 include/corosync/cororb.h

diff --git a/exec/Makefile.am b/exec/Makefile.am
index f367f29..e43865d 100644
--- a/exec/Makefile.am
+++ b/exec/Makefile.am
@@ -42,7 +42,7 @@ if BUILD_RDMA
 TOTEM_SRC              += totemiba.c
 endif
 
-LOGSYS_SRC             = wthread.c logsys.c
+LOGSYS_SRC             = wthread.c logsys.c ringbuffer.c
 COROIPCS_SRC           = coroipcs.c
 
 LCRSO_SRC              = objdb.c vsf_ykd.c coroparse.c vsf_quorum.c
diff --git a/exec/logsys.c b/exec/logsys.c
index 7463798..c28ebed 100644
--- a/exec/logsys.c
+++ b/exec/logsys.c
@@ -40,6 +40,7 @@
 #include <stdint.h>
 #include <stdio.h>
 #include <ctype.h>
+#include <assert.h>
 #include <string.h>
 #include <stdarg.h>
 #include <sys/time.h>
@@ -64,6 +65,7 @@
 #include <semaphore.h>
 
 #include <corosync/list.h>
+#include <corosync/cororb.h>
 #include <corosync/engine/logsys.h>
 
 #define YIELD_AFTER_LOG_OPS 10
@@ -144,17 +146,7 @@ struct logsys_logger {
                                                   for subsystems */
 };
 
-
-/*
- * These are not static so they can be read from the core file
- */
-int *flt_data;
-
-uint32_t flt_head;
-
-uint32_t flt_tail;
-
-unsigned int flt_data_size;
+static cs_ringbuffer_t* rb = NULL;
 
 #define COMBINE_BUFFER_SIZE 2048
 
@@ -202,8 +194,6 @@ static pthread_spinlock_t logsys_wthread_spinlock;
 static pthread_mutex_t logsys_wthread_mutex = PTHREAD_MUTEX_INITIALIZER;
 #endif
 
-static int logsys_buffer_full = 0;
-
 static char *format_buffer=NULL;
 
 static int logsys_dropped_messages = 0;
@@ -298,47 +288,6 @@ static void dump_full_config(void)
 }
 #endif
 
-static uint32_t circular_memory_map (void **buf, size_t bytes)
-{
-       void *addr_orig;
-       void *addr;
-
-       addr_orig = mmap (*buf, bytes << 1, PROT_NONE,
-               MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
-
-       if (addr_orig == MAP_FAILED) {
-printf ("a\n");
-               return (-1);
-       }
-
-       addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
-               MAP_ANONYMOUS | MAP_PRIVATE | MAP_FIXED, -1, 0);
-
-       if (addr != addr_orig) {
-printf ("b %d\n", errno);
-exit (1);
-               return (-1);
-       }
-#ifdef COROSYNC_BSD
-       madvise(addr_orig, bytes, MADV_NOSYNC);
-#endif
-
-       addr = mmap (((char *)addr_orig) + bytes,
-                  bytes, PROT_READ | PROT_WRITE,
-                  MAP_ANONYMOUS | MAP_PRIVATE | MAP_FIXED, -1, 0);
-       if ((char *)addr != (char *)((char *)addr_orig + bytes)) {
-printf ("c %d\n", errno);
-exit (1);
-               return (-1);
-       }
-#ifdef COROSYNC_BSD
-       madvise(((char *)addr_orig) + bytes, bytes, MADV_NOSYNC);
-#endif
-
-       *buf = addr_orig;
-       return (0);
-}
-
 #if defined(HAVE_PTHREAD_SPIN_LOCK)
 static void logsys_flt_lock (void)
 {
@@ -380,62 +329,6 @@ static void logsys_wthread_unlock (void)
 #endif
 
 /*
- * Before any write operation, a reclaim on the buffer area must be executed
- */
-static inline void records_reclaim (unsigned int idx, unsigned int words)
-{
-       unsigned int should_reclaim;
-
-       should_reclaim = 0;
-
-       if ((idx + words) >= flt_data_size) {
-               logsys_buffer_full = 1;
-       }
-       if (logsys_buffer_full == 0) {
-               return;
-       }
-
-       if (flt_tail > flt_head) {
-               if (idx + words >= flt_tail) {
-                       should_reclaim = 1;
-               }
-       } else {
-               if ((idx + words) >= (flt_tail + flt_data_size)) {
-                       should_reclaim = 1;
-               }
-       }
-
-       if (should_reclaim) {
-               int words_needed = 0;
-
-               words_needed = words + 1;
-               do {
-                       unsigned int old_tail;
-
-                       words_needed -= flt_data[flt_tail];
-                       old_tail = flt_tail;
-                       flt_tail =
-                               (flt_tail +
-                               flt_data[flt_tail]) % (flt_data_size);
-               } while (words_needed > 0);
-       }
-}
-
-#define idx_word_step(idx)                                             \
-do {                                                                   \
-       if (idx > (flt_data_size - 1)) {                                \
-               idx = 0;                                                \
-       }                                                               \
-} while (0);
-
-#define idx_buffer_step(idx)                                           \
-do {                                                                   \
-       if (idx > (flt_data_size - 1)) {                                \
-               idx = ((idx) % (flt_data_size));                        \
-       }                                                               \
-} while (0);
-
-/*
  * Internal threaded logging implementation
  */
 static inline int strcpy_cutoff (char *dest, const char *src, size_t cutoff,
@@ -1066,8 +959,6 @@ int _logsys_wthread_create (void)
 
 int _logsys_rec_init (unsigned int fltsize)
 {
-       size_t flt_real_size;
-
        sem_init (&logsys_thread_start, 0, 0);
 
        sem_init (&logsys_print_finished, 0, 0);
@@ -1077,33 +968,7 @@ int _logsys_rec_init (unsigned int fltsize)
        pthread_spin_init (&logsys_wthread_spinlock, 1);
 #endif
 
-       /*
-        * XXX: kill me for 1.1 because I am a dirty hack
-        * temporary workaround that will be replaced by supporting
-        * 0 byte size flight recorder buffer.
-        * 0 byte size buffer will enable direct printing to logs
-        *   without flight recoder.
-        */
-       if (fltsize < 64000) {
-               fltsize = 64000;
-       }
-
-       flt_real_size = ROUNDUP(fltsize, sysconf(_SC_PAGESIZE));
-
-       circular_memory_map ((void **)&flt_data, flt_real_size);
-
-       memset (flt_data, 0, flt_real_size * 2);
-       /*
-        * flt_data_size tracks data by ints and not bytes/chars.
-        */
-
-       flt_data_size = flt_real_size / sizeof (uint32_t);
-       /*
-        * First record starts at zero
-        * Last record ends at zero
-        */
-       flt_head = 0;
-       flt_tail = 0;
+       rb = cs_rb_create (fltsize, CS_RB_OVERWRITE);
 
        return (0);
 }
@@ -1139,9 +1004,8 @@ void _logsys_log_rec (
        unsigned int idx;
        unsigned int arguments = 0;
        unsigned int record_reclaim_size = 0;
-       unsigned int index_start;
-       int words_written;
        int subsysid;
+       int32_t *flt_data;
 
        subsysid = LOGSYS_DECODE_SUBSYSID(rec_ident);
 
@@ -1177,20 +1041,12 @@ void _logsys_log_rec (
                record_reclaim_size += ((buf_len[i] + 3) >> 2) + 1;
        }
 
+       record_reclaim_size+= 4;
        logsys_flt_lock();
-       idx = flt_head;
-       index_start = idx;
 
-       /*
-        * Reclaim data needed for record including 4 words for the header
-        */
-       records_reclaim (idx, record_reclaim_size + 4);
-
-       /*
-        * Write record size of zero and rest of header information
-        */
-       flt_data[idx++] = 0;
-       idx_word_step(idx);
+       flt_data = cs_rb_chunk_writable_alloc (rb, (record_reclaim_size * 
sizeof (uint32_t)));
+       assert(flt_data != NULL);
+       idx = 0;
 
        flt_data[idx++] = rec_ident;
        idx_word_step(idx);
@@ -1219,17 +1075,19 @@ void _logsys_log_rec (
                idx_buffer_step (idx);
 
        }
-       words_written = idx - index_start;
-       if (words_written < 0) {
-               words_written += flt_data_size;
-       }
+
        /*
         * Commit the write of the record size now that the full record
         * is in the memory buffer
         */
-       flt_data[index_start] = words_written;
+       if (record_reclaim_size < idx) {
+               printf ("record_reclaim_size:%d idx:%d commit_size:%d",
+                       record_reclaim_size, idx, (idx * sizeof(uint32_t)));
+               sleep(1);
+               assert(0);
+       }
+       cs_rb_chunk_writable_commit (rb, (idx * sizeof(uint32_t)));
 
-       flt_head = idx;
        logsys_flt_unlock();
        records_written++;
 }
@@ -1617,28 +1475,17 @@ int logsys_log_rec_store (const char *filename)
 {
        int fd;
        ssize_t written_size;
-       size_t size_to_write = (flt_data_size + 2) * sizeof (unsigned int);
 
        fd = open (filename, O_CREAT|O_RDWR, 0700);
        if (fd < 0) {
                return (-1);
        }
 
-       written_size = write (fd, &flt_data_size, sizeof(unsigned int));
-       if ((written_size < 0) || (written_size != sizeof(unsigned int))) {
-               close (fd);
-               return (-1);
-       }
-
-       written_size = write (fd, flt_data, flt_data_size * sizeof (unsigned 
int));
-       written_size += write (fd, &flt_head, sizeof (uint32_t));
-       written_size += write (fd, &flt_tail, sizeof (uint32_t));
+       written_size = cs_rb_write_to_file (rb, fd);
        if (close (fd) != 0)
                return (-1);
        if (written_size < 0) {
                return (-1);
-       } else if ((size_t)written_size != size_to_write) {
-               return (-1);
        }
 
        return (0);
diff --git a/exec/ringbuffer.c b/exec/ringbuffer.c
new file mode 100644
index 0000000..177279e
--- /dev/null
+++ b/exec/ringbuffer.c
@@ -0,0 +1,369 @@
+/*
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Angus Salkeld <[email protected]>
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of Red Hat, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+#include <config.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <errno.h>
+#include <stdint.h>
+#include <string.h>
+#include <assert.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <corosync/cororb.h>
+#include "util.h"
+
+
+#define ROUNDUP(x, y) ((((x) + ((y) - 1)) / (y)) * (y))
+
+/* the chunk header is two words
+ * 1) the chunk data size
+ * 2) the magic number
+ */
+#define RB_CHUNK_HEADER_WORDS 2
+#define RB_CHUNK_HEADER_SIZE (sizeof(uint32_t) * RB_CHUNK_HEADER_WORDS)
+#define RB_CHUNK_MAGIC 0xdeadbeef
+#define FDHEAD_INDEX           (rb->size)
+#define FDTAIL_INDEX           (rb->size + 1)
+
+
+static uint32_t circular_memory_map (void **buf, size_t bytes)
+{
+       void *addr_orig;
+       void *addr;
+
+       addr_orig = mmap (*buf, bytes << 1, PROT_NONE,
+               MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
+
+       if (addr_orig == MAP_FAILED) {
+               return (-1);
+       }
+
+       addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
+               MAP_ANONYMOUS | MAP_PRIVATE | MAP_FIXED, -1, 0);
+
+       if (addr != addr_orig) {
+               return (-1);
+       }
+#ifdef COROSYNC_BSD
+       madvise(addr_orig, bytes, MADV_NOSYNC);
+#endif
+
+       addr = mmap (((char *)addr_orig) + bytes,
+                  bytes, PROT_READ | PROT_WRITE,
+                  MAP_ANONYMOUS | MAP_PRIVATE | MAP_FIXED, -1, 0);
+       if ((char *)addr != (char *)((char *)addr_orig + bytes)) {
+               return (-1);
+       }
+#ifdef COROSYNC_BSD
+       madvise(((char *)addr_orig) + bytes, bytes, MADV_NOSYNC);
+#endif
+
+       *buf = addr_orig;
+       return (0);
+}
+
+cs_ringbuffer_t* cs_rb_create (size_t size, cs_ringbuffer_type type)
+{
+       cs_ringbuffer_t* rb = malloc (sizeof (cs_ringbuffer_t));
+       size_t real_size = ROUNDUP(size, sysconf(_SC_PAGESIZE));
+
+
+       if (circular_memory_map ((void**)&rb->buf, real_size) != 0) {
+               return NULL;
+       }
+
+       memset (rb->buf, 0, real_size * 2);
+
+       rb->type = type;
+       /*
+        * rb->size tracks data by ints and not bytes/chars.
+        */
+       rb->size = real_size / sizeof (uint32_t);
+       /*
+        * First record starts at zero
+        * Last record ends at zero
+        */
+       rb->write_pt = 0;
+       rb->read_pt = 0;
+
+       return rb;
+}
+
+size_t cs_rb_space_free (cs_ringbuffer_t *rb)
+{
+       uint32_t write_size;
+       uint32_t read_size;
+       size_t space_free = 0;
+
+       write_size = rb->write_pt;
+       read_size = rb->read_pt;
+
+       if (write_size > read_size) {
+               space_free = (read_size - write_size + rb->size) - 1;
+       }
+       else if (write_size < read_size) {
+               space_free = (read_size - write_size) - 1;
+       }
+       else {
+               space_free = rb->size;
+       }
+       /* word -> bytes */
+       return (space_free * sizeof (uint32_t));
+}
+
+size_t cs_rb_space_used (cs_ringbuffer_t *rb)
+{
+       uint32_t write_size;
+       uint32_t read_size;
+       size_t space_used;
+
+       write_size = rb->write_pt;
+       read_size = rb->read_pt;
+
+       if (write_size > read_size) {
+               space_used = write_size - read_size;
+       } else
+       if (write_size < read_size) {
+               space_used = (write_size - read_size + rb->size) - 1;
+       } else {
+               space_used = 0;
+       }
+       /* word -> bytes */
+       return (space_used * sizeof (uint32_t));
+}
+
+void* cs_rb_chunk_writable_alloc (cs_ringbuffer_t *rb, size_t len)
+{
+       uint32_t idx;
+
+       /*
+        * Reclaim data if we are over writing and we need space
+        */
+       if (rb->type == CS_RB_OVERWRITE) {
+               while (cs_rb_space_free (rb) < (len + RB_CHUNK_HEADER_SIZE)) {
+                       cs_rb_chunk_reclaim (rb);
+               }
+       } else {
+               if (cs_rb_space_free (rb) < (len + RB_CHUNK_HEADER_SIZE)) {
+                       return NULL;
+               }
+       }
+       idx = rb->write_pt;
+       /*
+        * insert the chunk header
+        */
+       rb->buf[idx++] = len;
+       idx_word_step(idx);
+       rb->buf[idx++] = RB_CHUNK_MAGIC;
+       idx_word_step(idx);
+
+       /*
+        * return a pointer to the begining of the chunk data
+        */
+       return (void*)&rb->buf[idx];
+
+}
+
+void cs_rb_chunk_writable_commit (cs_ringbuffer_t *rb, size_t len)
+{
+       uint32_t idx = rb->write_pt;
+
+       /*
+        * skip over the chunk header
+        */
+       rb->buf[idx++] = len;
+       idx_word_step(idx);
+       rb->buf[idx++] = RB_CHUNK_MAGIC;
+       idx_word_step(idx);
+
+       /*
+        * skip over the user's chunk.
+        */
+       idx += (len / sizeof (uint32_t));
+       idx_buffer_step (idx);
+
+       /*
+        * commit the write_pt
+        */
+       rb->write_pt = idx;
+}
+
+size_t cs_rb_chunk_write (cs_ringbuffer_t *rb, const void* data, size_t len)
+{
+       char *dest = cs_rb_chunk_writable_alloc (rb, len);
+
+       if (dest == NULL) {
+               return 0;
+       }
+
+       /*
+        * copy the data
+        */
+       memcpy (dest, data, len);
+
+       cs_rb_chunk_writable_commit (rb, len);
+
+       return len;
+}
+
+void cs_rb_chunk_reclaim (cs_ringbuffer_t *rb)
+{
+       int words_needed = 0;
+       uint32_t chunk_size = rb->buf[rb->read_pt];
+       uint32_t chunk_magic = rb->buf[(rb->read_pt + 1) % rb->size];
+
+       //printf ("%s: size:%d magic:%d\n", __func__, chunk_size, chunk_magic);
+       assert (chunk_magic == RB_CHUNK_MAGIC);
+       words_needed = (chunk_size + RB_CHUNK_HEADER_SIZE) / sizeof (uint32_t);
+
+       rb->read_pt = (rb->read_pt + words_needed) % (rb->size);
+}
+
+size_t cs_rb_chunk_peek (cs_ringbuffer_t *rb, void **data_out)
+{
+       uint32_t chunk_size = rb->buf[rb->read_pt];
+       uint32_t chunk_magic = rb->buf[(rb->read_pt + 1) % rb->size];
+
+       *data_out = &rb->buf[rb->read_pt + RB_CHUNK_HEADER_WORDS];
+
+       if (chunk_magic != RB_CHUNK_MAGIC) {
+               return 0;
+       } else {
+               return chunk_size;
+       }
+}
+
+size_t cs_rb_chunk_read (cs_ringbuffer_t *rb, void *data_out, size_t len)
+{
+       uint32_t chunk_size = rb->buf[rb->read_pt];
+       uint32_t chunk_magic = rb->buf[(rb->read_pt + 1) % rb->size];
+
+       if (cs_rb_space_used (rb) == 0) {
+               return 0;
+       }
+
+       if (chunk_magic != RB_CHUNK_MAGIC) {
+               return 0;
+       }
+
+       if (len < chunk_size) {
+               return 0;
+       }
+
+       memcpy (data_out, &rb->buf[rb->read_pt + RB_CHUNK_HEADER_WORDS], 
chunk_size);
+
+       cs_rb_chunk_reclaim (rb);
+
+       return chunk_size;
+}
+
+static void print_header (cs_ringbuffer_t *rb)
+{
+       printf ("Ringbuffer: \n");
+       if (rb->type == CS_RB_OVERWRITE) {
+               printf (" ->OVERWRITE\n");
+       } else {
+               printf (" ->NORMAL\n");
+       }
+       printf (" ->write_pt [%d]\n", rb->write_pt);
+       printf (" ->read_pt [%d]\n", rb->read_pt);
+       printf (" ->size [%d words]\n", rb->size);
+
+       printf (" =>free [%d bytes]\n", cs_rb_space_free (rb));
+       printf (" =>used [%d bytes]\n", cs_rb_space_used (rb));
+}
+
+
+size_t cs_rb_write_to_file (cs_ringbuffer_t *rb, int fd)
+{
+       ssize_t written_size;
+
+       print_header (rb);
+
+       written_size = write (fd, &rb->size, sizeof (uint32_t));
+       if ((written_size < 0) || (written_size != sizeof (uint32_t))) {
+               return -1;
+       }
+
+       written_size = write (fd, rb->buf, rb->size * sizeof (unsigned int));
+       /*
+        * store the read & write pointers
+        */
+       written_size += write (fd, (void*)&rb->write_pt, sizeof (uint32_t));
+       written_size += write (fd, (void*)&rb->read_pt, sizeof (uint32_t));
+
+       return written_size;
+}
+
+cs_ringbuffer_t *cs_rb_create_from_file (int fd, cs_ringbuffer_type type)
+{
+       ssize_t n_read;
+       size_t n_required;
+       cs_ringbuffer_t *rb = malloc (sizeof (cs_ringbuffer_t));
+
+       rb->type = type;
+
+       n_required = sizeof (unsigned int);
+       n_read = read (fd, &rb->size, n_required);
+       if (n_read != n_required) {
+               fprintf (stderr, "Unable to read fdata header\n");
+               return NULL;
+       }
+
+       n_required = ((rb->size + 2) * sizeof(unsigned int));
+
+       if ((rb->buf = malloc (n_required)) == NULL) {
+               fprintf (stderr, "exhausted virtual memory\n");
+               return NULL;
+       }
+       n_read = read (fd, rb->buf, n_required);
+       if (n_read < 0) {
+               fprintf (stderr, "reading file failed: %s\n",
+                        strerror (errno));
+               return NULL;
+       }
+
+       if (n_read != n_required) {
+               printf ("Warning: read %lu bytes, but expected %lu\n",
+                       (unsigned long) n_read, (unsigned long) n_required);
+       }
+
+       rb->write_pt = rb->buf[FDHEAD_INDEX];
+       rb->read_pt = rb->buf[FDTAIL_INDEX];
+
+       print_header (rb);
+
+       return rb;
+}
+
diff --git a/include/corosync/cororb.h b/include/corosync/cororb.h
new file mode 100644
index 0000000..8cca162
--- /dev/null
+++ b/include/corosync/cororb.h
@@ -0,0 +1,129 @@
+/*
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Angus Salkeld <[email protected]>
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of Red Hat, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+#ifndef CS_RB_DEFINED
+#define CS_RB_DEFINED
+
+#include <sys/types.h>
+#include <stdint.h>
+
+typedef enum {
+       CS_RB_NORMAL,
+       CS_RB_OVERWRITE,
+} cs_ringbuffer_type;
+
+#define CS_RB_SHM_PATH_MAX 128
+
+typedef struct {
+   cs_ringbuffer_type   type;
+   volatile uint32_t  write_pt;
+   volatile uint32_t  read_pt;
+   uint32_t           size;
+   uint32_t          *buf;
+} cs_ringbuffer_t;
+
+
+
+#define idx_word_step(idx)                                             \
+do {                                                                   \
+       if (idx > (rb->size - 1)) {                             \
+               idx = 0;                                                \
+       }                                                               \
+} while (0);
+
+#define idx_buffer_step(idx)                                           \
+do {                                                                   \
+       if (idx > (rb->size - 1)) {                             \
+               idx = ((idx) % (rb->size));                     \
+       }                                                               \
+} while (0);
+
+/**
+ * initialize
+ */
+cs_ringbuffer_t* cs_rb_create (size_t size, cs_ringbuffer_type type);
+
+
+/**
+ * Try to add data to the buffer.
+ * @return the amount of bytes actually buffered.
+ */
+size_t cs_rb_chunk_write (cs_ringbuffer_t *rb, const void* data, size_t len);
+
+/**
+ * write the header
+ * @return pointer to chunk to write to, or NULL (if no space).
+ */
+void* cs_rb_chunk_writable_alloc (cs_ringbuffer_t *rb, size_t len);
+
+/**
+ * finalize the chunk.
+ */
+void cs_rb_chunk_writable_commit (cs_ringbuffer_t *rb, size_t len);
+
+/**
+ * read (without reclaiming) the last chunk.
+ * @return the size of the chunk.
+ */
+size_t cs_rb_chunk_peek (cs_ringbuffer_t *rb, void **data_out);
+
+/**
+ * reclaim the last chunk.
+ */
+void cs_rb_chunk_reclaim (cs_ringbuffer_t *rb);
+
+/**
+ * This is the same as cs_rb_chunk_peek() memcpy() and cs_rb_chunk_reclaim().
+ */
+size_t cs_rb_chunk_read (cs_ringbuffer_t *rb, void *data_out, size_t len);
+
+/**
+ * The amount of data currently available.
+ */
+size_t cs_rb_space_free (cs_ringbuffer_t *rb);
+
+/**
+ * The total amount of data in the buffer.
+ */
+size_t cs_rb_space_used (cs_ringbuffer_t *rb);
+
+
+/**
+ * write the contents of the Ring Buffer to file.
+ */
+size_t cs_rb_write_to_file (cs_ringbuffer_t *rb, int fd);
+
+cs_ringbuffer_t *cs_rb_create_from_file (int fd, cs_ringbuffer_type type);
+
+#endif /* CS_RB_DEFINED */
+
diff --git a/tools/Makefile.am b/tools/Makefile.am
index 3a52f4a..01dacbc 100644
--- a/tools/Makefile.am
+++ b/tools/Makefile.am
@@ -40,6 +40,9 @@ bin_SCRIPTS           = corosync-blackbox
 
 EXTRA_DIST             = $(bin_SCRIPTS)
 
+corosync_fplay_LDADD   = -llogsys
+corosync_fplay_LDFLAGS = -L../exec
+
 corosync_pload_LDADD   = -lpload -lcoroipcc
 corosync_pload_LDFLAGS = -L../lib
 corosync_objctl_LDADD  = -lconfdb ../lcr/liblcr.a -lcoroipcc
diff --git a/tools/corosync-fplay.c b/tools/corosync-fplay.c
index f6f3bae..a2f2891 100644
--- a/tools/corosync-fplay.c
+++ b/tools/corosync-fplay.c
@@ -1,3 +1,36 @@
+/*
+ * Copyright (c) 2010 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake ([email protected])
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
 #include <config.h>
 
 #include <sys/types.h>
@@ -17,12 +50,7 @@
 #include <arpa/inet.h>
 
 #include <corosync/engine/logsys.h>
-
-unsigned int flt_data_size;
-
-unsigned int *flt_data;
-#define FDHEAD_INDEX           (flt_data_size)
-#define FDTAIL_INDEX           (flt_data_size + 1)
+#include <corosync/cororb.h>
 
 #define TOTEMIP_ADDRLEN (sizeof(struct in6_addr))
 
@@ -341,34 +369,12 @@ static struct printer_subsys printer_subsystems[] = {
 static unsigned int printer_subsys_count =
   sizeof (printer_subsystems) / sizeof (struct printer_subsys);
 
-static unsigned int g_record[10000];
-
-/*
- * Copy record, dealing with wrapping
- */
-static int logsys_rec_get (int rec_idx) {
-       unsigned int rec_size;
-       int firstcopy, secondcopy;
-
-       rec_size = flt_data[rec_idx];
-
-       firstcopy = rec_size;
-       secondcopy = 0;
-       if (firstcopy + rec_idx > flt_data_size) {
-               firstcopy = flt_data_size - rec_idx;
-               secondcopy -= firstcopy - rec_size;
-       }
-       memcpy (&g_record[0], &flt_data[rec_idx], firstcopy<<2);
-       if (secondcopy) {
-               memcpy (&g_record[firstcopy], &flt_data[0], secondcopy<<2);
-       }
-       return ((rec_idx + rec_size) % flt_data_size);
-}
+#define G_RECORD_SIZE 10000
+static unsigned int g_record[G_RECORD_SIZE];
 
-static void logsys_rec_print (const void *record)
+static void logsys_rec_print (const void *record, size_t rec_size)
 {
        const unsigned int *buf_uint32t = record;
-       unsigned int rec_size;
        unsigned int rec_ident;
        unsigned int level;
        unsigned int line;
@@ -382,16 +388,15 @@ static void logsys_rec_print (const void *record)
        const char *arguments[64];
        int arg_count = 0;
 
-       rec_size = buf_uint32t[rec_idx];
-       rec_ident = buf_uint32t[rec_idx+1];
-       line = buf_uint32t[rec_idx+2];
-       record_number = buf_uint32t[rec_idx+3];
+       rec_ident = buf_uint32t[rec_idx];
+       line = buf_uint32t[rec_idx+1];
+       record_number = buf_uint32t[rec_idx+2];
 
        level = LOGSYS_DECODE_LEVEL(rec_ident);
 
        printf ("rec=[%d] ", record_number);
-       arg_size_idx = rec_idx + 4;
-       words_processed = 4;
+       arg_size_idx = rec_idx + 3;
+       words_processed = 3;
        for (i = 0; words_processed < rec_size; i++) {
                arguments[arg_count++] =
                  (const char *)&buf_uint32t[arg_size_idx + 1];
@@ -459,12 +464,10 @@ printf ("\n");
 int main (void)
 {
        int fd;
-       int rec_idx;
-       int end_rec;
        int record_count = 1;
-       ssize_t n_read;
+       size_t chunk_size;
        const char *data_file = LOCALSTATEDIR "/lib/corosync/fdata";
-       size_t n_required;
+       cs_ringbuffer_t *rb = NULL;
 
        if ((fd = open (data_file, O_RDONLY)) < 0) {
                fprintf (stderr, "failed to open %s: %s\n",
@@ -472,45 +475,16 @@ int main (void)
                return EXIT_FAILURE;
        }
 
-       n_required = sizeof (unsigned int);
-       n_read = read (fd, &flt_data_size, n_required);
-       if (n_read != n_required) {
-               fprintf (stderr, "Unable to read fdata header\n");
-               return EXIT_FAILURE;
-       }
-
-       n_required = ((flt_data_size + 2) * sizeof(unsigned int));
+       rb = cs_rb_create_from_file (fd, CS_RB_OVERWRITE);
 
-       if ((flt_data = malloc (n_required)) == NULL) {
-               fprintf (stderr, "exhausted virtual memory\n");
-               return EXIT_FAILURE;
-       }
-       n_read = read (fd, flt_data, n_required);
        close (fd);
-       if (n_read < 0) {
-               fprintf (stderr, "reading %s failed: %s\n",
-                        data_file, strerror (errno));
-               return EXIT_FAILURE;
-       }
-
-       if (n_read != n_required) {
-               printf ("Warning: read %lu bytes, but expected %lu\n",
-                       (unsigned long) n_read, (unsigned long) n_required);
-       }
-
-       rec_idx = flt_data[FDTAIL_INDEX];
-       end_rec = flt_data[FDHEAD_INDEX];
-
-       printf ("Starting replay: head [%d] tail [%d]\n",
-               flt_data[FDHEAD_INDEX],
-               flt_data[FDTAIL_INDEX]);
 
        for (;;) {
-               rec_idx = logsys_rec_get (rec_idx);
-               logsys_rec_print (g_record);
-               if (rec_idx == end_rec) {
+               chunk_size = cs_rb_chunk_read (rb, g_record, G_RECORD_SIZE);
+               if (chunk_size == 0) {
                        break;
                }
+               logsys_rec_print (g_record, (chunk_size / sizeof (uint32_t)));
                record_count += 1;
        }
 
-- 
1.6.6.1

_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to