[PATCH] D42573: [clangd] The new threading implementation
This revision was automatically updated to reflect the committed changes. Closed by commit rL324356: [clangd] The new threading implementation (authored by ibiryukov, committed by ). Herald added a subscriber: llvm-commits.

Repository:
  rL LLVM

https://reviews.llvm.org/D42573

Files:
  clang-tools-extra/trunk/clangd/CMakeLists.txt
  clang-tools-extra/trunk/clangd/ClangdServer.h
  clang-tools-extra/trunk/clangd/ClangdUnit.h
  clang-tools-extra/trunk/clangd/ClangdUnitStore.cpp
  clang-tools-extra/trunk/clangd/ClangdUnitStore.h
  clang-tools-extra/trunk/clangd/TUScheduler.cpp
  clang-tools-extra/trunk/clangd/TUScheduler.h
  clang-tools-extra/trunk/clangd/Threading.cpp
  clang-tools-extra/trunk/clangd/Threading.h
  clang-tools-extra/trunk/unittests/clangd/CMakeLists.txt
  clang-tools-extra/trunk/unittests/clangd/ThreadingTests.cpp

Index: clang-tools-extra/trunk/unittests/clangd/CMakeLists.txt
===================================================================
--- clang-tools-extra/trunk/unittests/clangd/CMakeLists.txt
+++ clang-tools-extra/trunk/unittests/clangd/CMakeLists.txt
@@ -21,6 +21,7 @@
   JSONExprTests.cpp
   URITests.cpp
   TestFS.cpp
+  ThreadingTests.cpp
   TraceTests.cpp
   TUSchedulerTests.cpp
   SourceCodeTests.cpp
Index: clang-tools-extra/trunk/unittests/clangd/ThreadingTests.cpp
===================================================================
--- clang-tools-extra/trunk/unittests/clangd/ThreadingTests.cpp
+++ clang-tools-extra/trunk/unittests/clangd/ThreadingTests.cpp
@@ -0,0 +1,61 @@
+//===-- ThreadingTests.cpp --------------------------------------*- C++ -*-===//
+//
+// The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#include "Threading.h"
+#include "gtest/gtest.h"
+#include <mutex>
+
+namespace clang {
+namespace clangd {
+class ThreadingTest : public ::testing::Test {};
+
+TEST_F(ThreadingTest, TaskRunner) {
+  const int TasksCnt = 100;
+  const int IncrementsPerTask = 1000;
+
+  std::mutex Mutex;
+  int Counter(0); /* GUARDED_BY(Mutex) */
+  {
+    AsyncTaskRunner Tasks;
+    auto scheduleIncrements = [&]() {
+      for (int TaskI = 0; TaskI < TasksCnt; ++TaskI) {
+        Tasks.runAsync([&Counter, &Mutex]() {
+          for (int Increment = 0; Increment < IncrementsPerTask; ++Increment) {
+            std::lock_guard<std::mutex> Lock(Mutex);
+            ++Counter;
+          }
+        });
+      }
+    };
+
+    {
+      // Make sure runAsync is not running tasks synchronously on the same
+      // thread by locking the Mutex used for increments.
+      std::lock_guard<std::mutex> Lock(Mutex);
+      scheduleIncrements();
+    }
+
+    Tasks.waitForAll();
+    {
+      std::lock_guard<std::mutex> Lock(Mutex);
+      ASSERT_EQ(Counter, TasksCnt * IncrementsPerTask);
+    }
+
+    {
+      std::lock_guard<std::mutex> Lock(Mutex);
+      Counter = 0;
+      scheduleIncrements();
+    }
+  }
+  // Check that destructor has waited for tasks to finish.
+  std::lock_guard<std::mutex> Lock(Mutex);
+  ASSERT_EQ(Counter, TasksCnt * IncrementsPerTask);
+}
+} // namespace clangd
+} // namespace clang
Index: clang-tools-extra/trunk/clangd/TUScheduler.h
===================================================================
--- clang-tools-extra/trunk/clangd/TUScheduler.h
+++ clang-tools-extra/trunk/clangd/TUScheduler.h
@@ -11,9 +11,9 @@
 #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_TUSCHEDULER_H

 #include "ClangdUnit.h"
-#include "ClangdUnitStore.h"
 #include "Function.h"
 #include "Threading.h"
+#include "llvm/ADT/StringMap.h"

 namespace clang {
 namespace clangd {
@@ -42,6 +42,7 @@
 public:
   TUScheduler(unsigned AsyncThreadsCount, bool StorePreamblesInMemory,
               ASTParsedCallback ASTCallback);
+  ~TUScheduler();

   /// Returns estimated memory usage for each of the currently open files.
   /// The order of results is unspecified.
@@ -81,11 +82,17 @@
                 UniqueFunction<void(llvm::Expected<InputsAndPreamble>)> Action);

 private:
-  const ParseInputs &getInputs(PathRef File);
+  /// This class stores per-file data in the Files map.
+  struct FileData;

-  llvm::StringMap<ParseInputs> CachedInputs;
-  CppFileCollection Files;
-  ThreadPool Threads;
+  const bool StorePreamblesInMemory;
+  const std::shared_ptr<PCHContainerOperations> PCHOps;
+  const ASTParsedCallback ASTCallback;
+  Semaphore Barrier;
+  llvm::StringMap<std::unique_ptr<FileData>> Files;
+  // None when running tasks synchronously and non-None when running tasks
+  // asynchronously.
+  llvm::Optional<AsyncTaskRunner> Tasks;
 };
 } // namespace clangd
 } // namespace clang
Index: clang-tools-extra/trunk/clangd/ClangdServer.h
===================================================================
--- clang-tools-extra/trunk/clangd/ClangdServer.h
+++ clang-tools-extra/trunk/clangd/ClangdServer.h
@@ -11,7 +11,6 @@
 #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDSERVER_H

 #include "ClangdUnit.h"
-#include "ClangdUnitStore.h"
 #include "CodeComplete.h"
 #include "CompileArgsCache.h"
 #include
[PATCH] D42573: [clangd] The new threading implementation
ilya-biryukov updated this revision to Diff 133003. ilya-biryukov added a comment. - Addressed the last review comment Repository: rCTE Clang Tools Extra https://reviews.llvm.org/D42573 Files: clangd/CMakeLists.txt clangd/ClangdServer.h clangd/ClangdUnit.h clangd/ClangdUnitStore.cpp clangd/ClangdUnitStore.h clangd/TUScheduler.cpp clangd/TUScheduler.h clangd/Threading.cpp clangd/Threading.h unittests/clangd/CMakeLists.txt unittests/clangd/ThreadingTests.cpp Index: unittests/clangd/ThreadingTests.cpp === --- /dev/null +++ unittests/clangd/ThreadingTests.cpp @@ -0,0 +1,61 @@ +//===-- ThreadingTests.cpp --*- C++ -*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===--===// + +#include "Threading.h" +#include "gtest/gtest.h" +#include + +namespace clang { +namespace clangd { +class ThreadingTest : public ::testing::Test {}; + +TEST_F(ThreadingTest, TaskRunner) { + const int TasksCnt = 100; + const int IncrementsPerTask = 1000; + + std::mutex Mutex; + int Counter(0); /* GUARDED_BY(Mutex) */ + { +AsyncTaskRunner Tasks; +auto scheduleIncrements = [&]() { + for (int TaskI = 0; TaskI < TasksCnt; ++TaskI) { +Tasks.runAsync([&Counter, &Mutex]() { + for (int Increment = 0; Increment < IncrementsPerTask; ++Increment) { +std::lock_guard Lock(Mutex); +++Counter; + } +}); + } +}; + +{ + // Make sure runAsync is not running tasks synchronously on the same + // thread by locking the Mutex used for increments. + std::lock_guard Lock(Mutex); + scheduleIncrements(); +} + +Tasks.waitForAll(); +{ + std::lock_guard Lock(Mutex); + ASSERT_EQ(Counter, TasksCnt * IncrementsPerTask); +} + +{ + std::lock_guard Lock(Mutex); + Counter = 0; + scheduleIncrements(); +} + } + // Check that destructor has waited for tasks to finish. 
+ std::lock_guard Lock(Mutex); + ASSERT_EQ(Counter, TasksCnt * IncrementsPerTask); +} +} // namespace clangd +} // namespace clang Index: unittests/clangd/CMakeLists.txt === --- unittests/clangd/CMakeLists.txt +++ unittests/clangd/CMakeLists.txt @@ -21,6 +21,7 @@ JSONExprTests.cpp URITests.cpp TestFS.cpp + ThreadingTests.cpp TraceTests.cpp TUSchedulerTests.cpp SourceCodeTests.cpp Index: clangd/Threading.h === --- clangd/Threading.h +++ clangd/Threading.h @@ -12,74 +12,65 @@ #include "Context.h" #include "Function.h" +#include +#include #include -#include +#include #include -#include #include namespace clang { namespace clangd { -/// A simple fixed-size thread pool implementation. -class ThreadPool { + +/// A shared boolean flag indicating if the computation was cancelled. +/// Once cancelled, cannot be returned to the previous state. +class CancellationFlag { public: - /// If \p AsyncThreadsCount is 0, requests added using addToFront and addToEnd - /// will be processed synchronously on the calling thread. - // Otherwise, \p AsyncThreadsCount threads will be created to schedule the - // requests. - ThreadPool(unsigned AsyncThreadsCount); - /// Destructor blocks until all requests are processed and worker threads are - /// terminated. - ~ThreadPool(); + CancellationFlag(); - /// Add a new request to run function \p F with args \p As to the start of the - /// queue. The request will be run on a separate thread. - template - void addToFront(Func &&F, Args &&... 
As) { -if (RunSynchronously) { - std::forward(F)(std::forward(As)...); - return; -} + void cancel() { +assert(WasCancelled && "the object was moved"); +WasCancelled->store(true); + } -{ - std::lock_guard Lock(Mutex); - RequestQueue.emplace_front( - BindWithForward(std::forward(F), std::forward(As)...), - Context::current().clone()); -} -RequestCV.notify_one(); + bool isCancelled() const { +assert(WasCancelled && "the object was moved"); +return WasCancelled->load(); } - /// Add a new request to run function \p F with args \p As to the end of the - /// queue. The request will be run on a separate thread. - template void addToEnd(Func &&F, Args &&... As) { -if (RunSynchronously) { - std::forward(F)(std::forward(As)...); - return; -} +private: + std::shared_ptr> WasCancelled; +}; -{ - std::lock_guard Lock(Mutex); - RequestQueue.emplace_back( - BindWithForward(std::forward(F), std::forward(As)...), - Context::current().clone()); -} -RequestCV
[PATCH] D42573: [clangd] The new threading implementation
sammccall accepted this revision. sammccall added a comment. This revision is now accepted and ready to land. This is really great. Just one test nit. Comment at: unittests/clangd/ThreadingTests.cpp:34 + +scheduleIncrements(); +Tasks.waitForAll(); The current test passes if `runAsync` is synchronous. I'd suggest scheduling while holding a mutex that prevents the tasks from getting to the point of incrementing the counter, (and maybe checking that the counter is 0 after scheduling for clarity). Repository: rCTE Clang Tools Extra https://reviews.llvm.org/D42573 ___ cfe-commits mailing list cfe-commits@lists.llvm.org http://lists.llvm.org/cgi-bin/mailman/listinfo/cfe-commits
[PATCH] D42573: [clangd] The new threading implementation
ilya-biryukov updated this revision to Diff 132967. ilya-biryukov marked 21 inline comments as done. ilya-biryukov added a comment. - Renamed File to AST. - Introduced startTask(). - Moved small methods of ASTWorkerHandle to have inline definitions. - Removed constructor of FileData. - Replaced find() with []. - Removed RunSync, store optional instead. - Added a test for AsyncTasksRunner. - Documented approach for cancelling updates. - Addressed other review comments. Repository: rCTE Clang Tools Extra https://reviews.llvm.org/D42573 Files: clangd/CMakeLists.txt clangd/ClangdServer.h clangd/ClangdUnit.h clangd/ClangdUnitStore.cpp clangd/ClangdUnitStore.h clangd/TUScheduler.cpp clangd/TUScheduler.h clangd/Threading.cpp clangd/Threading.h unittests/clangd/CMakeLists.txt unittests/clangd/ThreadingTests.cpp Index: unittests/clangd/ThreadingTests.cpp === --- /dev/null +++ unittests/clangd/ThreadingTests.cpp @@ -0,0 +1,45 @@ +//===-- ThreadingTests.cpp --*- C++ -*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===--===// + +#include "Threading.h" +#include "gtest/gtest.h" +#include + +namespace clang { +namespace clangd { +class ThreadingTest : public ::testing::Test {}; + +TEST_F(ThreadingTest, TaskRunner) { + const int TasksCnt = 100; + const int IncrementsPerTask = 1000; + + std::atomic Counter(0); + { +AsyncTaskRunner Tasks; +auto scheduleIncrements = [&]() { + for (int TaskI = 0; TaskI < TasksCnt; ++TaskI) { +Tasks.runAsync([&Counter]() { + for (int Increment = 0; Increment < IncrementsPerTask; ++Increment) +++Counter; +}); + } +}; + +scheduleIncrements(); +Tasks.waitForAll(); +ASSERT_EQ(Counter.load(), TasksCnt * IncrementsPerTask); + +Counter = 0; +scheduleIncrements(); + } + // Check that destructor has waited for tasks to finish. 
+ ASSERT_EQ(Counter.load(), TasksCnt * IncrementsPerTask); +} +} // namespace clangd +} // namespace clang Index: unittests/clangd/CMakeLists.txt === --- unittests/clangd/CMakeLists.txt +++ unittests/clangd/CMakeLists.txt @@ -21,6 +21,7 @@ JSONExprTests.cpp URITests.cpp TestFS.cpp + ThreadingTests.cpp TraceTests.cpp TUSchedulerTests.cpp SourceCodeTests.cpp Index: clangd/Threading.h === --- clangd/Threading.h +++ clangd/Threading.h @@ -12,74 +12,65 @@ #include "Context.h" #include "Function.h" +#include +#include #include -#include +#include #include -#include #include namespace clang { namespace clangd { -/// A simple fixed-size thread pool implementation. -class ThreadPool { + +/// A shared boolean flag indicating if the computation was cancelled. +/// Once cancelled, cannot be returned to the previous state. +class CancellationFlag { public: - /// If \p AsyncThreadsCount is 0, requests added using addToFront and addToEnd - /// will be processed synchronously on the calling thread. - // Otherwise, \p AsyncThreadsCount threads will be created to schedule the - // requests. - ThreadPool(unsigned AsyncThreadsCount); - /// Destructor blocks until all requests are processed and worker threads are - /// terminated. - ~ThreadPool(); + CancellationFlag(); - /// Add a new request to run function \p F with args \p As to the start of the - /// queue. The request will be run on a separate thread. - template - void addToFront(Func &&F, Args &&... 
As) { -if (RunSynchronously) { - std::forward(F)(std::forward(As)...); - return; -} + void cancel() { +assert(WasCancelled && "the object was moved"); +WasCancelled->store(true); + } -{ - std::lock_guard Lock(Mutex); - RequestQueue.emplace_front( - BindWithForward(std::forward(F), std::forward(As)...), - Context::current().clone()); -} -RequestCV.notify_one(); + bool isCancelled() const { +assert(WasCancelled && "the object was moved"); +return WasCancelled->load(); } - /// Add a new request to run function \p F with args \p As to the end of the - /// queue. The request will be run on a separate thread. - template void addToEnd(Func &&F, Args &&... As) { -if (RunSynchronously) { - std::forward(F)(std::forward(As)...); - return; -} +private: + std::shared_ptr> WasCancelled; +}; -{ - std::lock_guard Lock(Mutex); - RequestQueue.emplace_back( - BindWithForward(std::forward(F), std::forward(As)...), - Context::current().clone()); -} -RequestCV.notify_one(); - } +/// Limits the number of threads that ca
[PATCH] D42573: [clangd] The new threading implementation
ilya-biryukov added inline comments. Comment at: clangd/ASTWorker.cpp:102 + // not waste time on it. + LastUpdateCF->cancel(); +} sammccall wrote: > ilya-biryukov wrote: > > sammccall wrote: > > > This strategy has some upsides: > > > - we eventually get diagnostics for the latest version > > > - we don't begin building an AST that we know can never be read > > > - we don't "wait" when there's something useful to do > > > - it's simple to implement (apart from cancellation) > > > And downsides: > > > - diagnostics are built on the first keystroke > > > - indexing blocks interactive actions, because we don't (and can't > > > reasonably) respect cancellation before calling OnUpdated > > > - it requires CancellationFlag even though our actions aren't cancelable > > > > > > What's the goal here - is this a strategy to keep? Or a placeholder? Or > > > trying to maximize compatibility with the previous code? > > Trying to maximize the compatibility with existing code. There are > > certainly other strategies to schedule updates. > > I'm perfectly happy to explore those, but would try to keep it out of this > > patch, it's touching quite a lot of things already. And we should probably > > remove the callback from update in order to make implementing the timeouts > > simpler > This sounds fine - can you add a little bit of this rationale and things we > might want to change? Maybe at the end of the header comment? > > As it stands, it would be very easy for us to land this, move onto other > things for a month, and lose the reasoning why the strategy is this way. Added a section about cancellation to the comments in the `.cpp` file. Comment at: clangd/TUScheduler.cpp:13 +// ASTWorker that manages the AST is owned both by the processing thread and the +// TUScheduler. Therefore the destructor of ASTWorker to not block, as it is +// always run after the processing loop is finished. When the TUScheduler gives sammccall wrote: > I can't really follow these two sentences. 
I think we may want to give a > little motivation (it's not clear why the destructor needs to not block) > before introducing ASTWorkerHandle. e.g. > > // The TUScheduler should discard an ASTWorker when remove() is called, but > its thread may > // be busy and we don't want to block. So the workers are accessed via an > ASTWorkerHandle. > // Destroying the handle signals the worker to exit its run loop, gives up > shared ownership of the worker, and detaches the thread. Added your comment. Comment at: clangd/TUScheduler.cpp:82 + const std::shared_ptr File; + // Inputs, corresponding to the current state of File. + ParseInputs FileInputs; sammccall wrote: > ah, this is my confusion - i mixed up "current state of File" with "current > state of the file", which are almost opposites! > Do you think it's confusing to call the `File` member `AST`? All the > callsites actually seem to make sense with that name. SG, renamed. Comment at: clangd/TUScheduler.cpp:186 +auto AST = File->getAST().get(); +AST->runUnderLock([&](ParsedAST *AST) { + if (!AST) { sammccall wrote: > why under lock? I thought this is the only thread that can access? > > (if this locking is a no-op pending cleanup, fixme) That's the only way to access the AST with the current `CppFile` interface, I'll do a cleanup with the follow-up patch. Added a comment Comment at: clangd/TUScheduler.cpp:217 +std::size_t ASTWorker::getUsedBytes() const { + std::lock_guard Lock(Mutex); + return File->getUsedBytes(); sammccall wrote: > I don't think this works - other accesses of File aren't guarded by the mutex? > (And File is documented as only being used by the worker thread) > > One option is to cache this in a member after each rebuild(), and put that > member behind the mutex. It won't catch ASTs that grow over time, though :-( > A fundamental question is what this function should do if called while the > AST is being rebuilt on the worker thread. It work because `CppFile` is thread-safe at the moment. 
We don't even need the lock. I'll do the associated cleanup with the follow-up patch. > One option is to cache this in a member after each rebuild(), and put that > member behind the mutex. It won't catch ASTs that grow over time, though :-( That was the plan for the cleanup of `CppFile`. We could update this member after each read, too. This would give us a good estimate after AST grows. >A fundamental question is what this function should do if called while the AST >is being rebuilt on the worker thread. We only report estimated memory usage and it's only accurate when there are no active requests. So the answer is: we report memory used by the previous AST while doing the rebuild and memory used by the new AST after rebuild is finished. This seems goo
[PATCH] D42573: [clangd] The new threading implementation
sammccall added a comment. Very nice! Thanks for adding the docs to TUScheduler implementation, makes a big difference. Rest is almost all readability/comment bits. Substantive stuff is: - getUsedBytes() looks racy - I'm not sure we're choosing the right preamble My understanding is the threading-related bits of CppFile (locking, and deferRebuild etc) are obsolete after this patch. It's fine not to delete them here (I guess there could be some fallout), but maybe you want to add a comment in this patch marking them as obsolete? Testing: I think this is mostly covered by the existing TUScheduler tests. I'd suggest adding a unit test to AsyncTaskRunner though: e.g. start a bunch of threads while holding a mutex that prevents them from making progress (to verify actually async), then have them increment a counter and verify that the counter is N after waitForAll() returns. Comment at: clangd/ASTWorker.cpp:102 + // not waste time on it. + LastUpdateCF->cancel(); +} ilya-biryukov wrote: > sammccall wrote: > > This strategy has some upsides: > > - we eventually get diagnostics for the latest version > > - we don't begin building an AST that we know can never be read > > - we don't "wait" when there's something useful to do > > - it's simple to implement (apart from cancellation) > > And downsides: > > - diagnostics are built on the first keystroke > > - indexing blocks interactive actions, because we don't (and can't > > reasonably) respect cancellation before calling OnUpdated > > - it requires CancellationFlag even though our actions aren't cancelable > > > > What's the goal here - is this a strategy to keep? Or a placeholder? Or > > trying to maximize compatibility with the previous code? > Trying to maximize the compatibility with existing code. There are certainly > other strategies to schedule updates. > I'm perfectly happy to explore those, but would try to keep it out of this > patch, it's touching quite a lot of things already. 
And we should probably > remove the callback from update in order to make implementing the timeouts > simpler This sounds fine - can you add a little bit of this rationale and things we might want to change? Maybe at the end of the header comment? As it stands, it would be very easy for us to land this, move onto other things for a month, and lose the reasoning why the strategy is this way. Comment at: clangd/TUScheduler.cpp:38 + // Wait for all in-flight tasks to finish. + Tasks.waitForAll(); +} ilya-biryukov wrote: > sammccall wrote: > > destructor will do this > It's much safer to call it explicitly to not depend no the order of fields in > the class. Hmm, not sure it's much safer than putting a comment on a member, but up to you. If we're not using the destructor-blocking behavior, remove the destructor or replace with an assert? Comment at: clangd/TUScheduler.cpp:9 +//===--===// +// For each file, managed by TUScheduler, we store a single ASTWorker that +// manages an AST for that file. All operations that modify or read the AST are nit: store -> create? storage is a bit more complicated, but doesn't need to clutter the opening sentence. Comment at: clangd/TUScheduler.cpp:11 +// manages an AST for that file. All operations that modify or read the AST are +// run on a separate dedicated thread asynchronously in FIFO order. The +// ASTWorker that manages the AST is owned both by the processing thread and the nit: blank line after "...FIFO order"? separating "this is the hierarchy" from "this is how we manage lifetimes". In fact, I'd consider moving the whole lifetime explanation down and merging it with the ASTWorker class comment. It's more detail than strategy. Comment at: clangd/TUScheduler.cpp:12 +// run on a separate dedicated thread asynchronously in FIFO order. The +// ASTWorker that manages the AST is owned both by the processing thread and the +// TUScheduler. Therefore the destructor of ASTWorker to not block, as it is is shared by? 
Comment at: clangd/TUScheduler.cpp:13 +// ASTWorker that manages the AST is owned both by the processing thread and the +// TUScheduler. Therefore the destructor of ASTWorker to not block, as it is +// always run after the processing loop is finished. When the TUScheduler gives I can't really follow these two sentences. I think we may want to give a little motivation (it's not clear why the destructor needs to not block) before introducing ASTWorkerHandle. e.g. // The TUScheduler should discard an ASTWorker when remove() is called, but its thread may // be busy and we don't want to block. So the workers are accessed via an ASTWorkerHandle. // Destroying the handle signals the worker to exit its run
[PATCH] D42573: [clangd] The new threading implementation
ilya-biryukov updated this revision to Diff 132821. ilya-biryukov marked 13 inline comments as done. ilya-biryukov added a comment. - Removed ASTWorker files, moved all the code to TUScheduler.cpp - Renamed setDone to stop - Added a comment to TUScheduler.cpp - Addressed other review comments Repository: rCTE Clang Tools Extra https://reviews.llvm.org/D42573 Files: clangd/CMakeLists.txt clangd/ClangdServer.h clangd/ClangdUnitStore.cpp clangd/ClangdUnitStore.h clangd/TUScheduler.cpp clangd/TUScheduler.h clangd/Threading.cpp clangd/Threading.h Index: clangd/Threading.h === --- clangd/Threading.h +++ clangd/Threading.h @@ -12,74 +12,67 @@ #include "Context.h" #include "Function.h" +#include +#include #include -#include +#include #include -#include #include namespace clang { namespace clangd { -/// A simple fixed-size thread pool implementation. -class ThreadPool { + +/// A shared boolean flag indicating if the computation was cancelled. +/// Once cancelled, cannot be returned to the previous state. +/// FIXME: We should split this class it into consumers and producers of the +/// cancellation flags. +class CancellationFlag { public: - /// If \p AsyncThreadsCount is 0, requests added using addToFront and addToEnd - /// will be processed synchronously on the calling thread. - // Otherwise, \p AsyncThreadsCount threads will be created to schedule the - // requests. - ThreadPool(unsigned AsyncThreadsCount); - /// Destructor blocks until all requests are processed and worker threads are - /// terminated. - ~ThreadPool(); + CancellationFlag(); - /// Add a new request to run function \p F with args \p As to the start of the - /// queue. The request will be run on a separate thread. - template - void addToFront(Func &&F, Args &&... 
As) { -if (RunSynchronously) { - std::forward(F)(std::forward(As)...); - return; -} + void cancel() { +assert(WasCancelled && "the object was moved"); +WasCancelled->store(true); + } -{ - std::lock_guard Lock(Mutex); - RequestQueue.emplace_front( - BindWithForward(std::forward(F), std::forward(As)...), - Context::current().clone()); -} -RequestCV.notify_one(); + bool isCancelled() const { +assert(WasCancelled && "the object was moved"); +return WasCancelled->load(); } - /// Add a new request to run function \p F with args \p As to the end of the - /// queue. The request will be run on a separate thread. - template void addToEnd(Func &&F, Args &&... As) { -if (RunSynchronously) { - std::forward(F)(std::forward(As)...); - return; -} +private: + std::shared_ptr> WasCancelled; +}; -{ - std::lock_guard Lock(Mutex); - RequestQueue.emplace_back( - BindWithForward(std::forward(F), std::forward(As)...), - Context::current().clone()); -} -RequestCV.notify_one(); - } +/// Limits the number of threads that can acquire the lock at the same time. +class Semaphore { +public: + Semaphore(std::size_t MaxLocks); + + void lock(); + void unlock(); + +private: + std::mutex Mutex; + std::condition_variable SlotsChanged; + std::size_t FreeSlots; +}; + +/// Runs tasks on separate (detached) threads and wait for all tasks to finish. +/// Objects that need to spawn threads can own an AsyncTaskRunner to ensure they +/// all complete on destruction. +class AsyncTaskRunner { +public: + /// Destructor waits for all pending tasks to finish. + ~AsyncTaskRunner(); + + void waitForAll(); + void runAsync(UniqueFunction Action); private: - bool RunSynchronously; - mutable std::mutex Mutex; - /// We run some tasks on separate threads(parsing, CppFile cleanup). - /// These threads looks into RequestQueue to find requests to handle and - /// terminate when Done is set to true. - std::vector Workers; - /// Setting Done to true will make the worker threads terminate. 
- bool Done = false; - /// A queue of requests. - std::deque, Context>> RequestQueue; - /// Condition variable to wake up worker threads. - std::condition_variable RequestCV; + std::mutex Mutex; + std::condition_variable TasksReachedZero; + std::size_t InFlightTasks = 0; }; } // namespace clangd } // namespace clang Index: clangd/Threading.cpp === --- clangd/Threading.cpp +++ clangd/Threading.cpp @@ -1,63 +1,62 @@ #include "Threading.h" +#include "llvm/ADT/ScopeExit.h" #include "llvm/Support/FormatVariadic.h" #include "llvm/Support/Threading.h" +#include namespace clang { namespace clangd { -ThreadPool::ThreadPool(unsigned AsyncThreadsCount) -: RunSynchronously(AsyncThreadsCount == 0) { - if (RunSynchronously) { -// Don't start the worker thread if we're running synchronously -return; - } - Workers.reserve(AsyncThreadsCount); - for (unsigned I = 0; I < AsyncThreadsCount; ++I) { -
[PATCH] D42573: [clangd] The new threading implementation
ilya-biryukov marked an inline comment as done. ilya-biryukov added inline comments. Comment at: clangd/ASTWorker.cpp:1 +//===--- ASTWorker.cpp *-C++-*-===// +// sammccall wrote: > This file could really use some high-level comments about the scheduling > strategy, throughout I've added the docs to TUScheduler.cpp Comment at: clangd/ASTWorker.cpp:102 + // not waste time on it. + LastUpdateCF->cancel(); +} sammccall wrote: > This strategy has some upsides: > - we eventually get diagnostics for the latest version > - we don't begin building an AST that we know can never be read > - we don't "wait" when there's something useful to do > - it's simple to implement (apart from cancellation) > And downsides: > - diagnostics are built on the first keystroke > - indexing blocks interactive actions, because we don't (and can't > reasonably) respect cancellation before calling OnUpdated > - it requires CancellationFlag even though our actions aren't cancelable > > What's the goal here - is this a strategy to keep? Or a placeholder? Or > trying to maximize compatibility with the previous code? Trying to maximize the compatibility with existing code. There are certainly other strategies to schedule updates. I'm perfectly happy to explore those, but would try to keep it out of this patch, it's touching quite a lot of things already. And we should probably remove the callback from update in order to make implementing the timeouts simpler Comment at: clangd/ASTWorker.h:45 + + std::shared_ptr getPossiblyStalePreamble() const; + std::size_t getUsedBytes() const; sammccall wrote: > can we reorder/group these by purpose/sequence? > e.g (no need for the comments, just illustrative). 
> > //lifecycle
> > run();
> > setDone();
> >
> > //write
> > update()
> >
> > //read
> > runWithAST()
> > getPossiblyStalePreamble()
> >
> > //misc
> > getUsedBytes()
I used a different grouping:

```
//lifecycle
run();
setDone();
// async write
update()
// async read
runWithAST()
// sync reads
getPossiblyStalePreamble()
getUsedBytes()
```

Does that make sense? Or do you feel read/write vs sync/async is a better distinction?

Comment at: clangd/ASTWorker.h:54
+  using RequestWithCtx = std::pair<UniqueFunction<void()>, Context>;
sammccall wrote:
> I think this might actually be easier to read with just `using Request =
> UniqueFunction<void()>` and then spelling out `pair`. up to
> you though.
I'd rather keep it as is. I specifically came up with this typedef because I got annoyed by typing `pair`. Happy to change it if you feel that's totally unreadable, of course.

Comment at: clangd/ASTWorker.h:59
+  // File and FileInputs are only accessed on the processing thread from run().
+  // FIXME(ibiryukov): group CppFile and FileInputs into a separate class.
+  const std::shared_ptr<CppFile> File;
sammccall wrote:
> nit: "combine ... into one class"?
>
> I'd hope we won't end up with ASTWorker, CppFile, FileInputs, *and* another
> thing?
Removed the FIXME

Comment at: clangd/TUScheduler.cpp:38
+  // Wait for all in-flight tasks to finish.
+  Tasks.waitForAll();
+}
sammccall wrote:
> destructor will do this
It's much safer to call it explicitly to not depend on the order of fields in the class.

Comment at: clangd/TUScheduler.h:77
+  /// In destructor, FileData signals to ASTWorker for file that it can exit.
+  struct FileData {
+    FileData(ParseInputs Inputs, std::shared_ptr<ASTWorker> Worker);
sammccall wrote:
> This would be superseded by ASTWorkerHandle, right?
It's still there, stores inputs and `ASTWorkerHandle`. Inputs in the `ASTWorker` correspond to the latest **processed** update, we don't expose them in the API. These inputs correspond to the latest inputs of the file (they are used by `runWithPreamble` to provide proper inputs).
Comment at: clangd/Threading.h:67
+
+  void waitForAll();
+  void runAsync(UniqueFunction<void()> Action);
sammccall wrote:
> any need to expose this separately from the destructor?
It's a useful helper to have. We can use it to rewrite tests when removing `future` returned from `addDocument`/`removeDocument`. Also see my other comment about calling it in destructor of `TUScheduler`.

Repository:
  rCTE Clang Tools Extra

https://reviews.llvm.org/D42573
[PATCH] D42573: [clangd] The new threading implementation
ilya-biryukov updated this revision to Diff 132803.
ilya-biryukov added a comment.

- Changed interface of ASTWorker so that it runs the processing loop itself.

Repository: rCTE Clang Tools Extra

https://reviews.llvm.org/D42573

Files:
  clangd/ASTWorker.cpp
  clangd/ASTWorker.h
  clangd/CMakeLists.txt
  clangd/ClangdServer.h
  clangd/ClangdUnitStore.cpp
  clangd/ClangdUnitStore.h
  clangd/TUScheduler.cpp
  clangd/TUScheduler.h
  clangd/Threading.cpp
  clangd/Threading.h

Index: clangd/Threading.h
===================================================================
--- clangd/Threading.h
+++ clangd/Threading.h
@@ -12,74 +12,65 @@
 #include "Context.h"
 #include "Function.h"
+#include
+#include
 #include
-#include
+#include
 #include
-#include
 #include
 
 namespace clang {
 namespace clangd {
-/// A simple fixed-size thread pool implementation.
-class ThreadPool {
+
+/// A shared boolean flag indicating if the computation was cancelled.
+/// Once cancelled, cannot be returned to the previous state.
+/// FIXME: We should split this class it into consumers and producers of the
+/// cancellation flags.
+class CancellationFlag {
 public:
-  /// If \p AsyncThreadsCount is 0, requests added using addToFront and addToEnd
-  /// will be processed synchronously on the calling thread.
-  // Otherwise, \p AsyncThreadsCount threads will be created to schedule the
-  // requests.
-  ThreadPool(unsigned AsyncThreadsCount);
-  /// Destructor blocks until all requests are processed and worker threads are
-  /// terminated.
-  ~ThreadPool();
+  CancellationFlag();
 
-  /// Add a new request to run function \p F with args \p As to the start of the
-  /// queue. The request will be run on a separate thread.
-  template <class Func, class... Args>
-  void addToFront(Func &&F, Args &&... As) {
-    if (RunSynchronously) {
-      std::forward<Func>(F)(std::forward<Args>(As)...);
-      return;
-    }
+  void cancel() {
+    assert(WasCancelled && "the object was moved");
+    WasCancelled->store(true);
+  }
 
-    {
-      std::lock_guard<std::mutex> Lock(Mutex);
-      RequestQueue.emplace_front(
-          BindWithForward(std::forward<Func>(F), std::forward<Args>(As)...),
-          Context::current().clone());
-    }
-    RequestCV.notify_one();
+  bool isCancelled() const {
+    assert(WasCancelled && "the object was moved");
+    return WasCancelled->load();
   }
 
-  /// Add a new request to run function \p F with args \p As to the end of the
-  /// queue. The request will be run on a separate thread.
-  template <class Func, class... Args> void addToEnd(Func &&F, Args &&... As) {
-    if (RunSynchronously) {
-      std::forward<Func>(F)(std::forward<Args>(As)...);
-      return;
-    }
+private:
+  std::shared_ptr<std::atomic<bool>> WasCancelled;
+};
 
-    {
-      std::lock_guard<std::mutex> Lock(Mutex);
-      RequestQueue.emplace_back(
-          BindWithForward(std::forward<Func>(F), std::forward<Args>(As)...),
-          Context::current().clone());
-    }
-    RequestCV.notify_one();
-  }
+/// Limits the number of threads that can acquire the lock at the same time.
+class Semaphore {
+public:
+  Semaphore(std::size_t MaxLocks);
+
+  void lock();
+  void unlock();
+
+private:
+  std::mutex Mutex;
+  std::condition_variable SlotsChanged;
+  std::size_t FreeSlots;
+};
+
+/// Allows to run tasks on separate (detached) threads and wait for all tasks to
+/// finish.
+class AsyncTaskRunner {
+public:
+  ~AsyncTaskRunner();
+
+  void waitForAll();
+  void runAsync(UniqueFunction<void()> Action);
 
 private:
-  bool RunSynchronously;
-  mutable std::mutex Mutex;
-  /// We run some tasks on separate threads(parsing, CppFile cleanup).
-  /// These threads looks into RequestQueue to find requests to handle and
-  /// terminate when Done is set to true.
-  std::vector<std::thread> Workers;
-  /// Setting Done to true will make the worker threads terminate.
-  bool Done = false;
-  /// A queue of requests.
-  std::deque<std::pair<UniqueFunction<void()>, Context>> RequestQueue;
-  /// Condition variable to wake up worker threads.
-  std::condition_variable RequestCV;
+  std::mutex Mutex;
+  std::condition_variable TasksReachedZero;
+  std::size_t InFlightTasks = 0;
 };
 } // namespace clangd
 } // namespace clang
Index: clangd/Threading.cpp
===================================================================
--- clangd/Threading.cpp
+++ clangd/Threading.cpp
@@ -1,63 +1,62 @@
 #include "Threading.h"
+#include "llvm/ADT/ScopeExit.h"
 #include "llvm/Support/FormatVariadic.h"
 #include "llvm/Support/Threading.h"
+#include
 
 namespace clang {
 namespace clangd {
-ThreadPool::ThreadPool(unsigned AsyncThreadsCount)
-    : RunSynchronously(AsyncThreadsCount == 0) {
-  if (RunSynchronously) {
-    // Don't start the worker thread if we're running synchronously
-    return;
-  }
-  Workers.reserve(AsyncThreadsCount);
-  for (unsigned I = 0; I < AsyncThreadsCount; ++I) {
-    Workers.push_back(std::thread([this, I]() {
-      llvm::set_thread_name(llvm::formatv("scheduler/{0}", I));
-      while (true) {
-        UniqueFunction<void()> Request;
-        Context Ctx;
-
-        // Pick request from the queue
-        {
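The diff above only declares `Semaphore`; one plausible implementation of `lock()`/`unlock()` in terms of the declared members is sketched below. This is an illustration of the classic mutex-plus-condition-variable counting semaphore, not necessarily the committed Threading.cpp code.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Limits the number of threads that can acquire the lock at the same time.
class Semaphore {
public:
  explicit Semaphore(std::size_t MaxLocks) : FreeSlots(MaxLocks) {}

  void lock() {
    std::unique_lock<std::mutex> Lock(Mutex);
    // Block until a slot frees up; the predicate guards against spurious wakeups.
    SlotsChanged.wait(Lock, [this] { return FreeSlots > 0; });
    --FreeSlots;
  }

  void unlock() {
    std::lock_guard<std::mutex> Lock(Mutex);
    ++FreeSlots;
    SlotsChanged.notify_one(); // wake one waiter, if any
  }

private:
  std::mutex Mutex;
  std::condition_variable SlotsChanged;
  std::size_t FreeSlots;
};
```

Because it exposes `lock()`/`unlock()`, it satisfies the BasicLockable requirements, so `std::lock_guard<Semaphore>` works as a scoped slot holder.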
[PATCH] D42573: [clangd] The new threading implementation
sammccall added a comment.

Really coming together!

Comment at: clangd/ASTWorker.cpp:1
+//===--- ASTWorker.cpp -------------------------------------------*- C++ -*-===//
+//

This file could really use some high-level comments about the scheduling strategy, throughout

Comment at: clangd/ASTWorker.cpp:20
+      StartedRunning(false), Done(false) {
+  if (RunSync)
+    return;

?!

Comment at: clangd/ASTWorker.cpp:102
+      // not waste time on it.
+      LastUpdateCF->cancel();
+    }

This strategy has some upsides:

- we eventually get diagnostics for the latest version
- we don't begin building an AST that we know can never be read
- we don't "wait" when there's something useful to do
- it's simple to implement (apart from cancellation)

And downsides:

- diagnostics are built on the first keystroke
- indexing blocks interactive actions, because we don't (and can't reasonably) respect cancellation before calling OnUpdated
- it requires CancellationFlag even though our actions aren't cancelable

What's the goal here - is this a strategy to keep? Or a placeholder? Or trying to maximize compatibility with the previous code?

Comment at: clangd/ASTWorker.h:1
+//===--- ASTWorker.h ---------------------------------------------*- C++ -*-===//
+//

ASTWorker is an implementation detail of TUScheduler. Can we move it to TUScheduler.cpp, instead of exposing it, and remove this header?

Comment at: clangd/ASTWorker.h:19
+namespace clangd {
+struct InputsAndAST {
+  const ParseInputs &Inputs;

This seems like an "implementation header" - nobody should depend on ASTWorker AIUI. So InputsAndAST shouldn't really go here, as it's part of the TUScheduler public interface.

Comment at: clangd/ASTWorker.h:24
+
+struct InputsAndPreamble {
+  const ParseInputs &Inputs;

(InputsAndPreamble is entirely unused here, I think)

Comment at: clangd/ASTWorker.h:29
+
+/// Owns one instance of the AST, schedules updated and reads of it.
+/// Also responsible for building and providing access to the preamble.

updated -> updates

Comment at: clangd/ASTWorker.h:38
+
+  // Must be called exactly once on processing thread. Will return after
+  // setDone() is called on a separate thread and all pending requests are

As discussed offline, this lifecycle is a bit complicated :-)

ASTWorker basically manages a thread, and it'd be nice if we could align the object lifetime and thread lifetime more closely. The difficulty seems to be that we want TUScheduler to be able to discard ASTWorkers instantly, even though we can't interrupt them.

After offline brainstorming, we came up with some sort of "ASTWorkerHandle" which:

- owns the thread that run() is called on
- detaches that thread on destruction
- has a weak_ptr or shared_ptr to the worker itself, which it exposes

We still have run() and setDone(), but they're private details.

Comment at: clangd/ASTWorker.h:43
+  /// Signal that run() should finish processing pending requests and exit.
+  void setDone();
+

stop()? (or stopSoon/requestStop to be more explicit)

Comment at: clangd/ASTWorker.h:45
+
+  std::shared_ptr<const PreambleData> getPossiblyStalePreamble() const;
+  std::size_t getUsedBytes() const;

can we reorder/group these by purpose/sequence? e.g. (no need for the comments, just illustrative).

//lifecycle
run();
setDone();

//write
update()

//read
runWithAST()
getPossiblyStalePreamble()

//misc
getUsedBytes()

Comment at: clangd/ASTWorker.h:54
+private:
+  using RequestWithCtx = std::pair<UniqueFunction<void()>, Context>;
+

I think this might actually be easier to read with just `using Request = UniqueFunction<void()>` and then spelling out the `std::pair`. up to you though.

Comment at: clangd/ASTWorker.h:59
+  // File and FileInputs are only accessed on the processing thread from run().
+  // FIXME(ibiryukov): group CppFile and FileInputs into a separate class.
+  const std::shared_ptr<CppFile> File;

nit: "combine ... into one class"?

I'd hope we won't end up with ASTWorker, CppFile, FileInputs, *and* another thing?

Comment at: clangd/ASTWorker.h:69
+  std::queue<RequestWithCtx> Requests; /* GUARDED_BY(Mutex) */
+  // Only set when last request is an update.
+  llvm::Optional<CancellationFlag> LastUpdateCF; /* GUARDED_BY(Mutex) */

Why? e.g. `This allows us to cancel an update that was never read, if a subsequent update comes in`

Comment at: clangd/TUScheduler.cpp:38
+  // Wait for all in-flight tasks to finish.
+  Tasks.waitForAll();
+}

destructor will do this

Comment at: clangd/TUScheduler.h:77
+  /// In destructor, FileData signals to
[PATCH] D42573: [clangd] The new threading implementation
ilya-biryukov updated this revision to Diff 132570.
ilya-biryukov marked an inline comment as done.
ilya-biryukov added a comment.

- Removed redundant includes

Repository: rCTE Clang Tools Extra

https://reviews.llvm.org/D42573

Files:
  clangd/ASTWorker.cpp
  clangd/ASTWorker.h
  clangd/CMakeLists.txt
  clangd/ClangdServer.h
  clangd/ClangdUnitStore.cpp
  clangd/ClangdUnitStore.h
  clangd/TUScheduler.cpp
  clangd/TUScheduler.h
  clangd/Threading.cpp
  clangd/Threading.h
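The CancellationFlag this patch introduces is small enough to reproduce standalone. The sketch below mirrors the declaration from the Threading.h diff in this review; the constructor body is an assumption (it allocates the shared atomic), and copies deliberately share the same flag so a producer's cancel() is visible to every consumer.

```cpp
#include <atomic>
#include <cassert>
#include <memory>

/// A shared boolean flag indicating if the computation was cancelled.
/// Once cancelled, cannot be returned to the previous state.
class CancellationFlag {
public:
  CancellationFlag()
      : WasCancelled(std::make_shared<std::atomic<bool>>(false)) {}

  void cancel() {
    assert(WasCancelled && "the object was moved");
    WasCancelled->store(true);
  }

  bool isCancelled() const {
    assert(WasCancelled && "the object was moved");
    return WasCancelled->load();
  }

private:
  // Copies of the flag share this atomic; a moved-from copy holds nullptr,
  // which the asserts above catch.
  std::shared_ptr<std::atomic<bool>> WasCancelled;
};
```

This is why ASTWorker can keep one copy in LastUpdateCF while the running update holds another: cancelling through either copy is observed by both.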
[PATCH] D42573: [clangd] The new threading implementation
ilya-biryukov marked 3 inline comments as done.
ilya-biryukov added inline comments.

Comment at: clangd/ClangdServer.h:140
+///    the working threads as soon as an idle thread is available.
+///  - scheduleOnQueue will schedule to a specific queue. Requests from the
+///    same queue are not processed concurrently. Requests in each queue are

sammccall wrote:
> As discussed offline, Queue's semantics are very similar to a thread, except:
> - cooperatively scheduled: Queues do not yield until idle
> - limited in parallelism by the size of the threadpool
>
> In the interests of keeping things simple and familiar, I think we should start out by using `std::thread`.
> We can use a semaphore to limit the parallelism (I'm fine with doing this in the first iteration, but urge you to consider leaving it out until we see actual problems with using the CPU as our limit). This should give both properties above (if the Queue only releases the semaphore when idle).
> If we still have performance problems we may need to switch to a multiplexing version, though I have a hard time imagining it (e.g. even on windows, thread spawn should be <1us, and memory usage is trivial compared to anything that touches an AST).

Done exactly that. There's a new abstraction in the patch that manages creating threads and waiting for them to finish for us. It seems pretty simple, please take a look and let me know what you think.

Comment at: clangd/ClangdServer.h:141
+///  - scheduleOnQueue will schedule to a specific queue. Requests from the
+///    same queue are not processed concurrently. Requests in each queue are
+///    executed in the FIFO order.

sammccall wrote:
> Similarly, the free requests look a lot like standalone threads, with a few enhancements that are implementable but also possibly YAGNI.
> - running code complete first is roughly[1] equivalent to elevating the thread's priority (no portability shim in llvm yet, but it's pretty easy). I don't think this needs to be in the first patch.
> - limiting parallelism can be done with semaphores. In fact we can easily express something like "up to 18 queued tasks, up to 20 free tasks, up to 20 total", which is nice for latency.
>
> [1] I see both advantages and disadvantages, happy to discuss more!

We're not setting the priorities in the first version, but certainly agree we should add that later. And we're using a simple semaphore to limit the number of actively running threads.

Comment at: clangd/ClangdServer.h:143
+///    executed in the FIFO order.
 class ThreadPool {
 public:

sammccall wrote:
> So overall here, I think that we can drop `ThreadPool` without much impact on the design.
> `Queue` would stay, as a wrapper around `std::thread` that lets you add tasks to its runloop. It would be owned by FileData, and shutdown would be triggered by its destructor.
>
> The advantage is one less layer here to understand, and one less attractive nuisance to tweak over time.
> The downsides I see:
> - threading is no longer abstracted away, so changes to it are less isolated. I think it's likely that we get away with the model staying simple. If it gets complicated, we'll want to add the abstraction back in. But this isn't java, we can abstract when we need it :-)
> - RunSynchronously escapes into Scheduler. This is for the worse I think, but only slightly.

I tried fiddling with the idea and ended up abandoning the `RequestQueue` in favor of a `std::queue` with explicit locks. There's only one place where we queue requests now (processing the updates/reads of the ASTs); everything else is managed by creating separate `std::thread`s.
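The arrangement described here — a `std::queue` guarded by an explicit mutex, drained in FIFO order by one dedicated `std::thread` — can be sketched like this. `QueueThread` and its members are hypothetical names for illustration, not the patch's ASTWorker; `std::function` again stands in for `UniqueFunction`, and Context propagation is omitted.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class QueueThread {
public:
  QueueThread() : Thread([this] { run(); }) {}

  ~QueueThread() {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      Done = true;
    }
    CV.notify_one();
    Thread.join(); // drain remaining requests, then exit
  }

  void enqueue(std::function<void()> Req) {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      Requests.push(std::move(Req));
    }
    CV.notify_one();
  }

private:
  // Runs on the dedicated thread: pop requests in FIFO order until
  // Done is set and the queue is empty.
  void run() {
    while (true) {
      std::function<void()> Req;
      {
        std::unique_lock<std::mutex> Lock(Mutex);
        CV.wait(Lock, [this] { return Done || !Requests.empty(); });
        if (Requests.empty())
          return; // Done and nothing left to process
        Req = std::move(Requests.front());
        Requests.pop();
      }
      Req(); // run the request outside the lock
    }
  }

  std::mutex Mutex;
  std::condition_variable CV;
  std::queue<std::function<void()>> Requests; /* GUARDED_BY(Mutex) */
  bool Done = false;                          /* GUARDED_BY(Mutex) */
  std::thread Thread; // declared last: starts after the members above exist
};
```

Requests from the same queue never run concurrently (one thread drains them), which is exactly the scheduleOnQueue guarantee quoted above.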
Comment at: clangd/TUScheduler.cpp:23
+    return;
+  Worker = std::thread([&]() {
+    while (true) {

sammccall wrote:
> I tend to find thread bodies more readable as a member function, up to you

Moved it to a member function and made a (questionable) decision that clients of the class are responsible for running that function on a separate thread themselves. It allows reusing the abstraction that runs the threads and waits for them to finish (also used when spawning threads for completion, and similar). It shouldn't be too bad, given that we have just a single client, even though the interface is not particularly nice. Happy to chat about it.

Comment at: clangd/TUScheduler.cpp:258
+    std::lock_guard<Semaphore> BarrierLock(Barrier);
+    // XXX: what if preamble got built by this time?
+    // if (!Preamble)

sammccall wrote:
> this seems like where shared_ptr might help us out if we're willing to deal with it.
> If we captured a shared_ptr here, then we'd want to call getPossiblyStalePreamble inside rather than outside the lambda.
>
> So this would look something like:
> - FileASTThread