iilyak commented on code in PR #4812:
URL: https://github.com/apache/couchdb/pull/4812#discussion_r1449169955


##########
src/couch_stats/src/couch_stats_resource_tracker.erl:
##########
@@ -0,0 +1,1014 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_stats_resource_tracker).
+
+-behaviour(gen_server).
+
+-export([
+    start_link/0,
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3,
+    terminate/2
+]).
+
+-export([
+    inc/1, inc/2,
+    maybe_inc/2,
+    get_pid_ref/0,
+    accumulate_delta/1
+]).
+
+-export([
+    create_context/0, create_context/1, create_context/3,
+    create_coordinator_context/1, create_coordinator_context/2,
+    is_enabled/0,
+    get_resource/0,
+    get_resource/1,
+    set_context_dbname/1,
+    set_context_handler_fun/1,
+    set_context_username/1,
+    track/1,
+    should_track/1
+]).
+
+-export([
+    active/0,
+    active_coordinators/0,
+    active_workers/0,
+    find_unmonitored/0
+]).
+
+-export([
+    count_by/1,
+    group_by/2,
+    group_by/3,
+    group_by/4,
+    sorted/1,
+    sorted_by/1,
+    sorted_by/2,
+    sorted_by/3,
+
+    find_by_pid/1,
+
+    unsafe_foldl/3,
+
+    term_to_flat_json/1
+]).
+
+-export([
+    make_delta/0
+]).
+
+%% Singular increment operations
+-export([
+    db_opened/0,
+    doc_read/0,
+    row_read/0,
+    btree_fold/0,
+    ioq_called/0,
+    js_evaled/0,
+    js_filtered/0,
+    js_filtered_error/0,
+    js_filtered_doc/0,
+    mango_match_evaled/0,
+    get_kv_node/0,
+    get_kp_node/0
+]).
+
+%% Plural increment operations
+-export([
+    js_filtered_docs/1,
+    io_bytes_read/1,
+    io_bytes_written/1
+]).
+
+-export([
+    field/2,
+    curry_field/1
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+%% Use these for record upgrades over the wire and in ETS tables
+%% TODO: alternatively, just delete these. Currently using a map
+%% for shipping deltas over the wire, avoiding much of the
+%% problem here. We'll likely still need to handle upgrades to
+%% map format over time, so let's decide a course of action here.
+-define(RCTX_V1, rctx_v1).
+-define(RCTX, ?RCTX_V1).
+
+-define(MANGO_EVAL_MATCH, mango_eval_match).
+-define(DB_OPEN_DOC, docs_read).
+-define(DB_OPEN, db_open).
+-define(COUCH_SERVER_OPEN, db_open).
+-define(COUCH_BT_FOLDS, btree_folds).
+-define(COUCH_BT_GET_KP_NODE, get_kp_node).
+-define(COUCH_BT_GET_KV_NODE, get_kv_node).
+-define(COUCH_JS_FILTER, js_filter).
+-define(COUCH_JS_FILTER_ERROR, js_filter_error).
+-define(COUCH_JS_FILTERED_DOCS, js_filtered_docs).
+-define(ROWS_READ, rows_read).
+
+%% TODO: overlap between this and couch btree fold invocations
+%% TODO: need some way to distinguish folds on views vs find vs all_docs
+-define(FRPC_CHANGES_ROW, changes_processed).
+-define(FRPC_CHANGES_RETURNED, changes_returned).
+%%-define(FRPC_CHANGES_ROW, ?ROWS_READ).
+
+%% Module pdict markers
+-define(DELTA_TA, csrt_delta_ta).
+-define(DELTA_TZ, csrt_delta_tz). %% T Zed instead of T0
+-define(PID_REF, csrt_pid_ref). %% track local ID
+
+
+%% Tracker state record; presumably the gen_server loop state (the callbacks are not visible here).
+-record(st, {
+    eviction_delay = 10 * 1000, %% How many ms dead processes are visible
+    scan_interval = 2048, %% How regularly to perform scans (units not shown here; presumably ms)
+    tracking = #{} %% track active processes for eventual eviction
+}).
+
+
+%% TODO: switch to:
+%% -record(?RCTX, {
+%% Per-process resource context. Rows of the ?MODULE ETS table are #rctx{}
+%% records; update_counter/3 uses pid_ref as the row key and inserts a fresh
+%% #rctx{pid_ref = PidRef} when none exists. Field order is the tuple layout,
+%% so do not reorder fields without a record upgrade plan.
+-record(rctx, {
+    %% Metadata
+    %% Timestamps default to tnow(), i.e. monotonic milliseconds.
+    started_at = tnow(),
+    updated_at = tnow(),
+    %% TODO: do we need a final exit time and additional update times afterwards?
+    exited_at,
+    pid_ref,
+    mon_ref,
+    mfa,
+    nonce,
+    from,
+    %% unknown/background/system/rpc/coordinator/fabric_rpc/etc_rpc/etc
+    type = unknown,
+    state = alive,
+    dbname,
+    username,
+
+    %% Stats counters
+    db_open = 0,
+    docs_read = 0,
+    rows_read = 0,
+    btree_folds = 0,
+    changes_processed = 0,
+    changes_returned = 0,
+    ioq_calls = 0,
+    io_bytes_read = 0,
+    io_bytes_written = 0,
+    js_evals = 0,
+    js_filter = 0,
+    js_filter_error = 0,
+    js_filtered_docs = 0,
+    mango_eval_match = 0,
+    %% TODO: switch record definitions to be macro based, eg:
+    %% ?COUCH_BT_GET_KP_NODE = 0,
+    get_kv_node = 0,
+    get_kp_node = 0
+}).
+
+%% Current monotonic time in milliseconds. Monotonic, not wall-clock, so it
+%% is only meaningful for durations; used as the rctx timestamp default.
+tnow() -> erlang:monotonic_time(millisecond).
+
+%% Tracking is on unless the config section named after this module sets
+%% "enabled" to false.
+is_enabled() ->
+    Section = ?MODULE_STRING,
+    config:get_boolean(Section, "enabled", true).
+
+%% Singular increment operations: each bumps one counter of the caller's
+%% rctx by 1 through inc/1, which returns false when tracking is disabled.
+%% db_opened/0 previously passed the atom db_opened, which inc/1 does not
+%% recognise, so database opens were silently never counted.
+db_opened() -> inc(?DB_OPEN).
+doc_read() -> inc(?DB_OPEN_DOC).
+row_read() -> inc(?ROWS_READ).
+btree_fold() -> inc(?COUCH_BT_FOLDS).
+%% TODO: do we need ioq_called?
+%% No extra is_enabled/0 check here: update_counter/2 already performs it.
+ioq_called() -> inc(ioq_calls).
+js_evaled() -> inc(js_evals).
+js_filtered() -> inc(?COUCH_JS_FILTER).
+js_filtered_error() -> inc(?COUCH_JS_FILTER_ERROR).
+js_filtered_doc() -> inc(?COUCH_JS_FILTERED_DOCS).
+mango_match_evaled() -> inc(?MANGO_EVAL_MATCH).
+get_kv_node() -> inc(?COUCH_BT_GET_KV_NODE).
+get_kp_node() -> inc(?COUCH_BT_GET_KP_NODE).
+
+%% Plural increment operations: add N to the named counter via inc/2.
+js_filtered_docs(N) ->
+    inc(?COUCH_JS_FILTERED_DOCS, N).
+io_bytes_read(N) ->
+    inc(io_bytes_read, N).
+io_bytes_written(N) ->
+    inc(io_bytes_written, N).
+
+%% Increment a single counter by one via inc/2. Only the counter names in
+%% the guard are forwarded; any other term is ignored and yields 0.
+inc(Kind) when
+    Kind =:= ?DB_OPEN;
+    Kind =:= ?DB_OPEN_DOC;
+    Kind =:= ?ROWS_READ;
+    Kind =:= ?FRPC_CHANGES_RETURNED;
+    Kind =:= ?COUCH_BT_FOLDS;
+    Kind =:= ioq_calls;
+    Kind =:= io_bytes_read;
+    Kind =:= io_bytes_written;
+    Kind =:= js_evals;
+    Kind =:= ?COUCH_JS_FILTER;
+    Kind =:= ?COUCH_JS_FILTER_ERROR;
+    Kind =:= ?COUCH_JS_FILTERED_DOCS;
+    Kind =:= ?MANGO_EVAL_MATCH;
+    Kind =:= ?COUCH_BT_GET_KP_NODE;
+    Kind =:= ?COUCH_BT_GET_KV_NODE
+->
+    inc(Kind, 1);
+inc(_Unknown) ->
+    0.
+
+
+%% Add N to the rctx counter named Kind for the calling process.
+%% Unknown kinds return 0 instead of crashing so accumulate_delta/1 can pass
+%% through updates from nodes with newer data formats.
+inc(?DB_OPEN, N) ->
+    update_counter(#rctx.?DB_OPEN, N);
+inc(?DB_OPEN_DOC, N) ->
+    update_counter(#rctx.?DB_OPEN_DOC, N);
+inc(?ROWS_READ, N) ->
+    update_counter(#rctx.?ROWS_READ, N);
+inc(?FRPC_CHANGES_ROW, N) ->
+    %% TODO: rework double use of rows_read
+    update_counter(#rctx.?ROWS_READ, N);
+inc(?FRPC_CHANGES_RETURNED, N) ->
+    update_counter(#rctx.?FRPC_CHANGES_RETURNED, N);
+inc(?COUCH_BT_FOLDS, N) ->
+    %% Without this clause btree_fold/0 and [couchdb, btree, folds] fell
+    %% through to the catch-all and btree_folds was never incremented.
+    update_counter(#rctx.?COUCH_BT_FOLDS, N);
+inc(ioq_calls, N) ->
+    update_counter(#rctx.ioq_calls, N);
+inc(io_bytes_read, N) ->
+    update_counter(#rctx.io_bytes_read, N);
+inc(io_bytes_written, N) ->
+    update_counter(#rctx.io_bytes_written, N);
+inc(js_evals, N) ->
+    update_counter(#rctx.js_evals, N);
+inc(?COUCH_JS_FILTER, N) ->
+    update_counter(#rctx.?COUCH_JS_FILTER, N);
+inc(?COUCH_JS_FILTER_ERROR, N) ->
+    update_counter(#rctx.?COUCH_JS_FILTER_ERROR, N);
+inc(?COUCH_JS_FILTERED_DOCS, N) ->
+    update_counter(#rctx.?COUCH_JS_FILTERED_DOCS, N);
+inc(?MANGO_EVAL_MATCH, N) ->
+    update_counter(#rctx.?MANGO_EVAL_MATCH, N);
+inc(?COUCH_BT_GET_KP_NODE, N) ->
+    update_counter(#rctx.?COUCH_BT_GET_KP_NODE, N);
+inc(?COUCH_BT_GET_KV_NODE, N) ->
+    update_counter(#rctx.?COUCH_BT_GET_KV_NODE, N);
+inc(_, _) ->
+    0.
+
+%% Map a metric name path (e.g. [couchdb, btree, folds]) to a tracker
+%% counter and add Val to it. Paths without a counter are ignored (0).
+maybe_inc(Metric, Val) ->
+    case metric_counter(Metric) of
+        undefined -> 0;
+        Counter -> inc(Counter, Val)
+    end.
+
+%% Counter name for each metric path maybe_inc/2 tracks, else undefined.
+metric_counter([mango, evaluate_selector]) -> ?MANGO_EVAL_MATCH;
+metric_counter([couchdb, database_reads]) -> ?DB_OPEN_DOC;
+metric_counter([fabric_rpc, changes, processed]) -> ?FRPC_CHANGES_ROW;
+metric_counter([fabric_rpc, changes, returned]) -> ?FRPC_CHANGES_RETURNED;
+metric_counter([fabric_rpc, view, rows_read]) -> ?ROWS_READ;
+metric_counter([couchdb, couch_server, open]) -> ?DB_OPEN;
+metric_counter([couchdb, btree, folds]) -> ?COUCH_BT_FOLDS;
+metric_counter([couchdb, btree, kp_node]) -> ?COUCH_BT_GET_KP_NODE;
+metric_counter([couchdb, btree, kv_node]) -> ?COUCH_BT_GET_KV_NODE;
+metric_counter([couchdb, query_server, js_filter_error]) -> ?COUCH_JS_FILTER_ERROR;
+metric_counter([couchdb, query_server, js_filter]) -> ?COUCH_JS_FILTER;
+metric_counter([couchdb, query_server, js_filtered_docs]) -> ?COUCH_JS_FILTERED_DOCS;
+metric_counter(_Other) -> undefined.
+
+
+%% TODO: update stats_descriptions.cfg for relevant apps
+%% Whether a metric should trigger tracking: true only for the metric paths
+%% listed here, and only while is_enabled/0 holds; false for anything else.
+should_track(Metric) ->
+    Tracked = [
+        [fabric_rpc, all_docs, spawned],
+        [fabric_rpc, changes, spawned],
+        [fabric_rpc, changes, processed],
+        [fabric_rpc, changes, returned],
+        [fabric_rpc, map_view, spawned],
+        [fabric_rpc, reduce_view, spawned],
+        [fabric_rpc, get_all_security, spawned],
+        [fabric_rpc, open_doc, spawned],
+        [fabric_rpc, update_docs, spawned],
+        [fabric_rpc, open_shard, spawned],
+        [mango_cursor, view, all_docs],
+        [mango_cursor, view, idx]
+    ],
+    lists:member(Metric, Tracked) andalso is_enabled().
+
+%% Merge a delta map of #{CounterName => Increment} into the caller's rctx,
+%% one inc/2 call per entry (unknown names are ignored by inc/2). Presumably
+%% these are the maps shipped over the wire; confirm against callers.
+%% Returns false when tracking is disabled, ok otherwise.
+accumulate_delta(Delta) when is_map(Delta) ->
+    %% TODO: switch to creating a batch of updates to invoke a single
+    %% update_counter rather than sequentially invoking it for each field
+    is_enabled() andalso maps:foreach(fun inc/2, Delta);
+accumulate_delta(undefined) ->
+    ok;
+accumulate_delta(Other) ->
+    %% Report via logger instead of io:format/2, which would print to the
+    %% calling process's group leader rather than the node log.
+    logger:warning("CSRT:ACC_DELTA UNKNOWN DELTA: ~p", [Other]).
+
+
+%% Bump Field by Count in the calling process's own rctx row (located via
+%% get_pid_ref/0); false without touching ETS when tracking is disabled.
+update_counter(Field, Count) ->
+    case is_enabled() of
+        true -> update_counter(get_pid_ref(), Field, Count);
+        false -> false
+    end.
+
+
+%% Bump Field (a #rctx tuple position) by Count in PidRef's row of the
+%% ?MODULE ETS table, inserting #rctx{pid_ref = PidRef} first if absent.
+%% Returns the new counter value, false when tracking is disabled, or 0 if
+%% the ETS call fails.
+update_counter({_Pid, _Ref} = PidRef, Field, Count) ->
+    %% `andalso catch ...` does not parse (catch binds looser than andalso),
+    %% so the protected call is an explicit try block.
+    is_enabled() andalso
+        try
+            ets:update_counter(?MODULE, PidRef, {Field, Count}, #rctx{pid_ref = PidRef})
+        catch
+            %% TODO: mem3 crashes without this, why do we lose the stats table?
+            %% ets raises badarg when the table is gone; drop the update.
+            error:badarg -> 0
+        end.
+
+
+%% Snapshots of tracked contexts: every row, coordinators only, workers only.
+active() ->
+    active_int(all).
+active_coordinators() ->
+    active_int(coordinators).
+active_workers() ->
+    active_int(workers).
+
+
+%% Backing implementation for active/0, active_coordinators/0, active_workers/0.
+%% NOTE(review): `all` flattens each row with to_flat_json/1, while
+%% coordinators/workers return raw #rctx{} records from select_by_type/1;
+%% confirm callers expect these differing shapes.
+active_int(all) ->
+    [to_flat_json(Rctx) || Rctx <- ets:tab2list(?MODULE)];
+active_int(Type) when Type =:= coordinators; Type =:= workers ->
+    select_by_type(Type).
+
+
+%% Rows of the ?MODULE table by context type: coordinators/workers return
+%% raw #rctx{} records whose type is {coordinator, _, _} / {worker, _, _};
+%% `all` returns every row converted with to_flat_json/1.
+%% Uses ?MODULE (like the rest of the file) instead of a literal table name.
+select_by_type(coordinators) ->
+    ets:select(?MODULE, [{#rctx{type = {coordinator, '_', '_'}, _ = '_'}, [], ['$_']}]);
+select_by_type(workers) ->
+    ets:select(?MODULE, [{#rctx{type = {worker, '_', '_'}, _ = '_'}, [], ['$_']}]);
+select_by_type(all) ->
+    lists:map(fun to_flat_json/1, ets:tab2list(?MODULE)).
+
+
+%% Read one named field from an #rctx{}.
+%% mfa and type are passed through convert_mfa/1 and convert_type/1 (defined
+%% elsewhere in this module) so that results from the *_by aggregators below
+%% are closer to jiffy:encode'able; every other field is returned as stored.
+%% Converting here rather than later trades away keeping the core data
+%% structures, because the caller-supplied group_by keys are hard to encode
+%% after the fact.
+%% NOTE(review): pid_ref is returned as the raw {Pid, Ref} tuple, which is
+%% presumably not jiffy-encodable as-is; confirm.
+field(#rctx{} = Rctx, Field) ->
+    case Field of
+        pid_ref -> Rctx#rctx.pid_ref;
+        mfa -> convert_mfa(Rctx#rctx.mfa);
+        nonce -> Rctx#rctx.nonce;
+        from -> Rctx#rctx.from;
+        type -> convert_type(Rctx#rctx.type);
+        state -> Rctx#rctx.state;
+        dbname -> Rctx#rctx.dbname;
+        username -> Rctx#rctx.username;
+        db_open -> Rctx#rctx.db_open;
+        docs_read -> Rctx#rctx.docs_read;
+        rows_read -> Rctx#rctx.rows_read;
+        btree_folds -> Rctx#rctx.btree_folds;
+        changes_processed -> Rctx#rctx.changes_processed;
+        changes_returned -> Rctx#rctx.changes_returned;
+        ioq_calls -> Rctx#rctx.ioq_calls;
+        io_bytes_read -> Rctx#rctx.io_bytes_read;
+        io_bytes_written -> Rctx#rctx.io_bytes_written;
+        js_evals -> Rctx#rctx.js_evals;
+        js_filter -> Rctx#rctx.js_filter;
+        js_filter_error -> Rctx#rctx.js_filter_error;
+        js_filtered_docs -> Rctx#rctx.js_filtered_docs;
+        mango_eval_match -> Rctx#rctx.mango_eval_match;
+        get_kv_node -> Rctx#rctx.get_kv_node;
+        get_kp_node -> Rctx#rctx.get_kp_node
+    end.
+
+
+%% One-argument getter fun that reads Field from an #rctx{} via field/2.
+curry_field(Field) ->
+    Getter = fun(Rctx) -> field(Rctx, Field) end,
+    Getter.
+
+
+%% Number of tracked rows per key: group_by/2 where each row contributes 1.
+count_by(KeyFun) ->
+    One = fun(_Rctx) -> 1 end,
+    group_by(KeyFun, One).
+
+
+%% group_by/3 that sums the per-row values for each key.
+group_by(KeyFun, ValFun) ->
+    Sum = fun(Acc, Val) -> Acc + Val end,
+    group_by(KeyFun, ValFun, Sum).
+
+
+%% group_by/4 folding over the whole ?MODULE table with ets:foldl/3.
+group_by(KeyFun, ValFun, AggFun) ->
+    Fold = fun ets:foldl/3,
+    group_by(KeyFun, ValFun, AggFun, Fold).
+
+
+%% Aggregate ValFun(Row) per KeyFun(Row) over the ?MODULE table using Fold.
+%%
+%% KeyFun may be a fun, a single rctx field name, or a list of field names
+%% (yielding a tuple key); ValFun may be a fun or a field name. The first
+%% value seen for a key is stored as-is and later ones are combined with
+%% AggFun(Acc, Val). Previously every key was seeded with 0, which is only
+%% right when 0 is AggFun's identity (true for '+', not for e.g. min/2).
+%%
+%% eg: group_by(mfa, docs_read).
+%% eg: group_by(fun(#rctx{mfa=MFA,docs_read=DR}) -> {MFA, DR} end, ioq_calls).
+%% eg: ^^ or: group_by([mfa, docs_read], ioq_calls).
+%% eg: group_by([username, dbname, mfa], docs_read).
+%% eg: group_by([username, dbname, mfa], ioq_calls).
+%% eg: group_by([username, dbname, mfa], js_filter).
+group_by(KeyL, ValFun, AggFun, Fold) when is_list(KeyL) ->
+    KeyFun = fun(Ele) -> list_to_tuple([field(Ele, Key) || Key <- KeyL]) end,
+    group_by(KeyFun, ValFun, AggFun, Fold);
+group_by(Key, ValFun, AggFun, Fold) when is_atom(Key) ->
+    group_by(curry_field(Key), ValFun, AggFun, Fold);
+group_by(KeyFun, Val, AggFun, Fold) when is_atom(Val) ->
+    group_by(KeyFun, curry_field(Val), AggFun, Fold);
+group_by(KeyFun, ValFun, AggFun, Fold) ->
+    FoldFun = fun(Ele, Acc) ->
+        Key = KeyFun(Ele),
+        Val = ValFun(Ele),
+        maps:update_with(Key, fun(CurrVal) -> AggFun(CurrVal, Val) end, Val, Acc)
+    end,
+    Fold(FoldFun, #{}, ?MODULE).
+
+
+%% Turn a #{Key => Value} map into a [{Key, Value}] list ordered by value,
+%% largest first.
+sorted(Map) when is_map(Map) ->
+    Pairs = maps:to_list(Map),
+    ByValueDesc = fun({_, ValA}, {_, ValB}) -> ValA > ValB end,
+    lists:sort(ByValueDesc, Pairs).
+
+
+%% Largest-first variants of count_by/1 and group_by/2,3 (see sorted/1).
+%% eg: sorted_by([username, dbname, mfa], ioq_calls)
+%% eg: sorted_by([dbname, mfa], docs_read)
+sorted_by(KeyFun) ->
+    Counts = count_by(KeyFun),
+    sorted(Counts).
+sorted_by(KeyFun, ValFun) ->
+    Grouped = group_by(KeyFun, ValFun),
+    sorted(Grouped).
+sorted_by(KeyFun, ValFun, AggFun) ->
+    Grouped = group_by(KeyFun, ValFun, AggFun),
+    sorted(Grouped).
+
+
+%% Convert a term into a flat value intended for jiffy:encode/1:
+%%   - {shutdown, Atom} and {type, ...} tuples become descriptive binaries;
+%%   - any other tuple becomes a list of its (unconverted) elements;
+%%   - pids and refs become their printed form as binaries;
+%%   - undefined and null become the JSON null atom;
+%%   - other atoms become binaries; anything else is returned unchanged.
+term_to_flat_json({shutdown, Reason0}) when is_atom(Reason0) ->
+    Reason = atom_to_binary(Reason0),
+    <<"shutdown: ", Reason/binary>>;
+term_to_flat_json({type, Atom}) when is_atom(Atom) ->
+    atom_to_binary(Atom);
+term_to_flat_json({type, {coordinator, Verb0, Path0}}) ->
+    Verb = atom_to_binary(Verb0),
+    Path = list_to_binary(Path0),
+    <<"coordinator:", Verb/binary, ":", Path/binary>>;
+term_to_flat_json({type, {worker, M0, F0}}) ->
+    M = atom_to_binary(M0),
+    F = atom_to_binary(F0),
+    <<"worker:", M/binary, ":", F/binary>>;
+term_to_flat_json(Tuple) when is_tuple(Tuple) ->
+    erlang:tuple_to_list(Tuple);
+term_to_flat_json(Pid) when is_pid(Pid) ->
+    ?l2b(pid_to_list(Pid));
+term_to_flat_json(Ref) when is_reference(Ref) ->
+    ?l2b(ref_to_list(Ref));
+%% undefined and null are atoms, so these clauses must come before the
+%% generic atom clause or they are shadowed and never produce null.
+term_to_flat_json(undefined) ->
+    null;
+term_to_flat_json(null) ->
+    null;
+term_to_flat_json(Atom) when is_atom(Atom) ->
+    atom_to_binary(Atom);
+term_to_flat_json(T) ->
+    T.
+
+to_flat_json(#rctx{}=Rctx) ->
+    #rctx{
+        updated_at = TP,
+        started_at = TInit,
+        pid_ref = {Pid, Ref},
+        mfa = MFA0,
+        nonce = Nonce0,
+        from = From0,
+        dbname = DbName,
+        username = UserName,
+        db_open = DbOpens,
+        docs_read = DocsRead,
+        rows_read = RowsRead,
+        js_filter = JSFilters,
+        js_filter_error = JSFilterErrors,
+        js_filtered_docs = JSFilteredDocss,
+        state = State0,
+        type = Type,
+        get_kp_node = KpNodes,
+        get_kv_node = KvNodes,
+        btree_folds = ChangesProcessed,
+        changes_returned = ChangesReturned,
+        ioq_calls = IoqCalls
+    } = Rctx,
+    PidRef = {term_to_flat_json(Pid), term_to_flat_json(Ref)},
+    MFA = case MFA0 of
+        {M0, F0, A0} ->

Review Comment:
   ```
   {_, _, _} -> convert_mfa(MFA1)
   ```
   
   Also, do we want guards such as `is_atom(M)`, `is_atom(F)`, `is_integer(A)`, `A >= 0`? (Arity 0 is valid, e.g. `active/0`.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@couchdb.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to