Hello Bryan,
I've found the problem.

The problem is in the reduce phase.

1. See the implementation of riak_kv_mrc_pipe:mr2pipe_phases. It converts a MapReduce
job spec into a riak_pipe spec.
In this function, ConstHashCookie is created as Now = now() and used as the chashfun
value for the reduce-phase fitting.
This generated value is actually used in riak_kv_w_reduce:done, where you
prereduce any data that has not yet been reduced and send it to the output.
But the output vnode in that case comes from the preflist for ConstHashCookie,
i.e. for an effectively random value, and n_val for this phase is always 1.
That is why the preflist calculated for this phase is sometimes empty.
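To make the failure mode above concrete, here is a toy Erlang model. It is not riak_core or riak_pipe code, and everything in it is a hypothetical assumption: a 64-partition ring claimed round-robin by nodes named dev1..dev4 with dev4 down, erlang:phash2/2 standing in for Riak's consistent hash, and a primary-only preflist for the reduce fitting (inferred from the symptom, not verified in the source). With n_val = 1, any cookie that hashes to one of dev4's partitions gets an empty preflist; a liveness-aware variant that walks the ring to the next live owner never does.

```erlang
%% Toy model (hypothetical, not riak_core code) of an n_val = 1 preflist
%% computed from a constant hash cookie while one node is down.
-module(preflist_model).
-export([owner/1, partition_of/1, primary_preflist/2,
         fallback_preflist/2, empty_count/3]).

-define(RING_SIZE, 64).
-define(NODES, [dev1, dev2, dev3, dev4]).

%% Assumption: partitions are claimed round-robin, so each node owns 16.
owner(Partition) ->
    lists:nth((Partition rem length(?NODES)) + 1, ?NODES).

%% Map any term (e.g. a now()-based cookie) to a partition index.
%% erlang:phash2/2 stands in for Riak's SHA-1 based consistent hash.
partition_of(Cookie) ->
    erlang:phash2(Cookie, ?RING_SIZE).

%% Primary-only preflist with n_val = 1: the single owner, or [] if it is down.
primary_preflist(Cookie, UpNodes) ->
    P = partition_of(Cookie),
    Owner = owner(P),
    case lists:member(Owner, UpNodes) of
        true -> [{P, Owner}];
        false -> []
    end.

%% Liveness-aware variant: walk the ring from the cookie's partition and
%% take the first partition whose owner is up.
fallback_preflist(Cookie, UpNodes) ->
    Start = partition_of(Cookie),
    Walk = [(Start + I) rem ?RING_SIZE || I <- lists:seq(0, ?RING_SIZE - 1)],
    case [{P, owner(P)} || P <- Walk, lists:member(owner(P), UpNodes)] of
        [First | _] -> [First];
        [] -> []
    end.

%% How many of the cookies 1..N get an empty preflist under PreflistFun.
empty_count(PreflistFun, N, UpNodes) ->
    length([C || C <- lists:seq(1, N), PreflistFun(C, UpNodes) =:= []]).
```

With UpNodes = [dev1, dev2, dev3], empty_count(fun preflist_model:primary_preflist/2, 1000, UpNodes) gives a non-zero count (roughly a quarter of the cookies if phash2 spreads them evenly), while the same call with fallback_preflist/2 gives 0.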

Do you have any suggestions on how we can fix it?

Thanks,
Alexander Gunin.

----- Original Message -----
From: [email protected]
To: "Bryan Fink" <[email protected]>
Cc: "Riak-Users" <[email protected]>
Sent: Thursday, 31 January 2013 17:34:34
Subject: Re: Differences between riak_client and riak_kv_mrc_pipe MapReduce when one node is down.

Thank you for your response.
1. Riak 1.2. I cloned it from the GitHub master branch some time ago.
2. 6 nodes in our test environment.
3. More than 100 million (100 thousand per day).
4. Nearly 100 thousand (the index is by date).

I've prepared a simple test module and test scenario. It should help you help me :)

1. Generate four Riak nodes.
[xx riak]$ make devrel
2. Start all nodes and join them into a cluster.
[xx dev]$ dev1/bin/riak-admin ringready
TRUE All nodes agree on the ring ['[email protected]','[email protected]',
                                  '[email protected]','[email protected]']

3. Stop the fourth node.
[xx dev]$ dev4/bin/riak stop

[xx dev]$ dev1/bin/riak-admin ringready
FALSE ['[email protected]'] down.  All nodes need to be up to check.

4. Compile my test module and put it in the code path on all running nodes (I
included it in riak_kv before compiling Riak).

5. Attach to the Erlang console on node dev1.
[xx dev]$ dev1/bin/riak attach

6. Test

Generate a test dataset (50 objects).
([email protected])1> mapred_test:make_data(50).
ok

Check that all data is available.
([email protected])7> mapred_test:check_data(50).
{not_available,0}
OK. All data is saved.
Check that all data is available, 50 times in a row.
([email protected])8> mapred_test:check_data_n(50,50).
ok
OK. All data really is saved.

Try counting the objects with this MapReduce job.
([email protected])23> mapred_test:pipe_mapreduce_check(50).
{not_available,0}
OK.
....Repeated 7 times.
Attempt #8:
([email protected])23> mapred_test:pipe_mapreduce_check(50).
{not_available,50}
!!!Fail!!! MapReduce actually returned {ok,[]}.

Try to gather some statistics: repeat the MapReduce job 50 times.
([email protected])29> mapred_test:pipe_mapreduce_check_n(50,50).
ok count 39
failed count 11

About 1 in 5 jobs failed (11 of 50).
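As a rough cross-check of that rate against the reduce-phase diagnosis above, a small arithmetic sketch. Assumptions, not measurements: Riak's default 64-partition ring, ownership split evenly across the four dev nodes, and the hash cookie landing uniformly on the ring. Under those assumptions about one job in four would target a partition owned by the stopped node; the module and function names are hypothetical.

```erlang
%% Back-of-envelope check (assumptions: 64-partition ring, ownership split
%% evenly over the nodes, cookie hashes uniform over the ring).
-module(failure_rate).
-export([expected/2, observed/2]).

%% Expected share of jobs whose only (n_val = 1) target partition belongs
%% to the single node that is down.
expected(RingSize, NodeCount) ->
    (RingSize div NodeCount) / RingSize.

%% Share of jobs that actually failed in a run.
observed(Failed, Total) ->
    Failed / Total.
```

expected(64, 4) gives 0.25 and observed(11, 50) gives 0.22, which is close for a 50-run sample.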

Code of the mapred_test module:

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-module(mapred_test).

-export([map/3,
         reduce/2,
         make_data/1,
         check_data_n/2,
         check_data/1,
         pipe_mapreduce_check/1,
         pipe_mapreduce_check_n/2]).

%% Map for the counter: emit 1 for every object that exists.
map({error,notfound},_,_)->
        [];
map(_,_,_)->
        [1].

%% Reduce: simple sum.
reduce(L,_)->
        [lists:foldl(fun(I,Acc)->I+Acc end,0,L)].

%% Generate the test data set: keys <<"1">>..<<"Count">>, each with value 1.
make_data(Count)->
        {ok,Conn} = riak:local_client(),
        make_data_loop(Count,Conn).
make_data_loop(0,_)->
        ok;
make_data_loop(Count,Conn)->
        Key = list_to_binary(integer_to_list(Count)),
        RObj = riak_object:new(<<"mapred_test">>,Key,1),
        case Conn:put(RObj) of
                ok->
                        make_data_loop(Count-1,Conn);
                Else->
                        {error,Else}
        end.

        
%% Returns {not_available, N}: N is the number of keys that could not be read.
check_data(Count)->
        {ok,Conn} = riak:local_client(),
        {not_available,check_data_loop(Count,Conn,0)}.

check_data_loop(0,_Conn,Acc)->
        Acc;
check_data_loop(Count,Conn,Acc)->
        Key = list_to_binary(integer_to_list(Count)),
        case Conn:get(<<"mapred_test">>,Key) of
                {ok,_}->
                        check_data_loop(Count-1,Conn,Acc);
                _->
                        check_data_loop(Count-1,Conn,Acc+1)
        end.

%% Repeat check_data/1 N times; stop at the first run that finds missing keys.
check_data_n(_Count,0)->
        ok;
check_data_n(Count,N)->
        case check_data(Count) of
                {not_available,0}->
                        check_data_n(Count,N-1);
                Else->
                        Else
        end.

%% Count the objects with a riak_pipe MapReduce job; report how many were missed.
pipe_mapreduce_check(Count)->
        Query = [{map, {modfun,?MODULE,map}, [do_prereduce,none], false},
                 {reduce, {modfun,?MODULE,reduce},
                  [{reduce_phase_batch_size, 1000}], true}],
        case riak_kv_mrc_pipe:mapred(<<"mapred_test">>,Query,60000) of
                {ok,[I]} when is_integer(I)->
                        {not_available,Count-I};
                {ok,[]}->
                        {not_available,Count};
                Else->
                        Else
        end.

pipe_mapreduce_check_n(Count,N)->
        case pipe_mapreduce_check_n(Count,N,{0,0}) of
                {Good,Bad} when is_integer(Good), is_integer(Bad)->
                        io:format("ok count ~p ~nfailed count ~p~n",[Good,Bad]);
                Error->
                        Error
        end.

%% Accumulates {Good,Bad}; aborts with {error,...} on an unexpected result.
pipe_mapreduce_check_n(_Count,0,Acc)->
        Acc;
pipe_mapreduce_check_n(Count,N,{Good,Bad})->
        case pipe_mapreduce_check(Count) of
                {not_available,0}->
                        pipe_mapreduce_check_n(Count,N-1,{Good+1,Bad});
                {not_available,_}->
                        pipe_mapreduce_check_n(Count,N-1,{Good,Bad+1});
                Else->
                        {error,{runtime_error,Else}}
        end.


%%%%%%%%%%%%%%%%%%%%%%

Thanks,
Alexander Gunin.

----- Original Message -----
From: "Bryan Fink" <[email protected]>
To: [email protected]
Cc: "John Daily" <[email protected]>, "Riak-Users" <[email protected]>
Sent: Thursday, 31 January 2013 17:03:09
Subject: Re: Differences between riak_client and riak_kv_mrc_pipe MapReduce when one node is down.

On Thu, Jan 31, 2013 at 6:07 AM,  <[email protected]> wrote:
> Sorry, John. You misunderstood my question.
> 1. One node (I mean a physical (Erlang) node) in the cluster is down.
> 2. It was down when I started the job, while the job ran, and after it. We powered
> off this node; it's under repair. But we haven't removed it from the cluster.

Aha. Thank you for the clarification. Sorry for pushing John in the
wrong direction. Your new description leads me to think that the
problem is likely in the reduce phase (where we do, yes, use an nval
of 1, but also a constant hash that doesn't account for node
liveness).

As yet, I've been unable to reproduce exactly what you're seeing,
though. I always get an error instead of an empty result. Answers to
some of these questions may help me:

1. What version of Riak are you running?
2. How many nodes do you have in the cluster?
3. About how many keys are in this bucket?
4. About how many keys do you expect to match the index query?

Thanks,
Bryan

_______________________________________________
riak-users mailing list
[email protected]
http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com
