On Tuesday 08 February at 19:01, Mike Brown wrote:
> Sylvain Thénault wrote:
> > I guess you're right. I wrote this patch because it was fixing my
> > problem. Now if it doesn't take too much time to have every case
> > correctly fixed by implementing RFC 3986, I may take some time to do so
> > or to help having it done. And if part of the job is already done in
> > 4suite, that's great. However what's in 4suite, what's not and needs to
> > be implemented is not yet clear to me.
>
> The current version of Ft.Lib.Uri is here:
> http://cvs.4suite.org/viewcvs/4Suite/Ft/Lib/Uri.py?view=markup
>
> If you see "rfc2396bis" in the doc strings, you may safely interpret
> them to mean "RFC 3986".
>
>
> The functions that you should look at are the following:
>
> MakeUrllibSafe(uriRef)
> ======================
> This exists in order to convert a proper URI reference into one that
> can be handled by urllib.urlopen(). It does the following:
> 1. If the reference contains an Internationalized Domain Name,
>    recodes it so that it is resolvable. (Py 2.3+ only)
> 2. Strips the fragment component, if any.
> 3. Ensures that the reference is a byte string, not unicode.
> 4. On Windows, assumes that the first ':' appearing in the path
>    component is part of a drivespec, and converts it to '|'.
>
> If you port this function, the reference to PercentDecode() may be replaced
> with urllib.unquote(), but you must move the byte string check (#3, above) to
> occur before calling unquote. The references to the functions SplitUriRef and
> UnsplitUriRef can be replaced with urlsplit() and urlunsplit() from the
> urlparse module.
>
>
> Absolutize(uriRef, baseUri)
> ===========================
> This does strict merging of a URI reference and a base URI. The base URI
> *must* be absolute (must have a scheme). If you port this function, the
> UriException may be replaced with a ValueError, and SplitUriRef &
> UnsplitUriRef may be replaced with their urlparse equivalents, as
> mentioned above. The RemoveDotSegments function must also be ported and
> should be made semi-private because it is not for general use. I've
> implemented it using two segment stacks, as alluded to in the spec,
> rather than the explicit string-walking algorithm that would be too
> inefficient.
>
>
> BaseJoin(base, UriRef)
> ======================
> This does lenient merging of a base URI and a URI reference (note the
> argument order is different than that of Absolutize). It allows the base
> URI to be a relative reference. In such cases, we use a dummy scheme
> (we don't say "assume 'file:'" because the spec says all schemes must be
> resolved the same), run it through Absolutize, and then remove the scheme
> from the result. If you port this function, you will need to port the
> IsAbsolute function, which just checks to see if the URI has a scheme.
> I prefer to use a regex for this, as it is fast and accurate (':' can
> appear in more than one place in a URI reference, so it is not safe to
> assume that its presence means there is a scheme).
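
To illustrate the remark about ':' above, here is a minimal sketch of a regex-based scheme check, written for Python 3. The names get_scheme and is_absolute are illustrative only (they are not the 4Suite API), and the pattern is my reading of the RFC 3986 scheme grammar: a letter followed by letters, digits, '+', '-' or '.', terminated by ':'.

```python
import re

# Assumed pattern: an RFC 3986 scheme anchored at the start of the string.
SCHEME = re.compile(r'([A-Za-z][A-Za-z0-9+.\-]*):')

def get_scheme(uri_ref):
    """Return the scheme of a URI reference, or None if it has none."""
    m = SCHEME.match(uri_ref)  # match() only looks at the beginning
    return m.group(1) if m else None

def is_absolute(uri_ref):
    """A reference is treated as absolute exactly when it has a scheme."""
    return get_scheme(uri_ref) is not None

# A ':' later in the reference (in a path segment or a port) is not a scheme.
examples = ['http://example.org/a', 'file:/base',
            'dir/name:with-colon', './a:b', '//host:8080/x']
results = {ref: is_absolute(ref) for ref in examples}
```

Only the first two references are reported as absolute; the other three contain a ':' but no scheme, which is why a plain substring test for ':' is not enough.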
Thanks a lot. Actually, almost all the work is already done right there.
Here is what I've worked on. Once we reach a consensus, I'll add it to
pyxml. I've attached to this mail:

- a light version of 4Suite Uri.py including the following functions:
  SplitUriRef, UnsplitUriRef (it was really less annoying to use those two
  functions than the equivalent urlparse ones), Absolutize, MakeUrllibSafe,
  _RemoveDotSegments, BaseJoin, GetScheme and IsAbsolute. With the presented
  solution, BaseJoin and GetScheme are not needed by saxutils and could be
  removed (IsAbsolute is still called by Absolutize), but I've kept them in
  for now. All the tests for Absolutize from 4suite are still passing.

- a modified version of saxutils, expecting the Uri module above to be in the
  _xmlplus directory (i.e. importable as xml.Uri). I've refactored
  prepare_input_source to ease testing of the URI merging stuff.

- a unittest file, which includes some test cases for the URI merging
  function. Please take a look at the existing test cases to check that
  everything looks fine to you. If you have other cases to add, please let me
  know (or maybe I can add this file to the cvs first). Notice that to run the
  tests, you should have a "quotes.xml" file in the same directory as the test
  file (there is one in the test directory of pyxml). As a bonus, I've
  converted the escape function test from test_utils into a unittest in the
  same file.

Anyway, having SplitUriRef/UnsplitUriRef replace urlparse.urlsplit/urlunsplit,
and Absolutize or BaseJoin replace urlparse.urljoin, would definitely be the
right thing.

-- 
Sylvain Thénault                               LOGILAB, Paris (France).

http://www.logilab.com   http://www.logilab.fr  http://www.logilab.org
import unittest
from os.path import dirname, abspath, join

from xml.sax.saxutils import escape, absolute_system_id

class EscapeTC(unittest.TestCase):

    def test(self):
        v1, v2 = escape('&<>'), '&amp;&lt;&gt;'
        self.assertEquals(v1, v2)
        v1, v2 = escape('foo&bar'), 'foo&amp;bar'
        self.assertEquals(v1, v2)
        v1, v2 = escape('< test > &', {'test': '&myentity;'}), '&lt; &myentity; &gt; &amp;'
        self.assertEquals(v1, v2)
        v1, v2 = escape('&\'"<>', {'"': '&quot;', "'": '&apos;'}), '&amp;&apos;&quot;&lt;&gt;'
        self.assertEquals(v1, v2)

TEST_DIR = abspath(dirname(__file__)) + '/'

class AbsoluteSystemIdTC(unittest.TestCase):

    def test_base(self):
        res = absolute_system_id('http://www.xml.com')
        self.assertEquals(res, 'http://www.xml.com')
        res = absolute_system_id('http://www.xml.com', 'http://whatever')
        self.assertEquals(res, 'http://www.xml.com')
        res = absolute_system_id('quotes.xml')
        self.assertEquals(res, 'file://%s' % join(TEST_DIR, 'quotes.xml'))

    def test_relative(self):
        # FIXME: empty authority // added by MakeUrlLibSafe (actually by
        # urlunsplit), which is probably acceptable since the sysid is designed
        # to be used by urlopen
        res = absolute_system_id('quotes.xml', 'file:%s' % TEST_DIR)
        self.assertEquals(res, 'file://%squotes.xml' % TEST_DIR)
        res = absolute_system_id('relative.xml', 'file:/base')
        self.assertEquals(res, 'file:///relative.xml')
        res = absolute_system_id('relative.xml', 'file:/base/')
        self.assertEquals(res, 'file:///base/relative.xml')
        res = absolute_system_id('file:relative.xml', 'file:/base')
        self.assertEquals(res, 'file:///relative.xml')

    def test_no_base_scheme(self):
        # FIXME: warning ?
        self.assertRaises(ValueError, absolute_system_id, 'file:relative.xml', '/base')

if __name__ == '__main__':
    unittest.main()
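
For readers without a PyXML checkout, the escaping behaviour exercised by EscapeTC can be tried against the standard library's xml.sax.saxutils, which has the same escape() and quoteattr() signatures. This is only a sketch for Python 3; absolute_system_id is part of the patch and is not available there.

```python
from xml.sax.saxutils import escape, quoteattr

# '&', '<' and '>' are always replaced; '&' is handled first so that the
# entities produced for '<' and '>' are not escaped a second time.
plain = escape('a < b & c > d')

# Extra replacements are applied after the three built-in ones.
custom = escape('< test > &', {'test': '&myentity;'})

# quoteattr() picks the quote character that needs no escaping.
quoted = quoteattr('say "hi"')

print(plain)   # a &lt; b &amp; c &gt; d
print(custom)  # &lt; &myentity; &gt; &amp;
print(quoted)  # 'say "hi"'
```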
# pylint: disable-msg=C0103
#
# backported code from 4Suite with slight modifications, started from r1.89 of
# Ft/Lib/Uri.py, by [EMAIL PROTECTED] on 2005-02-09
#
# part if not all of this code should probably move to urlparse (or be used
# to fix some existing functions in this module)
#
#
# Copyright 2004 Fourthought, Inc. (USA).
# Detailed license and copyright information: http://4suite.org/COPYRIGHT
# Project home, documentation, distributions: http://4suite.org/

import os.path
import sys
import re
import urlparse, urllib, urllib2

def UnsplitUriRef(uriRefSeq):
    """should replace urlparse.urlunsplit

    Given a sequence as would be produced by SplitUriRef(), assembles and
    returns a URI reference as a string.
    """
    if not (isinstance(uriRefSeq, tuple) or isinstance(uriRefSeq, list)):
        raise TypeError("sequence expected, got %s" % type(uriRefSeq))
    #print 'unsplit', uriRefSeq
    (scheme, authority, path, query, fragment) = uriRefSeq
    uri = ''
    if scheme is not None:
        uri += scheme + ':'
    if authority is not None:
        uri += '//' + authority
    uri += path
    if query is not None:
        uri += '?' + query
    if fragment is not None:
        uri += '#' + fragment
    return uri

SPLIT_URI_REF_PATTERN = re.compile(r"^(?:(?P<scheme>[^:/?#]+):)?(?://(?P<authority>[^/?#]*))?(?P<path>[^?#]*)(?:\?(?P<query>[^#]*))?(?:#(?P<fragment>.*))?$")

def SplitUriRef(uriref):
    """should replace urlparse.urlsplit

    Given a valid URI reference as a string, returns a tuple representing the
    generic URI components, as per RFC 2396 appendix B. The tuple's structure
    is (scheme, authority, path, query, fragment).

    All values will be strings (possibly empty) or None if undefined.

    Note that per rfc2396bis, there is no distinction between a path and
    an "opaque part", as there was in RFC 2396.
    """
    # the pattern will match every possible string, so it's safe to
    # assume there's a groupdict method to call.
    g = SPLIT_URI_REF_PATTERN.match(uriref).groupdict()
    scheme = g['scheme']
    authority = g['authority']
    path = g['path']
    query = g['query']
    fragment = g['fragment']
    return (scheme, authority, path, query, fragment)


def Absolutize(uriRef, baseUri):
    """
    Resolves a URI reference to absolute form, effecting the result of RFC
    2396bis section 5. The URI reference is considered to be relative to
    the given base URI.

    It is the caller's responsibility to ensure that the base URI matches
    the absolute-URI syntax rule of rfc2396bis, and that its path component
    does not contain '.' or '..' segments if the scheme is hierarchical.
    Unexpected results may occur otherwise.

    This function only conducts a minimal sanity check in order to determine
    if relative resolution is possible: it raises a ValueError if the base
    URI does not have a scheme component. While it is true that the base URI
    is irrelevant if the URI reference has a scheme, an exception is raised
    in order to signal that the given string does not even come close to
    meeting the criteria to be usable as a base URI.

    It is the caller's responsibility to make a determination of whether the
    URI reference constitutes a "same-document reference", as defined in RFC
    2396 or rfc2396bis. As per the spec, dereferencing a same-document
    reference "should not" involve retrieval of a new representation of the
    referenced resource. Note that the two specs have different definitions
    of same-document reference: RFC 2396 says it is *only* the cases where the
    reference is the empty string, or "#" followed by a fragment; rfc2396bis
    requires making a comparison of the base URI to the absolute form of the
    reference (as is returned by the spec), minus its fragment component,
    if any.

    This function is similar to urlparse.urljoin() and urllib.basejoin().
    Those functions, however, are (as of Python 2.3) outdated, buggy, and/or
    designed to produce results acceptable for use with other core Python
    libraries, rather than being earnest implementations of the relevant
    specs. Their problems are most noticeable in their handling of
    same-document references and 'file:' URIs, both being situations that
    come up far too often to consider the functions reliable enough for
    general use.
    """
    # Reasons to avoid using urllib.basejoin() and urlparse.urljoin():
    # - Both are partial implementations of long-obsolete specs.
    # - Both accept relative URLs as the base, which no spec allows.
    # - urllib.basejoin() mishandles the '' and '..' references.
    # - If the base URL uses a non-hierarchical or relative path,
    #   or if the URL scheme is unrecognized, the result is not
    #   always as expected (partly due to issues in RFC 1808).
    # - If the authority component of a 'file' URI is empty,
    #   the authority component is removed altogether. If it was
    #   not present, an empty authority component is in the result.
    # - '.' and '..' segments are not always collapsed as well as they
    #   should be (partly due to issues in RFC 1808).
    # - Effective Python 2.4, urllib.basejoin() *is* urlparse.urljoin(),
    #   but urlparse.urljoin() is still based on RFC 1808.
    #print 'absolutize', baseUri, uriRef

    # This procedure is based on the pseudocode in rfc2396bis sec. 5.2.
    #
    # ensure base URI is absolute
    if not baseUri:
        raise ValueError('baseUri is required and must be a non empty string')
    if not IsAbsolute(baseUri):
        raise ValueError('%r is not an absolute URI' % baseUri)
    # shortcut for the simplest same-document reference cases
    if uriRef == '' or uriRef[0] == '#':
        return baseUri.split('#')[0] + uriRef
    # ensure a clean slate
    tScheme = tAuth = tPath = tQuery = None
    # parse the reference into its components
    (rScheme, rAuth, rPath, rQuery, rFrag) = SplitUriRef(uriRef)
    # if the reference is absolute, eliminate '.' and '..' path segments
    # and skip to the end
    if rScheme is not None:
        tScheme = rScheme
        tAuth = rAuth
        tPath = _RemoveDotSegments(rPath)
        tQuery = rQuery
    else:
        # the base URI's scheme, and possibly more, will be inherited
        (bScheme, bAuth, bPath, bQuery, bFrag) = SplitUriRef(baseUri)
        # if the reference is a net-path, just eliminate '.' and '..' path
        # segments; no other changes needed.
        if rAuth is not None:
            tAuth = rAuth
            tPath = _RemoveDotSegments(rPath)
            tQuery = rQuery
        # if it's not a net-path, we need to inherit pieces of the base URI
        else:
            # use base URI's path if the reference's path is empty
            if not rPath:
                tPath = bPath
                # use the reference's query, if any, or else the base URI's,
                tQuery = rQuery and rQuery or bQuery
            # the reference's path is not empty
            else:
                # just use the reference's path if it's absolute
                if rPath[0] == '/':
                    tPath = _RemoveDotSegments(rPath)
                # merge the reference's relative path with the base URI's path
                else:
                    if bAuth is not None and not bPath:
                        tPath = '/' + rPath
                    else:
                        tPath = bPath[:bPath.rfind('/')+1] + rPath
                    tPath = _RemoveDotSegments(tPath)
                # use the reference's query
                tQuery = rQuery
            # since the reference isn't a net-path,
            # use the authority from the base URI
            tAuth = bAuth
        # inherit the scheme from the base URI
        tScheme = bScheme
    # always use the reference's fragment (but no need to define another var)
    #tFrag = rFrag

    # now compose the target URI (rfc2396bis sec. 5.3)
    return UnsplitUriRef((tScheme, tAuth, tPath, tQuery, rFrag))


REG_NAME_HOST_PATTERN = re.compile(r"^(?:(?:[0-9A-Za-z\-_\.!~*'();&=+$,]|(?:%[0-9A-Fa-f]{2}))*)$")

def MakeUrllibSafe(uriRef):
    """
    Makes the given rfc2396bis-conformant URI reference safe for passing
    to legacy urllib functions. The result may not be a valid URI.

    As of Python 2.3.3, urllib.urlopen() does not fully support
    internationalized domain names, it does not strip fragment components,
    and on Windows, it expects file URIs to use '|' instead of ':' in the
    path component corresponding to the drivespec. It also relies on
    urllib.unquote(), which mishandles unicode arguments. This function
    produces a URI reference that will work around these issues, although
    the IDN workaround is limited to Python 2.3 only. May raise a
    UnicodeEncodeError if the URI reference is Unicode and erroneously
    contains non-ASCII characters.
    """
    # IDN support requires decoding any percent-encoded octets in the
    # host part (if it's a reg-name) of the authority component, and when
    # doing DNS lookups, applying IDNA encoding to that string first.
    # As of Python 2.3, there is an IDNA codec, and the socket and httplib
    # modules accept Unicode strings and apply IDNA encoding automatically
    # where necessary. However, urllib.urlopen() has not yet been updated
    # to do the same; it raises an exception if you give it a Unicode
    # string, and does no conversion on non-Unicode strings, meaning you
    # have to give it an IDNA string yourself. We will only support it on
    # Python 2.3 and up.
    #
    # see if host is a reg-name, as opposed to IPv4 or IPv6 addr.
    if isinstance(uriRef, unicode):
        uriRef = uriRef.encode('us-ascii') # parts of urllib are not unicode safe
    (scheme, auth, path, query, frag) = urlparse.urlsplit(uriRef)
    if auth and auth.find('@') > -1:
        userinfo, hostport = auth.split('@')
    else:
        userinfo = None
        hostport = auth
    if hostport and hostport.find(':') > -1:
        host, port = hostport.split(':')
    else:
        host = hostport
        port = None
    if host and REG_NAME_HOST_PATTERN.match(host):
        # percent-encoded hostnames will always fail DNS lookups
        host = urllib.unquote(host) #PercentDecode(host)
        # IDNA-encode if possible.
        # We shouldn't do this for schemes that don't need DNS lookup,
        # but are there any (that you'd be calling urlopen for)?
        if sys.version_info[0:2] >= (2, 3):
            if isinstance(host, str):
                host = host.decode('utf-8')
            host = host.encode('idna')
        # reassemble the authority with the new hostname
        # (percent-decoded, and possibly IDNA-encoded)
        auth = ''
        if userinfo:
            auth += userinfo + '@'
        auth += host
        if port:
            auth += ':' + port

    # On Windows, ensure that '|', not ':', is used in a drivespec.
    if os.name == 'nt' and scheme == 'file':
        path = path.replace(':', '|', 1)

    # Note that we drop fragment, if any. See rfc2396bis sec. 3.5.
    uri = urlparse.urlunsplit((scheme, auth, path, query, None))

    return uri


def BaseJoin(base, uriRef):
    """
    Merges a base URI reference with another URI reference, returning a
    new URI reference.

    It behaves exactly the same as Absolutize(), except the arguments
    are reversed, and it accepts any URI reference (even a relative URI)
    as the base URI. If the base has no scheme component, it is
    evaluated as if it did, and then the scheme component of the result
    is removed from the result, unless the uriRef had a scheme. Thus, if
    neither argument has a scheme component, the result won't have one.

    This function is named BaseJoin because it is very much like
    urllib.basejoin(), but it follows the current rfc2396bis algorithms
    for path merging, dot segment elimination, and inheritance of query
    and fragment components.

    WARNING: This function exists for 2 reasons: (1) because of a need
    within the 4Suite repository to perform URI reference absolutization
    using base URIs that are stored (inappropriately) as absolute paths
    in the subjects of statements in the RDF model, and (2) because of
    a similar need to interpret relative repo paths in a 4Suite product
    setup.xml file as being relative to a path that can be set outside
    the document. When these needs go away, this function probably will,
    too, so it is not advisable to use it.
    """
    if IsAbsolute(base):
        return Absolutize(uriRef, base)
    else:
        dummyscheme = 'basejoin'
        res = Absolutize(uriRef, '%s:%s' % (dummyscheme, base))
        if IsAbsolute(uriRef):
            # scheme will be inherited from uriRef
            return res
        else:
            # no scheme in, no scheme out
            return res[len(dummyscheme)+1:]


def _RemoveDotSegments(path):
    """
    Supports Absolutize() by implementing the remove_dot_segments function
    described in rfc2396bis sec. 5.2. It collapses most of the '.' and '..'
    segments out of a path without eliminating empty segments. It is intended
    to be used during the path merging process and may not give expected
    results when used independently. Use NormalizePathSegments() or
    NormalizePathSegmentsInUri() if more general normalization is desired.

    semi-private because it is not for general use. I've implemented it
    using two segment stacks, as alluded to in the spec, rather than the
    explicit string-walking algorithm that would be too inefficient. (mbrown)
    """
    # return empty string if entire path is just "." or ".."
    if path == '.' or path == '..':
        return path[0:0] # preserves string type
    # remove all "./" or "../" segments at the beginning
    while path:
        if path[:2] == './':
            path = path[2:]
        elif path[:3] == '../':
            path = path[3:]
        else:
            break
    # We need to keep track of whether there was a leading slash,
    # because we're going to drop it in order to prevent our list of
    # segments from having an ambiguous empty first item when we call
    # split().
    leading_slash = 0
    if path[:1] == '/':
        path = path[1:]
        leading_slash = 1
    # replace a trailing "/." with just "/"
    if path[-2:] == '/.':
        path = path[:-1]
    # convert the segments into a list and process each segment in
    # order from left to right.
    segments = path.split('/')
    keepers = []
    segments.reverse()
    while segments:
        seg = segments.pop()
        # '..' means drop the previous kept segment, if any.
        # If none, and if the path is relative, then keep the '..'.
        # If the '..' was the last segment, ensure
        # that the result ends with '/'.
        if seg == '..':
            if keepers:
                keepers.pop()
            elif not leading_slash:
                keepers.append(seg)
            if not segments:
                keepers.append('')
        # ignore '.' segments and keep all others, even empty ones
        elif seg != '.':
            keepers.append(seg)
    # reassemble the kept segments
    return leading_slash * '/' + '/'.join(keepers)


SCHEME_PATTERN = re.compile(r'([a-zA-Z][a-zA-Z0-9+\-.]*):')

def GetScheme(uriRef):
    """
    Obtains, with optimum efficiency, just the scheme from a URI reference.
    Returns a string, or if no scheme could be found, returns None.
    """
    # Using a regex seems to be the best option. Called 50,000 times on
    # different URIs, on a 1.0-GHz PIII with FreeBSD 4.7 and Python
    # 2.2.1, this method completed in 0.95s, and 0.05s if there was no
    # scheme to find. By comparison,
    # urllib.splittype()[0] took 1.5s always;
    # Ft.Lib.Uri.SplitUriRef()[0] took 2.5s always;
    # urlparse.urlparse()[0] took 3.5s always.
    m = SCHEME_PATTERN.match(uriRef)
    if m is None:
        return None
    else:
        return m.group(1)


def IsAbsolute(identifier):
    """
    Given a string believed to be a URI or URI reference, tests that it is
    absolute (as per RFC 2396), not relative -- i.e., that it has a scheme.
    """
    # We do it this way to avoid compiling another massive regex.
    return GetScheme(identifier) is not None
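
To make the merging rules in Absolutize() easier to check in isolation, here is a compact, self-contained sketch of the same resolution procedure for Python 3. It is not the 4Suite code: the names split_ref, unsplit_ref, remove_dot_segments and resolve are illustrative, and remove_dot_segments follows the string-walking algorithm of RFC 3986 sec. 5.2.4 literally instead of the two-stack approach used above. It also inherits the base query only when the reference's query is undefined (None), which is my reading of sec. 5.2.2; the module above treats an empty query the same as a missing one.

```python
import re

# Generic URI-reference splitter (regex from RFC 3986, appendix B).
_SPLIT = re.compile(
    r'^(?:([^:/?#]+):)?(?://([^/?#]*))?([^?#]*)(?:\?([^#]*))?(?:#(.*))?$')

def split_ref(ref):
    """Return (scheme, authority, path, query, fragment); None = undefined."""
    return _SPLIT.match(ref).groups()

def unsplit_ref(parts):
    scheme, authority, path, query, fragment = parts
    out = ''
    if scheme is not None:
        out += scheme + ':'
    if authority is not None:
        out += '//' + authority
    out += path
    if query is not None:
        out += '?' + query
    if fragment is not None:
        out += '#' + fragment
    return out

def remove_dot_segments(path):
    """String-walking version of RFC 3986 sec. 5.2.4."""
    out = []
    while path:
        if path.startswith('../'):
            path = path[3:]
        elif path.startswith('./') or path.startswith('/./'):
            path = path[2:]
        elif path == '/.':
            path = '/'
        elif path.startswith('/../') or path == '/..':
            path = '/' + path[4:]
            if out:
                out.pop()
        elif path in ('.', '..'):
            path = ''
        else:
            # move the first segment (with its leading '/', if any) to out
            end = path.find('/', 1)
            if end == -1:
                end = len(path)
            out.append(path[:end])
            path = path[end:]
    return ''.join(out)

def resolve(ref, base):
    """Resolve ref against an absolute base (RFC 3986 sec. 5.2.2)."""
    b_scheme, b_auth, b_path, b_query, _ = split_ref(base)
    if b_scheme is None:
        raise ValueError('%r is not an absolute URI' % base)
    r_scheme, r_auth, r_path, r_query, r_frag = split_ref(ref)
    if r_scheme is not None:
        target = (r_scheme, r_auth, remove_dot_segments(r_path), r_query)
    elif r_auth is not None:
        target = (b_scheme, r_auth, remove_dot_segments(r_path), r_query)
    elif not r_path:
        query = r_query if r_query is not None else b_query
        target = (b_scheme, b_auth, b_path, query)
    else:
        if r_path.startswith('/'):
            path = r_path
        elif b_auth is not None and not b_path:
            path = '/' + r_path
        else:
            path = b_path[:b_path.rfind('/') + 1] + r_path
        target = (b_scheme, b_auth, remove_dot_segments(path), r_query)
    return unsplit_ref(target + (r_frag,))

# A few of the reference-resolution examples from RFC 3986 sec. 5.4.
BASE = 'http://a/b/c/d;p?q'
resolved = {ref: resolve(ref, BASE)
            for ref in ['g', '../g', '?y', '#s', '', '../../../g', './', '..']}
```

With this base, 'g' resolves to 'http://a/b/c/g', '../g' to 'http://a/b/g', and '../../../g' to 'http://a/g', matching the examples listed in the RFC.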
"""
A library of useful helper classes to the saxlib classes, for the
convenience of application and driver writers.

$Id: saxutils.py,v 1.35 2004/03/20 07:46:04 fdrake Exp $
"""
import os, urlparse, urllib, urllib2, types
import handler
import xmlreader
import sys, _exceptions, saxlib

try:
    _StringTypes = [types.StringType, types.UnicodeType]
except AttributeError: # 1.5 compatibility:UnicodeType not defined
    _StringTypes = [types.StringType]

def __dict_replace(s, d):
    """Replace substrings of a string using a dictionary."""
    for key, value in d.items():
        s = s.replace(key, value)
    return s

def escape(data, entities={}):
    """Escape &, <, and > in a string of data.

    You can escape other strings of data by passing a dictionary as
    the optional entities parameter. The keys and values must all be
    strings; each key will be replaced with its corresponding value.
    """
    data = data.replace("&", "&amp;")
    data = data.replace("<", "&lt;")
    data = data.replace(">", "&gt;")
    if entities:
        data = __dict_replace(data, entities)
    return data

def unescape(data, entities={}):
    """Unescape &amp;, &lt;, and &gt; in a string of data.

    You can unescape other strings of data by passing a dictionary as
    the optional entities parameter. The keys and values must all be
    strings; each key will be replaced with its corresponding value.
    """
    data = data.replace("&lt;", "<")
    data = data.replace("&gt;", ">")
    if entities:
        data = __dict_replace(data, entities)
    # must do ampersand last
    return data.replace("&amp;", "&")

def quoteattr(data, entities={}):
    """Escape and quote an attribute value.

    Escape &, <, and > in a string of data, then quote it for use as
    an attribute value. The \" character will be escaped as well, if
    necessary.

    You can escape other strings of data by passing a dictionary as
    the optional entities parameter. The keys and values must all be
    strings; each key will be replaced with its corresponding value.
    """
    data = escape(data, entities)
    if '"' in data:
        if "'" in data:
            data = '"%s"' % data.replace('"', "&quot;")
        else:
            data = "'%s'" % data
    else:
        data = '"%s"' % data
    return data

# --- DefaultHandler

class DefaultHandler(handler.EntityResolver, handler.DTDHandler,
                     handler.ContentHandler, handler.ErrorHandler):
    """Default base class for SAX2 event handlers. Implements empty
    methods for all callback methods, which can be overridden by
    application implementors. Replaces the deprecated SAX1 HandlerBase
    class."""

# --- Location

class Location:
    """Represents a location in an XML entity. Initialized by being passed
    a locator, from which it reads off the current location, which is then
    stored internally."""

    def __init__(self, locator):
        self.__col = locator.getColumnNumber()
        self.__line = locator.getLineNumber()
        self.__pubid = locator.getPublicId()
        self.__sysid = locator.getSystemId()

    def getColumnNumber(self):
        return self.__col

    def getLineNumber(self):
        return self.__line

    def getPublicId(self):
        return self.__pubid

    def getSystemId(self):
        return self.__sysid

    def __str__(self):
        if self.__line is None:
            line = "?"
        else:
            line = self.__line
        if self.__col is None:
            col = "?"
        else:
            col = self.__col
        return "%s:%s:%s" % (
            self.__sysid or self.__pubid or "<unknown>",
            line, col)

# --- ErrorPrinter

class ErrorPrinter:
    "A simple class that just prints error messages to standard out."

    def __init__(self, level=0, outfile=sys.stderr):
        self._level = level
        self._outfile = outfile

    def warning(self, exception):
        if self._level <= 0:
            self._outfile.write("WARNING in %s: %s\n" %
                                (self.__getpos(exception),
                                 exception.getMessage()))

    def error(self, exception):
        if self._level <= 1:
            self._outfile.write("ERROR in %s: %s\n" %
                                (self.__getpos(exception),
                                 exception.getMessage()))

    def fatalError(self, exception):
        if self._level <= 2:
            self._outfile.write("FATAL ERROR in %s: %s\n" %
                                (self.__getpos(exception),
                                 exception.getMessage()))

    def __getpos(self, exception):
        if isinstance(exception, _exceptions.SAXParseException):
            return "%s:%s:%s" % (exception.getSystemId(),
                                 exception.getLineNumber(),
                                 exception.getColumnNumber())
        else:
            return "<unknown>"

# --- ErrorRaiser

class ErrorRaiser:
    "A simple class that just raises the exceptions it is passed."

    def __init__(self, level = 0):
        self._level = level

    def error(self, exception):
        if self._level <= 1:
            raise exception

    def fatalError(self, exception):
        if self._level <= 2:
            raise exception

    def warning(self, exception):
        if self._level <= 0:
            raise exception

# --- AttributesImpl now lives in xmlreader
from xmlreader import AttributesImpl

# --- XMLGenerator is the SAX2 ContentHandler for writing back XML
import codecs

def _outputwrapper(stream,encoding):
    writerclass = codecs.lookup(encoding)[3]
    return writerclass(stream)

if hasattr(codecs, "register_error"):
    def writetext(stream, text, entities={}):
        stream.errors = "xmlcharrefreplace"
        stream.write(escape(text, entities))
        stream.errors = "strict"
else:
    def writetext(stream, text, entities={}):
        text = escape(text, entities)
        try:
            stream.write(text)
        except UnicodeError:
            for c in text:
                try:
                    stream.write(c)
                except UnicodeError:
                    stream.write(u"&#%d;" % ord(c))

def writeattr(stream, text):
    countdouble = text.count('"')
    if countdouble:
        countsingle = text.count("'")
        if countdouble <= countsingle:
            entities = {'"': "&quot;"}
            quote = '"'
        else:
            entities = {"'": "&apos;"}
            quote = "'"
    else:
        entities = {}
        quote = '"'
    stream.write(quote)
    writetext(stream, text, entities)
    stream.write(quote)


class XMLGenerator(handler.ContentHandler):
    GENERATED_PREFIX = "xml.sax.saxutils.prefix%s"

    def __init__(self, out=None, encoding="iso-8859-1"):
        if out is None:
            import sys
            out = sys.stdout
        handler.ContentHandler.__init__(self)
        self._out = _outputwrapper(out,encoding)
        self._ns_contexts = [{}] # contains uri -> prefix dicts
        self._current_context = self._ns_contexts[-1]
        self._undeclared_ns_maps = []
        self._encoding = encoding
        self._generated_prefix_ctr = 0
        return

    # ContentHandler methods

    def startDocument(self):
        self._out.write('<?xml version="1.0" encoding="%s"?>\n' %
                        self._encoding)

    def startPrefixMapping(self, prefix, uri):
        self._ns_contexts.append(self._current_context.copy())
        self._current_context[uri] = prefix
        self._undeclared_ns_maps.append((prefix, uri))

    def endPrefixMapping(self, prefix):
        self._current_context = self._ns_contexts[-1]
        del self._ns_contexts[-1]

    def startElement(self, name, attrs):
        self._out.write('<' + name)
        for (name, value) in attrs.items():
            self._out.write(' %s=' % name)
            writeattr(self._out, value)
        self._out.write('>')

    def endElement(self, name):
        self._out.write('</%s>' % name)

    def startElementNS(self, name, qname, attrs):
        if name[0] is None:
            name = name[1]
        elif self._current_context[name[0]] is None:
            # default namespace
            name = name[1]
        else:
            name = self._current_context[name[0]] + ":" + name[1]
        self._out.write('<' + name)

        for k,v in self._undeclared_ns_maps:
            if k is None:
                self._out.write(' xmlns="%s"' % (v or ''))
            else:
                self._out.write(' xmlns:%s="%s"' % (k,v))
        self._undeclared_ns_maps = []

        for (name, value) in attrs.items():
            if name[0] is None:
                name = name[1]
            elif self._current_context[name[0]] is None:
                # default namespace
                #If an attribute has a nsuri but not a prefix, we must
                #create a prefix and add a nsdecl
                prefix = self.GENERATED_PREFIX % self._generated_prefix_ctr
                self._generated_prefix_ctr = self._generated_prefix_ctr + 1
                name = prefix + ':' + name[1]
                self._out.write(' xmlns:%s=%s' % (prefix, quoteattr(name[0])))
                self._current_context[name[0]] = prefix
            else:
                name = self._current_context[name[0]] + ":" + name[1]
            self._out.write(' %s=' % name)
            writeattr(self._out, value)
        self._out.write('>')

    def endElementNS(self, name, qname):
        # XXX: if qname is not None, we better use it.
        # Python 2.0b2 requires us to use the recorded prefix for
        # name[0], though
        if name[0] is None:
            qname = name[1]
        elif self._current_context[name[0]] is None:
            qname = name[1]
        else:
            qname = self._current_context[name[0]] + ":" + name[1]
        self._out.write('</%s>' % qname)

    def characters(self, content):
        writetext(self._out, content)

    def ignorableWhitespace(self, content):
        self._out.write(content)

    def processingInstruction(self, target, data):
        self._out.write('<?%s %s?>' % (target, data))


class LexicalXMLGenerator(XMLGenerator, saxlib.LexicalHandler):
    """A XMLGenerator that also supports the LexicalHandler interface"""

    def __init__(self, out=None, encoding="iso-8859-1"):
        XMLGenerator.__init__(self, out, encoding)
        self._in_cdata = 0

    def characters(self, content):
        if self._in_cdata:
            self._out.write(content.replace(']]>', ']]>]]&gt;<![CDATA['))
        else:
            self._out.write(escape(content))

    # LexicalHandler methods
    # (we only support the most important ones and inherit the rest)

    def startDTD(self, name, public_id, system_id):
        self._out.write('<!DOCTYPE %s' % name)
        if public_id:
            self._out.write(' PUBLIC %s %s' % (
                quoteattr(public_id or ""), quoteattr(system_id or "")
                ))
        elif system_id:
            self._out.write(' SYSTEM %s' % quoteattr(system_id or ""))

    def endDTD(self):
        self._out.write('>')

    def comment(self, content):
        self._out.write('<!--')
        self._out.write(content)
        self._out.write('-->')

    def startCDATA(self):
        self._in_cdata = 1
        self._out.write('<![CDATA[')

    def endCDATA(self):
        self._in_cdata = 0
        self._out.write(']]>')


# --- ContentGenerator is the SAX1 DocumentHandler for writing back XML
class ContentGenerator(XMLGenerator):

    def characters(self, str, start, end):
        # In SAX1, characters receives start and end; in SAX2, it receives
        # a string. For plain strings, we may want to use a buffer object.
        return XMLGenerator.characters(self, str[start:start+end])

# --- XMLFilterImpl
class XMLFilterBase(saxlib.XMLFilter):
    """This class is designed to sit between an XMLReader and the
    client application's event handlers. By default, it does nothing
    but pass requests up to the reader and events on to the handlers
    unmodified, but subclasses can override specific methods to modify
    the event stream or the configuration requests as they pass
    through."""

    # ErrorHandler methods

    def error(self, exception):
        self._err_handler.error(exception)

    def fatalError(self, exception):
        self._err_handler.fatalError(exception)

    def warning(self, exception):
        self._err_handler.warning(exception)

    # ContentHandler methods

    def setDocumentLocator(self, locator):
        self._cont_handler.setDocumentLocator(locator)

    def startDocument(self):
        self._cont_handler.startDocument()

    def endDocument(self):
        self._cont_handler.endDocument()

    def startPrefixMapping(self, prefix, uri):
        self._cont_handler.startPrefixMapping(prefix, uri)

    def endPrefixMapping(self, prefix):
        self._cont_handler.endPrefixMapping(prefix)

    def startElement(self, name, attrs):
        self._cont_handler.startElement(name, attrs)

    def endElement(self, name):
        self._cont_handler.endElement(name)

    def startElementNS(self, name, qname, attrs):
        self._cont_handler.startElementNS(name, qname, attrs)

    def endElementNS(self, name, qname):
        self._cont_handler.endElementNS(name, qname)

    def characters(self, content):
        self._cont_handler.characters(content)

    def ignorableWhitespace(self, chars):
        self._cont_handler.ignorableWhitespace(chars)

    def processingInstruction(self, target, data):
        self._cont_handler.processingInstruction(target, data)

    def skippedEntity(self, name):
        self._cont_handler.skippedEntity(name)

    # DTDHandler methods

    def notationDecl(self, name, publicId, systemId):
        self._dtd_handler.notationDecl(name, publicId, systemId)

    def unparsedEntityDecl(self, name, publicId, systemId, ndata):
        self._dtd_handler.unparsedEntityDecl(name, publicId, systemId, ndata)

    # EntityResolver methods

    def resolveEntity(self, publicId, systemId):
        self._ent_handler.resolveEntity(publicId, systemId)

    # XMLReader methods

    def parse(self, source):
        self._parent.setContentHandler(self)
        self._parent.setErrorHandler(self)
        self._parent.setEntityResolver(self)
        self._parent.setDTDHandler(self)
        self._parent.parse(source)

    def setLocale(self, locale):
        self._parent.setLocale(locale)

    def getFeature(self, name):
        return self._parent.getFeature(name)

    def setFeature(self, name, state):
        self._parent.setFeature(name, state)

    def getProperty(self, name):
        return self._parent.getProperty(name)

    def setProperty(self, name, value):
        self._parent.setProperty(name, value)

# FIXME: remove this backward compatibility hack when not needed anymore
XMLFilterImpl = XMLFilterBase

# --- BaseIncrementalParser

class BaseIncrementalParser(xmlreader.IncrementalParser):
    """This class implements the parse method of the XMLReader
    interface using the feed, close and reset methods of the
    IncrementalParser interface as a convenience to SAX 2.0 driver
    writers."""

    def parse(self, source):
        source = prepare_input_source(source)
        self.prepareParser(source)

        self._cont_handler.startDocument()

        # FIXME: what about char-stream?
        inf = source.getByteStream()
        buffer = inf.read(16384)
        while buffer != "":
            self.feed(buffer)
            buffer = inf.read(16384)

        self.close()
        self.reset()

        self._cont_handler.endDocument()

    def prepareParser(self, source):
        """This method is called by the parse implementation to allow
        the SAX 2.0 driver to prepare itself for parsing."""
        raise NotImplementedError("prepareParser must be overridden!")

# --- Utility functions

def prepare_input_source(source, base = ""):
    """This function takes an InputSource and an optional base URL and
    returns a fully resolved InputSource object ready for reading."""

    if type(source) in _StringTypes:
        source = xmlreader.InputSource(source)
    elif hasattr(source, "read"):
        f = source
        source = xmlreader.InputSource()
        source.setByteStream(f)
        if hasattr(f, "name"):
            source.setSystemId(f.name)

    if source.getByteStream() is None:
        sysid = absolute_system_id(source.getSystemId(), base)
        source.setSystemId(sysid)
        f = urllib2.urlopen(sysid)
        source.setByteStream(f)

    return source


from xml.Uri import Absolutize, MakeUrllibSafe,IsAbsolute

def absolute_system_id(sysid, base=''):
    # if a base is given, sysid may be relative to it, make the
    # join before isfile() test
    if base:
        basehead = os.path.split(os.path.abspath(base))[0]
        path = os.path.join(basehead, sysid)
    else:
        path = os.path.abspath(sysid)
    if os.path.isfile(path):
        sysid = 'file:%s' % path
    elif base:
        sysid = Absolutize(sysid, base)
    #assert IsAbsolute(sysid)
    return MakeUrllibSafe(sysid)

# ===========================================================================
#
# DEPRECATED SAX 1.0 CLASSES
#
# ===========================================================================

# --- AttributeMap

class AttributeMap:
    """An implementation of AttributeList that takes an (attr,val) hash
    and uses it to implement the AttributeList interface."""

    def __init__(self, map):
        self.map=map

    def getLength(self):
        return len(self.map.keys())

    def getName(self, i):
        try:
            return self.map.keys()[i]
        except IndexError,e:
            return None

    def getType(self, i):
        return "CDATA"

    def getValue(self, i):
        try:
            if type(i)==types.IntType:
                return self.map[self.getName(i)]
            else:
                return self.map[i]
        except KeyError,e:
            return None

    def __len__(self):
        return len(self.map)

    def __getitem__(self, key):
        if type(key)==types.IntType:
            return self.map.keys()[key]
        else:
            return self.map[key]

    def items(self):
        return self.map.items()

    def keys(self):
        return self.map.keys()

    def has_key(self,key):
        return self.map.has_key(key)

    def get(self, key, alternative=None):
        return self.map.get(key, alternative)

    def copy(self):
        return AttributeMap(self.map.copy())

    def values(self):
        return self.map.values()

# --- Event broadcasting object

class EventBroadcaster:
    """Takes a list of objects and forwards any method calls received
    to all objects in the list. The attribute list holds the list and
    can freely be modified by clients."""

    class Event:
        "Helper objects that represent event methods."

        def __init__(self,list,name):
            self.list=list
            self.name=name

        def __call__(self,*rest):
            for obj in self.list:
                apply(getattr(obj,self.name), rest)

    def __init__(self,list):
        self.list=list

    def __getattr__(self,name):
        return self.Event(self.list,name)

    def __repr__(self):
        return "<EventBroadcaster instance at %d>" % id(self)

# --- ESIS document handler
import saxlib

class ESISDocHandler(saxlib.HandlerBase):
    "A SAX document handler that produces naive ESIS output."

    def __init__(self,writer=sys.stdout):
        self.writer=writer

    def processingInstruction (self,target, remainder):
        """Receive an event signalling that a processing instruction
        has been found."""
        self.writer.write("?"+target+" "+remainder+"\n")

    def startElement(self,name,amap):
        "Receive an event signalling the start of an element."
        self.writer.write("("+name+"\n")
        for a_name in amap.keys():
            self.writer.write("A"+a_name+" "+amap[a_name]+"\n")

    def endElement(self,name):
        "Receive an event signalling the end of an element."
self.writer.write(")"+name+"\n") def characters(self,data,start_ix,length): "Receive an event signalling that character data has been found." self.writer.write("-"+data[start_ix:start_ix+length]+"\n") # --- XML canonizer class Canonizer(saxlib.HandlerBase): "A SAX document handler that produces canonized XML output." def __init__(self,writer=sys.stdout): self.elem_level=0 self.writer=writer def processingInstruction (self,target, remainder): if not target=="xml": self.writer.write("<?"+target+" "+remainder+"?>") def startElement(self,name,amap): self.writer.write("<"+name) a_names=amap.keys() a_names.sort() for a_name in a_names: self.writer.write(" "+a_name+"=\"") self.write_data(amap[a_name]) self.writer.write("\"") self.writer.write(">") self.elem_level=self.elem_level+1 def endElement(self,name): self.writer.write("</"+name+">") self.elem_level=self.elem_level-1 def ignorableWhitespace(self,data,start_ix,length): self.characters(data,start_ix,length) def characters(self,data,start_ix,length): if self.elem_level>0: self.write_data(data[start_ix:start_ix+length]) def write_data(self,data): "Writes datachars to writer." 
data=data.replace("&","&") data=data.replace("<","<") data=data.replace("\"",""") data=data.replace(">",">") data=data.replace(chr(9),"	") data=data.replace(chr(10)," ") data=data.replace(chr(13)," ") self.writer.write(data) # --- mllib class mllib: """A re-implementation of the htmllib, sgmllib and xmllib interfaces as a SAX DocumentHandler.""" # Unsupported: # - setnomoretags # - setliteral # - translate_references # - handle_xml # - handle_doctype # - handle_charref # - handle_entityref # - handle_comment # - handle_cdata # - tag_attributes def __init__(self): self.reset() def reset(self): import saxexts # only used here self.parser=saxexts.XMLParserFactory.make_parser() self.handler=mllib.Handler(self.parser,self) self.handler.reset() def feed(self,data): self.parser.feed(data) def close(self): self.parser.close() def get_stack(self): return self.handler.get_stack() # --- Handler methods (to be overridden) def handle_starttag(self,name,method,atts): method(atts) def handle_endtag(self,name,method): method() def handle_data(self,data): pass def handle_proc(self,target,data): pass def unknown_starttag(self,name,atts): pass def unknown_endtag(self,name): pass def syntax_error(self,message): pass # --- The internal handler class class Handler(saxlib.DocumentHandler,saxlib.ErrorHandler): """An internal class to handle SAX events and translate them to mllib events.""" def __init__(self,driver,handler): self.driver=driver self.driver.setDocumentHandler(self) self.driver.setErrorHandler(self) self.handler=handler self.reset() def get_stack(self): return self.stack def reset(self): self.stack=[] # --- DocumentHandler methods def characters(self, ch, start, length): self.handler.handle_data(ch[start:start+length]) def endElement(self, name): if hasattr(self.handler,"end_"+name): self.handler.handle_endtag(name, getattr(self.handler,"end_"+name)) else: self.handler.unknown_endtag(name) del self.stack[-1] def ignorableWhitespace(self, ch, start, length): 
self.handler.handle_data(ch[start:start+length]) def processingInstruction(self, target, data): self.handler.handle_proc(target,data) def startElement(self, name, atts): self.stack.append(name) if hasattr(self.handler,"start_"+name): self.handler.handle_starttag(name, getattr(self.handler, "start_"+name), atts) else: self.handler.unknown_starttag(name,atts) # --- ErrorHandler methods def error(self, exception): self.handler.syntax_error(str(exception)) def fatalError(self, exception): raise RuntimeError(str(exception))
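
For anyone who wants to try the resolution order used by absolute_system_id() without the xml.Uri module, here is a rough sketch using only the current standard library. It is not the patch itself. The name resolve_system_id is hypothetical, urllib.parse.urljoin is only a stand-in for Absolutize() (it is more lenient than strict RFC 3986 merging), and nothing that MakeUrllibSafe() does (IDN recoding, fragment stripping, Windows drivespec handling) is reproduced.

```python
import os
from pathlib import Path
from urllib.parse import urljoin, urlsplit


def resolve_system_id(sysid, base=""):
    """Hypothetical stand-in for absolute_system_id(); not the PyXML API."""
    # If a base is given, sysid may be relative to it, so join the two
    # before testing whether the result names an existing local file.
    if base:
        base_dir = os.path.dirname(os.path.abspath(base))
        path = os.path.join(base_dir, sysid)
    else:
        path = os.path.abspath(sysid)

    if os.path.isfile(path):
        # as_uri() adds the file: scheme and percent-encodes the path.
        return Path(path).as_uri()

    if base and urlsplit(base).scheme:
        # Assumption: urljoin replaces Absolutize(); like Absolutize(),
        # it is only applied when the base is absolute (has a scheme).
        return urljoin(base, sysid)

    # Nothing to merge against: hand the reference back unchanged.
    return sysid
```

With a base of "http://example.org/dir/doc.xml", a relative reference such as "other.dtd" is merged against the base URI, while a reference that names an existing local file is turned into a file: URI first, which mirrors the order of the tests in the patch.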
_______________________________________________
XML-SIG maillist  -  XML-SIG@python.org
http://mail.python.org/mailman/listinfo/xml-sig