Ema has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/354746 )

Change subject: Move IP classes to pybal.ip
......................................................................

Move IP classes to pybal.ip

Move IPv4IP and IPv6IP to pybal.ip. Add unit tests for pybal.ip and
(very basic!) tests for the Origin and BaseASPath BGP attributes.

Change-Id: I4e474adb9177fc934977339266eb7c3b145dae08
---
M pybal/bgp.py
A pybal/ip.py
A pybal/test/test_bgp.py
A pybal/test/test_ip.py
4 files changed, 225 insertions(+), 140 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/debs/pybal refs/changes/46/354746/1

diff --git a/pybal/bgp.py b/pybal/bgp.py
index cc44fb5..a9aa7e9 100644
--- a/pybal/bgp.py
+++ b/pybal/bgp.py
@@ -20,6 +20,9 @@
 from twisted import copyright
 from twisted.internet import reactor, protocol, base, interfaces, error, defer
 
+# pybal imports
+from .ip import IPPrefix, IPv4IP, IPv6IP
+
 # Constants
 VERSION = 4
 PORT = 179
@@ -174,146 +177,6 @@
         be initiated.
         """
 
-# TODO: Replace by some better third party classes or rewrite
-class IPPrefix(object):
-    """Class that represents an IP prefix"""
-
-    def __init__(self, ipprefix, addressfamily=None):
-        self.prefix = None # packed ip string
-
-        if isinstance(ipprefix, IPPrefix):
-            self.prefix, self.prefixlen, self.addressfamily = ipprefix.prefix, 
ipprefix.prefixlen, ipprefix.addressfamily
-        elif type(ipprefix) is tuple:
-            # address family must be specified
-            if not addressfamily:
-                raise ValueError()
-
-            self.addressfamily = addressfamily
-
-            prefix, self.prefixlen = ipprefix
-            if type(prefix) is str:
-                # tuple (ipstr, prefixlen)
-                self.prefix = prefix
-            elif type(prefix) is int:
-                if self.addressfamily == AFI_INET:
-                    # tuple (ipint, prefixlen)
-                    self.prefix = struct.pack('!I', prefix)
-                else:
-                    raise ValueError()
-            else:
-                # Assume prefix is a sequence of octets
-                self.prefix = b"".join(map(chr, prefix))
-        elif type(ipprefix) is str:
-            # textual form
-            prefix, prefixlen = ipprefix.split('/')
-            self.addressfamily = addressfamily or (':' in prefix and AFI_INET6 
or AFI_INET)
-
-            if self.addressfamily == AFI_INET:
-                self.prefix = b"".join([chr(int(o)) for o in 
prefix.split('.')])
-            elif self.addressfamily == AFI_INET6:
-                self.prefix = bytearray()
-                hexlist = prefix.split(":")
-                if len(hexlist) > 8:
-                    raise ValueError()
-
-                for hexstr in hexlist:
-                    if hexstr is not "":
-                        self.prefix += struct.pack('!H', int(hexstr, 16))
-                    else:
-                        zeroCount = 8 - len(hexlist) + 1
-                        self.prefix += struct.pack('!%dH' % zeroCount, *((0,) 
* zeroCount))
-                self.prefix = bytes(self.prefix)
-
-            self.prefixlen = int(prefixlen)
-        else:
-            raise ValueError()
-
-    def __repr__(self):
-        return repr(str(self))
-
-    def __str__(self):
-        if self.addressfamily == AFI_INET:
-            return '.'.join([str(ord(o)) for o in self.packed(pad=True)]) + 
'/%d' % self.prefixlen
-        elif self.addressfamily == AFI_INET6:
-            return ':'.join([hex(o)[2:] for o in struct.unpack('!8H', 
self.packed(pad=True))]) + '/%d' % self.prefixlen
-
-    def __eq__(self, other):
-        # FIXME: masked ips
-        return isinstance(other, IPPrefix) and self.prefixlen == 
other.prefixlen and self.prefix == other.prefix
-
-    def __ne__(self, other):
-        return not self.__eq__(other)
-
-    def __lt__(self, other):
-        return self.prefix < other.prefix or \
-            (self.prefix == other.prefix and self.prefixlen < other.prefixlen)
-
-    def __le__(self, other):
-        return self.__lt__(other) or self.__eq__(other)
-
-    def __gt__(self, other):
-        return self.prefix > other.prefix or \
-            (self.prefix == other.prefix and self.prefixlen > other.prefixlen)
-
-    def __ge__(self, other):
-        return self.__gt__(other) or self.__eq__(other)
-
-    def __hash__(self):
-        return hash(self.prefix) ^ hash(self.prefixlen)
-
-    def __len__(self):
-        return self.prefixlen
-
-    def _packedMaxLen(self):
-        return (self.addressfamily == AFI_INET6 and 16 or 4)
-
-    def ipToInt(self):
-        return reduce(lambda x, y: x * 256 + y, map(ord, self.prefix))
-
-    def netmask(self):
-        return ~( (1 << (len(self.prefix)*8 - self.prefixlen)) - 1)
-
-    def mask(self, prefixlen, shorten=False):
-        # DEBUG
-        assert len(self.prefix) == self._packedMaxLen()
-
-        masklen = len(self.prefix) * 8 - prefixlen
-        self.prefix = struct.pack('!I', self.ipToInt() >> masklen << masklen)
-        if shorten: self.prefixlen = prefixlen
-        return self
-
-    def packed(self, pad=False):
-        if pad:
-            return self.prefix + '\0' * (self._packedMaxLen() - 
len(self.prefix))
-        else:
-            return self.prefix
-
-class IPv4IP(IPPrefix):
-    """Class that represents a single non-prefix IPv4 IP."""
-
-    def __init__(self, ip):
-        if type(ip) is str and len(ip) > 4:
-            super(IPv4IP, self).__init__(ip + '/32', AFI_INET)
-        else:
-            super(IPv4IP, self).__init__((ip, 32), AFI_INET)
-
-    def __str__(self):
-        return ".".join([str(ord(o)) for o in self.prefix])
-
-class IPv6IP(IPPrefix):
-    """Class that represents a single non-prefix IPv6 IP."""
-
-    def __init__(self, ip=None, packed=None):
-        if not ip and not packed:
-            raise ValueError()
-
-        if packed:
-            super(IPv6IP, self).__init__((packed, 128), AFI_INET6)
-        else:
-            super(IPv6IP, self).__init__(ip + '/128', AFI_INET6)
-
-    def __str__(self):
-        return ':'.join([hex(o)[2:] for o in struct.unpack('!8H', 
self.packed(pad=True))])
 
 class Attribute(object):
     """
diff --git a/pybal/ip.py b/pybal/ip.py
new file mode 100644
index 0000000..e63c813
--- /dev/null
+++ b/pybal/ip.py
@@ -0,0 +1,149 @@
+# ip.py
+# Copyright (c) 2007 by Mark Bergsma <m...@nedworks.org>
+
+import struct
+
+# Constants
+AFI_INET = 1
+AFI_INET6 = 2
+
+# TODO: Replace by some better third party classes or rewrite
+class IPPrefix(object):
+    """Class that represents an IP prefix"""
+
+    def __init__(self, ipprefix, addressfamily=None):
+        self.prefix = None # packed ip string
+
+        if isinstance(ipprefix, IPPrefix):
+            self.prefix, self.prefixlen, self.addressfamily = ipprefix.prefix, 
ipprefix.prefixlen, ipprefix.addressfamily
+        elif type(ipprefix) is tuple:
+            # address family must be specified
+            if not addressfamily:
+                raise ValueError()
+
+            self.addressfamily = addressfamily
+
+            prefix, self.prefixlen = ipprefix
+            if type(prefix) is str:
+                # tuple (ipstr, prefixlen)
+                self.prefix = prefix
+            elif type(prefix) is int:
+                if self.addressfamily == AFI_INET:
+                    # tuple (ipint, prefixlen)
+                    self.prefix = struct.pack('!I', prefix)
+                else:
+                    raise ValueError()
+            else:
+                # Assume prefix is a sequence of octets
+                self.prefix = b"".join(map(chr, prefix))
+        elif type(ipprefix) is str:
+            # textual form
+            prefix, prefixlen = ipprefix.split('/')
+            self.addressfamily = addressfamily or (':' in prefix and AFI_INET6 
or AFI_INET)
+
+            if self.addressfamily == AFI_INET:
+                self.prefix = b"".join([chr(int(o)) for o in 
prefix.split('.')])
+            elif self.addressfamily == AFI_INET6:
+                self.prefix = bytearray()
+                hexlist = prefix.split(":")
+                if len(hexlist) > 8:
+                    raise ValueError()
+
+                for hexstr in hexlist:
+                    if hexstr is not "":
+                        self.prefix += struct.pack('!H', int(hexstr, 16))
+                    else:
+                        zeroCount = 8 - len(hexlist) + 1
+                        self.prefix += struct.pack('!%dH' % zeroCount, *((0,) 
* zeroCount))
+                self.prefix = bytes(self.prefix)
+
+            self.prefixlen = int(prefixlen)
+        else:
+            raise ValueError()
+
+    def __repr__(self):
+        return repr(str(self))
+
+    def __str__(self):
+        if self.addressfamily == AFI_INET:
+            return '.'.join([str(ord(o)) for o in self.packed(pad=True)]) + 
'/%d' % self.prefixlen
+        elif self.addressfamily == AFI_INET6:
+            return ':'.join([hex(o)[2:] for o in struct.unpack('!8H', 
self.packed(pad=True))]) + '/%d' % self.prefixlen
+
+    def __eq__(self, other):
+        # FIXME: masked ips
+        return isinstance(other, IPPrefix) and self.prefixlen == 
other.prefixlen and self.prefix == other.prefix
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __lt__(self, other):
+        return self.prefix < other.prefix or \
+            (self.prefix == other.prefix and self.prefixlen < other.prefixlen)
+
+    def __le__(self, other):
+        return self.__lt__(other) or self.__eq__(other)
+
+    def __gt__(self, other):
+        return self.prefix > other.prefix or \
+            (self.prefix == other.prefix and self.prefixlen > other.prefixlen)
+
+    def __ge__(self, other):
+        return self.__gt__(other) or self.__eq__(other)
+
+    def __hash__(self):
+        return hash(self.prefix) ^ hash(self.prefixlen)
+
+    def __len__(self):
+        return self.prefixlen
+
+    def _packedMaxLen(self):
+        return (self.addressfamily == AFI_INET6 and 16 or 4)
+
+    def ipToInt(self):
+        return reduce(lambda x, y: x * 256 + y, map(ord, self.prefix))
+
+    def netmask(self):
+        return ~( (1 << (len(self.prefix)*8 - self.prefixlen)) - 1)
+
+    def mask(self, prefixlen, shorten=False):
+        # DEBUG
+        assert len(self.prefix) == self._packedMaxLen()
+
+        masklen = len(self.prefix) * 8 - prefixlen
+        self.prefix = struct.pack('!I', self.ipToInt() >> masklen << masklen)
+        if shorten: self.prefixlen = prefixlen
+        return self
+
+    def packed(self, pad=False):
+        if pad:
+            return self.prefix + '\0' * (self._packedMaxLen() - 
len(self.prefix))
+        else:
+            return self.prefix
+
+class IPv4IP(IPPrefix):
+    """Class that represents a single non-prefix IPv4 IP."""
+
+    def __init__(self, ip):
+        if type(ip) is str and len(ip) > 4:
+            super(IPv4IP, self).__init__(ip + '/32', AFI_INET)
+        else:
+            super(IPv4IP, self).__init__((ip, 32), AFI_INET)
+
+    def __str__(self):
+        return ".".join([str(ord(o)) for o in self.prefix])
+
+class IPv6IP(IPPrefix):
+    """Class that represents a single non-prefix IPv6 IP."""
+
+    def __init__(self, ip=None, packed=None):
+        if not ip and not packed:
+            raise ValueError()
+
+        if packed:
+            super(IPv6IP, self).__init__((packed, 128), AFI_INET6)
+        else:
+            super(IPv6IP, self).__init__(ip + '/128', AFI_INET6)
+
+    def __str__(self):
+        return ':'.join([hex(o)[2:] for o in struct.unpack('!8H', 
self.packed(pad=True))])
diff --git a/pybal/test/test_bgp.py b/pybal/test/test_bgp.py
new file mode 100644
index 0000000..fc442e7
--- /dev/null
+++ b/pybal/test/test_bgp.py
@@ -0,0 +1,28 @@
+# -*- coding: utf-8 -*-
+"""
+  PyBal unit tests
+  ~~~~~~~~~~~~~~~~
+
+  This module contains tests for `pybal.bgp`.
+
+"""
+
+import pybal
+import pybal.config
+import pybal.bgp
+
+from .fixtures import PyBalTestCase
+
+
+class AttributeTestCase(PyBalTestCase):
+
+    def testOriginAttribute(self):
+        attr = pybal.bgp.OriginAttribute()
+        self.assertFalse(attr.optional)
+        self.assertTrue(attr.transitive)
+        self.assertEquals(attr.value, attr.ORIGIN_IGP)
+
+    def testBaseASPathAttribute(self):
+        attr = pybal.bgp.BaseASPathAttribute()
+        self.assertEquals(attr.value, [(2, [])])
+
diff --git a/pybal/test/test_ip.py b/pybal/test/test_ip.py
new file mode 100644
index 0000000..6f958c8
--- /dev/null
+++ b/pybal/test/test_ip.py
@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+"""
+  PyBal unit tests
+  ~~~~~~~~~~~~~~~~
+
+  This module contains tests for `pybal.ip`.
+
+"""
+
+import pybal
+import pybal.config
+import pybal.ip
+
+from .fixtures import PyBalTestCase
+
+
+class IPv4IPTestCase(PyBalTestCase):
+
+    def testPrefixStr(self):
+        prefix = pybal.ip.IPv4IP('91.198.174.192')
+
+        self.assertEquals(prefix.addressfamily, pybal.ip.AFI_INET)
+        self.assertEquals(prefix.prefixlen, 32)
+
+    def testPrefixInt(self):
+        prefix = pybal.ip.IPv4IP(2130706433)
+        self.assertEquals(prefix.prefixlen, 32)
+        self.assertEquals(str(prefix), '127.0.0.1')
+
+    def testPrefixOctets(self):
+        prefix = pybal.ip.IPv4IP((0x7f, 0x0, 0x0, 0x1))
+        self.assertEquals(prefix.prefixlen, 32)
+        self.assertEquals(str(prefix), '127.0.0.1')
+
+class IPv6IPTestCase(PyBalTestCase):
+
+    def testOK(self):
+        prefix = pybal.ip.IPv6IP('2620:0:862:ed1a::1')
+
+        self.assertEquals(prefix.addressfamily, pybal.ip.AFI_INET6)
+        self.assertEquals(prefix.prefixlen, 128)
+
+    def testValueError(self):
+        with self.assertRaises(ValueError):
+            pybal.ip.IPv6IP()

-- 
To view, visit https://gerrit.wikimedia.org/r/354746
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I4e474adb9177fc934977339266eb7c3b145dae08
Gerrit-PatchSet: 1
Gerrit-Project: operations/debs/pybal
Gerrit-Branch: master
Gerrit-Owner: Ema <e...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to