Author: fw Date: 2005-12-15 11:37:40 +0000 (Thu, 15 Dec 2005) New Revision: 3051
Modified: bin/tracker_service.py bin/update-db lib/python/security_db.py Log: lib/python/security_db.py (DB): Bump schema version. (DB.initSchema): Add debsecan_data table. (DB.calculateDebsecan, DB.getDebsecan): New methods. bin/update-db: Invoke calculateDebsecan. bin/tracker_service.py (TrackerService): Add support for debsecan/* pages. (TrackerService.page_debsecan): New method. Modified: bin/tracker_service.py =================================================================== --- bin/tracker_service.py 2005-12-15 11:34:35 UTC (rev 3050) +++ bin/tracker_service.py 2005-12-15 11:37:40 UTC (rev 3051) @@ -98,6 +98,7 @@ self.register('data/releases', self.page_data_releases) self.register('data/funny-versions', self.page_data_funny_versions) self.register('data/fake-names', self.page_data_fake_names) + self.register('debsecan/**', self.page_debsecan) def page_home(self, path, params, url): query = params.get('query', ('',))[0] @@ -854,6 +855,17 @@ make_table(gen(), caption=("Bug", "Description"))]) + def page_debsecan(self, path, params, url): + obj = '/'.join(path) + data = self.db.getDebsecan(obj) + if data: + return BinaryResult(data) + else: + return self.create_page( + url, "Object not found", + [P("The requested debsecan object has not been found.")], + status=404) + def create_page(self, url, title, body, search_in_page=False, status=200): append = body.append append(HR()) Modified: bin/update-db =================================================================== --- bin/update-db 2005-12-15 11:34:35 UTC (rev 3050) +++ bin/update-db 2005-12-15 11:37:40 UTC (rev 3051) @@ -75,6 +75,11 @@ print x sys.exit(1) +# debsecan data + +for release in ('', 'woody', 'sarge', 'etch'): + db.calculateDebsecan(release) + # Everything worked well. - + db.commit(cursor) Modified: lib/python/security_db.py =================================================================== --- lib/python/security_db.py 2005-12-15 11:34:35 UTC (rev 3050) +++ lib/python/security_db.py 2005-12-15 11:37:40 UTC (rev 3051) @@ -27,6 +27,7 @@ """ import apsw +import base64 import bugs import cPickle import cStringIO @@ -37,6 +38,7 @@ import re import sys import types +import zlib class InsertError(Exception): """Class for capturing insert errors. @@ -111,7 +113,7 @@ self.db = apsw.Connection(name) self.verbose = verbose - self.schema_version = 17 + self.schema_version = 18 self._initFunctions() c = self.cursor() @@ -309,6 +311,11 @@ loss_sec_prot_other INTEGER NOT NULL)""") cursor.execute( + """CREATE TABLE debsecan_data + (name TEXT NOT NULL PRIMARY KEY, + data TEXT NOT NULL)""") + + cursor.execute( """CREATE VIEW testing_status AS SELECT DISTINCT sp.name AS package, st.bug_name AS bug, sp.archive AS section, st.urgency AS urgency, @@ -1219,6 +1226,133 @@ VALUES (?, ?, ?, ?)""", (bug_name, suite, status, pkgs)) + def calculateDebsecan(self, release): + """Create data for the debsecan tool.""" + + c = self.cursor() + + c.execute("""CREATE TEMPORARY TABLE vulnlist ( + name TEXT NOT NULL, + package TEXT NOT NULL, + note INTEGER NOT NULL, + PRIMARY KEY (name, package) + )""") + + # Populate the table with the unstable vulnerabilities; + # override them with the release-specific status. + + c.execute("""INSERT INTO vulnlist + SELECT bug_name, package, id FROM package_notes WHERE release = ''""") + + if release: + c.execute("""INSERT OR REPLACE INTO vulnlist + SELECT bug_name, package, id FROM package_notes + WHERE release = ?""", (release,)) + + c.execute("""DELETE FROM vulnlist WHERE name LIKE 'FAKE-0000000-%'""") + + urgency_to_flag = {'low' : 'L', 'medium' : 'M', 'high' : 'H', + 'unknown' : ' '} + + result = ["VERSION 0\n"] + for (name, package, fixed_version, kind, urgency, remote, description, + note_id) in list(c.execute("""SELECT + vulnlist.name, vulnlist.package, + COALESCE(n.fixed_version, ''), + n.package_kind, n.urgency, + (SELECT range_remote FROM nvd_data + WHERE cve_name = vulnlist.name) AS remote, + bugs.description, + n.id + FROM vulnlist, bugs, package_notes AS n + WHERE bugs.name = vulnlist.name + AND n.id = vulnlist.note + ORDER BY vulnlist.package""")): + if fixed_version == '0' or urgency == 'unimportant' \ + or kind not in ('source', 'binary', 'unknown'): + continue + + # Normalize FAKE-* names a bit. The line number (which + # makes the name unique) is completely useless for the + # client. + + if name[0:5] == "FAKE-": + name = '-'.join(name.split('-')[0:2]) + + # Determine if a fix is available for the specific + # release. + + fix_available = ' ' + if release: + fix_available = ' ' + if kind == 'source': + fix_available_sql = """SELECT st.vulnerable + FROM source_packages AS p, source_package_status AS st + WHERE p.name = ? + AND p.release = ? + AND p.subrelease IN ('', 'security') + AND st.bug_name = ? + AND st.package = p.rowid + ORDER BY p.version COLLATE version DESC""" + elif kind == 'binary': + fix_available_sql = """SELECT st.vulnerable + FROM binary_packages AS p, binary_package_status AS st + WHERE p.name = ? + AND p.release = ? + AND p.subrelease IN ('', 'security') + AND st.bug_name = ? + AND st.package = p.rowid + ORDER BY p.version COLLATE version DESC""" + else: + fix_available_sql = '' + + if fix_available_sql: + for (v,) in c.execute(fix_available_sql, + (package, release, name)): + assert v is not None + if not v: + fix_available = 'F' + break + elif fixed_version <> '': + fix_available = 'F' + + if kind == 'source': + kind = 'S' + elif kind == 'binary': + kind = 'B' + else: + kind = ' ' + + if remote is None: + remote = '?' + elif remote: + remote = 'R' + else: + remote = ' ' + + result.append("%s,%c%c%c%c,%s,%s,%s\n" + % (name, + kind, urgency_to_flag[urgency], remote, + fix_available, + package, fixed_version, description)) + result = base64.encodestring(zlib.compress(''.join(result), 9)) + + if not release: + release = 'sid' + c.execute( + "INSERT OR REPLACE INTO debsecan_data (name, data) VALUES (?, ?)", + ('release/' + release, result)) + + c.execute("DROP TABLE vulnlist") + + def getDebsecan(self, name): + """Returns the debsecan data item NAME.""" + for (data,) in self.cursor().execute( + "SELECT data FROM debsecan_data WHERE name = ?", (name,)): + return base64.decodestring(data) + else: + return None + def replaceNVD(self, cursor, data): """Replaces the stored NVD data.""" cursor.execute("DELETE FROM nvd_data"); _______________________________________________ Secure-testing-commits mailing list Secure-testing-commits@lists.alioth.debian.org http://lists.alioth.debian.org/mailman/listinfo/secure-testing-commits