On Tuesday 08 February at 19:01, Mike Brown wrote:
> Sylvain Thénault wrote:
> > I guess you're right. I wrote this patch because it was fixing my
> > problem. Now if it doesn't take too much time to have every case
> > correctly fixed by implementing RFC 3986, I may take some time to do so
> > or to help get it done. And if part of the job is already done in
> > 4suite, that's great. However, what's in 4suite and what's not and needs
> > to be implemented is not yet clear to me.
> 
> The current version of Ft.Lib.Uri is here:
> http://cvs.4suite.org/viewcvs/4Suite/Ft/Lib/Uri.py?view=markup
> 
> If you see "rfc2396bis" in the doc strings, you may safely interpret
> them to mean "RFC 3986".
> 
> 
> The functions that you should look at are the following:
> 
> MakeUrllibSafe(uriRef)
> ======================
> This exists in order to convert a proper URI reference into one that
> can be handled by urllib.urlopen(). It does the following:
> 1. If the reference contains an Internationalized Domain Name,
>    recodes it so that it is resolvable. (Py 2.3+ only)
> 2. Strips the fragment component, if any. 
> 3. Ensures that the reference is a byte string, not unicode.
> 4. On Windows, assumes that the first ':' appearing in the path
>    component is part of a drivespec, and converts it to '|'.
> 
> If you port this function, the reference to PercentDecode() may be replaced 
> with urllib.unquote(), but you must move the byte string check (#3, above) to 
> occur before calling unquote. The references to the functions SplitUriRef and 
> UnsplitUriRef can be replaced with urlsplit() and urlunsplit() from the 
> urlparse module.
> 
> 
> Absolutize(uriRef, baseUri)
> ===========================
> This does strict merging of a URI reference and a base URI. The base URI 
> *must* be absolute (must have a scheme). If you port this function, the
> UriException may be replaced with a ValueError, and SplitUriRef &
> UnsplitUriRef may be replaced with their urlparse equivalents, as
> mentioned above. The RemoveDotSegments function must also be ported and
> should be made semi-private because it is not for general use. I've
> implemented it using two segment stacks, as alluded to in the spec,
> rather than the explicit string-walking algorithm that would be too
> inefficient.
> 
> 
> BaseJoin(base, UriRef)
> ======================
> This does lenient merging of a base URI and a URI reference (note the
> argument order is different from that of Absolutize). It allows the base
> URI to be a relative reference. In such cases, we use a dummy scheme
> (we don't say "assume 'file:'" because the spec says all schemes must be
> resolved the same way), run it through Absolutize, and then remove the scheme
> from the result. If you port this function, you will need to port the
> IsAbsolute function, which just checks to see if the URI has a scheme.
> I prefer to use a regex for this, as it is fast and accurate (':' can
> appear in more than one place in a URI reference, so it is not safe to
> assume that its presence means there is a scheme).
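
To make that last point concrete, here is a minimal sketch of a regex-based
scheme check. It is an illustration only, not the 4Suite code: the helper
name has_scheme and the sample references are made up for the example.

```python
import re

# RFC 3986 scheme syntax: a letter, then letters, digits, '+', '-' or '.',
# terminated by the first ':'.
_SCHEME = re.compile(r'[a-zA-Z][a-zA-Z0-9+\-.]*:')

def has_scheme(uri_ref):
    """Hypothetical helper: true if uri_ref starts with a scheme."""
    return _SCHEME.match(uri_ref) is not None

# A ':' later in the reference (path or query) must not count as a scheme.
checks = [
    ('http://example.org/a', True),
    ('file:relative.xml', True),
    ('./a:b', False),
    ('/path/with:colon', False),
    ('?q=a:b', False),
]
for ref, expected in checks:
    assert has_scheme(ref) == expected
```

A relative reference whose first segment contains ':' has to be written as
'./a:b' precisely because 'a:b' alone would be read as having scheme 'a'.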

Thanks a lot. Actually, almost all the work is already done right there.
Here is what I've worked on. Once we reach a consensus, I'll add it
to pyxml. I've attached to this mail:

- a light version of 4Suite's Uri.py including the following functions:
  SplitUriRef, UnsplitUriRef (it was really less annoying to use those
  two functions than the equivalent urlparse ones), Absolutize,
  MakeUrllibSafe, _RemoveDotSegments, BaseJoin, GetScheme and
  IsAbsolute. With the presented solution, the last three are not used
  and could be removed, but I've kept them in for now. All the tests for
  Absolutize from 4suite still pass.

- a modified version of saxutils, expecting the Uri module above to be
  in the _xmlplus directory (i.e. importable as xml.Uri). I've refactored
  prepare_input_source to ease testing of the URI merging code.

- a unittest file, which includes some test cases for the URI merging
  function. Please take a look at the existing test cases to check that
  everything looks fine to you. If you have other cases to add, please let
  me know (or maybe I can add this file to the cvs first). Note that
  to run the tests, you should have a "quotes.xml" file in the same
  directory as the test file (there is one in the test directory of
  pyxml). As a bonus, I've converted the escape function test from
  test_utils into a unittest in the same file.
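
Why the regex-based split is more convenient can be shown with a small
sketch. This is a simplified stand-in for SplitUriRef/UnsplitUriRef, not the
attached code (the names split_ref and unsplit_ref are hypothetical):
undefined components come back as None rather than '', so a reference
survives a split/unsplit round trip unchanged.

```python
import re

# Generic URI reference pattern, after RFC 3986 appendix B.
URI_REF = re.compile(
    r"^(?:(?P<scheme>[^:/?#]+):)?"
    r"(?://(?P<authority>[^/?#]*))?"
    r"(?P<path>[^?#]*)"
    r"(?:\?(?P<query>[^#]*))?"
    r"(?:#(?P<fragment>.*))?$")

def split_ref(uri_ref):
    """Return (scheme, authority, path, query, fragment); None = undefined."""
    g = URI_REF.match(uri_ref).groupdict()
    return (g['scheme'], g['authority'], g['path'],
            g['query'], g['fragment'])

def unsplit_ref(parts):
    """Reassemble a reference, emitting delimiters only for defined parts."""
    scheme, authority, path, query, fragment = parts
    out = ''
    if scheme is not None:
        out += scheme + ':'
    if authority is not None:
        out += '//' + authority
    out += path
    if query is not None:
        out += '?' + query
    if fragment is not None:
        out += '#' + fragment
    return out

# No authority vs. an empty authority are kept apart...
assert split_ref('file:/base') == ('file', None, '/base', None, None)
assert split_ref('file:///base') == ('file', '', '/base', None, None)
# ...so the round trip is lossless, even for empty queries and fragments.
for ref in ('file:/base', 'file:///base', 'quotes.xml?', 'http://a/b?x#'):
    assert unsplit_ref(split_ref(ref)) == ref
```

With a split that reports '' for missing components, the first two cases
cannot be told apart when reassembling, which is where the empty '//'
mentioned in the FIXME of the attached test comes from.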

Anyway, having SplitUriRef/UnsplitUriRef replace
urlparse.urlsplit/urlunsplit and Absolutize or BaseJoin replace
urlparse.urljoin would definitely be the right thing.
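
As a rough illustration of what a strict merge gives, here is a compact
sketch of the RFC 3986 section 5.2 resolution algorithm, checked against a
few of the examples in section 5.4 of the RFC. It is not the attached
Absolutize: the names resolve, unsplit and remove_dot_segments are mine, and
remove_dot_segments follows the string-walking description in the spec
rather than the segment stacks used in 4Suite.

```python
import re

# RFC 3986 appendix B pattern; unmatched groups come back as None.
URI_REF = re.compile(r"^(?:([^:/?#]+):)?(?://([^/?#]*))?([^?#]*)"
                     r"(?:\?([^#]*))?(?:#(.*))?$")

def unsplit(scheme, authority, path, query, fragment):
    out = ''
    if scheme is not None:
        out += scheme + ':'
    if authority is not None:
        out += '//' + authority
    out += path
    if query is not None:
        out += '?' + query
    if fragment is not None:
        out += '#' + fragment
    return out

def remove_dot_segments(path):
    out = []
    while path:
        if path.startswith('../'):
            path = path[3:]
        elif path.startswith('./'):
            path = path[2:]
        elif path.startswith('/./'):
            path = path[2:]
        elif path == '/.':
            path = '/'
        elif path.startswith('/../') or path == '/..':
            path = '/' + path[4:]
            if out:
                out.pop()
        elif path in ('.', '..'):
            path = ''
        else:
            # move the first segment (with its leading '/') to the output
            if path[0] == '/':
                end = path.find('/', 1)
            else:
                end = path.find('/')
            if end == -1:
                end = len(path)
            out.append(path[:end])
            path = path[end:]
    return ''.join(out)

def resolve(ref, base):
    r_scheme, r_auth, r_path, r_query, r_frag = URI_REF.match(ref).groups()
    b_scheme, b_auth, b_path, b_query, _ = URI_REF.match(base).groups()
    if b_scheme is None:
        raise ValueError('%r is not an absolute URI' % base)
    if r_scheme is not None:
        return unsplit(r_scheme, r_auth, remove_dot_segments(r_path),
                       r_query, r_frag)
    if r_auth is not None:
        return unsplit(b_scheme, r_auth, remove_dot_segments(r_path),
                       r_query, r_frag)
    if r_path == '':
        # a defined query, even an empty one, overrides the base query
        if r_query is None:
            r_query = b_query
        return unsplit(b_scheme, b_auth, b_path, r_query, r_frag)
    if r_path.startswith('/'):
        merged = r_path
    elif b_auth is not None and b_path == '':
        merged = '/' + r_path
    else:
        merged = b_path[:b_path.rfind('/') + 1] + r_path
    return unsplit(b_scheme, b_auth, remove_dot_segments(merged),
                   r_query, r_frag)

BASE = 'http://a/b/c/d;p?q'
for ref, expected in [('g', 'http://a/b/c/g'), ('../g', 'http://a/b/g'),
                      ('?y', 'http://a/b/c/d;p?y'), ('', BASE),
                      ('#s', BASE + '#s'), ('../../../g', 'http://a/g'),
                      ('//g', 'http://g'), ('g:h', 'g:h')]:
    assert resolve(ref, BASE) == expected
# no '//' is invented when the base has no authority
assert resolve('relative.xml', 'file:/base/') == 'file:/base/relative.xml'
```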

-- 
Sylvain Thénault                               LOGILAB, Paris (France).

http://www.logilab.com   http://www.logilab.fr  http://www.logilab.org

import unittest
from os.path import dirname, abspath, join
from xml.sax.saxutils import escape, absolute_system_id

class EscapeTC(unittest.TestCase):

    def test(self):
        v1, v2 = escape('&<>'), '&amp;&lt;&gt;'
        self.assertEquals(v1, v2)
        v1, v2 = escape('foo&amp;bar'), 'foo&amp;amp;bar'
        self.assertEquals(v1, v2)
        v1, v2 = escape('< test > &', {'test': '&myentity;'}), '&lt; &myentity; &gt; &amp;'
        self.assertEquals(v1, v2)
        v1, v2 = escape('&\'"<>', {'"': '&quot;', "'": '&apos;'}), '&amp;&apos;&quot;&lt;&gt;'
        self.assertEquals(v1, v2)
        

TEST_DIR = abspath(dirname(__file__)) + '/'

class AbsoluteSystemIdTC(unittest.TestCase):

    def test_base(self):
        res = absolute_system_id('http://www.xml.com')
        self.assertEquals(res, 'http://www.xml.com')
        
        res = absolute_system_id('http://www.xml.com', 'http://whatever')
        self.assertEquals(res, 'http://www.xml.com')
        
        res = absolute_system_id('quotes.xml')
        self.assertEquals(res, 'file://%s' % join(TEST_DIR, 'quotes.xml'))


    def test_relative(self):
        # FIXME: empty authority // added by MakeUrllibSafe (actually by
        # urlunsplit), which is probably acceptable since the sysid is designed
        # to be used by urlopen
        
        res = absolute_system_id('quotes.xml', 'file:%s' % TEST_DIR)
        self.assertEquals(res, 'file://%squotes.xml' % TEST_DIR)
        
        res = absolute_system_id('relative.xml', 'file:/base')
        self.assertEquals(res, 'file:///relative.xml')
        
        res = absolute_system_id('relative.xml', 'file:/base/')
        self.assertEquals(res, 'file:///base/relative.xml') 
        
        res = absolute_system_id('file:relative.xml', 'file:/base')
        self.assertEquals(res, 'file:///relative.xml')

        
    def test_no_base_scheme(self):
        # FIXME: warning ?
        self.assertRaises(ValueError, absolute_system_id, 'file:relative.xml', '/base')

if __name__ == '__main__':
    unittest.main()
# pylint: disable-msg=C0103
#
# backported code from 4Suite with slight modifications, started from r1.89 of
# Ft/Lib/Uri.py, by [EMAIL PROTECTED] on 2005-02-09
#
# some if not all of this code should probably move to urlparse (or be used
# to fix some existing functions in that module)
#
#
# Copyright 2004 Fourthought, Inc. (USA).
# Detailed license and copyright information: http://4suite.org/COPYRIGHT
# Project home, documentation, distributions: http://4suite.org/
import os.path
import sys
import re
import urlparse, urllib, urllib2

def UnsplitUriRef(uriRefSeq):
    """should replace urlparse.urlunsplit
    
    Given a sequence as would be produced by SplitUriRef(), assembles and
    returns a URI reference as a string.
    """
    if not (isinstance(uriRefSeq, tuple) or
            isinstance(uriRefSeq, list)):
        raise TypeError("sequence expected, got %s" % type(uriRefSeq))
    #print 'unsplit', uriRefSeq
    (scheme, authority, path, query, fragment) = uriRefSeq
    uri = ''
    if scheme is not None:
        uri += scheme + ':'
    if authority is not None:
        uri += '//' + authority
    uri += path
    if query is not None:
        uri += '?' + query
    if fragment is not None:
        uri += '#' + fragment
    return uri

SPLIT_URI_REF_PATTERN = re.compile(r"^(?:(?P<scheme>[^:/?#]+):)?(?://(?P<authority>[^/?#]*))?(?P<path>[^?#]*)(?:\?(?P<query>[^#]*))?(?:#(?P<fragment>.*))?$")

def SplitUriRef(uriref):
    """should replace urlparse.urlsplit
    
    Given a valid URI reference as a string, returns a tuple representing the
    generic URI components, as per RFC 2396 appendix B. The tuple's structure
    is (scheme, authority, path, query, fragment).

    All values will be strings (possibly empty) or None if undefined.

    Note that per rfc2396bis, there is no distinction between a path and
    an "opaque part", as there was in RFC 2396.
    """
    # the pattern will match every possible string, so it's safe to
    # assume there's a groupdict method to call.
    g = SPLIT_URI_REF_PATTERN.match(uriref).groupdict()
    scheme      = g['scheme']
    authority   = g['authority']
    path        = g['path']
    query       = g['query']
    fragment    = g['fragment']
    return (scheme, authority, path, query, fragment)


def Absolutize(uriRef, baseUri):
    """
    Resolves a URI reference to absolute form, effecting the result of RFC
    2396bis section 5. The URI reference is considered to be relative to the
    given base URI.

    It is the caller's responsibility to ensure that the base URI matches
    the absolute-URI syntax rule of rfc2396bis, and that its path component
    does not contain '.' or '..' segments if the scheme is hierarchical.
    Unexpected results may occur otherwise.

    This function only conducts a minimal sanity check in order to determine
    if relative resolution is possible: it raises a ValueError if the base
    URI does not have a scheme component. While it is true that the base URI
    is irrelevant if the URI reference has a scheme, an exception is raised
    in order to signal that the given string does not even come close to
    meeting the criteria to be usable as a base URI.

    It is the caller's responsibility to make a determination of whether the
    URI reference constitutes a "same-document reference", as defined in RFC
    2396 or rfc2396bis. As per the spec, dereferencing a same-document
    reference "should not" involve retrieval of a new representation of the
    referenced resource. Note that the two specs have different definitions
    of same-document reference: RFC 2396 says it is *only* the cases where the
    reference is the empty string, or "#" followed by a fragment; rfc2396bis
    requires making a comparison of the base URI to the absolute form of the
    reference (as is returned by the spec), minus its fragment component,
    if any.

    This function is similar to urlparse.urljoin() and urllib.basejoin().
    Those functions, however, are (as of Python 2.3) outdated, buggy, and/or
    designed to produce results acceptable for use with other core Python
    libraries, rather than being earnest implementations of the relevant
    specs. Their problems are most noticeable in their handling of
    same-document references and 'file:' URIs, both being situations that
    come up far too often to consider the functions reliable enough for
    general use.
    """
    # Reasons to avoid using urllib.basejoin() and urlparse.urljoin():
    # - Both are partial implementations of long-obsolete specs.
    # - Both accept relative URLs as the base, which no spec allows.
    # - urllib.basejoin() mishandles the '' and '..' references.
    # - If the base URL uses a non-hierarchical or relative path,
    #    or if the URL scheme is unrecognized, the result is not
    #    always as expected (partly due to issues in RFC 1808).
    # - If the authority component of a 'file' URI is empty,
    #    the authority component is removed altogether. If it was
    #    not present, an empty authority component is in the result.
    # - '.' and '..' segments are not always collapsed as well as they
    #    should be (partly due to issues in RFC 1808).
    # - Effective Python 2.4, urllib.basejoin() *is* urlparse.urljoin(),
    #    but urlparse.urljoin() is still based on RFC 1808.

    #print 'absolutize', baseUri, uriRef

    # This procedure is based on the pseudocode in rfc2396bis sec. 5.2.
    #
    # ensure base URI is absolute
    if not baseUri:
        raise ValueError('baseUri is required and must be a non empty string')
    if not IsAbsolute(baseUri):
        raise ValueError('%r is not an absolute URI' % baseUri)
    # shortcut for the simplest same-document reference cases
    if uriRef == '' or uriRef[0] == '#':
        return baseUri.split('#')[0] + uriRef
    # ensure a clean slate
    tScheme = tAuth = tPath = tQuery = None
    # parse the reference into its components
    (rScheme, rAuth, rPath, rQuery, rFrag) = SplitUriRef(uriRef)
    # if the reference is absolute, eliminate '.' and '..' path segments
    # and skip to the end
    if rScheme is not None:
        tScheme = rScheme
        tAuth = rAuth
        tPath = _RemoveDotSegments(rPath)
        tQuery = rQuery
    else:
        # the base URI's scheme, and possibly more, will be inherited
        (bScheme, bAuth, bPath, bQuery, bFrag) = SplitUriRef(baseUri)
        # if the reference is a net-path, just eliminate '.' and '..' path
        # segments; no other changes needed.
        if rAuth is not None:
            tAuth = rAuth
            tPath = _RemoveDotSegments(rPath)
            tQuery = rQuery
        # if it's not a net-path, we need to inherit pieces of the base URI
        else:
            # use base URI's path if the reference's path is empty
            if not rPath:
                tPath = bPath
                # use the reference's query if it is defined (even when
                # empty, as the spec requires), or else the base URI's
                if rQuery is not None:
                    tQuery = rQuery
                else:
                    tQuery = bQuery
            # the reference's path is not empty
            else:
                # just use the reference's path if it's absolute
                if rPath[0] == '/':
                    tPath = _RemoveDotSegments(rPath)
                # merge the reference's relative path with the base URI's path
                else:
                    if bAuth is not None and not bPath:
                        tPath = '/' + rPath
                    else:
                        tPath = bPath[:bPath.rfind('/')+1] + rPath
                    tPath = _RemoveDotSegments(tPath)
                # use the reference's query
                tQuery = rQuery
            # since the reference isn't a net-path,
            # use the authority from the base URI
            tAuth = bAuth
        # inherit the scheme from the base URI
        tScheme = bScheme
    # always use the reference's fragment (but no need to define another var)
    #tFrag = rFrag

    # now compose the target URI (rfc2396bis sec. 5.3)
    return UnsplitUriRef((tScheme, tAuth, tPath, tQuery, rFrag))


REG_NAME_HOST_PATTERN = re.compile(r"^(?:(?:[0-9A-Za-z\-_\.!~*'();&=+$,]|(?:%[0-9A-Fa-f]{2}))*)$")

def MakeUrllibSafe(uriRef):
    """
    Makes the given rfc2396bis-conformant URI reference safe for passing
    to legacy urllib functions. The result may not be a valid URI.

    As of Python 2.3.3, urllib.urlopen() does not fully support
    internationalized domain names, it does not strip fragment components,
    and on Windows, it expects file URIs to use '|' instead of ':' in the
    path component corresponding to the drivespec. It also relies on
    urllib.unquote(), which mishandles unicode arguments. This function
    produces a URI reference that will work around these issues, although
    the IDN workaround is only available on Python 2.3 and up. May raise a
    UnicodeEncodeError if the URI reference is Unicode and erroneously
    contains non-ASCII characters.
    """
    # IDN support requires decoding any percent-encoded octets in the
    # host part (if it's a reg-name) of the authority component, and when
    # doing DNS lookups, applying IDNA encoding to that string first.
    # As of Python 2.3, there is an IDNA codec, and the socket and httplib
    # modules accept Unicode strings and apply IDNA encoding automatically
    # where necessary. However, urllib.urlopen() has not yet been updated
    # to do the same; it raises an exception if you give it a Unicode
    # string, and does no conversion on non-Unicode strings, meaning you
    # have to give it an IDNA string yourself. We will only support it on
    # Python 2.3 and up.
    #
    # see if host is a reg-name, as opposed to IPv4 or IPv6 addr.
    if isinstance(uriRef, unicode):
        uriRef = uriRef.encode('us-ascii') # parts of urllib are not unicode safe
    (scheme, auth, path, query, frag) = urlparse.urlsplit(uriRef)
    if auth and auth.find('@') > -1:
        userinfo, hostport = auth.split('@')
    else:
        userinfo = None
        hostport = auth
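    # a bracketed IPv6 literal contains ':' itself, so leave it untouched
    # rather than failing to unpack the split below
    if hostport and hostport[:1] != '[' and hostport.find(':') > -1:
        host, port = hostport.split(':', 1)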
    else:
        host = hostport
        port = None
    if host and REG_NAME_HOST_PATTERN.match(host):
        # percent-encoded hostnames will always fail DNS lookups
        host = urllib.unquote(host) #PercentDecode(host)
        # IDNA-encode if possible.
        # We shouldn't do this for schemes that don't need DNS lookup,
        # but are there any (that you'd be calling urlopen for)?
        if sys.version_info[0:2] >= (2, 3):
            if isinstance(host, str):
                host = host.decode('utf-8')
            host = host.encode('idna')
        # reassemble the authority with the new hostname
        # (percent-decoded, and possibly IDNA-encoded)
        auth = ''
        if userinfo:
            auth += userinfo + '@'
        auth += host
        if port:
            auth += ':' + port

    # On Windows, ensure that '|', not ':', is used in a drivespec.
    if os.name == 'nt' and scheme == 'file':
        path = path.replace(':', '|', 1)

    # Note that we drop fragment, if any. See rfc2396bis sec. 3.5.
    uri = urlparse.urlunsplit((scheme, auth, path, query, None))

    return uri



def BaseJoin(base, uriRef):
    """
    Merges a base URI reference with another URI reference, returning a
    new URI reference.

    It behaves exactly the same as Absolutize(), except the arguments
    are reversed, and it accepts any URI reference (even a relative URI)
    as the base URI. If the base has no scheme component, it is
    evaluated as if it did, and then the scheme component of the result
    is removed from the result, unless the uriRef had a scheme. Thus, if
    neither argument has a scheme component, the result won't have one.

    This function is named BaseJoin because it is very much like
    urllib.basejoin(), but it follows the current rfc2396bis algorithms
    for path merging, dot segment elimination, and inheritance of query
    and fragment components.

    WARNING: This function exists for 2 reasons: (1) because of a need
    within the 4Suite repository to perform URI reference absolutization
    using base URIs that are stored (inappropriately) as absolute paths
    in the subjects of statements in the RDF model, and (2) because of
    a similar need to interpret relative repo paths in a 4Suite product
    setup.xml file as being relative to a path that can be set outside
    the document. When these needs go away, this function probably will,
    too, so it is not advisable to use it.
    """
    if IsAbsolute(base):
        return Absolutize(uriRef, base)
    else:
        dummyscheme = 'basejoin'
        res = Absolutize(uriRef, '%s:%s' % (dummyscheme, base))
        if IsAbsolute(uriRef):
            # scheme will be inherited from uriRef
            return res
        else:
            # no scheme in, no scheme out
            return res[len(dummyscheme)+1:]


def _RemoveDotSegments(path):
    """
    Supports Absolutize() by implementing the remove_dot_segments function
    described in rfc2396bis sec. 5.2.  It collapses most of the '.' and '..'
    segments out of a path without eliminating empty segments. It is intended
    to be used during the path merging process and may not give expected
    results when used independently. Use NormalizePathSegments() or
    NormalizePathSegmentsInUri() if more general normalization is desired.

    semi-private because it is not for general use. I've implemented it
    using two segment stacks, as alluded to in the spec, rather than the
    explicit string-walking algorithm that would be too inefficient. (mbrown)
    """
    # return empty string if entire path is just "." or ".."
    if path == '.' or path == '..':
        return path[0:0] # preserves string type
    # remove all "./" or "../" segments at the beginning
    while path:
        if path[:2] == './':
            path = path[2:]
        elif path[:3] == '../':
            path = path[3:]
        else:
            break
    # We need to keep track of whether there was a leading slash,
    # because we're going to drop it in order to prevent our list of
    # segments from having an ambiguous empty first item when we call
    # split().
    leading_slash = 0
    if path[:1] == '/':
        path = path[1:]
        leading_slash = 1
    # replace a trailing "/." with just "/"
    if path[-2:] == '/.':
        path = path[:-1]
    # convert the segments into a list and process each segment in
    # order from left to right.
    segments = path.split('/')
    keepers = []
    segments.reverse()
    while segments:
        seg = segments.pop()
        # '..' means drop the previous kept segment, if any.
        # If none, and if the path is relative, then keep the '..'.
        # If the '..' was the last segment, ensure
        # that the result ends with '/'.
        if seg == '..':
            if keepers:
                keepers.pop()
            elif not leading_slash:
                keepers.append(seg)
            if not segments:
                keepers.append('')
        # ignore '.' segments and keep all others, even empty ones
        elif seg != '.':
            keepers.append(seg)
    # reassemble the kept segments
    return leading_slash * '/' + '/'.join(keepers)


SCHEME_PATTERN = re.compile(r'([a-zA-Z][a-zA-Z0-9+\-.]*):')
def GetScheme(uriRef):
    """
    Obtains, with optimum efficiency, just the scheme from a URI reference.
    Returns a string, or if no scheme could be found, returns None.
    """
    # Using a regex seems to be the best option. Called 50,000 times on
    # different URIs, on a 1.0-GHz PIII with FreeBSD 4.7 and Python
    # 2.2.1, this method completed in 0.95s, and 0.05s if there was no
    # scheme to find. By comparison,
    #   urllib.splittype()[0] took 1.5s always;
    #   Ft.Lib.Uri.SplitUriRef()[0] took 2.5s always;
    #   urlparse.urlparse()[0] took 3.5s always.
    m = SCHEME_PATTERN.match(uriRef)
    if m is None:
        return None
    else:
        return m.group(1)


def IsAbsolute(identifier):
    """
    Given a string believed to be a URI or URI reference, tests that it is
    absolute (as per RFC 2396), not relative -- i.e., that it has a scheme.
    """
    # We do it this way to avoid compiling another massive regex.
    return GetScheme(identifier) is not None
"""
A library of useful helper classes to the saxlib classes, for the
convenience of application and driver writers.

$Id: saxutils.py,v 1.35 2004/03/20 07:46:04 fdrake Exp $
"""

import os, urlparse, urllib, urllib2, types
import handler
import xmlreader
import sys, _exceptions, saxlib

try:
    _StringTypes = [types.StringType, types.UnicodeType]
except AttributeError: # 1.5 compatibility:UnicodeType not defined
    _StringTypes = [types.StringType]

def __dict_replace(s, d):
    """Replace substrings of a string using a dictionary."""
    for key, value in d.items():
        s = s.replace(key, value)
    return s

def escape(data, entities={}):
    """Escape &, <, and > in a string of data.

    You can escape other strings of data by passing a dictionary as
    the optional entities parameter.  The keys and values must all be
    strings; each key will be replaced with its corresponding value.
    """
    data = data.replace("&", "&amp;")
    data = data.replace("<", "&lt;")
    data = data.replace(">", "&gt;")
    if entities:
        data = __dict_replace(data, entities)
    return data

def unescape(data, entities={}):
    """Unescape &amp;, &lt;, and &gt; in a string of data.

    You can unescape other strings of data by passing a dictionary as
    the optional entities parameter.  The keys and values must all be
    strings; each key will be replaced with its corresponding value.
    """
    data = data.replace("&lt;", "<")
    data = data.replace("&gt;", ">")
    if entities:
        data = __dict_replace(data, entities)
    # must do ampersand last
    return data.replace("&amp;", "&")

def quoteattr(data, entities={}):
    """Escape and quote an attribute value.

    Escape &, <, and > in a string of data, then quote it for use as
    an attribute value.  The \" character will be escaped as well, if
    necessary.

    You can escape other strings of data by passing a dictionary as
    the optional entities parameter.  The keys and values must all be
    strings; each key will be replaced with its corresponding value.
    """
    data = escape(data, entities)
    if '"' in data:
        if "'" in data:
            data = '"%s"' % data.replace('"', "&quot;")
        else:
            data = "'%s'" % data
    else:
        data = '"%s"' % data
    return data

# --- DefaultHandler

class DefaultHandler(handler.EntityResolver, handler.DTDHandler,
                     handler.ContentHandler, handler.ErrorHandler):
    """Default base class for SAX2 event handlers. Implements empty
    methods for all callback methods, which can be overridden by
    application implementors. Replaces the deprecated SAX1 HandlerBase
    class."""

# --- Location

class Location:
    """Represents a location in an XML entity. Initialized by being passed
    a locator, from which it reads off the current location, which is then
    stored internally."""

    def __init__(self, locator):
        self.__col = locator.getColumnNumber()
        self.__line = locator.getLineNumber()
        self.__pubid = locator.getPublicId()
        self.__sysid = locator.getSystemId()

    def getColumnNumber(self):
        return self.__col

    def getLineNumber(self):
        return self.__line

    def getPublicId(self):
        return self.__pubid

    def getSystemId(self):
        return self.__sysid

    def __str__(self):
        if self.__line is None:
            line = "?"
        else:
            line = self.__line
        if self.__col is None:
            col = "?"
        else:
            col = self.__col
        return "%s:%s:%s" % (
            self.__sysid or self.__pubid or "<unknown>",
            line, col)

# --- ErrorPrinter

class ErrorPrinter:
    "A simple class that just prints error messages to a file (stderr by default)."

    def __init__(self, level=0, outfile=sys.stderr):
        self._level = level
        self._outfile = outfile

    def warning(self, exception):
        if self._level <= 0:
            self._outfile.write("WARNING in %s: %s\n" %
                               (self.__getpos(exception),
                                exception.getMessage()))

    def error(self, exception):
        if self._level <= 1:
            self._outfile.write("ERROR in %s: %s\n" %
                               (self.__getpos(exception),
                                exception.getMessage()))

    def fatalError(self, exception):
        if self._level <= 2:
            self._outfile.write("FATAL ERROR in %s: %s\n" %
                               (self.__getpos(exception),
                                exception.getMessage()))

    def __getpos(self, exception):
        if isinstance(exception, _exceptions.SAXParseException):
            return "%s:%s:%s" % (exception.getSystemId(),
                                 exception.getLineNumber(),
                                 exception.getColumnNumber())
        else:
            return "<unknown>"

# --- ErrorRaiser

class ErrorRaiser:
    "A simple class that just raises the exceptions it is passed."

    def __init__(self, level = 0):
        self._level = level

    def error(self, exception):
        if self._level <= 1:
            raise exception

    def fatalError(self, exception):
        if self._level <= 2:
            raise exception

    def warning(self, exception):
        if self._level <= 0:
            raise exception

# --- AttributesImpl now lives in xmlreader
from xmlreader import AttributesImpl

# --- XMLGenerator is the SAX2 ContentHandler for writing back XML
import codecs

def _outputwrapper(stream,encoding):
    writerclass = codecs.lookup(encoding)[3]
    return writerclass(stream)

if hasattr(codecs, "register_error"):
    def writetext(stream, text, entities={}):
        stream.errors = "xmlcharrefreplace"
        stream.write(escape(text, entities))
        stream.errors = "strict"
else:
    def writetext(stream, text, entities={}):
        text = escape(text, entities)
        try:
            stream.write(text)
        except UnicodeError:
            for c in text:
                try:
                    stream.write(c)
                except UnicodeError:
                    stream.write(u"&#%d;" % ord(c))

def writeattr(stream, text):
    countdouble = text.count('"')
    if countdouble:
        countsingle = text.count("'")
        if countdouble <= countsingle:
            entities = {'"': "&quot;"}
            quote = '"'
        else:
            entities = {"'": "&apos;"}
            quote = "'"
    else:
        entities = {}
        quote = '"'
    stream.write(quote)
    writetext(stream, text, entities)
    stream.write(quote)


class XMLGenerator(handler.ContentHandler):
    GENERATED_PREFIX = "xml.sax.saxutils.prefix%s"

    def __init__(self, out=None, encoding="iso-8859-1"):
        if out is None:
            import sys
            out = sys.stdout
        handler.ContentHandler.__init__(self)
        self._out = _outputwrapper(out,encoding)
        self._ns_contexts = [{}] # contains uri -> prefix dicts
        self._current_context = self._ns_contexts[-1]
        self._undeclared_ns_maps = []
        self._encoding = encoding
        self._generated_prefix_ctr = 0
        return

    # ContentHandler methods

    def startDocument(self):
        self._out.write('<?xml version="1.0" encoding="%s"?>\n' %
                        self._encoding)

    def startPrefixMapping(self, prefix, uri):
        self._ns_contexts.append(self._current_context.copy())
        self._current_context[uri] = prefix
        self._undeclared_ns_maps.append((prefix, uri))

    def endPrefixMapping(self, prefix):
        self._current_context = self._ns_contexts[-1]
        del self._ns_contexts[-1]

    def startElement(self, name, attrs):
        self._out.write('<' + name)
        for (name, value) in attrs.items():
            self._out.write(' %s=' % name)
            writeattr(self._out, value)
        self._out.write('>')

    def endElement(self, name):
        self._out.write('</%s>' % name)

    def startElementNS(self, name, qname, attrs):
        if name[0] is None:
            name = name[1]
        elif self._current_context[name[0]] is None:
            # default namespace
            name = name[1]
        else:
            name = self._current_context[name[0]] + ":" + name[1]
        self._out.write('<' + name)

        for k,v in self._undeclared_ns_maps:
            if k is None:
                self._out.write(' xmlns="%s"' % (v or ''))
            else:
                self._out.write(' xmlns:%s="%s"' % (k,v))
        self._undeclared_ns_maps = []

        for (name, value) in attrs.items():
            if name[0] is None:
                name = name[1]
            elif self._current_context[name[0]] is None:
                # default namespace
                # If an attribute has a nsuri but not a prefix, we must
                # create a prefix and add a nsdecl.  Save the URI first:
                # once name is rebound to the qname string, name[0] would
                # be its first character rather than the namespace URI.
                uri = name[0]
                prefix = self.GENERATED_PREFIX % self._generated_prefix_ctr
                self._generated_prefix_ctr = self._generated_prefix_ctr + 1
                name = prefix + ':' + name[1]
                self._out.write(' xmlns:%s=%s' % (prefix, quoteattr(uri)))
                self._current_context[uri] = prefix
            else:
                name = self._current_context[name[0]] + ":" + name[1]
            self._out.write(' %s=' % name)
            writeattr(self._out, value)
        self._out.write('>')

    def endElementNS(self, name, qname):
        # XXX: if qname is not None, we had better use it.
        # Python 2.0b2 requires us to use the recorded prefix for
        # name[0], though
        if name[0] is None:
            qname = name[1]
        elif self._current_context[name[0]] is None:
            qname = name[1]
        else:
            qname = self._current_context[name[0]] + ":" + name[1]
        self._out.write('</%s>' % qname)

    def characters(self, content):
        writetext(self._out, content)

    def ignorableWhitespace(self, content):
        self._out.write(content)

    def processingInstruction(self, target, data):
        self._out.write('<?%s %s?>' % (target, data))


class LexicalXMLGenerator(XMLGenerator, saxlib.LexicalHandler):
    """An XMLGenerator that also supports the LexicalHandler interface."""

    def __init__(self, out=None, encoding="iso-8859-1"):
        XMLGenerator.__init__(self, out, encoding)
        self._in_cdata = 0

    def characters(self, content):
        if self._in_cdata:
            self._out.write(content.replace(']]>', ']]>]]&gt;<![CDATA['))
        else:
            self._out.write(escape(content))

    # LexicalHandler methods
    # (we only support the most important ones and inherit the rest)

    def startDTD(self, name, public_id, system_id):
        self._out.write('<!DOCTYPE %s' % name)
        if public_id:
            self._out.write(' PUBLIC %s %s' % (
                quoteattr(public_id or ""), quoteattr(system_id or "")
            ))
        elif system_id:
            self._out.write(' SYSTEM %s' % quoteattr(system_id or ""))

    def endDTD(self):
        self._out.write('>')

    def comment(self, content):
        self._out.write('<!--')
        self._out.write(content)
        self._out.write('-->')

    def startCDATA(self):
        self._in_cdata = 1
        self._out.write('<![CDATA[')

    def endCDATA(self):
        self._in_cdata = 0
        self._out.write(']]>')


# --- ContentGenerator is the SAX1 DocumentHandler for writing back XML
class ContentGenerator(XMLGenerator):

    def characters(self, str, start, length):
        # In SAX1, characters receives a buffer plus start and length; in
        # SAX2, it receives a string. For plain strings, we may want to
        # use a buffer object.
        return XMLGenerator.characters(self, str[start:start+length])

# --- XMLFilterImpl
class XMLFilterBase(saxlib.XMLFilter):
    """This class is designed to sit between an XMLReader and the
    client application's event handlers.  By default, it does nothing
    but pass requests up to the reader and events on to the handlers
    unmodified, but subclasses can override specific methods to modify
    the event stream or the configuration requests as they pass
    through."""

    # ErrorHandler methods

    def error(self, exception):
        self._err_handler.error(exception)

    def fatalError(self, exception):
        self._err_handler.fatalError(exception)

    def warning(self, exception):
        self._err_handler.warning(exception)

    # ContentHandler methods

    def setDocumentLocator(self, locator):
        self._cont_handler.setDocumentLocator(locator)

    def startDocument(self):
        self._cont_handler.startDocument()

    def endDocument(self):
        self._cont_handler.endDocument()

    def startPrefixMapping(self, prefix, uri):
        self._cont_handler.startPrefixMapping(prefix, uri)

    def endPrefixMapping(self, prefix):
        self._cont_handler.endPrefixMapping(prefix)

    def startElement(self, name, attrs):
        self._cont_handler.startElement(name, attrs)

    def endElement(self, name):
        self._cont_handler.endElement(name)

    def startElementNS(self, name, qname, attrs):
        self._cont_handler.startElementNS(name, qname, attrs)

    def endElementNS(self, name, qname):
        self._cont_handler.endElementNS(name, qname)

    def characters(self, content):
        self._cont_handler.characters(content)

    def ignorableWhitespace(self, chars):
        self._cont_handler.ignorableWhitespace(chars)

    def processingInstruction(self, target, data):
        self._cont_handler.processingInstruction(target, data)

    def skippedEntity(self, name):
        self._cont_handler.skippedEntity(name)

    # DTDHandler methods

    def notationDecl(self, name, publicId, systemId):
        self._dtd_handler.notationDecl(name, publicId, systemId)

    def unparsedEntityDecl(self, name, publicId, systemId, ndata):
        self._dtd_handler.unparsedEntityDecl(name, publicId, systemId, ndata)

    # EntityResolver methods

    def resolveEntity(self, publicId, systemId):
        # The parser needs the resolved source back, so return it.
        return self._ent_handler.resolveEntity(publicId, systemId)

    # XMLReader methods

    def parse(self, source):
        self._parent.setContentHandler(self)
        self._parent.setErrorHandler(self)
        self._parent.setEntityResolver(self)
        self._parent.setDTDHandler(self)
        self._parent.parse(source)

    def setLocale(self, locale):
        self._parent.setLocale(locale)

    def getFeature(self, name):
        return self._parent.getFeature(name)

    def setFeature(self, name, state):
        self._parent.setFeature(name, state)

    def getProperty(self, name):
        return self._parent.getProperty(name)

    def setProperty(self, name, value):
        self._parent.setProperty(name, value)

# FIXME: remove this backward compatibility hack when not needed anymore
XMLFilterImpl = XMLFilterBase

# --- BaseIncrementalParser

class BaseIncrementalParser(xmlreader.IncrementalParser):
    """This class implements the parse method of the XMLReader
    interface using the feed, close and reset methods of the
    IncrementalParser interface as a convenience to SAX 2.0 driver
    writers."""

    def parse(self, source):
        source = prepare_input_source(source)
        self.prepareParser(source)

        self._cont_handler.startDocument()

        # FIXME: what about char-stream?
        inf = source.getByteStream()
        buffer = inf.read(16384)
        while buffer != "":
            self.feed(buffer)
            buffer = inf.read(16384)

        self.close()
        self.reset()

        self._cont_handler.endDocument()

    def prepareParser(self, source):
        """This method is called by the parse implementation to allow
        the SAX 2.0 driver to prepare itself for parsing."""
        raise NotImplementedError("prepareParser must be overridden!")

# --- Utility functions

def prepare_input_source(source, base = ""):
    """This function takes an InputSource and an optional base URL and
    returns a fully resolved InputSource object ready for reading."""

    if type(source) in _StringTypes:
        source = xmlreader.InputSource(source)
    elif hasattr(source, "read"):
        f = source
        source = xmlreader.InputSource()
        source.setByteStream(f)
        if hasattr(f, "name"):
            source.setSystemId(f.name)
    if source.getByteStream() is None:
        sysid = absolute_system_id(source.getSystemId(), base)
        source.setSystemId(sysid)
        f = urllib2.urlopen(sysid)
        source.setByteStream(f)
    return source

from xml.Uri import Absolutize, MakeUrllibSafe, IsAbsolute

def absolute_system_id(sysid, base=''):
    # If a base is given, sysid may be relative to it, so join the two
    # before the isfile() test.
    if base:
        basehead = os.path.split(os.path.abspath(base))[0]
        path = os.path.join(basehead, sysid)
    else:
        path = os.path.abspath(sysid)
    if os.path.isfile(path):
        sysid = 'file:%s' % path
    elif base:
        sysid = Absolutize(sysid, base)
    #assert IsAbsolute(sysid)
    return MakeUrllibSafe(sysid)
    
# ===========================================================================
#
# DEPRECATED SAX 1.0 CLASSES
#
# ===========================================================================

# --- AttributeMap

class AttributeMap:
    """An AttributeList implementation backed by an (attr,val) hash
    passed to the constructor."""

    def __init__(self, map):
        self.map=map

    def getLength(self):
        return len(self.map.keys())

    def getName(self, i):
        try:
            return self.map.keys()[i]
        except IndexError,e:
            return None

    def getType(self, i):
        return "CDATA"

    def getValue(self, i):
        try:
            if type(i)==types.IntType:
                return self.map[self.getName(i)]
            else:
                return self.map[i]
        except KeyError,e:
            return None

    def __len__(self):
        return len(self.map)

    def __getitem__(self, key):
        if type(key)==types.IntType:
            return self.map.keys()[key]
        else:
            return self.map[key]

    def items(self):
        return self.map.items()

    def keys(self):
        return self.map.keys()

    def has_key(self,key):
        return self.map.has_key(key)

    def get(self, key, alternative=None):
        return self.map.get(key, alternative)

    def copy(self):
        return AttributeMap(self.map.copy())

    def values(self):
        return self.map.values()

# --- Event broadcasting object

class EventBroadcaster:
    """Takes a list of objects and forwards any method calls received
    to all objects in the list. The attribute 'list' holds that list
    and can be freely modified by clients."""

    class Event:
        "Helper objects that represent event methods."

        def __init__(self,list,name):
            self.list=list
            self.name=name

        def __call__(self,*rest):
            for obj in self.list:
                apply(getattr(obj,self.name), rest)

    def __init__(self,list):
        self.list=list

    def __getattr__(self,name):
        return self.Event(self.list,name)

    def __repr__(self):
        return "<EventBroadcaster instance at %d>" % id(self)

# --- ESIS document handler
import saxlib
class ESISDocHandler(saxlib.HandlerBase):
    "A SAX document handler that produces naive ESIS output."

    def __init__(self,writer=sys.stdout):
        self.writer=writer

    def processingInstruction (self,target, remainder):
        """Receive an event signalling that a processing instruction
        has been found."""
        self.writer.write("?"+target+" "+remainder+"\n")

    def startElement(self,name,amap):
        "Receive an event signalling the start of an element."
        self.writer.write("("+name+"\n")
        for a_name in amap.keys():
            self.writer.write("A"+a_name+" "+amap[a_name]+"\n")

    def endElement(self,name):
        "Receive an event signalling the end of an element."
        self.writer.write(")"+name+"\n")

    def characters(self,data,start_ix,length):
        "Receive an event signalling that character data has been found."
        self.writer.write("-"+data[start_ix:start_ix+length]+"\n")

# --- XML canonizer

class Canonizer(saxlib.HandlerBase):
    "A SAX document handler that produces canonized XML output."

    def __init__(self,writer=sys.stdout):
        self.elem_level=0
        self.writer=writer

    def processingInstruction (self,target, remainder):
        if not target=="xml":
            self.writer.write("<?"+target+" "+remainder+"?>")

    def startElement(self,name,amap):
        self.writer.write("<"+name)

        a_names=amap.keys()
        a_names.sort()

        for a_name in a_names:
            self.writer.write(" "+a_name+"=\"")
            self.write_data(amap[a_name])
            self.writer.write("\"")
        self.writer.write(">")
        self.elem_level=self.elem_level+1

    def endElement(self,name):
        self.writer.write("</"+name+">")
        self.elem_level=self.elem_level-1

    def ignorableWhitespace(self,data,start_ix,length):
        self.characters(data,start_ix,length)

    def characters(self,data,start_ix,length):
        if self.elem_level>0:
            self.write_data(data[start_ix:start_ix+length])

    def write_data(self,data):
        "Writes datachars to writer."
        data=data.replace("&","&amp;")
        data=data.replace("<","&lt;")
        data=data.replace("\"","&quot;")
        data=data.replace(">","&gt;")
        data=data.replace(chr(9),"&#9;")
        data=data.replace(chr(10),"&#10;")
        data=data.replace(chr(13),"&#13;")
        self.writer.write(data)

# --- mllib

class mllib:
    """A re-implementation of the htmllib, sgmllib and xmllib interfaces as a
    SAX DocumentHandler."""

# Unsupported:
# - setnomoretags
# - setliteral
# - translate_references
# - handle_xml
# - handle_doctype
# - handle_charref
# - handle_entityref
# - handle_comment
# - handle_cdata
# - tag_attributes

    def __init__(self):
        self.reset()

    def reset(self):
        import saxexts # only used here
        self.parser=saxexts.XMLParserFactory.make_parser()
        self.handler=mllib.Handler(self.parser,self)
        self.handler.reset()

    def feed(self,data):
        self.parser.feed(data)

    def close(self):
        self.parser.close()

    def get_stack(self):
        return self.handler.get_stack()

    # --- Handler methods (to be overridden)

    def handle_starttag(self,name,method,atts):
        method(atts)

    def handle_endtag(self,name,method):
        method()

    def handle_data(self,data):
        pass

    def handle_proc(self,target,data):
        pass

    def unknown_starttag(self,name,atts):
        pass

    def unknown_endtag(self,name):
        pass

    def syntax_error(self,message):
        pass

    # --- The internal handler class

    class Handler(saxlib.DocumentHandler,saxlib.ErrorHandler):
        """An internal class to handle SAX events and translate them to mllib
        events."""

        def __init__(self,driver,handler):
            self.driver=driver
            self.driver.setDocumentHandler(self)
            self.driver.setErrorHandler(self)
            self.handler=handler
            self.reset()

        def get_stack(self):
            return self.stack

        def reset(self):
            self.stack=[]

        # --- DocumentHandler methods

        def characters(self, ch, start, length):
            self.handler.handle_data(ch[start:start+length])

        def endElement(self, name):
            if hasattr(self.handler,"end_"+name):
                self.handler.handle_endtag(name,
                                          getattr(self.handler,"end_"+name))
            else:
                self.handler.unknown_endtag(name)

            del self.stack[-1]

        def ignorableWhitespace(self, ch, start, length):
            self.handler.handle_data(ch[start:start+length])

        def processingInstruction(self, target, data):
            self.handler.handle_proc(target,data)

        def startElement(self, name, atts):
            self.stack.append(name)

            if hasattr(self.handler,"start_"+name):
                self.handler.handle_starttag(name,
                                            getattr(self.handler,
                                                    "start_"+name),
                                             atts)
            else:
                self.handler.unknown_starttag(name,atts)

        # --- ErrorHandler methods

        def error(self, exception):
            self.handler.syntax_error(str(exception))

        def fatalError(self, exception):
            raise RuntimeError(str(exception))
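
For anyone who wants to try the resolution order used by absolute_system_id above without the xml.Uri module at hand, here is a minimal sketch. It is not the patch itself. It assumes a current Python 3 interpreter rather than the Python 2 of the module above, and it uses urllib.parse.urljoin as a stand-in for Absolutize. urljoin does not insist on an absolute base the way Absolutize does, so treat the results as an approximation. The MakeUrllibSafe step is left out entirely, and the name absolute_system_id_sketch is made up for this example.

```python
import os
from urllib.parse import urljoin


def absolute_system_id_sketch(sysid, base=""):
    """Approximate absolute_system_id with standard-library calls only."""
    # Same order as the function above: look on the local filesystem first.
    if base:
        # sysid may be relative to base, so join before the isfile() test.
        basehead = os.path.dirname(os.path.abspath(base))
        path = os.path.join(basehead, sysid)
    else:
        path = os.path.abspath(sysid)
    if os.path.isfile(path):
        # Mirrors the 'file:%s' formatting of the original function.
        return "file:%s" % path
    if base:
        # Assumption: urljoin replaces Absolutize; it is not a strict merge.
        return urljoin(base, sysid)
    # No local file and no base: hand the system id back unchanged.
    return sysid
```

Called with a relative system id and an http base, the sketch falls through the isfile() test and returns the joined URL; with no base and no matching local file it returns the system id untouched.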
_______________________________________________
XML-SIG maillist  -  XML-SIG@python.org
http://mail.python.org/mailman/listinfo/xml-sig
