I've been experimenting with the sharding features in Sequel and ran  
across a use-case in our system that isn't covered.

Due to the nature of the keys we are using to shard, we do not always
know all possible shard keys in advance. The Sequel.connect call
requires the :servers hash to be complete; there isn't any mechanism
to modify it after calling Sequel.connect.

I know it is possible to call Sequel::ConnectionPool#disconnect and  
then reconnect with a modified :servers hash, but I am troubled by the  
documentation for #disconnect.

> Removes all connection currently available on all servers,  
> optionally yielding each connection to the given block. This method  
> has the effect of disconnecting from the database, assuming that no  
> connections are currently being used.

In a threaded system a call to #disconnect may orphan some
connections because they are in use by another thread at that moment.
A new call to Sequel.connect would then (???) throw a
Sequel::DatabaseConnectionError. If I am wrong about this, I would
love to be corrected.
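
To make the concern concrete, here is a toy pool that mimics only the behaviour described in the quoted documentation. It is not Sequel's implementation: ToyPool, #hold, and the "conn-N" strings are all made up for illustration, and it says nothing about whether reconnecting afterwards raises an error.

```ruby
require 'thread'

# Toy stand-in for a connection pool; NOT Sequel's implementation.
class ToyPool
  attr_reader :available, :allocated

  def initialize
    @mutex = Mutex.new
    @available = []   # idle connections
    @allocated = {}   # thread => connection currently checked out
    @next_id = 0
  end

  # Check out a connection for the current thread, yield it, then return it.
  def hold
    conn = @mutex.synchronize do
      c = @available.pop || "conn-#{@next_id += 1}"
      @allocated[Thread.current] = c
    end
    yield conn
  ensure
    @mutex.synchronize do
      @allocated.delete(Thread.current)
      @available << conn if conn
    end
  end

  # Drop only the idle connections, as the quoted docs describe.
  def disconnect
    @mutex.synchronize do
      dropped = @available.dup
      @available.clear
      dropped
    end
  end
end

pool = ToyPool.new
in_use = Queue.new
release = Queue.new

# The worker checks out a connection and keeps it until told to release.
worker = Thread.new { pool.hold { |c| in_use << c; release.pop } }
in_use.pop                        # wait until the worker holds its connection
pool.hold { }                     # leaves a second, idle connection behind

dropped = pool.disconnect         # only the idle connection is dropped
survivors = pool.allocated.values # the worker's connection is untouched
release << true
worker.join                       # the survivor is now back in pool.available
```

In this toy, the connection held by the worker outlives the #disconnect call and is handed back to the pool afterwards, which is the situation I am worried about.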

If I am right about this race condition, then I would like to propose
a small modification to the API. Here is an (untested) monkeypatch that
would add this functionality. I put a '+' next to each line containing
new code so it is easy to see (I am not showing a few lines of deleted
code).

If I were to create tests for this and submit as a patch, would this  
kind of functionality be accepted? Is there a superior way to add a  
new sharding server to the connection pool at runtime that I have  
missed?

cr


+class Sequel::Database
+  # Register a new shard after the initial Sequel.connect call.
+  def add_server(key, value)
+    (@opts[:servers] ||= {})[key] = value
+    pool.add_server(key)
+  end
+end

class Sequel::ConnectionPool
   def initialize(opts = {}, &block)
     @max_size = Integer(opts[:max_connections] || 4)
     raise(Sequel::Error, ':max_connections must be positive') if @max_size < 1
     @mutex = Mutex.new
     @connection_proc = block
     @disconnection_proc = opts[:disconnection_proc]
     @available_connections = Hash.new{|h,k| h[:default]}
     @allocated = Hash.new{|h,k| h[:default]}
+    @servers = []
+    # Always register :default; the hash default blocks above fall back to it.
+    add_server(:default)
+    (opts[:servers] || {}).each_key { |key| add_server(key) }
     @timeout = Integer(opts[:pool_timeout] || 5)
     @sleep_time = Float(opts[:pool_sleep_time] || 0.001)
     @convert_exceptions = opts.include?(:pool_convert_exceptions) ? opts[:pool_convert_exceptions] : true
   end

+  # Register a shard key and give it empty connection lists.
+  def add_server(key)
+    @servers << key unless @servers.include?(key)
+    @available_connections[key] = []
+    @allocated[key] = {}
+  end
end
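
Since the whole point is to call add_server while other threads are using the pool, the registration probably ought to happen under the pool's mutex. Here is a standalone sketch of that idea. ToyShardedPool and its #servers reader are hypothetical names for illustration, not Sequel classes, and I am assuming that re-adding an existing key should leave that shard's connections alone.

```ruby
require 'thread'

# Hypothetical sharded pool, for illustration only; not Sequel's class.
class ToyShardedPool
  def initialize(server_keys = [:default])
    @mutex = Mutex.new
    @available = {}   # shard key => idle connections
    @allocated = {}   # shard key => { thread => connection }
    server_keys.each { |key| add_server(key) }
  end

  # Register a shard at runtime. Idempotent, so re-adding a key
  # does not discard that shard's existing connections.
  def add_server(key)
    @mutex.synchronize do
      @available[key] ||= []
      @allocated[key] ||= {}
    end
    self
  end

  # Snapshot of the shard keys registered so far.
  def servers
    @mutex.synchronize { @available.keys }
  end
end

pool = ToyShardedPool.new([:default, :shard_a])

# Several threads discover shard keys at the same time.
threads = 10.times.map do |i|
  Thread.new { pool.add_server(:"shard_#{i % 3}") }
end
threads.each(&:join)

pool.servers.size   # => 5
```

Each distinct key ends up registered exactly once, no matter how many threads race to add it.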
