Hello,

   I was looking at the codec.py script and decided to write a unt test script 
fot it. I wrote 41 tests of which 9 failed. The ones that failed were due to:

  (1) No range testing for unsinged integers. e.g. an unsigned octet can take 
values from 0 through 255. Hence if we try and encode 256 or -1, the script 
should raise an exception.This does not happen with octets, short, long and 
long long data types. It fails silently.
  
  (2) In the Field Table data type, the name should have a maximum length of 
128. Currently it accepts length greater than this as well. Also, Field names 
can begin only with letters, $ or #. Currently it accepts a field name with a 
digit as well.

  Attached is a unit test script for codec.py. It is written in the same style 
as the existing test harness and can be run independandly or as part of the 
run-tests script. Refer the __doc__ string in the script for details of how to 
run it.


  I'm not sure of what the protocol is for this project. You can review it and 
use it/check in etc. If you think these are bugs that need rectification, I can 
volunteer to fix them.

Thanks
Jimmy
 




       
____________________________________________________________________________________Get
 the Yahoo! toolbar and be alerted to new email wherever you're surfing.
http://new.toolbar.yahoo.com/toolbar/features/mail/index.php
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# 
#   http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import unittest
from struct import pack
from qpid.codec import Codec
from cStringIO import StringIO
from qpid.reference import ReferenceId

__doc__ = """
    
    This is a unit test script for qpid/codec.py
    
    It can be run standalone or as part of the existing test framework.
    
    To run standalone: 
    -------------------
    
        Place in the qpid/python/tests_0-9 directory and type...
    
        python codec.py
        
        A brief output will be printed on screen. The verbose output will be placed inn a file called
        codec_unit_test_output.txt. [TODO: make this filename configurable]
        
    To run as part of the existing test framework:        
    -----------------------------------------------
    
        python run-tests tests_0-9.codec

    Change History:
    -----------------
        Jimmy John	05/19/2007	Initial draft


"""

        
# --------------------------------------
# --------------------------------------
class BaseDataTypes(unittest.TestCase):

    """
    Base class containing common functions
    """

    # ---------------
    def setUp(self):
        """
        standard setUp for unitetest (refer unittest documentation for details)
        """
        self.codec = Codec(StringIO())
        
    # ------------------
    def tearDown(self):
        """
        standard tearDown for unitetest (refer unittest documentation for details)
        """
        self.codec.stream.flush()
        self.codec.stream.close()
        
    # ----------------------------------------
    def callFunc(self, functionName, *args):
        """
        helper function - given a function name and arguments, calls the function with the args and
        returns the contents of the stream
        """
        if hasattr(self.codec, functionName):
            getattr(self.codec, functionName)(args[0])
            return self.codec.stream.getvalue()
        
    # ----------------------------------------
    def readFunc(self, functionName, *args):
        """
        helper function - creates a input stream and then calls the function with arguments as have been
        supplied
        """
        if hasattr(self.codec, functionName):
            
            self.codec.stream = StringIO(args[0])
            return getattr(self.codec, functionName)()
    
    
# ----------------------------------------
# ----------------------------------------
class IntegerTestCase(BaseDataTypes):

    """
    Handles octet, short, long, long long
    
    """
    
    # -------------------------- #
    # Unsigned Octect - 8 bits   #
    # -------------------------- #
    
    # --------------------------
    def test_unsigned_octet(self):
        """
        ubyte format requires 0<=number<=255
        """
        self.failUnlessEqual(self.callFunc('encode_octet', 2), pack('!B', 2), 'octect encoding FAILED...') 

    # -------------------------------------------
    def test_octet_out_of_upper_range(self):
        """
        testing for input above acceptable range 
        """
        self.failUnlessRaises(Exception, self.codec.encode_octet, 256) 

    # -------------------------------------------
    def test_uoctet_out_of_lower_range(self):
        """
        testing for input below acceptable range 
        """
        self.failUnlessRaises(Exception, self.codec.encode_octet, -1) 
        
    # ---------------------------------
    def test_uoctet_with_fraction(self):
        """
        the fractional part should be ignored...
        """
        self.failUnlessEqual(self.callFunc('encode_octet', 2.5), pack('!B', 2.5), 'octect encoding FAILED with fractions...') 
        
    # ------------------------------------
    def test_unsigned_octet_decode(self):
        """
        octet decoding
        """
        self.failUnlessEqual(self.readFunc('decode_octet', '\x02'), 2, 'octect decoding FAILED...') 

    # ----------------------------------- #
    # Unsigned Short Integers - 16 bits   #
    # ----------------------------------- #
    
    # -----------------------
    def test_ushort_int(self):
        self.failUnlessEqual(self.callFunc('encode_short', 2), pack('!H', 2), 'short encoding FAILED...') 
        
    # -------------------------------------------
    def test_ushort_int_out_of_upper_range(self):
        """
        testing for input above acceptable range 
        """
        self.failUnlessRaises(Exception, self.codec.encode_short, 65536) 

    # -------------------------------------------
    def test_ushort_int_out_of_lower_range(self):
        """
        testing for input below acceptable range 
        """
        self.failUnlessRaises(Exception, self.codec.encode_short, -1) 
        
    # ---------------------------------
    def test_ushort_int_with_fraction(self):
        """
        the fractional part should be ignored...
        """
        self.failUnlessEqual(self.callFunc('encode_short', 2.5), pack('!H', 2.5), 'short encoding FAILED with fractions...')         
        
    # ------------------------------------
    def test_ushort_int_decode(self):
        """
        unsigned short decoding
        """
        self.failUnlessEqual(self.readFunc('decode_short', '\x00\x02'), 2, 'unsigned short decoding FAILED...') 
        

    # ---------------------------------- #
    # Unsigned Long Integers - 32 bits   #
    # ---------------------------------- #
    
    # -----------------------
    def test_ulong_int(self):
        self.failUnlessEqual(self.callFunc('encode_long', 2), pack('!L', 2), 'long encoding FAILED...') 
        
    # -------------------------------------------
    def test_ulong_int_out_of_upper_range(self):
        """
        testing for input above acceptable range 
        """
        self.failUnlessRaises(Exception, self.codec.encode_long, 4294967296) 

    # -------------------------------------------
    def test_ulong_int_out_of_lower_range(self):
        """
        testing for input below acceptable range 
        """
        self.failUnlessRaises(Exception, self.codec.encode_long, -1) 
        
    # ---------------------------------
    def test_ulong_int_with_fraction(self):
        """
        the fractional part should be ignored...
        """
        self.failUnlessEqual(self.callFunc('encode_long', 2.5), pack('!L', 2.5), 'long encoding FAILED with fractions...')         
        
    # -------------------------------
    def test_ulong_int_decode(self):
        """
        unsigned long decoding
        """
        self.failUnlessEqual(self.readFunc('decode_long', '\x00\x00\x00\x02'), 2, 'unsigned long decoding FAILED...') 
            

    # --------------------------------------- #
    # Unsigned Long Long Integers - 64 bits   #
    # --------------------------------------- #    
    
    # -----------------------
    def test_ulong_long_int(self):
        self.failUnlessEqual(self.callFunc('encode_longlong', 2), pack('!Q', 2), 'long long encoding FAILED...') 
    
    # -------------------------------------------
    def test_ulong_long_int_out_of_upper_range(self):
        """
        testing for input above acceptable range 
        """
        self.failUnlessRaises(Exception, self.codec.encode_long, 18446744073709551616) 

    # -------------------------------------------
    def test_ulong_long_int_out_of_lower_range(self):
        """
        testing for input below acceptable range 
        """
        self.failUnlessRaises(Exception, self.codec.encode_long, -1) 
        
    # ---------------------------------
    def test_ulong_long_int_with_fraction(self):
        """
        the fractional part should be ignored...
        """
        self.failUnlessEqual(self.callFunc('encode_longlong', 2.5), pack('!Q', 2.5), 'long long encoding FAILED with fractions...')         
        
    # ------------------------------------
    def test_ulong_long_int_decode(self):
        """
        unsigned long long decoding
        """
        self.failUnlessEqual(self.readFunc('decode_longlong', '\x00\x00\x00\x00\x00\x00\x00\x02'), 2, 'unsigned long long decoding FAILED...') 
        
# -----------------------------------
# -----------------------------------
class BitTestCase(BaseDataTypes): 

    """
    Handles bits
    """

    # ----------------------------------------------
    def callFunc(self, functionName, *args):
        """
        helper function
        """
        if hasattr(self.codec, functionName):
        
            for ele in args:
                getattr(self.codec, functionName)(ele)
            
            self.codec.flush()
            return self.codec.stream.getvalue()  
            
    # -------------------
    def test_bit1(self):
        """
        sends in 11
        """
        self.failUnlessEqual(self.callFunc('encode_bit', 1, 1), pack('!B', 3), '11 bit encoding FAILED...')         
        
    # -------------------
    def test_bit2(self):
        """
        sends in 10011
        """
        self.failUnlessEqual(self.callFunc('encode_bit', 1, 1, 0, 0, 1), pack('!B', 19), '10011 bit encoding FAILED...')         
        
    # -------------------
    def test_bit3(self):
        """
        sends in 1110100111 [10 bits(right to left), should be compressed into two octets]
        """
        self.failUnlessEqual(self.callFunc('encode_bit', 1,1,1,0,0,1,0,1,1,1), '\xa7\x03', '1110100111(right to left) bit encoding FAILED...')         
        
    # ------------------------------------
    def test_bit_decode_1(self):
        """
        decode bit 1
        """
        self.failUnlessEqual(self.readFunc('decode_bit', '\x01'), 1, 'decode bit 1 FAILED...') 
            
    # ------------------------------------
    def test_bit_decode_0(self):
        """
        decode bit 0
        """
        self.failUnlessEqual(self.readFunc('decode_bit', '\x00'), 0, 'decode bit 0 FAILED...') 
            
# -----------------------------------
# -----------------------------------
class StringTestCase(BaseDataTypes): 

    """
    Handles short strings, long strings
    """

    # ------------------------------------------------------------- #
    # Short Strings - 8 bit length followed by zero or more octets  #
    # ------------------------------------------------------------- #

    # ---------------------------------------
    def test_short_string_zero_length(self):
        """
        0 length short string
        """
        self.failUnlessEqual(self.callFunc('encode_shortstr', ''), '\x00', '0 length short string encoding FAILED...')

    # -------------------------------------------
    def test_short_string_positive_length(self):
        """
        positive length short string
        """
        self.failUnlessEqual(self.callFunc('encode_shortstr', 'hello world'), '\x0bhello world', 'positive length short string encoding FAILED...')

    # -------------------------------------------
    def test_short_string_out_of_upper_range(self):
        """
        string length > 255
        """
        self.failUnlessRaises(Exception, self.codec.encode_shortstr, 'x'*256)
        
    # ------------------------------------
    def test_short_string_decode(self):
        """
        short string decode
        """
        self.failUnlessEqual(self.readFunc('decode_shortstr', '\x0bhello world'), 'hello world', 'short string decode FAILED...') 
            
    
    # ------------------------------------------------------------- #
    # Long Strings - 32 bit length followed by zero or more octets  #
    # ------------------------------------------------------------- #
    
    # ---------------------------------------
    def test_long_string_zero_length(self):
        """
        0 length long string
        """
        self.failUnlessEqual(self.callFunc('encode_longstr', ''), '\x00\x00\x00\x00', '0 length long string encoding FAILED...')

    # -------------------------------------------
    def test_long_string_positive_length(self):
        """
        positive length long string
        """
        self.failUnlessEqual(self.callFunc('encode_longstr', 'hello world'), '\x00\x00\x00\x0bhello world', 'positive length long string encoding FAILED...')
        
    # ------------------------------------
    def test_long_string_decode(self):
        """
        long string decode
        """
        self.failUnlessEqual(self.readFunc('decode_longstr', '\x00\x00\x00\x0bhello world'), 'hello world', 'long string decode FAILED...') 
            

# --------------------------------------
# --------------------------------------
class TimestampTestCase(BaseDataTypes): 
    
    """
    No need of any test cases here as timestamps are implemented as long long which is tested above
    """
    pass
    
# ---------------------------------------
# ---------------------------------------
class FieldTableTestCase(BaseDataTypes):

    """
    Handles Field Tables 
    
    Only S/I type messages seem to be implemented currently
    """

    # -------------------------------------------
    def test_field_table_name_value_pair(self):
        """
        valid name value pair
        """
        self.failUnlessEqual(self.callFunc('encode_longstr', dict({'$key1':'value1'})), '\x00\x00\x00\x11\x05$key1S\x00\x00\x00\x06value1', 'valid name value pair encoding FAILED...')
    
    # -------------------------------------------
    def test_field_table_invalid_field_name(self):
        """
        invalid field name
        """
        self.failUnlessRaises(Exception, self.codec.encode_longstr, dict({'1key1':'value1'}))
    
    # ----------------------------------------------------
    def test_field_table_invalid_field_name_length(self):
        """
        field names can have a maximum length of 128 chars
        """
        self.failUnlessRaises(Exception, self.codec.encode_longstr, dict({'x'*129:'value1'}))
        
    # ---------------------------------------------------
    def test_field_table_multiple_name_value_pair(self):
        """
        multiple name value pair
        """
        self.failUnlessEqual(self.callFunc('encode_longstr', dict({'$key1':'value1','$key2':'value2'})), '\x00\x00\x00\x22\x05$key2S\x00\x00\x00\x06value2\x05$key1S\x00\x00\x00\x06value1', 'multiple name value pair encoding FAILED...')
        
    # ------------------------------------
    def test_field_table_decode(self):
        """
        field table decode
        """
        self.failUnlessEqual(self.readFunc('decode_table', '\x00\x00\x00\x22\x05$key2S\x00\x00\x00\x06value2\x05$key1S\x00\x00\x00\x06value1'), dict({'$key1':'value1','$key2':'value2'}), 'field table decode FAILED...') 
            
            
# ------------------------------------
# ------------------------------------
class ContentTestCase(BaseDataTypes):    

    """
    Handles Content data types
    """

    # -----------------------------
    def test_content_inline(self):
        """
        inline content
        """
        self.failUnlessEqual(self.callFunc('encode_content', 'hello inline message'), '\x00\x00\x00\x00\x14hello inline message', 'inline content encoding FAILED...')

    # --------------------------------
    def test_content_reference(self):
        """
        reference content
        """
        self.failUnlessEqual(self.callFunc('encode_content', ReferenceId('dummyId')), '\x01\x00\x00\x00\x07dummyId', 'reference content encoding FAILED...')

    # ------------------------------------
    def test_content_inline_decode(self):
        """
        inline content decode
        """
        self.failUnlessEqual(self.readFunc('decode_content', '\x00\x00\x00\x00\x14hello inline message'), 'hello inline message', 'inline content decode FAILED...') 
            
    # ------------------------------------
    def test_content_reference_decode(self):
        """
        reference content decode
        """
        self.failUnlessEqual(self.readFunc('decode_content', '\x01\x00\x00\x00\x07dummyId').id, 'dummyId', 'reference content decode FAILED...') 
    
    
# ---------------------------
# ---------------------------
if __name__ == '__main__':
   
    codec_test_suite = unittest.TestSuite()
    
    #adding all the test suites...
    codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(IntegerTestCase))
    codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(BitTestCase))
    codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(StringTestCase))
    codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TimestampTestCase))
    codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(FieldTableTestCase))
    codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ContentTestCase))
    
    run_output_stream = StringIO()
    test_runner = unittest.TextTestRunner(run_output_stream, '', '')
    test_result = test_runner.run(codec_test_suite)
    
    print '\n%d test run...' % (test_result.testsRun)
    
    if test_result.wasSuccessful():
        print '\nAll tests successful\n'
        
    if test_result.failures:
        print '\n----------'
        print '%d FAILURES:' % (len(test_result.failures))
        print '----------\n'
        for failure in test_result.failures:
            print str(failure[0]) + ' ... FAIL'
        
    if test_result.errors:
        print '\n---------'
        print '%d ERRORS:' % (len(test_result.errors))
        print '---------\n'
        
        for error in test_result.errors:
            print str(error[0]) + ' ... ERROR'
            
    f = open('codec_unit_test_output.txt', 'w')            
    f.write(str(run_output_stream.getvalue()))
    f.close()
    
    
    

Reply via email to