Github user iilyak commented on a diff in the pull request:

    https://github.com/apache/couchdb-fabric/pull/47#discussion_r60671795
  
    --- Diff: src/fabric_doc_open_revs.erl ---
    @@ -56,266 +57,390 @@ go(DbName, Id, Revs, Options) ->
             rexi_monitor:stop(RexiMon)
         end.
     
    +
     handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, 
#state{workers=Workers}=State) ->
    -    NewWorkers = lists:keydelete(NodeRef, #shard.node, Workers),
    -    skip(State#state{workers=NewWorkers});
    +    NewState = State#state{
    +        workers = lists:keydelete(NodeRef, #shard.node, Workers)
    +    },
    +    handle_message({ok, []}, nil, NewState);
    +
     handle_message({rexi_EXIT, _}, Worker, #state{workers=Workers}=State) ->
    -    skip(State#state{workers=lists:delete(Worker,Workers)});
    -handle_message({ok, RawReplies}, Worker, #state{revs = all} = State) ->
    +    NewState = State#state{
    +        workers = lists:delete(Worker, Workers)
    +    },
    +    handle_message({ok, []}, nil, NewState);
    +
    +handle_message({ok, RawReplies}, Worker, State) ->
         #state{
             dbname = DbName,
             reply_count = ReplyCount,
             worker_count = WorkerCount,
             workers = Workers,
    -        replies = All0,
    -        r = R
    +        replies = PrevReplies,
    +        r = R,
    +        revs = Revs,
    +        latest = Latest,
    +        repair = InRepair
         } = State,
    -    All = lists:foldl(fun(Reply,D) -> 
fabric_util:update_counter(Reply,1,D) end,
    -        All0, RawReplies),
    -    Reduced = fabric_util:remove_ancestors(All, []),
    -    Complete = (ReplyCount =:= (WorkerCount - 1)),
    -    QuorumMet = lists:all(fun({_,{_, C}}) -> C >= R end, Reduced),
    -    case Reduced of All when QuorumMet andalso ReplyCount =:= (R-1) ->
    -        Repair = false;
    -    _ ->
    -        Repair = [D || {_,{{ok,D}, _}} <- Reduced]
    +
    +    IsTree = Revs == all orelse Latest,
    +
    +    {NewReplies, QuorumMet, Repair} = case IsTree of
    +        true ->
    +            {NewReplies0, AllInternal, Repair0} =
    +                    tree_replies(PrevReplies, tree_sort(RawReplies)),
    +            NumLeafs = length(couch_key_tree:get_all_leafs(PrevReplies)),
    +            SameNumRevs = length(RawReplies) == NumLeafs,
    +            QMet = AllInternal andalso SameNumRevs andalso ReplyCount + 1 
>= R,
    +            {NewReplies0, QMet, Repair0};
    +        false ->
    +            {NewReplies0, MinCount} = dict_replies(PrevReplies, 
RawReplies),
    +            {NewReplies0, MinCount >= R, false}
         end,
    -    case maybe_reply(DbName, Reduced, Complete, Repair, R) of
    -    noreply ->
    -        {ok, State#state{replies = All, reply_count = ReplyCount+1,
    -                        workers = lists:delete(Worker,Workers)}};
    -    {reply, FinalReply} ->
    -        fabric_util:cleanup(lists:delete(Worker,Workers)),
    -        {stop, FinalReply}
    -    end;
    -handle_message({ok, RawReplies0}, Worker, State) ->
    -    % we've got an explicit revision list, but if latest=true the workers 
may
    -    % return a descendant of the requested revision.  Take advantage of the
    -    % fact that revisions are returned in order to keep track.
    -    RawReplies = strip_not_found_missing(RawReplies0),
    -    #state{
    -        dbname = DbName,
    -        reply_count = ReplyCount,
    -        worker_count = WorkerCount,
    -        workers = Workers,
    -        replies = All0,
    -        r = R
    -    } = State,
    -    All = lists:zipwith(fun({Rev, D}, Reply) ->
    -        if Reply =:= error -> {Rev, D}; true ->
    -            {Rev, fabric_util:update_counter(Reply, 1, D)}
    -        end
    -    end, All0, RawReplies),
    -    Reduced = [fabric_util:remove_ancestors(X, []) || {_, X} <- All],
    -    FinalReplies = [choose_winner(X, R) || X <- Reduced, X =/= []],
    +
         Complete = (ReplyCount =:= (WorkerCount - 1)),
    -    case is_repair_needed(All, FinalReplies) of
    -    true ->
    -        Repair = [D || {_,{{ok,D}, _}} <- lists:flatten(Reduced)];
    -    false ->
    -        Repair = false
    -    end,
    -    case maybe_reply(DbName, FinalReplies, Complete, Repair, R) of
    -    noreply ->
    -        {ok, State#state{replies = All, reply_count = ReplyCount+1,
    -                        workers=lists:delete(Worker,Workers)}};
    -    {reply, FinalReply} ->
    -        fabric_util:cleanup(lists:delete(Worker,Workers)),
    -        {stop, FinalReply}
    +
    +    case QuorumMet orelse Complete of
    +        true ->
    +            fabric_util:cleanup(lists:delete(Worker, Workers)),
    +            maybe_read_repair(
    +                    DbName,
    +                    IsTree,
    +                    NewReplies,
    +                    ReplyCount + 1,
    +                    InRepair orelse Repair
    +                ),
    +            {stop, format_reply(IsTree, NewReplies)};
    +        false ->
    +            {ok, State#state{
    +                replies = NewReplies,
    +                reply_count = ReplyCount + 1,
    +                workers = lists:delete(Worker, Workers),
    +                repair = InRepair orelse Repair
    +            }}
    +    end.
    +
    +
    +tree_replies(RevTree, []) ->
    +    {RevTree, true, false};
    +
    +tree_replies(RevTree0, [{ok, Doc} | Rest]) ->
    +    {RevTree1, Done, Repair} = tree_replies(RevTree0, Rest),
    +    Path = couch_doc:to_path(Doc),
    +    case couch_key_tree:merge(RevTree1, Path) of
    +        {RevTree2, internal_node} ->
    +            {RevTree2, Done, Repair};
    +        {RevTree2, new_leaf} ->
    +            {RevTree2, Done, true};
    +        {RevTree2, _} ->
    +            {RevTree2, false, true}
    +    end;
    +
    +tree_replies(RevTree0, [{{not_found, missing}, {Pos, Rev}} | Rest]) ->
    +    {RevTree1, Done, Repair} = tree_replies(RevTree0, Rest),
    +    Node = {Rev, ?REV_MISSING, []},
    +    Path = {Pos, Node},
    +    case couch_key_tree:merge(RevTree1, Path) of
    +        {RevTree2, internal_node} ->
    +            {RevTree2, Done, true};
    +        {RevTree2, _} ->
    +            {RevTree2, false, Repair}
         end.
     
    -skip(#state{revs=all} = State) ->
    -    handle_message({ok, []}, nil, State);
    -skip(#state{revs=Revs} = State) ->
    -    handle_message({ok, [error || _Rev <- Revs]}, nil, State).
    -
    -maybe_reply(_, [], false, _, _) ->
    -    noreply;
    -maybe_reply(_, [], true, _, _) ->
    -    {reply, {ok, []}};
    -maybe_reply(DbName, ReplyDict, Complete, RepairDocs, R) ->
    -    case Complete orelse lists:all(fun({_,{_, C}}) -> C >= R end, 
ReplyDict) of
    -    true ->
    -        maybe_execute_read_repair(DbName, RepairDocs),
    -        {reply, unstrip_not_found_missing(extract_replies(ReplyDict))};
    -    false ->
    -        noreply
    +
    +tree_sort(Replies) ->
    +    SortFun = fun(A, B) -> sort_key(A) =< sort_key(B) end,
    +    lists:sort(SortFun, Replies).
    +
    +
    +sort_key({ok, #doc{revs = {Pos, [Rev | _]}}}) ->
    +    {Pos, Rev};
    +sort_key({{not_found, _}, {Pos, Rev}}) ->
    +    {Pos, Rev}.
    +
    +
    +dict_replies(Dict, []) ->
    +    Counts = [Count || {_Key, {_Reply, Count}} <- Dict],
    +    {Dict, lists:min(Counts)};
    +
    +dict_replies(Dict, [Reply | Rest]) ->
    +    NewDict = fabric_util:update_counter(Reply, 1, Dict),
    +    dict_replies(NewDict, Rest).
    +
    +
    +maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) ->
    +    Docs = case IsTree of
    +        true -> tree_repair_docs(Replies, DoRepair);
    +        false -> dict_repair_docs(Replies, ReplyCount)
    +    end,
    +    case length(Docs) > 0 of
    +        true ->
    +            erlang:spawn(fun() -> read_repair(Db, Docs) end);
    +        false ->
    +            ok
         end.
     
    -extract_replies(Replies) ->
    -    lists:map(fun({_,{Reply,_}}) -> Reply end, Replies).
    -
    -choose_winner(Options, R) ->
    -    case lists:dropwhile(fun({_,{_Reply, C}}) -> C < R end, Options) of
    -    [] ->
    -        case [Elem || {_,{{ok, #doc{}}, _}} = Elem <- Options] of
    -        [] ->
    -            hd(Options);
    -        Docs ->
    -            lists:last(lists:sort(Docs))
    -        end;
    -    [QuorumMet | _] ->
    -        QuorumMet
    +
    +tree_repair_docs(_Replies, false) ->
    +    [];
    +
    +tree_repair_docs(Replies, true) ->
    +    Leafs = couch_key_tree:get_all_leafs(Replies),
    +    [Doc || {Doc, {_Pos, _}} <- Leafs, is_record(Doc, doc)].
    +
    +
    +dict_repair_docs(Replies, ReplyCount) ->
    +    NeedsRepair = lists:any(fun({_, {_, C}}) -> C < ReplyCount end, 
Replies),
    +    if not NeedsRepair -> []; true ->
    +        [Doc || {_, {{ok, Doc}, _}} <- Replies]
         end.
     
    -% repair needed if any reply other than the winner has been received for a 
rev
    -is_repair_needed([], []) ->
    -    false;
    -is_repair_needed([{_Rev, [Reply]} | Tail1], [Reply | Tail2]) ->
    -    is_repair_needed(Tail1, Tail2);
    -is_repair_needed(_, _) ->
    -    true.
    -
    -maybe_execute_read_repair(_Db, false) ->
    -    ok;
    -maybe_execute_read_repair(Db, Docs) ->
    -    [#doc{id=Id} | _] = Docs,
    +
    +read_repair(Db, Docs) ->
         Res = fabric:update_docs(Db, Docs, [replicated_changes, ?ADMIN_CTX]),
         case Res of
             {ok, []} ->
                 couch_stats:increment_counter([fabric, read_repairs, success]);
             _ ->
                 couch_stats:increment_counter([fabric, read_repairs, failure]),
    +            [#doc{id=Id} | _] = Docs,
    --- End diff --
    
    nitpick: Spaces `#doc{id = Id}`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to