Revision: 17244
Author:   [email protected]
Date:     Wed Oct 16 14:47:20 2013 UTC
Log:      Improve queuing for concurrent OSR.

Specifically, this includes:
- Encapsulating data structure for osr buffer into CyclicBuffer
- Use the new CyclicQueue instead of UnboundedQueue to queue new jobs.
  We can enqueue and dequeue a CyclicQueue on both ends in O(1).
  This allows us to add OSR jobs to the front for lower compile latency.
- Dispose osr buffer by one stale job per GC to avoid leak

[email protected]
BUG=

Review URL: https://codereview.chromium.org/25505002
http://code.google.com/p/v8/source/detail?r=17244

Modified:
 /branches/bleeding_edge/src/heap.cc
 /branches/bleeding_edge/src/optimizing-compiler-thread.cc
 /branches/bleeding_edge/src/optimizing-compiler-thread.h

=======================================
--- /branches/bleeding_edge/src/heap.cc Wed Oct 16 14:33:04 2013 UTC
+++ /branches/bleeding_edge/src/heap.cc Wed Oct 16 14:47:20 2013 UTC
@@ -450,6 +450,10 @@
 #endif  // DEBUG

   store_buffer()->GCPrologue();
+
+  if (FLAG_concurrent_osr) {
+    isolate()->optimizing_compiler_thread()->AgeBufferedOsrJobs();
+  }
 }


=======================================
--- /branches/bleeding_edge/src/optimizing-compiler-thread.cc Mon Oct 14 14:15:22 2013 UTC
+++ /branches/bleeding_edge/src/optimizing-compiler-thread.cc Wed Oct 16 14:47:20 2013 UTC
@@ -91,14 +91,22 @@
     }
   }
 }
+
+
+RecompileJob* OptimizingCompilerThread::NextInput() {
+  LockGuard<Mutex> access_input_queue_(&input_queue_mutex_);
+  if (input_queue_length_ == 0) return NULL;
+  RecompileJob* job = input_queue_[InputQueueIndex(0)];
+  ASSERT_NE(NULL, job);
+  input_queue_shift_ = InputQueueIndex(1);
+  input_queue_length_--;
+  return job;
+}


 void OptimizingCompilerThread::CompileNext() {
-  RecompileJob* job = NULL;
-  bool result = input_queue_.Dequeue(&job);
-  USE(result);
-  ASSERT(result);
-  Barrier_AtomicIncrement(&queue_length_, static_cast<Atomic32>(-1));
+  RecompileJob* job = NextInput();
+  ASSERT_NE(NULL, job);

   // The function may have already been optimized by OSR.  Simply continue.
   RecompileJob::Status status = job->OptimizeGraph();
@@ -131,7 +139,7 @@

void OptimizingCompilerThread::FlushInputQueue(bool restore_function_code) {
   RecompileJob* job;
-  while (input_queue_.Dequeue(&job)) {
+  while ((job = NextInput())) {
     // This should not block, since we have one signal on the input queue
     // semaphore corresponding to each element in the input queue.
     input_queue_semaphore_.Wait();
@@ -140,7 +148,6 @@
       DisposeRecompileJob(job, restore_function_code);
     }
   }
-  Release_Store(&queue_length_, static_cast<AtomicWord>(0));
 }


@@ -156,12 +163,12 @@


 void OptimizingCompilerThread::FlushOsrBuffer(bool restore_function_code) {
-  RecompileJob* job;
-  for (int i = 0; i < osr_buffer_size_; i++) {
-    job = osr_buffer_[i];
-    if (job != NULL) DisposeRecompileJob(job, restore_function_code);
+  for (int i = 0; i < osr_buffer_capacity_; i++) {
+    if (osr_buffer_[i] != NULL) {
+      DisposeRecompileJob(osr_buffer_[i], restore_function_code);
+      osr_buffer_[i] = NULL;
+    }
   }
-  osr_cursor_ = 0;
 }


@@ -187,10 +194,9 @@
   stop_semaphore_.Wait();

   if (FLAG_concurrent_recompilation_delay != 0) {
-    // Barrier when loading queue length is not necessary since the write
-    // happens in CompileNext on the same thread.
-    // This is used only for testing.
-    while (NoBarrier_Load(&queue_length_) > 0) CompileNext();
+    // At this point the optimizing compiler thread's event loop has stopped.
+    // There is no need for a mutex when reading input_queue_length_.
+    while (input_queue_length_ > 0) CompileNext();
     InstallOptimizedFunctions();
   } else {
     FlushInputQueue(false);
@@ -239,7 +245,6 @@
 void OptimizingCompilerThread::QueueForOptimization(RecompileJob* job) {
   ASSERT(IsQueueAvailable());
   ASSERT(!IsOptimizerThread());
-  Barrier_AtomicIncrement(&queue_length_, static_cast<Atomic32>(1));
   CompilationInfo* info = job->info();
   if (info->is_osr()) {
     if (FLAG_trace_concurrent_recompilation) {
@@ -247,13 +252,24 @@
       info->closure()->PrintName();
       PrintF(" for concurrent on-stack replacement.\n");
     }
-    AddToOsrBuffer(job);
     osr_attempts_++;
     BackEdgeTable::AddStackCheck(info);
+    AddToOsrBuffer(job);
+    // Add job to the front of the input queue.
+    LockGuard<Mutex> access_input_queue(&input_queue_mutex_);
+    ASSERT_LT(input_queue_length_, input_queue_capacity_);
+    // Move shift_ back by one.
+    input_queue_shift_ = InputQueueIndex(input_queue_capacity_ - 1);
+    input_queue_[InputQueueIndex(0)] = job;
+    input_queue_length_++;
   } else {
     info->closure()->MarkInRecompileQueue();
+    // Add job to the back of the input queue.
+    LockGuard<Mutex> access_input_queue(&input_queue_mutex_);
+    ASSERT_LT(input_queue_length_, input_queue_capacity_);
+    input_queue_[InputQueueIndex(input_queue_length_)] = job;
+    input_queue_length_++;
   }
-  input_queue_.Enqueue(job);
   if (FLAG_block_concurrent_recompilation) {
     blocked_jobs_++;
   } else {
@@ -274,15 +290,14 @@
 RecompileJob* OptimizingCompilerThread::FindReadyOSRCandidate(
     Handle<JSFunction> function, uint32_t osr_pc_offset) {
   ASSERT(!IsOptimizerThread());
-  RecompileJob* result = NULL;
-  for (int i = 0; i < osr_buffer_size_; i++) {
-    result = osr_buffer_[i];
-    if (result == NULL) continue;
-    if (result->IsWaitingForInstall() &&
-        result->info()->HasSameOsrEntry(function, osr_pc_offset)) {
+  for (int i = 0; i < osr_buffer_capacity_; i++) {
+    RecompileJob* current = osr_buffer_[i];
+    if (current != NULL &&
+        current->IsWaitingForInstall() &&
+        current->info()->HasSameOsrEntry(function, osr_pc_offset)) {
       osr_hits_++;
       osr_buffer_[i] = NULL;
-      return result;
+      return current;
     }
   }
   return NULL;
@@ -292,10 +307,11 @@
 bool OptimizingCompilerThread::IsQueuedForOSR(Handle<JSFunction> function,
                                               uint32_t osr_pc_offset) {
   ASSERT(!IsOptimizerThread());
-  for (int i = 0; i < osr_buffer_size_; i++) {
-    if (osr_buffer_[i] != NULL &&
-        osr_buffer_[i]->info()->HasSameOsrEntry(function, osr_pc_offset)) {
-      return !osr_buffer_[i]->IsWaitingForInstall();
+  for (int i = 0; i < osr_buffer_capacity_; i++) {
+    RecompileJob* current = osr_buffer_[i];
+    if (current != NULL &&
+        current->info()->HasSameOsrEntry(function, osr_pc_offset)) {
+      return !current->IsWaitingForInstall();
     }
   }
   return false;
@@ -304,10 +320,10 @@

 bool OptimizingCompilerThread::IsQueuedForOSR(JSFunction* function) {
   ASSERT(!IsOptimizerThread());
-  for (int i = 0; i < osr_buffer_size_; i++) {
-    if (osr_buffer_[i] != NULL &&
-        *osr_buffer_[i]->info()->closure() == function) {
-      return !osr_buffer_[i]->IsWaitingForInstall();
+  for (int i = 0; i < osr_buffer_capacity_; i++) {
+    RecompileJob* current = osr_buffer_[i];
+    if (current != NULL && *current->info()->closure() == function) {
+      return !current->IsWaitingForInstall();
     }
   }
   return false;
@@ -316,27 +332,27 @@

 void OptimizingCompilerThread::AddToOsrBuffer(RecompileJob* job) {
   ASSERT(!IsOptimizerThread());
-  // Store into next empty slot or replace next stale OSR job that's waiting
-  // in vain.  Dispose in the latter case.
-  RecompileJob* stale;
+  // Find the next slot that is empty or has a stale job.
   while (true) {
-    stale = osr_buffer_[osr_cursor_];
-    if (stale == NULL) break;
-    if (stale->IsWaitingForInstall()) {
-      CompilationInfo* info = stale->info();
-      if (FLAG_trace_osr) {
-        PrintF("[COSR - Discarded ");
-        info->closure()->PrintName();
-        PrintF(", AST id %d]\n", info->osr_ast_id().ToInt());
-      }
-      DisposeRecompileJob(stale, false);
-      break;
+    RecompileJob* stale = osr_buffer_[osr_buffer_cursor_];
+    if (stale == NULL || stale->IsWaitingForInstall()) break;
+    osr_buffer_cursor_ = (osr_buffer_cursor_ + 1) % osr_buffer_capacity_;
+  }
+
+  // Add to found slot and dispose the evicted job.
+  RecompileJob* evicted = osr_buffer_[osr_buffer_cursor_];
+  if (evicted != NULL) {
+    ASSERT(evicted->IsWaitingForInstall());
+    CompilationInfo* info = evicted->info();
+    if (FLAG_trace_osr) {
+      PrintF("[COSR - Discarded ");
+      info->closure()->PrintName();
+      PrintF(", AST id %d]\n", info->osr_ast_id().ToInt());
     }
-    AdvanceOsrCursor();
+    DisposeRecompileJob(evicted, false);
   }
-
-  osr_buffer_[osr_cursor_] = job;
-  AdvanceOsrCursor();
+  osr_buffer_[osr_buffer_cursor_] = job;
+  osr_buffer_cursor_ = (osr_buffer_cursor_ + 1) % osr_buffer_capacity_;
 }


=======================================
--- /branches/bleeding_edge/src/optimizing-compiler-thread.h Mon Oct 14 14:15:22 2013 UTC
+++ /branches/bleeding_edge/src/optimizing-compiler-thread.h Wed Oct 16 14:47:20 2013 UTC
@@ -53,21 +53,29 @@
       isolate_(isolate),
       stop_semaphore_(0),
       input_queue_semaphore_(0),
-      osr_cursor_(0),
+      input_queue_capacity_(FLAG_concurrent_recompilation_queue_length),
+      input_queue_length_(0),
+      input_queue_shift_(0),
+      osr_buffer_capacity_(FLAG_concurrent_recompilation_queue_length + 4),
+      osr_buffer_cursor_(0),
       osr_hits_(0),
       osr_attempts_(0),
       blocked_jobs_(0) {
     NoBarrier_Store(&stop_thread_, static_cast<AtomicWord>(CONTINUE));
-    NoBarrier_Store(&queue_length_, static_cast<AtomicWord>(0));
-    if (FLAG_concurrent_osr) {
-      osr_buffer_size_ = FLAG_concurrent_recompilation_queue_length + 4;
-      osr_buffer_ = NewArray<RecompileJob*>(osr_buffer_size_);
-      for (int i = 0; i < osr_buffer_size_; i++) osr_buffer_[i] = NULL;
-    }
+    input_queue_ = NewArray<RecompileJob*>(input_queue_capacity_);
+    osr_buffer_ = NewArray<RecompileJob*>(osr_buffer_capacity_);
+    // Mark OSR buffer slots as empty.
+    for (int i = 0; i < osr_buffer_capacity_; i++) osr_buffer_[i] = NULL;
   }

   ~OptimizingCompilerThread() {
-    if (FLAG_concurrent_osr) DeleteArray(osr_buffer_);
+    ASSERT_EQ(0, input_queue_length_);
+#ifdef DEBUG
+    for (int i = 0; i < osr_buffer_capacity_; i++) {
+      CHECK_EQ(NULL, osr_buffer_[i]);
+    }
+#endif
+    DeleteArray(osr_buffer_);
   }

   void Run();
@@ -83,17 +91,15 @@
   bool IsQueuedForOSR(JSFunction* function);

   inline bool IsQueueAvailable() {
-    // We don't need a barrier since we have a data dependency right
-    // after.
-    Atomic32 current_length = NoBarrier_Load(&queue_length_);
+    LockGuard<Mutex> access_input_queue(&input_queue_mutex_);
+    return input_queue_length_ < input_queue_capacity_;
+  }

-    // This can be queried only from the execution thread.
-    ASSERT(!IsOptimizerThread());
-    // Since only the execution thread increments queue_length_ and
-    // only one thread can run inside an Isolate at one time, a direct
-    // doesn't introduce a race -- queue_length_ may decreased in
-    // meantime, but not increased.
-    return (current_length < FLAG_concurrent_recompilation_queue_length);
+  inline void AgeBufferedOsrJobs() {
+    // Advance cursor of the cyclic buffer to next empty slot or stale OSR job.
+    // Dispose said OSR job in the latter case.  Calling this on every GC
+    // should make sure that we do not hold onto stale jobs indefinitely.
+    AddToOsrBuffer(NULL);
   }

 #ifdef DEBUG
@@ -107,12 +113,17 @@
   void FlushOutputQueue(bool restore_function_code);
   void FlushOsrBuffer(bool restore_function_code);
   void CompileNext();
+  RecompileJob* NextInput();

  // Add a recompilation task for OSR to the cyclic buffer, awaiting OSR entry.
   // Tasks evicted from the cyclic buffer are discarded.
   void AddToOsrBuffer(RecompileJob* compiler);
-  void AdvanceOsrCursor() {
-    osr_cursor_ = (osr_cursor_ + 1) % osr_buffer_size_;
+
+  inline int InputQueueIndex(int i) {
+    int result = (i + input_queue_shift_) % input_queue_capacity_;
+    ASSERT_LE(0, result);
+    ASSERT_LT(result, input_queue_capacity_);
+    return result;
   }

 #ifdef DEBUG
@@ -124,20 +135,22 @@
   Semaphore stop_semaphore_;
   Semaphore input_queue_semaphore_;

-  // Queue of incoming recompilation tasks (including OSR).
-  UnboundQueue<RecompileJob*> input_queue_;
+  // Circular queue of incoming recompilation tasks (including OSR).
+  RecompileJob** input_queue_;
+  int input_queue_capacity_;
+  int input_queue_length_;
+  int input_queue_shift_;
+  Mutex input_queue_mutex_;
+
   // Queue of recompilation tasks ready to be installed (excluding OSR).
   UnboundQueue<RecompileJob*> output_queue_;
+
   // Cyclic buffer of recompilation tasks for OSR.
-  // TODO(yangguo): This may keep zombie tasks indefinitely, holding on to
-  //                a lot of memory.  Fix this.
   RecompileJob** osr_buffer_;
-  // Cursor for the cyclic buffer.
-  int osr_cursor_;
-  int osr_buffer_size_;
+  int osr_buffer_capacity_;
+  int osr_buffer_cursor_;

   volatile AtomicWord stop_thread_;
-  volatile Atomic32 queue_length_;
   TimeDelta time_spent_compiling_;
   TimeDelta time_spent_total_;

--
--
v8-dev mailing list
[email protected]
http://groups.google.com/group/v8-dev
--- You received this message because you are subscribed to the Google Groups "v8-dev" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
For more options, visit https://groups.google.com/groups/opt_out.

Reply via email to