Hi Team, I promised to post the script I adapted for cloning errata from RHN to my Spacewalk server. I'm sad to say that it's not a finished piece of work at this point, but it does the basic things. Some things that are missing and/or are caveats:
1) Updating errata that affect another channel, should you add a channel that might be affected by an existing erratum. 2) I've been a little lazy and have hard-coded my own RHN credentials in the script, rather than asking the user for the info, passing it as an argument, or reading it from a file. I'll get around to it later. 3) I would avoid errata publishing for the time being. The functionality exists in the script, but it's not right. If you pull down an erratum that's for both RHEL 4 & 5 (for instance), you'll end up with RHEL 5 packages in your RHEL 4 channels. I need to work this out. 4) This is my first attempt at anything with Python, so it's probably quite horrible. I will be updating this as I develop more. I'm not sure if the Spacewalk maintainers want to add something like this to their list of user-contributed scripts, but I'd be happy to maintain it there, if so. I'm looking for feedback, suggestions... kicks. Whatever. :) Thanks, -- Andy Speagle "THE Student" - UCATS Wichita State University
#!/bin/env python
# Script that uses RHN API to clone RHN Errata to Satellite
# or Spacewalk server.
# Copyright (C) 2008 Red Hat
#
# Author: Andy Speagle ([email protected])
#
# This script was written based on the "rhn-clone-errata.py"
# script written by: Lars Jonsson ([email protected])
#
# (THANKS!)
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# NOTE: print() is always called with exactly one argument below so that the
# same source behaves identically under the Python 2 print statement and the
# Python 3 print function.

try:
    import xmlrpclib
except ImportError:  # the module is named xmlrpc.client on Python 3
    import xmlrpc.client as xmlrpclib
from optparse import OptionParser
from time import time, localtime, strftime
try:
    from sets import Set
except ImportError:  # no "sets" module on Python 3; the builtin set is used the same way here
    Set = set
import sys
import os

# Parsed command-line options. main() fills this in; it stays None when the
# classes below are imported and used without running main().
options = None

# Fault code after which the script logs in again and repeats the call
# (presumably an expired/invalid session -- confirm against the server API docs).
FAULT_RELOGIN = -20
# Fault code the script reports as "Errata ... Doesn't Exist".
FAULT_NO_SUCH_ERRATA = -208
# Fault code the script reports as "Errata Already Exists...".
FAULT_ERRATA_EXISTS = 2601
# A call is retried after a ProtocolError until its retry counter exceeds this.
MAX_RETRIES = 3

_FAULT_FMT_TAB = "Fault Code: %d\tFault String: %s"
_FAULT_FMT_DASH = "Fault Code: %d - %s"


def _verbose():
    """Return True when --verbose was requested (False if options are unset)."""
    return bool(getattr(options, 'verbose', False))


class RHNServer(object):
    """XML-RPC client for the server that errata are read from.

    The constructor logs in immediately and keeps the returned session key in
    ``self.rhnSession``; every query method sends that key as the first
    argument of the remote call.
    """

    def __init__(self, servername, user, passwd):
        """Connect to https://<servername>/rpc/api and log in as *user*."""
        self.rhnServerName = servername
        self.login = user
        self.password = passwd
        self.rhnUrl = 'https://' + self.rhnServerName + '/rpc/api'
        self.server = xmlrpclib.ServerProxy(self.rhnUrl)
        self.rhnSession = self.rhnLogin(self.login, self.password, 0)

    def rhnLogin(self, login, password, retry=0):
        """Log in and return the session key.

        The attempt is repeated after a ProtocolError or a FAULT_RELOGIN
        fault until *retry* exceeds MAX_RETRIES. Any other fault, or running
        out of retries, re-raises the last exception. The caller is
        responsible for storing the returned key.
        """
        while True:
            try:
                return self.server.auth.login(login, password)
            except xmlrpclib.Fault as f:
                if _verbose():
                    print(_FAULT_FMT_TAB % (f.faultCode, f.faultString))
                # The original recursed here without bumping the counter,
                # which never terminated; the retry budget now applies.
                if f.faultCode != FAULT_RELOGIN or retry > MAX_RETRIES:
                    print("Failed to login %s" % (f,))
                    raise
            except xmlrpclib.ProtocolError as err:
                if _verbose():
                    print("ProtocolError: %d - %s" % (err.errcode, err.errmsg))
                if retry > MAX_RETRIES:
                    raise
            retry += 1

    def _call(self, method, args, retry, loud=False, fault_fmt=_FAULT_FMT_TAB):
        """Invoke remote *method* as ``method(session, *args)``.

        method    -- callable taken from self.server (e.g. self.server.errata.getDetails)
        args      -- tuple of arguments that follow the session key
        retry     -- starting value of the ProtocolError retry counter
        loud      -- report faults/protocol errors even without --verbose
        fault_fmt -- format used when reporting a fault

        A FAULT_RELOGIN fault triggers one fresh login (the new key is stored
        in self.rhnSession) followed by one more attempt. All other faults
        are re-raised for the caller to interpret. ProtocolErrors are retried
        until the counter exceeds MAX_RETRIES, then re-raised.
        """
        while True:
            try:
                return method(self.rhnSession, *args)
            except xmlrpclib.Fault as f:
                if loud or _verbose():
                    print(fault_fmt % (f.faultCode, f.faultString))
                if f.faultCode != FAULT_RELOGIN:
                    raise
                # Store the new key so the repeated call does not reuse the
                # session the server just rejected.
                self.rhnSession = self.rhnLogin(self.login, self.password, 0)
                return method(self.rhnSession, *args)
            except xmlrpclib.ProtocolError as err:
                if loud or _verbose():
                    print("ProtocolError: %d - %s" % (err.errcode, err.errmsg))
                if retry > MAX_RETRIES:
                    raise
                retry += 1

    def _errata_query(self, method, advisory, retry, error_msg=None):
        """Run a per-advisory query and translate faults.

        Returns [] when the server answers FAULT_NO_SUCH_ERRATA. For any
        other fault: re-raise when *error_msg* is None, otherwise print
        *error_msg* and return [].
        """
        try:
            return self._call(method, (advisory,), retry)
        except xmlrpclib.Fault as f:
            if f.faultCode == FAULT_NO_SUCH_ERRATA:
                if _verbose():
                    print("Errata %s Doesn't Exist on %s ..." % (advisory, self.rhnServerName))
                return []
            if error_msg is None:
                raise
            print(error_msg)
            return []

    def getErrataDetails(self, advisory, retry):
        """Return errata.getDetails for *advisory*; [] if it does not exist."""
        return self._errata_query(self.server.errata.getDetails, advisory, retry)

    def getErrataKeywords(self, advisory, retry):
        """Return errata.listKeywords for *advisory*; [] if missing or on error."""
        return self._errata_query(self.server.errata.listKeywords, advisory, retry,
                                  "Error Getting Keywords : %s" % advisory)

    def getErrataBugs(self, advisory, retry):
        """Return errata.bugzillaFixes for *advisory*; [] if missing or on error."""
        return self._errata_query(self.server.errata.bugzillaFixes, advisory, retry,
                                  "Error Getting Bugs : %s" % advisory)

    def getErrataCVEs(self, advisory, retry):
        """Return errata.listCves for *advisory*; [] if missing or on error."""
        return self._errata_query(self.server.errata.listCves, advisory, retry,
                                  "Error Getting CVEs : %s" % advisory)

    def getErrataPackages(self, advisory, retry):
        """Return errata.listPackages for *advisory*; [] if missing or on error."""
        return self._errata_query(self.server.errata.listPackages, advisory, retry,
                                  "Error Getting Packages : %s" % advisory)

    def listChannelErrata(self, dest_chan, start_date, end_date, retry):
        """Return channel.software.listErrata for the channel and date range.

        Faults other than FAULT_RELOGIN propagate to the caller.
        """
        return self._call(self.server.channel.software.listErrata,
                          (dest_chan, start_date, end_date), retry)

    def findPackageChannels(self, pkgid, retry):
        """Return packages.listProvidingChannels for *pkgid*; [] on a fault."""
        try:
            return self._call(self.server.packages.listProvidingChannels, (pkgid,), retry)
        except xmlrpclib.Fault:
            print("Error Finding Channels for Package : %s" % pkgid)
            return []

    def cloneErrata(self, dest_chan, errata, retry):
        """Call errata.clone for *errata* into *dest_chan* and return its result.

        Faults and protocol errors are always reported, not only with
        --verbose. Faults other than FAULT_RELOGIN propagate to the caller.
        """
        print("Clone errata in progress, please be patient..")
        return self._call(self.server.errata.clone, (dest_chan, errata), retry, loud=True)


class SPWServer(RHNServer):
    """Client for the target server; adds package lookup and errata creation."""

    def searchNVREA(self, name, version, release, epoch, archlabel, retry):
        """Return packages.findByNvrea for the given NVREA; [] on a fault."""
        try:
            # epoch is part of args, so it is also sent on the attempt that
            # follows a re-login (the original dropped it there).
            return self._call(self.server.packages.findByNvrea,
                              (name, version, release, epoch, archlabel), retry)
        except xmlrpclib.Fault:
            print("Error Finding Package via NVREA : %s" % name)
            return []

    def _errata_write(self, method, args, retry, error_msg):
        """Run an errata create/publish call.

        Returns [] (after a message) on FAULT_ERRATA_EXISTS; prints
        *error_msg* and re-raises on any other fault.
        """
        try:
            return self._call(method, args, retry, fault_fmt=_FAULT_FMT_DASH)
        except xmlrpclib.Fault as f:
            if f.faultCode == FAULT_ERRATA_EXISTS:
                print("Errata Already Exists...")
                return []
            print(error_msg)
            raise

    def errataPublish(self, name, channels, retry):
        """Call errata.publish for advisory *name* into *channels*.

        Returns the server's result, or [] when the errata already exists.
        """
        return self._errata_write(self.server.errata.publish, (name, channels), retry,
                                  "Error Publishing Errata!")

    def errataCreate(self, info, bugs, keywords, packages, publish, channels, retry):
        """Call errata.create with the given pieces.

        Returns the server's result, or [] when the errata already exists.
        """
        return self._errata_write(self.server.errata.create,
                                  (info, bugs, keywords, packages, publish, channels), retry,
                                  "Error Creating Errata!")


def parse_args():
    """Parse the command line and return the optparse options object."""
    parser = OptionParser()
    parser.add_option("-s", "--src-server", type="string", dest="src_server",
                      help="Source Server (defaults to RHN hosted if omitted - rhn.redhat.com)")
    parser.add_option("-t", "--tgt-server", type="string", dest="tgt_server",
                      help="Target Server, aka YOUR server")
    # -l/-p are the credentials main() hands to the target server object.
    parser.add_option("-l", "--login", type="string", dest="login",
                      help="Target Server Login")
    parser.add_option("-p", "--password", type="string", dest="passwd",
                      help="Target Server password")
    # The source credentials used to be hard-coded in main(); the defaults
    # keep those same placeholder values.
    parser.add_option("--src-login", type="string", dest="src_login", default="username",
                      help="Source Server Login (defaults to the placeholder \"username\")")
    parser.add_option("--src-password", type="string", dest="src_passwd", default="password",
                      help="Source Server password (defaults to the placeholder \"password\")")
    parser.add_option("-c", "--src-channel", type="string", dest="src_channel",
                      help="Source Channel Label: ie.\"rhel-x86_64-server-5\"")
    parser.add_option("-b", "--begin-date", type="string", dest="bdate",
                      help="Beginning Date: ie. \"19000101\" (defaults to \"19000101\")")
    parser.add_option("-e", "--end-date", type="string", dest="edate",
                      help="Ending Date: ie. \"19001231\" (defaults to TODAY)")
    parser.add_option("-u", "--publish", action="store_true", dest="publish", default=False,
                      help="Publish Errata (into destination channels)")
    parser.add_option("-v", "--verbose", action="store_true", dest="verbose", default=False)
    parser.add_option("-q", "--quiet", action="store_true", dest="quiet", default=False)

    (opts, args) = parser.parse_args()
    return opts


def main():
    """Copy every errata of the source channel that the target lacks.

    For each advisory listed on the source channel in the date range: skip it
    if the target already has it, otherwise gather details, keywords, bugs and
    the target-side ids of its packages, create it on the target, and
    optionally publish it into every target channel that provides one of
    those packages.
    """
    global options
    options = parse_args()

    start_date = options.bdate or '19000101'
    end_date = options.edate or strftime("%Y%m%d", localtime())
    src_server = options.src_server or "rhn.redhat.com"

    # The source channel is required too: without it the first remote call
    # would be made with None as the channel label.
    if not (options.tgt_server and options.login and options.passwd and options.src_channel):
        print("try: " + sys.argv[0] + " --help")
        sys.exit(2)

    myRHN = RHNServer(src_server, options.src_login, options.src_passwd)
    mySPW = SPWServer(options.tgt_server, options.login, options.passwd)

    solution = "Before applying this update, make sure that all previously-released errata relevant to your system have been applied."

    for rhn_err in myRHN.listChannelErrata(options.src_channel, start_date, end_date, 0):
        advisory = rhn_err['errata_advisory']

        if not options.quiet:
            print(advisory)

        if mySPW.getErrataDetails(advisory, 0):
            if options.verbose:
                print("Errata Already Exists... skipping!")
            continue

        rhn_err_details = myRHN.getErrataDetails(advisory, 0)
        if not rhn_err_details:
            # getErrataDetails returns [] when the source reports the advisory
            # as missing; there is nothing to build an errata from.
            if not options.quiet:
                print("\tNo details available for %s... skipping!" % advisory)
            continue

        rhn_err_keywords = myRHN.getErrataKeywords(advisory, 0)
        rhn_err_bugs = myRHN.getErrataBugs(advisory, 0)

        # The loop treats the bugs result as a mapping of bug id -> summary;
        # an empty list (the error fallback) simply yields no bugs.
        spw_bugs = [{'id': int(bug), 'summary': rhn_err_bugs[bug]} for bug in rhn_err_bugs]

        rhn_err_packages = []
        channel_set = Set()
        for rhn_pkg in myRHN.getErrataPackages(advisory, 0):
            spw_pkg = mySPW.searchNVREA(rhn_pkg['package_name'],
                                        rhn_pkg['package_version'],
                                        rhn_pkg['package_release'],
                                        '',
                                        rhn_pkg['package_arch_label'],
                                        0)
            if not spw_pkg:
                continue

            pkg_id = spw_pkg[0]['id']
            rhn_err_packages.append(pkg_id)
            for err_chan in mySPW.findPackageChannels(pkg_id, 0):
                channel_set.add(err_chan['label'])

        if not options.quiet:
            print("\t" + rhn_err_details['errata_issue_date'] + " - " + rhn_err_details['errata_synopsis'])

        new_err = mySPW.errataCreate(
            {'synopsis': rhn_err_details['errata_synopsis'],
             'advisory_name': advisory,
             'advisory_release': 1,
             'advisory_type': rhn_err_details['errata_type'],
             'product': 'RHEL',
             'topic': rhn_err_details['errata_topic'],
             'description': rhn_err_details['errata_description'],
             'references': rhn_err_details['errata_references'],
             'notes': rhn_err_details['errata_notes'],
             'solution': solution},
            spw_bugs,
            rhn_err_keywords,
            rhn_err_packages,
            0,
            [],
            0)

        if not new_err:
            # errataCreate returns [] when the target says it already exists.
            continue

        print("\tErrata Created: %d" % new_err['id'])

        # Known caveat from the author: an errata spanning several releases
        # is published with all of its packages into every matching channel.
        if options.publish and channel_set:
            for channel in channel_set:
                err_publish = mySPW.errataPublish(advisory, [channel], 0)
                if err_publish and err_publish['id']:
                    print("\tErrata Published to Channel: %s" % channel)


if __name__ == "__main__":
    main()
signature.asc
Description: This is a digitally signed message part
_______________________________________________ Spacewalk-list mailing list [email protected] https://www.redhat.com/mailman/listinfo/spacewalk-list
