Add a new config option direct_addressing that will cause the main runner to request :directed subscriptions from the middleware plugins.
This setting is off by default: the default Stomp plugin isn't going to
be able to do anything really intelligent with it, and on big networks
it would have a big impact on the middleware. The proposed
ActiveMQ-specific plugin will be able to do this in a way that does not
have such a huge impact on the middleware.

For Stomp we've added a new config option, queueprefix, that complements
topicprefix, and the plugin will simply subscribe to a queue named after
the MD5 of the configured identity. We're using the MD5 as that's the
safest way to ensure there are no illegal characters in the resulting
queue names.

The intention with this ability is that we can address individual nodes
directly without broadcasting. The use cases for this are, among others:

 - Web apps where people will be clicking arbitrary hosts; the library
   will fall back to direct mode in that case.
 - Calls to M::RPC::Client#custom_request where the number of expected
   replies is some arbitrary low number like 20. In a big network,
   talking to those hosts directly will have a lower impact than
   broadcast mode.
 - The potential to do batched requests. With broadcast mode it's a bit
   all or nothing unless restrictive filters are generated. By combining
   broadcast for discovery with directed requests in groups of n hosts
   we can slowly go through a network, rolling out change in manageable
   batches.
 - Truly pluggable discovery. External resources like BPM systems can
   supply a list of hosts they wish to talk to and we can do that
   easily.
 - Queued requests for off-line machines.

None of these abilities are in the libraries now; this is just the
enabling architecture.
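To make the queue naming concrete, here is a minimal standalone sketch of how a directed destination would be built next to a broadcast one. It mirrors the logic of the Stomp connector change in this patch, but `Settings` and `target_for` are hypothetical names used only for illustration, and the identity is a made-up example value.

```ruby
require "digest/md5"

# Hypothetical stand-in for the MCollective::Config values used here.
Settings = Struct.new(:topicprefix, :queueprefix, :topicsep, :identity)

# Builds a destination name: directed targets use the queue prefix and an
# MD5 of the identity, all other types use the topic prefix and a fixed suffix.
def target_for(settings, agent, type, collective)
  prefix = settings.topicprefix

  suffix = case type
           when :reply
             :reply
           when :broadcast, :request
             :command
           when :directed
             prefix = settings.queueprefix
             # hex digest only contains 0-9 and a-f, so it is safe in a queue name
             Digest::MD5.hexdigest(settings.identity)
           else
             raise "Unknown target type #{type}"
           end

  ["#{prefix}#{collective}", agent, suffix].join(settings.topicsep)
end

# Example values; the identity is an assumption, not taken from the patch.
settings = Settings.new("/topic/", "/queue/", ".", "node1.example.com")

broadcast = target_for(settings, "mcollective", :broadcast, "mcollective")
directed  = target_for(settings, "mcollective", :directed, "mcollective")

puts broadcast
puts directed
```

With these values the broadcast destination stays on the shared topic, while the directed destination is a per-node queue whose last component is the 32-character hex digest of the identity.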
Signed-off-by: R.I.Pienaar <[email protected]>
---
Local-branch: feature/master/7225
 lib/mcollective/config.rb                |    9 ++++++++-
 lib/mcollective/runner.rb                |    1 +
 plugins/mcollective/connector/stomp.rb   |   15 ++++++++++-----
 website/changelog.md                     |    1 +
 website/reference/basic/configuration.md |    1 +
 5 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/lib/mcollective/config.rb b/lib/mcollective/config.rb
index 6c8e8e4..c23aea9 100644
--- a/lib/mcollective/config.rb
+++ b/lib/mcollective/config.rb
@@ -9,7 +9,7 @@ module MCollective
                     :classesfile, :rpcauditprovider, :rpcaudit, :configdir, :rpcauthprovider,
                     :rpcauthorization, :color, :configfile, :rpchelptemplate, :rpclimitmethod,
                     :logger_type, :fact_cache_time, :collectives, :main_collective, :ssl_cipher,
-                    :registration_collective
+                    :registration_collective, :direct_addressing, :queueprefix

         def initialize
             @configured = false
@@ -44,6 +44,8 @@ module MCollective
                                     @main_collective = val
                                 when "topicprefix"
                                     @topicprefix = val
+                                when "queueprefix"
+                                    @queueprefix = val
                                 when "logfile"
                                     @logfile = val
                                 when "keeplogs"
@@ -62,6 +64,8 @@ module MCollective
                                     end
                                 when "identity"
                                     @identity = val
+                                when "direct_addressing"
+                                    val =~ /^1|y/i ? @direct_addressing = true : @direct_addressing = false
                                 when "color"
                                     val =~ /^1|y/i ? @color = true : @color = false
                                 when "daemonize"
@@ -130,6 +134,8 @@ module MCollective
             @registerinterval = 0
             @registration_collective = nil
             @topicsep = "."
+            @topicprefix = "/topic/"
+            @queueprefix = "/queue/"
             @classesfile = "/var/lib/puppet/classes.txt"
             @rpcaudit = false
             @rpcauditprovider = ""
@@ -149,6 +155,7 @@ module MCollective
             @collectives = ["mcollective"]
             @main_collective = @collectives.first
             @ssl_cipher = "aes-256-cbc"
+            @direct_addressing = false
         end

         def read_plugin_config_dir(dir)
diff --git a/lib/mcollective/runner.rb b/lib/mcollective/runner.rb
index 73cb565..b2dd601 100644
--- a/lib/mcollective/runner.rb
+++ b/lib/mcollective/runner.rb
@@ -45,6 +45,7 @@ module MCollective
         # Starts the main loop, before calling this you should initialize the MCollective::Config singleton.
         def run
             Util.subscribe(Util.make_subscriptions("mcollective", :broadcast))
+            Util.subscribe(Util.make_subscriptions("mcollective", :directed)) if @config.direct_addressing

             # Start the registration plugin if interval isn't 0
             begin
diff --git a/plugins/mcollective/connector/stomp.rb b/plugins/mcollective/connector/stomp.rb
index 323f738..f19e8c2 100644
--- a/plugins/mcollective/connector/stomp.rb
+++ b/plugins/mcollective/connector/stomp.rb
@@ -227,18 +227,23 @@ module MCollective
                 raise("Unknown target type #{type}") unless [:directed, :broadcast, :reply, :request].include?(type)
                 raise("Unknown collective '#{collective}' known collectives are '#{@config.collectives.join ', '}'") unless @config.collectives.include?(collective)

+                prefix = @config.topicprefix
+
                 case type
                     when :reply
-                        topicsuffix = :reply
+                        suffix = :reply
                     when :broadcast
-                        topicsuffix = :command
+                        suffix = :command
                     when :request
-                        topicsuffix = :command
+                        suffix = :command
                     when :directed
-                        topicsuffix = :command
+                        prefix = @config.queueprefix
+                        # use a md5 since hostnames might have illegal characters that
+                        # the middleware dont understand
+                        suffix = Digest::MD5.hexdigest(@config.identity)
                 end

-                ["#{@config.topicprefix}#{collective}", agent, topicsuffix].join(@config.topicsep)
+                ["#{prefix}#{collective}", agent, suffix].join(@config.topicsep)
             end
         end
     end
diff --git a/website/changelog.md b/website/changelog.md
index ee68e2e..4059b93 100644
--- a/website/changelog.md
+++ b/website/changelog.md
@@ -11,6 +11,7 @@ title: Changelog

 |Date|Description|Ticket|
 |----|-----------|------|
+|2011/06/16|Add the ability for nodes to subscribe to per-node queues, off by default|7225|
 |2011/06/12|Remove assumptions about middleware structure from the core and move it to the connector plugins|7619|
 |*2011/06/08*|*Release 1.3.0*|7796|
 |2011/06/07|Exceptions raised during option parsing were not handled and resulted in stack traces|7796|
diff --git a/website/reference/basic/configuration.md b/website/reference/basic/configuration.md
index 2775b71..9fb5f3b 100644
--- a/website/reference/basic/configuration.md
+++ b/website/reference/basic/configuration.md
@@ -62,6 +62,7 @@ The server configuration file should be root only readable
 |rpclimitmethod|The method used for --limit-results. Can be either _first_ or _random_|
 |fact_cache_time|300|How long to cache fact results for before refreshing from source|
 |plugin.discovery.timeout|10|Sets the timeout for the discovery agent, useful if facts are very slow|
+|direct_addressing|false|Enable this to listen to a per node directed destination for point to point communication|

 The last example sets a option for the _discovery_ plugin, you can also set this in _/etc/mcollective/plugin.d/discovery.cfg_, in that case you'd just set _timeout=10_ in the file.

--
1.7.1

--
You received this message because you are subscribed to the Google Groups "Puppet Developers" group.
To post to this group, send email to [email protected].
To unsubscribe from this group, send email to [email protected].
For more options, visit this group at http://groups.google.com/group/puppet-dev?hl=en.
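As a rough illustration of how the new options would be read, the sketch below applies the same `/^1|y/i` pattern that the patch uses for `direct_addressing` to a handful of values. The config fragment, the simplified `key = value` parsing, and the `enabled?` helper are assumptions made for the example; they are not the parser in config.rb.

```ruby
# Same pattern the patch applies to the direct_addressing value.
ENABLED = /^1|y/i

# Hypothetical helper: true when the value matches the pattern above.
def enabled?(val)
  !!(val =~ ENABLED)
end

# Hypothetical server.cfg fragment; the values are illustrative only.
config_text = <<~CFG
  topicprefix = /topic/
  queueprefix = /queue/
  direct_addressing = 1
CFG

# Simplified key = value parsing, standing in for the real config loader.
settings = config_text.each_line.map { |line| line.strip.split(/\s*=\s*/, 2) }.to_h

puts "direct_addressing enabled: #{enabled?(settings["direct_addressing"])}"

# Show how a few candidate values are interpreted by the pattern.
%w[1 y yes 0 n no true].each do |value|
  puts "#{value.inspect} -> #{enabled?(value)}"
end
```

Because alternation binds loosest in the pattern, a value enables the option if it starts with `1` or contains a `y` anywhere. In this sketch `1`, `y` and `yes` enable it, while `0`, `n`, `no` and also `true` do not, so `direct_addressing = 1` or `direct_addressing = yes` is the safer spelling.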
