chewbranca opened a new pull request, #4812: URL: https://github.com/apache/couchdb/pull/4812
# Couch Stats Resource Tracker Overview

We currently lack visibility into the types of operations that are inducing system resource usage in CouchDB. The IOQ stats show the raw quantities of IO operations being induced, in categories of essentially "normal database reads", "database writes", "view index writes", "internal replication", and "compaction". The problem is connecting the dots between the data in IOQ and the corresponding database operation types inducing the work, and, going a step further, connecting that with individual requests to identify the actual operations inducing all of the work.

I believe this is especially pronounced in aggregate database read operations that perform a fold over the `id_tree` or `seq_tree` of the underlying `.couch` file. We do not currently instrument `couch_btree.erl` with `couch_stats` to track operations, so we have no data to correlate `couch_btree:fold*` operations with the volume of IOQ traffic flowing through the system. A basic first step would be to add counters for the number of btree fold reads performed; however, that would still be insufficient, as it would not allow us to distinguish between `_all_docs`, `_view`, `_find`, and `_changes` unless we track which type of fold operation is being induced. Ideally, we should be able to correlate the overall IOQ throughput with the different types of API operations inducing it.

Beyond that, it would be fantastic if we could extend CouchDB's introspection capabilities to track the types of operations being performed at the user/db/request level, allowing users to understand which operations and requests are utilizing system resources. This writeup proposes an approach to alleviate these introspection blind spots by collecting additional cluster level metrics at the RPC operation level, providing context about the overall volume of requests, the magnitude of data processed, and the magnitude of the response payloads.
By extending the global stats collection logic we can allow for process local tracking of stats and build a system for tracking and exposing that data at the RPC level, the node local level, and at an HTTP API level. I think it's essential we target cluster level workloads in addition to request level workloads so we can correlate resource usage with individual requests. Given how variable API operations are in CouchDB (eg a single doc read reads one doc from 3 shard replicas, whereas a `_find` query failing to find an index will do a full database scan _every time_), I think it's important to be able to easily see what types of operations are consuming cluster resources, and then readily identify the heavy hitting requests.

This data can be tracked in records in ETS tables, allowing for realtime introspection of active resource consumption across all operations. We can focus on monotonically increasing counters for tracking core operations and avoid an assortment of complexity from negative values and dealing with histograms; we can leave histograms to other stacks by tracking start/stop timestamps. By focusing only on monotonically increasing values as a function of resource consumption, we can take a delta between any two timestamps and get a precise rate of consumption for the given process/coordinator during that time window. This also allows us to iteratively track stats between any start and stop point. The simple case is from process start to process finish, but anywhere in between is a valid delta, and we can distinguish between live snapshots and the final full process workload. By having RPC worker processes store data to the node local ETS tables in addition to sending resource usage deltas in rexi replies, we can track node local RPC worker resource usage in addition to node local coordinator processes and how much work they've induced across the cluster.
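The monotonic-counter delta idea above can be sketched in a few lines. This is an illustrative model in Python, not CouchDB code; the `snapshot`/`delta` helper names are hypothetical. The point is that because all tracked values only ever increase, the difference between any two timestamped snapshots gives the exact consumption for that window:

```python
# Illustrative sketch (not CouchDB code): monotonic counters mean any two
# timestamped snapshots can be subtracted to get consumption for that window.
import time

def snapshot(counters):
    """Capture a timestamped copy of a process's monotonic counters."""
    return {"ts": time.monotonic(), "counters": dict(counters)}

def delta(t0, t1):
    """Per-counter deltas and elapsed time between two snapshots."""
    elapsed = t1["ts"] - t0["ts"]
    diffs = {k: t1["counters"][k] - t0["counters"].get(k, 0)
             for k in t1["counters"]}
    return {"elapsed": elapsed, "deltas": diffs}

counters = {"docs_read": 0, "rows_read": 0}
t0 = snapshot(counters)
counters["docs_read"] += 100   # work happens, counters only go up
counters["rows_read"] += 250
t1 = snapshot(counters)
print(delta(t0, t1)["deltas"])  # {'docs_read': 100, 'rows_read': 250}
```

Because the counters never decrease, the same `delta` works for any pair of points: process start to finish, or any intermediate snapshot pair for live tracking.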
This provides insight into the active heavy operations in the cluster and also gives users easy access to identify heavy hitter requests for further optimization. By tracking only monotonically increasing counters about core resource usage, we can utilize a delta based system allowing for resource tracking on any desired stat. Essentially this allows us to create the equivalent of `recon:proc_window` as `couchdb:proc_window` and identify the most active processes by account, docs read, IOQ calls, rows read, JS filter invocations, data volume, or whatever else we choose to track.

The delta based approach decouples us from any particular time interval based collection approach and instead lets us focus on what provides the most insight in any particular context. In the base case we collect stats from time start to time end, but we also get intermediate values at whatever intervals/rates we desire. This also allows us to use the same stats collection logic for both streaming and non streaming API endpoints, as we can handle full deltas or partials. It's key we support delta operations, as we need to see long running processes live, not when they're completed potentially hours later. (In practice, long running `_find` query RPC workers can induce work for hours, so it's inappropriate to wait until the worker completes to stream the resource usage; otherwise it'll be hours behind when it actually happened.)

With the idea of using the existing `couch_stats:increment_counter` logic to handle resource tracking, we can extend that `increment_counter` logic so that in addition to the normal global tracking of the desired metric, we also begin tracking a process local set of statistics, allowing us to isolate resources utilized to individual requests and even RPC workers.
This is straightforward to accomplish from within `couch_stats:increment_counter`, as we have the exact stats being recorded and we're still in the process that invoked those stats, so this draft PR also has the local process performing operations update an ETS table entry for its own usage levels. This results in all worker processes concurrently updating ETS tables in realtime as a function of resource usage. Unlike our normal stats collection, where many processes write to a small number of keys, creating lock contention, for process local stats each process only writes to a single key corresponding to its process ID, so there is no overlapping stats collection across keys, allowing us to take full advantage of ETS `write_concurrency`.

Each process updates its usage stats in realtime as the normal `couch_stats:increment_counter` code paths are exercised. We utilize a timestamp based approach to track when each update occurs, and given we're only interacting with monotonically increasing counters, we can take the delta between any two time points and get a reasonably accurate rate of change. This allows us to create the equivalent of `recon:proc_window` as `couchdb:proc_window` to find the busiest processes/requests/databases/shards/users/etc. The timestamp delta approach also allows us to send deltas from the RPC worker back to the coordinator during the normally desired `rexi:reply` occurrences by embedding the resource delta into the rexi reply, incrementally sending back worker usage stats to be aggregated at the coordinator level so we can find runaway processes prior to seeing them in the report logs. This approach also allows us to extend `rexi:ping` with the same logic so we can keep resource usage stats streaming independently of when rows are streamed back to the coordinator (a major issue we've encountered with long running bad `_find` queries).
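The `couchdb:proc_window` equivalent described above amounts to ranking processes by counter growth over a window, mirroring how `recon:proc_window` samples twice and sorts. A minimal sketch in Python (names hypothetical; the real version would scan the ETS table):

```python
# Sketch of a proc_window-style ranking: sample the per-process stats table
# twice, then sort processes by how much a chosen counter grew in between.
def proc_window(sample0, sample1, stat, top_n=3):
    """Rank processes by growth of `stat` between two table samples."""
    deltas = []
    for pid, counters in sample1.items():
        before = sample0.get(pid, {}).get(stat, 0)  # 0 if started mid-window
        deltas.append((pid, counters.get(stat, 0) - before))
    return sorted(deltas, key=lambda kv: kv[1], reverse=True)[:top_n]

s0 = {"<0.101.0>": {"docs_read": 10}, "<0.102.0>": {"docs_read": 5}}
s1 = {"<0.101.0>": {"docs_read": 12},
      "<0.102.0>": {"docs_read": 900},   # heavy hitter during the window
      "<0.103.0>": {"docs_read": 40}}    # started mid-window
print(proc_window(s0, s1, "docs_read"))
# [('<0.102.0>', 895), ('<0.103.0>', 40), ('<0.101.0>', 2)]
```

The same two-sample shape works for any tracked stat: docs read, rows read, IOQ calls, JS filter invocations, and so on.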
This also allows us to create watchdog processes that kill a cluster process chain in the event it has surpassed certain thresholds. This would be easily achievable at the node local level or the coordinator cluster level. We'll also likely want some type of watchdog process to clear out old entries in the ETS cache. The process being tracked shouldn't itself be accountable for the longevity of its entry in the stats table, as we want to leave a small buffer of time so we don't lose info about processes the second they exit. This is a common problem when using `recon:proc_window`: if you try to `process_info` a process you found with `proc_window`, it could be dead by the end of the proc window, losing any potential insights. By keeping completed processes in the stats table for, say, a 10 second buffer, we can more easily track and interact with live work.

This system also allows us to generate a per process report of resources consumed, both for local processes invoked by way of `fabric_rpc` and as cluster wide aggregations at the coordinator level, allowing for realtime usage reporting of active requests utilizing the new report engine.
For example:

> [report] 2023-10-06T23:24:35.974642Z [email protected] <0.296.0> -------- [csrt-pid-usage-lifetime changes_processed=0 docs_read=0 mfa="{fabric_rpc,open_doc,3}" nonce="null" pid_ref="<0.1558.0>#Ref<0.1796228611.1272446977.134363>" rows_read=0 state="{down,noproc}" updated_at="{1696,634675,974598}"]

# Core Tasks

* Extend stats collection into `fabric_rpc` (or `mango_httpd`, `dreyfus_rpc`, etc)
  - target core streaming API operations; changes feed as first step
  - each operation should include _at least_ metrics for:
    1) number of rpc invocations
    2) number of documents processed (request magnitude)
    3) number of documents streamed out (response magnitude)
  - for example, changes feed should do:
    1) `couch_stats:increment_counter([fabric_rpc, changes_feed])` - on rpc invocation
    2) `couch_stats:increment_counter([fabric_rpc, changes_feed, row_processed])` - on rows read; perhaps also for docs read
    3) `couch_stats:increment_counter([fabric_rpc, changes_feed, rows_returned])` - when streaming rows back to the client
    4) we should also include JS filter invocations for relevant API operations
  - somewhat awkward with rows vs docs and needing JS filters
  - we'll likely need to play around with what all to collect here, but in general we need direct visibility into core operations induced and what is performing them; cluster level metrics give us the former, and reports/realtime reporting give us the latter
* Extend `couch_stats` to do local process stat tracking
  - when we do `couch_stats:increment_counter`, do something like `maybe_track_local_stat(StatOp)` where we can track local process stats for the subset of stats we're interested in having reports and realtime stats available for
  - this is the key second step beyond tracking the additional RPC data.
Basically we start tracking the core data we want so we can use that for cluster status introspection and usage levels, then we gather the local stats to identify which requests are heaviest. This provides a mechanism to track any additional stats on RPC we want without having to immediately introduce new metrics for all RPC operations, while skipping undesired operations.

* Store local stats into ETS backend
  - several counter backend options are available. I believe an ETS table (or shared set of ETS tables, like we do with `couch_server(N)` and others) that utilizes a combination of `read_concurrency`, `write_concurrency`, and `decentralized_counters` will provide us with a performant system that allows for isolated tracking of per process stats in a way that allows concurrent writers with no overlapping writers, in addition to read concurrency and decentralized counters to minimize the impact of writing to the shared table across processes. Each process will be the only writer to its own entry, minimizing concurrency issues, yet we can still do a more expensive read operation over the ETS table(s) to allow aggregate realtime tooling exposed at the `remsh` and HTTP API level (eg `couch_debug` functions for finding the heaviest requests, or similar exposing of data over HTTP to end up in a Fauxton interface providing realtime stats)
  - key off of `{self(), make_ref()}` to ensure uniqueness in the stats table
  - also can store:
    - user
    - db/shard
    - request type
    - coordinator vs rpc vs background task
    - these are specifically for introspection and grouping with ETS queries
  - use records for tracking data
    - this allows for use of atomic `ets:update_counter` operations
    - easily interact with particular fields
    - also allows for `ets:match` and `ets:select` to perform aggregate queries about overall system load. I think this is only possible with records in ETS; I don't think we could use counters and select with maps?
* Extend `rexi:{reply,stream,etc}` to send usage deltas
  - we can take the current stats record for the process and delta it against the `T0` stats record. When we make that delta we store the stats snapshot at the time of the delta, `TDelta1`; afterwards we perform our deltas against the last record snapshot, so we can continue to send deltas over time and still return the full workload usage if desired.
  - the key thing here is we need to stream the stats back; we can't wait until the RPC worker is done because sometimes that takes hours. We need iterative stats so we can find problematic processes live
  - need to address the related issue where responses coming in from other shards are not accounted for in the usage, as those responses are dropped, eg when a limit is reached in a Mango find at the coordinator level before the shard level
  - an alternative is specifically sending stats lines, but I think we'll be far better served by always sending deltas with every rpc line
* Introduce watchdog process around stats collection
  This has two core jobs:
  1) clear out old process stats after a desired delay
    - we want to have data about recent processes that just exited to facilitate human interaction with busy yet short-lived processes
  2) provide an automated mechanism for killing processes
    - I think this is an important initial feature; at the very least we should be able to tell a cluster to kill find requests that are still going after an hour, for example.
  - can easily find long running or heavy processes using `ets:select`
  - can also key on user to find the most active users
  - I think with a bit of thought and a few configuration options we can make this usable for helping out with specific problematic workflows on a per user/request basis - for example:
    - "automatically kill all find requests that have processed a million docs yet returned none"
* Introduce remsh/http level introspection tooling
  - eg make `couchdb:proc_window` and some other tools for filtering and sorting the stats collection tables. This tooling should be readily available by way of remsh, but we should also expose the same introspection APIs over HTTP from the get go so that this can be utilized directly by users with curl and end up in a nice Fauxton dashboard showing realtime usage levels.
  - this doesn't have to be complicated, but it should easily be able to query the local rpc node, all cluster rpc nodes, and all cluster coordinators. It should be easy to get a cluster level view of overall usage, grouped by the desired set of stats.
  - provide tooling to tear down RPC workers and coordinators with full worker teardown. Once we make it easy to find the heavy hitters, we need to make it easy to take action against them. Obviously being able to kill the processes is essential, but I could see providing additional functionality, like perhaps lowering the IOQ priority for a given request if it genuinely needs to complete but is impacting the system.
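The `maybe_track_local_stat` idea from the task list can be modeled concisely. This is an illustrative Python sketch, not the Erlang implementation; `GLOBAL_STATS`, `LOCAL_STATS`, and the `TRACKED` allowlist stand in for the real global metrics, the per-process ETS table, and the flagged subset of stats:

```python
# Sketch of extending increment_counter: alongside the normal global metric,
# bump a process-local entry, but only for stats flagged for resource tracking.
GLOBAL_STATS = {}
LOCAL_STATS = {}   # keyed by process id; stands in for the per-process ETS table
TRACKED = {("fabric_rpc", "changes_feed", "rows_read")}  # flagged subset

def increment_counter(stat, pid):
    GLOBAL_STATS[stat] = GLOBAL_STATS.get(stat, 0) + 1  # existing global path
    maybe_track_local_stat(stat, pid)                   # new local hook

def maybe_track_local_stat(stat, pid):
    if stat in TRACKED:  # undesired stats are skipped entirely
        per_proc = LOCAL_STATS.setdefault(pid, {})
        per_proc[stat] = per_proc.get(stat, 0) + 1

for _ in range(3):
    increment_counter(("fabric_rpc", "changes_feed", "rows_read"), "<0.42.0>")
increment_counter(("fabric_rpc", "open_doc"), "<0.42.0>")  # global only

print(GLOBAL_STATS[("fabric_rpc", "changes_feed", "rows_read")])  # 3
print(("fabric_rpc", "open_doc") in LOCAL_STATS.get("<0.42.0>", {}))  # False
```

The key property is that untracked stats cost only the existing global increment, so coverage can be expanded stat by stat.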
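The aggregate-query idea (grouping by the introspection fields stored in each record, as `ets:select`/`ets:match` would) can be sketched as follows; the record shape and field names here are hypothetical:

```python
# Sketch of an aggregate query over per-process records: each record carries
# introspection fields (user, db, request type), so a select-style scan can
# group live resource usage by any of them.
from collections import defaultdict

def aggregate_by(table, field, stat):
    """Sum `stat` across all process records, grouped by `field`."""
    totals = defaultdict(int)
    for rec in table.values():
        totals[rec[field]] += rec[stat]
    return dict(totals)

table = {  # keyed by {pid, ref} for uniqueness, as proposed
    ("<0.10.0>", "ref1"): {"user": "alice", "db": "orders", "docs_read": 100},
    ("<0.11.0>", "ref2"): {"user": "bob",   "db": "orders", "docs_read": 40},
    ("<0.12.0>", "ref3"): {"user": "alice", "db": "users",  "docs_read": 5},
}
print(aggregate_by(table, "user", "docs_read"))  # {'alice': 105, 'bob': 40}
print(aggregate_by(table, "db", "docs_read"))    # {'orders': 140, 'users': 5}
```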
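The rexi delta-streaming scheme can be modeled end to end: the worker keeps the snapshot taken at its last reply (starting from `T0`) and ships only the difference with each reply, so the coordinator's running sum always equals the worker's total usage so far. A hedged Python sketch, with illustrative names rather than the real rexi API:

```python
# Model of delta streaming: each reply carries only the counter growth since
# the previous reply; summing deltas at the coordinator recovers the total.
class Worker:
    def __init__(self):
        self.counters = {"rows_read": 0}
        self.last_snapshot = dict(self.counters)  # the T0 record

    def do_work(self, rows):
        self.counters["rows_read"] += rows

    def reply_delta(self):
        """Delta since the previous reply, then advance the snapshot."""
        d = {k: v - self.last_snapshot[k] for k, v in self.counters.items()}
        self.last_snapshot = dict(self.counters)
        return d

coordinator_total = {"rows_read": 0}
w = Worker()
for batch in (100, 250, 7):           # three rounds of work and replies
    w.do_work(batch)
    for k, v in w.reply_delta().items():  # delta embedded in each reply
        coordinator_total[k] += v

print(coordinator_total)  # {'rows_read': 357}
```

Because each delta is consumed exactly once, intermediate replies give live visibility while the final sum still reflects the full workload.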
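The watchdog's two core jobs can likewise be sketched together: evicting entries only after a grace period (so short-lived processes stay inspectable, unlike the `recon:proc_window` race described earlier), and matching live processes against a configurable kill rule such as the "million docs, zero rows" example above. Field names are hypothetical:

```python
# Sketch of the watchdog: (1) sweep exited entries after a grace period,
# (2) flag live processes whose stats satisfy every predicate in a kill rule.
GRACE_SECONDS = 10

def sweep(table, now):
    """Keep live entries, plus exited ones still within the grace period."""
    return {pid: e for pid, e in table.items()
            if e["exited_at"] is None or now - e["exited_at"] <= GRACE_SECONDS}

def find_killable(table, rule):
    """Pids of live processes matching every predicate in the rule."""
    return [pid for pid, e in table.items()
            if e["exited_at"] is None and all(check(e) for check in rule)]

million_docs_no_rows = [          # "kill find requests that scanned a
    lambda e: e["type"] == "find",       # million docs yet returned none"
    lambda e: e["docs_read"] >= 1_000_000,
    lambda e: e["rows_returned"] == 0,
]

table = {
    "<0.1.0>": {"type": "find", "docs_read": 2_500_000, "rows_returned": 0,
                "exited_at": None},   # runaway full scan: kill candidate
    "<0.2.0>": {"type": "find", "docs_read": 800, "rows_returned": 25,
                "exited_at": None},   # healthy: left alone
    "<0.3.0>": {"type": "changes", "docs_read": 10, "rows_returned": 10,
                "exited_at": 80.0},   # exited 20s ago: swept out at now=100
}
print(find_killable(table, million_docs_no_rows))  # ['<0.1.0>']
print(sorted(sweep(table, now=100.0)))             # ['<0.1.0>', '<0.2.0>']
```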
I've got a hacked together HTTP API to demonstrate being able to query the stats collection across the cluster directly over HTTP:

<img width="1719" alt="Screenshot 2023-10-06 at 3 29 50 PM" src="https://github.com/apache/couchdb/assets/5326/1aebe0f7-ec8a-4f36-8a0f-f92385d8a96b">

# Overall Approach

The key task here is to build out the mechanisms for collecting/tracking stats locally, shipping them back over the wire to the coordinator for aggregation, handling the deltas, exposing the realtime stats, and then generating reports with the data. I believe this to be the core work at hand; afterwards, extending API operations to collect the data is fairly straightforward. For instance, once the functionality is in place, adding changes feed reports is simply a matter of adding the new stats to collect to the relevant `fabric_rpc` functions and then ensuring we've flagged those new stats as desired fields to expose in our resource usage tracking. We could easily extend compactions or indexing with similar stats and reporting logic.

Once we have a system in place for tracking process local metrics, sending and aggregating the deltas, and exposing the data over querying interfaces, we can easily extend this level of introspection to any operation in the system, and in my opinion, we should extend this coverage to 100% of database operations so that all resource usage is accounted for and tracked.

This is still a bit rough in a few areas, and not everything is fully operational, but the core framework is operational and demonstrates a working version of what I've detailed. I believe this to be a viable approach and that the proof of concept provides a clear path forward on how to deliver it.
