 File path: src/couch_replicator/src/couch_replicator_auth_session.erl
 @@ -0,0 +1,545 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%   http://www.apache.org/licenses/LICENSE-2.0
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+% This is the replicator session auth plugin. It implements session based
+% authentication for the replicator. The only public API are the functions from
+% the couch_replicator_auth behaviour. Most of the logic and state is in the
+% gen_server. An instance of a gen_server could be spawned for the source and
+% target endpoints of each replication job.
+% The workflow is roughly this:
+%  * On initialization, try to get a cookie in `refresh/1`. If an error occurs,
+%    then crash. If `_session` endpoint fails with a 404 (not found), return
+%    `ignore` assuming session authentication is not supported or we simply hit
+%    a non-CouchDB server.
+%  * Before each request, auth framework calls `update_headers` API function.
+%    Before updating the headers and returning, check if we need to refresh
+%    again. The check looks at the `next_refresh` time. If that time is set
+%    (not `infinity`) and has just expired, then obtain a new cookie, then
+%    update headers and return.
+%  * After each request, auth framework calls `handle_response` function. If
+%    request was successful check if a new cookie was sent by the server in the
+%    `Set-Cookie` header. If it was, then that becomes the current cookie.
+%  * If last request had an auth failure, check if request used a stale cookie.
+%    In this case nothing is done, and the client is told to retry. Next time
+%    it updates its headers before the request it should pick up the latest
+%    cookie.
+%  * If last request failed and cookie was the latest known cookie, schedule a
+%    refresh and tell client to retry. However, if the cookie was just updated,
+%    tell the client to continue such that it will handle the auth failure on
+%    its own via a set of retries with exponential backoffs. This is to
+%    ensure that if something goes wrong and one of the endpoints issues
+%    invalid cookies, replicator won't be stuck in a busy loop refreshing them.
+    initialize/1,
+    update_headers/2,
+    handle_response/4,
+    cleanup/1
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3,
+    format_status/2
+% HTTP headers as a list of {Name, Value} string pairs.
+-type headers() :: [{string(), string()}].
+% Numeric response status code handed to `handle_response/4`.
+-type code() :: non_neg_integer().
+% {User, Password} pair; an element is `undefined` when it was not supplied.
+-type creds() :: {string() | undefined, string() | undefined}.
+% Behavior API callbacks
+% Start a session auth gen_server for the given endpoint.
+%
+% Credentials are stripped from the endpoint url and headers and handed to a
+% linked gen_server which obtains and keeps the session cookie. Returns:
+%  * `{ok, HttpDb, Context}` with the credential-free `#httpdb{}` and a
+%    `{Pid, Epoch}` context to be passed to the other callbacks,
+%  * `ignore` if no credentials were found or the gen_server declined to
+%    start,
+%  * `{error, Reason}` if credentials could not be extracted or the
+%    gen_server failed to start.
+-spec initialize(#httpdb{}) ->
+    {ok, #httpdb{}, term()} | {error, term()} | ignore.
+initialize(#httpdb{} = HttpDb) ->
+    case remove_creds(HttpDb) of
+        {ok, User, Pass, HttpDb1} ->
+            case gen_server:start_link(?MODULE, [User, Pass, HttpDb1], []) of
+                {ok, Pid} ->
+                    % Epoch 0 marks a context which has not yet seen a cookie
+                    {ok, HttpDb1, {Pid, 0}};
+                ignore ->
+                    ignore;
+                {error, Error} ->
+                    {error, Error}
+            end;
+        {error, missing_credentials} ->
+            % Nothing to authenticate with, let another plugin handle it
+            ignore;
+        {error, Error} ->
+            {error, Error}
+    end.
+% Ask the session server to add the current session cookie to the request
+% headers. Returns the new headers together with a context carrying the epoch
+% reported by the server.
+-spec update_headers(term(), headers()) -> {headers(), term()}.
+update_headers({Pid, Epoch}, Headers) ->
+    Request = {update_headers, Headers, Epoch},
+    {NewHeaders, NewEpoch} = gen_server:call(Pid, Request, infinity),
+    {NewHeaders, {Pid, NewEpoch}}.
+% Report the outcome of a request to the session server. The server answers
+% with `retry` or `continue` and the epoch to use in the next context.
+-spec handle_response(term(), code(), headers(), term()) ->
+    {continue | retry, term()}.
+handle_response({Pid, Epoch}, Code, Headers, Body) ->
+    Request = {handle_response, Code, Headers, Body, Epoch},
+    {Action, NewEpoch} = gen_server:call(Pid, Request, infinity),
+    {Action, {Pid, NewEpoch}}.
+% Synchronously stop the session server referenced by the context.
+-spec cleanup(term()) -> ok.
+cleanup({ServerPid, _}) ->
+    gen_server:call(ServerPid, stop, infinity).
+%% Definitions
+%% gen_server state
+-record(state, {
+    epoch = 0 :: non_neg_integer(),
+    cookie :: string() | undefined,
+    user :: string() | undefined,
+    pass :: string() | undefined,
+    httpdb_timeout :: integer(),
+    httpdb_pool :: pid(),
+    httpdb_ibrowse_options = [] :: list(),
+    session_url :: string(),
+    next_refresh = infinity :: infinity |  non_neg_integer(),
+    refresh_tstamp = 0 :: non_neg_integer()
+%% gen_server functions
+% Build the initial state from the endpoint settings and fetch the first
+% cookie right away. A "session not supported" error makes the server
+% return `ignore`; any other error stops it.
+init([User, Pass, HttpDb]) ->
+    SessionUrl = get_session_url(HttpDb#httpdb.url),
+    InitialState = #state{
+        user = User,
+        pass = Pass,
+        session_url = SessionUrl,
+        httpdb_pool = HttpDb#httpdb.httpc_pool,
+        httpdb_timeout = HttpDb#httpdb.timeout,
+        httpdb_ibrowse_options = HttpDb#httpdb.ibrowse_options
+    },
+    case refresh(InitialState) of
+        {ok, WithCookie} ->
+            {ok, WithCookie};
+        {error, {session_not_supported, _, _}} ->
+            ignore;
+        {error, Reason} ->
+            {stop, Reason}
+    end.
+% gen_server terminate callback. Does nothing and returns `ok`.
+terminate(_Reason, _State) ->
+    ok.
+% update_headers: refresh the cookie first if a refresh is due, then prepend
+% the `Cookie` header and reply with the headers and the current epoch. If
+% the refresh fails the error is logged and the server stops without
+% replying.
+handle_call({update_headers, Headers, _Epoch}, _From, State) ->
+    case maybe_refresh(State) of
+        {ok, Fresh} ->
+            CookieHeader = {"Cookie", "AuthSession=" ++ Fresh#state.cookie},
+            Reply = {[CookieHeader | Headers], Fresh#state.epoch},
+            {reply, Reply, Fresh};
+        {error, Reason} ->
+            couch_log:error(
+                "~p: Stopping session auth plugin because of error ~p",
+                [?MODULE, Reason]
+            ),
+            {stop, Reason, State}
+    end;
+% handle_response: let process_response/4 decide between retry and continue
+% and reply with that decision plus the (possibly updated) epoch.
+handle_call({handle_response, Code, Headers, _, Epoch}, _From, State) ->
+    {Action, NewState} = process_response(Code, Headers, Epoch, State),
+    {reply, {Action, NewState#state.epoch}, NewState};
+% stop: reply `ok` and terminate normally.
+handle_call(stop, _From, State) ->
+    {stop, normal, ok, State}.
+% No casts are part of this server's protocol: log and keep going.
+handle_cast(Msg, State) ->
+    Fmt = "~p: Received un-expected cast ~p",
+    couch_log:error(Fmt, [?MODULE, Msg]),
+    {noreply, State}.
+% No plain messages are expected either: log and keep going.
+handle_info(Msg, State) ->
+    Fmt = "~p : Received un-expected message ~p",
+    couch_log:error(Fmt, [?MODULE, Msg]),
+    {noreply, State}.
+% gen_server code_change callback. The state is returned unchanged.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+% gen_server format_status callback. Reports only the epoch, user, session
+% url and refresh timestamp; the `pass` and `cookie` fields of the state are
+% not part of the returned list.
+format_status(_Opt, [_PDict, State]) ->
+    [
+        {epoch, State#state.epoch},
+        {user, State#state.user},
+        {session_url, State#state.session_url},
+        {refresh_tstamp, State#state.refresh_tstamp}
+    ].
+%% Private helper functions
+% Strip credentials from both the headers and the url of an `#httpdb{}`.
+% Returns the chosen user and password (see pick_creds/2) together with the
+% record whose url and headers no longer contain credentials, or the error
+% from url parsing or credential selection.
+-spec remove_creds(#httpdb{}) ->
+    {ok, string(), string(), #httpdb{}} | {error, term()}.
+remove_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
+    {{_, _} = HeaderCreds, HeadersNoCreds} =
+        couch_replicator_utils:remove_basic_auth_from_headers(Headers),
+    case remove_creds_from_url(Url) of
+        {ok, UrlUser, UrlPass, UrlNoCreds} ->
+            case pick_creds({UrlUser, UrlPass}, HeaderCreds) of
+                {ok, User, Pass} ->
+                    Stripped = HttpDb#httpdb{
+                        url = UrlNoCreds,
+                        headers = HeadersNoCreds
+                    },
+                    {ok, User, Pass, Stripped};
+                {error, _} = PickError ->
+                    PickError
+            end;
+        {error, _} = UrlError ->
+            UrlError
+    end.
+% Choose between url credentials (first argument) and header credentials
+% (second argument):
+%  * Neither has a user: `{error, missing_credentials}`.
+%  * Only the url has a user: the url credentials win.
+%  * The headers have a user: the header credentials win.
+-spec pick_creds(creds(), creds()) ->
+    {ok, string(), string()} | {error, missing_credentials}.
+pick_creds({undefined, _}, {undefined, _}) ->
+    {error, missing_credentials};
+pick_creds({User, Pass}, {undefined, _}) ->
+    {ok, User, Pass};
+pick_creds({_, _}, {User, Pass}) ->
+    {ok, User, Pass}.
+% Split credentials out of a url.
+%
+% Returns `{ok, User, Pass, UrlNoCreds}`. When the parsed url carries no
+% credentials, `User` and `Pass` are `undefined` and the url is returned
+% untouched. A url which cannot be parsed yields `{error, Reason}`.
+-spec remove_creds_from_url(string()) ->
+    {ok, string() | undefined, string() | undefined, string()} |
+    {error, term()}.
+remove_creds_from_url(Url) ->
+    case ibrowse_lib:parse_url(Url) of
+        {error, Error} ->
+            {error, Error};
+        #url{username = undefined, password = undefined} ->
+            {ok, undefined, undefined, Url};
+        #url{protocol = Proto, username = User, password = Pass} ->
+            % Excise user and pass parts from the url. Try to keep the host,
+            % port and path as they were in the original. The remainder is
+            % located by scanning for the `@` which ends the credentials
+            % rather than by the length of a rebuilt "Proto://User:Pass@"
+            % prefix: the rebuilt prefix does not match the url when it has a
+            % user but no ":Pass" part (e.g. "http://user@host/db"), and
+            % using its length would cut characters off the host.
+            % NOTE(review): assumes a parsed username implies an `@` in the
+            % url and that credentials cannot themselves contain `@` -
+            % confirm against ibrowse_lib:parse_url/1.
+            case lists:dropwhile(fun(C) -> C =/= $@ end, Url) of
+                [$@ | HostPortPath] ->
+                    NoCreds = lists:concat([Proto, "://", HostPortPath]),
+                    {ok, User, Pass, NoCreds};
+                [] ->
+                    {error, invalid_url_credentials}
+            end
+    end.
+% Decide what the client should do after a response.
+%  * 401 / 403: handled as an auth failure, see process_auth_failure/2.
+%  * 2xx: pick up a new cookie from the response headers if one is present,
+%    then `continue`.
+%  * Anything else: `continue` with the state untouched.
+-spec process_response(non_neg_integer(), headers(),
+    non_neg_integer(), #state{}) -> {retry | continue, #state{}}.
+process_response(Code, _Headers, Epoch, State)
+        when Code =:= 403; Code =:= 401 ->
+    process_auth_failure(Epoch, State);
+process_response(Code, Headers, _Epoch, State) when Code >= 200, Code < 300 ->
+    % The server may hand out a new cookie when the current one is about to
+    % time out. A missing cookie is the normal case and is not logged.
+    case maybe_update_cookie(Headers, State) of
+        {ok, WithNewCookie} ->
+            {continue, WithNewCookie};
+        {error, cookie_not_found} ->
+            {continue, State};
+        {error, Reason} ->
+            couch_log:error(
+                "~p : Could not parse cookie from response headers ~p",
+                [?MODULE, Reason]
+            ),
+            {continue, State}
+    end;
+process_response(_Code, _Headers, _Epoch, State) ->
+    {continue, State}.
+% Handle a 401 / 403 for a request which was made with the cookie of the
+% given epoch.
+-spec process_auth_failure(non_neg_integer(), #state{}) ->
+    {retry | continue, #state{}}.
+process_auth_failure(Epoch, #state{epoch = StateEpoch} = State)
+        when StateEpoch > Epoch ->
+    % The request was sent with an older cookie than the one held now. An
+    % immediate retry picks up the current cookie when headers are updated.
+    {retry, State};
+process_auth_failure(Epoch, #state{epoch = Epoch} = State) ->
+    MinInterval = min_update_interval(),
+    AgeSec = cookie_age_sec(State, now_sec()),
+    case AgeSec < MinInterval of
+        true ->
+            % Even a freshly obtained cookie was rejected. Schedule a
+            % refresh for later and answer `continue` so that httpc's own
+            % retry logic applies its backoff.
+            {continue, schedule_refresh(now_sec() + MinInterval, State)};
+        false ->
+            % The current cookie was rejected and is old enough: refresh
+            % as soon as possible and have httpc retry the request.
+            {retry, schedule_refresh(now_sec(), State)}
+    end.
+% Derive the `_session` url from an endpoint url. The port is kept in the
+% result only if the endpoint url spelled it out explicitly.
+-spec get_session_url(string()) -> string().
+get_session_url(Url) ->
+    #url{protocol = Proto, host = Host, port = Port} =
+        ibrowse_lib:parse_url(Url),
+    Base = lists:concat([Proto, "://", Host]),
+    BaseWithPort = lists:concat([Base, ":", Port]),
+    SessionBase = case lists:prefix(BaseWithPort, Url) of
+        % Explicit port specified in the original url
+        true -> BaseWithPort;
+        % Implicit proto default port was used
+        false -> Base
+    end,
+    SessionBase ++ "/_session".
+% Move the next refresh to time `T` if that is earlier than what is already
+% scheduled; otherwise the state is returned unchanged.
+-spec schedule_refresh(non_neg_integer(), #state{}) -> #state{}.
+schedule_refresh(T, #state{next_refresh = Scheduled} = State) ->
+    case T < Scheduled of
+        true -> State#state{next_refresh = T};
+        false -> State
+    end.
+% Refresh the cookie if the scheduled refresh time has been reached. The
+% schedule is reset to `infinity` before the refresh is attempted.
+-spec maybe_refresh(#state{}) -> {ok, #state{}} | {error, term()}.
+maybe_refresh(#state{next_refresh = RefreshAt} = State) ->
+    Now = now_sec(),
+    if
+        Now >= RefreshAt ->
+            refresh(State#state{next_refresh = infinity});
+        true ->
+            {ok, State}
+    end.
+% Obtain a cookie by POSTing the user name and password as a form to the
+% session url, then let http_response/2 interpret the result.
+% NOTE(review): user and password are placed into the form body as they
+% are, without url-encoding - confirm that callers never pass values
+% containing characters such as `&`, `=` or `%` in unencoded form.
+-spec refresh(#state{}) -> {ok, #state{}} | {error, term()}.
+refresh(#state{session_url = Url, user = User, pass = Pass} = State) ->
+    FormBody = lists:concat(["name=", User, "&password=", Pass]),
+    ContentType = {"Content-Type", "application/x-www-form-urlencoded"},
+    Response = http_request(State, Url, [ContentType], post, FormBody),
+    http_response(Response, State).
+% Send a request through a worker checked out from the endpoint's connection
+% pool. The worker is released again whether or not the request raised.
+-spec http_request(#state{}, string(), headers(), atom(), string()) ->
+    {ok, string(), headers(), binary()} | {error, term()}.
+http_request(#state{httpdb_pool = Pool} = State, Url, Headers, Method, Body) ->
+    #state{
+        httpdb_timeout = Timeout,
+        httpdb_ibrowse_options = BaseOpts
+    } = State,
+    ReqOpts = [
+        {response_format, binary},
+        {inactivity_timeout, Timeout}
+        | BaseOpts
+    ],
+    {ok, Worker} = couch_replicator_httpc_pool:get_worker(Pool),
+    try
+        ibrowse:send_req_direct(
+            Worker, Url, Headers, Method, Body, ReqOpts, Timeout
+        )
+    after
+        ok = couch_replicator_httpc_pool:release_worker(Pool, Worker)
+    end.
+% Interpret the result of a session request. A "200" response is searched
+% for a cookie; every other outcome is turned into a tagged error carrying
+% the session url and user.
+-spec http_response({ok, string(), headers(), binary()} | {error, term()},
+    #state{}) -> {ok, #state{}} | {error, term()}.
+http_response({ok, "200", Headers, _}, State) ->
+    maybe_update_cookie(Headers, State);
+http_response({ok, Code, _, _}, #state{session_url = Url, user = User}) ->
+    case Code of
+        "401" -> {error, {session_request_unauthorized, Url, User}};
+        "403" -> {error, {session_request_forbidden, Url, User}};
+        "404" -> {error, {session_not_supported, Url, User}};
+        _ -> {error, {session_unexpected_result, Code, Url, User}}
+    end;
+http_response({error, Error}, #state{session_url = Url, user = User}) ->
+    {error, {session_request_failed, Url, User, Error}}.
+% Extract the `AuthSession` cookie value from response headers.
+%  * No `Set-Cookie` header: `{error, cookie_not_found}`.
+%  * `Set-Cookie` present but without `AuthSession`:
+%    `{error, cookie_format_invalid}`.
+-spec parse_cookie(list()) -> {ok, string()} | {error, term()}.
+parse_cookie(Headers0) ->
+    HeaderLookup = mochiweb_headers:make(Headers0),
+    SetCookie = mochiweb_headers:get_value("Set-Cookie", HeaderLookup),
+    case SetCookie of
+        undefined ->
+            {error, cookie_not_found};
+        _ ->
+            Pairs = mochiweb_cookies:parse_cookie(SetCookie),
+            % Reuse the headers structure for the cookie name lookup
+            CookieLookup = mochiweb_headers:make(Pairs),
+            case mochiweb_headers:get_value("AuthSession", CookieLookup) of
+                undefined -> {error, cookie_format_invalid};
+                Value -> {ok, Value}
+            end
+    end.
+% Look for a session cookie in the response headers and, if one is found,
+% store it in the state via update_cookie/3 using the current time as its
+% refresh timestamp. Returns the resulting state, or the error reported by
+% parse_cookie/1 when no usable cookie is present.
+-spec maybe_update_cookie(headers(), #state{}) ->
+    {ok, #state{}} | {error, term()}.
+maybe_update_cookie(ResponseHeaders, State) ->
+    case parse_cookie(ResponseHeaders) of
+        {ok, Cookie} ->
+            {ok, update_cookie(State, Cookie, now_sec())};
+        {error, Error} ->
+            {error, Error}
+    end.
+% Store a cookie in the state. If it equals the cookie already held nothing
+% changes; otherwise the epoch is bumped and the refresh timestamp is set
+% to the given time.
+-spec update_cookie(#state{}, string(), non_neg_integer()) -> #state{}.
+update_cookie(#state{cookie = Current} = State, Cookie, _)
+        when Current =:= Cookie ->
+    State;
+update_cookie(#state{epoch = PrevEpoch} = State, NewCookie, NowSec) ->
+    State#state{
+        epoch = PrevEpoch + 1,
+        cookie = NewCookie,
+        refresh_tstamp = NowSec
+    }.
+-spec cookie_age_sec(#state{}, non_neg_integer()) -> non_neg_integer().
+cookie_age_sec(#state{refresh_tstamp = RefreshTs}, Now) ->
+    case Now - RefreshTs of
+        Diff when Diff >= 0 ->
+            Diff;
+        _Diff ->
+            0  % In the rare chance time jumps backwards
 Review comment:
   `erlang:max(0, Now - RefreshTs)`?

