Hi everyone,
first time here. Thanks in advance.
I am experiencing issues with MapReduce: it seems to time out after a certain data volume threshold is reached. There is only one reducer, and here is the script that kicks off the MapReduce job:
#!/usr/bin/env ruby
[…]
@client = Riak::Client.new(
  :nodes => [
    {:host => 'db1', :pb_port => 8087, :http_port => 8098},
    {:host => 'db2', :pb_port => 8087, :http_port => 8098},
    {:host => 'db3', :pb_port => 8087, :http_port => 8098}
  ],
  :protocol => 'pbc'
)

start_key = "#{cust}:#{setup}:#{start_time}"
end_key   = "#{cust}:#{setup}:#{end_time}"

result = Riak::MapReduce.new(@client).
  index(bucket_name, index_name, start_key..end_key).
  map('map95th').
  reduce('reduce95th', :arg => { 'reduce_phase_only_1' => true }, :keep => true).
  run()

puts result
The following is the code for the map95th and reduce95th JavaScript functions:
function map95th(v, keyData, arg) {
  // object keys look like cust:setup:sid:ts
  var key_elements = v['key'].split(':');
  var cust = key_elements[0];
  var setup = key_elements[1];
  var sid = key_elements[2];
  var ts = key_elements[3];
  var result_key = cust + ':' + setup + ':' + ts;

  var obj = {};
  var obj_data = Riak.mapValuesJson(v)[0];
  // one-minute sample, so average bytes per second = (rx + tx) / 60
  obj_data['bps'] = (obj_data['rx_bytes'] + obj_data['tx_bytes']) / 60;
  var return_val = obj_data['bps'];
  return [ return_val ];
}
// If used, this must be a single reducer! Call it from Ruby like this:
// reduce('reduce95th', :arg => { 'reduce_phase_only_1' => true }, :keep => true).
function reduce95th(values) {
  // sort numerically and return the value sitting at the 95th-percentile index
  var sorted = values.sort(function(a, b) { return a - b; });
  var pct = sorted.length / 100;
  var element_95th = pct * 95;
  element_95th = parseInt(element_95th, 10) + 1;
  return [ sorted[element_95th] ];
}
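For clarity, the single reduce phase is meant to boil down to something like the following client-side Ruby sketch (synthetic values; only the index arithmetic mirrors the JavaScript above):

# Client-side sketch of the reduce95th arithmetic (synthetic data, not from Riak).
values = Array.new(200) { rand * 1_000.0 }  # stand-ins for per-minute bps values

sorted = values.sort
# Same index calculation as reduce95th: floor(length / 100 * 95) + 1
idx = (sorted.length / 100.0 * 95).to_i + 1
puts sorted[idx]  # the value reported as the 95th percentile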
Now here is the interesting part. The MapReduce goes through one record per minute of data. If I run it over a period of less than roughly 20 days, it completes; otherwise, it times out:
[deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$
[deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$ ./95h.rb yellingtone default $((`date +%s` - 20 * 86400)) `date +%s`
125581.51666666666
[deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$ ./95h.rb yellingtone default $((`date +%s` - 30 * 86400)) `date +%s`
/Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client/beefcake_protobuffs_backend.rb:182:in `decode_response': Expected success from Riak but received 0. {"phase":1,"error":"timeout","input":null,"type":null,"stack":null} (Riak::ProtobuffsFailedRequest)
        from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client/beefcake_protobuffs_backend.rb:116:in `mapred'
        from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:325:in `block in mapred'
        from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:435:in `block in recover_from'
        from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/innertube-1.0.2/lib/innertube.rb:127:in `take'
        from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:433:in `recover_from'
        from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:379:in `protobuffs'
        from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:133:in `backend'
        from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:324:in `mapred'
        from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/map_reduce.rb:217:in `run'
        from ./95h.rb:29:in `<main>'
[deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$
The records being processed look like this:
{"rx_bytes":3485395.0,"tx_bytes":1658479.0}
When running the script over more than 20 days' worth of data (two records per minute are processed, which amounts to more than 2 * 60 * 24 * 20 = 57,600 records), the script times out and here is what shows up in the logs:
==> /var/log/riak/erlang.log.1 <==
Erlang has closed
==> /var/log/riak/error.log <==
2013-07-14 13:03:51.580 [error] <0.709.0>@riak_pipe_vnode:new_worker:768 Pipe worker startup failed:fitting was gone before startup
==> /var/log/riak/console.log <==
2013-07-14 13:03:51.584 [error] <0.22049.4326> gen_fsm <0.22049.4326> in state wait_for_input terminated with reason: timeout
==> /var/log/riak/error.log <==
2013-07-14 13:03:51.584 [error] <0.22049.4326> gen_fsm <0.22049.4326> in state wait_for_input terminated with reason: timeout
==> /var/log/riak/console.log <==
2013-07-14 13:03:51.940 [error] <0.22049.4326> CRASH REPORT Process <0.22049.4326> with 0 neighbours exited with reason: timeout in gen_fsm:terminate/7 line 611
==> /var/log/riak/crash.log <==
2013-07-14 13:03:51 =CRASH REPORT====
  crasher:
    initial call: riak_pipe_vnode_worker:init/1
    pid: <0.22049.4326>
    registered_name: []
    exception exit: {timeout,[{gen_fsm,terminate,7,[{file,"gen_fsm.erl"},{line,611}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,227}]}]}
    ancestors: [<0.710.0>,<0.709.0>,riak_core_vnode_sup,riak_core_sup,<0.129.0>]
    messages: []
    links: [<0.710.0>,<0.709.0>]
    dictionary: [{eunit,[{module,riak_pipe_vnode_worker},{partition,388211372416021087647853783690262677096107081728},{<0.709.0>,<0.709.0>},{details,{fitting_details,{fitting,<18125.23420.4566>,#Ref<18125.0.5432.50467>,<<"C�������������������">>,1},1,riak_kv_w_reduce,{rct,#Fun<riak_kv_w_reduce.0.20542221>,{struct,[{<<"reduce_phase_only_1">>,true}]}},{fitting,<18125.23418.4566>,#Ref<18125.0.5432.50467>,sink,undefined},[{log,sink},{trace,[error]},{sink,{fitting,<18125.23418.4566>,#Ref<18125.0.5432.50467>,sink,undefined}},{sink_type,{fsm,10,infinity}}],64}}]}]
    trap_exit: false
    status: running
    heap_size: 832040
    stack_size: 24
    reductions: 1456611
  neighbours:
==> /var/log/riak/error.log <==
2013-07-14 13:03:51.940 [error] <0.22049.4326> CRASH REPORT Process <0.22049.4326> with 0 neighbours exited with reason: timeout in gen_fsm:terminate/7 line 611
==> /var/log/riak/crash.log <==
2013-07-14 13:03:52 =SUPERVISOR REPORT====
  Supervisor: {<0.710.0>,riak_pipe_vnode_worker_sup}
  Context:    child_terminated
  Reason:     timeout
  Offender:   [{pid,<0.22049.4326>},{name,undefined},{mfargs,{riak_pipe_vnode_worker,start_link,undefined}},{restart_type,temporary},{shutdown,2000},{child_type,worker}]
==> /var/log/riak/console.log <==
2013-07-14 13:03:52.059 [error] <0.710.0> Supervisor riak_pipe_vnode_worker_sup had child undefined started with {riak_pipe_vnode_worker,start_link,undefined} at <0.22049.4326> exit with reason timeout in context child_terminated
==> /var/log/riak/error.log <==
2013-07-14 13:03:52.059 [error] <0.710.0> Supervisor riak_pipe_vnode_worker_sup had child undefined started with {riak_pipe_vnode_worker,start_link,undefined} at <0.22049.4326> exit with reason timeout in context child_terminated
The data is stored in LevelDB and is accessed through secondary indexes. This is a 3-node cluster with 32 GB of RAM; current usage is about 12 GB per node. n_val=3. The same issue occurs on a similar 2-node cluster with 8 GB of RAM (usage is ~6 GB).
The following is my app.config:
[
{riak_api, [
{pb_ip, "0.0.0.0" },
{pb_port, 8087 },
{pb_backlog, 100 }
]},
{riak_core, [
{default_bucket_props, [
{n_val, 3},
{last_write_wins, true}
]},
{ring_state_dir, "/storage/riak/ring"},
{ring_creation_size, 256},
{http, [ {"0.0.0.0", 8098 } ]},
{https, [{ "0.0.0.0", 8069 }]},
{ssl, [
{certfile, "/etc/ssl/riak/server.crt"},
{cacertfile, "/etc/ssl/riak/root.crt"},
{keyfile, "/etc/ssl/riak/server.key"}
]},
{handoff_port, 8099 },
{dtrace_support, false},
{enable_health_checks, true},
{platform_bin_dir, "/usr/sbin"},
{platform_data_dir, "/storage/riak"},
{platform_etc_dir, "/etc/riak"},
{platform_lib_dir, "/usr/lib/riak/lib"},
{platform_log_dir, "/var/log/riak"}
]},
{riak_kv, [
{storage_backend, riak_kv_eleveldb_backend},
{anti_entropy, {on, []}},
{anti_entropy_build_limit, {1, 3600000}},
{anti_entropy_expire, 604800000},
{anti_entropy_concurrency, 2},
{anti_entropy_tick, 15000},
{anti_entropy_data_dir, "/storage/riak/anti_entropy"},
{anti_entropy_leveldb_opts, [{write_buffer_size, 4194304},
{max_open_files, 20}]},
{mapred_name, "mapred"},
{mapred_2i_pipe, true},
{map_js_vm_count, 16 },
{reduce_js_vm_count, 12 },
{hook_js_vm_count, 20 },
{js_max_vm_mem, 8},
{js_thread_stack, 16},
{js_source_dir, "/etc/riak/mapreduce/js_source"},
{http_url_encoding, on},
{vnode_vclocks, true},
{listkeys_backpressure, true},
{vnode_mailbox_limit, {1, 5000}}
]},
{riak_search, [
{enabled, true}
]},
{merge_index, [
{data_root, "/storage/riak/merge_index"},
{buffer_rollover_size, 1048576},
{max_compact_segments, 20}
]},
{bitcask, [
{data_root, "/storage/riak/bitcask"}
]},
{eleveldb, [
{cache_size, 1024},
{max_open_files, 64},
{data_root, "/storage/riak/leveldb"}
]},
{lager, [
{handlers, [
{lager_file_backend, [
{"/var/log/riak/error.log", error, 10485760,
"$D0", 5},
{"/var/log/riak/console.log", info, 10485760,
"$D0", 5}
]}
] },
{crash_log, "/var/log/riak/crash.log"},
{crash_log_msg_size, 65536},
{crash_log_size, 10485760},
{crash_log_date, "$D0"},
{crash_log_count, 5},
{error_logger_redirect, true}
]},
{riak_sysmon, [
{process_limit, 30},
{port_limit, 2},
{gc_ms_limit, 0},
{heap_word_limit, 40111000},
{busy_port, true},
{busy_dist_port, true}
]},
{sasl, [
{sasl_error_logger, false}
]},
Sorry to bug you with such a long e-mail, but I wanted to be as thorough as possible. I tried raising a few options, but it didn't help: map_js_vm_count, reduce_js_vm_count, and js_max_vm_mem.
I also tried adding a timeout argument to the MapReduce caller code, but even if I set it to 60,000 milliseconds or more, the script still terminates with a timeout error after 10-12 seconds. The same behaviour is observed if I use HTTP instead of PBC.
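For reference, the timeout was set roughly like this (a sketch, assuming riak-client's chainable MapReduce#timeout setter, which takes milliseconds):

# Sketch of the timeout attempt; even 60_000 ms or higher made no difference,
# the job still dies after 10-12 seconds.
result = Riak::MapReduce.new(@client).
  index(bucket_name, index_name, start_key..end_key).
  map('map95th').
  reduce('reduce95th', :arg => { 'reduce_phase_only_1' => true }, :keep => true).
  timeout(60_000).
  run()
puts result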
What seems to be the problem? Is this a matter of configuration? I am surprised that the job completes with 20-25 days of data but not more.
Thank you for your efforts,
Deyan