This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch batch-bulk-docs
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 230ce7b372fb5a4cbfcc7b95a24b5ba9f9ed7696 Author: Nick Vatamaniuc <vatam...@apache.org> AuthorDate: Tue May 19 12:52:02 2020 -0400 Add batching for _bulk_docs * Interactive (regular) _bulk_docs can be split into smaller transactions. * Non-interactive (replicated) _bulk_docs requests can be updated in batches with more than one document per transaction. --- src/fabric/src/fabric2_db.erl | 171 ++++++++++++++++++++++++++++++++++------- src/fabric/src/fabric2_fdb.erl | 8 ++ 2 files changed, 151 insertions(+), 28 deletions(-) diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl index 8764d4e..fb3a82d 100644 --- a/src/fabric/src/fabric2_db.erl +++ b/src/fabric/src/fabric2_db.erl @@ -156,6 +156,21 @@ -define(RETURN(Term), throw({?MODULE, Term})). +-define(DEFAULT_UPDATE_DOCS_BATCH_SIZE, 5000000). + + +-record(bacc, { + db, + docs, + is_replicated, + batch_size, + options, + rev_futures, + seen, + seen_tx, + results +}). + create(DbName, Options) -> case validate_dbname(DbName) of @@ -861,18 +876,8 @@ update_docs(Db, Docs0, Options) -> Docs1 = apply_before_doc_update(Db, Docs0, Options), try validate_atomic_update(Docs0, lists:member(all_or_nothing, Options)), - Resps0 = case lists:member(replicated_changes, Options) of - false -> - fabric2_fdb:transactional(Db, fun(TxDb) -> - update_docs_interactive(TxDb, Docs1, Options) - end); - true -> - lists:map(fun(Doc) -> - fabric2_fdb:transactional(Db, fun(TxDb) -> - update_doc_int(TxDb, Doc, Options) - end) - end, Docs1) - end, + + Resps0 = batch_update_docs(Db, Docs1, Options), % Notify index builder fabric2_index:db_updated(name(Db)), @@ -895,7 +900,7 @@ update_docs(Db, Docs0, Options) -> Else end end, Resps0), - case lists:member(replicated_changes, Options) of + case is_replicated(Options) of true -> {ok, lists:flatmap(fun(R) -> case R of @@ -1647,9 +1652,8 @@ update_doc_int(#{} = Db, #doc{} = Doc, Options) -> <<?LOCAL_DOC_PREFIX, _/binary>> -> true; _ -> false end, - IsReplicated = 
lists:member(replicated_changes, Options), try - case {IsLocal, IsReplicated} of + case {IsLocal, is_replicated(Options)} of {false, false} -> update_doc_interactive(Db, Doc, Options); {false, true} -> update_doc_replicated(Db, Doc, Options); {true, _} -> update_local_doc(Db, Doc, Options) @@ -1659,17 +1663,115 @@ update_doc_int(#{} = Db, #doc{} = Doc, Options) -> end. -update_docs_interactive(Db, Docs0, Options) -> - Docs = tag_docs(Docs0), - Futures = get_winning_rev_futures(Db, Docs), - {Result, _} = lists:mapfoldl(fun(Doc, SeenIds) -> - try - update_docs_interactive(Db, Doc, Options, Futures, SeenIds) - catch throw:{?MODULE, Return} -> - {Return, SeenIds} +batch_update_docs(Db, Docs, Options) -> + BAcc = #bacc{ + db = Db, + docs = Docs, + batch_size = get_batch_size(Options), + options = Options, + rev_futures = #{}, + seen = [], + seen_tx = [], + results = [] + }, + #bacc{results = Res} = batch_update_docs(BAcc), + lists:reverse(Res). + + +batch_update_docs(#bacc{docs = []} = BAcc) -> + BAcc; + +batch_update_docs(#bacc{db = Db} = BAcc) -> + #bacc{ + db = Db, + docs = Docs, + options = Options + } = BAcc, + BAccTx2 = fabric2_fdb:transactional(Db, fun(TxDb) -> + BAccTx = BAcc#bacc{db = TxDb}, + case is_replicated(Options) of + true -> + batch_update_replicated_tx(BAccTx); + false -> + Tagged = tag_docs(Docs), + RevFutures = get_winning_rev_futures(TxDb, Tagged), + BAccTx1 = BAccTx#bacc{ + docs = Tagged, + rev_futures = RevFutures + }, + batch_update_interactive_tx(BAccTx1) end - end, [], Docs), - Result. + end), + % Clean up after the transaction ends so we can recurse with a clean state + maps:map(fun(Tag, {fold_info, _St, Future}) when is_reference(Tag) -> + ok = erlfdb:cancel(Future) + end, BAccTx2#bacc.rev_futures), + BAcc1 = BAccTx2#bacc{ + db = Db, + rev_futures = #{}, + seen_tx = [] + }, + batch_update_docs(BAcc1). 
+ + +batch_update_replicated_tx(#bacc{docs = []} = BAcc) -> + BAcc; + +batch_update_replicated_tx(#bacc{} = BAcc) -> + #bacc{ + db = TxDb, + docs = [Doc | Docs], + options = Options, + batch_size = MaxSize, + seen_tx = SeenTx, + results = Results + } = BAcc, + case lists:member(Doc#doc.id, SeenTx) of + true -> + % If we already updated this doc in the current transaction, skip + % to the next transaction + BAcc; + false -> + Res = update_doc_int(TxDb, Doc, Options), + BAcc1 = BAcc#bacc{ + docs = Docs, + results = [Res | Results], + seen_tx = [Doc#doc.id | SeenTx] + }, + case fabric2_fdb:get_approximate_tx_size(TxDb) > MaxSize of + true -> BAcc1; + false -> batch_update_replicated_tx(BAcc1) + end + end. + + +batch_update_interactive_tx(#bacc{docs = []} = BAcc) -> + BAcc; + +batch_update_interactive_tx(#bacc{} = BAcc) -> + #bacc{ + db = TxDb, + docs = [Doc | Docs], + options = Options, + batch_size = MaxSize, + rev_futures = RevFutures, + seen = Seen, + results = Results + } = BAcc, + {Res, Seen1} = try + update_docs_interactive(TxDb, Doc, Options, RevFutures, Seen) + catch throw:{?MODULE, Return} -> + {Return, Seen} + end, + BAcc1 = BAcc#bacc{ + docs = Docs, + results = [Res | Results], + seen = Seen1 + }, + case fabric2_fdb:get_approximate_tx_size(TxDb) > MaxSize of + true -> BAcc1; + false -> batch_update_interactive_tx(BAcc1) + end. update_docs_interactive(Db, #doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} = Doc, @@ -2122,9 +2224,8 @@ doc_to_revid(#doc{revs = Revs}) -> tag_docs([]) -> []; tag_docs([#doc{meta = Meta} = Doc | Rest]) -> - NewDoc = Doc#doc{ - meta = [{ref, make_ref()} | Meta] - }, + Meta1 = lists:keystore(ref, 1, Meta, {ref, make_ref()}), + NewDoc = Doc#doc{meta = Meta1}, [NewDoc | tag_docs(Rest)]. @@ -2226,3 +2327,17 @@ get_cached_db(#{} = Db, Opts) when is_list(Opts) -> fabric2_fdb:ensure_current(TxDb) end) end. + + +is_replicated(Options) when is_list(Options) -> + lists:member(replicated_changes, Options). 
+ + +get_batch_size(Options) -> + case fabric2_util:get_value(batch_size, Options) of + undefined -> + config:get_integer("fabric", "update_docs_batch_size", + ?DEFAULT_UPDATE_DOCS_BATCH_SIZE); + Val when is_integer(Val) -> + Val + end. diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl index f274aa6..5ff1ed0 100644 --- a/src/fabric/src/fabric2_fdb.erl +++ b/src/fabric/src/fabric2_fdb.erl @@ -75,6 +75,8 @@ new_versionstamp/1, + get_approximate_tx_size/1, + debug_cluster/0, debug_cluster/2 ]). @@ -1159,6 +1161,12 @@ new_versionstamp(Tx) -> {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}. +get_approximate_tx_size(#{} = TxDb) -> + require_transaction(TxDb), + #{tx := Tx} = TxDb, + erlfdb:get_approximate_size(Tx). + + debug_cluster() -> debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).