Thank you Dmitry!

On Sunday, January 12, 2014 2:41:05 AM UTC-8, Dmitry Vyukov wrote:
>
> On Fri, Jan 10, 2014 at 3:34 AM, Rajiv Kurian <[email protected]> wrote:
> >
> > On Thursday, January 9, 2014 6:39:43 AM UTC-8, Dmitry Vyukov wrote:
> >>
> >> On Mon, Dec 23, 2013 at 12:22 PM, Rajiv Kurian <[email protected]> wrote:
> >> > This seems like a common pattern in multi-threaded applications:
> >> >
> >> > 1) A single thread reads off the network, allocating buffers as
> >> >    necessary.
> >> > 2) It sends these buffers along with other info to worker threads
> >> >    that actually do real work. A worker thread processes the event
> >> >    entry and then frees the buffers from step (1). Typically a ring
> >> >    buffer or some other kind of queue is used with an entry that
> >> >    resembles something like this:
> >> >
> >> > struct event_entry {
> >> >     char* buffer;
> >> >     int length;
> >> >     // Other stuff.
> >> > };
> >> >
> >> > The problem with this pattern is that memory allocated on one thread
> >> > is freed on another. Besides the constant allocations and frees, this
> >> > also suffers from the allocation and free happening on different
> >> > threads, causing contention.
> >> >
> >> > I need to use a pointer to a buffer instead of an inlined array since
> >> > the size of the data for each entry is variable and unknown.
> >> > Currently this is what I am planning to do to handle this:
> >> >
> >> > 1) Producer maintains a pool of buffers of different sizes. I could
> >> >    also just use a memory allocator like jemalloc which does pooling
> >> >    behind the scenes.
> >> > 2) Producer picks an appropriately sized buffer from the pool for an
> >> >    incoming request. It's easy to pick a size especially if the
> >> >    protocol is length prefixed.
> >> > 3) Once there are enough bytes to form a complete entry (based on
> >> >    our protocol) the producer puts a pointer to this buffer on a
> >> >    ring buffer entry and publishes it.
> >> > 4) The consumer picks an entry off the ring buffer and synchronously
> >> >    processes it. If it needs the buffer entry beyond the point of
> >> >    initial processing it copies it (this is rare for me). It marks
> >> >    the ring buffer entry as processed.
> >> > 5) This ties to step (2). Whenever the producer doesn't have the
> >> >    right sized buffer in its pool it checks all the ring buffer
> >> >    entries already marked processed by the consumer in this cycle of
> >> >    the ring buffer. It does so by checking the sequence number of
> >> >    the consumer/worker. It claims all of these buffers as processed
> >> >    and puts them back in its pool. This logic needs to be run at
> >> >    least once per cycle of the ring buffer (it could be triggered
> >> >    early because there was a shortage of buffers), otherwise we will
> >> >    end up reusing buffers that are still being processed. If after a
> >> >    reclamation it still cannot find a right sized buffer it just
> >> >    allocates one and adds it to the pool (should be rare in steady
> >> >    state).
> >> >
> >> > Any comments or obvious faults with my logic? What do you guys use
> >> > to exchange buffers of dynamic sizes between two threads? I am
> >> > trying to avoid cross thread allocate and free. In fact I am trying
> >> > to avoid allocate and free in steady state altogether.
> >>
> >>
> >> Hi!
> >>
> >> Your scheme sounds reasonable.
> >>
> >>
> >> As far as I understand it's related to your other question about SPMC
> >> queue.
> >
> > It is somewhat related but still a common enough pattern in most
> > networked + threaded applications.
>
>
> In the general case you can just use a sufficiently good multi-threaded
> allocator, e.g. tcmalloc.
> However, if you want more performance, then what you described makes
> perfect sense. Since you have only 1 producer, you can use the
> classical lock-free stack as a transfer queue for empty buffers.
> The producer can grab the whole queue with a single XCHG(stack, NULL)
> operation; this avoids any issues with ABA and safe memory
> reclamation.
>
>
> >> So if you use a queue like this:
> >>
> >> http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
> >> and have an upper bound on buffer size and are ready to waste some
> >> memory, then you can embed the buffer directly into each element of
> >> the queue. This queue is actually bi-directional; currently consumers
> >> transfer "empty" elements back to the producer. But there is no reason
> >> they cannot transfer useful payload back to the producer. So in the
> >> extreme case you can have just:
> >>
> >> #define MAX_MSG_SIZE (64<<10)
> >
> > I think this is a great solution where the MAX_MSG_SIZE is reasonable
> > and there is not much variance between requests. I am accepting
> > requests to process images/videos. The variance is massive and the
> > maximum size is way too big (100s of MB). Another problem is that the
> > network buffers get filled in asynchronously. So if client-1 connects
> > and wants to send 1 MB of data, I can get a slot in a ring buffer and
> > start filling in the buffer. In the meantime another client could
> > connect and start writing data. Now client-2 could connect and actually
> > send its data faster than the first one, but we have to wait on the
> > first one to publish the second one. The first one could actually just
> > become unresponsive and could need to be killed by a timer or
> > something. In general a slow client that starts sending a request
> > before other faster clients could stall the pipeline. A pointer to the
> > buffer, while definitely slower than an inline one, avoids these
> > issues.
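
For reference, the transfer stack for empty buffers that Dmitry describes above (workers push finished buffers, the single producer detaches the whole list with one exchange) could be sketched roughly as follows. This is only a sketch under assumptions: `FreeBuffer`, `ReturnStack`, `push` and `take_all` are hypothetical names that do not appear in the thread, and the memory orderings are one reasonable choice, not the only one.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical intrusive header for a pooled buffer (illustrative only).
struct FreeBuffer {
    FreeBuffer* next = nullptr;   // link used while the buffer sits on the stack
    char* data = nullptr;         // the pooled memory itself
    std::size_t capacity = 0;     // size class of the buffer
};

// Many workers may push; only the single producer calls take_all().
class ReturnStack {
public:
    // Worker side: hand a finished buffer back. A plain CAS loop is enough
    // here because nobody ever pops a single node.
    void push(FreeBuffer* node) {
        assert(node != nullptr);
        FreeBuffer* head = head_.load(std::memory_order_relaxed);
        do {
            node->next = head;  // head is refreshed by a failed CAS
        } while (!head_.compare_exchange_weak(head, node,
                                              std::memory_order_release,
                                              std::memory_order_relaxed));
    }

    // Producer side: detach the entire list with one atomic exchange.
    // The returned list is private to the caller, so walking it needs no
    // further synchronization. Returns nullptr when nothing was returned.
    FreeBuffer* take_all() {
        return head_.exchange(nullptr, std::memory_order_acquire);
    }

private:
    std::atomic<FreeBuffer*> head_{nullptr};
};
```

The producer would call `take_all()` when its pool runs short and walk the returned list (most recently pushed buffer first), putting each buffer back into the matching size class. Since no single node is ever popped, the usual ABA hazard of a lock-free stack pop does not come up.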
> >>
> >>
> >> #define QUEUE_SIZE (1<<10)
> >>
> >> struct element {
> >>     char buf[MAX_MSG_SIZE];
> >>     // ... other data
> >> };
> >>
> >> queue<QUEUE_SIZE, element> q;
> >>
> >> This consumes 64M of memory persistently. But it is as simple and as
> >> fast as you can get.
> >>
> >> You will need to "split" the enqueue and dequeue functions into 2
> >> parts: the first waits until the next element becomes ready for
> >> production/consumption, and the second "commits" the
> >> production/consumption.
> >> Producer workflow:
> >> - wait till the next element becomes ready for production (it's most
> >>   likely already ready, because the queue is FIFO)
> >> - fill in the element (the buffer is already there, right in the
> >>   element)
> >> - commit production (make the element ready for consumption)
> >> - repeat
> >> Consumer workflow is symmetric:
> >> - wait till the next element becomes ready for consumption
> >> - process the element (don't need to copy the buffer, read it right
> >>   in place)
> >> - commit consumption (make the element ready for subsequent
> >>   production)
> >
> > Right, this is what I am doing, besides the extra logic to put buffers
> > back into the pool when I detect a need for buffers.
> >
> > --
> >
> > ---
> > You received this message because you are subscribed to the Google
> > Groups "Scalable Synchronization Algorithms" group.
> > To unsubscribe from this group and stop receiving emails from it, send
> > an email to [email protected].
> > To view this discussion on the web visit
> > https://groups.google.com/d/msgid/lock-free/70736031-1eb0-4c83-ad48-afde9006e973%40googlegroups.com.
> >
> > For more options, visit https://groups.google.com/groups/opt_out.
>
>
>
> --
> Dmitry Vyukov
>
> All about lockfree/waitfree algorithms, multicore, scalability,
> parallel computing and related topics:
> http://www.1024cores.net
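
The split "wait, fill in place, commit" workflow quoted above might look something like the sketch below. To keep it short it assumes exactly one producer thread and one consumer thread, so the positions are plain counters; with several consumers they would have to be claimed atomically, as in the bounded MPMC queue linked in the thread. `SplitQueue` and its four method names are hypothetical and not taken from that queue's code. The per-cell sequence numbers follow the same convention: a cell is free for position `p` when its sequence equals `p`, and filled when it equals `p + 1`.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Sketch of a bounded queue with split operations (single producer,
// single consumer assumed). T would be the element with the embedded buffer.
template <std::size_t N, typename T>
class SplitQueue {
    static_assert(N >= 2 && (N & (N - 1)) == 0, "N must be a power of two");

public:
    SplitQueue() {
        for (std::size_t i = 0; i < N; ++i)
            cells_[i].seq.store(i, std::memory_order_relaxed);
    }

    // Producer, part 1: slot to fill in place, or nullptr if the queue is full.
    T* begin_produce() {
        Cell& c = cells_[head_ & (N - 1)];
        if (c.seq.load(std::memory_order_acquire) != head_) return nullptr;
        return &c.value;
    }

    // Producer, part 2: make the filled slot visible to the consumer.
    void commit_produce() {
        Cell& c = cells_[head_ & (N - 1)];
        c.seq.store(head_ + 1, std::memory_order_release);
        ++head_;
    }

    // Consumer, part 1: slot to read in place, or nullptr if the queue is empty.
    T* begin_consume() {
        Cell& c = cells_[tail_ & (N - 1)];
        if (c.seq.load(std::memory_order_acquire) != tail_ + 1) return nullptr;
        return &c.value;
    }

    // Consumer, part 2: hand the slot back for the producer's next lap.
    void commit_consume() {
        Cell& c = cells_[tail_ & (N - 1)];
        c.seq.store(tail_ + N, std::memory_order_release);
        ++tail_;
    }

private:
    struct Cell {
        std::atomic<std::size_t> seq{0};
        T value;
    };

    Cell cells_[N];
    std::size_t head_ = 0;  // touched only by the producer
    std::size_t tail_ = 0;  // touched only by the consumer
};
```

With the `element` struct from the thread this would be instantiated as `SplitQueue<QUEUE_SIZE, element>`, which should live in static or heap storage given its 64M footprint. A real version would likely also pad `head_` and `tail_` onto separate cache lines; that is left out here for brevity.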
