Github user iilyak commented on a diff in the pull request:

    https://github.com/apache/couchdb-fabric/pull/47#discussion_r60672502
  
    --- Diff: src/fabric_doc_open_revs.erl ---
    @@ -56,266 +57,390 @@ go(DbName, Id, Revs, Options) ->
             rexi_monitor:stop(RexiMon)
         end.
     
    +
     handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, 
#state{workers=Workers}=State) ->
    -    NewWorkers = lists:keydelete(NodeRef, #shard.node, Workers),
    -    skip(State#state{workers=NewWorkers});
    +    NewState = State#state{
    +        workers = lists:keydelete(NodeRef, #shard.node, Workers)
    +    },
    +    handle_message({ok, []}, nil, NewState);
    +
     handle_message({rexi_EXIT, _}, Worker, #state{workers=Workers}=State) ->
    -    skip(State#state{workers=lists:delete(Worker,Workers)});
    -handle_message({ok, RawReplies}, Worker, #state{revs = all} = State) ->
    +    NewState = State#state{
    +        workers = lists:delete(Worker, Workers)
    +    },
    +    handle_message({ok, []}, nil, NewState);
    +
    +handle_message({ok, RawReplies}, Worker, State) ->
         #state{
             dbname = DbName,
             reply_count = ReplyCount,
             worker_count = WorkerCount,
             workers = Workers,
    -        replies = All0,
    -        r = R
    +        replies = PrevReplies,
    +        r = R,
    +        revs = Revs,
    +        latest = Latest,
    +        repair = InRepair
         } = State,
    -    All = lists:foldl(fun(Reply,D) -> 
fabric_util:update_counter(Reply,1,D) end,
    -        All0, RawReplies),
    -    Reduced = fabric_util:remove_ancestors(All, []),
    -    Complete = (ReplyCount =:= (WorkerCount - 1)),
    -    QuorumMet = lists:all(fun({_,{_, C}}) -> C >= R end, Reduced),
    -    case Reduced of All when QuorumMet andalso ReplyCount =:= (R-1) ->
    -        Repair = false;
    -    _ ->
    -        Repair = [D || {_,{{ok,D}, _}} <- Reduced]
    +
    +    IsTree = Revs == all orelse Latest,
    +
    +    {NewReplies, QuorumMet, Repair} = case IsTree of
    +        true ->
    +            {NewReplies0, AllInternal, Repair0} =
    +                    tree_replies(PrevReplies, tree_sort(RawReplies)),
    +            NumLeafs = length(couch_key_tree:get_all_leafs(PrevReplies)),
    +            SameNumRevs = length(RawReplies) == NumLeafs,
    +            QMet = AllInternal andalso SameNumRevs andalso ReplyCount + 1 
>= R,
    +            {NewReplies0, QMet, Repair0};
    +        false ->
    +            {NewReplies0, MinCount} = dict_replies(PrevReplies, 
RawReplies),
    +            {NewReplies0, MinCount >= R, false}
         end,
    -    case maybe_reply(DbName, Reduced, Complete, Repair, R) of
    -    noreply ->
    -        {ok, State#state{replies = All, reply_count = ReplyCount+1,
    -                        workers = lists:delete(Worker,Workers)}};
    -    {reply, FinalReply} ->
    -        fabric_util:cleanup(lists:delete(Worker,Workers)),
    -        {stop, FinalReply}
    -    end;
    -handle_message({ok, RawReplies0}, Worker, State) ->
    -    % we've got an explicit revision list, but if latest=true the workers 
may
    -    % return a descendant of the requested revision.  Take advantage of the
    -    % fact that revisions are returned in order to keep track.
    -    RawReplies = strip_not_found_missing(RawReplies0),
    -    #state{
    -        dbname = DbName,
    -        reply_count = ReplyCount,
    -        worker_count = WorkerCount,
    -        workers = Workers,
    -        replies = All0,
    -        r = R
    -    } = State,
    -    All = lists:zipwith(fun({Rev, D}, Reply) ->
    -        if Reply =:= error -> {Rev, D}; true ->
    -            {Rev, fabric_util:update_counter(Reply, 1, D)}
    -        end
    -    end, All0, RawReplies),
    -    Reduced = [fabric_util:remove_ancestors(X, []) || {_, X} <- All],
    -    FinalReplies = [choose_winner(X, R) || X <- Reduced, X =/= []],
    +
         Complete = (ReplyCount =:= (WorkerCount - 1)),
    -    case is_repair_needed(All, FinalReplies) of
    -    true ->
    -        Repair = [D || {_,{{ok,D}, _}} <- lists:flatten(Reduced)];
    -    false ->
    -        Repair = false
    -    end,
    -    case maybe_reply(DbName, FinalReplies, Complete, Repair, R) of
    -    noreply ->
    -        {ok, State#state{replies = All, reply_count = ReplyCount+1,
    -                        workers=lists:delete(Worker,Workers)}};
    -    {reply, FinalReply} ->
    -        fabric_util:cleanup(lists:delete(Worker,Workers)),
    -        {stop, FinalReply}
    +
    +    case QuorumMet orelse Complete of
    +        true ->
    +            fabric_util:cleanup(lists:delete(Worker, Workers)),
    +            maybe_read_repair(
    +                    DbName,
    +                    IsTree,
    +                    NewReplies,
    +                    ReplyCount + 1,
    +                    InRepair orelse Repair
    +                ),
    +            {stop, format_reply(IsTree, NewReplies)};
    +        false ->
    +            {ok, State#state{
    +                replies = NewReplies,
    +                reply_count = ReplyCount + 1,
    +                workers = lists:delete(Worker, Workers),
    +                repair = InRepair orelse Repair
    +            }}
    +    end.
    +
    +
    +tree_replies(RevTree, []) ->
    +    {RevTree, true, false};
    +
    +tree_replies(RevTree0, [{ok, Doc} | Rest]) ->
    +    {RevTree1, Done, Repair} = tree_replies(RevTree0, Rest),
    +    Path = couch_doc:to_path(Doc),
    +    case couch_key_tree:merge(RevTree1, Path) of
    +        {RevTree2, internal_node} ->
    +            {RevTree2, Done, Repair};
    +        {RevTree2, new_leaf} ->
    +            {RevTree2, Done, true};
    +        {RevTree2, _} ->
    +            {RevTree2, false, true}
    +    end;
    +
    +tree_replies(RevTree0, [{{not_found, missing}, {Pos, Rev}} | Rest]) ->
    +    {RevTree1, Done, Repair} = tree_replies(RevTree0, Rest),
    +    Node = {Rev, ?REV_MISSING, []},
    +    Path = {Pos, Node},
    +    case couch_key_tree:merge(RevTree1, Path) of
    +        {RevTree2, internal_node} ->
    +            {RevTree2, Done, true};
    +        {RevTree2, _} ->
    +            {RevTree2, false, Repair}
         end.
     
    -skip(#state{revs=all} = State) ->
    -    handle_message({ok, []}, nil, State);
    -skip(#state{revs=Revs} = State) ->
    -    handle_message({ok, [error || _Rev <- Revs]}, nil, State).
    -
    -maybe_reply(_, [], false, _, _) ->
    -    noreply;
    -maybe_reply(_, [], true, _, _) ->
    -    {reply, {ok, []}};
    -maybe_reply(DbName, ReplyDict, Complete, RepairDocs, R) ->
    -    case Complete orelse lists:all(fun({_,{_, C}}) -> C >= R end, 
ReplyDict) of
    -    true ->
    -        maybe_execute_read_repair(DbName, RepairDocs),
    -        {reply, unstrip_not_found_missing(extract_replies(ReplyDict))};
    -    false ->
    -        noreply
    +
    +tree_sort(Replies) ->
    +    SortFun = fun(A, B) -> sort_key(A) =< sort_key(B) end,
    +    lists:sort(SortFun, Replies).
    +
    +
    +sort_key({ok, #doc{revs = {Pos, [Rev | _]}}}) ->
    +    {Pos, Rev};
    +sort_key({{not_found, _}, {Pos, Rev}}) ->
    +    {Pos, Rev}.
    +
    +
    +dict_replies(Dict, []) ->
    +    Counts = [Count || {_Key, {_Reply, Count}} <- Dict],
    +    {Dict, lists:min(Counts)};
    +
    +dict_replies(Dict, [Reply | Rest]) ->
    +    NewDict = fabric_util:update_counter(Reply, 1, Dict),
    +    dict_replies(NewDict, Rest).
    +
    +
    +maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) ->
    +    Docs = case IsTree of
    +        true -> tree_repair_docs(Replies, DoRepair);
    +        false -> dict_repair_docs(Replies, ReplyCount)
    +    end,
    +    case length(Docs) > 0 of
    +        true ->
    +            erlang:spawn(fun() -> read_repair(Db, Docs) end);
    +        false ->
    +            ok
         end.
     
    -extract_replies(Replies) ->
    -    lists:map(fun({_,{Reply,_}}) -> Reply end, Replies).
    -
    -choose_winner(Options, R) ->
    -    case lists:dropwhile(fun({_,{_Reply, C}}) -> C < R end, Options) of
    -    [] ->
    -        case [Elem || {_,{{ok, #doc{}}, _}} = Elem <- Options] of
    -        [] ->
    -            hd(Options);
    -        Docs ->
    -            lists:last(lists:sort(Docs))
    -        end;
    -    [QuorumMet | _] ->
    -        QuorumMet
    +
    +tree_repair_docs(_Replies, false) ->
    +    [];
    +
    +tree_repair_docs(Replies, true) ->
    +    Leafs = couch_key_tree:get_all_leafs(Replies),
    +    [Doc || {Doc, {_Pos, _}} <- Leafs, is_record(Doc, doc)].
    +
    +
    +dict_repair_docs(Replies, ReplyCount) ->
    +    NeedsRepair = lists:any(fun({_, {_, C}}) -> C < ReplyCount end, 
Replies),
    +    if not NeedsRepair -> []; true ->
    +        [Doc || {_, {{ok, Doc}, _}} <- Replies]
         end.
     
    -% repair needed if any reply other than the winner has been received for a 
rev
    -is_repair_needed([], []) ->
    -    false;
    -is_repair_needed([{_Rev, [Reply]} | Tail1], [Reply | Tail2]) ->
    -    is_repair_needed(Tail1, Tail2);
    -is_repair_needed(_, _) ->
    -    true.
    -
    -maybe_execute_read_repair(_Db, false) ->
    -    ok;
    -maybe_execute_read_repair(Db, Docs) ->
    -    [#doc{id=Id} | _] = Docs,
    +
    +read_repair(Db, Docs) ->
         Res = fabric:update_docs(Db, Docs, [replicated_changes, ?ADMIN_CTX]),
         case Res of
             {ok, []} ->
                 couch_stats:increment_counter([fabric, read_repairs, success]);
             _ ->
                 couch_stats:increment_counter([fabric, read_repairs, failure]),
    +            [#doc{id=Id} | _] = Docs,
                 couch_log:notice("read_repair ~s ~s ~p", [Db, Id, Res])
         end.
     
    -% hackery required so that not_found sorts first
    -strip_not_found_missing([]) ->
    -    [];
    -strip_not_found_missing([{{not_found, missing}, Rev} | Rest]) ->
    -    [{not_found, Rev} | strip_not_found_missing(Rest)];
    -strip_not_found_missing([Else | Rest]) ->
    -    [Else | strip_not_found_missing(Rest)].
     
    -unstrip_not_found_missing([]) ->
    -    [];
    -unstrip_not_found_missing([{not_found, Rev} | Rest]) ->
    -    [{{not_found, missing}, Rev} | unstrip_not_found_missing(Rest)];
    -unstrip_not_found_missing([Else | Rest]) ->
    -    [Else | unstrip_not_found_missing(Rest)].
    +format_reply(true, Replies) ->
    +    tree_format_replies(Replies);
    +
    +format_reply(false, Replies) ->
    +    dict_format_replies(Replies).
    +
    +
    +tree_format_replies(RevTree) ->
    +    Leafs = couch_key_tree:get_all_leafs(RevTree),
    +    lists:sort(lists:map(fun(Reply) ->
    +        case Reply of
    +            {?REV_MISSING, {Pos, [Rev]}} ->
    +                {{not_found, missing}, {Pos, Rev}};
    +            {Doc, _} when is_record(Doc, doc) ->
    +                {ok, Doc}
    +        end
    +    end, Leafs)).
    +
    +
    +dict_format_replies(Dict) ->
    +    lists:sort([Reply || {_, {Reply, _}} <- Dict]).
    +
    +
    +
    +-ifdef(TEST).
    +-include_lib("eunit/include/eunit.hrl").
    +
     
    -all_revs_test() ->
    +setup() ->
         config:start_link([]),
    -    meck:new([fabric, couch_stats]),
    +    meck:new([fabric, couch_stats, couch_log]),
         meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, nil} end),
         meck:expect(couch_stats, increment_counter, fun(_) -> ok end),
    -    meck:new(couch_log),
    -    meck:expect(couch_log, notice, fun(_,_) -> ok end),
    -
    -    State0 = #state{worker_count = 3, workers=[nil,nil,nil], r = 2, revs = 
all},
    -    Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}},
    -    Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}},
    -    Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}},
    -
    -    % an empty worker response does not count as meeting quorum
    -    ?assertMatch(
    -        {ok, #state{workers=[nil,nil]}},
    -        handle_message({ok, []}, nil, State0)
    -    ),
    -
    -    ?assertMatch(
    -        {ok, #state{workers=[nil, nil]}},
    -        handle_message({ok, [Foo1, Bar1]}, nil, State0)
    -    ),
    -    {ok, State1} = handle_message({ok, [Foo1, Bar1]}, nil, State0),
    -
    -    % the normal case - workers agree
    -    ?assertEqual(
    -        {stop, [Bar1, Foo1]},
    -        handle_message({ok, [Foo1, Bar1]}, nil, State1)
    -    ),
    -
    -    % a case where the 2nd worker has a newer Foo - currently we're 
considering
    -    % Foo to have reached quorum and execute_read_repair()
    -    ?assertEqual(
    -        {stop, [Bar1, Foo2]},
    -        handle_message({ok, [Foo2, Bar1]}, nil, State1)
    -    ),
    -
    -    % a case where quorum has not yet been reached for Foo
    -    ?assertMatch(
    -        {ok, #state{}},
    -        handle_message({ok, [Bar1]}, nil, State1)
    -    ),
    -    {ok, State2} = handle_message({ok, [Bar1]}, nil, State1),
    -
    -    % still no quorum, but all workers have responded.  We include Foo1 in 
the
    -    % response and execute_read_repair()
    -    ?assertEqual(
    -        {stop, [Bar1, Foo1]},
    -        handle_message({ok, [Bar1]}, nil, State2)
    -      ),
    -    meck:unload([fabric, couch_log, couch_stats]),
    +    meck:expect(couch_log, notice, fun(_, _) -> ok end).
    +
    +
    +teardown(_) ->
    +    (catch meck:unload([fabric, couch_stats, couch_log])),
         config:stop().
     
    -specific_revs_test() ->
    -    config:start_link([]),
    -    meck:new([fabric, couch_stats]),
    -    meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, nil} end),
    -    meck:expect(couch_stats, increment_counter, fun(_) -> ok end),
    -    meck:new(couch_log),
    -    meck:expect(couch_log, notice, fun(_,_) -> ok end),
     
    -    Revs = [{1,<<"foo">>}, {1,<<"bar">>}, {1,<<"baz">>}],
    -    State0 = #state{
    +state0(Revs, Latest) ->
    +    #state{
             worker_count = 3,
    -        workers = [nil, nil, nil],
    +        workers = [w1, w2, w3],
             r = 2,
             revs = Revs,
    -        latest = false,
    -        replies = [{Rev,[]} || Rev <- Revs]
    -    },
    -    Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}},
    -    Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}},
    -    Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}},
    -    Baz1 = {{not_found, missing}, {1,<<"baz">>}},
    -    Baz2 = {ok, #doc{revs = {1, [<<"baz">>]}}},
    -
    -    ?assertMatch(
    -        {ok, #state{}},
    -        handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State0)
    -    ),
    -    {ok, State1} = handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State0),
    -
    -    % the normal case - workers agree
    -    ?assertEqual(
    -        {stop, [Foo1, Bar1, Baz1]},
    -        handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State1)
    -    ),
    -
    -    % latest=true, worker responds with Foo2 and we return it
    -    State0L = State0#state{latest = true},
    -    ?assertMatch(
    -        {ok, #state{}},
    -        handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State0L)
    -    ),
    -    {ok, State1L} = handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State0L),
    -    ?assertEqual(
    -        {stop, [Foo2, Bar1, Baz1]},
    -        handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State1L)
    -    ),
    -
    -    % Foo1 is included in the read quorum for Foo2
    -    ?assertEqual(
    -        {stop, [Foo2, Bar1, Baz1]},
    -        handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State1L)
    -    ),
    -
    -    % {not_found, missing} is included in the quorum for any found revision
    -    ?assertEqual(
    -        {stop, [Foo2, Bar1, Baz2]},
    -        handle_message({ok, [Foo2, Bar1, Baz2]}, nil, State1L)
    -    ),
    -
    -    % a worker failure is skipped
    -    ?assertMatch(
    -        {ok, #state{}},
    -        handle_message({rexi_EXIT, foo}, nil, State1L)
    -    ),
    -    {ok, State2L} = handle_message({rexi_EXIT, foo}, nil, State1L),
    -    ?assertEqual(
    -        {stop, [Foo2, Bar1, Baz2]},
    -        handle_message({ok, [Foo2, Bar1, Baz2]}, nil, State2L)
    -      ),
    -    meck:unload([fabric, couch_log, couch_stats]),
    -    config:stop().
    +        latest = Latest
    +    }.
    +
    +
    +revs() -> [{1,<<"foo">>}, {1,<<"bar">>}, {1,<<"baz">>}].
    +
    +
    +foo1() -> {ok, #doc{revs = {1, [<<"foo">>]}}}.
    +foo2() -> {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}}.
    +bar1() -> {ok, #doc{revs = {1, [<<"bar">>]}}}.
    +bazNF() -> {{not_found, missing}, {1,<<"baz">>}}.
    +baz1() -> {ok, #doc{revs = {1, [<<"baz">>]}}}.
    +
    +
    +
    +open_doc_revs_test_() ->
    +    {
    +        foreach,
    +        fun setup/0,
    +        fun teardown/1,
    +        [
    --- End diff --
    
    It does look like we return ordered results. Should we add a test to check 
that the results are sorted?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to