I've been experimenting with the sharding features in Sequel and ran
across a use-case in our system that isn't covered.
Due to the nature of the keys we are using to shard, we do not always
know all possible shard keys in advance. The Sequel.connect call
requires the :servers hash to be complete; there isn't any mechanism
to modify it after calling Sequel.connect.
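For anyone following along, here is a rough sketch of the shape of the :servers option as I understand it. The shard names, hosts, and the known_shard? helper are made up for illustration, and the Sequel calls are left in comments so the snippet runs without the gem or a database.

```ruby
# Hypothetical shard names and hosts, for illustration only.
SHARDS = {
  :shard_a => {:host => 'db-a.example.com'},
  :shard_b => {:host => 'db-b.example.com'}
}

# Options as they would be handed to Sequel.connect. Every shard has to
# be listed under :servers up front.
CONNECT_OPTS = {
  :adapter  => 'postgres',
  :host     => 'db-main.example.com',
  :database => 'app',
  :servers  => SHARDS
}

# With the sequel gem loaded, the calls would look roughly like this:
#   DB = Sequel.connect(CONNECT_OPTS)
#   DB[:items].server(:shard_a).all   # run the query against one shard

# Hypothetical helper: was this shard key known at connect time?
def known_shard?(opts, key)
  opts.fetch(:servers, {}).key?(key)
end

known_shard?(CONNECT_OPTS, :shard_a)  # a shard listed at connect time
known_shard?(CONNECT_OPTS, :shard_c)  # a shard discovered later: not known
```

The second lookup is the case we keep hitting: a key that only appears after the pool has been built.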
I know it is possible to call Sequel::ConnectionPool#disconnect and
then reconnect with a modified :servers hash, but I am troubled by the
documentation for #disconnect.
> Removes all connection currently available on all servers,
> optionally yielding each connection to the given block. This method
> has the effect of disconnecting from the database, assuming that no
> connections are currently being used.
In a threaded system, a call to #disconnect may leave some connections
orphaned because another thread is still using them. A new call to
#connect would then (I think?) raise a
Sequel::DatabaseConnectionError. If I am wrong about this, I would
love to be corrected.
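To make the concern concrete, here is a toy model of the behaviour the quoted documentation describes. It is not Sequel's ConnectionPool; ToyPool and all of its methods are invented for this sketch, and connections are just strings.

```ruby
require 'thread'

# Toy model only: NOT Sequel's ConnectionPool.
class ToyPool
  attr_reader :closed

  def initialize
    @mutex = Mutex.new
    @available = []   # idle connections
    @allocated = {}   # thread => connection currently checked out
    @closed = []
    @next_id = 0
  end

  # Check a connection out for the current thread, yield it, then
  # return it to the idle list.
  def hold
    conn = @mutex.synchronize do
      c = @available.pop || "conn-#{@next_id += 1}"
      @allocated[Thread.current] = c
    end
    yield conn
  ensure
    @mutex.synchronize { @available << @allocated.delete(Thread.current) } if conn
  end

  # Close only the idle connections, as the quoted docs describe.
  def disconnect
    @mutex.synchronize do
      @closed.concat(@available)
      @available.clear
    end
  end

  def in_use
    @mutex.synchronize { @allocated.values }
  end
end

pool = ToyPool.new
checked_out = Queue.new
release = Queue.new

# A worker thread checks out conn-1 and keeps it until told to finish.
worker = Thread.new do
  pool.hold do |c|
    checked_out << c
    release.pop
  end
end
checked_out.pop          # wait until the worker really holds conn-1

pool.hold { |c| }        # main thread creates conn-2 and returns it (idle)
pool.disconnect          # closes idle connections only

survivors = pool.in_use      # => ["conn-1"], still held by the worker
closed = pool.closed.dup     # => ["conn-2"]

release << :done
worker.join
```

In this model the connection held by the worker survives #disconnect, which is the situation I am worried about.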
If I am right about this race condition, then I would like to propose
a small modification to the API. Below is an (untested) monkeypatch
that would add this functionality. I put a '+' next to each line
containing new code so it is easy to see (I am not showing a few lines
of deleted code).
If I were to create tests for this and submit it as a patch, would this
kind of functionality be accepted? Is there a superior way to add a
new sharding server to the connection pool at runtime that I have
missed?
cr
+class Sequel::Database
+  # Register a new shard in the options hash, then tell the pool about it.
+  def add_server(key, value)
+    (@opts[:servers] ||= {})[key] = value
+    pool.add_server(key)
+  end
+end
 class Sequel::ConnectionPool
   def initialize(opts = {}, &block)
     @max_size = Integer(opts[:max_connections] || 4)
     raise(Sequel::Error, ':max_connections must be positive') if @max_size < 1
     @mutex = Mutex.new
     @connection_proc = block
     @disconnection_proc = opts[:disconnection_proc]
     @available_connections = Hash.new{|h,k| h[:default]}
     @allocated = Hash.new{|h,k| h[:default]}
+    @servers = []
+    # :default must always exist, since the hashes above fall back to it.
+    ([:default] + (opts[:servers] || {}).keys).uniq.each{|key| add_server(key)}
     @timeout = Integer(opts[:pool_timeout] || 5)
     @sleep_time = Float(opts[:pool_sleep_time] || 0.001)
     @convert_exceptions = opts.include?(:pool_convert_exceptions) ? opts[:pool_convert_exceptions] : true
   end
+  # Add a shard key at runtime. Takes the pool mutex so other threads
+  # never see a half-registered shard; re-adding a key is a no-op.
+  def add_server(key)
+    @mutex.synchronize do
+      return if @servers.include?(key)
+      @servers << key
+      @available_connections[key] = []
+      @allocated[key] = {}
+    end
+  end
 end
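Since the patch above is untested, here is a small standalone sketch of the same idea that can be run without Sequel. ShardRegistry, bucket_for, and the shard names are hypothetical stand-ins; the only things taken from the patch are the :default fallback hash and an add_server method.

```ruby
require 'thread'

# Toy stand-in for the patched pool; not Sequel's actual class.
class ShardRegistry
  def initialize(keys = [])
    @mutex = Mutex.new
    @servers = []
    # Unknown shard keys fall back to the :default entry, as in the patch.
    @available = Hash.new { |h, k| h[:default] }
    add_server(:default)
    keys.each { |k| add_server(k) }
  end

  # Register a shard at runtime. Returns true if the key was new,
  # false if it had already been registered.
  def add_server(key)
    @mutex.synchronize do
      return false if @servers.include?(key)
      @servers << key
      @available[key] = []
      true
    end
  end

  def servers
    @mutex.synchronize { @servers.dup }
  end

  # The idle-connection list that a lookup for the given key would use.
  def bucket_for(key)
    @mutex.synchronize { @available[key] }
  end
end

registry = ShardRegistry.new([:shard_a])

# Before registration, :shard_b silently shares the :default bucket.
shared_before = registry.bucket_for(:shard_b).equal?(registry.bucket_for(:default))

# Several threads discover the new shard at once; only one registers it.
threads = 4.times.map { Thread.new { registry.add_server(:shard_b) } }
added = threads.map(&:value).count(true)

# After registration, :shard_b has its own bucket.
shared_after = registry.bucket_for(:shard_b).equal?(registry.bucket_for(:default))
```

The fallback to :default is why an unregistered shard key does not fail loudly, so registering the key before first use matters.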