Author: tross
Date: Tue Dec  9 14:15:20 2008
New Revision: 724911

URL: http://svn.apache.org/viewvc?rev=724911&view=rev
Log:
Port features and bug-fixes from Python API to Ruby API

Modified:
    incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb

Modified: incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb?rev=724911&r1=724910&r2=724911&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb (original)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb Tue Dec  9 14:15:20 2008
@@ -20,6 +20,7 @@
 
 require 'socket'
 require 'monitor'
+require 'thread'
 require 'uri'
 require 'time'
 
@@ -63,6 +64,9 @@
   end
 
   class BrokerURL
+
+    attr_reader :host, :port, :auth_name, :auth_pass, :auth_mech
+
     def initialize(text)
       uri = URI.parse(text)
 
@@ -163,23 +167,26 @@
         raise ArgumentError, "user_bindings can't be set unless rcv_objects is set and a console is provided"
       end
 
-      #raise NotImplementedError, @manage_connections
     end
 
     def to_s
-      "QMF Console Session Manager (brokers connected: #{@brokers.size})"
+      "QMF Console Session Manager (brokers: #{@brokers.size})"
+    end
+
+    def managedConnections?
+      return @manage_connections
     end
 
     # Connect to a Qpid broker.  Returns an object of type Broker
     def add_broker(target="amqp://localhost")
-      uri = URI.parse(target)
-      broker = Broker.new(self, uri.host, uri.port, "PLAIN", uri.user, uri.password)
+      url = BrokerURL.new(target)
+      broker = Broker.new(self, url.host, url.port, url.auth_mech, url.auth_name, url.auth_pass)
       unless broker.connected? || @manage_connections
         raise broker.error
       end
 
       @brokers << broker
-      objects(:broker => broker, :class => "agent")
+      objects(:broker => broker, :class => "agent") unless @manage_connections
       return broker
     end
 
@@ -216,20 +223,32 @@
       end
       @brokers.each do |broker|
         args = { :exchange => "qpid.management",
-          :queue => broker.topicName,
+          :queue => broker.topic_name,
           :binding_key => "console.obj.*.*.#{package_name}.#" }
         broker.amqpSession.exchange_bind(args)
       end
     end
 
-    def bind_class(klass_key)
+    def bind_class(package_name, class_name)
+      unless @user_bindings && @rcv_objects
+        raise "userBindings option not set for Session"
+      end
+      @brokers.each do |broker|
+        args = { :exchange => "qpid.management",
+          :queue => broker.topic_name,
+          :binding_key=> "console.obj.*.*.#{package_name}.#{class_name}.#" }
+        broker.amqpSession.exchange_bind(args)
+      end
+    end
+
+    def bind_class_key(klass_key)
       unless @user_bindings && @rcv_objects
         raise "userBindings option not set for Session"
       end
       pname, cname, hash = klass_key
       @brokers.each do |broker|
         args = { :exchange => "qpid.management",
-          :queue => broker.topicName,
+          :queue => broker.topic_name,
           :binding_key => "console.obj.*.*.#{pname}.#{cname}.#" }
         broker.amqpSession.exchange_bind(args)
       end
@@ -290,7 +309,18 @@
         end
         agent_list << agent
       else
-        broker_list.each { |broker| agent_list += broker.agents }
+        if kwargs.include?(:object_id)
+          oid = kwargs[:object_id]
+          broker_list.each { |broker|
+            broker.agents.each { |agent|
+              if oid.broker_bank == agent.broker_bank && oid.agent_bank == agent.agent_bank
+                agent_list << agent
+              end
+            }
+          }
+        else
+          broker_list.each { |broker| agent_list += broker.agents }
+        end
       end
 
       cname = nil
@@ -310,7 +340,7 @@
       map = {}
       @select = []
       if kwargs.include?(:object_id)
-        map["_objectId"] = kwargs[:object_id].to_str
+        map["_objectid"] = kwargs[:object_id].to_s
       else
         map["_class"] = cname
         map["_package"] = pname if pname
@@ -477,7 +507,7 @@
         end
         agent = broker.agent(broker_bank, agent_bank)
         timestamp = codec.read_uint64
-        @console.heartbeat(agent, timestamp)
+        @console.heartbeat(agent, timestamp) if agent
       end
     end
 
@@ -516,7 +546,7 @@
       end
 
       object = Qpid::Qmf::Object.new(self, broker, schema, codec, prop, stat)
-      if pname == "org.apache.qpid.broker" && cname == "agent"
+      if pname == "org.apache.qpid.broker" && cname == "agent" && prop
         broker.update_agent(object)
       end
 
@@ -665,6 +695,7 @@
     end
   end
 
+  # A ClassKey uniquely identifies a class from the schema.
   class ClassKey
     attr_reader :package, :klass_name, :hash
 
@@ -1075,6 +1106,67 @@
     end
   end
 
+  class ManagedConnection
+
+    DELAY_MIN = 1
+    DELAY_MAX = 128
+    DELAY_FACTOR = 2
+    include MonitorMixin
+
+    def initialize(broker)
+      super()
+      @broker = broker
+      @cv = new_cond
+      @is_cancelled = false
+    end
+
+    # Main body of the running thread.
+    def start
+      @thread = Thread.new {
+        delay = DELAY_MIN
+        while true
+          begin
+            @broker.try_to_connect
+            synchronize do
+              while !@is_cancelled and @broker.connected?
+                @cv.wait
+                Thread.exit if @is_cancelled
+                delay = DELAY_MIN
+              end
+            end
+
+          rescue Qpid::Session::Closed, Qpid::Session::Detached, SystemCallError
+            delay *= DELAY_FACTOR if delay < DELAY_MAX
+          end
+
+          synchronize do
+            @cv.wait(delay)
+            Thread.exit if @is_cancelled
+          end
+        end
+      }
+    end
+
+    # Tell this thread to stop running and return.
+    def stop
+      synchronize do
+        @is_cancelled = true
+        @cv.signal
+      end
+    end
+
+    # Notify the thread that the connection was lost.
+    def disconnected
+      synchronize do
+        @cv.signal
+      end
+    end
+
+    def join
+      @thread.join
+    end
+  end
+
   class Broker
 
     SYNC_TIME = 60
@@ -1083,11 +1175,11 @@
 
     attr_accessor :error
 
-    attr_reader :amqp_session_id, :amqp_session, :conn
+    attr_reader :amqp_session_id, :amqp_session, :conn, :broker_bank, :topic_name
 
-    attr_accessor :broker_id, :sync_result, :broker_bank
+    attr_accessor :broker_id, :sync_result
 
-    def initialize(session, host, port, auth_mech, auth_user, auth_pass)
+    def initialize(session, host, port, auth_mech, auth_name, auth_pass)
       super()
 
       # For debugging..
@@ -1096,8 +1188,9 @@
       @session  = session
       @host     = host
       @port     = port
-      @auth_user = auth_user
+      @auth_name = auth_name
       @auth_pass = auth_pass
+      @broker_bank = 1
       @agents   = {}
       @agents["1.0"] = Agent.new(self, 0, "BrokerAgent")
       @topic_bound = false
@@ -1108,10 +1201,15 @@
       @reqs_outstanding = 1
       @error     = nil
       @broker_id  = nil
-      @broker_bank = 1
       @is_connected = false
       @conn = nil
-      try_to_connect
+      if @session.managedConnections?
+        @thread = ManagedConnection.new(self)
+        @thread.start
+      else
+        @thread = nil
+        try_to_connect
+      end
     end
 
     def connected?
@@ -1228,6 +1326,10 @@
     end
 
     def shutdown
+      if @thread
+        @thread.stop
+        @thread.join
+      end
       if connected?
         @amqp_session.incoming("rdest").stop
         if @session.console
@@ -1235,20 +1337,16 @@
         end
         @amqp_session.close
         @is_connected = false
-      else
-        raise "Broker already disconnected"
       end
     end
 
-    private
-
     def try_to_connect
       #begin
       @amqp_session_id = "%s.%d" % [Socket.gethostname, Process::pid]
       # FIXME: Need sth for Qpid::Util::connect
 
       @conn = Qpid::Connection.new(TCPSocket.new(@host, @port),
-                                   :username => @auth_user,
+                                   :username => @auth_name,
                                    :password => @auth_pass)
       @conn.start
       @reply_name = "reply-%s" % amqp_session_id
@@ -1303,16 +1401,10 @@
       set_header(codec, ?B)
       msg = message(codec.encoded)
       emit(msg)
-      # FIXME: These exceptions are bogus here
-      #rescue socket.error => e
-      #   @error = "Socket Error %s - %s" % [e[0], e[1]]
-      #rescue Closed => e
-      #    @error = "Connect Failed %d - %s" % [e[0], e[1]]
-      #rescue ConnectionFailed => e
-      #    @error = "Connect Failed %d - %s" % [e[0], e[1]]
-      #end
     end
 
+    private
+
     # Check the header of a management message and extract the opcode and
     # class
     def check_header(codec)
@@ -1357,6 +1449,7 @@
       synchronize { @cv.signal if @sync_in_flight }
       @session.handle_error(@error)
       @session.handle_broker_disconnect(self)
+      @thread.disconnected if @thread
     end
   end
 
@@ -1369,6 +1462,10 @@
       @label  = label
     end
 
+    def broker_bank
+      @broker.broker_bank
+    end
+
     def to_s
       "Agent at bank %d.%d (%s)" % [@broker.broker_bank, @agent_bank, @label]
     end


Reply via email to