chewbranca opened a new pull request, #5213:
URL: https://github.com/apache/couchdb/pull/5213

   # Couch Stats Resource Tracker
   
   This is a rework of PR: https://github.com/apache/couchdb/pull/4812
   
   ## Overview and Motivation
   
   Couch Stats Resource Tracker (CSRT) is a new engine for tracking the amount 
of
   resource operations induced by processes within CouchDB's Erlang VM. This PR
   specifically targets coordinator processes and RPC workers induced by said
   coordinator processes, but the underlying stats collection framework is 
designed
   to be usable by anything consuming resources in CouchDB, such that we can 
extend
   this out to background jobs like indexing, compaction, and replication. The 
long
   term stretch goal is to be able to account for all system level activity 
induced
   by CouchDB, but the practical goal is to be able to understand where and why
   most of the resources in the system are being utilized, at a request/job 
level
   granularity.
   
   This PR is primarily motivated by the current lack of visibility into
   identifying _what_ operations are inducing large quantities of work. We have
   node/cluster level visibility into the amounts of IO operations being 
induced,
   but we lack the ability to identify what request/operation induced that
   workload.
   
   This is especially problematic when dealing with large fold operations that 
do
   local filtering (eg filtered `_changes` feeds or filtered `_find` queries)
   because these operations lack the normal coordinator induced rate limiting 
that
   results naturally from funneling individual results back to the coordinator 
node
   to sort and process. In the local filtering case, we essentially do a direct
   fold over the shard and invoke a filter function on each doc to find a
   matching result. When the docs fail to match, this becomes a local tight
   loop over the shard that, when run in parallel, can easily dominate IO
   operations. The `_find` logic has been extended to generate `report`s to log
   and identify the heavy hitter requests, especially the degenerate `_find`
   queries that do a full database scan and find zero results to return.
   
   ## Approach in this PR and differences from mango reports
   
   This PR takes the idea of the mango reports and creates a unified framework
   for tracking these statistics in real time, allowing for global querying of
   node and cluster level resource usage in the live processes. This PR reflects
   an opinionated deviation from the approach in the mango reports. Instead of
   introducing new stats tracking, it proposes the core approach of:
   
   > Any stat worth tracking for reporting system usage is clearly worth 
tracking
   > properly as a proper CouchDB metric.
   
   So instead of embedding new stats like in the mango find reports, this system
   hooks into the `couch_stats:increment_counter` logic to piggy back off of the
   stats being collected in real time by the process doing the work, and then
   funnels those updates into an ets table keyed off of the local process, and
   joined at a cluster level by the coordinator ref, allowing for cluster level
   aggregation of individual http requests, live. These tracked stats are 
forwarded
   back to the coordinator process by way of embedding in the `rexi` RPC 
messages
   such that long running find queries and other heavy weight processes can be
   identified and tracked.
   
   We then log a report detailing the total resources induced by the http 
request
   so we can retroactively identify which requests are consuming the most
   resources. The reporting by default is configured to only happen at the
   coordinator level, but if you're able to handle the log volume it can be 
enabled
   for all workers too. Down the road a nice feature would be supporting writing
   reports directly to ClickHouse, some binary format, or even just a terse text
   format to allow for increased report volumes; currently high throughput 
report
   generation for coordinator and all rpc workers on high Q databases is
   substantial in data volume, but there's much room for optimization given the
   verbose nature of the current reports and how well they gzip up.
   
   ## New metrics introduced
   
   As a result of the above mentioned philosophy of properly tracking the stats
   worth tracking, this PR introduces a handful of new stats, predominantly in 
one
   of two forms, listed below. I've also included sample screenshots of the new
   metrics plotted during a 30 minute benchmark run that started with an empty
   cluster and aggressively created new databases/indexes/documents while 
growing
   worker count progressively during that run. All http traffic was ceased 
after 30
   minutes, and you can clearly see the phase change in operations when that
   happened.
   
   1) Core stats for previously missing metrics
   
   eg new stats for counting `couch_btree` reads and writes on kp/kv nodes
   
   <img width="1422" alt="Screenshot 2024-08-23 at 5 53 42 PM" src="https://github.com/user-attachments/assets/523ca54f-ad1c-4a04-8b45-de1c2af158e8">
   
   
   2) RPC work induced
   
   The idea here is that we should be tracking 1-3 things for all induced RPC 
work:
   
     1) RPC worker spawned type
     2) RPC worker docs processed
     3) RPC worker docs returned
   
   Item 1) is the primary item for core RPC work; it allows us to see the
   volume of RPC workers spawned over time per node. Items 2) and 3) are
   specific to aggregate operations, with 3) specific to aggregate operations
   that can perform local filtering.
   
   The idea is that we can a) see the types of work being induced on nodes over
   time and observe how many documents are being processed by the aggregate
   worker operations, and then b) directly observe major discrepancies between
   docs processed and docs returned, as that's indicative of a missing index or
   poorly designed workflows.
   
   Here's a full cluster view of all nodes rpc traffic:
   
   <img width="1425" alt="Screenshot 2024-08-23 at 5 53 23 PM" src="https://github.com/user-attachments/assets/79fc7f6e-143b-4a63-a518-c43297683733">
   
   In the case of our benchmark above, the workload was evenly distributed so 
all
   nodes performed similarly. This is a lot of data, but can easily be 
aggregated
   by node or type to identify non-homogeneous workloads. Here's a simpler view
   showing per node RPC workloads:
   
   <img width="1423" alt="Screenshot 2024-08-23 at 5 53 32 PM" src="https://github.com/user-attachments/assets/f93ed420-66a1-4ce4-a86b-aec8632b6d7e">
   
    
   ## Tracking table for accumulating and querying metrics
   
   The central tracking table is a global ets table utilizing
   `read_concurrency`, `write_concurrency`, and `decentralized_counters`, which
   results in an impressively performant global table in which all processes
   update their local stats. Writes are isolated to the process doing the work,
   so there is no contention from parallel writes to the same key. Aggregations
   are performed against the full ets table, but updates to a given key are
   constrained to the corresponding worker process.
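
   As a rough illustration of the table shape described above, here is a
   minimal sketch. The module name, table name, and the three-element row
   layout are hypothetical and chosen only for this example; the actual CSRT
   records carry many more fields. It assumes OTP 23 or later, where the
   `decentralized_counters` option is available.

   ```erlang
   -module(csrt_table_sketch).
   -export([new/0, track/1, inc/3, total/1]).

   %% Hypothetical table name and row layout, for illustration only:
   %% {Key, IoqCalls, DocsRead}
   -define(TAB, csrt_table_sketch).

   new() ->
       ets:new(?TAB, [
           named_table, public, set,
           {read_concurrency, true},
           {write_concurrency, true},
           {decentralized_counters, true}
       ]).

   %% Called by the process doing the work; Key would be its {Pid, Ref} context.
   track(Key) ->
       ets:insert_new(?TAB, {Key, 0, 0}).

   %% Atomic in-place increment; only the owning process updates its own row.
   inc(Key, ioq_calls, N) -> ets:update_counter(?TAB, Key, {2, N});
   inc(Key, docs_read, N) -> ets:update_counter(?TAB, Key, {3, N}).

   %% Aggregations read across every row in the table.
   total(ioq_calls) -> sum(2);
   total(docs_read) -> sum(3).

   sum(Pos) ->
       ets:foldl(fun(Row, Acc) -> element(Pos, Row) + Acc end, 0, ?TAB).
   ```

   Because each process only ever calls `inc/3` on its own key, writers never
   contend on a row, while `total/1` is free to scan the whole table.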
   
   ### Previous design that failed to scale
   
   In the previous PR I attempted to utilize a singular `gen_server` for
   monitoring the processes and performing some cleanup operations. This was
   optimized down to _only_ being a dedicated server doing
   `handle_call({monitor, Pid}, ..) -> monitor_pid(), {reply, ok, State}` and
   `handle_info({'DOWN', ..., Ref, ...}) -> ets:delete(maps:get(Ref, RefToPid))`,
   and that was insufficient to handle the load. I tried various approaches but
   I was able to melt a singular `gen_server` easily. It's necessary to have a
   process monitor outside of the local process because coordinator/worker
   processes can and will get killed mid operation, therefore `after`
   clause/function based approaches are insufficient.
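
   For readers who want to see the shape of that bottleneck, here is a
   hypothetical reconstruction of such a single-process monitor. It is not the
   code from the previous PR; the module name, the `watch/1` function, and the
   map-based state are assumptions made for this sketch. The point is that
   every coordinator and worker on the node funnels through one mailbox.

   ```erlang
   -module(csrt_single_monitor_sketch).
   -behaviour(gen_server).
   -export([start_link/1, watch/1]).
   -export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

   %% Tab is the (public) ets table holding the tracked entries.
   start_link(Tab) ->
       gen_server:start_link({local, ?MODULE}, ?MODULE, Tab, []).

   %% Every tracked process makes a synchronous call into the same server.
   watch({Pid, _Ref} = Key) when is_pid(Pid) ->
       gen_server:call(?MODULE, {monitor, Key}).

   init(Tab) ->
       {ok, #{tab => Tab, refs => #{}}}.

   handle_call({monitor, {Pid, _} = Key}, _From, #{refs := Refs} = St) ->
       MRef = erlang:monitor(process, Pid),
       {reply, ok, St#{refs := Refs#{MRef => Key}}}.

   handle_cast(_Msg, St) ->
       {noreply, St}.

   %% When a tracked process exits, delete its table entry.
   handle_info({'DOWN', MRef, process, _Pid, _Reason},
               #{tab := Tab, refs := Refs} = St) ->
       case maps:take(MRef, Refs) of
           {Key, Refs1} ->
               ets:delete(Tab, Key),
               {noreply, St#{refs := Refs1}};
           error ->
               {noreply, St}
       end;
   handle_info(_Msg, St) ->
       {noreply, St}.
   ```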
   
   Even with that minimal of a workload, I was able to melt the `gen_server`:
   
   <img width="569" alt="Screenshot 2024-04-04 at 3 45 15 PM" src="https://github.com/user-attachments/assets/22d64327-f4ee-4ef8-a71c-48c3126c8099">
   
   and that's with it _really_ doing a minimum workload:
   
   <img width="736" alt="Screenshot 2024-04-09 at 5 19 37 PM" src="https://github.com/user-attachments/assets/d70f99c5-a984-4b35-aa80-aaee34d07e80">
   
   
   This was my final attempt to make a singular `gen_server` architecture, but 
with
   80 core nodes I'm now fully convinced it's no longer viable to do singular
   `gen_server` systems in hot code paths and we must take more distributed
   approaches, either by way of sharding the servers or fully distributed.
   
   
   ### Distributed tracker approach in CSRT v2
   
   In the case of CSRT, I engaged a fully distributed approach that spawns a
   dedicated monitor process when a CSRT context is created by a coordinator or
   worker. This monitor process handles the lifetime of a given entry in the ets
   table so that we delete the worker entry when the worker is done. This 
dedicated
   monitor process also generates the report afterwards. Switching to the 
dedicated
   monitor approach eliminated the scaling issues I encountered, and the current
   architecture is able to readily handle max throughput load.
   
   The CSRT context is created in the coordinator process directly in
   `chttpd:process_request_int`, and in the worker process directly in the
   spawned process's initialization of `rexi_server:init_p`. The context is
   basically just `erlang:put(?CONTEXT_MARKER, {self(), make_ref()})` which is 
then
   the ets key used for tracking the coordinator process while it handles the 
given
   request.
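
   A minimal sketch of that context creation follows. The module name, the
   value of the `?CONTEXT_MARKER` macro, and the `get_context/0` helper are
   placeholders for illustration; only the `erlang:put/2` call with
   `{self(), make_ref()}` comes from the description above.

   ```erlang
   -module(csrt_context_sketch).
   -export([create_context/0, get_context/0]).

   %% Placeholder marker; the real macro value is not shown in this PR text.
   -define(CONTEXT_MARKER, csrt_context_sketch_pid_ref).

   %% Store a fresh {Pid, Ref} in the process dictionary and return it.
   %% The returned tuple doubles as the ets key for this unit of work.
   create_context() ->
       PidRef = {self(), make_ref()},
       erlang:put(?CONTEXT_MARKER, PidRef),
       PidRef.

   %% Returns the current context, or 'undefined' if none was created.
   get_context() ->
       erlang:get(?CONTEXT_MARKER).
   ```

   Calling `create_context/0` twice in the same process yields the same pid
   but a different ref each time, so a reused process gets a distinct key per
   unit of work.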
   
   The `make_ref()` ensures that the coordinator processes that are reused in 
the
   Mochiweb worker pool distinguish between individual http requests. More
   generally, this allows a long lived process to isolate subsets of its own 
work.
   This is essential if we want to add the ability to funnel the CSRT context
   through IOQ/couch_file/couch_mrview/etc to accumulate 
   
   ### A note on PidRef vs nonce for identification
   
   Currently we don't funnel the coordinator's PidRef across the wire and 
instead
   rely on the `nonce` as a global aggregator key, and then the coordinator
   aggregations happen directly when the RPC responses are received and the 
deltas
   are extracted. We could add this fairly easily in `rexi:cast_ref`, but I do
   wonder if we'd be better off skipping the ref entirely and instead using
   `{self(), Nonce}` as the key given we already funnel it around. That won't 
work
   for background operations, so we'd need a `make_ref()` fallback for tracking
   those jobs, but I do like the idea of consolidating down to using the `nonce`
   given it's already the unique reference to the request inducing the workload,
   and we already send it over the wire ubiquitously for all coordinator/worker
   operations through `rexi:cast_ref`.
   
   ### Context creation and lifecycle
   
   We create the initial context in
   `chttpd:process_request_int`/`rexi_server:init_p` for the 
coordinator/workers,
   respectively, and then we progressively fill in the details for things like
   dbname/username/handler_fun so that we can track those data points naturally 
as
   they arise in the request codepath, for example adding the `chttpd_db`
   handler when entering those functions, or setting the username after
   `chttpd_auth:authorize` returns. Similarly, in `fabric_rpc` we piggy back off of
   `fabric_rpc:set_io_priority` called by every callback function to cleanly set
   the dbname involved in the RPC request. We could also extend this to track 
the
   ddoc/index involved, if any.
   
   The idea is to make it easy for the local process to update its global 
tracked
   state at the appropriate points in the codebase so we can iteratively extend 
out
   the tracking throughout the codebase. Background indexing and compaction are
   apt targets for wiring in CSRT and we could even extend the `/_active_tasks`
   jobs to include status about resource usage.
   
   When we initiate the context, for workers, coordinators, or any future job
   types, we spawn a dedicated tracking monitor process that sits by idly until 
it
   gets a `stop` message from normal lifecycle termination, or it gets a `DOWN`
   message from the process doing work. In either case, the tracker process 
cleans
   up the corresponding ets table entry (the only form of two processes writing 
to
   the same key in the ets tracker, but handed off with no interweaving) and 
then
   conditionally generates a `report` to log the work induced.
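
   The tracker lifecycle described above can be sketched roughly as follows.
   This is an illustration under assumptions, not the PR's implementation: the
   module and function names are hypothetical, the table is passed in
   explicitly, and the report step is a caller-supplied fun standing in for
   whatever logging CSRT actually performs.

   ```erlang
   -module(csrt_tracker_sketch).
   -export([start_tracker/3, stop_tracker/1]).

   %% Spawn a dedicated process that waits for either a 'stop' message or the
   %% 'DOWN' of the tracked process, then cleans up the table entry.
   start_tracker(Tab, {Pid, _Ref} = Key, ReportFun) ->
       spawn(fun() ->
           MRef = erlang:monitor(process, Pid),
           receive
               stop ->
                   %% Normal lifecycle termination.
                   erlang:demonitor(MRef, [flush]),
                   cleanup(Tab, Key, ReportFun);
               {'DOWN', MRef, process, Pid, _Reason} ->
                   %% The tracked process exited or was killed mid operation.
                   cleanup(Tab, Key, ReportFun)
           end
       end).

   stop_tracker(Tracker) ->
       Tracker ! stop,
       ok.

   %% Read the final row, delete it, then hand it to the report callback.
   cleanup(Tab, Key, ReportFun) ->
       Rows = ets:lookup(Tab, Key),
       ets:delete(Tab, Key),
       maybe_report(Rows, ReportFun).

   maybe_report([], _ReportFun) -> ok;
   maybe_report([Row], ReportFun) -> ReportFun(Row), ok.
   ```

   Since the tracker lives outside the tracked process, the cleanup still runs
   when the worker is killed, which is the case `after` clauses cannot cover.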
   
   The default, when CSRT is enabled, is to log a report for the coordinator
   process totalling the tracked work induced by the RPC workers to fulfill the
   given request. It's configurable to also log workers, and there's some
   rudimentary filtering capabilities to allow for logging of only a specific 
rpc
   worker type, but this could be improved upon considerably. In general, the
   volume of these logs can be sizable, for example a singular http view request
   against a Q=64 database on a healthy cluster induces 196 RPC workers, all
   inducing their own volume of work and potentially logging a report. A compact
   form or additional filtering capabilities to log interesting reports would be
   beneficial.
   
   ## Status and next steps
   
   Overall I'm happy with the performance of the core tracking system; the
   modern ETS improvements with decentralized counters on top of atomic
   increments are _really_ impressive! I had the test suite fully passing
   recently, but I've done a decent bit of cleanup and restructuring since then
   and haven't checked a full CI run in a minute, so I'll see how it looks on
   this PR and address anything that comes up. I'd like to start getting a
   review here and see what folks think; I think the structure of the code is
   in a good place to discuss and get feedback on. A few next steps to do:
   
   - [ ] add more tests
   - [ ] add Dialyzer specs to `couch_stats_resource_tracker.erl` at the very least
   - [ ] fix `time` tracking, the `tnow()` is not a `positive_integer()`
   - [ ] add some standard query functions using ets:fun2ms
     - The parse transform makes these more challenging to handle dynamically, 
so
       let's add a handful of standard performant functions, eg:
       - sort_by({dbname, shard, user}, ioq_calls)
       - sort_by(user, ioq_calls)
       - sort_by({user, request_type}, docs_processed)
       - sort_by({request_type, user}, docs_processed)
       - sort_by(user, get_kv_nodes)
       - etc
   - [ ] think about additional filtering on logs:
     - skip report fields with zero values?
     - set minimum thresholds for reports?
     - allow skipping of some coordinator reports? eg welcome handler
   - [ ] design on metrics namespacing from `rexi_server:init_p`
     - eg `should_increment([M, F, spawned])` vs
       - `should_increment([rexi_rpc, M, F, spawned])`
       - or `should_increment([couchdb, rpc_worker, M, F, spawned])`
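
   To make the "standard query functions using ets:fun2ms" item above more
   concrete, here is one possible shape for `sort_by(user, ioq_calls)`. The
   module name, function name, and the three-element row layout are
   assumptions for this sketch only; the real tracker rows are richer. The
   match spec is fixed at compile time by the parse transform, which is why a
   handful of prebuilt functions is easier than a fully dynamic interface.

   ```erlang
   -module(csrt_query_sketch).
   -export([sort_by_user_ioq_calls/1]).
   -include_lib("stdlib/include/ms_transform.hrl").

   %% Hypothetical row layout: {PidRef, Username, IoqCalls}
   %% Returns [{Username, TotalIoqCalls}] sorted by total, highest first.
   sort_by_user_ioq_calls(Tab) ->
       %% Project each row down to {User, IoqCalls} inside ets.
       MS = ets:fun2ms(fun({_, User, IoqCalls}) -> {User, IoqCalls} end),
       %% Sum the per-process counts for each user.
       Totals = lists:foldl(
           fun({User, N}, Acc) ->
               maps:update_with(User, fun(V) -> V + N end, N, Acc)
           end,
           #{},
           ets:select(Tab, MS)
       ),
       lists:reverse(lists:keysort(2, maps:to_list(Totals))).
   ```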
   
   ## Sample report
   
   Filtered changes http request with `?include_docs=true` and JS filter:
   
   ```
   [report] 2024-09-02T23:46:08.175264Z node1@127.0.0.1 <0.1012.0> -------- [csrt-pid-usage-lifetime changes_returned=1528 db_open=7 dbname="foo" docs_read=1528 from="null" get_kp_nodes=14 get_kv_nodes=180 ioq_calls=3254 js_filter=1528 js_filter_error=0 js_filtered_docs=1528 mfa="null" nonce="bce5d2ce6e" path="/foo/_changes" pid_ref="<0.965.0>:#Ref<0.2614010930.743440385.194505>" rows_read=1528 started_at=-576460745696 type="coordinator:GET:fun chttpd_db:handle_changes_req/2" updated_at=-576460745696 username="adm"]
   ```
   

