src/pulsecore/srbchannel.c | 55 ++++++++++++++++++++++++++++++++++++--------- src/pulsecore/srbchannel.h | 16 ++++++------- 2 files changed, 53 insertions(+), 18 deletions(-)
New commits: commit a27e6d6d9e7b92dbbb2bc0312e369f393e0d6bc6 Author: Arun Raghavan <a...@accosted.net> Date: Wed Aug 6 07:34:24 2014 +0530 srbchannel: Trivial whitespace and style fixes Mostly to improve readability and make things a bit more consistent. diff --git a/src/pulsecore/srbchannel.c b/src/pulsecore/srbchannel.c index 35c86d1..63a9748 100644 --- a/src/pulsecore/srbchannel.c +++ b/src/pulsecore/srbchannel.c @@ -31,11 +31,12 @@ /* #define DEBUG_SRBCHANNEL */ /* This ringbuffer might be useful in other contexts too, but - right now it's only used inside the srbchannel, so let's keep it here - for the time being. */ + * right now it's only used inside the srbchannel, so let's keep it here + * for the time being. */ typedef struct pa_ringbuffer pa_ringbuffer; + struct pa_ringbuffer { - pa_atomic_t *count; + pa_atomic_t *count; /* amount of data in the buffer, can be negative */ int capacity; uint8_t *memory; int readindex, writeindex; @@ -43,24 +44,30 @@ struct pa_ringbuffer { static void *pa_ringbuffer_peek(pa_ringbuffer *r, int *count) { int c = pa_atomic_load(r->count); + if (r->readindex + c > r->capacity) *count = r->capacity - r->readindex; else *count = c; + return r->memory + r->readindex; } /* Returns true only if the buffer was completely full before the drop. */ static bool pa_ringbuffer_drop(pa_ringbuffer *r, int count) { bool b = pa_atomic_sub(r->count, count) >= r->capacity; + r->readindex += count; r->readindex %= r->capacity; + return b; } static void *pa_ringbuffer_begin_write(pa_ringbuffer *r, int *count) { int c = pa_atomic_load(r->count); + *count = PA_MIN(r->capacity - r->writeindex, r->capacity - c); + return r->memory + r->writeindex; } @@ -74,34 +81,40 @@ struct pa_srbchannel { pa_ringbuffer rb_read, rb_write; pa_fdsem *sem_read, *sem_write; pa_memblock *memblock; + void *cb_userdata; pa_srbchannel_cb_t callback; + pa_io_event *read_event; pa_mainloop_api *mainloop; }; /* We always listen to sem_read, and always signal on sem_write. - - This means we signal the same semaphore for two scenarios: - 1) We have written something to our send buffer, and want the other - side to read it - 2) We have read something from our receive buffer that was previously - completely full, and want the other side to continue writing + * + * This means we signal the same semaphore for two scenarios: + * 1) We have written something to our send buffer, and want the other + * side to read it + * 2) We have read something from our receive buffer that was previously + * completely full, and want the other side to continue writing */ size_t pa_srbchannel_write(pa_srbchannel *sr, const void *data, size_t l) { size_t written = 0; + while (l > 0) { int towrite; void *ptr = pa_ringbuffer_begin_write(&sr->rb_write, &towrite); + if ((size_t) towrite > l) towrite = l; + if (towrite == 0) { #ifdef DEBUG_SRBCHANNEL pa_log("srbchannel output buffer full"); #endif break; } + memcpy(ptr, data, towrite); pa_ringbuffer_end_write(&sr->rb_write, towrite); written += towrite; @@ -111,20 +124,26 @@ size_t pa_srbchannel_write(pa_srbchannel *sr, const void *data, size_t l) { #ifdef DEBUG_SRBCHANNEL pa_log("Wrote %d bytes to srbchannel, signalling fdsem", (int) written); #endif + pa_fdsem_post(sr->sem_write); return written; } size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l) { size_t isread = 0; + while (l > 0) { int toread; void *ptr = pa_ringbuffer_peek(&sr->rb_read, &toread); + if ((size_t) toread > l) toread = l; + if (toread == 0) break; + memcpy(data, ptr, toread); + if (pa_ringbuffer_drop(&sr->rb_read, toread)) { #ifdef DEBUG_SRBCHANNEL pa_log("Read from full output buffer, signalling fdsem"); @@ -136,9 +155,11 @@ size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l) { data = (uint8_t*) data + toread; l -= toread; } + #ifdef DEBUG_SRBCHANNEL pa_log("Read %d bytes from srbchannel", (int) isread); #endif + return isread; } @@ -147,11 +168,14 @@ size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l) { struct srbheader { pa_atomic_t read_count; pa_atomic_t write_count; + pa_fdsem_data read_semdata; pa_fdsem_data write_semdata; + int capacity; int readbuf_offset; int writebuf_offset; + /* TODO: Maybe a marker here to make sure we talk to a server with equally sized struct */ }; @@ -163,13 +187,14 @@ static void srbchannel_rwloop(pa_srbchannel* sr) { pa_log("In rw loop from srbchannel, before callback, count = %d", q); #endif - if (sr->callback) + if (sr->callback) { if (!sr->callback(sr, sr->cb_userdata)) { #ifdef DEBUG_SRBCHANNEL pa_log("Aborting read loop from srbchannel"); #endif return; } + } #ifdef DEBUG_SRBCHANNEL pa_ringbuffer_peek(&sr->rb_read, &q); @@ -199,14 +224,19 @@ pa_srbchannel* pa_srbchannel_new(pa_mainloop_api *m, pa_mempool *p) { sr->rb_read.memory = (uint8_t*) srh + PA_ALIGN(sizeof(*srh)); srh->readbuf_offset = sr->rb_read.memory - (uint8_t*) srh; + capacity = (pa_memblock_get_length(sr->memblock) - srh->readbuf_offset) / 2; + sr->rb_write.memory = PA_ALIGN_PTR(sr->rb_read.memory + capacity); srh->writebuf_offset = sr->rb_write.memory - (uint8_t*) srh; + capacity = PA_MIN(capacity, srh->writebuf_offset - srh->readbuf_offset); + pa_log_debug("SHM block is %d bytes, ringbuffer capacity is 2 * %d bytes", (int) pa_memblock_get_length(sr->memblock), capacity); srh->capacity = sr->rb_read.capacity = sr->rb_write.capacity = capacity; + sr->rb_read.count = &srh->read_count; sr->rb_write.count = &srh->write_count; @@ -219,9 +249,11 @@ pa_srbchannel* pa_srbchannel_new(pa_mainloop_api *m, pa_mempool *p) { goto fail; readfd = pa_fdsem_get(sr->sem_read); + #ifdef DEBUG_SRBCHANNEL pa_log("Enabling io event on fd %d", readfd); #endif + sr->read_event = m->io_new(m, readfd, PA_IO_EVENT_INPUT, semread_cb, sr); m->io_enable(sr->read_event, PA_IO_EVENT_INPUT); @@ -235,6 +267,7 @@ fail: static void pa_srbchannel_swap(pa_srbchannel *sr) { pa_srbchannel temp = *sr; + sr->sem_read = temp.sem_write; sr->sem_write = temp.sem_read; sr->rb_read = temp.rb_write; @@ -255,6 +288,7 @@ pa_srbchannel* pa_srbchannel_new_from_template(pa_mainloop_api *m, pa_srbchannel sr->rb_read.capacity = sr->rb_write.capacity = srh->capacity; sr->rb_read.count = &srh->read_count; sr->rb_write.count = &srh->write_count; + sr->rb_read.memory = (uint8_t*) srh + srh->readbuf_offset; sr->rb_write.memory = (uint8_t*) srh + srh->writebuf_offset; @@ -272,6 +306,7 @@ pa_srbchannel* pa_srbchannel_new_from_template(pa_mainloop_api *m, pa_srbchannel #ifdef DEBUG_SRBCHANNEL pa_log("Enabling io event on fd %d", t->readfd); #endif + sr->read_event = m->io_new(m, t->readfd, PA_IO_EVENT_INPUT, semread_cb, sr); m->io_enable(sr->read_event, PA_IO_EVENT_INPUT); diff --git a/src/pulsecore/srbchannel.h b/src/pulsecore/srbchannel.h index 843bf96..e41cc52 100644 --- a/src/pulsecore/srbchannel.h +++ b/src/pulsecore/srbchannel.h @@ -27,7 +27,7 @@ #include <pulsecore/memblock.h> /* An shm ringbuffer that is used for low overhead server-client communication. - Signaling is done through eventfd semaphores (pa_fdsem). */ + * Signaling is done through eventfd semaphores (pa_fdsem). */ typedef struct pa_srbchannel pa_srbchannel; @@ -48,13 +48,13 @@ size_t pa_srbchannel_write(pa_srbchannel *sr, const void *data, size_t l); size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l); /* Set the callback function that is called whenever data becomes available for reading. - It can also be called if the output buffer was full and can now be written to. - - Return false to abort all processing (e g if the srbchannel has been freed during the callback). - Otherwise return true. - - Note that the callback will be called immediately, to be able to process stuff that - might already be in the buffer. + * It can also be called if the output buffer was full and can now be written to. + * + * Return false to abort all processing (e g if the srbchannel has been freed during the callback). + * Otherwise return true. + * + * Note that the callback will be called immediately, to be able to process stuff that + * might already be in the buffer. */ typedef bool (*pa_srbchannel_cb_t)(pa_srbchannel *sr, void *userdata); void pa_srbchannel_set_callback(pa_srbchannel *sr, pa_srbchannel_cb_t callback, void *userdata); _______________________________________________ pulseaudio-commits mailing list pulseaudio-commits@lists.freedesktop.org http://lists.freedesktop.org/mailman/listinfo/pulseaudio-commits