jaydoane commented on code in PR #4266:
URL: https://github.com/apache/couchdb/pull/4266#discussion_r1024498891


##########
src/smoosh/src/smoosh_persist.erl:
##########
@@ -0,0 +1,300 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(smoosh_persist).
+
+-export([
+    unpersist/1,
+    persist/3,
+    check_setup/0
+]).
+
+-include_lib("kernel/include/file.hrl").
+
+-define(SUFFIX, ".smooshq").
+
+% Public API
+
+unpersist(Name) ->
+    Enabled = config:get_boolean("smoosh", "persist", false),
+    Capacity = smoosh_utils:capacity(Name),
+    unpersist(Enabled, Name, Capacity).
+
+persist(Waiting, #{} = Active, #{} = Starting) ->
+    Enabled = config:get_boolean("smoosh", "persist", false),
+    persist(Enabled, Waiting, Active, Starting).
+
+% Validate peristence setup for read/write/delete access. Emit warnings if
+% there are any failures. Call this function once during startup. During
+% runtime errors are ignored. Smoosh persistence is opportunistic, so if we
+% cannot read or write we just move on.
+%
+check_setup() ->
+    Enabled = config:get_boolean("smoosh", "persist", false),
+    try
+        check_setup(Enabled)
+    catch
+        throw:{fail, Msg, Error} ->
+            LogMsg = "~s : " ++ Msg ++ " failed in directory ~p : ~p",
+            Args = [?MODULE, state_dir(), Error],
+            couch_log:warning(LogMsg, Args),
+            {error, {Msg, Error}}
+    end.
+
+% Private functions
+
+unpersist(false, Name, _Capacity) ->
+    smoosh_priority_queue:new(Name);
+unpersist(true, Name, Capacity) ->
+    Path = file_path(Name),
+    case read(Path) of
+        {ok, Map} -> smoosh_priority_queue:from_map(Name, Capacity, Map);
+        {error, _} -> smoosh_priority_queue:new(Name)
+    end.
+
+persist(false, _Waiting, _Active, _Starting) ->
+    ok;
+persist(true, Waiting, Active, Starting) ->
+    Name = smoosh_priority_queue:name(Waiting),
+    WMap = smoosh_priority_queue:to_map(Waiting),
+    % Starting and active jobs are at priority level `infinity` as they are
+    % already running. We want them to be the first one to continue after

Review Comment:
   You can probably omit "one" or change it to "ones" in "We want them to be the first 
one to continue ..." since the subject ("them") is plural.



##########
src/smoosh/src/smoosh_persist.erl:
##########
@@ -0,0 +1,300 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(smoosh_persist).
+
+-export([
+    unpersist/1,
+    persist/3,
+    check_setup/0
+]).
+
+-include_lib("kernel/include/file.hrl").
+
+-define(SUFFIX, ".smooshq").
+
+% Public API
+
+unpersist(Name) ->
+    Enabled = config:get_boolean("smoosh", "persist", false),
+    Capacity = smoosh_utils:capacity(Name),
+    unpersist(Enabled, Name, Capacity).
+
+persist(Waiting, #{} = Active, #{} = Starting) ->
+    Enabled = config:get_boolean("smoosh", "persist", false),
+    persist(Enabled, Waiting, Active, Starting).
+
+% Validate peristence setup for read/write/delete access. Emit warnings if
+% there are any failures. Call this function once during startup. During
+% runtime errors are ignored. Smoosh persistence is opportunistic, so if we
+% cannot read or write we just move on.
+%
+check_setup() ->
+    Enabled = config:get_boolean("smoosh", "persist", false),
+    try
+        check_setup(Enabled)
+    catch
+        throw:{fail, Msg, Error} ->
+            LogMsg = "~s : " ++ Msg ++ " failed in directory ~p : ~p",
+            Args = [?MODULE, state_dir(), Error],
+            couch_log:warning(LogMsg, Args),
+            {error, {Msg, Error}}
+    end.
+
+% Private functions
+
+unpersist(false, Name, _Capacity) ->
+    smoosh_priority_queue:new(Name);
+unpersist(true, Name, Capacity) ->
+    Path = file_path(Name),
+    case read(Path) of
+        {ok, Map} -> smoosh_priority_queue:from_map(Name, Capacity, Map);
+        {error, _} -> smoosh_priority_queue:new(Name)
+    end.
+
+persist(false, _Waiting, _Active, _Starting) ->
+    ok;
+persist(true, Waiting, Active, Starting) ->
+    Name = smoosh_priority_queue:name(Waiting),
+    WMap = smoosh_priority_queue:to_map(Waiting),
+    % Starting and active jobs are at priority level `infinity` as they are
+    % already running. We want them to be the first one to continue after
+    % restart. We're relying on infinity sorting higher than float and integer
+    % numeric values here.
+    AMap = maps:map(fun(_, _) -> infinity end, Active),
+    SMap = maps:from_list([{K, infinity} || K <- maps:values(Starting)]),
+    Path = file_path(Name),
+    write(maps:merge(WMap, maps:merge(AMap, SMap)), Path).
+
+check_setup(false) ->
+    disabled;
+check_setup(true) ->
+    StateDir = state_dir(),
+    Path = filename:join(StateDir, "smooshq.test"),
+    Data = #{<<"test">> => 1},
+    case file:read_file_info(StateDir) of
+        {ok, #file_info{access = A}} when A == read; A == read_write ->
+            ok;
+        {ok, #file_info{access = Invalid}} ->
+            throw({fail, "read access", Invalid});
+        {error, Error1} ->
+            throw({fail, "read", Error1})
+    end,
+    case write(Data, Path) of
+        ok -> ok;
+        {error, Error2} -> throw({fail, "write", Error2})
+    end,
+    delete_file(Path).
+
+write(#{} = QData, Path) when is_list(Path), map_size(QData) == 0 ->
+    % Save a few bytes by deleting the persisted queue data if
+    % there are no waiting/starting or active jobs
+    delete_file(Path);
+write(#{} = QData, Path) when is_list(Path) ->
+    Bin = term_to_binary(QData, [compressed, {minor_version, 2}]),
+    TmpPath = tmp_path(Path),
+    case file:write_file(TmpPath, Bin, [raw]) of
+        ok -> file:rename(TmpPath, Path);
+        {error, _} = Error -> Error
+    end.
+
+read(Path) ->
+    case file:read_file(Path) of
+        {ok, Bin} ->
+            try binary_to_term(Bin, [safe]) of
+                #{} = QData -> {ok, QData};
+                _ -> {error, term_not_a_map}
+            catch
+                _:_ ->
+                    {error, invalid_term}
+            end;
+        {error, _} = Error ->
+            Error
+    end.
+
+tmp_path(Path) ->
+    Time = abs(erlang:system_time()),
+    Path ++ "." ++ integer_to_list(Time) ++ ".tmp".
+
+file_path(Name) ->
+    StateDir = filename:absname(state_dir()),
+    filename:join(StateDir, Name ++ ?SUFFIX).
+
+state_dir() ->
+    Dir = config:get("smoosh", "state_dir", "."),
+    filename:absname(Dir).
+
+delete_file(Path) ->
+    % On Erlang 24+ we can avoid using the file server
+    case erlang:function_exported(file, delete, 2) of
+        true -> file:delete(Path, [raw]);
+        false -> file:delete(Path)
+    end.
+
+-ifdef(TEST).
+
+-include_lib("couch/include/couch_eunit.hrl").
+
+persist_unpersist_test_() ->
+    {
+        foreach,
+        fun() ->
+            meck:expect(config, get, fun(_, _, Default) -> Default end)
+        end,
+        fun(_) ->
+            meck:unload()
+        end,
+        [
+            ?TDEF_FE(t_write_read_delete),
+            ?TDEF_FE(t_fail_write_read_delete),
+            ?TDEF_FE(t_corrupted_read),
+            ?TDEF_FE(t_check_setup),
+            ?TDEF_FE(t_persist_unpersist_disabled),
+            ?TDEF_FE(t_persist_unpersist_enabled),
+            ?TDEF_FE(t_persist_unpersist_errors)
+        ]
+    }.
+
+t_write_read_delete(_) ->
+    Path = file_path("foochan"),
+    Data = #{<<"a">> => 1},
+
+    ?assertEqual(ok, write(Data, Path)),
+    ?assertMatch({ok, _}, file:read_file_info(Path)),
+
+    ReadRes = read(Path),
+    ?assertMatch({ok, #{}}, ReadRes),
+    {ok, ReadData} = ReadRes,
+    ?assertEqual(Data, ReadData),
+
+    ?assertEqual(ok, write(#{}, Path)),
+    ?assertEqual({error, enoent}, file:read_file_info(Path)).
+
+t_fail_write_read_delete(_) ->
+    meck:expect(config, get, fun("smoosh", "state_dir", _) -> "./x" end),
+    Path = file_path("foochan"),
+    ?assertEqual({error, enoent}, write(#{<<"a">> => 1}, Path)),
+    ?assertEqual({error, enoent}, read(Path)),
+    ?assertEqual({error, enoent}, write(#{}, Path)).
+
+t_corrupted_read(_) ->
+    Path = file_path("foochan"),
+    ?assertEqual(ok, write(#{<<"a">> => 1}, Path)),
+
+    ok = file:write_file(Path, term_to_binary(foo), [raw]),
+    ?assertEqual({error, term_not_a_map}, read(Path)),
+
+    ok = file:write_file(Path, <<"42">>, [raw]),
+    ?assertEqual({error, invalid_term}, read(Path)),
+
+    ?assertEqual(ok, write(#{}, Path)),
+    ?assertEqual({error, enoent}, file:read_file_info(Path)).
+
+t_check_setup(_) ->
+    ?assertEqual(disabled, check_setup()),
+
+    meck:expect(config, get_boolean, fun("smoosh", "persist", _) -> true end),
+    ?assertEqual(ok, check_setup()),
+
+    TDir = ?tempfile(),

Review Comment:
   Does this get cleaned up?



##########
src/smoosh/src/smoosh_utils.erl:
##########
@@ -135,3 +93,196 @@ parse_time(String, Default) ->
 
 log_level(Key, Default) when is_list(Key), is_list(Default) ->
     list_to_existing_atom(config:get("smoosh", Key, Default)).
+
+db_channels() ->
+    ConfStr = config:get("smoosh", "db_channels"),
+    channel_list(ConfStr, ?BUILT_IN_DB_CHANNELS).
+
+view_channels() ->
+    Conf = config:get("smoosh", "view_channels"),
+    channel_list(Conf, ?BUILT_IN_VIEW_CHANNELS).
+
+cleanup_channels() ->
+    Conf = config:get("smoosh", "cleanup_channels"),
+    channel_list(Conf, ?BUILT_IN_CLEANUP_CHANNELS).
+
+channel_list(ConfStr, Default) ->
+    DefaultList = split(Default),
+    ConfList = split(ConfStr),
+    lists:usort(DefaultList ++ ConfList).
+
+concurrency(ChannelName) ->
+    list_to_integer(?MODULE:get(ChannelName, "concurrency", "1")).
+
+capacity(ChannelName) ->
+    list_to_integer(?MODULE:get(ChannelName, "capacity", "9999")).

Review Comment:
   Does it make sense to define these defaults somewhere else?



##########
src/smoosh/src/smoosh_channel.erl:
##########
@@ -235,245 +190,146 @@ handle_info(check_window, State) ->
                 State;
             {false, true} ->
                 % resume is always safe even if we did not previously suspend
-                {reply, ok, NewState} = handle_call(resume_and_activate, nil, 
State),
-                NewState;
+                do_resume(State);
+            {true, false} when StrictWindow =:= "true" ->
+                % suspend immediately
+                do_suspend(State);
             {true, false} ->
-                if
-                    StrictWindow =:= "true" ->
-                        {reply, ok, NewState} = handle_call(suspend, nil, 
State),
-                        NewState;
-                    true ->
-                        State#state{paused = true}
-                end
+                % prevent new jobs from starting, active ones keep running
+                State#state{paused = true}
         end,
-    erlang:send_after(60 * 1000, self(), check_window),
+    schedule_check_window(),
     {noreply, FinalState};
-handle_info(start_recovery, #state{name = Name, waiting = Waiting0} = State0) 
->
-    RecActive = recover(active_file_name(Name)),
-    Waiting1 = lists:foldl(
-        fun(DbName, Acc) ->
-            case couch_db:exists(DbName) andalso 
couch_db:is_compacting(DbName) of
-                true ->
-                    Priority = smoosh_server:get_priority(Name, DbName),
-                    smoosh_priority_queue:in(DbName, Priority, Priority, Acc);
-                false ->
-                    Acc
-            end
-        end,
-        Waiting0,
-        RecActive
-    ),
-    State1 = maybe_start_compaction(State0#state{paused = false, waiting = 
Waiting1}),
-    Level = smoosh_utils:log_level("compaction_log_level", "debug"),
-    couch_log:Level(
-        "~p Previously active compaction jobs (if any) have been successfully 
recovered and restarted.",
-        [?MODULE]
-    ),
-    erlang:send_after(?ACTIVATE_DELAY_IN_MSEC, self(), activate),
-    {noreply, State1#state{paused = true}};
-handle_info(activate, State) ->
-    {noreply, activate_channel(State)};
-handle_info(persist, State) ->
-    ok = persist_and_reschedule(State),
+handle_info(update_status, #state{} = State) ->
+    schedule_update_status(),
+    {noreply, set_status(State)};
+handle_info(checkpoint, #state{cref = Ref} = State) when is_reference(Ref) ->
+    % If a checkpointer process is still running, don't start another one.
+    schedule_checkpoint(),
     {noreply, State};
-handle_info(pause, State) ->
-    {noreply, State#state{paused = true}};
+handle_info(checkpoint, #state{cref = undefined} = State) ->
+    % Start an asyncronous checkpoint process so we don't block the channel
+    #state{waiting = Waiting, active = Active, starting = Starting} = State,
+    Args = [Waiting, Active, Starting],
+    {_, Ref} = spawn_monitor(smoosh_persist, persist, Args),
+    schedule_checkpoint(),
+    {noreply, State#state{cref = Ref}};
 handle_info(unpause, State) ->
     {noreply, maybe_start_compaction(State#state{paused = false})}.
 
-terminate(_Reason, _State) ->
-    ok.
-
 % private functions.
 
-recover(FilePath) ->
-    case do_recover(FilePath) of
-        {ok, List} ->
-            List;
-        error ->
-            []
-    end.
-
-do_recover(FilePath) ->
-    case file:read_file(FilePath) of
-        {ok, Content} ->
-            <<Vsn, Binary/binary>> = Content,
-            try parse_state(Vsn, ?VSN, Binary) of
-                Term ->
-                    Level = smoosh_utils:log_level("compaction_log_level", 
"debug"),
-                    couch_log:Level(
-                        "~p Successfully restored state file ~s", [?MODULE, 
FilePath]
-                    ),
-                    {ok, Term}
-            catch
-                error:Reason ->
-                    couch_log:error(
-                        "~p Invalid state file (~p). Deleting ~s", [?MODULE, 
Reason, FilePath]
-                    ),
-                    file:delete(FilePath),
-                    error
-            end;
-        {error, enoent} ->
-            Level = smoosh_utils:log_level("compaction_log_level", "debug"),
-            couch_log:Level(
-                "~p (~p) State file ~s does not exist. Not restoring.", 
[?MODULE, enoent, FilePath]
-            ),
-            error;
-        {error, Reason} ->
-            couch_log:error(
-                "~p Cannot read the state file (~p). Deleting ~s", [?MODULE, 
Reason, FilePath]
-            ),
-            file:delete(FilePath),
-            error
-    end.
-
-parse_state(1, ?VSN, Binary) ->
-    erlang:binary_to_term(Binary, [safe]);
-parse_state(Vsn, ?VSN, _) ->
-    error({unsupported_version, Vsn}).
-
-persist_and_reschedule(State) ->
-    persist_queue(State),
-    erlang:send_after(?CHECKPOINT_INTERVAL_IN_MSEC, self(), persist),
-    ok.
-
-persist_queue(#state{name = Name, active = Active, starting = Starting, 
waiting = Waiting}) ->
-    Active1 = lists:foldl(
-        fun({DbName, _}, Acc) ->
-            [DbName | Acc]
-        end,
-        [],
-        Active
-    ),
-    Starting1 = lists:foldl(
-        fun({_, DbName}, Acc) ->
-            [DbName | Acc]
-        end,
-        [],
-        Starting
-    ),
-    smoosh_utils:write_to_file(Active1, active_file_name(Name), ?VSN),
-    smoosh_utils:write_to_file(Starting1, starting_file_name(Name), ?VSN),
-    smoosh_priority_queue:write_to_file(Waiting).
-
-active_file_name(Name) ->
-    filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".active").
-
-starting_file_name(Name) ->
-    filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".starting").
+unpersist(Name) ->
+    % Insert into the access table with a current
+    % timestamp to prevent the same dbs from being re-enqueued
+    % again after startup startup
+    Waiting = smoosh_persist:unpersist(Name),
+    MapFun = fun(Object, _Priority) ->
+        smoosh_server:update_access(Object)
+    end,
+    maps:map(MapFun, smoosh_priority_queue:to_map(Waiting)),
+    Waiting.
+
+% Periodically publish channel status to avoid having to block on gen_server

Review Comment:
   Maybe "cache in an ets table" instead of "publish"?



##########
src/smoosh/src/smoosh_channel.erl:
##########
@@ -482,94 +338,155 @@ start_compact(State, DbName) when is_binary(DbName) ->
                 couch_db:close(Db)
             end;
         Error = {not_found, no_db_file} ->
-            couch_log:warning(
-                "Error starting compaction for ~p: ~p",
-                [smoosh_utils:stringify(DbName), Error]
-            ),
+            LogMsg = "~s : Error starting compaction for ~p: ~p",
+            LogArgs = [Name, smoosh_utils:stringify(DbName), Error],
+            couch_log:warnign(LogMsg, LogArgs),

Review Comment:
   s/warnign/warning/ and maybe add test case for this?



##########
src/smoosh/operator_guide.md:
##########
@@ -45,14 +45,33 @@ calculation of _X_ is described in [Priority 
calculation](#priority-calculation)
 
 Both algorithms operate on two main measures:
 
-* **user_bytes**: this is the amount of data the user has in the file. It
-doesn't include storage overhead: old revisions, on-disk btree structure and
-so on.
+* **active_bytes**: this is the amount of data used by btree structure and the
+document bodies in the leaves of the revision tree of each document. It
+includes storage overhea, on-disk btree structure but does not include document

Review Comment:
   s/overhea/overhead/



##########
src/smoosh/test/smoosh_tests.erl:
##########
@@ -3,152 +3,486 @@
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
-%% ==========
-%% Setup
-%% ----------
-
-setup(ChannelType) ->
-    DbName = ?tempdb(),
-    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
-    couch_db:close(Db),
-    {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
-    smoosh_channel:flush(ChannelPid),
-    ok = config:set("smoosh", "persist", "true", false),
-    ok = config:set(config_section(ChannelType), "min_size", "1", false),
-    ok = config:set(config_section(ChannelType), "min_priority", "1", false),
-    DbName.
-
-teardown(ChannelType, DbName) ->
-    ok = couch_server:delete(DbName, [?ADMIN_CTX]),
-    ok = config:delete("smoosh", "persist", false),
-    ok = config:delete(config_section(DbName), "min_size", false),
-    ok = config:delete(config_section(DbName), "min_priority", false),
-    meck:unload(),
-    {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
-    smoosh_channel:flush(ChannelPid),
-    ok.
-
-config_section(ChannelType) ->
-    "smoosh." ++ ChannelType.
-
-%% ==========
-%% Tests
-%% ----------
-
 smoosh_test_() ->
     {
-        "Testing smoosh",
+        setup,
+        fun setup_all/0,
+        fun teardown_all/1,
         {
-            setup,
-            fun() -> test_util:start_couch([smoosh]) end,
-            fun test_util:stop/1,
+            foreach,
+            fun setup/0,
+            fun teardown/1,
             [
-                channels_tests(),
-                persistence_tests()
+                ?TDEF_FE(t_default_channels),
+                ?TDEF_FE(t_channels_recreated_on_crash),
+                ?TDEF_FE(t_can_create_and_delete_channels),
+                ?TDEF_FE(t_db_is_enqueued_and_compacted),
+                ?TDEF_FE(t_view_is_enqueued_and_compacted),
+                ?TDEF_FE(t_index_cleanup_happens_by_default),
+                ?TDEF_FE(t_index_cleanup_can_be_disabled),
+                ?TDEF_FE(t_suspend_resume),
+                ?TDEF_FE(t_check_window_can_resume),
+                ?TDEF_FE(t_renqueue_on_crashes),
+                ?TDEF_FE(t_update_status_works),
+                ?TDEF_FE(t_checkpointing_works, 15),
+                ?TDEF_FE(t_ignore_checkpoint_resume_if_compacted_already),
+                ?TDEF_FE(t_access_cleaner_restarts),
+                ?TDEF_FE(t_event_handler_restarts),
+                ?TDEF_FE(t_manual_enqueue_api_works),
+                ?TDEF_FE(t_access_cleaner_works)
             ]
         }
     }.
 
-persistence_tests() ->
-    Tests = [
-        fun should_persist_queue/2,
-        fun should_call_recover/2,
-        fun should_not_call_recover/2
-    ],
-    {
-        "Various persistence tests",
-        [
-            make_test_case("ratio_dbs", Tests)
-        ]
-    }.
+setup_all() ->
+    meck:new(smoosh_server, [passthrough]),
+    meck:new(smoosh_channel, [passthrough]),
+    meck:new(fabric, [passthrough]),
+    meck:new(couch_emsort, [passthrough]),
+    Ctx = test_util:start_couch([fabric]),
+    config:set("query_server_config", "commit_freq", "0", false),
+    Ctx.
 
-channels_tests() ->
-    Tests = [
-        fun should_enqueue/2
-    ],
-    {
-        "Various channels tests",
+teardown_all(Ctx) ->
+    catch application:stop(smoosh),
+    config:delete("query_server_config", "commit_freq", false),
+    test_util:stop(Ctx),
+    meck:unload().
+
+setup() ->
+    config:set("smoosh", "persist", "false", false),
+    config:set("smoosh", "wait_secs", "0", false),
+    DbName = ?tempdb(),
+    fabric:create_db(DbName, [{q, 1}]),
+    {ok, _} = create_ddoc(DbName, <<"_design/foo">>, <<"bar">>),
+    {ok, _} = create_doc(DbName, <<"doc1">>, 1500000),
+    {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>),
+    application:start(smoosh),
+    wait_for_channels(),
+    flush(),
+    DbName.
+
+teardown(DbName) ->
+    catch flush(),
+    catch application:stop(smoosh),
+    fabric:delete_db(DbName),
+    meck:reset(smoosh_server),
+    meck:reset(smoosh_channel),
+    meck:reset(couch_emsort),
+    meck:reset(fabric),
+    config:delete("smoosh", "db_channels", false),
+    config:delete("smoosh.ratio_dbs", "min_priority", false),
+    config:delete("smoosh.ratio_views", "min_priority", false),
+    config:delete("smoosh", "view_channels", false),
+    config:delete("smoosh", "cleanup_channels", false),
+    config:delete("smoosh", "wait_secs", false),
+    config:delete("smoosh", "persist", false),
+    config:delete("smoosh", "cleanup_index_files", false).
+
+t_default_channels(_) ->
+    ?assertMatch(
         [
-            make_test_case("ratio_dbs", Tests)
-        ]
-    }.
+            {"index_cleanup", _},
+            {"ratio_dbs", _},
+            {"ratio_views", _},
+            {"slack_dbs", _},
+            {"slack_views", _},
+            {"upgrade_dbs", _},
+            {"upgrade_views", _}
+        ],
+        status()
+    ),
+    % If app hasn't started status won't crash
+    application:stop(smoosh),
+    ?assertEqual([], status()).
 
-make_test_case(Type, Funs) ->
-    {foreachx, fun setup/1, fun teardown/2, [{Type, Fun} || Fun <- Funs]}.
+t_channels_recreated_on_crash(_) ->
+    RatioDbsPid = get_channel_pid("ratio_dbs"),
+    meck:reset(smoosh_channel),
+    exit(RatioDbsPid, kill),
+    meck:wait(1, smoosh_channel, start_link, 1, 3000),
+    wait_for_channels(7),
+    ?assertMatch([_, {"ratio_dbs", _} | _], status()),
+    ?assertNotEqual(RatioDbsPid, get_channel_pid("ratio_dbs")).
 
-should_enqueue(ChannelType, DbName) ->
-    ?_test(begin
-        ok = grow_db_file(DbName, 300),
-        ok = wait_enqueue(ChannelType, DbName),
-        ?assert(is_enqueued(ChannelType, DbName)),
-        ok
-    end).
+t_can_create_and_delete_channels(_) ->
+    config:set("smoosh", "db_channels", "mychan1", false),
+    config:set("smoosh", "view_channels", "mychan2", false),
+    config:set("smoosh", "cleanup_channels", "mychan3", false),
+    % 7 default ones + 3 new ones
+    wait_for_channels(10),
+    meck:reset(smoosh_channel),
+    config:delete("smoosh", "db_channels", false),
+    config:delete("smoosh", "view_channels", false),
+    config:delete("smoosh", "cleanup_channels", false),
+    wait_for_channels(7).
 
-should_persist_queue(ChannelType, DbName) ->
-    ?_test(begin
-        {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
-        ok = grow_db_file(DbName, 300),
-        ok = wait_enqueue(ChannelType, DbName),
-        ok = smoosh_channel:persist(ChannelPid),
-        Q0 = channel_queue(ChannelType),
-        ok = application:stop(smoosh),
-        ok = application:start(smoosh),
-        Q1 = channel_queue(ChannelType),
-        % Assert that queues are not empty
-        ?assertNotEqual(Q0, smoosh_priority_queue:new(ChannelType)),
-        ?assertNotEqual(Q1, smoosh_priority_queue:new(ChannelType)),
-        ?assertEqual(Q0, Q1),
-        ok
-    end).
+t_db_is_enqueued_and_compacted(DbName) ->
+    ?assertEqual({0, 0, 0}, sync_status("ratio_dbs")),
+    meck:reset(smoosh_channel),
+    {ok, _} = delete_doc(DbName, <<"doc1">>),
+    ok = wait_to_enqueue(DbName),
+    ok = wait_compact_start(),
+    ok = wait_normal_down().
 
-should_call_recover(_ChannelType, _DbName) ->
-    ?_test(begin
-        ok = application:stop(smoosh),
-        ok = config:set("smoosh", "persist", "true", false),
-        meck:new(smoosh_priority_queue, [passthrough]),
-        ok = application:start(smoosh),
-        timer:sleep(1000),
-        ?assertNotEqual(0, meck:num_calls(smoosh_priority_queue, recover, 
'_')),
-        ok
-    end).
+t_view_is_enqueued_and_compacted(DbName) ->
+    % We don't want index cleanup to interfere for now
+    config:set("smoosh", "cleanup_index_files", "false", false),
+    % Ensure db is compacted
+    meck:reset(smoosh_channel),
+    {ok, _} = delete_doc(DbName, <<"doc1">>),
+    ok = wait_normal_down(),
+    % Check view
+    meck:reset(smoosh_channel),
+    {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>),
+    ok = wait_to_enqueue({DbName, <<"_design/foo">>}),
+    ok = wait_compact_start(),
+    ok = wait_normal_down().
 
-should_not_call_recover(_ChannelType, _DbName) ->
-    ?_test(begin
-        ok = application:stop(smoosh),
-        ok = config:set("smoosh", "persist", "false", false),
-        meck:new(smoosh_priority_queue, [passthrough]),
-        ok = application:start(smoosh),
-        timer:sleep(1000),
-        ?assertEqual(0, meck:num_calls(smoosh_priority_queue, recover, '_')),
-        ok
-    end).
+t_index_cleanup_happens_by_default(DbName) ->
+    ?assert(config:get_boolean("smoosh", "cleanup_index_files", true)),
+    % Db compacts
+    meck:reset(smoosh_channel),
+    {ok, _} = delete_doc(DbName, <<"doc1">>),
+    ok = wait_normal_down(),
+    % View should compact as well
+    meck:reset(fabric),
+    meck:reset(smoosh_channel),
+    {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>),
+    % View cleanup should have been invoked
+    meck:wait(fabric, cleanup_index_files, [DbName], 4000).
+
+t_index_cleanup_can_be_disabled(DbName) ->
+    config:set("smoosh", "cleanup_index_files", "false", false),
+    % Db compacts
+    meck:reset(smoosh_channel),
+    {ok, _} = delete_doc(DbName, <<"doc1">>),
+    ok = wait_normal_down(),
+    % View should compact as well
+    meck:reset(fabric),
+    meck:reset(smoosh_channel),
+    {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>),
+    ok = wait_compact_start(),
+    ok = wait_normal_down(),
+    % View cleanup was not called
+    timer:sleep(1000),
+    ?assertEqual(0, meck:num_calls(fabric, cleanup_index_files, 1)).
+
+t_suspend_resume(DbName) ->
+    ?assertEqual({0, 0, 0}, sync_status("ratio_dbs")),
+    meck:reset(smoosh_channel),
+    setup_db_compactor_intercept(),
+    {ok, _} = delete_doc(DbName, <<"doc1">>),
+    ok = wait_to_enqueue(DbName),
+    CompPid = wait_db_compactor_pid(),
+    ok = smoosh:suspend(),
+    ?assertEqual({status, suspended}, erlang:process_info(CompPid, status)),
+    ?assertEqual({1, 0, 0}, sync_status("ratio_dbs")),
+    % Suspending twice should work too
+    ok = smoosh:suspend(),
+    ?assertEqual({status, suspended}, erlang:process_info(CompPid, status)),
+    ?assertEqual({1, 0, 0}, sync_status("ratio_dbs")),
+    ok = smoosh:resume(),
+    ?assertNotEqual({status, suspended}, erlang:process_info(CompPid, status)),
+    % Resuming twice should work too
+    ok = smoosh:resume(),
+    ?assertNotEqual({status, suspended}, erlang:process_info(CompPid, status)),
+    CompPid ! continue,
+    ok = wait_normal_down().
+
+t_check_window_can_resume(DbName) ->
+    ?assertEqual({0, 0, 0}, sync_status("ratio_dbs")),
+    meck:reset(smoosh_channel),
+    setup_db_compactor_intercept(),
+    {ok, _} = delete_doc(DbName, <<"doc1">>),
+    ok = wait_to_enqueue(DbName),
+    CompPid = wait_db_compactor_pid(),
+    ok = smoosh:suspend(),
+    ?assertEqual({status, suspended}, erlang:process_info(CompPid, status)),
+    get_channel_pid("ratio_dbs") ! check_window,
+    CompPid ! continue,
+    ok = wait_normal_down().
+
+t_renqueue_on_crashes(DbName) ->
+    ?assertEqual({0, 0, 0}, sync_status("ratio_dbs")),
+    meck:reset(smoosh_channel),
+    setup_db_compactor_intercept(),
+    {ok, _} = delete_doc(DbName, <<"doc1">>),
+    ok = wait_to_enqueue(DbName),
+    CompPid = wait_db_compactor_pid(),
+    meck:reset(smoosh_channel),
+    CompPid ! {raise, error, boom},
+    ok = wait_to_enqueue(DbName),
+    CompPid2 = wait_db_compactor_pid(),
+    CompPid2 ! continue,
+    ok = wait_normal_down().
+
+t_update_status_works(DbName) ->
+    setup_db_compactor_intercept(),
+    {ok, _} = delete_doc(DbName, <<"doc1">>),
+    ok = wait_to_enqueue(DbName),
+    CompPid = wait_db_compactor_pid(),
+    % status should have 1 starting job, but it may have not been updated yet 
so
+    % we wait until update_status is called
+    wait_update_status(),
+    WaitFun = fun() ->
+        case {1, 0, 0} =:= status("ratio_dbs") of

Review Comment:
   Is this style preferred over e.g.
   ```erlang
   case status("ratio_dbs") of
       {1, 0, 0} -> ok;
       _ -> wait
   end
   ```
   in this case?



##########
src/smoosh/src/smoosh_channel.erl:
##########
@@ -235,245 +190,146 @@ handle_info(check_window, State) ->
                 State;
             {false, true} ->
                 % resume is always safe even if we did not previously suspend
-                {reply, ok, NewState} = handle_call(resume_and_activate, nil, 
State),
-                NewState;
+                do_resume(State);
+            {true, false} when StrictWindow =:= "true" ->
+                % suspend immediately
+                do_suspend(State);
             {true, false} ->
-                if
-                    StrictWindow =:= "true" ->
-                        {reply, ok, NewState} = handle_call(suspend, nil, 
State),
-                        NewState;
-                    true ->
-                        State#state{paused = true}
-                end
+                % prevent new jobs from starting, active ones keep running
+                State#state{paused = true}
         end,
-    erlang:send_after(60 * 1000, self(), check_window),
+    schedule_check_window(),
     {noreply, FinalState};
-handle_info(start_recovery, #state{name = Name, waiting = Waiting0} = State0) 
->
-    RecActive = recover(active_file_name(Name)),
-    Waiting1 = lists:foldl(
-        fun(DbName, Acc) ->
-            case couch_db:exists(DbName) andalso 
couch_db:is_compacting(DbName) of
-                true ->
-                    Priority = smoosh_server:get_priority(Name, DbName),
-                    smoosh_priority_queue:in(DbName, Priority, Priority, Acc);
-                false ->
-                    Acc
-            end
-        end,
-        Waiting0,
-        RecActive
-    ),
-    State1 = maybe_start_compaction(State0#state{paused = false, waiting = 
Waiting1}),
-    Level = smoosh_utils:log_level("compaction_log_level", "debug"),
-    couch_log:Level(
-        "~p Previously active compaction jobs (if any) have been successfully 
recovered and restarted.",
-        [?MODULE]
-    ),
-    erlang:send_after(?ACTIVATE_DELAY_IN_MSEC, self(), activate),
-    {noreply, State1#state{paused = true}};
-handle_info(activate, State) ->
-    {noreply, activate_channel(State)};
-handle_info(persist, State) ->
-    ok = persist_and_reschedule(State),
+handle_info(update_status, #state{} = State) ->
+    schedule_update_status(),
+    {noreply, set_status(State)};
+handle_info(checkpoint, #state{cref = Ref} = State) when is_reference(Ref) ->
+    % If a checkpointer process is still running, don't start another one.
+    schedule_checkpoint(),
     {noreply, State};
-handle_info(pause, State) ->
-    {noreply, State#state{paused = true}};
+handle_info(checkpoint, #state{cref = undefined} = State) ->
+    % Start an asyncronous checkpoint process so we don't block the channel
+    #state{waiting = Waiting, active = Active, starting = Starting} = State,
+    Args = [Waiting, Active, Starting],
+    {_, Ref} = spawn_monitor(smoosh_persist, persist, Args),
+    schedule_checkpoint(),
+    {noreply, State#state{cref = Ref}};
 handle_info(unpause, State) ->
     {noreply, maybe_start_compaction(State#state{paused = false})}.
 
-terminate(_Reason, _State) ->
-    ok.
-
 % private functions.
 
-recover(FilePath) ->
-    case do_recover(FilePath) of
-        {ok, List} ->
-            List;
-        error ->
-            []
-    end.
-
-do_recover(FilePath) ->
-    case file:read_file(FilePath) of
-        {ok, Content} ->
-            <<Vsn, Binary/binary>> = Content,
-            try parse_state(Vsn, ?VSN, Binary) of
-                Term ->
-                    Level = smoosh_utils:log_level("compaction_log_level", 
"debug"),
-                    couch_log:Level(
-                        "~p Successfully restored state file ~s", [?MODULE, 
FilePath]
-                    ),
-                    {ok, Term}
-            catch
-                error:Reason ->
-                    couch_log:error(
-                        "~p Invalid state file (~p). Deleting ~s", [?MODULE, 
Reason, FilePath]
-                    ),
-                    file:delete(FilePath),
-                    error
-            end;
-        {error, enoent} ->
-            Level = smoosh_utils:log_level("compaction_log_level", "debug"),
-            couch_log:Level(
-                "~p (~p) State file ~s does not exist. Not restoring.", 
[?MODULE, enoent, FilePath]
-            ),
-            error;
-        {error, Reason} ->
-            couch_log:error(
-                "~p Cannot read the state file (~p). Deleting ~s", [?MODULE, 
Reason, FilePath]
-            ),
-            file:delete(FilePath),
-            error
-    end.
-
-parse_state(1, ?VSN, Binary) ->
-    erlang:binary_to_term(Binary, [safe]);
-parse_state(Vsn, ?VSN, _) ->
-    error({unsupported_version, Vsn}).
-
-persist_and_reschedule(State) ->
-    persist_queue(State),
-    erlang:send_after(?CHECKPOINT_INTERVAL_IN_MSEC, self(), persist),
-    ok.
-
-persist_queue(#state{name = Name, active = Active, starting = Starting, 
waiting = Waiting}) ->
-    Active1 = lists:foldl(
-        fun({DbName, _}, Acc) ->
-            [DbName | Acc]
-        end,
-        [],
-        Active
-    ),
-    Starting1 = lists:foldl(
-        fun({_, DbName}, Acc) ->
-            [DbName | Acc]
-        end,
-        [],
-        Starting
-    ),
-    smoosh_utils:write_to_file(Active1, active_file_name(Name), ?VSN),
-    smoosh_utils:write_to_file(Starting1, starting_file_name(Name), ?VSN),
-    smoosh_priority_queue:write_to_file(Waiting).
-
-active_file_name(Name) ->
-    filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".active").
-
-starting_file_name(Name) ->
-    filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".starting").
+unpersist(Name) ->
+    % Insert into the access table with a current
+    % timestamp to prevent the same dbs from being re-enqueued
+    % again after startup startup

Review Comment:
   Extra "startup" at end? Also missing period?



##########
src/smoosh/src/smoosh_utils.erl:
##########
@@ -14,23 +14,40 @@
 -include_lib("couch/include/couch_db.hrl").

Review Comment:
   minor typos in commit body: "guranteed", "changes is"



##########
src/smoosh/src/smoosh_utils.erl:
##########
@@ -135,3 +93,196 @@ parse_time(String, Default) ->
 
 log_level(Key, Default) when is_list(Key), is_list(Default) ->
     list_to_existing_atom(config:get("smoosh", Key, Default)).
+
+db_channels() ->
+    ConfStr = config:get("smoosh", "db_channels"),
+    channel_list(ConfStr, ?BUILT_IN_DB_CHANNELS).
+
+view_channels() ->
+    Conf = config:get("smoosh", "view_channels"),
+    channel_list(Conf, ?BUILT_IN_VIEW_CHANNELS).
+
+cleanup_channels() ->
+    Conf = config:get("smoosh", "cleanup_channels"),
+    channel_list(Conf, ?BUILT_IN_CLEANUP_CHANNELS).
+
+channel_list(ConfStr, Default) ->
+    DefaultList = split(Default),
+    ConfList = split(ConfStr),
+    lists:usort(DefaultList ++ ConfList).
+
+concurrency(ChannelName) ->
+    list_to_integer(?MODULE:get(ChannelName, "concurrency", "1")).
+
+capacity(ChannelName) ->
+    list_to_integer(?MODULE:get(ChannelName, "capacity", "9999")).
+
+% Validate enqueue arg at the front of the API instead of adding ?l2b and ?b2l
+% everywhere internally
+%
+validate_arg({?INDEX_CLEANUP, DbName}) when is_list(DbName) ->
+    validate_arg({?INDEX_CLEANUP, ?l2b(DbName)});
+validate_arg(DbName) when is_list(DbName) ->
+    validate_arg(?l2b(DbName));
+validate_arg({DbName, GroupId}) when is_list(DbName) ->
+    validate_arg({?l2b(DbName), GroupId});
+validate_arg({DbName, GroupId}) when is_list(GroupId) ->
+    validate_arg({DbName, ?l2b(GroupId)});
+validate_arg(DbName) when is_binary(DbName) ->
+    DbName;
+validate_arg({DbName, GroupId}) when is_binary(DbName), is_binary(GroupId) ->
+    {DbName, GroupId};
+validate_arg({?INDEX_CLEANUP, DbName}) when is_binary(DbName) ->
+    {?INDEX_CLEANUP, DbName};
+validate_arg(_) ->
+    error(invalid_smoosh_arg).
+
+-ifdef(TEST).
+
+-include_lib("couch/include/couch_eunit.hrl").
+
+smoosh_util_validate_test() ->
+    ?assertEqual(<<"x">>, validate_arg(<<"x">>)),
+    ?assertEqual(<<"x">>, validate_arg("x")),
+    ?assertEqual({<<"x">>, <<"y">>}, validate_arg({"x", "y"})),
+    ?assertEqual({<<"x">>, <<"y">>}, validate_arg({<<"x">>, "y"})),
+    ?assertEqual({<<"x">>, <<"y">>}, validate_arg({"x", <<"y">>})),
+    ?assertEqual({<<"x">>, <<"y">>}, validate_arg({<<"x">>, <<"y">>})),
+    ?assertEqual({?INDEX_CLEANUP, <<"x">>}, validate_arg({?INDEX_CLEANUP, 
"x"})),
+    ?assertEqual({?INDEX_CLEANUP, <<"x">>}, validate_arg({?INDEX_CLEANUP, 
<<"x">>})),
+    ?assertError(invalid_smoosh_arg, validate_arg(foo)),
+    ?assertError(invalid_smoosh_arg, validate_arg({foo, bar})),
+    ?assertError(invalid_smoosh_arg, validate_arg({?INDEX_CLEANUP, foo})).
+
+smoosh_utils_test_() ->
+    {
+        foreach,
+        fun() ->
+            meck:new(calendar, [passthrough, unstick]),
+            meck:expect(config, get, fun(_, _, Default) -> Default end)
+        end,
+        fun(_) ->
+            meck:unload()
+        end,
+        [
+            ?TDEF_FE(t_channel_list_default),
+            ?TDEF_FE(t_channel_list_configured),
+            ?TDEF_FE(t_capacity),
+            ?TDEF_FE(t_concurrency),
+            ?TDEF_FE(t_stringify),
+            ?TDEF_FE(t_ignore_db),
+            ?TDEF_FE(t_log_level),
+            ?TDEF_FE(t_allowed_window)
+        ]
+    }.
+
+t_channel_list_default(_) ->
+    ?assertEqual(
+        ["ratio_dbs", "slack_dbs", "upgrade_dbs"],

Review Comment:
   Do you prefer making these explicit rather than reusing the `-define`d 
default values?



##########
src/smoosh/src/smoosh_persist.erl:
##########
@@ -0,0 +1,300 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(smoosh_persist).
+
+-export([
+    unpersist/1,
+    persist/3,
+    check_setup/0
+]).
+
+-include_lib("kernel/include/file.hrl").
+
+-define(SUFFIX, ".smooshq").
+
+% Public API
+
+unpersist(Name) ->
+    Enabled = config:get_boolean("smoosh", "persist", false),
+    Capacity = smoosh_utils:capacity(Name),
+    unpersist(Enabled, Name, Capacity).
+
+persist(Waiting, #{} = Active, #{} = Starting) ->
+    Enabled = config:get_boolean("smoosh", "persist", false),
+    persist(Enabled, Waiting, Active, Starting).
+
+% Validate peristence setup for read/write/delete access. Emit warnings if
+% there are any failures. Call this function once during startup. During
+% runtime errors are ignored. Smoosh persistence is opportunistic, so if we
+% cannot read or write we just move on.
+%
+check_setup() ->

Review Comment:
   This is a nice feature that could mitigate some operational headaches! Not 
necessarily super cheap, but fast enough if it's only run on startup:
   ```
   ([email protected])6> timer:tc(fun() -> smoosh_persist:check_setup() end).
   {2938,ok}
   ([email protected])7> timer:tc(fun() -> smoosh_persist:check_setup() end).
   {3294,ok}
   ([email protected])8> timer:tc(fun() -> smoosh_persist:check_setup() end).
   {3611,ok}
   ```



##########
src/smoosh/src/smoosh_persist.erl:
##########
@@ -0,0 +1,300 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(smoosh_persist).
+
+-export([
+    unpersist/1,
+    persist/3,
+    check_setup/0
+]).
+
+-include_lib("kernel/include/file.hrl").
+
+-define(SUFFIX, ".smooshq").
+
+% Public API
+
+unpersist(Name) ->
+    Enabled = config:get_boolean("smoosh", "persist", false),
+    Capacity = smoosh_utils:capacity(Name),
+    unpersist(Enabled, Name, Capacity).
+
+persist(Waiting, #{} = Active, #{} = Starting) ->
+    Enabled = config:get_boolean("smoosh", "persist", false),
+    persist(Enabled, Waiting, Active, Starting).
+
+% Validate peristence setup for read/write/delete access. Emit warnings if
+% there are any failures. Call this function once during startup. During
+% runtime errors are ignored. Smoosh persistence is opportunistic, so if we
+% cannot read or write we just move on.
+%
+check_setup() ->
+    Enabled = config:get_boolean("smoosh", "persist", false),
+    try
+        check_setup(Enabled)
+    catch
+        throw:{fail, Msg, Error} ->
+            LogMsg = "~s : " ++ Msg ++ " failed in directory ~p : ~p",
+            Args = [?MODULE, state_dir(), Error],
+            couch_log:warning(LogMsg, Args),
+            {error, {Msg, Error}}
+    end.
+
+% Private functions
+
+unpersist(false, Name, _Capacity) ->
+    smoosh_priority_queue:new(Name);
+unpersist(true, Name, Capacity) ->
+    Path = file_path(Name),
+    case read(Path) of
+        {ok, Map} -> smoosh_priority_queue:from_map(Name, Capacity, Map);
+        {error, _} -> smoosh_priority_queue:new(Name)
+    end.
+
+persist(false, _Waiting, _Active, _Starting) ->
+    ok;
+persist(true, Waiting, Active, Starting) ->
+    Name = smoosh_priority_queue:name(Waiting),
+    WMap = smoosh_priority_queue:to_map(Waiting),
+    % Starting and active jobs are at priority level `infinity` as they are
+    % already running. We want them to be the first one to continue after
+    % restart. We're relying on infinity sorting higher than float and integer
+    % numeric values here.
+    AMap = maps:map(fun(_, _) -> infinity end, Active),
+    SMap = maps:from_list([{K, infinity} || K <- maps:values(Starting)]),
+    Path = file_path(Name),
+    write(maps:merge(WMap, maps:merge(AMap, SMap)), Path).
+
+check_setup(false) ->
+    disabled;
+check_setup(true) ->
+    StateDir = state_dir(),
+    Path = filename:join(StateDir, "smooshq.test"),
+    Data = #{<<"test">> => 1},
+    case file:read_file_info(StateDir) of
+        {ok, #file_info{access = A}} when A == read; A == read_write ->
+            ok;
+        {ok, #file_info{access = Invalid}} ->
+            throw({fail, "read access", Invalid});
+        {error, Error1} ->
+            throw({fail, "read", Error1})
+    end,
+    case write(Data, Path) of
+        ok -> ok;
+        {error, Error2} -> throw({fail, "write", Error2})
+    end,
+    delete_file(Path).
+
+write(#{} = QData, Path) when is_list(Path), map_size(QData) == 0 ->
+    % Save a few bytes by deleting the persisted queue data if
+    % there are no waiting/starting or active jobs
+    delete_file(Path);
+write(#{} = QData, Path) when is_list(Path) ->
+    Bin = term_to_binary(QData, [compressed, {minor_version, 2}]),
+    TmpPath = tmp_path(Path),
+    case file:write_file(TmpPath, Bin, [raw]) of
+        ok -> file:rename(TmpPath, Path);
+        {error, _} = Error -> Error
+    end.
+
+read(Path) ->
+    case file:read_file(Path) of
+        {ok, Bin} ->
+            try binary_to_term(Bin, [safe]) of
+                #{} = QData -> {ok, QData};
+                _ -> {error, term_not_a_map}
+            catch
+                _:_ ->
+                    {error, invalid_term}
+            end;
+        {error, _} = Error ->
+            Error
+    end.
+
+tmp_path(Path) ->
+    Time = abs(erlang:system_time()),
+    Path ++ "." ++ integer_to_list(Time) ++ ".tmp".
+
+file_path(Name) ->
+    StateDir = filename:absname(state_dir()),
+    filename:join(StateDir, Name ++ ?SUFFIX).
+
+state_dir() ->
+    Dir = config:get("smoosh", "state_dir", "."),
+    filename:absname(Dir).
+
+delete_file(Path) ->
+    % On Erlang 24+ we can avoid using the file server
+    case erlang:function_exported(file, delete, 2) of
+        true -> file:delete(Path, [raw]);
+        false -> file:delete(Path)
+    end.
+
+-ifdef(TEST).
+
+-include_lib("couch/include/couch_eunit.hrl").
+
+persist_unpersist_test_() ->
+    {
+        foreach,
+        fun() ->
+            meck:expect(config, get, fun(_, _, Default) -> Default end)
+        end,
+        fun(_) ->
+            meck:unload()
+        end,
+        [
+            ?TDEF_FE(t_write_read_delete),
+            ?TDEF_FE(t_fail_write_read_delete),
+            ?TDEF_FE(t_corrupted_read),
+            ?TDEF_FE(t_check_setup),
+            ?TDEF_FE(t_persist_unpersist_disabled),
+            ?TDEF_FE(t_persist_unpersist_enabled),
+            ?TDEF_FE(t_persist_unpersist_errors)
+        ]
+    }.
+
+t_write_read_delete(_) ->
+    Path = file_path("foochan"),
+    Data = #{<<"a">> => 1},
+
+    ?assertEqual(ok, write(Data, Path)),
+    ?assertMatch({ok, _}, file:read_file_info(Path)),
+
+    ReadRes = read(Path),
+    ?assertMatch({ok, #{}}, ReadRes),
+    {ok, ReadData} = ReadRes,
+    ?assertEqual(Data, ReadData),
+
+    ?assertEqual(ok, write(#{}, Path)),
+    ?assertEqual({error, enoent}, file:read_file_info(Path)).
+
+t_fail_write_read_delete(_) ->
+    meck:expect(config, get, fun("smoosh", "state_dir", _) -> "./x" end),
+    Path = file_path("foochan"),
+    ?assertEqual({error, enoent}, write(#{<<"a">> => 1}, Path)),
+    ?assertEqual({error, enoent}, read(Path)),
+    ?assertEqual({error, enoent}, write(#{}, Path)).
+
+t_corrupted_read(_) ->
+    Path = file_path("foochan"),
+    ?assertEqual(ok, write(#{<<"a">> => 1}, Path)),
+
+    ok = file:write_file(Path, term_to_binary(foo), [raw]),
+    ?assertEqual({error, term_not_a_map}, read(Path)),
+
+    ok = file:write_file(Path, <<"42">>, [raw]),
+    ?assertEqual({error, invalid_term}, read(Path)),
+
+    ?assertEqual(ok, write(#{}, Path)),
+    ?assertEqual({error, enoent}, file:read_file_info(Path)).
+
+t_check_setup(_) ->
+    ?assertEqual(disabled, check_setup()),
+
+    meck:expect(config, get_boolean, fun("smoosh", "persist", _) -> true end),
+    ?assertEqual(ok, check_setup()),
+
+    TDir = ?tempfile(),
+    meck:expect(config, get, fun("smoosh", "state_dir", _) -> TDir end),
+    ?assertEqual({error, {"read", enoent}}, check_setup()),
+
+    Dir = state_dir(),
+    ok = file:make_dir(Dir),
+    % Can't write, only read
+    ok = file:change_mode(Dir, 8#500),
+    ?assertEqual({error, {"write", eacces}}, check_setup()),
+    % Can't read, only write
+    ok = file:change_mode(Dir, 8#300),
+    ?assertEqual({error, {"read access", write}}, check_setup()),
+    ok = file:del_dir_r(Dir).
+
+t_persist_unpersist_disabled(_) ->
+    Name = "chan1",
+    Q = smoosh_priority_queue:new(Name),
+    [K1, K2, K3] = [<<"x">>, {<<"x">>, <<"_design/y">>}, {index_cleanup, 
<<"z">>}],
+    Active = #{K1 => self()},
+    Starting = #{make_ref() => K2},
+    Q1 = smoosh_priority_queue:in(K3, 1.0, 9999, Q),
+
+    ?assertEqual(ok, persist(Q1, Active, Starting)),
+
+    Q2 = unpersist(Name),
+    ?assertEqual(Name, smoosh_priority_queue:name(Q2)),
+    ?assertEqual([{size, 0}], smoosh_priority_queue:info(Q2)).
+
+t_persist_unpersist_enabled(_) ->
+    Name = "chan2",
+    Q = smoosh_priority_queue:new(Name),
+    Keys = [K1, K2, K3] = [<<"x">>, {<<"x">>, <<"y">>}, {index_cleanup, 
<<"z">>}],
+    Active = #{K1 => self()},
+    Starting = #{make_ref() => K2},
+    Q1 = smoosh_priority_queue:in(K3, 1.0, 9999, Q),
+
+    meck:expect(config, get_boolean, fun("smoosh", "persist", _) -> true end),
+    ?assertEqual(ok, persist(Q1, Active, Starting)),
+
+    Q2 = unpersist(Name),
+    ?assertEqual(Name, smoosh_priority_queue:name(Q2)),
+    Info2 = smoosh_priority_queue:info(Q2),
+    ?assertEqual([{size, 3}, {min, 1.0}, {max, infinity}], Info2),
+    ?assertEqual(Keys, drain_q(Q2)),
+
+    % Try to persisted the already unpersisted queue

Review Comment:
   s/persisted/persist/



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to