Revision: 15750
Author:   [email protected]
Date:     Thu Jul 18 06:42:04 2013
Log:      Fix data race in SamplingCircularQueue

This change fixes the data race described in the bug by adding an Acquire_Load to SamplingCircularQueue::StartDequeue and an Acquire_Store to SamplingCircularQueue::Enqueue.

Also, the queue implementation imposed a constraint on the records it stored: the first AtomicWord in each record was a marker. For that purpose TickSampleEventRecord had a filler field of type int. This approach is error prone, e.g. on x64 sizeof(AtomicWord) is 8 while sizeof(int) is 4. Moreover, the queue needs such a marker only at the beginning of each chunk. I changed the queue so that it stores the marker explicitly as the first Cell in each chunk and removed the filler field.

BUG=251218
[email protected], [email protected]

Review URL: https://codereview.chromium.org/19642002
http://code.google.com/p/v8/source/detail?r=15750

Modified:
 /branches/bleeding_edge/src/circular-queue-inl.h
 /branches/bleeding_edge/src/circular-queue.cc
 /branches/bleeding_edge/src/circular-queue.h
 /branches/bleeding_edge/src/cpu-profiler.h
 /branches/bleeding_edge/test/cctest/test-circular-queue.cc

=======================================
--- /branches/bleeding_edge/src/circular-queue-inl.h Mon Sep 12 03:24:18 2011 +++ /branches/bleeding_edge/src/circular-queue-inl.h Thu Jul 18 06:42:04 2013
@@ -35,7 +35,16 @@


 void* SamplingCircularQueue::Enqueue() {
-  WrapPositionIfNeeded(&producer_pos_->enqueue_pos);
+  if (producer_pos_->enqueue_pos == producer_pos_->next_chunk_pos) {
+    if (producer_pos_->enqueue_pos == buffer_ + buffer_size_) {
+      producer_pos_->next_chunk_pos = buffer_;
+      producer_pos_->enqueue_pos = buffer_;
+    }
+    Acquire_Store(producer_pos_->next_chunk_pos, kEnqueueStarted);
+    // Skip marker.
+    producer_pos_->enqueue_pos += 1;
+    producer_pos_->next_chunk_pos += chunk_size_;
+  }
   void* result = producer_pos_->enqueue_pos;
   producer_pos_->enqueue_pos += record_size_;
   return result;
@@ -44,7 +53,7 @@

 void SamplingCircularQueue::WrapPositionIfNeeded(
     SamplingCircularQueue::Cell** pos) {
-  if (**pos == kEnd) *pos = buffer_;
+  if (*pos == buffer_ + buffer_size_) *pos = buffer_;
 }


=======================================
--- /branches/bleeding_edge/src/circular-queue.cc       Thu Mar  7 01:12:48 2013
+++ /branches/bleeding_edge/src/circular-queue.cc       Thu Jul 18 06:42:04 2013
@@ -33,26 +33,22 @@
 namespace internal {


-SamplingCircularQueue::SamplingCircularQueue(int record_size_in_bytes,
- int desired_chunk_size_in_bytes,
+SamplingCircularQueue::SamplingCircularQueue(size_t record_size_in_bytes,
+ size_t desired_chunk_size_in_bytes,
                                              int buffer_size_in_chunks)
     : record_size_(record_size_in_bytes / sizeof(Cell)),
chunk_size_in_bytes_(desired_chunk_size_in_bytes / record_size_in_bytes *
-                        record_size_in_bytes),
+                        record_size_in_bytes + sizeof(Cell)),
       chunk_size_(chunk_size_in_bytes_ / sizeof(Cell)),
       buffer_size_(chunk_size_ * buffer_size_in_chunks),
-      // The distance ensures that producer and consumer never step on
-      // each other's chunks and helps eviction of produced data from
-      // the CPU cache (having that chunk size is bigger than the cache.)
-      producer_consumer_distance_(2 * chunk_size_),
-      buffer_(NewArray<Cell>(buffer_size_ + 1)) {
+      buffer_(NewArray<Cell>(buffer_size_)) {
+  ASSERT(record_size_ * sizeof(Cell) == record_size_in_bytes);
+  ASSERT(chunk_size_ * sizeof(Cell) == chunk_size_in_bytes_);
   ASSERT(buffer_size_in_chunks > 2);
-  // Clean up the whole buffer to avoid encountering a random kEnd
-  // while enqueuing.
-  for (int i = 0; i < buffer_size_; ++i) {
+  // Mark all chunks as clear.
+  for (int i = 0; i < buffer_size_; i += chunk_size_) {
     buffer_[i] = kClear;
   }
-  buffer_[buffer_size_] = kEnd;

   // Layout producer and consumer position pointers each on their own
   // cache lines to avoid cache lines thrashing due to simultaneous
@@ -67,6 +63,7 @@

   producer_pos_ = reinterpret_cast<ProducerPosition*>(
       RoundUp(positions_, kProcessorCacheLineSize));
+  producer_pos_->next_chunk_pos = buffer_;
   producer_pos_->enqueue_pos = buffer_;

   consumer_pos_ = reinterpret_cast<ConsumerPosition*>(
@@ -74,7 +71,11 @@
   ASSERT(reinterpret_cast<byte*>(consumer_pos_ + 1) <=
          positions_ + positions_size);
   consumer_pos_->dequeue_chunk_pos = buffer_;
- consumer_pos_->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance_;
+  // The distance ensures that producer and consumer never step on
+  // each other's chunks and helps eviction of produced data from
+  // the CPU cache (having that chunk size is bigger than the cache.)
+  const int producer_consumer_distance = (2 * chunk_size_);
+ consumer_pos_->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance;
   consumer_pos_->dequeue_pos = NULL;
 }

@@ -89,9 +90,11 @@
   if (consumer_pos_->dequeue_pos != NULL) {
     return consumer_pos_->dequeue_pos;
   } else {
-    if (*consumer_pos_->dequeue_chunk_poll_pos != kClear) {
-      consumer_pos_->dequeue_pos = consumer_pos_->dequeue_chunk_pos;
- consumer_pos_->dequeue_end_pos = consumer_pos_->dequeue_pos + chunk_size_;
+    if (Acquire_Load(consumer_pos_->dequeue_chunk_poll_pos) != kClear) {
+      // Skip marker.
+      consumer_pos_->dequeue_pos = consumer_pos_->dequeue_chunk_pos + 1;
+      consumer_pos_->dequeue_end_pos =
+          consumer_pos_->dequeue_chunk_pos + chunk_size_;
       return consumer_pos_->dequeue_pos;
     } else {
       return NULL;
=======================================
--- /branches/bleeding_edge/src/circular-queue.h        Thu Mar  7 01:12:48 2013
+++ /branches/bleeding_edge/src/circular-queue.h        Thu Jul 18 06:42:04 2013
@@ -45,8 +45,8 @@
 class SamplingCircularQueue {
  public:
   // Executed on the application thread.
-  SamplingCircularQueue(int record_size_in_bytes,
-                        int desired_chunk_size_in_bytes,
+  SamplingCircularQueue(size_t record_size_in_bytes,
+                        size_t desired_chunk_size_in_bytes,
                         int buffer_size_in_chunks);
   ~SamplingCircularQueue();

@@ -67,12 +67,16 @@
   void FlushResidualRecords();

   typedef AtomicWord Cell;
-  // Reserved values for the first cell of a record.
-  static const Cell kClear = 0;  // Marks clean (processed) chunks.
-  static const Cell kEnd = -1;   // Marks the end of the buffer.

  private:
+  // Reserved values for the chunk marker (first Cell in each chunk).
+  enum {
+    kClear,          // Marks clean (processed) chunks.
+    kEnqueueStarted  // Marks chunks where enqueue started.
+  };
+
   struct ProducerPosition {
+    Cell* next_chunk_pos;
     Cell* enqueue_pos;
   };
   struct ConsumerPosition {
@@ -85,10 +89,9 @@
   INLINE(void WrapPositionIfNeeded(Cell** pos));

   const int record_size_;
-  const int chunk_size_in_bytes_;
+  const size_t chunk_size_in_bytes_;
   const int chunk_size_;
   const int buffer_size_;
-  const int producer_consumer_distance_;
   Cell* buffer_;
   byte* positions_;
   ProducerPosition* producer_pos_;
=======================================
--- /branches/bleeding_edge/src/cpu-profiler.h  Sun Jul  7 04:42:30 2013
+++ /branches/bleeding_edge/src/cpu-profiler.h  Thu Jul 18 06:42:04 2013
@@ -110,18 +110,8 @@
   // The parameterless constructor is used when we dequeue data from
   // the ticks buffer.
   TickSampleEventRecord() { }
-  explicit TickSampleEventRecord(unsigned order)
-      : filler(1),
-        order(order) {
-    ASSERT(filler != SamplingCircularQueue::kClear);
-  }
+  explicit TickSampleEventRecord(unsigned order) : order(order) { }

-  // The first machine word of a TickSampleEventRecord must not ever
-  // become equal to SamplingCircularQueue::kClear.  As both order and
-  // TickSample's first field are not reliable in this sense (order
-  // can overflow, TickSample can have all fields reset), we are
-  // forced to use an artificial filler field.
-  int filler;
   unsigned order;
   TickSample sample;

=======================================
--- /branches/bleeding_edge/test/cctest/test-circular-queue.cc Thu Mar 7 03:12:26 2013 +++ /branches/bleeding_edge/test/cctest/test-circular-queue.cc Thu Jul 18 06:42:04 2013
@@ -42,8 +42,6 @@
                             3);

   // Check that we are using non-reserved values.
-  CHECK_NE(SamplingCircularQueue::kClear, 1);
-  CHECK_NE(SamplingCircularQueue::kEnd, 1);
   // Fill up the first chunk.
   CHECK_EQ(NULL, scq.StartDequeue());
   for (Record i = 1; i < 1 + kRecordsPerChunk; ++i) {
@@ -153,8 +151,6 @@
   scq.FlushResidualRecords();

   // Check that we are using non-reserved values.
-  CHECK_NE(SamplingCircularQueue::kClear, 1);
-  CHECK_NE(SamplingCircularQueue::kEnd, 1);
   ProducerThread producer1(&scq, kRecordsPerChunk, 1, semaphore);
   ProducerThread producer2(&scq, kRecordsPerChunk, 10, semaphore);
   ProducerThread producer3(&scq, kRecordsPerChunk, 20, semaphore);

--
--
v8-dev mailing list
[email protected]
http://groups.google.com/group/v8-dev
--- You received this message because you are subscribed to the Google Groups "v8-dev" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
For more options, visit https://groups.google.com/groups/opt_out.


Reply via email to