This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-layer
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/prototype/fdb-layer by this push:
     new 51694f7  Bulk docs transaction batching
51694f7 is described below

commit 51694f74911683ca009af42779e6fc362c6fdc5a
Author: Nick Vatamaniuc <vatam...@apache.org>
AuthorDate: Tue May 19 12:52:02 2020 -0400

    Bulk docs transaction batching
    
     * Interactive (regular) requests are split into smaller transactions, so
       larger updates won't fail with either timeout or "transaction too
       large" FDB errors.
    
     * Non-interactive (replicated) requests can now batch their updates into
       a few transactions (instead of one transaction per document) and gain
       extra performance.
    
    Batch size is configurable:
    ```
    [fabric]
    update_docs_batch_size = 5000000
    ```
---
 rebar.config.script                           |   2 +-
 rel/overlay/etc/default.ini                   |   3 +
 src/fabric/src/fabric2_db.erl                 | 173 ++++++++++++++++----
 src/fabric/src/fabric2_fdb.erl                |   8 +
 src/fabric/test/fabric2_update_docs_tests.erl | 222 ++++++++++++++++++++++++++
 5 files changed, 379 insertions(+), 29 deletions(-)

diff --git a/rebar.config.script b/rebar.config.script
index 03c380f..c145566 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -152,7 +152,7 @@ DepDescs = [
 %% Independent Apps
 {config,           "config",           {tag, "2.1.7"}},
 {b64url,           "b64url",           {tag, "1.0.2"}},
-{erlfdb,           "erlfdb",           {tag, "v1.2.0"}},
+{erlfdb,           "erlfdb",           {tag, "v1.2.1"}},
 {ets_lru,          "ets-lru",          {tag, "1.1.0"}},
 {khash,            "khash",            {tag, "1.1.0"}},
 {snappy,           "snappy",           {tag, "CouchDB-1.0.4"}},
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 66680a4..35e5147 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -242,6 +242,9 @@ port = 6984
 ;
 ; Byte size of binary chunks written to FDB values. Defaults to FDB max limit.
 ;binary_chunk_size = 100000
+;
+; Bulk docs transaction batch size in bytes
+;update_docs_batch_size = 5000000
 
 ; [rexi]
 ; buffer_count = 2000
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 8764d4e..6540e0b 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -156,6 +156,19 @@
 
 -define(RETURN(Term), throw({?MODULE, Term})).
 
+-define(DEFAULT_UPDATE_DOCS_BATCH_SIZE, 5000000).
+
+
+-record(bacc, {
+    db,
+    docs,
+    batch_size,
+    options,
+    rev_futures,
+    seen,
+    results
+}).
+
 
 create(DbName, Options) ->
     case validate_dbname(DbName) of
@@ -861,18 +874,8 @@ update_docs(Db, Docs0, Options) ->
     Docs1 = apply_before_doc_update(Db, Docs0, Options),
     try
         validate_atomic_update(Docs0, lists:member(all_or_nothing, Options)),
-        Resps0 = case lists:member(replicated_changes, Options) of
-            false ->
-                fabric2_fdb:transactional(Db, fun(TxDb) ->
-                    update_docs_interactive(TxDb, Docs1, Options)
-                end);
-            true ->
-                lists:map(fun(Doc) ->
-                    fabric2_fdb:transactional(Db, fun(TxDb) ->
-                        update_doc_int(TxDb, Doc, Options)
-                    end)
-                end, Docs1)
-        end,
+
+        Resps0 = batch_update_docs(Db, Docs1, Options),
 
         % Notify index builder
         fabric2_index:db_updated(name(Db)),
@@ -895,7 +898,7 @@ update_docs(Db, Docs0, Options) ->
                     Else
             end
         end, Resps0),
-        case lists:member(replicated_changes, Options) of
+        case is_replicated(Options) of
             true ->
                 {ok, lists:flatmap(fun(R) ->
                     case R of
@@ -1647,9 +1650,8 @@ update_doc_int(#{} = Db, #doc{} = Doc, Options) ->
         <<?LOCAL_DOC_PREFIX, _/binary>> -> true;
         _ -> false
     end,
-    IsReplicated = lists:member(replicated_changes, Options),
     try
-        case {IsLocal, IsReplicated} of
+        case {IsLocal, is_replicated(Options)} of
             {false, false} -> update_doc_interactive(Db, Doc, Options);
             {false, true} -> update_doc_replicated(Db, Doc, Options);
             {true, _} -> update_local_doc(Db, Doc, Options)
@@ -1659,17 +1661,119 @@ update_doc_int(#{} = Db, #doc{} = Doc, Options) ->
     end.
 
 
-update_docs_interactive(Db, Docs0, Options) ->
-    Docs = tag_docs(Docs0),
-    Futures = get_winning_rev_futures(Db, Docs),
-    {Result, _} = lists:mapfoldl(fun(Doc, SeenIds) ->
-        try
-            update_docs_interactive(Db, Doc, Options, Futures, SeenIds)
-        catch throw:{?MODULE, Return} ->
-            {Return, SeenIds}
+batch_update_docs(Db, Docs, Options) ->
+    BAcc = #bacc{
+        db = Db,
+        docs = Docs,
+        batch_size = get_batch_size(Options),
+        options = Options,
+        rev_futures = #{},
+        seen = [],
+        results = []
+    },
+    #bacc{results = Res} = batch_update_docs(BAcc),
+    lists:reverse(Res).
+
+
+batch_update_docs(#bacc{docs = []} = BAcc) ->
+    BAcc;
+
+batch_update_docs(#bacc{db = Db} = BAcc) ->
+    #bacc{
+        db = Db,
+        docs = Docs,
+        options = Options
+    } = BAcc,
+
+    BAccTx2 = fabric2_fdb:transactional(Db, fun(TxDb) ->
+        BAccTx = BAcc#bacc{db = TxDb},
+        case is_replicated(Options) of
+            false ->
+                Tagged = tag_docs(Docs),
+                RevFutures = get_winning_rev_futures(TxDb, Tagged),
+                BAccTx1 = BAccTx#bacc{
+                    docs = Tagged,
+                    rev_futures = RevFutures
+                },
+                batch_update_interactive_tx(BAccTx1);
+            true ->
+                BAccTx1 = batch_update_replicated_tx(BAccTx),
+                % For replicated updates reset `seen` after every transaction
+                BAccTx1#bacc{seen = []}
         end
-    end, [], Docs),
-    Result.
+    end),
+
+    % Clean up after the transaction ends so we can recurse with a clean state
+    maps:map(fun(Tag, RangeFuture) when is_reference(Tag) ->
+        ok = erlfdb:cancel(RangeFuture, [flush])
+    end, BAccTx2#bacc.rev_futures),
+
+    BAcc1 = BAccTx2#bacc{
+        db = Db,
+        rev_futures = #{}
+    },
+
+    batch_update_docs(BAcc1).
+
+
+batch_update_interactive_tx(#bacc{docs = []} = BAcc) ->
+    BAcc;
+
+batch_update_interactive_tx(#bacc{} = BAcc) ->
+    #bacc{
+        db = TxDb,
+        docs = [Doc | Docs],
+        options = Options,
+        batch_size = MaxSize,
+        rev_futures = RevFutures,
+        seen = Seen,
+        results = Results
+    } = BAcc,
+    {Res, Seen1} = try
+        update_docs_interactive(TxDb, Doc, Options, RevFutures, Seen)
+    catch throw:{?MODULE, Return} ->
+        {Return, Seen}
+    end,
+    BAcc1 = BAcc#bacc{
+        docs = Docs,
+        results = [Res | Results],
+        seen = Seen1
+    },
+    case fabric2_fdb:get_approximate_tx_size(TxDb) > MaxSize of
+        true -> BAcc1;
+        false -> batch_update_interactive_tx(BAcc1)
+    end.
+
+
+batch_update_replicated_tx(#bacc{docs = []} = BAcc) ->
+    BAcc;
+
+batch_update_replicated_tx(#bacc{} = BAcc) ->
+    #bacc{
+        db = TxDb,
+        docs = [Doc | Docs],
+        options = Options,
+        batch_size = MaxSize,
+        seen = Seen,
+        results = Results
+    } = BAcc,
+    case lists:member(Doc#doc.id, Seen) of
+        true ->
+            % If we already updated this doc in the current transaction, wait
+            % till the next transaction to update it again.
+            BAcc;
+        false ->
+            Res = update_doc_int(TxDb, Doc, Options),
+            BAcc1 = BAcc#bacc{
+                docs = Docs,
+                results = [Res | Results],
+                seen = [Doc#doc.id | Seen]
+            },
+            case fabric2_fdb:get_approximate_tx_size(TxDb) > MaxSize of
+                true -> BAcc1;
+                false -> batch_update_replicated_tx(BAcc1)
+            end
+    end.
 
 
 update_docs_interactive(Db, #doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} = Doc,
@@ -2122,9 +2226,8 @@ doc_to_revid(#doc{revs = Revs}) ->
 tag_docs([]) ->
     [];
 tag_docs([#doc{meta = Meta} = Doc | Rest]) ->
-    NewDoc = Doc#doc{
-        meta = [{ref, make_ref()} | Meta]
-    },
+    Meta1 = lists:keystore(ref, 1, Meta, {ref, make_ref()}),
+    NewDoc = Doc#doc{meta = Meta1},
     [NewDoc | tag_docs(Rest)].
 
 
@@ -2226,3 +2329,17 @@ get_cached_db(#{} = Db, Opts) when is_list(Opts) ->
                 fabric2_fdb:ensure_current(TxDb)
             end)
     end.
+
+
+is_replicated(Options) when is_list(Options) ->
+    lists:member(replicated_changes, Options).
+
+
+get_batch_size(Options) ->
+    case fabric2_util:get_value(batch_size, Options) of
+        undefined ->
+            config:get_integer("fabric", "update_docs_batch_size",
+                ?DEFAULT_UPDATE_DOCS_BATCH_SIZE);
+        Val when is_integer(Val) ->
+            Val
+    end.
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index f274aa6..e8f6e0d 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -75,6 +75,8 @@
 
     new_versionstamp/1,
 
+    get_approximate_tx_size/1,
+
     debug_cluster/0,
     debug_cluster/2
 ]).
@@ -1159,6 +1161,12 @@ new_versionstamp(Tx) ->
     {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}.
 
 
+get_approximate_tx_size(#{} = TxDb) ->
+    require_transaction(TxDb),
+    #{tx := Tx} = TxDb,
+    erlfdb:wait(erlfdb:get_approximate_size(Tx)).
+
+
 debug_cluster() ->
     debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).
 
diff --git a/src/fabric/test/fabric2_update_docs_tests.erl b/src/fabric/test/fabric2_update_docs_tests.erl
new file mode 100644
index 0000000..5a2389a
--- /dev/null
+++ b/src/fabric/test/fabric2_update_docs_tests.erl
@@ -0,0 +1,222 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_update_docs_tests).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include("fabric2_test.hrl").
+
+
+update_docs_test_() ->
+    {
+        "Test update_docs",
+        {
+            setup,
+            fun setup_all/0,
+            fun teardown_all/1,
+            {
+                foreach,
+                fun setup/0,
+                fun cleanup/1,
+                [
+                    ?TDEF_FE(update_docs),
+                    ?TDEF_FE(update_docs_replicated),
+                    ?TDEF_FE(update_docs_batches),
+                    ?TDEF_FE(update_docs_replicated_batches),
+                    ?TDEF_FE(update_docs_duplicate_ids_conflict),
+                    ?TDEF_FE(update_docs_duplicate_ids_with_batches),
+                    ?TDEF_FE(update_docs_replicate_batches_duplicate_id)
+                ]
+            }
+        }
+    }.
+
+
+setup_all() ->
+    test_util:start_couch([fabric]).
+
+
+teardown_all(Ctx) ->
+    test_util:stop_couch(Ctx).
+
+
+setup() ->
+    {ok, Db} = fabric2_db:create(?tempdb(), [{user_ctx, ?ADMIN_USER}]),
+    meck:new(erlfdb, [passthrough]),
+    Db.
+
+
+cleanup(#{} = Db) ->
+    meck:unload(),
+    ok = fabric2_db:delete(fabric2_db:name(Db), []).
+
+
+update_docs(Db) ->
+    ?assertEqual({ok, []}, fabric2_db:update_docs(Db, [])),
+
+    Doc1 = doc(),
+    Res1 = fabric2_db:update_docs(Db, [Doc1]),
+    ?assertMatch({ok, [_]}, Res1),
+    {ok, [Doc1Res]} = Res1,
+    ?assertMatch({ok, {1, <<_/binary>>}}, Doc1Res),
+    {ok, {1, Rev1}} = Doc1Res,
+    {ok, Doc1Open} = fabric2_db:open_doc(Db, Doc1#doc.id),
+    ?assertEqual(Doc1#doc{revs = {1, [Rev1]}}, Doc1Open),
+
+    Doc2 = doc(),
+    Doc3 = doc(),
+    Res2 = fabric2_db:update_docs(Db, [Doc2, Doc3]),
+    ?assertMatch({ok, [_, _]}, Res2),
+    {ok, [Doc2Res, Doc3Res]} = Res2,
+    ?assertMatch({ok, {1, <<_/binary>>}}, Doc2Res),
+    ?assertMatch({ok, {1, <<_/binary>>}}, Doc3Res).
+
+
+update_docs_replicated(Db) ->
+    Opts = [replicated_changes],
+
+    ?assertEqual({ok, []}, fabric2_db:update_docs(Db, [], Opts)),
+
+    Doc1 = doc(10, {1, [rev()]}),
+    ?assertMatch({ok, []}, fabric2_db:update_docs(Db, [Doc1], Opts)),
+    {ok, Doc1Open} = fabric2_db:open_doc(Db, Doc1#doc.id),
+    ?assertEqual(Doc1, Doc1Open),
+
+    Doc2 = doc(10, {1, [rev()]}),
+    Doc3 = doc(10, {1, [rev()]}),
+    ?assertMatch({ok, []}, fabric2_db:update_docs(Db, [Doc2, Doc3], Opts)),
+    {ok, Doc2Open} = fabric2_db:open_doc(Db, Doc2#doc.id),
+    ?assertEqual(Doc2, Doc2Open),
+    {ok, Doc3Open} = fabric2_db:open_doc(Db, Doc3#doc.id),
+    ?assertEqual(Doc3, Doc3Open).
+
+
+update_docs_batches(Db) ->
+    Opts = [{batch_size, 5000}],
+
+    Docs1 = [doc(9000), doc(9000)],
+
+    meck:reset(erlfdb),
+    ?assertMatch({ok, [_ | _]}, fabric2_db:update_docs(Db, Docs1, Opts)),
+    ?assertEqual(2, meck:num_calls(erlfdb, transactional, 2)),
+
+    lists:foreach(fun(#doc{} = Doc) ->
+        ?assertMatch({ok, #doc{}}, fabric2_db:open_doc(Db, Doc#doc.id))
+    end, Docs1),
+
+    Docs2 = [doc(10), doc(10), doc(9000), doc(10)],
+
+    meck:reset(erlfdb),
+    ?assertMatch({ok, [_ | _]}, fabric2_db:update_docs(Db, Docs2, Opts)),
+    ?assertEqual(2, meck:num_calls(erlfdb, transactional, 2)),
+
+    lists:foreach(fun(#doc{} = Doc) ->
+        ?assertMatch({ok, #doc{}}, fabric2_db:open_doc(Db, Doc#doc.id))
+    end, Docs2).
+
+
+update_docs_replicated_batches(Db) ->
+    Opts = [{batch_size, 5000}, replicated_changes],
+
+    Docs1 = [doc(Size, {1, [rev()]}) || Size <- [9000, 9000]],
+
+    meck:reset(erlfdb),
+    ?assertMatch({ok, []}, fabric2_db:update_docs(Db, Docs1, Opts)),
+    ?assertEqual(2, meck:num_calls(erlfdb, transactional, 2)),
+
+    lists:foreach(fun(#doc{} = Doc) ->
+        ?assertEqual({ok, Doc}, fabric2_db:open_doc(Db, Doc#doc.id))
+    end, Docs1),
+
+    Docs2 = [doc(Size, {1, [rev()]}) || Size <- [10, 10, 9000, 10]],
+
+    meck:reset(erlfdb),
+    ?assertMatch({ok, []}, fabric2_db:update_docs(Db, Docs2, Opts)),
+    ?assertEqual(2, meck:num_calls(erlfdb, transactional, 2)),
+
+    lists:foreach(fun(#doc{} = Doc) ->
+        ?assertEqual({ok, Doc}, fabric2_db:open_doc(Db, Doc#doc.id))
+    end, Docs2).
+
+
+update_docs_duplicate_ids_conflict(Db) ->
+    Doc = doc(),
+
+    Res = fabric2_db:update_docs(Db, [Doc, doc(), Doc]),
+    ?assertMatch({ok, [_, _, _]}, Res),
+
+    {ok, [Doc1Res, Doc2Res, Doc3Res]} = Res,
+    ?assertMatch({ok, {1, <<_/binary>>}}, Doc1Res),
+    ?assertMatch({ok, {1, <<_/binary>>}}, Doc2Res),
+    ?assertMatch(conflict, Doc3Res).
+
+
+update_docs_duplicate_ids_with_batches(Db) ->
+    Opts = [{batch_size, 5000}],
+
+    Doc = doc(9000),
+
+    meck:reset(erlfdb),
+    Res = fabric2_db:update_docs(Db, [Doc, doc(9000), Doc], Opts),
+    ?assertMatch({ok, [_, _, _]}, Res),
+    ?assertEqual(3, meck:num_calls(erlfdb, transactional, 2)),
+
+    {ok, [Doc1Res, Doc2Res, Doc3Res]} = Res,
+    ?assertMatch({ok, {1, <<_/binary>>}}, Doc1Res),
+    ?assertMatch({ok, {1, <<_/binary>>}}, Doc2Res),
+    ?assertMatch(conflict, Doc3Res).
+
+
+update_docs_replicate_batches_duplicate_id(Db) ->
+    Opts = [replicated_changes],
+
+    Doc = doc(10, {1, [rev()]}),
+    Docs = [Doc, Doc],
+
+    meck:reset(erlfdb),
+    ?assertMatch({ok, []}, fabric2_db:update_docs(Db, Docs, Opts)),
+    ?assertEqual(2, meck:num_calls(erlfdb, transactional, 2)),
+
+    ?assertEqual({ok, Doc}, fabric2_db:open_doc(Db, Doc#doc.id)).
+
+
+% Utility functions
+
+doc() ->
+    doc(2).
+
+
+doc(Size) ->
+    doc(Size, undefined).
+
+
+doc(Size, Revs) ->
+    Doc = #doc{
+        id = fabric2_util:uuid(),
+        body = doc_body(Size)
+    },
+    case Revs of
+        undefined -> Doc;
+        _ -> Doc#doc{revs = Revs}
+    end.
+
+
+rev() ->
+    fabric2_util:to_hex(crypto:strong_rand_bytes(16)).
+
+
+doc_body(Size) when is_integer(Size), Size >= 2 ->
+    Val = fabric2_util:to_hex(crypto:strong_rand_bytes(Size div 2)),
+    {[{<<"x">>, Val}]}.

Reply via email to