davisp commented on a change in pull request #470: Scheduling Replicator
URL: https://github.com/apache/couchdb/pull/470#discussion_r111797785
##########
File path: src/couch_replicator/README.md
##########
@@ -0,0 +1,295 @@
+Developer Oriented Replicator Description
+=========================================
+
+This description of the scheduling replicator's functionality is mainly geared
+toward CouchDB developers. It dives a bit into the internals and explains how
+everything is connected together.
+
+A natural place to start is the top application supervisor:
+`couch_replicator_sup`. It's a `rest_for_one` supervisor, so if a child process
+terminates, the rest of the children in the hierarchy following it are also
+terminated. This structure implies a useful constraint -- children lower in
+the list can safely call their siblings which are higher in the list.
+
+A description of each child:
+
+ * `couch_replication_event`: Starts a `gen_event` publication bus to handle
+ some replication related events. It is used, for example, to publish cluster
+ membership changes by the `couch_replicator_clustering` process, but it is
+ also used in replication tests to monitor for replication events.
+ Notification is performed via the `couch_replicator_notifier:notify/1`
+ function. It's the first (left-most) child because
+ `couch_replicator_clustering` uses it.
+
+ * `couch_replicator_clustering`: This module maintains cluster membership
+ information for the replication application and provides functions to check
+ ownership of replication jobs. A cluster membership change is published via
+ the `gen_event` event server named `couch_replication_event` as previously
+ covered. Published events are `{cluster, stable}` when cluster membership
+ has stabilized, that is, no node membership changes in a given period, and
+ `{cluster, unstable}` which indicates there was a recent change to the
+ cluster membership and now it's considered unstable. Listeners for cluster
+ membership changes include `couch_replicator_doc_processor` and
+ `couch_replicator_db_changes`. When the doc processor gets a `{cluster,
+ stable}` event it will remove all the replication jobs not belonging to the
+ current node. When `couch_replicator_db_changes` gets a `{cluster,
+ stable}` event, it will restart the `couch_multidb_changes` process it
+ controls, which will launch a new scan of all the replicator databases.
+
+ * `couch_replicator_connection`: Maintains a global replication connection
+ pool. It allows reusing connections across replication tasks. The main
+ interface is `acquire/1` and `release/1`. The general idea is that once a
+ connection is established, it is kept around for
+ `replicator.connection_close_interval` milliseconds in case another
+ replication task wants to re-use it. It is worth pointing out how linking
+ and monitoring is handled: workers are linked to the connection pool when
+ they are created. If they crash, the connection pool listens for the EXIT
+ event and cleans up. The connection pool also monitors owners (by monitoring
+ the `Pid` from the `From` argument in the call to `acquire/1`) and cleans up
+ if the owner dies. Another interesting detail is that connection
+ establishment (creation) happens in the owner process, so the pool is not
+ blocked on it.
+
+ * `couch_replicator_rate_limiter`: Implements a rate limiter to handle
+ connection throttling from sources or targets where requests return 429
+ error codes. It uses the Additive Increase / Multiplicative Decrease (AIMD)
+ feedback control algorithm to converge on the channel capacity, and is
+ implemented using a 16-way sharded ETS table to maintain connection state.
+ The table sharding code is split out into the
+ `couch_replicator_rate_limiter_tables` module. The main idea of the module
+ is to maintain and continually estimate a wait interval for each endpoint,
+ represented as a `{Method, Url}` pair. The interval is updated on each call
+ to `failure/1` or `success/1`. `failure/1` is supposed to be called after a
+ 429 is received, and `success/1` after a successful request. When no
+ failures are happening, the code ensures the ETS tables stay empty in order
+ to have a lower impact on a running system.
+
+ * `couch_replicator_scheduler` : Scheduler is the core component of the
+ scheduling replicator. It allows handling a larger number of jobs than
+ might be possible to actively run on the cluster. It accomplishes this by
+ switching between jobs (stopping some and starting others) to ensure all
+ make progress. Replication jobs which fail are penalized using exponential
+ backoff. That is, each consecutive failure will double the time penalty.
+ This frees up system resources for more useful work than just continuously
+ trying to run the same subset of failing jobs.
+
+ The main API function is `add_job/1`. Its argument is an instance of
+ `#rep{}` record, which could also be the result of a document update from a
+ _replicator db or it could be the result of a POST to `_replicate`
+ endpoint. Once the replication job is added to the scheduler it doesn't
+ matter much where it originated.
+
+ Each job is represented internally by the `#job{}` record. It contains the
+ original `#rep{}` but also, among a few other things, maintains an event
+ history. The history is a timestamped sequence of events for each job,
+ ordered such that the most recent event is at the head. History length is
+ limited based on the `replicator.max_history` config value. The default is
+ 20 entries. History event types are:
+
+ * `added` : job was just added to the scheduler. This is the first event.
+ * `started` : job was started. This was an attempt to run the job.
+ * `stopped` : job was stopped by the scheduler.
+ * `crashed` : job has crashed (instead of stopping cleanly).
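The bounded history can be modeled in a few lines of Python. This is an illustrative sketch (the real bookkeeping lives in the Erlang `#job{}` record), with the cap set to the documented default of 20:

```python
import time

MAX_HISTORY = 20  # default value of replicator.max_history


def record_event(history, event_type, now=None):
    """Prepend the newest event, then trim anything past the cap, so the
    most recent event is always at the head of the list."""
    history.insert(0, (event_type, now if now is not None else time.time()))
    del history[MAX_HISTORY:]
    return history


history = record_event([], "added")
record_event(history, "started")
record_event(history, "crashed")  # most recent event is now at the head
```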
+
+ The core of the algorithm is the `reschedule/1` function. That function is
+ called every `replicator.interval` milliseconds (default is 60000, i.e. a
+ minute). During each call the scheduler will try to stop some jobs, start
+ some new ones, and keep the number of running jobs below
+ `replicator.max_jobs` (default is 500). So the function does these
+ operations (actual code paste):
+
+ ```
+ Running = running_job_count(),
+ Pending = pending_job_count(),
+ stop_excess_jobs(State, Running),
+ start_pending_jobs(State, Running, Pending),
+ rotate_jobs(State, Running, Pending),
+ update_running_jobs_stats(State#state.stats_pid)
+ ```
+
+ `Running` is the total number of currently running jobs. `Pending` is the
+ total number of jobs waiting to be run. `stop_excess_jobs` will stop any
+ jobs exceeding the configured `replicator.max_jobs` limit. This takes
+ effect if the user reduces the `max_jobs` configuration value.
+ `start_pending_jobs` will start jobs if there is more room available. This
+ takes effect on startup or when the user increases the `max_jobs`
+ configuration value. `rotate_jobs` is where all the action happens. There
+ the scheduler picks `replicator.max_churn` running jobs to stop and then
+ picks the same number of pending jobs to start. The default value of
+ `max_churn` is 20. So by default every minute, 20 running jobs are stopped,
+ and 20 new pending jobs are started.
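One pass of this cycle can be modeled roughly as follows. This is a simplified, hypothetical Python sketch using plain lists for the running and pending queues; the real scheduler operates over ETS tables in Erlang and its selection rules (covered below) are more refined:

```python
def reschedule(running, pending, max_jobs=500, max_churn=20):
    """Model of one scheduler pass: stop excess jobs, fill free slots,
    then rotate up to max_churn jobs between running and pending."""
    # stop_excess_jobs: stop jobs over the max_jobs limit
    # (takes effect when the limit was lowered).
    while len(running) > max_jobs:
        pending.append(running.pop())
    # start_pending_jobs: start pending jobs while there is room
    # (takes effect on startup or when the limit was raised).
    while pending and len(running) < max_jobs:
        running.append(pending.pop(0))
    # rotate_jobs: stop up to max_churn running jobs and start as many
    # pending ones in their place.
    churn = min(max_churn, len(running), len(pending))
    for _ in range(churn):
        pending.append(running.pop(0))
        running.append(pending.pop(0))
    return running, pending
```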
+
+ Before moving on it is worth pointing out that the scheduler treats
+ continuous and non-continuous replications differently. Normal
+ (non-continuous) replications, once started, will be allowed to run to
+ completion. That behavior preserves their semantics of replicating a
+ snapshot of the source database to the target. For example, if new
+ documents are added to the source after the replication is started, those
+ updates should not show up on the target database. Stopping and restarting
+ a normal replication would violate that constraint. The only exception to
+ the rule is when the user explicitly reduces the `replicator.max_jobs`
+ configuration value. Even then the scheduler will first attempt to stop as
+ many continuous jobs as possible, and only if it has no other choice will
+ it stop normal jobs.
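That stop preference could be sketched like this. This is a hedged Python model with an assumed function name and job shape, not CouchDB's actual selection code:

```python
def pick_jobs_to_stop(jobs, count):
    """Prefer stopping continuous jobs; fall back to normal (one-shot)
    jobs only when the quota cannot be met otherwise."""
    continuous = [j for j in jobs if j["continuous"]]
    normal = [j for j in jobs if not j["continuous"]]
    picked = continuous[:count]
    if len(picked) < count:
        picked += normal[:count - len(picked)]
    return picked
```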
+
+ Keeping that in mind and going back to the scheduling algorithm, the next
+ interesting part is how the scheduler picks which jobs to stop and which
+ ones to start:
+
+ * Stopping: When picking jobs to stop the scheduler will pick the longest
+ running continuous jobs first. The sorting callback function to get the
+ longest running jobs is unsurprisingly called `longest_running/2`. To
+ pick the longest running jobs it looks at the most recent `started`
+ event. After it gets a list sorted by longest running, it simply picks
+ the first few, depending on the value of `max_churn`, using
+ `lists:sublist/2`. Those jobs are then stopped.
+
+ * Starting: When starting the scheduler will pick the jobs which have been
+ waiting the longest. Surprisingly, in this case it also looks at the
+ `started` timestamp and picks the jobs which have the oldest `started`
+ timestamp. If there are 3 jobs, A[started=10], B[started=7],
+ C[started=9], then B will be picked first, then C then A. This ensures
+ that jobs are not starved, which is a classic scheduling pitfall.
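The A/B/C example above boils down to sorting by the `started` timestamp, oldest first. A minimal Python illustration:

```python
# Jobs as (name, most recent `started` timestamp) pairs, per the example.
jobs = [("A", 10), ("B", 7), ("C", 9)]

# Oldest `started` goes first, so the longest-waiting job is picked first.
start_order = [name for name, _ in sorted(jobs, key=lambda job: job[1])]
```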
+
+ In the code, the list of pending jobs is picked slightly differently than
+ how the list of running jobs is picked. `pending_jobs/1` uses `ets:foldl`
+ to iterate over all the pending jobs. As it iterates it tries to keep only
+ up to `max_churn` oldest items in the accumulator. The reason this is done
+ is that there could be a very large number of pending jobs and loading them
+ all in a list (making a copy from ETS) and then sorting it can be quite
+ expensive performance-wise. The tricky part of the iteration happens in
+ `pending_maybe_replace/2`. A `gb_sets` ordered set is used to keep the
+ top-N longest-waiting jobs seen so far. The code has a comment with a
+ helpful example of how this algorithm works.
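The same bounded-accumulator idea can be sketched in Python with a heap standing in for the `gb_sets` ordered set. One pass keeps only the `max_churn` oldest jobs, avoiding a full copy-and-sort of a potentially large pending set; the function name and job shape are assumptions for the sketch:

```python
import heapq

def oldest_pending(jobs, max_churn):
    """Single pass over (name, started) pairs, keeping only the max_churn
    jobs with the oldest `started` timestamps. heapq is a min-heap, so
    timestamps are negated to keep the newest retained job at the root,
    ready to be evicted when an older one shows up."""
    heap = []  # entries are (-started, name)
    for name, started in jobs:
        if len(heap) < max_churn:
            heapq.heappush(heap, (-started, name))
        elif started < -heap[0][0]:
            # Found a job older than the newest kept one; replace it.
            heapq.heapreplace(heap, (-started, name))
    # Return the kept job names ordered oldest-first.
    return [name for _, name in sorted((-neg, name) for neg, name in heap)]
```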
+
+ The last part is how the scheduler treats jobs which keep crashing. If a
+ job is started but then crashes, that job is considered unhealthy. The
+ main idea is to penalize such jobs such that they are forced to wait an
+ exponentially larger amount of time with each consecutive crash. A central
+ part of this algorithm is determining what forms a sequence of consecutive
+ crashes. If a job starts then quickly crashes, and after the next start it
+ crashes again, that becomes a sequence of 2 consecutive crashes. The
+ penalty is then calculated by the `backoff_micros/1` function, where the
+ consecutive crash count ends up as the exponent. For practical reasons
+ there is also a maximum penalty, equivalent to 10 consecutive crashes;
+ time-wise it ends up being about 8 hours. That means even a job which
+ keeps crashing will still get a chance to retry once every 8 hours.
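The shape of the penalty can be modeled as follows. The base interval here is an assumed value chosen only so that the cap at 10 crashes lands near the 8-hour figure mentioned above; the real `backoff_micros/1` works in microseconds with its own constants:

```python
def backoff_seconds(consecutive_crashes, base=30, max_crashes=10):
    """Exponential backoff: the consecutive crash count is the exponent,
    capped at max_crashes so the worst-case penalty stays bounded
    (30s * 2^10 = 30720s, roughly 8.5 hours)."""
    exponent = min(consecutive_crashes, max_crashes)
    return base * 2 ** exponent
```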
+
+ There is a subtlety when calculating consecutive crashes, and that is
+ deciding when the sequence stops, that is, figuring out when a job becomes
+ healthy again. The scheduler considers a job healthy again if it started
+ and hasn't crashed in a while. The "in a while" part is a configuration
+ parameter, `replicator.health_threshold`, defaulting to 2 minutes. This
+ means if a job has been crashing, for example 5 times in a row, but then on
+ the 6th attempt it started and ran for more than 2 minutes, it is
+ considered healthy again. The next time it crashes its sequence of
+ consecutive crashes
Review comment:
The next time
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services