davisp commented on code in PR #4491:
URL: https://github.com/apache/couchdb/pull/4491#discussion_r1145347376
##########
src/couch_index/src/couch_index_server.erl:
##########
@@ -169,9 +173,10 @@ handle_call({async_open, {DbName, DDocId, Sig}, {ok,
Pid}}, _From, State) ->
link(Pid),
add_to_ets(DbName, Sig, DDocId, Pid, State),
{reply, ok, State};
-handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) ->
+handle_call({async_error, {DbName, _DDocId, Sig}, Error}, {Pid, _}, State) ->
[{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
[gen_server:reply(From, Error) || From <- Waiters],
+ ets:delete(State#st.openers, Pid),
Review Comment:
You also need a delete call in async_open.
##########
src/couch_index/src/couch_index_server.erl:
##########
@@ -286,67 +302,92 @@ reset_indexes(DbName, #st{} = State) ->
add_to_ets(DbName, Sig, DDocId, Pid, #st{} = St) ->
ets:insert(St#st.by_sig, {{DbName, Sig}, Pid}),
ets:insert(St#st.by_pid, {Pid, {DbName, Sig}}),
- ets:insert(St#st.by_db, {DbName, {DDocId, Sig}}).
+ ets:insert(St#st.by_db, {DbName, {DDocId, Sig}}),
+ ets:delete(St#st.openers, Pid).
Review Comment:
I think you're confusing pids here. The openers contain opener pids, where
this Pid is an indexer pid that's not in openers. Check the `new_index` code to
show how we send back the indexer pid before unlinking it.
##########
src/couch_index/src/couch_index_server.erl:
##########
@@ -286,67 +302,92 @@ reset_indexes(DbName, #st{} = State) ->
add_to_ets(DbName, Sig, DDocId, Pid, #st{} = St) ->
ets:insert(St#st.by_sig, {{DbName, Sig}, Pid}),
ets:insert(St#st.by_pid, {Pid, {DbName, Sig}}),
- ets:insert(St#st.by_db, {DbName, {DDocId, Sig}}).
+ ets:insert(St#st.by_db, {DbName, {DDocId, Sig}}),
+ ets:delete(St#st.openers, Pid).
rem_from_ets(DbName, Sig, DDocIds, Pid, #st{} = St) ->
ets:delete(St#st.by_sig, {DbName, Sig}),
ets:delete(St#st.by_pid, Pid),
+ ets:delete(St#st.openers, Pid),
Review Comment:
Samesies as last comment.
##########
src/couch_index/src/couch_index_server.erl:
##########
@@ -286,67 +302,92 @@ reset_indexes(DbName, #st{} = State) ->
add_to_ets(DbName, Sig, DDocId, Pid, #st{} = St) ->
ets:insert(St#st.by_sig, {{DbName, Sig}, Pid}),
ets:insert(St#st.by_pid, {Pid, {DbName, Sig}}),
- ets:insert(St#st.by_db, {DbName, {DDocId, Sig}}).
+ ets:insert(St#st.by_db, {DbName, {DDocId, Sig}}),
+ ets:delete(St#st.openers, {Pid, {DbName, Sig}}).
rem_from_ets(DbName, Sig, DDocIds, Pid, #st{} = St) ->
ets:delete(St#st.by_sig, {DbName, Sig}),
ets:delete(St#st.by_pid, Pid),
+ ets:delete(St#st.openers, {Pid, {DbName, Sig}}),
lists:foreach(
fun(DDocId) ->
ets:delete_object(St#st.by_db, {DbName, {DDocId, Sig}})
end,
DDocIds
).
+rem_from_ets(DbName, #st{} = State) ->
+ SigDDocIds = lists:foldl(
+ fun({_, {DDocId, Sig}}, DDict) ->
+ dict:append(Sig, DDocId, DDict)
+ end,
+ dict:new(),
+ ets:lookup(State#st.by_db, DbName)
+ ),
+ Fun = fun({Sig, DDocIds}) ->
+ [{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
+ unlink(Pid),
+ receive
+ {'EXIT', Pid, _} ->
+ ok
+ after 0 ->
+ ok
+ end,
+ rem_from_ets(DbName, Sig, DDocIds, Pid, State)
+ end,
+ lists:foreach(Fun, dict:to_list(SigDDocIds)).
+
handle_db_event(DbName, created, St) ->
gen_server:cast(St#st.server_name, {reset_indexes, DbName}),
{ok, St};
handle_db_event(DbName, deleted, St) ->
gen_server:cast(St#st.server_name, {reset_indexes, DbName}),
{ok, St};
handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St)
->
- DDocResult = couch_util:with_db(DbName, fun(Db) ->
- couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
- end),
- LocalShards =
- try
- mem3:local_shards(mem3:dbname(DbName))
- catch
- Class:Msg ->
- couch_log:warning(
- "~p got ~p:~p when fetching local shards for ~p",
- [?MODULE, Class, Msg, DbName]
- ),
- []
- end,
- DbShards = [mem3:name(Sh) || Sh <- LocalShards],
- lists:foreach(
- fun(DbShard) ->
- lists:foreach(
- fun({_DbShard, {_DDocId, Sig}}) ->
- % check if there are other ddocs with the same Sig for the
same db
- SigDDocs = ets:match_object(St#st.by_db, {DbShard, {'$1',
Sig}}),
- if
- length(SigDDocs) > 1 ->
- % remove records from by_db for this DDoc
- Args = [DbShard, DDocId, Sig],
- gen_server:cast(St#st.server_name, {rem_from_ets,
Args});
- true ->
- % single DDoc with this Sig - close couch_index
processes
- case ets:lookup(St#st.by_sig, {DbShard, Sig}) of
- [{_, IndexPid}] ->
- (catch gen_server:cast(IndexPid,
{ddoc_updated, DDocResult}));
- [] ->
- []
- end
- end
- end,
- ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}})
- )
- end,
- DbShards
- ),
- {ok, St};
+ %% this handle_db_event function must not crash (or it takes down the
couch_index_server)
+ try
+ DDocResult = couch_util:with_db(DbName, fun(Db) ->
+ couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
+ end),
+ LocalShards = mem3:local_shards(mem3:dbname(DbName)),
+ DbShards = [mem3:name(Sh) || Sh <- LocalShards],
+ lists:foreach(
+ fun(DbShard) ->
+ lists:foreach(
+ fun({_DbShard, {_DDocId, Sig}}) ->
+ % check if there are other ddocs with the same Sig for
the same db
+ SigDDocs = ets:match_object(St#st.by_db, {DbShard,
{'$1', Sig}}),
+ if
+ length(SigDDocs) > 1 ->
+ % remove records from by_db for this DDoc
+ Args = [DbShard, DDocId, Sig],
+ gen_server:cast(St#st.server_name,
{rem_from_ets, Args});
+ true ->
+ % single DDoc with this Sig - close
couch_index processes
+ case ets:lookup(St#st.by_sig, {DbShard, Sig})
of
+ [{_, IndexPid}] ->
+ (catch gen_server:cast(
+ IndexPid, {ddoc_updated,
DDocResult}
+ ));
+ [] ->
+ []
+ end
+ end
+ end,
+ ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}})
+ )
+ end,
+ DbShards
+ ),
+ {ok, St}
+ catch
+ Class:Reason:Stack ->
+ couch_log:warning("~p: handle_db_event ~p for db ~p, reason ~p,
stack ~p", [
+ ?MODULE, Class, DbName, Reason, Stack
+ ]),
+ gen_server:cast(St#st.server_name, {rem_from_ets, [DbName]}),
Review Comment:
I'm with @rnewson that the `[DbName]` seems cleaner than the slightly more
subtle `when is_binary` guard.
##########
src/couch_index/src/couch_index_server.erl:
##########
@@ -286,67 +302,92 @@ reset_indexes(DbName, #st{} = State) ->
add_to_ets(DbName, Sig, DDocId, Pid, #st{} = St) ->
ets:insert(St#st.by_sig, {{DbName, Sig}, Pid}),
ets:insert(St#st.by_pid, {Pid, {DbName, Sig}}),
- ets:insert(St#st.by_db, {DbName, {DDocId, Sig}}).
+ ets:insert(St#st.by_db, {DbName, {DDocId, Sig}}),
+ ets:delete(St#st.openers, {Pid, {DbName, Sig}}).
rem_from_ets(DbName, Sig, DDocIds, Pid, #st{} = St) ->
ets:delete(St#st.by_sig, {DbName, Sig}),
ets:delete(St#st.by_pid, Pid),
+ ets:delete(St#st.openers, {Pid, {DbName, Sig}}),
lists:foreach(
fun(DDocId) ->
ets:delete_object(St#st.by_db, {DbName, {DDocId, Sig}})
end,
DDocIds
).
+rem_from_ets(DbName, #st{} = State) ->
+ SigDDocIds = lists:foldl(
+ fun({_, {DDocId, Sig}}, DDict) ->
+ dict:append(Sig, DDocId, DDict)
+ end,
+ dict:new(),
+ ets:lookup(State#st.by_db, DbName)
+ ),
+ Fun = fun({Sig, DDocIds}) ->
+ [{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
+ unlink(Pid),
+ receive
+ {'EXIT', Pid, _} ->
+ ok
+ after 0 ->
+ ok
+ end,
+ rem_from_ets(DbName, Sig, DDocIds, Pid, State)
+ end,
+ lists:foreach(Fun, dict:to_list(SigDDocIds)).
+
handle_db_event(DbName, created, St) ->
gen_server:cast(St#st.server_name, {reset_indexes, DbName}),
{ok, St};
handle_db_event(DbName, deleted, St) ->
gen_server:cast(St#st.server_name, {reset_indexes, DbName}),
{ok, St};
handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St)
->
- DDocResult = couch_util:with_db(DbName, fun(Db) ->
- couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
- end),
- LocalShards =
- try
- mem3:local_shards(mem3:dbname(DbName))
- catch
- Class:Msg ->
- couch_log:warning(
- "~p got ~p:~p when fetching local shards for ~p",
- [?MODULE, Class, Msg, DbName]
- ),
- []
- end,
- DbShards = [mem3:name(Sh) || Sh <- LocalShards],
- lists:foreach(
- fun(DbShard) ->
- lists:foreach(
- fun({_DbShard, {_DDocId, Sig}}) ->
- % check if there are other ddocs with the same Sig for the
same db
- SigDDocs = ets:match_object(St#st.by_db, {DbShard, {'$1',
Sig}}),
- if
- length(SigDDocs) > 1 ->
- % remove records from by_db for this DDoc
- Args = [DbShard, DDocId, Sig],
- gen_server:cast(St#st.server_name, {rem_from_ets,
Args});
- true ->
- % single DDoc with this Sig - close couch_index
processes
- case ets:lookup(St#st.by_sig, {DbShard, Sig}) of
- [{_, IndexPid}] ->
- (catch gen_server:cast(IndexPid,
{ddoc_updated, DDocResult}));
- [] ->
- []
- end
- end
- end,
- ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}})
- )
- end,
- DbShards
- ),
- {ok, St};
+ %% this handle_db_event function must not crash (or it takes down the
couch_index_server)
+ try
+ DDocResult = couch_util:with_db(DbName, fun(Db) ->
+ couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
+ end),
+ LocalShards = mem3:local_shards(mem3:dbname(DbName)),
+ DbShards = [mem3:name(Sh) || Sh <- LocalShards],
+ lists:foreach(
+ fun(DbShard) ->
+ lists:foreach(
+ fun({_DbShard, {_DDocId, Sig}}) ->
+ % check if there are other ddocs with the same Sig for
the same db
+ SigDDocs = ets:match_object(St#st.by_db, {DbShard,
{'$1', Sig}}),
+ if
+ length(SigDDocs) > 1 ->
+ % remove records from by_db for this DDoc
+ Args = [DbShard, DDocId, Sig],
+ gen_server:cast(St#st.server_name,
{rem_from_ets, Args});
+ true ->
+ % single DDoc with this Sig - close
couch_index processes
+ case ets:lookup(St#st.by_sig, {DbShard, Sig})
of
+ [{_, IndexPid}] ->
+ (catch gen_server:cast(
+ IndexPid, {ddoc_updated,
DDocResult}
+ ));
+ [] ->
+ []
+ end
+ end
+ end,
+ ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}})
+ )
+ end,
+ DbShards
Review Comment:
+1, nesting is a bit deep and a helper would help.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]