Angus,
attached is fixed version of rb, which should do what you think current
implementation is doing. Please take a time to read comments beginning
with "^^^^^"
But basically:
- Implementation didn't support storing of data with non multiply
word-size (ie. 32-bit) records (7 bytes, ...)
- MMaped circular buffer needs backend in file. Take a look to coroipcc
how to make it in shm, ...
Regards,
Honza
(test.c is just simple test program, but try it run on current
implementation and you will be surprised what doesn't work).
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* All rights reserved.
*
* Author: Angus Salkeld <[email protected]>
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of Red Hat, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <stdint.h>
#include <string.h>
#include <assert.h>
#include <unistd.h>
#include <sys/mman.h>
#include "cororb.h"
#include <fcntl.h>
#define ROUNDUP(x, y) ((((x) + ((y) - 1)) / (y)) * (y))
/* the chunk header is two words
* 1) the chunk data size
* 2) the magic number
*/
#define RB_CHUNK_HEADER_WORDS 2
#define RB_CHUNK_HEADER_SIZE (sizeof(uint32_t) * RB_CHUNK_HEADER_WORDS)
#define RB_CHUNK_MAGIC 0xdeadbeef
#define FDHEAD_INDEX (rb->size)
#define FDTAIL_INDEX (rb->size + 1)
static uint32_t circular_memory_map (void **buf, size_t bytes)
{
void *addr_orig;
void *addr;
int fd;
fd = open ("something.txt", O_RDWR|O_CREAT, 0666);
/* ^^^^^ CIRCULLAR BUFFER MUST HAVE BACKEND IN FILE */
if (fd == -1) {
fprintf(stderr,"BLE\n");
return -1;
}
ftruncate(fd, bytes);
addr_orig = mmap (*buf, bytes << 1, PROT_NONE,
MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
if (addr_orig == MAP_FAILED) {
return (-1);
}
/* addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
MAP_ANONYMOUS | MAP_PRIVATE | MAP_FIXED, -1, 0);*/
/* ^^^^ NOTHING - just another malloc*/
addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_FIXED, fd, 0);
if (addr != addr_orig) {
return (-1);
}
#ifdef COROSYNC_BSD
madvise(addr_orig, bytes, MADV_NOSYNC);
#endif
/* addr = mmap (((char *)addr_orig) + bytes,
bytes, PROT_READ | PROT_WRITE,
MAP_ANONYMOUS | MAP_PRIVATE | MAP_FIXED, -1, 0);*/
/* ^^^^ NOTHING - just another malloc*/
addr = mmap (((char *)addr_orig) + bytes,
bytes, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_FIXED, fd, 0);
if ((char *)addr != (char *)((char *)addr_orig + bytes)) {
return (-1);
}
#ifdef COROSYNC_BSD
madvise(((char *)addr_orig) + bytes, bytes, MADV_NOSYNC);
#endif
*buf = addr_orig;
return (0);
}
cs_ringbuffer_t* cs_rb_create (size_t size, cs_ringbuffer_type type)
{
cs_ringbuffer_t* rb = malloc (sizeof (cs_ringbuffer_t));
size_t real_size = ROUNDUP(size, sysconf(_SC_PAGESIZE));
rb->buf = NULL;
if (circular_memory_map ((void**)&rb->buf, real_size) != 0) {
return NULL;
}
memset (rb->buf, 0, real_size * 2);
rb->type = type;
/*
* rb->size tracks data by ints and not bytes/chars.
*/
rb->size = real_size / sizeof (uint32_t);
/*
* First record starts at zero
* Last record ends at zero
*/
rb->write_pt = 0;
rb->read_pt = 0;
return rb;
}
size_t cs_rb_space_free (cs_ringbuffer_t *rb)
{
uint32_t write_size;
uint32_t read_size;
size_t space_free = 0;
write_size = rb->write_pt;
read_size = rb->read_pt;
if (write_size > read_size) {
space_free = (read_size - write_size + rb->size) - 1;
}
else if (write_size < read_size) {
space_free = (read_size - write_size) - 1;
}
else {
space_free = rb->size;
}
/* word -> bytes */
return (space_free * sizeof (uint32_t));
}
size_t cs_rb_space_used (cs_ringbuffer_t *rb)
{
uint32_t write_size;
uint32_t read_size;
size_t space_used;
write_size = rb->write_pt;
read_size = rb->read_pt;
if (write_size > read_size) {
space_used = write_size - read_size;
} else
if (write_size < read_size) {
space_used = (write_size - read_size + rb->size) - 1;
} else {
space_used = 0;
}
/* word -> bytes */
return (space_used * sizeof (uint32_t));
}
void* cs_rb_chunk_writable_alloc (cs_ringbuffer_t *rb, size_t len)
{
uint32_t idx;
/*
* Reclaim data if we are over writing and we need space
*/
if (rb->type == CS_RB_OVERWRITE) {
while (cs_rb_space_free (rb) < (len + RB_CHUNK_HEADER_SIZE)) {
fprintf(stderr,"Reclaiming\n");
cs_rb_chunk_reclaim (rb);
}
} else {
if (cs_rb_space_free (rb) < (len + RB_CHUNK_HEADER_SIZE)) {
return NULL;
}
}
idx = rb->write_pt;
/*
* insert the chunk header
*/
rb->buf[idx++] = len;
idx_word_step(idx);
rb->buf[idx++] = RB_CHUNK_MAGIC;
idx_word_step(idx);
/*
* return a pointer to the begining of the chunk data
*/
return (void*)&rb->buf[idx];
}
void cs_rb_chunk_writable_commit (cs_ringbuffer_t *rb, size_t len)
{
uint32_t idx = rb->write_pt;
/*
* skip over the chunk header
*/
rb->buf[idx++] = len;
idx_word_step(idx);
rb->buf[idx++] = RB_CHUNK_MAGIC;
idx_word_step(idx);
/*
* skip over the user's chunk.
*/
idx += (len / sizeof (uint32_t)) + ((len % sizeof (uint32_t)) == 0 ? 0 : 1);
/* ^^^^ DO WE WANT SUPPORT FOR not-word-sized items or not? If so, magic is needed*/
idx_buffer_step (idx);
/*
* commit the write_pt
*/
rb->write_pt = idx;
}
size_t cs_rb_chunk_write (cs_ringbuffer_t *rb, const void* data, size_t len)
{
char *dest = cs_rb_chunk_writable_alloc (rb, len);
if (dest == NULL) {
return 0;
}
/*
* copy the data
*/
memcpy (dest, data, len);
cs_rb_chunk_writable_commit (rb, len);
return len;
}
void cs_rb_chunk_reclaim (cs_ringbuffer_t *rb)
{
int words_needed = 0;
uint32_t chunk_size = rb->buf[rb->read_pt];
uint32_t chunk_magic = rb->buf[(rb->read_pt + 1) % rb->size];
//printf ("%s: size:%d magic:%d\n", __func__, chunk_size, chunk_magic);
assert (chunk_magic == RB_CHUNK_MAGIC);
words_needed = (chunk_size + RB_CHUNK_HEADER_SIZE) / sizeof (uint32_t) + ((chunk_size % sizeof (uint32_t)) == 0 ? 0 : 1);
/* ^^^^ DO WE WANT SUPPORT FOR not-word-sized items or not? If so, magic is needed*/
rb->read_pt = (rb->read_pt + words_needed) % (rb->size);
}
size_t cs_rb_chunk_peek (cs_ringbuffer_t *rb, void **data_out)
{
uint32_t chunk_size = rb->buf[rb->read_pt];
uint32_t chunk_magic = rb->buf[(rb->read_pt + 1) % rb->size];
*data_out = &rb->buf[rb->read_pt + RB_CHUNK_HEADER_WORDS];
if (chunk_magic != RB_CHUNK_MAGIC) {
return 0;
} else {
return chunk_size;
}
}
size_t cs_rb_chunk_read (cs_ringbuffer_t *rb, void *data_out, size_t len)
{
uint32_t chunk_size = rb->buf[rb->read_pt];
uint32_t chunk_magic = rb->buf[(rb->read_pt + 1) % rb->size];
if (cs_rb_space_used (rb) == 0) {
return 0;
}
if (chunk_magic != RB_CHUNK_MAGIC) {
return 0;
}
if (len < chunk_size) {
return 0;
}
memcpy (data_out, &rb->buf[rb->read_pt + RB_CHUNK_HEADER_WORDS], chunk_size);
cs_rb_chunk_reclaim (rb);
return chunk_size;
}
static void print_header (cs_ringbuffer_t *rb)
{
printf ("Ringbuffer: \n");
if (rb->type == CS_RB_OVERWRITE) {
printf (" ->OVERWRITE\n");
} else {
printf (" ->NORMAL\n");
}
printf (" ->write_pt [%d]\n", rb->write_pt);
printf (" ->read_pt [%d]\n", rb->read_pt);
printf (" ->size [%d words]\n", rb->size);
printf (" =>free [%d bytes]\n", cs_rb_space_free (rb));
printf (" =>used [%d bytes]\n", cs_rb_space_used (rb));
}
size_t cs_rb_write_to_file (cs_ringbuffer_t *rb, int fd)
{
ssize_t written_size;
print_header (rb);
written_size = write (fd, &rb->size, sizeof (uint32_t));
if ((written_size < 0) || (written_size != sizeof (uint32_t))) {
return -1;
}
written_size = write (fd, rb->buf, rb->size * sizeof (unsigned int));
/*
* store the read & write pointers
*/
written_size += write (fd, (void*)&rb->write_pt, sizeof (uint32_t));
written_size += write (fd, (void*)&rb->read_pt, sizeof (uint32_t));
return written_size;
}
cs_ringbuffer_t *cs_rb_create_from_file (int fd, cs_ringbuffer_type type)
{
ssize_t n_read;
size_t n_required;
cs_ringbuffer_t *rb = malloc (sizeof (cs_ringbuffer_t));
rb->type = type;
n_required = sizeof (unsigned int);
n_read = read (fd, &rb->size, n_required);
if (n_read != n_required) {
fprintf (stderr, "Unable to read fdata header\n");
return NULL;
}
n_required = ((rb->size + 2) * sizeof(unsigned int));
if ((rb->buf = malloc (n_required)) == NULL) {
fprintf (stderr, "exhausted virtual memory\n");
return NULL;
}
n_read = read (fd, rb->buf, n_required);
if (n_read < 0) {
fprintf (stderr, "reading file failed: %s\n",
strerror (errno));
return NULL;
}
if (n_read != n_required) {
printf ("Warning: read %lu bytes, but expected %lu\n",
(unsigned long) n_read, (unsigned long) n_required);
}
rb->write_pt = rb->buf[FDHEAD_INDEX];
rb->read_pt = rb->buf[FDTAIL_INDEX];
print_header (rb);
return rb;
}
#include <stdio.h>
#include "cororb.h"
int main(void) {
cs_ringbuffer_t *t;
char data[] = "1234567891";
int i;
char *new_data;
ssize_t l;
t = cs_rb_create (10, CS_RB_OVERWRITE);
for (i = 0; i < 2000; i++) {
fprintf(stderr, "Bytes written: %zu, free %zu\n", cs_rb_chunk_write (t, data, strlen (data)), cs_rb_space_free (t));
if (i == 0) {
data[0]='b';
}
}
for (i=0;i<2000;i++) {
fprintf (stderr, "Get %d\n", l=cs_rb_chunk_peek (t, &new_data));
if (l == 0) {
fprintf (stderr, "EE\n");
exit (1);
}
for (i = 0; i < l; i++) {
fprintf (stderr, "%x ", (int)new_data[i]);
}
fprintf(stderr, "\n");
cs_rb_chunk_reclaim (t);
}
return (0);
}
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais