This is an automated email from the ASF dual-hosted git repository. jiangphcn pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/COUCHDB-3326-clustered-purge-pr5-implementation by this push: new 407b1dc Address Ilya's comments 407b1dc is described below commit 407b1dcd337f3e54c051c3cfa7e671726ff6bb7e Author: jiangphcn <jian...@cn.ibm.com> AuthorDate: Thu Jun 21 16:10:06 2018 +0800 Address Ilya's comments - use EPI decider to perform indexer existence checker function - swap "stop_couch/1" and "remove files" - refactor reason for throw bad_request - directly use meck:unload/0 COUCHDB-3326 --- src/couch/src/couch_db.erl | 37 ++++------------ src/couch/src/couch_db_plugin.erl | 4 ++ src/couch/test/couch_bt_engine_upgrade_tests.erl | 4 +- .../src/couch_index_plugin_couch_db.erl | 10 +++++ src/couch_mrview/src/couch_mrview_index.erl | 2 - src/couch_mrview/src/couch_mrview_util.erl | 5 +-- .../test/couch_mrview_purge_docs_fabric_tests.erl | 2 +- .../src/cpse_test_purge_bad_checkpoints.erl | 49 ++-------------------- src/mem3/src/mem3_epi.erl | 3 +- .../src/mem3_plugin_couch_db.erl} | 13 ++++-- src/mem3/src/mem3_rep.erl | 6 +-- 11 files changed, 44 insertions(+), 91 deletions(-) diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index 2645ea2..bbf7537 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -81,7 +81,7 @@ get_minimum_purge_seq/1, purge_client_exists/3, - get_purge_client_fun/2, + get_purge_client_fun/1, update_doc/3, update_doc/4, @@ -475,7 +475,8 @@ purge_client_exists(DbName, DocId, Props) -> LagThreshold = NowSecs - LagWindow, try - CheckFun = get_purge_client_fun(DocId, Props), + Type = couch_util:get_value(<<"type">>, Props), + CheckFun = get_purge_client_fun(Type), Exists = CheckFun(Props), if not Exists -> ok; true -> Updated = couch_util:get_value(<<"updated_on">>, Props), @@ -496,33 +497,13 @@ purge_client_exists(DbName, DocId, Props) -> end. -get_purge_client_fun(DocId, Props) -> - M0 = couch_util:get_value(<<"verify_module">>, Props), - M = try - binary_to_existing_atom(M0, latin1) - catch error:badarg -> - Fmt1 = "Missing index module '~p' for purge checkpoint '~s'", - couch_log:error(Fmt1, [M0, DocId]), - throw(failed) - end, +get_purge_client_fun(Type) -> + couch_db_plugin:get_purge_client_fun( + Type, fun get_purge_client_fun_int/1). - F0 = couch_util:get_value(<<"verify_function">>, Props), - try - F = binary_to_existing_atom(F0, latin1), - case erlang:function_exported(M, F, 1) of - true -> - fun M:F/1; - false -> - Fmt2 = "Missing exported function '~p' in '~p' - for purge checkpoint '~s'", - couch_log:error(Fmt2, [F0, M0, DocId]), - throw(failed) - end - catch error:badarg -> - Fmt3 = "Missing function '~p' in '~p' for purge checkpoint '~s'", - couch_log:error(Fmt3, [F0, M0, DocId]), - throw(failed) - end. + +get_purge_client_fun_int(_Type) -> + undefined. set_purge_infos_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 -> diff --git a/src/couch/src/couch_db_plugin.erl b/src/couch/src/couch_db_plugin.erl index 8163256..2f61464 100644 --- a/src/couch/src/couch_db_plugin.erl +++ b/src/couch/src/couch_db_plugin.erl @@ -18,6 +18,7 @@ after_doc_read/2, validate_docid/1, check_is_admin/1, + get_purge_client_fun/2, on_compact/2, on_delete/2 ]). @@ -57,6 +58,9 @@ check_is_admin(Db) -> %% callbacks return true only if it specifically allow the given Id couch_epi:any(Handle, ?SERVICE_ID, check_is_admin, [Db], []). +get_purge_client_fun(Type, Default) -> + maybe_handle(get_purge_client_fun, [Type], Default). + on_compact(DbName, DDocs) -> Handle = couch_epi:get_handle(?SERVICE_ID), couch_epi:apply(Handle, ?SERVICE_ID, on_compact, [DbName, DDocs], []). diff --git a/src/couch/test/couch_bt_engine_upgrade_tests.erl b/src/couch/test/couch_bt_engine_upgrade_tests.erl index 6aef366..8c748f8 100644 --- a/src/couch/test/couch_bt_engine_upgrade_tests.erl +++ b/src/couch/test/couch_bt_engine_upgrade_tests.erl @@ -37,10 +37,10 @@ setup() -> teardown({Ctx, Paths}) -> + test_util:stop_couch(Ctx), lists:foreach(fun(Path) -> file:delete(Path) - end, Paths), - test_util:stop_couch(Ctx). + end, Paths). upgrade_test_() -> diff --git a/src/couch_index/src/couch_index_plugin_couch_db.erl b/src/couch_index/src/couch_index_plugin_couch_db.erl index 937f7c8..43b2976 100644 --- a/src/couch_index/src/couch_index_plugin_couch_db.erl +++ b/src/couch_index/src/couch_index_plugin_couch_db.erl @@ -13,9 +13,19 @@ -module(couch_index_plugin_couch_db). -export([ + get_purge_client_fun/1, on_compact/2 ]). +get_purge_client_fun(Type) -> + case Type of + <<"mrview">> -> + {decided, fun couch_mrview_index:verify_index_exists/1}; + _ -> + no_decision + end. + + on_compact(DbName, DDocs) -> couch_mrview_index:ensure_local_purge_docs(DbName, DDocs). diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl index 7756f52..02b8941 100644 --- a/src/couch_mrview/src/couch_mrview_index.erl +++ b/src/couch_mrview/src/couch_mrview_index.erl @@ -305,8 +305,6 @@ update_local_purge_doc(Db, State, PSeq) -> {<<"type">>, <<"mrview">>}, {<<"purge_seq">>, PSeq}, {<<"updated_on">>, NowSecs}, - {<<"verify_module">>, <<"couch_mrview_index">>}, - {<<"verify_function">>, <<"verify_index_exists">>}, {<<"dbname">>, get(db_name, State)}, {<<"ddoc_id">>, get(idx_name, State)}, {<<"signature">>, Sig} diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl index b274961..a9ae661 100644 --- a/src/couch_mrview/src/couch_mrview_util.erl +++ b/src/couch_mrview/src/couch_mrview_util.erl @@ -44,14 +44,13 @@ get_local_purge_doc_id(Sig) -> - Version = "v" ++ config:get("purge", "version", "1") ++ "-", - ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mrview-" ++ Version ++ Sig). + ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mrview-" ++ Sig). get_value_from_options(Key, Options) -> case couch_util:get_value(Key, Options) of undefined -> - Reason = binary_to_list(Key) ++ " must exist in Options.", + Reason = <<"'", Key/binary, "' must exists in options.">>, throw({bad_request, Reason}); Value -> Value end. diff --git a/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl index 99bfa9e..6b0d4d9 100644 --- a/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl +++ b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl @@ -31,7 +31,7 @@ setup() -> teardown(DbName) -> - meck:unload(couch_mrview_index), + meck:unload(), ok = fabric:delete_db(DbName, [?ADMIN_CTX]). diff --git a/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl b/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl index 3d5edb1..b511e01 100644 --- a/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl +++ b/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl @@ -48,7 +48,7 @@ teardown_each(Db) -> cpse_bad_purge_seq(Db1) -> - Db2 = save_local_doc(Db1, <<"foo">>, ?MODULE, valid_fun), + Db2 = save_local_doc(Db1, <<"foo">>), ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)), ok = couch_db:set_purge_infos_limit(Db2, 5), @@ -56,35 +56,8 @@ cpse_bad_purge_seq(Db1) -> ?assertEqual(1, couch_db:get_minimum_purge_seq(Db3)). -cpse_bad_verify_mod(Db1) -> - Db2 = save_local_doc(Db1, 2, [invalid_module], valid_fun), - ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)), - - ok = couch_db:set_purge_infos_limit(Db2, 5), - {ok, Db3} = couch_db:reopen(Db2), - ?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)). - - -cpse_bad_verify_fun(Db1) -> - Db2 = save_local_doc(Db1, 2, ?MODULE, [invalid_function]), - ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)), - - ok = couch_db:set_purge_infos_limit(Db2, 5), - {ok, Db3} = couch_db:reopen(Db2), - ?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)). - - -cpse_verify_fun_throws(Db1) -> - Db2 = save_local_doc(Db1, 2, ?MODULE, throw_fun), - ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)), - - ok = couch_db:set_purge_infos_limit(Db2, 5), - {ok, Db3} = couch_db:reopen(Db2), - ?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)). - - cpse_verify_non_boolean(Db1) -> - Db2 = save_local_doc(Db1, 2, ?MODULE, non_bool_fun), + Db2 = save_local_doc(Db1, 2), ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)), ok = couch_db:set_purge_infos_limit(Db2, 5), @@ -92,30 +65,16 @@ cpse_verify_non_boolean(Db1) -> ?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)). -save_local_doc(Db1, PurgeSeq, Mod, Fun) -> +save_local_doc(Db1, PurgeSeq) -> {Mega, Secs, _} = os:timestamp(), NowSecs = Mega * 1000000 + Secs, Doc = couch_doc:from_json_obj(?JSON_DECODE(?JSON_ENCODE({[ - {<<"_id">>, <<"_local/purge-test-stuff-v1">>}, + {<<"_id">>, <<"_local/purge-test-stuff">>}, {<<"purge_seq">>, PurgeSeq}, {<<"timestamp_utc">>, NowSecs}, - {<<"verify_module">>, Mod}, - {<<"verify_function">>, Fun}, {<<"verify_options">>, {[{<<"signature">>, <<"stuff">>}]}}, {<<"type">>, <<"test">>} ]}))), {ok, _} = couch_db:update_doc(Db1, Doc, []), {ok, Db2} = couch_db:reopen(Db1), Db2. - - -valid_fun(_Options) -> - true. - - -throw_fun(_Options) -> - throw(failed). - - -not_bool(_Options) -> - ok. diff --git a/src/mem3/src/mem3_epi.erl b/src/mem3/src/mem3_epi.erl index ebcd596..4bf2bf5 100644 --- a/src/mem3/src/mem3_epi.erl +++ b/src/mem3/src/mem3_epi.erl @@ -30,7 +30,8 @@ app() -> providers() -> [ - {chttpd_handlers, mem3_httpd_handlers} + {couch_db, mem3_plugin_couch_db}, + {chttpd_handlers, mem3_httpd_handlers} ]. diff --git a/src/couch_index/src/couch_index_plugin_couch_db.erl b/src/mem3/src/mem3_plugin_couch_db.erl similarity index 68% copy from src/couch_index/src/couch_index_plugin_couch_db.erl copy to src/mem3/src/mem3_plugin_couch_db.erl index 937f7c8..40b4113 100644 --- a/src/couch_index/src/couch_index_plugin_couch_db.erl +++ b/src/mem3/src/mem3_plugin_couch_db.erl @@ -10,12 +10,17 @@ % License for the specific language governing permissions and limitations under % the License. --module(couch_index_plugin_couch_db). +-module(mem3_plugin_couch_db). -export([ - on_compact/2 + get_purge_client_fun/1 ]). -on_compact(DbName, DDocs) -> - couch_mrview_index:ensure_local_purge_docs(DbName, DDocs). +get_purge_client_fun(Type) -> + case Type of + <<"internal_replication">> -> + {decided, fun mem3_rep:verify_purge_checkpoint/1}; + _ -> + no_decision + end. diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl index 3f224cd..cff10aa 100644 --- a/src/mem3/src/mem3_rep.erl +++ b/src/mem3/src/mem3_rep.erl @@ -122,9 +122,7 @@ make_local_id(SourceThing, TargetThing, Filter) -> make_purge_id(SourceUUID, TargetUUID) -> - Version = "v" ++ config:get("purge", "version", "1") ++ "-", - ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mem3-" ++ Version ++ - ?b2l(SourceUUID) ++ "-" ++ ?b2l(TargetUUID)). + <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>. verify_purge_checkpoint(Props) -> @@ -500,8 +498,6 @@ purge_cp_body(#acc{} = Acc, PurgeSeq) -> {<<"type">>, <<"internal_replication">>}, {<<"updated_on">>, NowSecs}, {<<"purge_seq">>, PurgeSeq}, - {<<"verify_module">>, <<"mem3_rep">>}, - {<<"verify_function">>, <<"verify_purge_checkpoint">>}, {<<"dbname">>, Source#shard.dbname}, {<<"source">>, atom_to_binary(Source#shard.node, latin1)}, {<<"target">>, atom_to_binary(Target#shard.node, latin1)},