Hello,

Due to the Trac situation, I am forwarding this patch to this list in the hope of eliciting some feedback.  I will be sure to open a proper ticket when possible.

I have done some extensive work on Active Record to allow databases to be defined in terms of "connection pools" rather than a single database.  Most of the work was done in connection_specification.rb, and it is done at this level so that it is compatible with any backend you choose.  The patch also maintains full compatibility with the current paradigm, so no changes are necessary to current applications or documentation.

Defining a database pool is very easy and follows a common-sense convention.  I will assume you are running with RAILS_ENV=development for this example.  You first define your connections as you always have in database.yml.  You are then able to define
  development_read_pool: db1, db2, db3
  development_write_pool: db3, db4, db5  (In general, the name is RAILS_ENV_write_pool so you can test your clusters in development and production with no config changes)

where each is a set of connection names that you have already defined.  ActiveRecord::Base.connection will then return connections from the appropriate pool, using round-robin when more than one connection is available to it.  This is handy if, for example, you have a high-traffic website and want to load balance reads over several slave servers while consistently writing to the one master server.  The syntax for .connection is as follows:

ExampleModel.connection           # Default "compatibility behavior": always returns a write connection
ExampleModel.connection(:read)    # Returns a connection from the read pool
ExampleModel.connection(:write)   # Returns a connection from the write pool
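
As a rough illustration of the dispatch described above, here is a minimal standalone sketch. It is not the code from the patch: the class name PooledModelSketch is hypothetical, the "connections" are plain strings, and the index starts at -1 so that the first call returns the first pool entry (an assumption made for readability).

```ruby
# Hypothetical stand-in for a model class; not part of the patch.
class PooledModelSketch
  def initialize(read_pool, write_pool)
    @pools = { :read => read_pool, :write => write_pool }
    # One round-robin index per pool (assumed to start before the first entry).
    @last  = { :read => -1, :write => -1 }
  end

  # nil behaves like :write, mirroring the backwards-compatible default.
  def connection(type = nil)
    type = :write if type.nil?
    pool = @pools[type]
    # Reject anything other than :read / :write instead of guessing.
    raise ArgumentError, "Connection type can only be read or write." if pool.nil?
    raise "No connection is available" if pool.empty?
    # Advance the per-pool index and wrap around at the end of the pool.
    @last[type] = (@last[type] + 1) % pool.size
    pool[@last[type]]
  end
end

model  = PooledModelSketch.new(%w[db1 db2 db3], %w[db4])
reads  = Array.new(4) { model.connection(:read) }
writes = [model.connection, model.connection(:write)]
```

With three read connections, four consecutive read requests cycle through the pool and wrap back to the first entry, while both write requests land on the single write connection.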

I have also changed a few of the functions in base.rb to use the connection pool (for example, find_by_sql calls connection(:read)).  This again makes the patch seamless to end applications while allowing them to use the new functionality.

A patch against the latest Subversion code is attached.  Some rough notes on the implementation are also included below; they're not 100% complete but give a good idea of what was changed.

This currently passes the ActiveRecord rake test run with flying colors.

Implementation:

1.  Added two hashes to act as pools for the connections available to a class.

   @@read_connection_pool = {}  
   @@write_connection_pool = {}

   Each entry is later defined as a hash of connection specifications such that

    @@read_connection_pool[name] represents all of the read connections available to the current
    class. This allows us to stay fully backward compatible with the old methodology, where name
    is @active_connection_name.


2.  Define 2 variables to track the index of the last used connection.  These are used when doing round-robin.

    @@last_read_connection = {}
    @@last_write_connection = {}

    Each is keyed by connection name, and each entry starts at 0.

3.  Define a function for appending connections to pools (append_spec_to_connection_pools spec).
    If the config has an attribute read_only == true then it is only entered into the read_connection_pool,
    and vice versa for write_only and the write_connection_pool.  @@defined_connections is also set at the same time.

    @@write_connection_pool[spec.object_id] always equals @@defined_connections[spec.object_id]


5.  Define a function establish_connection_pool which looks for two variables to be
    set in the configuration:

    RAILS_ENV_read_pool
    RAILS_ENV_write_pool

    Normally in Rails, this would be set up in the YAML file.  Each variable is a comma-delimited
    list of connections to use, one for reads and the other for writes.


6.  Define a function clear_connection_pools which clears the connection pools.

7.  Modify establish_connection
    a.  When passed nil, call establish_connection_pool(RAILS_ENV)
    b.  When passed a ConnectionSpecification, clear all connection pools as well as the
        active_connection_name

8.  Move code from establish_connection to the ConnectionSpecification constructor so an object can be made
    out of any spec that's passed to it.  (Avoids violating the DRY principle.)


9.  Remove the setting of @@defined_connections[name] in establish_connection.  This is now done by
    calling append_spec_to_connection_pools (see the patch).


10.  Define two functions, round_robin_read_connection and round_robin_write_connection, which
     return AbstractAdapters from the various pools.

11.  Modify retrieve_connection so that it takes the id of an object that should be in @@defined_connections.
     Remove its dependency on connection= as this will break things.

12.  Modify connection= to call establish_connection on ConnectionSpecification.

13.  Modify self.remove_connection to call clear_connection_pools

14.  Modify active_connection_name to check if a pool exists instead of defined_connections
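
The pool-building steps above (the two pools, appending specs, and reading the comma-delimited pool entries) can be sketched outside of ActiveRecord roughly as follows. This is only an illustration under stated assumptions: PoolRegistrySketch, establish_pools, names_for and append are hypothetical names, plain hashes stand in for connection specifications, and arrays are used for the pools for simplicity.

```ruby
# Hypothetical registry; names are illustrative and not taken from the patch.
class PoolRegistrySketch
  attr_reader :read_pool, :write_pool

  def initialize(configurations)
    @configurations = configurations
    @read_pool  = []
    @write_pool = []
  end

  # Look up "<env>_read_pool" / "<env>_write_pool", each a comma-delimited
  # list of connection names defined elsewhere in the configuration.
  # Returns true only when both pools end up non-empty.
  def establish_pools(environment)
    names_for("#{environment}_read_pool").each  { |n| append(n, :read_only => true) }
    names_for("#{environment}_write_pool").each { |n| append(n, :write_only => true) }
    !@read_pool.empty? && !@write_pool.empty?
  end

  private

  # Split a comma-delimited entry into trimmed, non-empty names.
  def names_for(key)
    @configurations.fetch(key, "").split(",").map { |n| n.strip }.reject { |n| n.empty? }
  end

  # Copy the named config, tag it, and file it in the matching pool only.
  def append(name, flags)
    spec = @configurations.fetch(name).merge(flags)
    @read_pool  << spec unless spec[:write_only]
    @write_pool << spec unless spec[:read_only]
  end
end

config = {
  "db1" => { "host" => "10.0.0.1" },
  "db2" => { "host" => "10.0.0.2" },
  "development_read_pool"  => "db1, db2",
  "development_write_pool" => "db1"
}
registry = PoolRegistrySketch.new(config)
ok = registry.establish_pools("development")
```

A connection named in both lists (db1 here) is copied once per pool, so the read-only and write-only flags do not interfere with each other. When an environment has no pool entries, establish_pools returns false, which is the case where a caller would fall back to the ordinary single-connection setup.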

The attached patch is against what's in the Subversion repo.

Best Regards,
Stephen Blackstone
Index: lib/active_record/connection_adapters/abstract/connection_specification.rb
===================================================================
--- lib/active_record/connection_adapters/abstract/connection_specification.rb  (revision 4619)
+++ lib/active_record/connection_adapters/abstract/connection_specification.rb  (working copy)
@@ -4,7 +4,17 @@
   class Base
     class ConnectionSpecification #:nodoc:
       attr_reader :config, :adapter_method
-      def initialize (config, adapter_method)
+      def initialize (config, adapter_method = nil)
+      # If no adapter is found, try to divine it from the config.
+      # This was moved here from establish_connection since it's now
+      # useful in other places.  
+      if adapter_method.nil?
+          config = config.symbolize_keys
+          unless config.key?(:adapter) then raise AdapterNotSpecified, "database configuration does not specify adapter" end
+          adapter_method = "#{config[:adapter]}_connection"
+          unless ActiveRecord::Base.respond_to?(adapter_method) then 
+          raise AdapterNotFound, "database configuration specifies nonexistent #{config[:adapter]} adapter" end
+       end
         @config, @adapter_method = config, adapter_method
       end
     end
@@ -20,16 +30,26 @@
     # The class -> thread id -> adapter cache. (class -> adapter if not allow_concurrency)
     @@active_connections = {}
 
+    # Define a connection pool for doing database reads.
+    @@read_connection_pool = {}
+
+    # Define a connection pool for doing database writes.
+    @@write_connection_pool = {}
+    
+    # Used in looping the connection pools.
+    @@last_read_connection =  {}
+    @@last_write_connection = {}
+
     class << self
       # Retrieve the connection cache.
       def thread_safe_active_connections #:nodoc:
         @@active_connections[Thread.current.object_id] ||= {}
       end
-     
+      
       def single_threaded_active_connections #:nodoc:
         @@active_connections
       end
-     
+
       # pick up the right active_connection method from @@allow_concurrency
       if @@allow_concurrency
         alias_method :active_connections, :thread_safe_active_connections
@@ -52,8 +72,9 @@
       end
       
       def active_connection_name #:nodoc:
+        # This must use name, not active_connection_name
         @active_connection_name ||=
-           if active_connections[name] || @@defined_connections[name]
+           if @@write_connection_pool[name] || @@read_connection_pool[name]
              name
            elsif self == ActiveRecord::Base
              nil
@@ -70,13 +91,24 @@
       # Returns the connection currently associated with the class. This can
       # also be used to "borrow" the connection to do database work unrelated
       # to any of the specific Active Records.
-      def connection
-        if @active_connection_name && (conn = active_connections[@active_connection_name])
+      def connection(type = nil)
+        case type
+          when nil, :write
+            # Provides write connection on nil for backwards compatibility.
+            id = round_robin_write_connection.object_id
+          when :read
+            id = round_robin_read_connection.object_id
+          else
+            # If someone passes garbage, raise an error.
+            # They could have misspelled "read" and that could bork their database.
+            raise ActiveRecordError, "Connection type can only be read or write."
+        end
+        if conn = active_connections[id]
           conn
         else
           # retrieve_connection sets the cache key.
-          conn = retrieve_connection
-          active_connections[@active_connection_name] = conn
+          conn = retrieve_connection(id)
+          active_connections[id] = conn
         end
       end
 
@@ -100,57 +132,174 @@
         end
       end
 
+      protected   
+      def round_robin_read_connection
+        if @@read_connection_pool[active_connection_name].nil? || @@read_connection_pool[active_connection_name].size == 0
+          raise ConnectionNotEstablished, "No Connection is available"
+        else
+          @@last_read_connection[active_connection_name] = (@@last_read_connection[active_connection_name] + 1) % @@read_connection_pool[active_connection_name].size
+          @@read_connection_pool[active_connection_name].to_a[@@last_read_connection[active_connection_name]][1]
+        end
+      end
+
+      def round_robin_write_connection
+      # See comments for round_robin_read_connection.
+        if @@write_connection_pool[active_connection_name].nil? || @@write_connection_pool[active_connection_name].size == 0
+          raise ConnectionNotEstablished, "No Connection is available"
+        else
+          @@last_write_connection[active_connection_name] = (@@last_write_connection[active_connection_name] + 1) % @@write_connection_pool[active_connection_name].size
+          @@write_connection_pool[active_connection_name].to_a[@@last_write_connection[active_connection_name]][1]
+        end
+      end
+
       private
-        def clear_cache!(cache, thread_id = nil, &block)
-          if cache
-            if @@allow_concurrency
-              thread_id ||= Thread.current.object_id
-              thread_cache, cache = cache, cache[thread_id]
-              return unless cache
-            end
-
-            cache.each(&block) if block_given?
-            cache.clear
+      def clear_cache!(cache, thread_id = nil, &block)
+        if cache
+          if @@allow_concurrency
+            thread_id ||= Thread.current.object_id
+            thread_cache, cache = cache, cache[thread_id]
+            return unless cache
           end
+          cache.each(&block) if block_given?
+          cache.clear
+        end
         ensure
           if thread_cache && @@allow_concurrency
             thread_cache.delete(thread_id)
           end
+      end
+
+      # Remove stale threads from the cache.
+      def remove_stale_cached_threads!(cache, &block)
+        stale = Set.new(cache.keys)
+        
+        Thread.list.each do |thread|
+          stale.delete(thread.object_id) if thread.alive?
         end
 
-        # Remove stale threads from the cache.
-        def remove_stale_cached_threads!(cache, &block)
-          stale = Set.new(cache.keys)
-
-          Thread.list.each do |thread|
-            stale.delete(thread.object_id) if thread.alive?
-          end
-
-          stale.each do |thread_id|
-            clear_cache!(cache, thread_id, &block)
-          end
+        stale.each do |thread_id|
+          clear_cache!(cache, thread_id, &block)
         end
+      end
         
-        def clear_all_cached_connections!
-          if @@allow_concurrency
-            @@active_connections.each_value do |connection_hash_for_thread|
-              connection_hash_for_thread.each_value {|conn| conn.disconnect! }
-              connection_hash_for_thread.clear
-            end
-          else
-            @@active_connections.each_value {|conn| conn.disconnect! }
+      def clear_all_cached_connections!
+        if @@allow_concurrency
+          @@active_connections.each_value do |connection_hash_for_thread|
+            connection_hash_for_thread.each_value {|conn| conn.disconnect! }
+            connection_hash_for_thread.clear
           end
-          @@active_connections.clear          
+        else
+          @@active_connections.each_value {|conn| conn.disconnect! }
         end
+        @@active_connections.clear          
+      end
     end
 
     # Returns the connection currently associated with the class. This can
     # also be used to "borrow" the connection to do database work that isn't
     # easily done without going straight to SQL.
-    def connection
-      self.class.connection
+    def connection(type = nil)
+      self.class.connection(type)
     end
+    
+    # Add specs to various connection pools.
+    def self.append_spec_to_connection_pools spec
+      # The default behavior allows a connection to be used for both reading and writing.
+      # Otherwise, we just load both pools.
+        
+      @@write_connection_pool[active_connection_name] ||= {}
+      @@read_connection_pool[active_connection_name] ||= {}
+      @@last_read_connection[active_connection_name] = 0
+      @@last_write_connection[active_connection_name] = 0
 
+      # Check what we were passed; if it's not a ConnectionSpecification, try to make one.
+      if !spec.kind_of?(ActiveRecord::Base::ConnectionSpecification)
+        spec = ConnectionSpecification.new(spec)
+      end
+      
+      if !spec.config.key?(:read_only) && !spec.config[:read_only] != 'true'
+        @@write_connection_pool[active_connection_name][spec.object_id] = spec if !@@write_connection_pool[active_connection_name][spec.object_id]
+      end
+      if !spec.config.key?(:write_only) && !spec.config[:write_only] != 'true'
+        @@read_connection_pool[active_connection_name][spec.object_id] = spec if !@@read_connection_pool[active_connection_name][spec.object_id]
+      end
+      @@defined_connections[spec.object_id] = spec
+    end
+    
+    def self.clear_connection_pools(klass=self)     
+      if @@write_connection_pool[klass.name].kind_of?(Hash)
+        @@write_connection_pool[klass.name].each_value {|conn| 
+          active_connections[conn.object_id].disconnect! if active_connections[conn.object_id] }
+        @@write_connection_pool[klass.name].clear
+      end
+      if @@read_connection_pool[klass.name].kind_of?(Hash)
+        @@read_connection_pool[klass.name].each_value {|conn|
+          active_connections[conn.object_id].disconnect! if active_connections[conn.object_id] }
+        @@read_connection_pool[klass.name].clear
+      end      
+      @@read_connection_pool.delete(klass.name)
+      @@write_connection_pool.delete(klass.name)
+      clear_all_cached_connections!
+    end
+  
+   
+    # Connection Pool Support.  Stephen Blackstone <[EMAIL PROTECTED]>
+    # Connection pools are defined in your YAML file as follows:
+    #
+    # Connections that are in one pool or the other are automagically assigned to
+    # the correct pool and will not be inserted in the other.
+    #
+    #
+    #
+    #development:
+    #  adapter: mysql
+    #  database: project_development
+    #  username: someuser
+    #  password: happyland
+    #  host: 10.0.0.1
+    #  port: 3306
+    #
+    #development2:
+    #  adapter: mysql
+    #  database: project_development
+    #  username: root
+    #  password: somethingelse
+    #  host: 10.0.0.2
+    #  port: 3306
+    #
+    #  development_write_pool: development
+    #  development_read_pool: development,development2
+    #
+    #  You can theoretically force a connection to go in one pool or another by putting
+    #  read_only: true or write_only: true
+    #  However with the current scheme, the only way multiple databases are loaded at once
+    #  is when it is done as part of a connection pool. 
+    #
+
+    def self.establish_connection_pool(environment)
+        # Connections that are specifically in a pool override the read/write settings.
+        @active_connection_name = name
+        if @read = configurations["#{environment}_read_pool"]
+            @read.split(",").each do |connection| 
+            spec = configurations[connection].clone
+            spec[:read_only] = true      
+            append_spec_to_connection_pools(spec)
+          end
+        end
+        if @write = configurations["#{environment}_write_pool"]
+            @write.split(",").each do  |connection|                         
+            spec = configurations[connection].clone
+            spec[:write_only] = true 
+            append_spec_to_connection_pools(spec)
+          end
+        end
+         if (!@@write_connection_pool[name].nil? and !@@read_connection_pool[name].nil? and @@write_connection_pool[name].size > 0 && @@read_connection_pool[name].size > 0) 
+          return true 
+        else
+         return false
+        end
+    end
+
     # Establishes the connection to the database. Accepts a hash as input where
     # the :adapter key must be specified with the name of a database adapter (in lower-case)
     # example for regular databases (MySQL, Postgresql, etc):
@@ -182,11 +331,13 @@
       case spec
         when nil
           raise AdapterNotSpecified unless defined? RAILS_ENV
-          establish_connection(RAILS_ENV)
+          if !establish_connection_pool(RAILS_ENV) 
+            establish_connection(RAILS_ENV)
+          end
         when ConnectionSpecification
-          clear_active_connection_name
+          clear_active_connection_name 
           @active_connection_name = name
-          @@defined_connections[name] = spec
+          append_spec_to_connection_pools(spec)
         when Symbol, String
           if configuration = configurations[spec.to_s]
             establish_connection(configuration)
@@ -194,12 +345,7 @@
             raise AdapterNotSpecified, "#{spec} database is not configured"
           end
         else
-          spec = spec.symbolize_keys
-          unless spec.key?(:adapter) then raise AdapterNotSpecified, "database configuration does not specify adapter" end
-          adapter_method = "#{spec[:adapter]}_connection"
-          unless respond_to?(adapter_method) then raise AdapterNotFound, "database configuration specifies nonexistent #{spec[:adapter]} adapter" end
-          remove_connection
-          establish_connection(ConnectionSpecification.new(spec, adapter_method))
+          establish_connection(ConnectionSpecification.new(spec))
       end
     end
 
@@ -207,27 +353,31 @@
     # active or defined connections: if it is the latter, it will be
     # opened and set as the active connection for the class it was defined
     # for (not necessarily the current class).
-    def self.retrieve_connection #:nodoc:
+    def self.retrieve_connection(id = nil) #:nodoc:
       # Name is nil if establish_connection hasn't been called for
       # some class along the inheritance chain up to AR::Base yet.
-      if name = active_connection_name
-        if conn = active_connections[name]
+        if id.nil?
+          id = round_robin_write_connection.object_id
+        end
+        if conn = active_connections[id]
           # Verify the connection.
           conn.verify!(@@verification_timeout)
-        elsif spec = @@defined_connections[name]
+        elsif spec = @@defined_connections[id]
           # Activate this connection specification.
-          klass = name.constantize
-          klass.connection = spec
-          conn = active_connections[name]
-        end
-      end
-
+          # Note this no longer calls connection= which causes problems
+          # with read/write pools being clobbered.
+          config = spec.config.reverse_merge(:allow_concurrency => @@allow_concurrency)
+          active_connections[id] = self.send(spec.adapter_method, config)
+          conn = active_connections[id]
+        end  
       conn or raise ConnectionNotEstablished
     end
 
     # Returns true if a connection that's accessible to this class have already been opened.
     def self.connected?
-      active_connections[active_connection_name] ? true : false
+      return false if active_connections[active_connection_name].nil?
+      return false if active_connections[active_connection_name][round_robin_write_connection.object_id].nil?
+      return true
     end
 
     # Remove the connection for this class. This will close the active
@@ -235,21 +385,20 @@
     # can be used as argument for establish_connection, for easy
     # re-establishing of the connection.
     def self.remove_connection(klass=self)
-      spec = @@defined_connections[klass.name]
-      konn = active_connections[klass.name]
-      @@defined_connections.delete_if { |key, value| value == spec }
-      active_connections.delete_if { |key, value| value == konn }
-      konn.disconnect! if konn
-      spec.config if spec
+       # Arbitrarily choosing the first spec off the write pool to send back.
+       # This is here to stay backwards compatible
+       spec = @@defined_connections[round_robin_write_connection.object_id] if !@@defined_connections[round_robin_write_connection.object_id].nil?
+       clear_connection_pools(klass)
+       spec if spec
     end
 
     # Set the connection for the class.
     def self.connection=(spec) #:nodoc:
       if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter)
-        active_connections[name] = spec
+        active_connections[active_connection_name] = spec
       elsif spec.kind_of?(ConnectionSpecification)
         config = spec.config.reverse_merge(:allow_concurrency => @@allow_concurrency)
-        self.connection = self.send(spec.adapter_method, config)
+        establish_connection(spec.adapter_method, config)
       elsif spec.nil?
         raise ConnectionNotEstablished
       else
Index: lib/active_record/base.rb
===================================================================
--- lib/active_record/base.rb   (revision 4619)
+++ lib/active_record/base.rb   (working copy)
@@ -419,7 +419,7 @@
       #   Post.find_by_sql "SELECT p.*, c.author FROM posts p, comments c WHERE p.id = c.post_id"
       #   Post.find_by_sql ["SELECT * FROM posts WHERE author = ? AND created > ?", author_id, start_date]
       def find_by_sql(sql)
-        connection.select_all(sanitize_sql(sql), "#{name} Load").collect! { |record| instantiate(record) }
+        connection(:read).select_all(sanitize_sql(sql), "#{name} Load").collect! { |record| instantiate(record) }
       end
 
       # Returns true if the given +id+ represents the primary key of a record in the database, false otherwise.
@@ -506,7 +506,7 @@
       #   Product.count_by_sql "SELECT COUNT(*) FROM sales s, customers c WHERE s.customer_id = c.id"
       def count_by_sql(sql)
         sql = sanitize_conditions(sql)
-        connection.select_value(sql, "#{name} Count").to_i
+        connection(:read).select_value(sql, "#{name} Count").to_i
       end
 
       # Increments the specified counter by one. So <tt>DiscussionBoard.increment_counter("post_count",
@@ -725,7 +725,7 @@
       # Returns an array of column objects for the table associated with this class.
       def columns
         unless @columns
-          @columns = connection.columns(table_name, "#{name} Columns")
+          @columns = connection(:read).columns(table_name, "#{name} Columns")
           @columns.each {|column| column.primary = column.name == primary_key}
         end
         @columns
_______________________________________________
Rails-core mailing list
Rails-core@lists.rubyonrails.org
http://lists.rubyonrails.org/mailman/listinfo/rails-core
