This patch updates db-omatic, taskomatic and host-register to use the new C++-wrapped Ruby QMF bindings; the libvirt-list.rb and matahari-list.rb utility scripts and task_storage.rb are updated to the same API. It also fixes a couple of bugs along the way, including the "0 CPU" bug in host-register. This is a compilation of work done by myself and Arjun Roy.
Signed-off-by: Ian Main <[email protected]> --- src/db-omatic/db_omatic.rb | 111 ++++++------- src/host-browser/host-register.rb | 337 ++++++++++++++++++++----------------- src/libvirt-list.rb | 31 +++-- src/matahari-list.rb | 33 +++-- src/task-omatic/task_storage.rb | 10 +- src/task-omatic/taskomatic.rb | 81 +++++---- 6 files changed, 323 insertions(+), 280 deletions(-) diff --git a/src/db-omatic/db_omatic.rb b/src/db-omatic/db_omatic.rb index c400097..686ad71 100755 --- a/src/db-omatic/db_omatic.rb +++ b/src/db-omatic/db_omatic.rb @@ -3,18 +3,18 @@ $: << File.join(File.dirname(__FILE__), "../dutils") $: << File.join(File.dirname(__FILE__), ".") -require "rubygems" -require "qpid" +require 'rubygems' require 'monitor' require 'dutils' require 'daemons' require 'optparse' require 'logger' require 'vnc' +require 'qmf' +require 'socket' include Daemonize - # This sad and pathetic readjustment to ruby logger class is # required to fix the formatting because rails does the same # thing but overrides it to just the message. @@ -29,12 +29,9 @@ end $logfile = '/var/log/ovirt-server/db-omatic.log' - -class DbOmatic < Qpid::Qmf::Console - +class DbOmatic < Qmf::ConsoleHandler # Use monitor mixin for mutual exclusion around checks to heartbeats # and updates to objects/heartbeats. 
- include MonitorMixin def initialize() @@ -77,7 +74,6 @@ class DbOmatic < Qpid::Qmf::Console begin ensure_credentials - database_connect server, port = nil @@ -91,8 +87,17 @@ class DbOmatic < Qpid::Qmf::Console end @logger.info "Connecting to amqp://#{server}:#{port}" - @session = Qpid::Qmf::Session.new(:console => self, :manage_connections => true) - @broker = @session.add_broker("amqp://#{server}:#{port}", :mechanism => 'GSSAPI') + @settings = Qmf::ConnectionSettings.new + @settings.host = server + @settings.port = port +# @settings.mechanism = 'GSSAPI' +# @settings.service = 'qpidd' + @settings.sendUserId = false + + @connection = Qmf::Connection.new(@settings) + @qmfc = Qmf::Console.new(self) + @broker = @qmfc.add_connection(@connection) + @broker.wait_for_stable db_init_cleanup rescue Exception => ex @@ -101,10 +106,8 @@ class DbOmatic < Qpid::Qmf::Console end end - def ensure_credentials() get_credentials('qpidd') - Thread.new do while true do sleep(3600) @@ -195,7 +198,7 @@ class DbOmatic < Qpid::Qmf::Console if state == Vm::STATE_STOPPED @logger.info "VM has moved to stopped, clearing VM attributes." - qmf_vm = @session.object(:class => "domain", 'uuid' => vm.uuid) + qmf_vm = @qmfc.object(Qmf::Query.new(:class => "domain", 'uuid' => vm.uuid)) if qmf_vm @logger.info "Deleting VM #{vm.description}." 
result = qmf_vm.undefine @@ -207,9 +210,9 @@ class DbOmatic < Qpid::Qmf::Console # If we are running, update the node that the domain is running on elsif state == Vm::STATE_RUNNING @logger.info "VM is running, determine the node it is running on" - qmf_vm = @session.object(:class => "domain", 'uuid' => vm.uuid) + qmf_vm = @qmfc.object(Qmf::Query.new(:class => "domain", 'uuid' => vm.uuid)) if qmf_vm - qmf_host = @session.object(:class => "node", :object_id => qmf_vm.node) + qmf_host = @qmfc.object(Qmf::Query.new(:class => "node", :object_id => qmf_vm.node)) db_host = Host.find(:first, :conditions => ['hostname = ?', qmf_host.hostname]) @logger.info "VM #{vm.description} is running on node #{db_host.hostname}" vm.host_id = db_host.id @@ -273,7 +276,7 @@ class DbOmatic < Qpid::Qmf::Console # Double check to make sure this host is still up. begin - qmf_host = @session.object(:class => 'node', 'hostname' => host_info['hostname']) + qmf_host = @qmfc.objects(Qmf::Query.new(:class => "node", 'hostname' => host_info['hostname'])) if !qmf_host @logger.info "Host #{host_info['hostname']} is not up after waiting 20 seconds, skipping dead VM check." else @@ -301,16 +304,23 @@ class DbOmatic < Qpid::Qmf::Console end end - def object_props(broker, obj) - target = obj.schema.klass_key.package + def object_update(obj, hasProps, hasStats) + target = obj.object_class.package_name + type = obj.object_class.class_name return if target != "com.redhat.libvirt" - type = obj.schema.klass_key.klass_name + if hasProps + update_props(obj, type) + end + if hasStats + update_stats(obj, type) + end + end + def update_props(obj, type) # I just sync this whole thing because there shouldn't be a lot of contention here.. synchronize do values = @cached_objects[obj.object_id.to_s] - new_object = false if values == nil @@ -318,8 +328,7 @@ class DbOmatic < Qpid::Qmf::Console # Save the agent and broker bank so that we can tell what objects # are expired when the heartbeat for them stops. 
- values[:broker_bank] = obj.object_id.broker_bank - values[:agent_bank] = obj.object_id.agent_bank + values[:agent_key] = obj.object_id.agent_key values[:obj_key] = obj.object_id.to_s values[:class_type] = type values[:timed_out] = false @@ -370,53 +379,48 @@ class DbOmatic < Qpid::Qmf::Console end end - def object_stats(broker, obj) - target = obj.schema.klass_key.package - return if target != "com.redhat.libvirt" - type = obj.schema.klass_key.klass_name - + def update_stats(obj, type) synchronize do values = @cached_objects[obj.object_id.to_s] - if !values + if values == nil values = {} @cached_objects[obj.object_id.to_s] = values - - values[:broker_bank] = obj.object_id.broker_bank - values[:agent_bank] = obj.object_id.agent_bank + values[:agent_key] = obj.object_id.agent_key values[:class_type] = type values[:timed_out] = false values[:synced] = false end + obj.statistics.each do |key, newval| if values[key.to_s] != newval values[key.to_s] = newval - #puts "new value for statistic #{key} : #{newval}" end end end end - def heartbeat(agent, timestamp) - puts "heartbeat from agent #{agent}" + def agent_heartbeat(agent, timestamp) + puts "heartbeat from agent #{agent.key}" return if agent == nil synchronize do - bank_key = "#{agent.agent_bank}.#{agent.broker.broker_bank}" - @heartbeats[bank_key] = [agent, timestamp] + @heartbeats[agent.key] = [agent, timestamp] end end + def agent_added(agent) + @logger.info("Agent connected: #{agent.key}") + end - def del_agent(agent) + def agent_deleted(agent) agent_disconnected(agent) end # This method marks objects associated with the given agent as timed out/invalid. Called either # when the agent heartbeats out, or we get a del_agent callback. 
def agent_disconnected(agent) + puts "agent_disconnected: #{agent.key}" @cached_objects.keys.each do |objkey| - if @cached_objects[objkey][:broker_bank] == agent.broker.broker_bank and - @cached_objects[objkey][:agent_bank] == agent.agent_bank - + if @cached_objects[objkey][:agent_key] == agent.key values = @cached_objects[objkey] if values[:timed_out] == false @logger.info "Marking object of type #{values[:class_type]} with key #{objkey} as timed out." @@ -430,8 +434,7 @@ class DbOmatic < Qpid::Qmf::Console values[:timed_out] = true end end - bank_key = "#{agent.agent_bank}.#{agent.broker.broker_bank}" - @heartbeats.delete(bank_key) + @heartbeats.delete(agent.key) end # The opposite of above, this is called when an agent is alive and well and makes sure @@ -439,9 +442,7 @@ class DbOmatic < Qpid::Qmf::Console def agent_connected(agent) @cached_objects.keys.each do |objkey| - if @cached_objects[objkey][:broker_bank] == agent.broker.broker_bank and - @cached_objects[objkey][:agent_bank] == agent.agent_bank - + if @cached_objects[objkey][:agent_key] == agent.key values = @cached_objects[objkey] if values[:timed_out] == true or values[:synced] == false if values[:class_type] == 'node' @@ -482,7 +483,7 @@ class DbOmatic < Qpid::Qmf::Console # them to stopped. VMs that exist as QMF objects will get set appropriately when the objects # appear on the bus. begin - qmf_vm = @session.object(:class => 'domain', 'uuid' => db_vm.uuid) + qmf_vm = @qmfc.object(Qmf::Query.new(:class => "domain", 'uuid' => db_vm.uuid)) if qmf_vm == nil set_stopped = true end @@ -498,15 +499,6 @@ class DbOmatic < Qpid::Qmf::Console end end - def broker_connected(broker) - @logger.info "Connected to broker." - end - - def broker_disconnected(broker) - @logger.error "Broker disconnected." - end - - # This is the mainloop that is called into as a separate thread. This just loops through # and makes sure all the agents are still reporting. If they aren't they get marked as # down. 
@@ -527,7 +519,7 @@ class DbOmatic < Qpid::Qmf::Console s = timestamp / 1000000000 delta = t - s - puts "Checking time delta for agent #{agent} - #{delta}" + puts "Checking time delta for agent #{agent.key} - #{delta}" if delta > 30 # No heartbeat for 30 seconds.. deal with dead/disconnected agent. @@ -545,15 +537,10 @@ class DbOmatic < Qpid::Qmf::Console end end - def main() - + Thread.abort_on_exception = true dbsync = DbOmatic.new() - - # Call into mainloop.. dbsync.check_heartbeats() - end main() - diff --git a/src/host-browser/host-register.rb b/src/host-browser/host-register.rb index 06d8553..e57b077 100755 --- a/src/host-browser/host-register.rb +++ b/src/host-browser/host-register.rb @@ -4,12 +4,13 @@ $: << File.join(File.dirname(__FILE__), "../dutils") $: << File.join(File.dirname(__FILE__), ".") require 'rubygems' -require 'qpid' require 'monitor' require 'dutils' require 'daemons' require 'optparse' require 'logger' +require 'qmf' +require 'socket' include Daemonize @@ -27,13 +28,17 @@ end $logfile = '/var/log/ovirt-server/host-register.log' -class HostRegister < Qpid::Qmf::Console +class HostRegister < Qmf::ConsoleHandler # Use monitor mixin for mutual exclusion around checks to heartbeats # and updates to objects/heartbeats. include MonitorMixin + # def initialize: Takes no parameters + # On initialize, we get a connection to the database. + # We then query the name and address of the qpidd server + # using dnsmasq records, and connect to qpidd. 
def initialize() super() @cached_hosts = {} @@ -78,7 +83,6 @@ class HostRegister < Qpid::Qmf::Console begin ensure_credentials - database_connect server, port = nil @@ -92,8 +96,17 @@ class HostRegister < Qpid::Qmf::Console end @logger.info "Connecting to amqp://#{server}:#{port}" - @session = Qpid::Qmf::Session.new(:console => self, :manage_connections => true) - @broker = @session.add_broker("amqp://#{server}:#{port}", :mechanism => 'GSSAPI') + @settings = Qmf::ConnectionSettings.new + @settings.host = server + @settings.port = port + # @settings.mechanism = 'GSSAPI' + # @settings.service = 'qpidd' + @settings.sendUserId = false + + @connection = Qmf::Connection.new(@settings) + @qmfc = Qmf::Console.new(self) + @broker = @qmfc.add_connection(@connection) + @broker.wait_for_stable rescue Exception => ex @logger.error "Error in hostregister: #{ex}" @@ -101,6 +114,7 @@ class HostRegister < Qpid::Qmf::Console end end + ###### Utility Methods ###### def debugputs(msg) puts msg if @debug == true and @do_daemon == false end @@ -116,20 +130,66 @@ class HostRegister < Qpid::Qmf::Console end end - def broker_connected(broker) - @logger.info 'Connected to broker.' + ###### QMF Callbacks ###### + def agent_heartbeat(agent, timestamp) + return if agent == nil + synchronize do + bank_key = "#{agent.agent_bank}.#{agent.broker_bank}" + @heartbeats[bank_key] = [agent, timestamp] + end end - def broker_disconnected(broker) - @logger.error 'Broker disconnected.' + def agent_added(agent) + agent_bank = agent.agent_bank + broker_bank = agent.broker_bank + key = "#{agent_bank}.#{broker_bank}" + + puts "AGENT ADDED: #{key}" + debugputs "Agent #{agent_bank}.#{broker_bank} connected!" 
+ agent_connected(agent_bank, broker_bank) + + host_list = @qmfc.objects(:package => 'com.redhat.matahari', :class => 'host') + puts "host_list length is #{host_list.length}" + host_list.each do |host| + if host.object_id.agent_bank == agent_bank + # Grab the cpus and nics associated before we take any locks + cpu_info = @qmfc.objects(:package => 'com.redhat.matahari', :class => 'cpu', 'host' => host.object_id) + nic_info = @qmfc.objects(:package => 'com.redhat.matahari', :class => 'nic', 'host' => host.object_id) + + # And pass it on to the real handler + update_host(host, cpu_info, nic_info) + end + end end - def agent_disconnected(agent) + def agent_deleted(agent) + agent_bank = agent.agent_bank + broker_bank = agent.broker_bank + key = "#{agent_bank}.#{broker_bank}" + + debugputs "Agent #{key} disconnected!" + @heartbeats.delete(key) + agent_disconnected(agent_bank, broker_bank) + end + + def object_update(obj, hasProps, hasStats) + target = obj.object_class.package_name + type = obj.object_class.class_name + return if target != 'com.redhat.matahari' or type != 'host' or hasProps == false + + # Fix a race where the properties of an object are published by a reconnecting + # host (thus marking it active) right before the heartbeat timer considers it dead + # (and marks it inactive) + @heartbeats.delete("#{obj.object_id.agent_bank}.#{obj.object_id.broker_bank}") + end # def object_props + + ###### Handlers for QMF Callbacks ###### + def agent_disconnected(agent_bank, broker_bank) synchronize do - debugputs "Marking objects for agent #{agent.broker.broker_bank}.#{agent.agent_bank} inactive" + debugputs "Marking objects for agent #{broker_bank}.#{agent_bank} inactive" @cached_hosts.keys.each do |objkey| - if @cached_hosts[objkey][:broker_bank] == agent.broker.broker_bank and - @cached_hosts[objkey][:agent_bank] == agent.agent_bank + if @cached_hosts[objkey][:broker_bank] == broker_bank and + @cached_hosts[objkey][:agent_bank] == agent_bank cached_host = 
@cached_hosts[objkey] cached_host[:active] = false @@ -139,12 +199,12 @@ class HostRegister < Qpid::Qmf::Console end # synchronize do end - def agent_connected(agent) + def agent_connected(agent_bank, broker_bank) synchronize do - debugputs "Marking objects for agent #{agent.broker.broker_bank}.#{agent.agent_bank} active" + debugputs "Marking objects for agent #{broker_bank}.#{agent_bank} active" @cached_hosts.keys.each do |objkey| - if @cached_hosts[objkey][:broker_bank] == agent.broker.broker_bank and - @cached_hosts[objkey][:agent_bank] == agent.agent_bank + if @cached_hosts[objkey][:broker_bank] == broker_bank and + @cached_hosts[objkey][:agent_bank] == agent_bank cached_host = @cached_hosts[objkey] cached_host[:active] = true @@ -154,123 +214,10 @@ class HostRegister < Qpid::Qmf::Console end # synchronize do end - def update_cpus(host_qmf, host_db, cpu_info) - - @logger.info "Updating CPU info for host #{host_qmf.hostname}" - debugputs "Broker reports #{cpu_info.length} cpus for host #{host_qmf.hostname}" - - # delete an existing CPUs and create new ones based on the data - @logger.info "Deleting any existing CPUs for host #{host_qmf.hostname}" - Cpu.delete_all(['host_id = ?', host_db.id]) - - @logger.info "Saving new CPU records for host #{host_qmf.hostname}" - cpu_info.each do |cpu| - flags = (cpu.flags.length > 255) ? "#{cpu.flags[0..251]}..." 
: cpu.flags - detail = Cpu.new( - 'cpu_number' => cpu.cpunum, - 'core_number' => cpu.corenum, - 'number_of_cores' => cpu.numcores, - 'vendor' => cpu.vendor, - 'model' => cpu.model.to_s, - 'family' => cpu.family.to_s, - 'cpuid_level' => cpu.cpuid_lvl, - 'speed' => cpu.speed.to_s, - 'cache' => cpu.cache.to_s, - 'flags' => flags) - - host_db.cpus << detail - - debugputs "Added new CPU for #{host_qmf.hostname}: " - debugputs "CPU # : #{cpu.cpunum}" - debugputs "Core # : #{cpu.corenum}" - debugputs "Total Cores : #{cpu.numcores}" - debugputs "Vendor : #{cpu.vendor}" - debugputs "Model : #{cpu.model}" - debugputs "Family : #{cpu.family}" - debugputs "Cpuid_lvl : #{cpu.cpuid_lvl}" - debugputs "Speed : #{cpu.speed}" - debugputs "Cache : #{cpu.cache}" - debugputs "Flags : #{flags}" - end - - @logger.info "Saved #{cpu_info.length} cpus for #{host_qmf.hostname}" - end - - def update_nics(host_qmf, host_db, nic_info) - - # Update the NIC details for this host: - # -if the NIC exists, then update the IP address - # -if the NIC does not exist, create it - # -any nic not in this list is deleted - - @logger.info "Updating NIC records for host #{host_qmf.hostname}" - debugputs "Broker reports #{nic_info.length} NICs for host" - - nics = Array.new - nics_to_delete = Array.new - - host_db.nics.each do |nic| - found = false - - nic_info.each do |detail| - # if we have a match, then update the database and remove - # the received data to avoid creating a dupe later - @logger.info "Searching for existing record for: #{detail.macaddr.upcase} in host #{host_qmf.hostname}" - if detail.macaddr.upcase == nic.mac - @logger.info "Updating details for: #{detail.interface} [#{nic.mac}]}" - nic.bandwidth = detail.bandwidth - nic.interface_name = detail.interface - nic.save! 
- found = true - nic_info.delete(detail) - end - end - - # if the record wasn't found, then remove it from the database - unless found - @logger.info "Marking NIC for removal: #{nic.interface_name} [#{nic.mac}]" - nics_to_delete << nic - end - end - - debugputs "Deleting #{nics_to_delete.length} NICs that are no longer part of host #{host_qmf.hostname}" - nics_to_delete.each do |nic| - @logger.info "Removing NIC: #{nic.interface_name} [#{nic.mac}]" - host_db.nics.delete(nic) - end - - # iterate over any nics left and create new records for them. - debugputs "Adding new records for #{nic_info.length} NICs to host #{host_qmf.hostname}" - nic_info.each do |nic| - detail = Nic.new( - 'mac' => nic.macaddr.upcase, - 'bandwidth' => nic.bandwidth, - 'interface_name' => nic.interface, - 'usage_type' => 1) - - host_db.nics << detail - - @logger.info "Added NIC #{nic.interface} with MAC #{nic.macaddr} to host #{host_qmf.hostname}" - end - end - - def object_props(broker, obj) - target = obj.schema.klass_key.package - type = obj.schema.klass_key.klass_name - return if target != 'com.redhat.matahari' or type != 'host' - - # Fix a race where the properties of an object are published by a reconnecting - # host (thus marking it active) right before the heartbeat timer considers it dead - # (and marks it inactive) - @heartbeats.delete("#{obj.object_id.agent_bank}.#{obj.object_id.broker_bank}") - + def update_host(obj, cpu_info, nic_info) already_cache = false already_in_db = false - # Grab the cpus and nics associated before we take any locks - cpu_info = @session.objects(:class => 'cpu', 'host' => obj.object_id) - nic_info = @session.objects(:class => 'nic', 'host' => obj.object_id) - synchronize do cached_host = @cached_hosts[obj.object_id.to_s] host = Host.find(:first, :conditions => ['hostname = ?', obj.hostname]) @@ -318,7 +265,7 @@ class HostRegister < Qpid::Qmf::Console 'memory' => obj.memory, 'is_disabled' => 0, 'hardware_pool' => HardwarePool.get_default_pool, - # Let 
host-status mark it available when it + # Let db-omatic mark it available when it # successfully connects to it via libvirt. 'state' => Host::STATE_UNAVAILABLE) @@ -330,10 +277,11 @@ class HostRegister < Qpid::Qmf::Console debugputs "memory: #{obj.memory}" rescue Exception => error - @logger.error "Error while creating record: #{error.message}" - # We haven't added the host to the db, and it isn't cached, so we just - # return without having done anything. To retry, the host will have to - # restart its agent. + @logger.error "Error when creating record: #{error.message}" + @logger.error "Restart matahari on host #{obj.hostname}" + # We haven't added the host to the db, and it isn't cached, + # so we just return without having done anything. To retry, + # the host will have to restart its agent. return end else @@ -394,27 +342,106 @@ class HostRegister < Qpid::Qmf::Console cached_host['hypervisor'] = obj.hypervisor cached_host['arch'] = obj.arch end # synchronize do - end # def object_props + end # end update_host - def heartbeat(agent, timestamp) - return if agent == nil - synchronize do - bank_key = "#{agent.agent_bank}.#{agent.broker.broker_bank}" - @heartbeats[bank_key] = [agent, timestamp] + def update_cpus(host_qmf, host_db, cpu_info) + + @logger.info "Updating CPU info for host #{host_qmf.hostname}" + debugputs "Broker reports #{cpu_info.length} cpus for host #{host_qmf.hostname}" + + # delete an existing CPUs and create new ones based on the data + @logger.info "Deleting any existing CPUs for host #{host_qmf.hostname}" + Cpu.delete_all(['host_id = ?', host_db.id]) + + @logger.info "Saving new CPU records for host #{host_qmf.hostname}" + cpu_info.each do |cpu| + flags = (cpu.flags.length > 255) ? "#{cpu.flags[0..251]}..." 
: cpu.flags + detail = Cpu.new( + 'cpu_number' => cpu.cpunum, + 'core_number' => cpu.corenum, + 'number_of_cores' => cpu.numcores, + 'vendor' => cpu.vendor, + 'model' => cpu.model.to_s, + 'family' => cpu.family.to_s, + 'cpuid_level' => cpu.cpuid_lvl, + 'speed' => cpu.speed.to_s, + 'cache' => cpu.cache.to_s, + 'flags' => flags) + + host_db.cpus << detail + + debugputs "Added new CPU for #{host_qmf.hostname}: " + debugputs "CPU # : #{cpu.cpunum}" + debugputs "Core # : #{cpu.corenum}" + debugputs "Total Cores : #{cpu.numcores}" + debugputs "Vendor : #{cpu.vendor}" + debugputs "Model : #{cpu.model}" + debugputs "Family : #{cpu.family}" + debugputs "Cpuid_lvl : #{cpu.cpuid_lvl}" + debugputs "Speed : #{cpu.speed}" + debugputs "Cache : #{cpu.cache}" + debugputs "Flags : #{flags}" end - end - def new_agent(agent) - key = "#{agent.agent_bank}.#{agent.broker.broker_bank}" - debugputs "Agent #{key} connected!" - agent_connected(agent) + @logger.info "Saved #{cpu_info.length} cpus for #{host_qmf.hostname}" end - def del_agent(agent) - key = "#{agent.agent_bank}.#{agent.broker.broker_bank}" - debugputs "Agent #{key} disconnected!" 
- @heartbeats.delete(key) - agent_disconnected(agent) + def update_nics(host_qmf, host_db, nic_info) + + # Update the NIC details for this host: + # -if the NIC exists, then update the IP address + # -if the NIC does not exist, create it + # -any nic not in this list is deleted + + @logger.info "Updating NIC records for host #{host_qmf.hostname}" + debugputs "Broker reports #{nic_info.length} NICs for host" + + nics = Array.new + nics_to_delete = Array.new + + host_db.nics.each do |nic| + found = false + + nic_info.each do |detail| + # if we have a match, then update the database and remove + # the received data to avoid creating a dupe later + @logger.info "Searching for existing record for: #{detail.macaddr.upcase} in host #{host_qmf.hostname}" + if detail.macaddr.upcase == nic.mac + @logger.info "Updating details for: #{detail.interface} [#{nic.mac}]}" + nic.bandwidth = detail.bandwidth + nic.interface_name = detail.interface + nic.save! + found = true + nic_info.delete(detail) + end + end + + # if the record wasn't found, then remove it from the database + unless found + @logger.info "Marking NIC for removal: #{nic.interface_name} [#{nic.mac}]" + nics_to_delete << nic + end + end + + debugputs "Deleting #{nics_to_delete.length} NICs that are no longer part of host #{host_qmf.hostname}" + nics_to_delete.each do |nic| + @logger.info "Removing NIC: #{nic.interface_name} [#{nic.mac}]" + host_db.nics.delete(nic) + end + + # iterate over any nics left and create new records for them. 
+ debugputs "Adding new records for #{nic_info.length} NICs to host #{host_qmf.hostname}" + nic_info.each do |nic| + detail = Nic.new( + 'mac' => nic.macaddr.upcase, + 'bandwidth' => nic.bandwidth, + 'interface_name' => nic.interface, + 'usage_type' => 1) + + host_db.nics << detail + + @logger.info "Added NIC #{nic.interface} with MAC #{nic.macaddr} to host #{host_qmf.hostname}" + end end def check_heartbeats() @@ -436,7 +463,10 @@ class HostRegister < Qpid::Qmf::Console # No heartbeat for 30 seconds.. deal with dead/disconnected agent. debugputs "Agent #{key} timed out!" @heartbeats.delete(key) - agent_disconnected(agent) + + agent_bank = agent.agent_bank + broker_bank = agent.broker_bank + agent_disconnected(agent_bank, broker_bank) end end @@ -461,6 +491,7 @@ class HostRegister < Qpid::Qmf::Console end # Class HostRegister def main() + Thread.abort_on_exception = true hostreg = HostRegister.new() hostreg.check_heartbeats() end diff --git a/src/libvirt-list.rb b/src/libvirt-list.rb index 54e8b7e..c81926a 100755 --- a/src/libvirt-list.rb +++ b/src/libvirt-list.rb @@ -2,21 +2,30 @@ $: << File.join(File.dirname(__FILE__), "./dutils") -require "rubygems" -require "qpid" -require "dutils" +require 'rubygems' +require 'dutils' +require 'qmf' +require 'socket' get_credentials('qpidd') server, port = get_srv('qpidd', 'tcp') raise "Unable to determine qpid server from DNS SRV record" if not server -srv = "amqp://#{server}:#{port}" -puts "Connecting to #{srv}.." 
-s = Qpid::Qmf::Session.new() -b = s.add_broker(srv, :mechanism => 'GSSAPI') +puts "Connecting to #{server}, #{port}" -nodes = s.objects(:class => "node") +settings = Qmf::ConnectionSettings.new +settings.host = server +settings.port = port +# settings.mechanism = 'GSSAPI' +# settings.service = 'qpidd' + +connection = Qmf::Connection.new(settings) +qmfc = Qmf::Console.new +broker = qmfc.add_connection(connection) +broker.wait_for_stable + +nodes = qmfc.objects(Qmf::Query.new(:class => "node")) nodes.each do |node| puts "node: #{node.hostname}" for (key, val) in node.properties @@ -24,7 +33,7 @@ nodes.each do |node| end # Find any domains that on the current node. - domains = s.objects(:class => "domain", 'node' => node.object_id) + domains = qmfc.objects(Qmf::Query.new(:class => "domain", 'node' => node.object_id)) domains.each do |domain| r = domain.getXMLDesc() puts "getXMLDesc() status: #{r.status}" @@ -39,7 +48,7 @@ nodes.each do |node| end end - pools = s.objects(:class => "pool", 'node' => node.object_id) + pools = qmfc.objects(Qmf::Query.new(:class => "pool", 'node' => node.object_id)) pools.each do |pool| puts " pool: #{pool.name}" for (key, val) in pool.properties @@ -54,7 +63,7 @@ nodes.each do |node| end # Find volumes that are part of the pool. 
- volumes = s.objects(:class => "volume", 'pool' => pool.object_id) + volumes = qmfc.objects(Qmf::Query.new(:class => "volume", 'pool' => pool.object_id)) volumes.each do |volume| puts " volume: #{volume.name}" for (key, val) in volume.properties diff --git a/src/matahari-list.rb b/src/matahari-list.rb index ff714c5..8795019 100755 --- a/src/matahari-list.rb +++ b/src/matahari-list.rb @@ -2,21 +2,30 @@ $: << File.join(File.dirname(__FILE__), "./dutils") -require "rubygems" -require "qpid" -require "dutils" +require 'rubygems' +require 'dutils' +require 'qmf' +require 'socket' get_credentials('qpidd') server, port = get_srv('qpidd', 'tcp') raise "Unable to determine qpid server from DNS SRV record" if not server -srv = "amqp://#{server}:#{port}" -puts "Connecting to #{srv}.." -s = Qpid::Qmf::Session.new() -b = s.add_broker(srv, :mechanism => 'GSSAPI') +puts "Connecting to #{server}, #{port}" -hosts = s.objects(:class => "host") +settings = Qmf::ConnectionSettings.new +settings.host = server +settings.port = port +# settings.mechanism = 'GSSAPI' +# settings.service = 'qpidd' + +connection = Qmf::Connection.new(settings) +qmfc = Qmf::Console.new +broker = qmfc.add_connection(connection) +broker.wait_for_stable + +hosts = qmfc.objects(Qmf::Query.new(:class => 'host')) hosts.each do |host| puts "HOST: #{host.hostname}" for (key, val) in host.properties @@ -24,18 +33,18 @@ hosts.each do |host| end # List cpus for current host - cpus = s.objects(:class => "cpu", 'host' => host.object_id) + cpus = qmfc.objects(Qmf::Query.new(:class => 'cpu', 'host' => host.object_id)) cpus.each do |cpu| - puts " CPU:" + puts ' CPU:' for (key, val) in cpu.properties puts " property: #{key}, #{val}" end end # cpus.each # List nics for current host - nics = s.objects(:class => "nic", 'host' => host.object_id) + nics = qmfc.objects(Qmf::Query.new(:class => 'nic', 'host' => host.object_id)) nics.each do |nic| - puts " NIC: " + puts ' NIC: ' for (key, val) in nic.properties puts " property: 
#{key}, #{val}" end diff --git a/src/task-omatic/task_storage.rb b/src/task-omatic/task_storage.rb index 77b0166..d698777 100644 --- a/src/task-omatic/task_storage.rb +++ b/src/task-omatic/task_storage.rb @@ -73,7 +73,7 @@ def task_storage_cobbler_setup(db_vm) unless found # Create a new transient NFS storage volume # This volume is *not* persisted. - image_volume = StorageVolume.factory("NFS", :filename => filename) + image_volume = StorageVolume.factory("NFS", :filename => filename, :key => filename) image_volume.storage_pool image_pool = StoragePool.factory(StoragePool::NFS) @@ -116,13 +116,14 @@ class LibvirtPool @xml.root.elements["target"].add_element("path") end - def connect(session, node) - pools = session.objects(:class => 'pool', 'node' => node.object_id) + def connect(qmfc, node) + pools = qmfc.objects(:class => 'pool', 'node' => node.object_id) pools.each do |pool| result = pool.getXMLDesc raise "Error getting xml description of pool: #{result.text}" unless result.status == 0 xml_desc = result.description + if self.xmlequal?(Document.new(xml_desc).root) @remote_pool = pool @logger.debug("Found existing storage pool #{pool.name} on host: #{node.hostname}") @@ -134,7 +135,8 @@ class LibvirtPool @logger.debug("Defining new storage pool: #[email protected]_s} on host: #{node.hostname}") result = node.storagePoolDefineXML(@xml.to_s, :timeout => 60 * 10) raise "Error creating pool: #{result.text}" unless result.status == 0 - @remote_pool = session.object(:object_id => result.pool) + @remote_pool = qmfc.object(:object_id => result.pool) + obj_list = qmfc.objects(:object_id => result.pool) raise "Error finding newly created remote pool." 
unless @remote_pool # we need this because we don't want to "build" LVM pools, which would diff --git a/src/task-omatic/taskomatic.rb b/src/task-omatic/taskomatic.rb index ece60dc..13cf5af 100755 --- a/src/task-omatic/taskomatic.rb +++ b/src/task-omatic/taskomatic.rb @@ -23,7 +23,7 @@ $: << File.join(File.dirname(__FILE__), "../dutils") $: << File.join(File.dirname(__FILE__), ".") require 'rubygems' -require 'qpid' +require 'qmf' require 'monitor' require 'dutils' require 'optparse' @@ -115,10 +115,15 @@ class TaskOmatic sleepy *= 2 if sleepy < 120 end - @session = Qpid::Qmf::Session.new(:manage_connections => true) - @logger.info "Connecting to amqp://#{server}:#{port}" - @broker = @session.add_broker("amqp://#{server}:#{port}", :mechanism => 'GSSAPI') + settings = Qmf::ConnectionSettings.new + settings.host = server + settings.port = port + settings.sendUserId = false + @connection = Qmf::Connection.new(settings) + @qmfc = Qmf::Console.new + @broker = @qmfc.add_connection(@connection) + @broker.wait_for_stable end def ensure_credentials() @@ -141,13 +146,13 @@ class TaskOmatic # vm won't be returned. I think that's supposed to be for migration # but it could break creation of VMs in certain conditions.. - vm = @session.object(:class => "domain", 'uuid' => db_vm.uuid) + vm = @qmfc.object(:class => "domain", 'uuid' => db_vm.uuid) db_vm.vm_resource_pool.get_hardware_pool.hosts.each do |curr| # Now each of 'curr' is in the right hardware pool.. # now we check them out. 
- node = @session.object(:class => "node", 'hostname' => curr.hostname) + node = @qmfc.object(:class => "node", 'hostname' => curr.hostname) next unless node # So now we expect if the node was found it's alive and well, then @@ -205,12 +210,12 @@ class TaskOmatic # activate the underlying physical device, and then do the logical one if db_volume[:type] == "LvmStorageVolume" phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume, @logger) - phys_libvirt_pool.connect(@session, node) + phys_libvirt_pool.connect(@qmfc, node) end @logger.debug "Verifying mount of pool #{db_pool.ip_addr}:#{db_pool.type}:#{db_pool.target}:#{db_pool.export_path}" libvirt_pool = LibvirtPool.factory(db_pool, @logger) - libvirt_pool.connect(@session, node) + libvirt_pool.connect(@qmfc, node) # OK, the pool should be all set. The last thing we need to do is get # the path based on the volume key @@ -220,12 +225,12 @@ class TaskOmatic @logger.debug "Pool mounted: #{pool.name}; state: #{pool.state}" - volume = @session.object(:class => 'volume', + volume = @qmfc.object(:class => 'volume', 'key' => volume_key, 'storagePool' => pool.object_id) if volume == nil @logger.info "Unable to find volume by key #{volume_key} attached to pool #{pool.name}, trying by filename..." - volume = @session.object(:class => 'volume', + volume = @qmfc.object(:class => 'volume', 'name' => db_volume.filename, 'storagePool' => pool.object_id) raise "Unable to find volume by key (#{volume_key}) or filename (#{db_volume.filename}), giving up." unless volume @@ -254,11 +259,11 @@ class TaskOmatic # This is rather silly because we only destroy pools if there are no # more vms on the node. We should be reference counting the pools # somehow so we know when they are no longer in use. 
- vms = @session.objects(:class => 'domain', 'node' => node.object_id) + vms = @qmfc.objects(:class => 'domain', 'node' => node.object_id) if vms.length > 0 return end - pools = @session.objects(:class => 'pool', 'node' => node.object_id) + pools = @qmfc.objects(:class => 'pool', 'node' => node.object_id) # We do this in two passes, first undefine/destroys LVM pools, then # we do physical pools. @@ -281,13 +286,13 @@ class TaskOmatic def task_shutdown_or_destroy_vm(task, action) @logger.info "starting task_shutdown_or_destroy_vm" db_vm = task.vm - vm = @session.object(:class => 'domain', 'uuid' => db_vm.uuid) + vm = @qmfc.object(:class => 'domain', 'uuid' => db_vm.uuid) if !vm @logger.error "VM already shut down?" return end - node = @session.object(:object_id => vm.node) + node = @qmfc.object(:object_id => vm.node) raise "Unable to get node that vm is on??" unless node if vm.state == "shutdown" or vm.state == "shutoff" @@ -337,7 +342,7 @@ class TaskOmatic @logger.info "starting task_start_vm" db_vm = find_vm(task, false) - vm = @session.object(:class => "domain", 'uuid' => db_vm.uuid) + vm = @qmfc.object(:class => "domain", 'uuid' => db_vm.uuid) if vm case vm.state @@ -351,7 +356,7 @@ class TaskOmatic end db_host = find_capable_host(db_vm) - node = @session.object(:class => "node", 'hostname' => db_host.hostname) + node = @qmfc.object(:class => "node", 'hostname' => db_host.hostname) raise "Unable to find host #{db_host.hostname} to create VM on." unless node @logger.info("VM will be started on node #{node.hostname}") @@ -400,7 +405,7 @@ class TaskOmatic result = node.domainDefineXML(xml.to_s) raise "Error defining virtual machine: #{result.text}" unless result.status == 0 - domain = @session.object(:object_id => result.domain) + domain = @qmfc.object(:object_id => result.domain) raise "Cannot find domain on host #{db_host.hostname}, cannot start virtual machine." 
unless domain result = domain.create @@ -432,7 +437,7 @@ class TaskOmatic def task_suspend_vm(task) @logger.info "starting task_suspend_vm" db_vm = task.vm - dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid) + dom = @qmfc.object(:class => 'domain', 'uuid' => db_vm.uuid) raise "Unable to locate VM to suspend" unless dom if dom.state != "running" and dom.state != "blocked" @@ -450,7 +455,7 @@ class TaskOmatic def task_resume_vm(task) @logger.info "starting task_resume_vm" db_vm = task.vm - dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid) + dom = @qmfc.object(:class => 'domain', 'uuid' => db_vm.uuid) raise "Unable to locate VM to resume" unless dom if dom.state == "running" @@ -478,7 +483,7 @@ class TaskOmatic # need to put it on the storage server and mark it in the database # where the image is stored. db_vm = task.vm - dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid) + dom = @qmfc.object(:class => 'domain', 'uuid' => db_vm.uuid) raise "Unable to locate VM to save" unless dom filename = "/tmp/#{dom.uuid}.save" @@ -495,7 +500,7 @@ class TaskOmatic # FIXME: This is also broken, see task_save_vm FIXME. db_vm = task.vm - dom = @session.object(:class => 'domain', 'uuid' => db_vm.uuid) + dom = @qmfc.object(:class => 'domain', 'uuid' => db_vm.uuid) raise "Unable to locate VM to restore" unless dom filename = "/tmp/#{dom.uuid}.save" @@ -508,9 +513,9 @@ class TaskOmatic def migrate(db_vm, dest = nil) - vm = @session.object(:class => "domain", 'uuid' => db_vm.uuid) + vm = @qmfc.object(:class => "domain", 'uuid' => db_vm.uuid) raise "Unable to find VM to migrate" unless vm - src_node = @session.object(:object_id => vm.node) + src_node = @qmfc.object(:object_id => vm.node) raise "Unable to find node that VM is on??" 
unless src_node @logger.info "Migrating domain lookup complete, domain is #{vm}" @@ -528,7 +533,7 @@ class TaskOmatic db_dst_host = find_capable_host(db_vm) end - dest_node = @session.object(:class => 'node', 'hostname' => db_dst_host.hostname) + dest_node = @qmfc.object(:class => 'node', 'hostname' => db_dst_host.hostname) raise "Unable to find host #{db_dst_host.hostname} to migrate to." unless dest_node volumes = [] @@ -589,7 +594,7 @@ class TaskOmatic next end puts "searching for node with hostname #{host.hostname}" - node = @session.object(:class => 'node', 'hostname' => host.hostname) + node = @qmfc.object(:class => 'node', 'hostname' => host.hostname) puts "node returned is #{node}" return node if node end @@ -643,13 +648,13 @@ class TaskOmatic @logger.info("refresh being done on node #{node.hostname}") phys_libvirt_pool = LibvirtPool.factory(db_pool_phys, @logger) - phys_libvirt_pool.connect(@session, node) + phys_libvirt_pool.connect(@qmfc, node) db_pool_phys.state = StoragePool::STATE_AVAILABLE db_pool_phys.save! begin # First we do the physical volumes. - volumes = @session.objects(:class => 'volume', + volumes = @qmfc.objects(:class => 'volume', 'storagePool' => phys_libvirt_pool.remote_pool.object_id) volumes.each do |volume| storage_volume = StorageVolume.factory(db_pool_phys.get_type_label) @@ -696,9 +701,9 @@ class TaskOmatic physical_vol.save! 
lvm_libvirt_pool = LibvirtPool.factory(lvm_db_pool, @logger) - lvm_libvirt_pool.connect(@session, node) + lvm_libvirt_pool.connect(@qmfc, node) - lvm_volumes = @session.objects(:class => 'volume', + lvm_volumes = @qmfc.objects(:class => 'volume', 'storagePool' => lvm_libvirt_pool.remote_pool.object_id) lvm_volumes.each do |lvm_volume| @@ -733,16 +738,16 @@ class TaskOmatic begin if db_volume[:type] == "LvmStorageVolume" phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume, @logger) - phys_libvirt_pool.connect(@session, node) + phys_libvirt_pool.connect(@qmfc, node) end begin libvirt_pool = LibvirtPool.factory(db_pool, @logger) begin - libvirt_pool.connect(@session, node) + libvirt_pool.connect(@qmfc, node) volume_id = libvirt_pool.create_vol(*db_volume.volume_create_params) - volume = @session.object(:object_id => volume_id) + volume = @qmfc.object(:object_id => volume_id) raise "Unable to find newly created volume" unless volume @logger.debug " volume:" @@ -776,7 +781,7 @@ class TaskOmatic # I currently refresh ALL storage pools at this time as it # shouldn't be a long operation and it doesn't hurt to refresh # them once in a while. - pools = @session.objects(:class => 'pool') + pools = @qmfc.objects(:class => 'pool') pools.each do |pool| result = pool.refresh @logger.info "Problem refreshing pool (you can probably ignore this): #{result.text}" unless result.status == 0 @@ -798,16 +803,16 @@ class TaskOmatic begin if db_volume[:type] == "LvmStorageVolume" phys_libvirt_pool = get_libvirt_lvm_pool_from_volume(db_volume, @logger) - phys_libvirt_pool.connect(@session, node) + phys_libvirt_pool.connect(@qmfc, node) @logger.info "connected to lvm pool.." 
end begin libvirt_pool = LibvirtPool.factory(db_pool, @logger) - libvirt_pool.connect(@session, node) + libvirt_pool.connect(@qmfc, node) begin - volume = @session.object(:class => 'volume', + volume = @qmfc.object(:class => 'volume', 'storagePool' => libvirt_pool.remote_pool.object_id, 'key' => db_volume.key) @logger.error "Unable to find volume to delete" unless volume @@ -861,7 +866,7 @@ class TaskOmatic was_disconnected = false loop do - if not @broker.connected? + if not @connection.connected? @logger.info("Cannot implement tasks, not connected to broker. Sleeping.") sleep(@sleeptime * 3) was_disconnected = true @@ -870,7 +875,7 @@ class TaskOmatic @logger.info("Reconnected, resuming task checking..") if was_disconnected was_disconnected = false - @session.object(:class => 'agent') + @qmfc.object(:class => 'agent') tasks = Array.new begin -- 1.6.2.5 _______________________________________________ Ovirt-devel mailing list [email protected] https://www.redhat.com/mailman/listinfo/ovirt-devel
