chewbranca opened a new pull request, #4812:
URL: https://github.com/apache/couchdb/pull/4812

   # Couch Stats Resource Tracker Overview
   
   We currently lack visibility into the types of operations that are inducing
   system resource usage in CouchDB. The IOQ stats show the raw quantities of IO
   operations being induced, broken down into roughly the categories of "normal
   database reads", "database writes", "view index writes", "internal
   replication", and "compaction". The problem is connecting the dots between the
   data in IOQ and the corresponding database operation types inducing the work,
   and, going a step further, connecting that with individual requests to identify
   the actual operations inducing all of the work.
   
   In particular, I believe this is especially pronounced in aggregate
   database read operations that perform a fold over the `id_tree` or `seq_tree` of
   the underlying `.couch` file. We do not currently instrument `couch_btree.erl`
   with `couch_stats` to track operations, so we do not have any data to correlate
   `couch_btree:fold*` operations with the volume of IOQ traffic flowing through
   the system. A basic first step would be to add counters for the number of btree
   fold reads performed; however, that would still be insufficient, as it would not
   allow us to distinguish between `_all_docs`, `_view`, `_find`, and `_changes`
   unless we track which type of fold operation is being induced.
   
   Ideally, we should be able to correlate the overall IOQ throughput with the
   different types of API operations being induced. Beyond that, it would be
   fantastic if we could extend CouchDB's introspection capabilities such that we
   can track the types of operations being performed at the user/db/request level,
   allowing users to understand what operations and requests are utilizing system
   resources.
   
   This writeup proposes an approach to alleviate these introspection blind spots
   by collecting additional cluster level metrics at the RPC operation level,
   providing context about the overall volume of requests, the magnitude of data
   processed, and the magnitude of the response payload. By extending the global
   stats collection logic we can allow for process local tracking of stats and
   build a system for tracking and exposing that data at the RPC level, the node
   local level, and at an HTTP API level. I think it's essential we target cluster
   level workloads in addition to request level workloads so we can correlate
   resource usage with individual requests. Given how variable API operations are
   in CouchDB (e.g. a single doc read fetches one doc from 3 shard replicas,
   whereas a `_find` query failing to find an index will do a full database scan
   _every time_), I think it's important to be able to easily see what types of
   operations are consuming cluster resources, and then readily identify the heavy
   hitting requests.
   
   This data can be tracked in records in ETS tables, allowing for realtime
   introspection of active resource consumption across all operations. We can
   focus on monotonically increasing counters for tracking core operations and
   avoid an assortment of complexity from negative values and dealing with
   histograms. We can leave histograms to other stacks by tracking
   start/update/stop timestamps. By only focusing on monotonically increasing
   values as a function of resource consumption, we can take a delta between any
   two timestamps and get a precise rate of consumption for this
   process/coordinator during the given time window. This also allows us to
   iteratively track stats between any start and stop point. The simple case is
   from process start to process finish, but anywhere in between is a valid delta,
   and we can distinguish between live snapshots and the final full process
   workload. By having RPC worker processes store data to the node local ETS
   tables in addition to sending resource usage deltas in rexi replies, we can
   track node local RPC worker resource usage as well as node local coordinator
   processes and how much work they've induced across the cluster. This provides
   insight into the active heavy operations in the cluster and also gives users
   easy access to identify heavy hitter requests for further optimization.
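
   To make the delta idea concrete, here is a minimal sketch (field and function
   names are illustrative, not necessarily this PR's actual record) of a
   per-process stats record built only from monotonically increasing counters,
   plus a delta between two snapshots of the same process:

   ```erlang
   %% Per-process resource usage snapshot; every counter only ever increases.
   -record(rctx, {
       pid_ref,            %% {self(), make_ref()}, unique key in the ETS table
       updated_at,         %% timestamp of the most recent update
       type,               %% e.g. coordinator | rpc_worker | background task
       docs_read = 0,
       rows_read = 0,
       rows_returned = 0,
       ioq_calls = 0
   }).

   %% Delta between two snapshots of the same process; because every field only
   %% grows, the result is a non-negative usage count for the [T0, T1] window.
   delta(#rctx{pid_ref = PidRef} = T0, #rctx{pid_ref = PidRef} = T1) ->
       T1#rctx{
           docs_read     = T1#rctx.docs_read     - T0#rctx.docs_read,
           rows_read     = T1#rctx.rows_read     - T0#rctx.rows_read,
           rows_returned = T1#rctx.rows_returned - T0#rctx.rows_returned,
           ioq_calls     = T1#rctx.ioq_calls     - T0#rctx.ioq_calls
       }.
   ```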
   
   By focusing on only tracking monotonically increasing counters about core
   resource usage, we can utilize a delta based system allowing for resource
   tracking on any desired stat. Essentially this allows us to create the
   equivalent of `recon:proc_window`, a `couchdb:proc_window`, and identify the
   most active processes by account, docs read, IOQ calls, rows read, JS filter
   invocations, data volume, or whatever else we choose to track.
   
   The delta based approach allows us to decouple from any particular time
   interval based collection approach and instead focus on what provides the most
   insight in any particular context. In the base case this collects stats from
   start time to end time, but it also provides intermediate values at whatever
   intervals/rates we desire. This also allows us to use the same stats collection
   logic for both streaming and non-streaming API endpoints, as we can handle full
   deltas or partials. It's key that we support delta operations: we need to see
   long running processes live, not when they're completed potentially hours
   later, so delta updates are essential for live tracking of long running
   operations. (In practice, long running `_find` query RPC workers can induce
   work for potentially hours, so it's inappropriate to wait until the worker
   completes to stream the resource usage, as otherwise it'll be hours behind when
   it actually happened.)
   
   With the idea of using the existing `couch_stats:increment_counter` logic to
   handle resource tracking, we can extend that `increment_counter` logic so that
   in addition to the normal global tracking of the desired metric, we also begin
   tracking a process local set of statistics, allowing us to isolate the
   resources utilized by individual requests and even RPC workers.
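
   A rough sketch of that hook, assuming the existing global path stays untouched;
   the helper names (`notify_global/3`, `maybe_track_local_stat/2`,
   `couch_stats_resource_tracker:inc/2`) are hypothetical placeholders rather than
   the actual functions in this PR:

   ```erlang
   increment_counter(Name) ->
       increment_counter(Name, 1).

   increment_counter(Name, Value) ->
       %% notify_global/3 stands in for the existing global metric update.
       ok = notify_global(Name, Value, counter),
       %% New: mirror the bump into the process local stats table.
       maybe_track_local_stat(Name, Value).

   %% Only a whitelisted subset of stats is mirrored into the process local table.
   maybe_track_local_stat([fabric_rpc | _] = Name, Value) ->
       couch_stats_resource_tracker:inc(Name, Value);
   maybe_track_local_stat(_Name, _Value) ->
       ok.
   ```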
   
   This is straightforward to accomplish from within
   `couch_stats:increment_counter`, as we have the exact stats being recorded and
   we're still in the process that invoked them, so this draft PR also has the
   local process performing operations update an ETS table entry for its own
   usage levels. This results in all worker processes concurrently updating ETS
   tables in realtime as a function of resource usage. Unlike our normal stats
   collection, where many processes write to a small number of keys, creating
   lock contention, for process local stats each process only writes to a
   singular key corresponding to its process ID, so there are no overlapping
   writes across keys, allowing us to take full advantage of ETS
   `write_concurrency`.
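
   As an illustration (the table name, options, and field positions are
   assumptions, not necessarily what this PR uses), the table could be created
   with read/write concurrency and decentralized counters, with each tracked
   process bumping only its own row:

   ```erlang
   init_table() ->
       ets:new(csrt_stats, [
           named_table, public, set,
           {keypos, 2},                     %% records store their key in field 2
           {read_concurrency, true},
           {write_concurrency, true},
           {decentralized_counters, true}
       ]).

   %% Each process increments only its own row (created at process start), so
   %% writers never contend on a key.
   %% FieldPos is a record field index, e.g. #rctx.docs_read from the sketch above.
   inc_own(PidRef, FieldPos, N) ->
       ets:update_counter(csrt_stats, PidRef, {FieldPos, N}).
   ```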
   
   Each process updates its usage stats in realtime as the normal
   `couch_stats:increment_counter` code paths are exercised. We utilize a
   timestamp based approach to track when each update occurs, and given we're
   only interacting with monotonically increasing counters, we can take the delta
   between any two time points and get a reasonably accurate rate of change. This
   allows us to create the equivalent of `recon:proc_window`, a
   `couchdb:proc_window`, to find the busiest
   processes/requests/databases/shards/users/etc.
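
   A hedged sketch of such a `proc_window`-style helper over the stats table:
   snapshot the chosen counter for every entry, wait for a sample window, snapshot
   again, and rank entries by growth, in the spirit of `recon:proc_window/3`. The
   table name and the assumption that the key lives in tuple position 2 carry over
   from the sketches above:

   ```erlang
   proc_window(FieldPos, TopN, WindowMs) ->
       Snapshot = fun() ->
           maps:from_list([{element(2, Obj), element(FieldPos, Obj)}
                           || Obj <- ets:tab2list(csrt_stats)])
       end,
       First = Snapshot(),
       timer:sleep(WindowMs),
       Second = Snapshot(),
       %% Growth of the chosen counter per process over the sample window.
       Deltas = maps:fold(
           fun(Key, V1, Acc) -> [{Key, V1 - maps:get(Key, First, 0)} | Acc] end,
           [], Second),
       lists:sublist(lists:reverse(lists:keysort(2, Deltas)), TopN).
   ```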
   
   The timestamp delta approach also allows us to send deltas from the rpc worker
   back to the coordinator during the normal `rexi:reply` occurrences by embedding
   the resource delta into the rexi reply, allowing us to incrementally send back
   worker usage stats to be aggregated at the coordinator level so we can find
   runaway processes prior to seeing them in the report logs. This approach also
   allows us to extend `rexi:ping` with the same logic so we can keep resource
   usage stats streaming independently of when rows are streamed back to the
   coordinator (a major issue we've encountered with long running bad `_find`
   queries).
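
   A hedged sketch of the worker side: a hypothetical wrapper computes the delta
   since the last snapshot, remembers the new snapshot, and tacks the delta onto
   the reply term. `get_last_snapshot/0`, `put_last_snapshot/1`, `current_stats/0`,
   `delta/2`, and the `{csrt_delta, ...}` message shape are all illustrative
   assumptions, not this PR's actual wire format:

   ```erlang
   reply_with_delta(Reply) ->
       T0 = get_last_snapshot(),     %% e.g. stashed in the process dictionary
       T1 = current_stats(),         %% current #rctx{} snapshot for this worker
       Delta = delta(T0, T1),
       put_last_snapshot(T1),        %% the next delta is relative to this point
       rexi:reply({Reply, {csrt_delta, Delta}}).
   ```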
   
   This also allows us to create watchdog processes that kill a cluster process
   chain in the event it has surpassed certain thresholds. This would be easily
   achievable at the node local level or at the coordinator cluster level. We'll
   also likely want some type of watchdog process to clear out old entries in the
   ETS cache. The process being tracked shouldn't itself be responsible for its
   longevity in the stats table, as we want to leave a small buffer of time so we
   don't lose info about processes the second they exit. This is a common problem
   when using `recon:proc_window`: if you try to `process_info` a process you
   found with `proc_window`, it could be dead by the end of the proc window,
   losing any potential insights. By keeping completed processes in the stats
   table for, say, a 10 second buffer, we can more easily track and interact with
   live work.
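
   A rough sketch of the cleanup half of that watchdog, assuming the key is
   `{Pid, Ref}` in tuple position 2 and the last-updated timestamp sits in tuple
   position 3 (as in the record sketched above); both the layout and the sweep
   strategy are illustrative:

   ```erlang
   %% Drop rows whose owning process has exited and whose last update is older
   %% than the grace period, keeping a buffer so recently finished work remains
   %% inspectable for a little while.
   sweep(GracePeriodMs) ->
       Now = erlang:monotonic_time(millisecond),
       Stale = [Key || Obj <- ets:tab2list(csrt_stats),
                       Key <- [element(2, Obj)],
                       not is_process_alive(element(1, Key)),
                       Now - element(3, Obj) > GracePeriodMs],
       [ets:delete(csrt_stats, Key) || Key <- Stale],
       length(Stale).
   ```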
   
   This system also allows us to generate a per process report of resources
   consumed, both for local processes invoked by way of `fabric_rpc` and as
   cluster wide aggregations at the coordinator level, allowing for realtime
   usage reporting of active requests utilizing the new report engine. For example:
   
   > [report] 2023-10-06T23:24:35.974642Z [email protected] <0.296.0> -------- [csrt-pid-usage-lifetime changes_processed=0 docs_read=0 mfa="{fabric_rpc,open_doc,3}" nonce="null" pid_ref="<0.1558.0>#Ref<0.1796228611.1272446977.134363>" rows_read=0 state="{down,noproc}" updated_at="{1696,634675,974598}"]
   
   
   # Core Tasks
   
   * extend stats collection into `fabric_rpc` (or `mango_httpd`, `dreyfus_rpc`, etc)
     - target core streaming API operations; changes feed as first step
     - each operation should include _at least_ metrics for:
       1) number of rpc invocations
       2) number of documents processed (request magnitude)
       3) number of documents streamed out (response magnitude)
     - for example, changes feed should do:
       1) `couch_stats:increment_counter([fabric_rpc, changes_feed])`
         - on rpc invocation
       2) `couch_stats:increment_counter([fabric_rpc, changes_feed, row_processed])`
         - on rows read; perhaps also for docs read
       3) `couch_stats:increment_counter([fabric_rpc, changes_feed, rows_returned])`
         - when streaming rows back to the client
       4) we should also include JS filter invocations for relevant API operations
     - somewhat awkward with rows vs docs and needing js filters
     - we'll likely need to play around with what all to collect here, but in
       general we need direct visibility into the core operations being induced
       and what is performing them; cluster level metrics give us the former, and
       reports/realtime reporting gives us the latter (a rough sketch of where
       these counters could live follows after this list item)
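
   As a schematic illustration of the changes feed counters listed above (the
   handler shape and the `fold_changes/3`, `should_emit/2`, and `emit/2` helpers
   are stand-ins for the real changes fold plumbing, not actual `fabric_rpc`
   code):

   ```erlang
   changes(DbName, Options) ->
       %% 1) count the rpc invocation itself
       couch_stats:increment_counter([fabric_rpc, changes_feed]),
       fold_changes(DbName, fun changes_enumerator/2, Options).

   changes_enumerator(DocInfo, Acc) ->
       %% 2) count every row processed while folding the seq_tree
       couch_stats:increment_counter([fabric_rpc, changes_feed, row_processed]),
       case should_emit(DocInfo, Acc) of
           true ->
               %% 3) count only the rows actually streamed back to the client
               couch_stats:increment_counter([fabric_rpc, changes_feed, rows_returned]),
               emit(DocInfo, Acc);
           false ->
               {ok, Acc}
       end.
   ```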
   
   * Extend `couch_stats` to do local process stat tracking
     - when we do `couch_stats:increment_counter`, do something like
       `maybe_track_local_stat(StatOp)` where we can track local process stats for
       the subset of stats we're interested in having reports and real time stats
       available for
     - this is the key second step after tracking the additional RPC data.
       Basically we start tracking the core data we want so we can use that for
       cluster status introspection and usage levels, then we gather the local
       stats for identifying which requests are heaviest
     - this provides a mechanism to track any additional stats on RPC operations
       we want, without having to immediately introduce new metrics for all RPC
       operations, while skipping undesired operations
   
   * Store local stats into ETS backend
     - several counter backend options are available. I believe an ETS table (or
       shared set of ETS tables, like we do with `couch_server(N)` and others) that
       utilizes a combination of `read_concurrency`, `write_concurrency`, and
       `decentralized_counters` will provide us with a performant system that
       allows for isolated tracking of per process stats in a way that allows for
       concurrent writers with no overlapping writes, in addition to read
       concurrency and decentralized counters to minimize the impact of writing to
       the shared table across processes. Each process will be the only writer to
       its own entry, minimizing concurrency issues, yet we can still do a more
       expensive read operation over the ETS table(s) to allow for aggregate real
       time tooling to expose at the `remsh` and HTTP API level (eg `couch_debug`
       functions for finding heaviest requests, or similar exposing of data over
       http to end up in a Fauxton interface providing real time stats)
     - key off of `{self(), make_ref()}` to ensure uniqueness in stats table
     - also can store:
       - user
       - db/shard
       - request type
       - coordinator vs rpc vs background task
       - these are specifically for introspection and grouping on with ets queries
     - use records for tracking data
       - this allows for use of atomic `ets:update_counter` operations
       - easily interact with particular fields
       - also allows for `ets:match` and `ets:select` to perform aggregate queries
         about overall system load (see the `ets:select` sketch after this list
         item). I think this is only possible with records in ETS; I don't think
         we could use counters and select with maps?
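
   An illustrative aggregate query over the record-shaped rows with
   `ets:select/2`, reusing the hypothetical `#rctx{}` record and `csrt_stats`
   table from the sketches above (so the `type` field and the `rpc_worker` atom
   are assumptions):

   ```erlang
   %% Sum docs_read across all rows tagged as rpc workers.
   total_rpc_docs_read() ->
       MatchSpec = [{
           #rctx{type = rpc_worker, docs_read = '$1', _ = '_'},
           [],
           ['$1']
       }],
       lists:sum(ets:select(csrt_stats, MatchSpec)).
   ```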
   
   * Extend `rexi:{reply,stream,etc}` to send usage deltas
     - we can take the current stats record for the process, delta it against a
       `T0` stats record, and provide that delta. When we make the delta we store
       the stats at the time of the delta, `TDelta1`, then afterwards we perform
       our deltas against the last record snapshot, so we can continue to send
       deltas over time and still return the full workload usage if desired.
     - the key thing here is that we need to stream the stats back; we can't wait
       until the RPC worker is done because sometimes that takes hours. We need to
       be able to get iterative stats so we can find problematic processes live
     - need to address the related issue where responses coming in from other
       shards are not accounted for in the usage because those responses are
       dropped, eg when the limit is reached in a Mango find at the coordinator
       level before the shard level
     - an alternative is specifically sending dedicated stats messages, but I
       think we'll be far better served by always sending deltas with every rpc
       message (a coordinator-side sketch follows after this list item)
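
   A hedged sketch of the coordinator side, folding worker deltas into a running
   per-request aggregate as rexi messages arrive. The `{csrt_delta, ...}` shape
   matches the hypothetical wrapper above, and `accumulate_delta/1` stands in for
   whatever bumps the coordinator's own row in the stats table:

   ```erlang
   handle_message({Reply, {csrt_delta, Delta}}, Worker, Acc0) ->
       %% Fold the worker's usage delta into the coordinator's aggregate,
       %% e.g. via ets:update_counter on the coordinator's own row.
       ok = accumulate_delta(Delta),
       %% Then process the unwrapped reply as usual.
       handle_message(Reply, Worker, Acc0);
   handle_message(Reply, _Worker, Acc0) ->
       %% ... existing coordinator callback logic for Reply ...
       {ok, Acc0}.
   ```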
   
   * Introduce watchdog process around stats collection
   
   This has two core jobs:
   
     1) clear out old process stats after desired delay
       - we want to keep data about recently exited processes around to facilitate
         human interaction with busy yet short lived processes
     2) provide an automated mechanism for killing processes
       - I think this is an important initial feature; at the very least we should
         be able to tell a cluster to kill find requests that are still going
         after an hour, for example
       - can easily find long running processes or heavy processes using ets:select
       - can also key on user to find most active users
       - I think with a bit of thought and a few configuration options we can make
         this usable for helping out with specific problematic workflows on a per
         user/request basis
       - for example:
         - "automatically kill all find requests that have processed a million docs
           yet returned none" (a rough `ets:select` sketch of this follows below)
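
   As an illustration of that example, again reusing the hypothetical `#rctx{}`
   record and `csrt_stats` table from above, with `mango_cursor` as an assumed
   `type` tag and the threshold passed as a parameter:

   ```erlang
   %% Kill rpc workers of the assumed mango_cursor type that have read more than
   %% DocsThreshold docs while returning zero rows.
   kill_runaway_find(DocsThreshold) ->
       MatchSpec = [{
           #rctx{type = mango_cursor, pid_ref = {'$1', '_'},
                 docs_read = '$2', rows_returned = '$3', _ = '_'},
           [{'>', '$2', DocsThreshold}, {'==', '$3', 0}],
           ['$1']
       }],
       [exit(Pid, kill) || Pid <- ets:select(csrt_stats, MatchSpec)],
       ok.
   ```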
   
   * Introduce remsh/http level introspection tooling
     - eg make `couchdb:proc_window` and some other tools for filtering and sorting
       the stats collection tables. This tooling should be readily available by way
       of remsh, but we should also expose the same introspection APIs over http
       from the get go so that this can be utilized directly by users with curl and
       end up in a nice Fauxton dashboard showing realtime usage levels.
     - this doesn't have to be complicated, but it should be easy to query the
       local rpc node, all cluster rpc nodes, and all cluster coordinators (see the
       remsh sketch after this list item). It should be easy to get a cluster level
       view of overall usage, grouped by the desired set of stats.
     - provide tooling to tear down RPC workers and coordinators with full worker
       teardown. Once we make it easy to find the heavy hitters, we need to make it
       easy to take action against them. Obviously being able to kill the processes
       is essential, but I could see providing additional functionality, like
       perhaps allowing for lowering the IOQ priority for the given request if it's
       a request that genuinely needs to complete but is impacting the system.
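
   A hedged sketch of a cluster wide remsh helper, assuming the node local
   `proc_window/3` from above lives in a `couch_stats_resource_tracker` module
   (both names are placeholders) and simply fanning out with `rpc:multicall`:

   ```erlang
   cluster_proc_window(FieldPos, TopN, WindowMs) ->
       Nodes = [node() | nodes()],
       {Results, _BadNodes} = rpc:multicall(Nodes, couch_stats_resource_tracker,
                                            proc_window, [FieldPos, TopN, WindowMs],
                                            WindowMs + 5000),
       %% Merge per-node results, ignoring any badrpc tuples, and rank globally.
       Merged = lists:append([R || R <- Results, is_list(R)]),
       lists:sublist(lists:reverse(lists:keysort(2, Merged)), TopN).
   ```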
   
   I've got a hacked together http API to demonstrate being able to query the
   stats collection across the cluster directly over http:
   
   <img width="1719" alt="Screenshot 2023-10-06 at 3 29 50 PM" src="https://github.com/apache/couchdb/assets/5326/1aebe0f7-ec8a-4f36-8a0f-f92385d8a96b">
   
   
   # Overall Approach
   
   The key task here is to build out the mechanisms for collecting/tracking stats
   locally, shipping them back over the wire to the coordinator for aggregation,
   handling the deltas, exposing the stats in realtime, and then generating
   reports with the data. I believe this to be the core work at hand, after which
   extending the API operations to collect the data is fairly straightforward;
   for instance, once the functionality is in place, adding changes feed reports
   is simply a matter of adding the new stats to collect to the relevant
   `fabric_rpc` functions and then ensuring we've flagged those new stats as
   desired fields to expose in our resource usage tracking. We could easily extend
   compactions or indexing with similar stats and reporting logic. Once we have a
   system in place for tracking process local metrics, sending and aggregating the
   deltas, and exposing the data over querying interfaces, we can easily extend
   this level of introspection to any operations in the system, and in my opinion,
   we should extend this coverage to 100% of database operations such that all
   resource usage is accounted for and tracked.
   
   This is still a bit rough in a few areas, and not everything is fully
   operational, but the core framework is functional and demonstrates a working
   version of what I've detailed above. I believe this to be a viable approach and
   that the proof of concept provides a clear path forward on how to deliver this.
   

