Due to the Trac situation, I am forwarding this patch along to this list in the hopes of elicting some feedback. I will be sure to open a proper ticket when possible.
I have done some extensive work on active record to allow for databases to be defined in terms of "connection pools" rather than simply one database. Most of the work was completed in connection_specification.rb and is done at this level to be compatible with any backend you choose. Also this patch maintains full compatibiltity with the current paradigm so no changes are necessary to current applications/documentation.
Defining a database pool is very easy and follows a common sense convention. I will assume you are running with RAILS_ENV=development for this example. You first define your connections as you always have in database.yml . Your are then able to able to define
development_read_pool: db1, db2, db3
development_write_pool: db3, db4, db5 (In general, the name is RAILS_ENV_write_pool so you can test your clusters in development and production with no config changes)
where each is a set of connection names that you have already defined. ActiveRecord::Base.connection will then return connections from the appropriate pool using round-robin when more than one connection is available to it. This is handy if you hvae a high traffic website for example and you want to load balance over several slave servers for reading while writing to the one master server consistently. The syntax for .connection is as follow
ExampleModel.connection # default "compatibility behavior" always returns a write connection.
ExampleModel.connection(:read) # Return a connection from the read pool
ExampleModel.connection (:write) # Return a connection from the write pool
I have also changed a few of the functions in base.rb to utilize the correction pool (for example, find_by_sql calls connection(:read)) This again makes the patch seamless to end applications while allowing them to use the new functionality.
A patch aganist the lastest CVS is attached. Some rough notes on implementation are also include below - there not 100% complete but give a good idea of what was changed.
This currently passes an ActiveRecord rake with flying colors.
Implementation:
1. Added two arrays to act as pools for connections related to this connection.
@@read_connection_pool = {}
@@write_connection_pool = {}
Each is later defined as an array such that
@@read_connection_pool[name] represenets all of the read connections available to the current
class. This allows us to stay fully backward compatiable with the old methodology where name
is @active_connection_name
2. Define 2 variables to track index of last used connection. This is used when doing round-robin
@@last_read_connection = 0
@@last_write_connection = 0
3. Define a function for appending connections to pools (append_spec_to_connection
_pools spec).
If the config has an attribute read_only == true then it is only entered into the read_connection_pool
and vice versa for the write_connection_pool. @@defined_connections also is defined at the same time.
@@write_connection_pool[spec.object_id] always equals @@defined_connections[spec.object_id]
5. Define a function establish_connection_pools which looks for two variables to be
set in the configuration:
RAILS_ENV_read_connection_pool
RAILS_ENV_write_conenction_pool
Normally in Rails, this would setup in the YAML file. Each variable is a comma delimited
list of connections to use, one for read the other for write.
6. Define a function clear_connection_pool which clears the connection pools.
7. Modify establish_connection
a. When passed nil, call establish_connection_pools(RAILS_ENV)
b. When passed a ConnectionSpecification, clear all connection pools as well as the
active_connection_name
8. Move code from establish connection to ConnectionSpecification constructor so an object can be made
out of any spec thats passed to it. (Avoiding breakage of DRY principle)
9. Remove settings @@defined_connections[name] in establish_connection. This is now done by
calling
10. Define two functions, round robin read and round robin write which
returns AbstractAdapters from the various pools.
11. Modify retreive connection so that it takes an id of an object that should be in @@defined_connections
Remove it's dependency on connection= as this will break things.
12. Modify connection= to call establish_connection on ConnectionSpecification.
13. Moidfy self.remove_connection to call clear_connection_pool
14. Modify active_connection_name to check if a pool exists instead of defined_connections
Patch attached is aganist whats in the subversion repo.
Best Regards,
Stephen Blackstone
If the config has an attribute read_only == true then it is only entered into the read_connection_pool
and vice versa for the write_connection_pool. @@defined_connections also is defined at the same time.
@@write_connection_pool[spec.object_id] always equals @@defined_connections[spec.object_id]
5. Define a function establish_connection_pools which looks for two variables to be
set in the configuration:
RAILS_ENV_read_connection_pool
RAILS_ENV_write_conenction_pool
Normally in Rails, this would setup in the YAML file. Each variable is a comma delimited
list of connections to use, one for read the other for write.
6. Define a function clear_connection_pool which clears the connection pools.
7. Modify establish_connection
a. When passed nil, call establish_connection_pools(RAILS_ENV)
b. When passed a ConnectionSpecification, clear all connection pools as well as the
active_connection_name
8. Move code from establish connection to ConnectionSpecification constructor so an object can be made
out of any spec thats passed to it. (Avoiding breakage of DRY principle)
9. Remove settings @@defined_connections[name] in establish_connection. This is now done by
calling
10. Define two functions, round robin read and round robin write which
returns AbstractAdapters from the various pools.
11. Modify retreive connection so that it takes an id of an object that should be in @@defined_connections
Remove it's dependency on connection= as this will break things.
12. Modify connection= to call establish_connection on ConnectionSpecification.
13. Moidfy self.remove_connection to call clear_connection_pool
14. Modify active_connection_name to check if a pool exists instead of defined_connections
Patch attached is aganist whats in the subversion repo.
Best Regards,
Stephen Blackstone
Index: lib/active_record/connection_adapters/abstract/connection_specification.rb =================================================================== --- lib/active_record/connection_adapters/abstract/connection_specification.rb (revision 4619) +++ lib/active_record/connection_adapters/abstract/connection_specification.rb (working copy) @@ -4,7 +4,17 @@ class Base class ConnectionSpecification #:nodoc: attr_reader :config, :adapter_method - def initialize (config, adapter_method) + def initialize (config, adapter_method = nil) + # If no adapter is found, try to devine it from the config. + # This was moved here from establish_connection since it's now + # useful in other places. + if adapter_method.nil? + config = config.symbolize_keys + unless config.key?(:adapter) then raise AdapterNotSpecified, "database configuration does not specify adapter" end + adapter_method = "#{config[:adapter]}_connection" + unless ActiveRecord::Base.respond_to?(adapter_method) then + raise AdapterNotFound, "database configuration specifies nonexistent #{config[:adapter]} adapter" end + end @config, @adapter_method = config, adapter_method end end @@ -20,16 +30,26 @@ # The class -> thread id -> adapter cache. (class -> adapter if not allow_concurrency) @@active_connections = {} + # Define a connection pool for doing database reads. + @@read_connection_pool = {} + + # Define a connection pool for doing database reads. + @@write_connection_pool = {} + + # Used in looping the connection pools. + @@last_read_connection = {} + @@last_write_connection = {} + class << self # Retrieve the connection cache. def thread_safe_active_connections #:nodoc: @@active_connections[Thread.current.object_id] ||= {} end - + def single_threaded_active_connections #:nodoc: @@active_connections end - + # pick up the right active_connection method from @@allow_concurrency if @@allow_concurrency alias_method :active_connections, :thread_safe_active_connections @@ -52,8 +72,9 @@ end def active_connection_name #:nodoc: + # This must use name, not active_connection_name @active_connection_name ||= - if active_connections[name] || @@defined_connections[name] + if @@write_connection_pool[name] || @@read_connection_pool[name] name elsif self == ActiveRecord::Base nil @@ -70,13 +91,24 @@ # Returns the connection currently associated with the class. This can # also be used to "borrow" the connection to do database work unrelated # to any of the specific Active Records. - def connection - if @active_connection_name && (conn = [EMAIL PROTECTED]) + def connection(type = nil) + case type + when nil, :write + # Provides write connection on nil for backwards compatibility. + id = round_robin_write_connection.object_id + when :read + id = round_robin_read_connection.object_id + else + # If someone passes garbage, raise error. + # they could have mispelt "read" and that could bork their database. + raise ActiveRecordError, "Connection type can only be read or write." + end + if conn = active_connections[id] conn else # retrieve_connection sets the cache key. - conn = retrieve_connection - [EMAIL PROTECTED] = conn + conn = retrieve_connection(id) + active_connections[id] = conn end end @@ -100,57 +132,174 @@ end end + protected + def round_robin_read_connection + if @@read_connection_pool[active_connection_name].nil? || @@read_connection_pool[active_connection_name].size == 0 + raise ConnectionNotEstablished, "No Connection is available" + else + @@last_read_connection = (@@last_read_connection + 1) % @@read_connection_pool[active_connection_name].size + @@read_connection_pool[active_connection_name].to_a[@@last_read_connection][1] + end + end + + def round_robin_write_connection + # See comments for round_robin_read_connection. + if @@write_connection_pool[active_connection_name].nil? || @@write_connection_pool[active_connection_name].size == 0 + raise ConnectionNotEstablished, "No Connection is available" + else + @@last_write_connection] = (@@last_write_connection + 1) % @@write_connection_pool[active_connection_name].size + @@write_connection_pool[active_connection_name].to_a[@@last_write_connection][1] + end + end + private - def clear_cache!(cache, thread_id = nil, &block) - if cache - if @@allow_concurrency - thread_id ||= Thread.current.object_id - thread_cache, cache = cache, cache[thread_id] - return unless cache - end - - cache.each(&block) if block_given? - cache.clear + def clear_cache!(cache, thread_id = nil, &block) + if cache + if @@allow_concurrency + thread_id ||= Thread.current.object_id + thread_cache, cache = cache, cache[thread_id] + return unless cache end + cache.each(&block) if block_given? + cache.clear + end ensure if thread_cache && @@allow_concurrency thread_cache.delete(thread_id) end + end + + # Remove stale threads from the cache. + def remove_stale_cached_threads!(cache, &block) + stale = Set.new(cache.keys) + + Thread.list.each do |thread| + stale.delete(thread.object_id) if thread.alive? end - # Remove stale threads from the cache. - def remove_stale_cached_threads!(cache, &block) - stale = Set.new(cache.keys) - - Thread.list.each do |thread| - stale.delete(thread.object_id) if thread.alive? - end - - stale.each do |thread_id| - clear_cache!(cache, thread_id, &block) - end + stale.each do |thread_id| + clear_cache!(cache, thread_id, &block) end + end - def clear_all_cached_connections! - if @@allow_concurrency - @@active_connections.each_value do |connection_hash_for_thread| - connection_hash_for_thread.each_value {|conn| conn.disconnect! } - connection_hash_for_thread.clear - end - else - @@active_connections.each_value {|conn| conn.disconnect! } + def clear_all_cached_connections! + if @@allow_concurrency + @@active_connections.each_value do |connection_hash_for_thread| + connection_hash_for_thread.each_value {|conn| conn.disconnect! } + connection_hash_for_thread.clear end - @@active_connections.clear + else + @@active_connections.each_value {|conn| conn.disconnect! } end + @@active_connections.clear + end end # Returns the connection currently associated with the class. This can # also be used to "borrow" the connection to do database work that isn't # easily done without going straight to SQL. - def connection - self.class.connection + def connection(type = nil) + self.class.connection(type) end + + # Add specs to various connection pools. + def self.append_spec_to_connection_pools spec + # The default behavior allows a connection to be used for both reading and writing. + # Otherwise, we just load both pools. + + @@write_connection_pool[active_connection_name] ||= {} + @@read_connection_pool[active_connection_name] ||= {} + @@last_read_connection[active_connection_name] = 0 + @@last_write_connection[active_connection_name] = 0 + # Check out what were were passed if its not a ConnectionSpec, try to make one. + if !spec.kind_of?(ActiveRecord::Base::ConnectionSpecification) + spec = ConnectionSpecification.new(spec) + end + + if !spec.config.key?(:read_only) && !spec.config[:read_only] != 'true' + @@write_connection_pool[active_connection_name][spec.object_id] = spec if !@@write_connection_pool[active_connection_name][spec.object_id] + end + if !spec.config.key?(:write_only) && !spec.config[:write_only] != 'true' + @@read_connection_pool[active_connection_name][spec.object_id] = spec if !@@read_connection_pool[active_connection_name][spec.object_id] + end + @@defined_connections[spec.object_id] = spec + end + + def self.clear_connection_pools(klass=self) + if @@write_connection_pool[klass.name].kind_of?(Hash) + @@write_connection_pool[klass.name].each_value {|conn| + active_connections[conn.object_id].disconnect! if active_connections[conn.object_id] } + @@write_connection_pool[klass.name].clear + end + if @@read_connection_pool[klass.name].kind_of?(Hash) + @@read_connection_pool[klass.name].each_value {|conn| + active_connections[conn.object_id].disconnect! if active_connections[conn.object_id] } + @@read_connection_pool[klass.name].clear + end + @@read_connection_pool.delete(klass.name) + @@write_connection_pool.delete(klass.name) + clear_all_cached_connections! + end + + + # Connection Pool Support. Stephen Blackstone <[EMAIL PROTECTED]> + # Connection pools are defined in your YAML file as following: + # + # Connections that are in one pool or the other are automagically assigned to + # the correct pool and will not insert in the other. + # + # + # + #development: + # adapter: mysql + # database: project_development + # username: someuser + # password: happyland + # host: 10.0.0.1 + # port: 3306 + # + #development2: + # adapter: mysql + # database: project_development + # username: root + # password: somethingelse + # host: 10.0.0.2 + # port: 3306 + # + # development_write_pool: development + # development_read_pool: development,development2 + # + # You can theoretically force a connection to go in one pool or another but by putting + # read_only : true or write_only: true + # However with the current scheme, the only way multiple databases are loaded at once + # is when it is done as part of a connection pool. + # + + def self.establish_connection_pool(environment) + # Connections that are specifically in a pool override the read/write settings. + @active_connection_name = name + if @read = configurations["#{environment}_read_pool"] + @read.split(",").each do |connection| + spec = configurations[connection].clone + spec[:read_only] = true + append_spec_to_connection_pools(spec) + end + end + if @write = configurations["#{environment}_write_pool"] + @write.split(",").each do |connection| + spec = configurations[connection].clone + spec[:write_only] = true + append_spec_to_connection_pools(spec) + end + end + if (!@@write_connection_pool[name].nil? and !@@read_connection_pool[name].nil? and @@write_connection_pool[name].size > 0 && @@read_connection_pool[name].size > 0) + return true + else + return false + end + end + # Establishes the connection to the database. Accepts a hash as input where # the :adapter key must be specified with the name of a database adapter (in lower-case) # example for regular databases (MySQL, Postgresql, etc): @@ -182,11 +331,13 @@ case spec when nil raise AdapterNotSpecified unless defined? RAILS_ENV - establish_connection(RAILS_ENV) + if !establish_connection_pool(RAILS_ENV) + establish_connection(RAILS_ENV) + end when ConnectionSpecification - clear_active_connection_name + clear_active_connection_name @active_connection_name = name - @@defined_connections[name] = spec + append_spec_to_connection_pools(spec) when Symbol, String if configuration = configurations[spec.to_s] establish_connection(configuration) @@ -194,12 +345,7 @@ raise AdapterNotSpecified, "#{spec} database is not configured" end else - spec = spec.symbolize_keys - unless spec.key?(:adapter) then raise AdapterNotSpecified, "database configuration does not specify adapter" end - adapter_method = "#{spec[:adapter]}_connection" - unless respond_to?(adapter_method) then raise AdapterNotFound, "database configuration specifies nonexistent #{spec[:adapter]} adapter" end - remove_connection - establish_connection(ConnectionSpecification.new(spec, adapter_method)) + establish_connection(ConnectionSpecification.new(spec)) end end @@ -207,27 +353,31 @@ # active or defined connections: if it is the latter, it will be # opened and set as the active connection for the class it was defined # for (not necessarily the current class). - def self.retrieve_connection #:nodoc: + def self.retrieve_connection(id = nil) #:nodoc: # Name is nil if establish_connection hasn't been called for # some class along the inheritance chain up to AR::Base yet. - if name = active_connection_name - if conn = active_connections[name] + if id.nil? + id = round_robin_write_connection.object_id + end + if conn = active_connections[id] # Verify the connection. conn.verify!(@@verification_timeout) - elsif spec = @@defined_connections[name] + elsif spec = @@defined_connections[id] # Activate this connection specification. - klass = name.constantize - klass.connection = spec - conn = active_connections[name] - end - end - + # Note this no longer calls connection= which causes problems + # with read/write pools being clobbered. + config = spec.config.reverse_merge(:allow_concurrency => @@allow_concurrency) + active_connections[id] = self.send(spec.adapter_method, config) + conn = active_connections[id] + end conn or raise ConnectionNotEstablished end # Returns true if a connection that's accessible to this class have already been opened. def self.connected? - active_connections[active_connection_name] ? true : false + return false if active_connections[active_connection_name].nil? + return false if active_connections[active_connection_name][round_robin_write_connection.object_id].nil? + return true end # Remove the connection for this class. This will close the active @@ -235,21 +385,20 @@ # can be used as argument for establish_connection, for easy # re-establishing of the connection. def self.remove_connection(klass=self) - spec = @@defined_connections[klass.name] - konn = active_connections[klass.name] - @@defined_connections.delete_if { |key, value| value == spec } - active_connections.delete_if { |key, value| value == konn } - konn.disconnect! if konn - spec.config if spec + # Arbitarily choosing the first spec off the write pool to send back. + # This is here to stay backwards compatible + spec = @@defined_connections[round_robin_write_connection.object_id] if !@@defined_connections[round_robin_write_connection.object_id].nil? + clear_connection_pools(klass) + spec if spec end # Set the connection for the class. def self.connection=(spec) #:nodoc: if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter) - active_connections[name] = spec + active_connections[active_connection_name] = spec elsif spec.kind_of?(ConnectionSpecification) config = spec.config.reverse_merge(:allow_concurrency => @@allow_concurrency) - self.connection = self.send(spec.adapter_method, config) + establish_connection(spec.adapter_method, config) elsif spec.nil? raise ConnectionNotEstablished else Index: lib/active_record/base.rb =================================================================== --- lib/active_record/base.rb (revision 4619) +++ lib/active_record/base.rb (working copy) @@ -419,7 +419,7 @@ # Post.find_by_sql "SELECT p.*, c.author FROM posts p, comments c WHERE p.id = c.post_id" # Post.find_by_sql ["SELECT * FROM posts WHERE author = ? AND created > ?", author_id, start_date] def find_by_sql(sql) - connection.select_all(sanitize_sql(sql), "#{name} Load").collect! { |record| instantiate(record) } + connection(:read).select_all(sanitize_sql(sql), "#{name} Load").collect! { |record| instantiate(record) } end # Returns true if the given +id+ represents the primary key of a record in the database, false otherwise. @@ -506,7 +506,7 @@ # Product.count_by_sql "SELECT COUNT(*) FROM sales s, customers c WHERE s.customer_id = c.id" def count_by_sql(sql) sql = sanitize_conditions(sql) - connection.select_value(sql, "#{name} Count").to_i + connection(:read).select_value(sql, "#{name} Count").to_i end # Increments the specified counter by one. So <tt>DiscussionBoard.increment_counter("post_count", @@ -725,7 +725,7 @@ # Returns an array of column objects for the table associated with this class. def columns unless @columns - @columns = connection.columns(table_name, "#{name} Columns") + @columns = connection(:read).columns(table_name, "#{name} Columns") @columns.each {|column| column.primary = column.name == primary_key} end @columns
_______________________________________________ Rails-core mailing list Rails-core@lists.rubyonrails.org http://lists.rubyonrails.org/mailman/listinfo/rails-core