@@ -0,0 +1,1014 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%   http://www.apache.org/licenses/LICENSE-2.0
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+    start_link/0,
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3,
+    terminate/2
+    inc/1, inc/2,
+    maybe_inc/2,
+    get_pid_ref/0,
+    accumulate_delta/1
+    create_context/0, create_context/1, create_context/3,
+    create_coordinator_context/1, create_coordinator_context/2,
+    is_enabled/0,
+    get_resource/0,
+    get_resource/1,
+    set_context_dbname/1,
+    set_context_handler_fun/1,
+    set_context_username/1,
+    track/1,
+    should_track/1
+    active/0,
+    active_coordinators/0,
+    active_workers/0,
+    find_unmonitored/0
+    count_by/1,
+    group_by/2,
+    group_by/3,
+    group_by/4,
+    sorted/1,
+    sorted_by/1,
+    sorted_by/2,
+    sorted_by/3,
+    find_by_pid/1,
+    unsafe_foldl/3,
+    term_to_flat_json/1
+    make_delta/0
+%% Singular increment operations
+    db_opened/0,
+    doc_read/0,
+    row_read/0,
+    btree_fold/0,
+    ioq_called/0,
+    js_evaled/0,
+    js_filtered/0,
+    js_filtered_error/0,
+    js_filtered_doc/0,
+    mango_match_evaled/0,
+    get_kv_node/0,
+    get_kp_node/0
+%% Plural increment operations
+    js_filtered_docs/1,
+    io_bytes_read/1,
+    io_bytes_written/1
+    field/2,
+    curry_field/1
+%% Use these for record upgrades over the wire and in ETS tables
+%% TODO: alternatively, just delete these. Currently using a map
+%% for shipping deltas over the wire, avoiding much of the
+%% problem here. We'll likely still need to handle upgrades to
+%% map format over time, so let's decide a course of action here.
+%% Record tag macros: ?RCTX currently aliases ?RCTX_V1.
+-define(RCTX_V1, rctx_v1).
+-define(RCTX, ?RCTX_V1).
+%% Counter name macros. Each expands to an atom that is also a counter field
+%% name of the rctx record, so it can be used as `#rctx.?NAME`.
+-define(MANGO_EVAL_MATCH, mango_eval_match).
+%% NOTE: ?DB_OPEN_DOC expands to docs_read, not to a db_open* name.
+-define(DB_OPEN_DOC, docs_read).
+%% NOTE: ?DB_OPEN and ?COUCH_SERVER_OPEN both expand to db_open.
+-define(DB_OPEN, db_open).
+-define(COUCH_SERVER_OPEN, db_open).
+-define(COUCH_BT_FOLDS, btree_folds).
+-define(COUCH_BT_GET_KP_NODE, get_kp_node).
+-define(COUCH_BT_GET_KV_NODE, get_kv_node).
+-define(COUCH_JS_FILTER, js_filter).
+-define(COUCH_JS_FILTER_ERROR, js_filter_error).
+-define(COUCH_JS_FILTERED_DOCS, js_filtered_docs).
+-define(ROWS_READ, rows_read).
+%% TODO: overlap between this and couch btree fold invocations
+%% TODO: need some way to distinguish folds on views vs find vs all_docs
+-define(FRPC_CHANGES_ROW, changes_processed).
+-define(FRPC_CHANGES_RETURNED, changes_returned).
+%% Module pdict markers
+-define(DELTA_TA, csrt_delta_ta).
+-define(DELTA_TZ, csrt_delta_tz). %% T Zed instead of T0
+-define(PID_REF, csrt_pid_ref). %% track local ID
+%% Tracker state record.
+%% NOTE(review): presumably the gen_server state, given the callbacks this
+%% module exports — confirm against init/1.
+-record(st, {
+    eviction_delay = 10 * 1000, %% How many ms dead processes are visible
+    scan_interval = 2048, %% How regularly to perform scans (presumably ms — TODO confirm)
+    tracking = #{} %% track active processes for eventual eviction
+%% TODO: switch to:
+%% -record(?RCTX, {
+-record(rctx, {
+    %% Metadata
+    started_at = tnow(),
+    updated_at = tnow(),
+    exited_at, %% TODO: do we need a final exit time and additional update times afterwards?
+    pid_ref,
+    mon_ref,
+    mfa,
+    nonce,
+    from,
+    type = unknown, %% 
+    state = alive,
+    dbname,
+    username,
+    %% Stats counters
+    db_open = 0,
+    docs_read = 0,
+    rows_read = 0,
+    btree_folds = 0,
+    changes_processed = 0,
+    changes_returned = 0,
+    ioq_calls = 0,
+    io_bytes_read = 0,
+    io_bytes_written = 0,
+    js_evals = 0,
+    js_filter = 0,
+    js_filter_error = 0,
+    js_filtered_docs = 0,
+    mango_eval_match = 0,
+    %% TODO: switch record definitions to be macro based, eg:
+    %% ?COUCH_BT_GET_KP_NODE = 0,
+    get_kv_node = 0,
+    get_kp_node = 0
+%% Current Erlang monotonic time, expressed in milliseconds.
+tnow() ->
+    Unit = millisecond,
+    erlang:monotonic_time(Unit).
+%% Whether tracking is switched on: reads the boolean "enabled" key from the
+%% config section named after this module, defaulting to true.
+is_enabled() ->
+    Section = ?MODULE_STRING,
+    config:get_boolean(Section, "enabled", true).
+%% Count one database open.
+%% Passes ?DB_OPEN (db_open), the name that inc/1 and the rctx record use.
+%% The previous atom `db_opened` matched no inc/1 clause, fell through to the
+%% catch-all and returned 0 without incrementing anything.
+db_opened() -> inc(?DB_OPEN).
+%% Single-step counters: each adds 1 to one counter via inc/1.
+doc_read() ->
+    inc(?DB_OPEN_DOC).
+
+row_read() ->
+    inc(?ROWS_READ).
+
+btree_fold() ->
+    inc(?COUCH_BT_FOLDS).
+
+%% TODO: do we need ioq_called and this access pattern?
+%% Returns false without counting when tracking is disabled.
+ioq_called() ->
+    case is_enabled() of
+        true -> inc(ioq_calls);
+        false -> false
+    end.
+
+js_evaled() ->
+    inc(js_evals).
+
+js_filtered() ->
+    inc(?COUCH_JS_FILTER).
+
+js_filtered_error() ->
+    inc(?COUCH_JS_FILTER_ERROR).
+%% Count one JS-filtered document.
+%% Calls inc/2 directly, the same route js_filtered_docs/1 takes. No inc/1
+%% clause for js_filtered_docs is visible, so going through inc/1 would land on
+%% its catch-all and silently drop the increment.
+js_filtered_doc() -> inc(?COUCH_JS_FILTERED_DOCS, 1).
+%% More single-step counters (add 1 via inc/1).
+mango_match_evaled() ->
+    inc(?MANGO_EVAL_MATCH).
+
+get_kv_node() ->
+    inc(?COUCH_BT_GET_KV_NODE).
+
+get_kp_node() ->
+    inc(?COUCH_BT_GET_KP_NODE).
+
+%% Bulk counters: add N to the named counter via inc/2.
+js_filtered_docs(N) ->
+    inc(?COUCH_JS_FILTERED_DOCS, N).
+
+io_bytes_read(N) ->
+    inc(io_bytes_read, N).
+
+io_bytes_written(N) ->
+    inc(io_bytes_written, N).
+%% inc/1: add 1 to the named counter by delegating to inc/2.
+%% Names with no matching clause return 0 and change nothing.
+%% NOTE(review): the five `inc(?..., 1);` lines following the js_evals clause
+%% have no clause heads in this hunk — confirm the heads were not lost.
+inc(?DB_OPEN) ->
+    inc(?DB_OPEN, 1);
+inc(docs_read) ->
+    inc(docs_read, 1);
+inc(?ROWS_READ) ->
+    inc(?ROWS_READ, 1);
+inc(?COUCH_BT_FOLDS) ->
+    inc(?COUCH_BT_FOLDS, 1);
+inc(ioq_calls) ->
+    inc(ioq_calls, 1);
+inc(io_bytes_read) ->
+    inc(io_bytes_read, 1);
+inc(io_bytes_written) ->
+    inc(io_bytes_written, 1);
+inc(js_evals) ->
+    inc(js_evals, 1);
+    inc(?COUCH_JS_FILTER, 1);
+    inc(?COUCH_JS_FILTER_ERROR, 1);
+    inc(?MANGO_EVAL_MATCH, 1);
+    inc(?COUCH_BT_GET_KP_NODE, 1);
+    inc(?COUCH_BT_GET_KV_NODE, 1);
+inc(_) ->
+    0.
+%% inc/2: add N to the named counter by passing the matching rctx field index
+%% (`#rctx.Field`) to update_counter/2. Names with no matching clause return 0.
+%% NOTE(review): several `update_counter(...)` lines below have no clause head
+%% in this hunk — confirm the heads were not lost.
+inc(?DB_OPEN, N) ->
+    update_counter(#rctx.?DB_OPEN, N);
+inc(?ROWS_READ, N) ->
+    update_counter(#rctx.?ROWS_READ, N);
+    update_counter(#rctx.?FRPC_CHANGES_RETURNED, N);
+inc(ioq_calls, N) ->
+    update_counter(#rctx.ioq_calls, N);
+inc(io_bytes_read, N) ->
+    update_counter(#rctx.io_bytes_read, N);
+inc(io_bytes_written, N) ->
+    update_counter(#rctx.io_bytes_written, N);
+inc(js_evals, N) ->
+    update_counter(#rctx.js_evals, N);
+inc(?COUCH_JS_FILTER, N) ->
+    update_counter(#rctx.?COUCH_JS_FILTER, N);
+    update_counter(#rctx.?COUCH_JS_FILTER_ERROR, N);
+    update_counter(#rctx.?COUCH_JS_FILTERED_DOCS, N);
+    update_counter(#rctx.?MANGO_EVAL_MATCH, N);
+inc(?DB_OPEN_DOC, N) ->
+    update_counter(#rctx.?DB_OPEN_DOC, N);
+    update_counter(#rctx.?ROWS_READ, N); %% TODO: rework double use of 
+    update_counter(#rctx.?COUCH_BT_GET_KP_NODE, N);
+    update_counter(#rctx.?COUCH_BT_GET_KV_NODE, N);
+inc(_, _) ->
+    %% inc needs to allow unknown types to pass for accumulate_update to handle
+    %% updates from nodes with newer data formats
+    0.
+%% maybe_inc/2: translate a metric name (a list of atoms) into a counter name
+%% and add Val to it through inc/2. Metrics with no mapping return 0.
+%% NOTE(review): the heads for [fabric_rpc, changes, returned] and
+%% [couchdb, query_server, js_filtered_docs] have no body in this hunk —
+%% confirm the bodies were not lost.
+maybe_inc([mango, evaluate_selector], Val) ->
+    inc(?MANGO_EVAL_MATCH, Val);
+maybe_inc([couchdb, database_reads], Val) ->
+    inc(?DB_OPEN_DOC, Val);
+maybe_inc([fabric_rpc, changes, processed], Val) ->
+    inc(?FRPC_CHANGES_ROW, Val);
+maybe_inc([fabric_rpc, changes, returned], Val) ->
+maybe_inc([fabric_rpc, view, rows_read], Val) ->
+    inc(?ROWS_READ, Val);
+maybe_inc([couchdb, couch_server, open], Val) ->
+    inc(?DB_OPEN, Val);
+maybe_inc([couchdb, btree, folds], Val) ->
+    inc(?COUCH_BT_FOLDS, Val);
+maybe_inc([couchdb, btree, kp_node], Val) ->
+    inc(?COUCH_BT_GET_KP_NODE, Val);
+maybe_inc([couchdb, btree, kv_node], Val) ->
+    inc(?COUCH_BT_GET_KV_NODE, Val);
+maybe_inc([couchdb, query_server, js_filter_error], Val) ->
+    inc(?COUCH_JS_FILTER_ERROR, Val);
+maybe_inc([couchdb, query_server, js_filter], Val) ->
+    inc(?COUCH_JS_FILTER, Val);
+maybe_inc([couchdb, query_server, js_filtered_docs], Val) ->
+maybe_inc(_Metric, _Val) ->
+    %%io:format("SKIPPING MAYBE_INC METRIC[~p]: ~p~n", [Val, Metric]),
+    0.
+%% TODO: update stats_descriptions.cfg for relevant apps
+%% Returns is_enabled() for the metrics listed below and false for anything
+%% else; is_enabled() is only consulted for listed metrics.
+should_track(Metric) ->
+    Tracked = [
+        [fabric_rpc, all_docs, spawned],
+        [fabric_rpc, changes, spawned],
+        [fabric_rpc, changes, processed],
+        [fabric_rpc, changes, returned],
+        [fabric_rpc, map_view, spawned],
+        [fabric_rpc, reduce_view, spawned],
+        [fabric_rpc, get_all_security, spawned],
+        [fabric_rpc, open_doc, spawned],
+        [fabric_rpc, update_docs, spawned],
+        [fabric_rpc, open_shard, spawned],
+        [mango_cursor, view, all_docs],
+        [mango_cursor, view, idx]
+    ],
+    lists:member(Metric, Tracked) andalso is_enabled().
+%% Apply a delta map (counter name => amount) to the current context by
+%% calling inc/2 on every entry. `undefined` is accepted and ignored; any
+%% other term is printed and otherwise ignored. With tracking disabled a map
+%% delta yields false and nothing is applied.
+accumulate_delta(undefined) ->
+    ok;
+accumulate_delta(Delta) when is_map(Delta) ->
+    %% TODO: switch to creating a batch of updates to invoke a single
+    %% update_counter rather than sequentially invoking it for each field
+    case is_enabled() of
+        true -> maps:foreach(fun inc/2, Delta);
+        false -> false
+    end;
+accumulate_delta(Unknown) ->
+    io:format("CSRT:ACC_DELTA UNKNOWN DELTA: ~p~n", [Unknown]).
+%% Add Count to the counter at record position Field for the calling
+%% process's own pid/ref (from get_pid_ref/0). Returns false when disabled.
+update_counter(Field, Count) ->
+    case is_enabled() of
+        true -> update_counter(get_pid_ref(), Field, Count);
+        false -> false
+    end.
+%% Add Count to the counter at record position Field of the entry keyed by
+%% PidRef in the ?MODULE ETS table, using a fresh #rctx{} as the default
+%% object. Errors from ets are trapped by `catch` and returned as a value.
+%% Returns false when disabled.
+update_counter({_Pid, _Ref} = PidRef, Field, Count) ->
+    %% TODO: mem3 crashes without catch, why do we lose the stats table?
+    case is_enabled() of
+        true ->
+            catch ets:update_counter(
+                ?MODULE,
+                PidRef,
+                {Field, Count},
+                #rctx{pid_ref = PidRef}
+            );
+        false ->
+            false
+    end.
+%% Listing entry points; each selects one view of the tracked contexts.
+active() ->
+    active_int(all).
+
+active_coordinators() ->
+    active_int(coordinators).
+
+active_workers() ->
+    active_int(workers).
+%% `all` returns every table entry passed through to_flat_json/1;
+%% `coordinators` and `workers` return whatever select_by_type/1 yields.
+active_int(all) ->
+    [to_flat_json(Rctx) || Rctx <- ets:tab2list(?MODULE)];
+active_int(Type) when Type =:= coordinators; Type =:= workers ->
+    select_by_type(Type).
+%% Select table entries by context type. `coordinators` and `workers` return
+%% the matching #rctx{} records as stored; `all` returns every entry passed
+%% through to_flat_json/1.
+select_by_type(all) ->
+    [to_flat_json(Rctx) || Rctx <- ets:tab2list(?MODULE)];
+select_by_type(coordinators) ->
+    Pattern = #rctx{type = {coordinator, '_', '_'}, _ = '_'},
+    ets:select(couch_stats_resource_tracker, [{Pattern, [], ['$_']}]);
+select_by_type(workers) ->
+    Pattern = #rctx{type = {worker, '_', '_'}, _ = '_'},
+    ets:select(couch_stats_resource_tracker, [{Pattern, [], ['$_']}]).
+%% field(Rctx, Name): read one named field out of an #rctx{} record.
+%% NOTE: mfa and type are passed through convert_mfa/1 and convert_type/1
+%% here rather than later. Keeping the core data structures for longer would
+%% be preferable, but the output currently has to be jiffy:encode'able, and
+%% the group_by keys supplied by callers of the *_by aggregators are built
+%% dynamically from these values. So this always returns encodable types.
+field(#rctx{} = R, pid_ref) -> R#rctx.pid_ref;
+field(#rctx{} = R, mfa) -> convert_mfa(R#rctx.mfa);
+field(#rctx{} = R, nonce) -> R#rctx.nonce;
+field(#rctx{} = R, from) -> R#rctx.from;
+field(#rctx{} = R, type) -> convert_type(R#rctx.type);
+field(#rctx{} = R, state) -> R#rctx.state;
+field(#rctx{} = R, dbname) -> R#rctx.dbname;
+field(#rctx{} = R, username) -> R#rctx.username;
+field(#rctx{} = R, db_open) -> R#rctx.db_open;
+field(#rctx{} = R, docs_read) -> R#rctx.docs_read;
+field(#rctx{} = R, rows_read) -> R#rctx.rows_read;
+field(#rctx{} = R, btree_folds) -> R#rctx.btree_folds;
+field(#rctx{} = R, changes_processed) -> R#rctx.changes_processed;
+field(#rctx{} = R, changes_returned) -> R#rctx.changes_returned;
+field(#rctx{} = R, ioq_calls) -> R#rctx.ioq_calls;
+field(#rctx{} = R, io_bytes_read) -> R#rctx.io_bytes_read;
+field(#rctx{} = R, io_bytes_written) -> R#rctx.io_bytes_written;
+field(#rctx{} = R, js_evals) -> R#rctx.js_evals;
+field(#rctx{} = R, js_filter) -> R#rctx.js_filter;
+field(#rctx{} = R, js_filter_error) -> R#rctx.js_filter_error;
+field(#rctx{} = R, js_filtered_docs) -> R#rctx.js_filtered_docs;
+field(#rctx{} = R, mango_eval_match) -> R#rctx.mango_eval_match;
+field(#rctx{} = R, get_kv_node) -> R#rctx.get_kv_node;
+field(#rctx{} = R, get_kp_node) -> R#rctx.get_kp_node.
+%% Build a one-argument accessor that reads Field from the record it is given.
+curry_field(Field) ->
+    fun(Rctx) ->
+        field(Rctx, Field)
+    end.
+%% Count entries per key: every entry contributes 1 to its group.
+count_by(KeyFun) ->
+    One = fun(_Rctx) -> 1 end,
+    group_by(KeyFun, One).
+%% group_by/2: aggregate with addition.
+group_by(KeyFun, ValFun) ->
+    Sum = fun erlang:'+'/2,
+    group_by(KeyFun, ValFun, Sum).
+
+%% group_by/3: fold over the table with ets:foldl/3.
+group_by(KeyFun, ValFun, AggFun) ->
+    Fold = fun ets:foldl/3,
+    group_by(KeyFun, ValFun, AggFun, Fold).
+
+%% group_by/4: fold over the ?MODULE table, building a map of
+%% Key => aggregated value. Each group starts from 0 and is combined with
+%% AggFun(Current, Value). Key and value selectors may be funs, field-name
+%% atoms, or (for the key only) a list of field names, which produces a
+%% tuple key.
+%% eg: group_by(mfa, docs_read).
+%% eg: group_by(fun(#rctx{mfa=MFA,docs_read=DR}) -> {MFA, DR} end, ioq_calls).
+%% eg: ^^ or: group_by([mfa, docs_read], ioq_calls).
+%% eg: group_by([username, dbname, mfa], docs_read).
+%% eg: group_by([username, dbname, mfa], ioq_calls).
+%% eg: group_by([username, dbname, mfa], js_filter).
+group_by(KeyL, ValFun, AggFun, Fold) when is_list(KeyL) ->
+    TupleKey = fun(Rctx) ->
+        list_to_tuple(lists:map(fun(Name) -> field(Rctx, Name) end, KeyL))
+    end,
+    group_by(TupleKey, ValFun, AggFun, Fold);
+group_by(Key, ValFun, AggFun, Fold) when is_atom(Key) ->
+    group_by(curry_field(Key), ValFun, AggFun, Fold);
+group_by(KeyFun, Val, AggFun, Fold) when is_atom(Val) ->
+    group_by(KeyFun, curry_field(Val), AggFun, Fold);
+group_by(KeyFun, ValFun, AggFun, Fold) ->
+    Step = fun(Rctx, Groups) ->
+        Key = KeyFun(Rctx),
+        Val = ValFun(Rctx),
+        Prev =
+            case maps:find(Key, Groups) of
+                {ok, Found} -> Found;
+                error -> 0
+            end,
+        Groups#{Key => AggFun(Prev, Val)}
+    end,
+    Fold(Step, #{}, ?MODULE).
+%% Sorts a Key => Value map into a list of {Key, Value} pairs, largest value
+%% first. The ordering fun returns true for equal values, as lists:sort/2
+%% requires, so the sort is stable and ties keep the order in which
+%% maps:to_list/1 produced them.
+sorted(Map) when is_map(Map) ->
+    lists:sort(fun({_K1, A}, {_K2, B}) -> A >= B end, maps:to_list(Map)).
+%% sorted_by/1,2,3: run count_by/group_by with the same arguments and return
+%% the result ordered by sorted/1.
+%% eg: sorted_by([username, dbname, mfa], ioq_calls)
+%% eg: sorted_by([dbname, mfa], docs_read)
+sorted_by(KeyFun) ->
+    Counts = count_by(KeyFun),
+    sorted(Counts).
+
+sorted_by(KeyFun, ValFun) ->
+    Groups = group_by(KeyFun, ValFun),
+    sorted(Groups).
+
+sorted_by(KeyFun, ValFun, AggFun) ->
+    Groups = group_by(KeyFun, ValFun, AggFun),
+    sorted(Groups).
+%% Convert a term into a flat, JSON-friendly value:
+%%   {shutdown, Atom}                  -> <<"shutdown: Atom">>
+%%   {type, Atom}                      -> the atom as a binary
+%%   {type, {coordinator, Verb, Path}} -> <<"coordinator:Verb:Path">>
+%%   {type, {worker, M, F}}            -> <<"worker:M:F">>
+%%   any other tuple                   -> its elements as a list (unconverted)
+%%   pid / reference                   -> printed form as a binary
+%%   undefined | null                  -> null
+%%   any other atom                    -> the atom as a binary
+%%   anything else                     -> returned unchanged
+term_to_flat_json({shutdown, Reason0}) when is_atom(Reason0) ->
+    Reason = atom_to_binary(Reason0),
+    <<"shutdown: ", Reason/binary>>;
+term_to_flat_json({type, Atom}) when is_atom(Atom) ->
+    atom_to_binary(Atom);
+term_to_flat_json({type, {coordinator, Verb0, Path0}}=_Type) ->
+    Verb = atom_to_binary(Verb0),
+    Path = list_to_binary(Path0),
+    <<"coordinator:", Verb/binary, ":", Path/binary>>;
+term_to_flat_json({type, {worker, M0, F0}}=_Type) ->
+    M = atom_to_binary(M0),
+    F = atom_to_binary(F0),
+    <<"worker:", M/binary, ":", F/binary>>;
+term_to_flat_json(Tuple) when is_tuple(Tuple) ->
+    erlang:tuple_to_list(Tuple);
+term_to_flat_json(Pid) when is_pid(Pid) ->
+    ?l2b(pid_to_list(Pid));
+term_to_flat_json(Ref) when is_reference(Ref) ->
+    ?l2b(ref_to_list(Ref));
+%% undefined and null must be matched before the generic atom clause: both are
+%% atoms, so placed after it they could never match and `undefined` was
+%% emitted as <<"undefined">> instead of null.
+term_to_flat_json(undefined) ->
+    null;
+term_to_flat_json(null) ->
+    null;
+term_to_flat_json(Atom) when is_atom(Atom) ->
+    atom_to_binary(Atom);
+term_to_flat_json(T) ->
+    T.
+to_flat_json(#rctx{}=Rctx) ->
+    #rctx{
+        updated_at = TP,
+        started_at = TInit,
+        pid_ref = {Pid, Ref},
+        mfa = MFA0,
+        nonce = Nonce0,
+        from = From0,
+        dbname = DbName,
+        username = UserName,
+        db_open = DbOpens,
+        docs_read = DocsRead,
+        rows_read = RowsRead,
+        js_filter = JSFilters,
+        js_filter_error = JSFilterErrors,
+        js_filtered_docs = JSFilteredDocss,
+        state = State0,
+        type = Type,
+        get_kp_node = KpNodes,
+        get_kv_node = KvNodes,
+        btree_folds = ChangesProcessed,
+        changes_returned = ChangesReturned,
+        ioq_calls = IoqCalls
+    } = Rctx,
+    PidRef = {term_to_flat_json(Pid), term_to_flat_json(Ref)},
+    MFA = case MFA0 of
+        {M0, F0, A0} ->

Review Comment:
   Suggest matching on shape only and delegating: `{_, _, _} -> convert_mfa(MFA1)`.
   Also, do we want guards `is_atom(M)`, `is_atom(F)`, `is_integer(A)`, `A >= 0`
   (arity may be zero)?

