(inline replies)
On 04-11-2011 00:21, Jon Meredith wrote:
Hi Erik,
Apologies it's taken so long to get back to you.
No worries; a few days is not long to wait for a clear and complete answer.
(Besides, the theme *was* latency... ;-) )
Durable writes:
You're interpreting it correctly. DW means that the storage backend
has accepted the write. As the backends are pluggable and
configurable so that affects what durable means. For bitcask you can
control the sync strategy and there are similar controls for innodb.
For the memory backend there is no durable write. With hindsight it
would have been better to have used something like accepted write (AW)
and write (W) rather than W/DW, but we're fairly stuck with it now.
I agree - naming is important though. I'm afraid I'm guilty of
propagating my initial misconceptions of the semantics of DW, which were
mostly due to how I though it "must be"...
Could you perhaps clarify on the Basho wiki that "commit to durable
storage" means that the backend has received the data, but that whether
this means that the data have also been persisted is up to the backend
in question - that the data might still be in the hands of software,
rather than hardware?
(e.g. at http://wiki.basho.com/Basic-Riak-API-Operations.html)
Combining writes/acceptance is a very interesting idea going forward,
but doesn't fit well with the sync nature of the backend API we have
currently.
No, I understand that.
Even so, I'll hazard the following suggestion as to how a
backwards-compatible API change might be:
To the present put/5 signature:
-spec put(riak_object:bucket(), riak_object:key(), [index_spec()],
binary(), state()) ->
{ok, state()} |
{error, term(), state()}.
add the return option
{ok, state(), buffered | persisted}
and add a flushing function:
-spec persist(state()) ->
{ok, state()}.
This function is only ever called if put() returns {ok, _, buffered}.
With this change, if riak_kv_vnode ever receives an {ok, _, buffered}
return value, it can set up a persist timer.
(I say riak_kv_vnode, but in the first iteration it could in principle
just be a wrapper backend.)
Parallel reads:
With 1.0.0 we've introduced a thread pool to increase the
concurrency vnodes can use for listing keys. I'd like to improve on
read concurrency. The current architecture ensures that a key is only
updated by a single thread which makes writing backend drivers
simpler. We either need to add support to the k/v vnode to ensure the
property is true when being updated in parallel or change the backend
drivers to be tolerant of it.
The performance numbers are interesting. How many vnodes were you
simulating?
For the 30-45% numbers, I had three "vnodes", each accessing a separate
1GB+ file.
Elaborating a bit:
Results from a Ubuntu Linux (2.6.32) laptop - note that results are
rather I/O scheduler dependent:
/---- Scheduler=CFQ ("Completely fair queueing")
serial : 24.7593 seconds total
serial_sorted : 16.5154 seconds total // 50% tp
improvement
parallel : 24.5351 seconds total// 1% tp
improvement
// Note: performance seems to be hindered by the "Fair" part of the
scheduling algorithm,
// which tries to give separate processes equal bandwidth to the disk.
/---- Scheduler=Anticipatory
serial : 24.7583 seconds total
serial_sorted : 16.3723 seconds total // 51% tp
improvement
parallel : 19.3194 seconds total // 28% tp
improvement
/---- Scheduler=Deadline
serial : 21.7606 seconds total
serial_sorted : 17.5556 seconds total // 24% tp
improvement
parallel : 16.3663 seconds total// 33% tp
improvement
/---- Scheduler=Noop
serial : 21.7646 seconds total
serial_sorted : 17.4545 seconds total// 25% tp
improvement
parallel : 15.7670 seconds total// 38% tp
improvement
These are from a single-run test; I note that the 'parallel' numbers are
in the lower end of the range I quoted.
Test code is attached. (Much of the code isn't used, as there was a bit
of exploration involved.)
The above results were produced by running
parread:multi_vnode_test_main(["/tmp/erk1G","/tmp/erk1Gb","/tmp/erk1Gc"]).
where the erkXX are separate files of each 1GB. (Each is used by one
"vnode", so this is a 3-vnode-test.)
The "clear_disk_cache_for_testing_purposes" script is of course system
dependent; this one needs to be suid root (or the test program run as
root, but don't do that unless you have reviewed the code well).
/Erik
Jon.
On Mon, Oct 31, 2011 at 10:38 AM, Erik Søe Sørensen <[email protected]
<mailto:[email protected]>> wrote:
The following is a couple of questions (and suggestions) regarding the
technical sides of Riak performance and reliability.
The questions have been prompted by reading Riak source code and by
discussions within our company.
I suppose the common thread here is "latency hiding"...
Durable Writes.
---------------
The default for bitcask's 'sync_strategy' setting is not to flush
to disk explicitly at all.
This means that a 'Durable Write' isn't actually durable; the
difference between 'W' and 'DW' replies is whether the write has
made it past Riak, to the OS - but not through the OS and down to
disk.
Is this correct?
What I'd have expected, as a reaonably-performing alternative, is
that Riak would flush periodically - say, after at most W_flush
writes or MS_flush milliseconds, and send 'dw' replies for all of
the relevant requests (those written since last flush) at once
after the flush has been completed.
This would combine 'real' DW semantics with reasonable performance
(and is how I have handled a similar problem; my conceptions about
what is right and proper may of course be influenced by my own
coding history...).
(For kicks, MS_flush might even be dynamically determined by how
long the flush operations tend to take; the typical value of a
flush duration, multiplied by a small constant, would probably be
a fitting value.)
Parallel Reads.
---------------
Within a vnode, bitcask read operations happen in serial.
Is there any reason for reads not happening in parallel?
For map/reduce operations, in particular, I imagine this might
make a difference, by giving the OS the opportunity to schedule
disk accesses so as to reduce seek time.
(Unless of course Riak itself reorders the keys before reading,
but I don't believe this is the case - especially since the order
would depend on the backend: for bitcask, by time; for innostore,
by key order, for instance.)
Of course, if each host has multiple vnodes, there will be some
parallellity even with serialized reads within each bitcask.
Regards,
Erik Søe Sørensen
_______________________________________________
riak-users mailing list
[email protected] <mailto:[email protected]>
http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com
--
Jon Meredith
Platform Engineering Manager
Basho Technologies, Inc.
[email protected] <mailto:[email protected]>
#!/bin/bash
echo 3 | sudo tee /proc/sys/vm/drop_caches || echo "ERROR: $?" >&2
-module(parread).
-compile(export_all).
%%% To use this test, you need a script called
%%% clear_disk_cache_for_testing_purposes
%%% which clears the disk caches so that all reads are actual disk reads.
multi_vnode_test_main(FileNames) ->
BlockSize = 1024,
N = 1000,
TestFuns = [{serial, fun test/3},
{serial_sorted, fun stest/3},
{parallel, fun p2test/3}],
R = [begin
io:format("Trying ~s...\n", [Name]),
clear_disk_cache(),
Before = os:timestamp(),
multi_vnode_test(TestFun, FileNames, BlockSize, N),
After = os:timestamp(),
{Name, timer:now_diff(After, Before) * 1.0e-6}
end
|| {Name,TestFun} <- TestFuns],
[io:format("~-30s: ~8.4f seconds total\n", [Name,TotalSecs])
|| {Name, TotalSecs} <- R],
ok.
clear_disk_cache() ->
os:cmd("./clear_disk_cache_for_testing_purposes").
%%====================
test2(FileName, BlockSize, N, P) ->
[spawn(fun() ->
{A,B,C} = now(),
random:seed(A,B,C),
io:format("~p\n", [test(FileName, BlockSize, N)])
end)
|| _ <- lists:seq(1,P)].
rtest2(FileName, BlockSize, N, P) ->
[spawn(fun() ->
{A,B,C} = now(),
random:seed(A,B,C),
io:format("~p\n", [rtest(FileName, BlockSize, N)])
end)
|| _ <- lists:seq(1,P)].
p2test2(FileName, BlockSize, N, P) ->
[spawn(fun() ->
{A,B,C} = now(),
random:seed(A,B,C),
io:format("~p\n", [p2test(FileName, BlockSize, N)])
end)
|| _ <- lists:seq(1,P)].
multi_vnode_test(TestFun, FileNames, BlockSize, N) ->
R = parmap(fun(FileName) ->
{A,B,C} = now(),
random:seed(A,B,C),
TestFun(FileName, BlockSize, N)
end,
FileNames),
R.
parmap(Fun, Args) ->
Parent = self(),
Pids = [spawn(fun() -> Parent ! {self(), Fun(A)} end) || A <- Args],
Res = [receive {P,R} -> R end || P <- Pids],
io:format("parmap(~p,~p) ->\n ~p\n", [Fun, Args, Res]),
Res.
%%====================
test(FileName, BlockSize, N) -> % Test sync. non-raw reads.
test(FileName, BlockSize, N, fun loop/3, [], sync_non_raw).
rtest(FileName, BlockSize, N) -> % Test sync. raw reads.
test(FileName, BlockSize, N, fun loop/3, [raw], sync_raw).
ptest(FileName, BlockSize, N) -> % Test async. non-raw reads.
test(FileName, BlockSize, N, fun ploop/3, [], async_non_raw).
stest(FileName, BlockSize, N) -> % Test async. non-raw reads.
test(FileName, BlockSize, N, fun sloop/3, [], async_non_raw).
gtest(FileName, BlockSize, N) -> % Test async. non-raw reads.
test(FileName, BlockSize, N, fun gloop/3, [], grouped).
test(FileName, BlockSize, N, LoopFun, OpenOpts, Tag) ->
{ok, Fd} = file:open(FileName, [read, binary | OpenOpts]),
{ok, Sz} = file:position(Fd, eof),
Reqs = request_set(Sz-BlockSize, N),
Before = os:timestamp(),
LoopFun(Fd, BlockSize, Reqs),
After = os:timestamp(),
file:close(Fd),
Elapsed = timer:now_diff(After,Before),
{result, Tag, Elapsed / 1.0e6, float(Elapsed) / N}.
p2test(FileName, BlockSize, N) -> % Test async. non-raw reads.
test_by_filename(FileName, BlockSize, N, fun p2loop/3, [], parallel_by_filename).
test_by_filename(FileName, BlockSize, N, LoopFun, OpenOpts, Tag) ->
{ok, Fd} = file:open(FileName, [read, binary | OpenOpts]),
{ok, Sz} = file:position(Fd, eof),
Reqs = request_set(Sz-BlockSize, N),
Before = os:timestamp(),
LoopFun(FileName, BlockSize, Reqs),
After = os:timestamp(),
file:close(Fd),
Elapsed = timer:now_diff(After,Before),
{result, Tag, Elapsed / 1.0e6, float(Elapsed) / N}.
%%==========
sloop(Fd, BlockSize, Reqs) ->
loop(Fd, BlockSize, lists:sort(Reqs)).
loop(_Fd, _BlockSize, []) -> ok;
loop(Fd, BlockSize, [Pos|Rest]) ->
read_block_synchronously(Fd, Pos, BlockSize),
loop(Fd, BlockSize, Rest).
%%==========
gloop(_Fd, _BlockSize, []) -> ok;
gloop(Fd, BlockSize, Reqs) ->
{Group, Rest} = try lists:split(50, Reqs)
catch error:badarg -> {Reqs, []}
end,
read_block_group(Fd, [{Pos, BlockSize} || Pos <- Group]),
gloop(Fd, BlockSize, Rest).
%%==========
ploop(Fd, BlockSize, Reqs) ->
ploop(Fd, BlockSize, 0, Reqs).
ploop(_Fd, _BlockSize, 0, []) -> ok;
ploop(Fd, BlockSize, Outstanding, []) ->
wait_for_outstanding(),
ploop(Fd, BlockSize, Outstanding-1, []);
ploop(Fd, BlockSize, Outstanding, [Pos|Rest]=Reqs) ->
if Outstanding >= 50 ->
%% No slots free.
wait_for_outstanding(),
ploop(Fd, BlockSize, Outstanding-1, Reqs);
true ->
read_block_asynchronously(Fd, Pos, BlockSize),
ploop(Fd, BlockSize, Outstanding+1, Rest)
end.
%%==========
p2loop(Filename, BlockSize, Reqs) ->
p2loop(Filename, BlockSize, 0, Reqs).
p2loop(_Filename, _BlockSize, 0, []) -> ok;
p2loop(Filename, BlockSize, Outstanding, []) ->
wait_for_outstanding(),
p2loop(Filename, BlockSize, Outstanding-1, []);
p2loop(Filename, BlockSize, Outstanding, [Pos|Rest]=Reqs) ->
if Outstanding >= 50 ->
%% No slots free.
wait_for_outstanding(),
p2loop(Filename, BlockSize, Outstanding-1, Reqs);
true ->
read_block_asynchronously_from_filename(Filename, Pos, BlockSize),
p2loop(Filename, BlockSize, Outstanding+1, Rest)
end.
%%==========
request_set(StartLimit, N) ->
[random:uniform(StartLimit) || _ <- lists:seq(1,N)].
read_block_synchronously(Fd, Pos, BlockSize) ->
{ok, _Data} = file:pread(Fd, Pos, BlockSize).
read_block_group(Fd, Group) ->
io:format("DB| group size: ~p\n", [length(Group)]),
{ok, _Datas} = file:pread(Fd, Group).
read_block_asynchronously(Fd, Pos, BlockSize) ->
Owner = self(),
Pid = spawn(fun() ->
%% io:format("DB| started: ~p @ ~p\n", [self(), Pos]),
try file:pread(Fd, Pos, BlockSize)
of {ok, Data} ->
%% io:format("DB| done: ~p @ ~p\n", [self(), Pos]),
Owner ! {read_done, self(), Data};
Error1 -> io:format("DB| error: ~p :: ~p\n", [self(), Error1])
catch _:Error ->
io:format("DB| error: ~p :: ~p\n", [self(), Error])
end
end),
Pid.
read_block_asynchronously_from_filename(FileName, Pos, BlockSize) ->
Owner = self(),
Pid = spawn(fun() ->
{ok, Fd} = file:open(FileName, [read, binary, raw]),
%% io:format("DB| started: ~p @ ~p\n", [self(), Pos]),
try file:pread(Fd, Pos, BlockSize)
of {ok, Data} ->
%% io:format("DB| done: ~p @ ~p\n", [self(), Pos]),
Owner ! {read_done, self(), Data};
Error1 -> io:format("DB| error: ~p :: ~p\n", [self(), Error1])
catch _:Error ->
io:format("DB| error: ~p :: ~p\n", [self(), Error])
end
end),
Pid.
wait_for_outstanding() ->
receive {read_done, _Ref, _Data} -> ok
after 5000 -> error(timeout)
end.
_______________________________________________
riak-users mailing list
[email protected]
http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com