jaydoane commented on code in PR #5399: URL: https://github.com/apache/couchdb/pull/5399#discussion_r1962108070
########## src/couch/src/couch_cfile.erl: ########## @@ -0,0 +1,284 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +% This module can dup(licate)s raw file handles to create file handles which +% allow other proceses to issue pread calls. This lets clients completely +% bypass the couch_file gen_server message queue to do reads. +% +% At the POSIX API level pread() functions are thread-safe so calls can be +% issued in parallel by multiple threads. See these links to find out more +% about dup() and pread(): +% +% - https://www.man7.org/linux/man-pages/man2/dup.2.html +% - https://www.man7.org/linux/man-pages/man2/pread.2.html + +-module(couch_cfile). + +-export([ + dup/1, + pread/2, + pread/3, + close/1, + position/2, + datasync/1, + write/2, + truncate/1, + fd/1, + advise/4 +]). + +% Internal exports +% +-export([ + janitor/0 +]). + +-on_load(init/0). + +-nifs([ + dup_nif/1, + close_nif/1, + close_fd_nif/1, + pread_nif/3, + info_nif/1, + eof_nif/1, + seek_nif/3, + write_nif/2, + datasync_nif/1, + truncate_nif/1 +]). + +-include_lib("kernel/include/file.hrl"). + +% Duplicate an open file handle The dup-ed handle will reference the same "file +% description" as the prim_file raw handle. After duplicating, the original +% prim_file handle can be closed. +% +% Handles returned from dup/1 follow the standard Erlang/OTP #file_descriptor{} +% "protocol", so they can be be transparently used by regular `file` module for +% pread, write, truncate and position calls. 
+% +dup(#file_descriptor{module = prim_file} = Fd) -> + case fd(Fd) of + {ok, FdInt} -> + case dup_nif(FdInt) of + {ok, Ref} -> make_handle(Fd, Ref); + {error, _} = Error -> Error + end; + {error, _} = Error -> + Error + end; +dup(_) -> + {error, einval}. + +close(#file_descriptor{module = ?MODULE} = Fd) -> + close_nif(owner_handle(Fd)); +close(_) -> + {error, einval}. + +pread(#file_descriptor{module = ?MODULE} = Fd, Pos, Len) -> + pread_nif(handle(Fd), Pos, Len); +pread(_, _, _) -> + {error, einval}. + +pread(#file_descriptor{module = ?MODULE} = Fd, LocNums) -> + pread_list(handle(Fd), LocNums, []); +pread(_, _) -> + {error, einval}. + +% Only position(Fd, eof|Pos) are supported. The variant eof one can be +% used by other processes. Only the owner can change position via the lseek API +% call Readers (non-owners) can still call file:position(Fd, eof) to get the Review Comment: ```suggestion % call. Readers (non-owners) can still call file:position(Fd, eof) to get the ``` ########## src/couch/src/couch_file.erl: ########## @@ -190,11 +195,50 @@ pread_binaries(Fd, PosList) -> ZipFun = fun(Pos, {IoList, Checksum}) -> verify_checksum(Fd, Pos, iolist_to_binary(IoList), Checksum, false) end, - case ioq:call(Fd, {pread_iolists, PosList}, erlang:get(io_priority)) of + case pread_iolists(Fd, PosList) of {ok, DataAndChecksums} -> {ok, lists:zipwith(ZipFun, PosList, DataAndChecksums)}; Error -> Error end. +pread_iolists(Fd, PosList) -> + IoqPriority = erlang:get(io_priority), + IoqMsg = {pread_iolists, PosList}, + case {get_cfile(Fd), cfile_skip_ioq()} of + {undefined, _} -> + % No cfile, that's fine, do what we always did + ioq:call(Fd, IoqMsg, IoqPriority); + {#file{} = CFile, true} -> + % Skip the IOQ if we have a cfile handle. Use this option on a + % system with enough RAM for the page cache and plenty of IO + % bandwidth + parallel_pread(CFile, PosList); + {#file{} = CFile, false} -> + % Use parallel preads only if the request would be bypassed by the + % IOQ. 
All three compatible ioqs (the two from the + % apache/couchdb-ioq, and the source default one) currently do not + % know how to call a function, they all expect to send a + % '$gen_call' message to a gen_server. Until we figure out how + % teach the IOQ(s) to call an MFA we can let them keep calling the + % main file descriptor gen_server. + % Review Comment: ```suggestion ``` ########## src/couch/src/couch_file.erl: ########## @@ -190,11 +195,50 @@ pread_binaries(Fd, PosList) -> ZipFun = fun(Pos, {IoList, Checksum}) -> verify_checksum(Fd, Pos, iolist_to_binary(IoList), Checksum, false) end, - case ioq:call(Fd, {pread_iolists, PosList}, erlang:get(io_priority)) of + case pread_iolists(Fd, PosList) of {ok, DataAndChecksums} -> {ok, lists:zipwith(ZipFun, PosList, DataAndChecksums)}; Error -> Error end. +pread_iolists(Fd, PosList) -> + IoqPriority = erlang:get(io_priority), + IoqMsg = {pread_iolists, PosList}, + case {get_cfile(Fd), cfile_skip_ioq()} of + {undefined, _} -> + % No cfile, that's fine, do what we always did + ioq:call(Fd, IoqMsg, IoqPriority); + {#file{} = CFile, true} -> + % Skip the IOQ if we have a cfile handle. Use this option on a + % system with enough RAM for the page cache and plenty of IO + % bandwidth + parallel_pread(CFile, PosList); + {#file{} = CFile, false} -> + % Use parallel preads only if the request would be bypassed by the + % IOQ. All three compatible ioqs (the two from the Review Comment: An elegant workaround to this IOQ limitation! ########## src/couch/test/eunit/couch_cfile_tests.erl: ########## @@ -0,0 +1,534 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_cfile_tests). + +-include_lib("couch/include/couch_eunit.hrl"). + +-define(CONCURRENT_READER_JITTER_MSEC, 5). + +couch_cfile_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + case os:type() of + {win32, _} -> + [ + ?TDEF_FE(t_unsupported) + ]; + {_, _} -> + [ + ?TDEF_FE(t_basics), + ?TDEF_FE(t_pread_and_position), + ?TDEF_FE(t_pread_from_other_procesess), + ?TDEF_FE(t_write), + ?TDEF_FE(t_datasync), + ?TDEF_FE(t_position_and_truncate), + ?TDEF_FE(t_advise), + ?TDEF_FE(t_invalid_fd), + ?TDEF_FE(t_fd), + ?TDEF_FE(t_cannot_dup_cfile_handle), + ?TDEF_FE(t_gc_is_closing_file_handles), + ?TDEF_FE(t_monitor_is_closing_file_handles), + ?TDEF_FE(t_janitor_proc_is_up), + ?TDEF_FE(t_concurrent_reads_512b), + ?TDEF_FE(t_concurrent_reads_4kb), + ?TDEF_FE(t_concurrent_reads_1mb) + ] + end + }. + +setup() -> + ?tempfile(). + +teardown(Path) -> + catch file:delete(Path). + +open_raw(Path) -> + % Use the options couch_file is using + {ok, Fd} = file:open(Path, [binary, append, raw, read]), + ok = file:write(Fd, <<"abcd">>), + Fd. 
+ +t_basics(Path) -> + Fd = open_raw(Path), + Res = couch_cfile:dup(Fd), + ?assertMatch({ok, _}, Res), + {ok, CFd} = Res, + ?assertEqual({ok, <<"ab">>}, couch_cfile:pread(CFd, 0, 2)), + file:close(Fd), + % Check dup-ing a closed raw file descriptor + ?assertEqual({error, einval}, couch_cfile:dup(Fd)), + ?assertEqual({ok, [<<"ab">>]}, couch_cfile:pread(CFd, [{0, 2}])), + ?assertMatch({ok, Int} when is_integer(Int), couch_cfile:fd(CFd)), + ?assertEqual(ok, couch_cfile:close(CFd)), + ?assertEqual({error, einval}, couch_cfile:pread(CFd, [{0, 2}])). + +t_pread_and_position(Path) -> + % Note: we'll be reading using 'file' module functions even for cfile + % handles. We're specifically acting like an OTP file layer to ensure we + % don't have to duplicate our couch_file code + + Fd = open_raw(Path), + {ok, CFd} = couch_cfile:dup(Fd), + + % Check positions + {ok, Eof} = file:position(Fd, eof), + {ok, CFdEof} = file:position(CFd, eof), + ?assertEqual(Eof, CFdEof), + + % Basic preads + ?assertEqual({ok, []}, file:pread(CFd, [])), + ?assertEqual({ok, [eof]}, file:pread(CFd, [{0, 0}])), + ?assertEqual(eof, file:pread(CFd, 0, 0)), + ?assertEqual({ok, [<<"a">>]}, file:pread(CFd, [{0, 1}])), + ?assertEqual({ok, <<"a">>}, file:pread(CFd, 0, 1)), + ?assertEqual({ok, <<"d">>}, file:pread(CFd, 3, 1)), + ?assertEqual({ok, <<"d">>}, file:pread(CFd, 3, 2)), + ?assertEqual({ok, [<<"a">>, eof]}, file:pread(CFd, [{0, 1}, {4, 1}])), + ?assertEqual({error, einval}, couch_cfile:pread(junk, [{0, 1}])), + ?assertEqual({error, badarg}, file:pread(CFd, junk)), + + % Most of all we care that behavior matches with wtih file:pread/1 for + % any combination of valid/invalid/eof ranges + [ + ?assertEqual(file:pread(Fd, P, L), file:pread(CFd, P, L)) + || P <- lists:seq(-1, Eof + 1) ++ [1 bsl 42], + L <- lists:seq(-1, Eof + 1) ++ [1 bsl 42] + ], + + % Positions and preads are updated after a write + ok = file:write(Fd, <<"ef">>), + + % Check new positions + {ok, Eof1} = file:position(Fd, eof), + {ok, 
CFdEof1} = file:position(CFd, eof), + ?assertEqual(Eof1, CFdEof1), + + [ + ?assertEqual(file:pread(Fd, P, L), file:pread(CFd, P, L)) + || P <- lists:seq(-1, Eof1 + 1), L <- lists:seq(-1, Eof1 + 1) + ], + + % File trunctions also is reflected in position and preads + {ok, 4} = file:position(Fd, 4), + ok = file:truncate(Fd), + + ?assertEqual({ok, 4}, file:position(Fd, eof)), + ?assertEqual({ok, 4}, file:position(CFd, eof)), + + {ok, 3} = file:position(Fd, 3), + {ok, 3} = file:position(CFd, 3), + + ok = file:truncate(CFd), + + ?assertEqual({ok, 3}, file:position(Fd, eof)), + ?assertEqual({ok, 3}, file:position(CFd, eof)), + + [ + ?assertEqual(file:pread(Fd, [{P, L}]), file:pread(CFd, [{P, L}])) + || P <- lists:seq(-1, 4), L <- lists:seq(-1, 4) + ], + + % Test closing behavior + ok = file:close(Fd), + ?assertEqual({error, einval}, file:pread(Fd, 0, 1)), + ?assertEqual({error, einval}, file:position(Fd, eof)), + + % Can still read from our dup-ed handle + ?assertEqual({ok, <<"a">>}, file:pread(CFd, 0, 1)), + ?assertEqual({ok, 3}, file:position(CFd, eof)), + + ok = file:close(CFd), + ?assertEqual({error, einval}, file:pread(CFd, 0, 1)), + ?assertEqual({error, einval}, file:position(CFd, eof)). + +t_pread_from_other_procesess(Path) -> + % Note: we'll be reading using 'file' module functions even for cfile + % handles. 
We're specifically acting like an OTP file layer to ensure we + % don't have to duplicate our couch_file code + + Fd = open_raw(Path), + {ok, CFd} = couch_cfile:dup(Fd), + + Proc = spawn_proc(), + + {ok, Eof} = file:position(Fd, eof), + + ?assertEqual({ok, Eof}, file:position(CFd, eof)), + ?assertEqual({ok, Eof}, proc_run(Proc, file, position, [CFd, eof])), + + % Closing original raw fd should still keep ours open and we should still + % be able to read from it from this or other processes + ok = file:close(Fd), + + ?assertEqual({ok, [<<"a">>]}, file:pread(CFd, [{0, 1}])), + ?assertEqual({ok, [<<"a">>]}, proc_run(Proc, file, pread, [CFd, [{0, 1}]])), + + % Fd works from other process, just for completeness + {ok, FdInt} = couch_cfile:fd(CFd), + ?assertEqual({ok, FdInt}, proc_run(Proc, couch_cfile, fd, [CFd])), + + ok = file:close(CFd), + ?assertEqual({error, einval}, file:pread(CFd, [{0, 1}])), + ?assertEqual({error, einval}, proc_run(Proc, file, pread, [CFd, [{0, 1}]])), + + kill_proc(Proc). 
+ +t_datasync(Path) -> + Fd = open_raw(Path), + {ok, CFd} = couch_cfile:dup(Fd), + ok = file:close(Fd), + + {ok, Pos} = file:position(CFd, eof), + ?assertEqual(ok, file:datasync(CFd)), + ?assertEqual(ok, file:write(CFd, <<"x">>)), + ?assertEqual(ok, file:datasync(CFd)), + {ok, Pos1} = file:position(CFd, eof), + ?assertEqual(Pos + 1, Pos1), + ?assertEqual({ok, <<"x">>}, file:pread(CFd, Pos, 1)), + + % Try something larger + TwoMBs = <<<<"y">> || _ <- lists:seq(1, 1 bsl 21)>>, + ?assertEqual(ok, file:write(CFd, TwoMBs)), + ?assertEqual(ok, file:datasync(CFd)), + {ok, Pos2} = file:position(CFd, eof), + ?assertEqual(Pos1 + (1 bsl 21), Pos2), + + % 10 in a row + lists:foreach( + fun(_) -> + ?assertEqual(ok, file:datasync(CFd)) + end, + lists:seq(1, 10) + ), + + % Others can't datasync + Proc = spawn_proc(), + Expect = {exc, error, not_on_controlling_process}, + ?assertEqual(Expect, proc_run(Proc, file, datasync, [CFd])), + kill_proc(Proc), + + % Can't datasync after closing + ok = file:close(CFd), + ?assertEqual({error, einval}, file:datasync(CFd)). 
+ +t_write(Path) -> + Fd = open_raw(Path), + {ok, CFd} = couch_cfile:dup(Fd), + ok = file:close(Fd), + + {ok, Pos} = file:position(CFd, eof), + + ?assertEqual(ok, file:write(CFd, <<"x">>)), + {ok, Pos1} = file:position(CFd, eof), + ?assertEqual(Pos + 1, Pos1), + ?assertEqual({ok, <<"x">>}, file:pread(CFd, Pos, 1)), + TwoMBs = <<<<"y">> || _ <- lists:seq(1, 1 bsl 21)>>, + ?assertEqual(ok, file:write(CFd, TwoMBs)), + {ok, Pos2} = file:position(CFd, eof), + ?assertEqual(Pos1 + (1 bsl 21), Pos2), + + {ok, ReadTwoMBs} = file:pread(CFd, Pos1, 1 bsl 21), + ?assertEqual(byte_size(TwoMBs), byte_size(ReadTwoMBs)), + ?assertEqual(TwoMBs, ReadTwoMBs), + + % Others can't write + Proc = spawn_proc(), + Expect = {exc, error, not_on_controlling_process}, + ?assertEqual(Expect, proc_run(Proc, file, write, [CFd, <<"y">>])), + kill_proc(Proc), + + % Can't write after closing + ?assertEqual(ok, file:close(CFd)), + ?assertEqual({error, einval}, file:write(CFd, <<"z">>)). + +t_position_and_truncate(Path) -> + Fd = open_raw(Path), + {ok, CFd} = couch_cfile:dup(Fd), + ok = file:close(Fd), + + {ok, Pos} = file:position(CFd, eof), + ?assert(Pos > 0), + ?assertEqual({ok, 0}, file:position(CFd, 0)), + ?assertEqual(ok, file:truncate(CFd)), + {ok, Pos1} = file:position(CFd, eof), + ?assertEqual(eof, file:pread(CFd, 0, 1)), + ?assertEqual(Pos1, 0), + + ok = file:write(CFd, <<"abc">>), + ?assertEqual({ok, 1}, file:position(CFd, 1)), + ?assertEqual(ok, file:truncate(CFd)), + ?assertEqual({ok, <<"a">>}, file:pread(CFd, 0, 10)), + + Proc = spawn_proc(), + + % Others can't do absolute position changes or truncate + Expect = {exc, error, not_on_controlling_process}, + ?assertEqual(Expect, proc_run(Proc, file, position, [CFd, 1])), + ?assertEqual(Expect, proc_run(Proc, file, truncate, [CFd])), + + % Others can call position(Fd, eof) to get the file size + ?assertEqual({ok, 1}, proc_run(Proc, file, position, [CFd, eof])), + + kill_proc(Proc), + + % After closing, can't truncate or position + ok = 
file:close(CFd), + ?assertEqual({error, einval}, file:position(CFd, 42)), + ?assertEqual({error, einval}, file:truncate(CFd)). + +t_advise(Path) -> + % This is one optional so not implemented as a nif + % we just check that it behaves reasonably + Fd = open_raw(Path), + {ok, CFd} = couch_cfile:dup(Fd), + ok = file:close(Fd), + + ?assertEqual(ok, file:advise(CFd, 42, 42, dont_need)), + + % Others can't call it + Proc = spawn_proc(), + Expect = {exc, error, not_on_controlling_process}, + Args = [CFd, 42, 42, dont_need], + ?assertEqual(Expect, proc_run(Proc, file, advise, Args)), + kill_proc(Proc). + +t_invalid_fd(_Path) -> + ?assertEqual({error, einval}, couch_cfile:dup(junk)), + ?assertEqual({error, einval}, couch_cfile:pread(junk, 1, 1)), + ?assertEqual({error, einval}, couch_cfile:close(junk)), + ?assertEqual({error, einval}, couch_cfile:fd(junk)), + ?assertEqual({error, einval}, couch_cfile:position(junk, eof)). + +t_fd(Path) -> + Fd = open_raw(Path), + {ok, CFd} = couch_cfile:dup(Fd), + + {ok, FdInt} = couch_cfile:fd(Fd), + {ok, CFdInt} = couch_cfile:fd(CFd), + ?assert(is_integer(FdInt) andalso FdInt > -1), + ?assert(is_integer(CFdInt) andalso CFdInt > -1), + + ?assertEqual({error, einval}, couch_cfile:fd(potato)), + + % We can't say a whole lot more just that both are + % not equal since they are both open and one is dup-ed + % from the other + ?assertNotEqual(FdInt, CFdInt), + + ok = file:close(Fd), + ok = file:close(CFd), + + % Here we check our sanity-checker: after handles are closed we cannot get + % any access to them. In the sanity checker we access the int fds after + % dup-ing in order to assert that we still have access to the same file + % handles we started with. + ?assertEqual({error, einval}, couch_cfile:fd(Fd)), + ?assertEqual({error, einval}, couch_cfile:fd(CFd)). 
+ +t_janitor_proc_is_up(Path) -> + Fd = open_raw(Path), + {ok, CFd} = couch_cfile:dup(Fd), + couch_cfile:close(CFd), + ok = file:close(Fd), + ?assertEqual(true, is_process_alive(whereis(couch_cfile))). + +t_unsupported(Fd) -> + ?assertEqual({error, einval}, couch_cfile:dup(Fd)), + ?assertEqual({error, einval}, couch_cfile:pread(Fd, 1, 1)), + ?assertEqual({error, einval}, couch_cfile:close(Fd)), + ?assertEqual({error, einval}, couch_cfile:fd(Fd)), + ?assertEqual({error, einval}, couch_cfile:position(Fd, eof)). + +t_cannot_dup_cfile_handle(Path) -> + Fd = open_raw(Path), + {ok, CFd0} = couch_cfile:dup(Fd), + ok = file:close(Fd), + ?assertEqual({error, einval}, couch_cfile:dup(CFd0)). + +t_gc_is_closing_file_handles(Path) -> + Fd = open_raw(Path), + {ok, FdInt} = couch_cfile:fd(Fd), + % Since we'll be checking the janitor, send it some junk message + % it should cope with them by dropping them (like the OTP one) + whereis(couch_cfile) ! {some_junk, message}, + Cnt = 750, + {_, Ref} = spawn_monitor(fun() -> + Fd1 = open_raw(Path), + lists:foreach(fun(_) -> couch_cfile:dup(Fd1) end, lists:seq(1, Cnt)) + end), + receive + {'DOWN', Ref, _, _, _} -> ok + end, + % According the dup() docs: + % + % "The new file descriptor number is guaranteed to be the lowest-numbered + % file descriptor that was unused in the calling process." + % + % Unless during the test something else opened another Cnt descriptors, if + % we open another one we should get something lower than FdInt + Cnt + {ok, Fd2} = couch_cfile:dup(Fd), + {ok, FdInt1} = couch_cfile:fd(Fd2), + ?assert(FdInt1 =< FdInt + Cnt), + ok = file:close(Fd), + ok = file:close(Fd2). 
+ +t_monitor_is_closing_file_handles(Path) -> + Proc = spawn_proc(), + {ok, Fd} = proc_run(Proc, file, open, [Path, [binary, append, raw, read]]), + ?assertError(not_on_controlling_process, couch_cfile:dup(Fd)), + {ok, CFd} = proc_run(Proc, couch_cfile, dup, [Fd]), + ?assertEqual(eof, file:pread(CFd, 0, 1)), + kill_proc(Proc), + ?assertEqual({error, einval}, file:pread(CFd, 0, 1)). + +t_concurrent_reads_512b(Path) -> + Fd = cfile(Path), + Eof = write(Fd, 0, 512), + ReadersPidRefs = spawn_readers(20, Fd, Eof), + timer:sleep(2000), Review Comment: ``` module 'couch_cfile_tests' couch_cfile_tests:31: -couch_cfile_test_/0-fun-32- (t_basics)...ok couch_cfile_tests:32: -couch_cfile_test_/0-fun-30- (t_pread_and_position)...[0.008 s] ok couch_cfile_tests:33: -couch_cfile_test_/0-fun-28- (t_pread_from_other_procesess)...ok couch_cfile_tests:34: -couch_cfile_test_/0-fun-26- (t_write)...[0.074 s] ok couch_cfile_tests:35: -couch_cfile_test_/0-fun-24- (t_datasync)...[0.072 s] ok couch_cfile_tests:36: -couch_cfile_test_/0-fun-22- (t_position_and_truncate)...[0.001 s] ok couch_cfile_tests:37: -couch_cfile_test_/0-fun-20- (t_advise)...ok couch_cfile_tests:38: -couch_cfile_test_/0-fun-18- (t_invalid_fd)...ok couch_cfile_tests:39: -couch_cfile_test_/0-fun-16- (t_fd)...ok couch_cfile_tests:40: -couch_cfile_test_/0-fun-14- (t_cannot_dup_cfile_handle)...ok couch_cfile_tests:41: -couch_cfile_test_/0-fun-12- (t_gc_is_closing_file_handles)...[0.007 s] ok couch_cfile_tests:42: -couch_cfile_test_/0-fun-10- (t_monitor_is_closing_file_handles)...ok couch_cfile_tests:43: -couch_cfile_test_/0-fun-8- (t_janitor_proc_is_up)...ok couch_cfile_tests:44: -couch_cfile_test_/0-fun-6- (t_concurrent_reads_512b)...[2.001 s] ok couch_cfile_tests:45: -couch_cfile_test_/0-fun-4- (t_concurrent_reads_4kb)...[2.002 s] ok couch_cfile_tests:46: -couch_cfile_test_/0-fun-2- (t_concurrent_reads_1mb)...[2.057 s] ok [done in 6.278 s] ``` Seems like the 6 seconds of sleeping is the majority of time cost of these tests. 
Do you think lowering it would make the tests less robust? ########## src/couch/test/eunit/couch_cfile_tests.erl: ########## @@ -0,0 +1,534 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_cfile_tests). + +-include_lib("couch/include/couch_eunit.hrl"). + +-define(CONCURRENT_READER_JITTER_MSEC, 5). + +couch_cfile_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + case os:type() of + {win32, _} -> + [ + ?TDEF_FE(t_unsupported) + ]; + {_, _} -> + [ + ?TDEF_FE(t_basics), + ?TDEF_FE(t_pread_and_position), + ?TDEF_FE(t_pread_from_other_procesess), + ?TDEF_FE(t_write), + ?TDEF_FE(t_datasync), + ?TDEF_FE(t_position_and_truncate), + ?TDEF_FE(t_advise), + ?TDEF_FE(t_invalid_fd), + ?TDEF_FE(t_fd), + ?TDEF_FE(t_cannot_dup_cfile_handle), + ?TDEF_FE(t_gc_is_closing_file_handles), + ?TDEF_FE(t_monitor_is_closing_file_handles), + ?TDEF_FE(t_janitor_proc_is_up), + ?TDEF_FE(t_concurrent_reads_512b), + ?TDEF_FE(t_concurrent_reads_4kb), + ?TDEF_FE(t_concurrent_reads_1mb) + ] + end + }. + +setup() -> + ?tempfile(). + +teardown(Path) -> + catch file:delete(Path). + +open_raw(Path) -> + % Use the options couch_file is using + {ok, Fd} = file:open(Path, [binary, append, raw, read]), + ok = file:write(Fd, <<"abcd">>), + Fd. 
+ +t_basics(Path) -> + Fd = open_raw(Path), + Res = couch_cfile:dup(Fd), + ?assertMatch({ok, _}, Res), + {ok, CFd} = Res, + ?assertEqual({ok, <<"ab">>}, couch_cfile:pread(CFd, 0, 2)), + file:close(Fd), + % Check dup-ing a closed raw file descriptor + ?assertEqual({error, einval}, couch_cfile:dup(Fd)), + ?assertEqual({ok, [<<"ab">>]}, couch_cfile:pread(CFd, [{0, 2}])), + ?assertMatch({ok, Int} when is_integer(Int), couch_cfile:fd(CFd)), + ?assertEqual(ok, couch_cfile:close(CFd)), + ?assertEqual({error, einval}, couch_cfile:pread(CFd, [{0, 2}])). + +t_pread_and_position(Path) -> + % Note: we'll be reading using 'file' module functions even for cfile + % handles. We're specifically acting like an OTP file layer to ensure we + % don't have to duplicate our couch_file code + + Fd = open_raw(Path), + {ok, CFd} = couch_cfile:dup(Fd), + + % Check positions + {ok, Eof} = file:position(Fd, eof), + {ok, CFdEof} = file:position(CFd, eof), + ?assertEqual(Eof, CFdEof), + + % Basic preads + ?assertEqual({ok, []}, file:pread(CFd, [])), + ?assertEqual({ok, [eof]}, file:pread(CFd, [{0, 0}])), + ?assertEqual(eof, file:pread(CFd, 0, 0)), + ?assertEqual({ok, [<<"a">>]}, file:pread(CFd, [{0, 1}])), + ?assertEqual({ok, <<"a">>}, file:pread(CFd, 0, 1)), + ?assertEqual({ok, <<"d">>}, file:pread(CFd, 3, 1)), + ?assertEqual({ok, <<"d">>}, file:pread(CFd, 3, 2)), + ?assertEqual({ok, [<<"a">>, eof]}, file:pread(CFd, [{0, 1}, {4, 1}])), + ?assertEqual({error, einval}, couch_cfile:pread(junk, [{0, 1}])), + ?assertEqual({error, badarg}, file:pread(CFd, junk)), + + % Most of all we care that behavior matches with wtih file:pread/1 for + % any combination of valid/invalid/eof ranges + [ + ?assertEqual(file:pread(Fd, P, L), file:pread(CFd, P, L)) + || P <- lists:seq(-1, Eof + 1) ++ [1 bsl 42], + L <- lists:seq(-1, Eof + 1) ++ [1 bsl 42] + ], + + % Positions and preads are updated after a write + ok = file:write(Fd, <<"ef">>), + + % Check new positions + {ok, Eof1} = file:position(Fd, eof), + {ok, 
CFdEof1} = file:position(CFd, eof), + ?assertEqual(Eof1, CFdEof1), + + [ + ?assertEqual(file:pread(Fd, P, L), file:pread(CFd, P, L)) + || P <- lists:seq(-1, Eof1 + 1), L <- lists:seq(-1, Eof1 + 1) + ], + + % File trunctions also is reflected in position and preads Review Comment: ```suggestion % File truncation also is reflected in position and preads ``` ########## src/couch/src/couch_cfile.erl: ########## @@ -0,0 +1,284 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +% This module can dup(licate)s raw file handles to create file handles which +% allow other proceses to issue pread calls. This lets clients completely +% bypass the couch_file gen_server message queue to do reads. +% +% At the POSIX API level pread() functions are thread-safe so calls can be +% issued in parallel by multiple threads. See these links to find out more +% about dup() and pread(): +% +% - https://www.man7.org/linux/man-pages/man2/dup.2.html +% - https://www.man7.org/linux/man-pages/man2/pread.2.html + +-module(couch_cfile). + +-export([ + dup/1, + pread/2, + pread/3, + close/1, + position/2, + datasync/1, + write/2, + truncate/1, + fd/1, + advise/4 +]). + +% Internal exports +% +-export([ + janitor/0 +]). + +-on_load(init/0). + +-nifs([ + dup_nif/1, + close_nif/1, + close_fd_nif/1, + pread_nif/3, + info_nif/1, + eof_nif/1, + seek_nif/3, + write_nif/2, + datasync_nif/1, + truncate_nif/1 +]). + +-include_lib("kernel/include/file.hrl"). 
+ +% Duplicate an open file handle The dup-ed handle will reference the same "file +% description" as the prim_file raw handle. After duplicating, the original +% prim_file handle can be closed. +% +% Handles returned from dup/1 follow the standard Erlang/OTP #file_descriptor{} +% "protocol", so they can be be transparently used by regular `file` module for +% pread, write, truncate and position calls. +% +dup(#file_descriptor{module = prim_file} = Fd) -> + case fd(Fd) of + {ok, FdInt} -> + case dup_nif(FdInt) of + {ok, Ref} -> make_handle(Fd, Ref); + {error, _} = Error -> Error + end; + {error, _} = Error -> + Error + end; +dup(_) -> + {error, einval}. + +close(#file_descriptor{module = ?MODULE} = Fd) -> + close_nif(owner_handle(Fd)); +close(_) -> + {error, einval}. + +pread(#file_descriptor{module = ?MODULE} = Fd, Pos, Len) -> + pread_nif(handle(Fd), Pos, Len); +pread(_, _, _) -> + {error, einval}. + +pread(#file_descriptor{module = ?MODULE} = Fd, LocNums) -> + pread_list(handle(Fd), LocNums, []); +pread(_, _) -> + {error, einval}. + +% Only position(Fd, eof|Pos) are supported. The variant eof one can be +% used by other processes. Only the owner can change position via the lseek API +% call Readers (non-owners) can still call file:position(Fd, eof) to get the +% size of the file but they'll get it via the fstat call. +% +position(#file_descriptor{module = ?MODULE, data = Data} = Fd, eof) -> + #{owner := Owner} = Data, + case self() =:= Owner of + true -> seek_nif(owner_handle(Fd), eof, 0); + false -> eof_nif(handle(Fd)) + end; +position(#file_descriptor{module = ?MODULE} = Fd, Pos) when is_integer(Pos), Pos >= 0 -> + seek_nif(owner_handle(Fd), bof, Pos); +position(_, _) -> + {error, einval}. + +datasync(#file_descriptor{module = ?MODULE} = Fd) -> + datasync_nif(owner_handle(Fd)); +datasync(_) -> + {error, einval}. + +write(#file_descriptor{module = ?MODULE} = Fd, IOData) -> + write_1(owner_handle(Fd), erlang:iolist_to_iovec(IOData)). 
+ +truncate(#file_descriptor{module = ?MODULE} = Fd) -> + truncate_nif(owner_handle(Fd)). + +% Can use this for debugging to inspect the raw (integer) file descriptors +% +fd(#file_descriptor{module = prim_file} = RawFd) -> + case prim_file:get_handle(RawFd) of + <<FdInt:32/native-signed-integer>> -> {ok, FdInt}; + _ -> {error, einval} + end; +fd(#file_descriptor{module = ?MODULE, data = Data}) -> + #{handle := Ref} = Data, + case info_nif(Ref) of + {ok, {FdInt, _}} -> {ok, FdInt}; + {error, _} = Error -> Error + end; +fd(_) -> + {error, einval}. + +% Since this is optional to implement we skip it for now +% +advise(#file_descriptor{module = ?MODULE} = Fd, Offset, Length, Advice) when + is_integer(Offset) andalso Offset >= 0, + is_integer(Length) andalso Length >= 0, + is_atom(Advice) +-> + % Check the owner at least. If/when we implement this, only the owner will + % get to call it. + _ = owner_handle(Fd), + ok; +advise(_, _, _, _) -> + {error, einval}. + +% Internal helpers + +make_handle(#file_descriptor{module = prim_file} = Orig, Ref) -> + Data = #{handle => Ref, owner => self()}, + Dup = #file_descriptor{module = ?MODULE, data = Data}, + case sanity_check(Orig, Dup) of + true -> + {ok, Dup}; + false -> + close_nif(Ref), + {error, einval} + end. + +sanity_check(#file_descriptor{} = Orig, #file_descriptor{} = Dup) -> + % Compare original and dup-ed saved origin fds. This should run after the + % dup call. Not sure how this could fail (somehow the raw fd crashed and + % re-opened by someone else right before dup-ing?) but it's better to be + % safe than sorry here. Another important bit is re-fetching both + % descriptors implicitly is asserting they haven't closed in the meantime. 
+ case fd(Orig) of + {ok, Fd1} when is_integer(Fd1), Fd1 > -1 -> + case info_nif(owner_handle(Dup)) of + {ok, {_, Fd2}} when is_integer(Fd2), Fd2 > -1 -> Fd1 =:= Fd2; + {ok, {_, _}} -> false; + {error, _} -> false + end; + _ -> + false Review Comment: Is it worth adding logging for these error cases, or do you expect that they will never actually happen in real life? ########## src/couch/src/couch_cfile.erl: ########## @@ -0,0 +1,284 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +% This module can dup(licate)s raw file handles to create file handles which Review Comment: ```suggestion % This module can dup(licate) raw file handles to create file handles which ``` ########## src/couch/test/eunit/couch_cfile_tests.erl: ########## @@ -0,0 +1,534 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_cfile_tests). + +-include_lib("couch/include/couch_eunit.hrl"). + +-define(CONCURRENT_READER_JITTER_MSEC, 5). 
+ +couch_cfile_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + case os:type() of + {win32, _} -> + [ + ?TDEF_FE(t_unsupported) + ]; + {_, _} -> + [ + ?TDEF_FE(t_basics), + ?TDEF_FE(t_pread_and_position), + ?TDEF_FE(t_pread_from_other_procesess), + ?TDEF_FE(t_write), + ?TDEF_FE(t_datasync), + ?TDEF_FE(t_position_and_truncate), + ?TDEF_FE(t_advise), + ?TDEF_FE(t_invalid_fd), + ?TDEF_FE(t_fd), + ?TDEF_FE(t_cannot_dup_cfile_handle), + ?TDEF_FE(t_gc_is_closing_file_handles), + ?TDEF_FE(t_monitor_is_closing_file_handles), + ?TDEF_FE(t_janitor_proc_is_up), + ?TDEF_FE(t_concurrent_reads_512b), + ?TDEF_FE(t_concurrent_reads_4kb), + ?TDEF_FE(t_concurrent_reads_1mb) + ] + end + }. + +setup() -> + ?tempfile(). + +teardown(Path) -> + catch file:delete(Path). + +open_raw(Path) -> + % Use the options couch_file is using + {ok, Fd} = file:open(Path, [binary, append, raw, read]), + ok = file:write(Fd, <<"abcd">>), + Fd. + +t_basics(Path) -> + Fd = open_raw(Path), + Res = couch_cfile:dup(Fd), + ?assertMatch({ok, _}, Res), + {ok, CFd} = Res, + ?assertEqual({ok, <<"ab">>}, couch_cfile:pread(CFd, 0, 2)), + file:close(Fd), + % Check dup-ing a closed raw file descriptor + ?assertEqual({error, einval}, couch_cfile:dup(Fd)), + ?assertEqual({ok, [<<"ab">>]}, couch_cfile:pread(CFd, [{0, 2}])), + ?assertMatch({ok, Int} when is_integer(Int), couch_cfile:fd(CFd)), + ?assertEqual(ok, couch_cfile:close(CFd)), + ?assertEqual({error, einval}, couch_cfile:pread(CFd, [{0, 2}])). + +t_pread_and_position(Path) -> + % Note: we'll be reading using 'file' module functions even for cfile + % handles. 
We're specifically acting like an OTP file layer to ensure we + % don't have to duplicate our couch_file code + + Fd = open_raw(Path), + {ok, CFd} = couch_cfile:dup(Fd), + + % Check positions + {ok, Eof} = file:position(Fd, eof), + {ok, CFdEof} = file:position(CFd, eof), + ?assertEqual(Eof, CFdEof), + + % Basic preads + ?assertEqual({ok, []}, file:pread(CFd, [])), + ?assertEqual({ok, [eof]}, file:pread(CFd, [{0, 0}])), + ?assertEqual(eof, file:pread(CFd, 0, 0)), + ?assertEqual({ok, [<<"a">>]}, file:pread(CFd, [{0, 1}])), + ?assertEqual({ok, <<"a">>}, file:pread(CFd, 0, 1)), + ?assertEqual({ok, <<"d">>}, file:pread(CFd, 3, 1)), + ?assertEqual({ok, <<"d">>}, file:pread(CFd, 3, 2)), + ?assertEqual({ok, [<<"a">>, eof]}, file:pread(CFd, [{0, 1}, {4, 1}])), + ?assertEqual({error, einval}, couch_cfile:pread(junk, [{0, 1}])), + ?assertEqual({error, badarg}, file:pread(CFd, junk)), + + % Most of all we care that behavior matches with wtih file:pread/1 for Review Comment: ```suggestion % Most of all we care that behavior matches file:pread/1 for ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
