When writing tests it can be very difficult to understand what
the test is doing without properly documenting the test.  Added
to that difficulty is when a stream of bytes gets posted with
the patch as a packet definition.  We then need to write out
detailed descriptions of the packets, and tend to get packet
strings that break line length parsers (leading to errors applying
patches.

With this change, we can write out clear descriptions of packets
that make use of Scapy to generate the actual bytes strings.
This should serve as accurate documentation, while also trimming
the amount of bytes a developer needs to write to represent a
packet.

Signed-off-by: Aaron Conole <[email protected]>
---
NOTE: I'm submitting it here as RFC, because I'm a bit nervous
      with the eval() call I've baked into the parser routine.
      Probably it's okay because users just trust the test suite
      anyway.

 tests/sendpkt.py | 64 ++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 62 insertions(+), 2 deletions(-)

diff --git a/tests/sendpkt.py b/tests/sendpkt.py
index 7cbea51654..32af99b308 100755
--- a/tests/sendpkt.py
+++ b/tests/sendpkt.py
@@ -25,10 +25,66 @@
 #
 
 
+import re
 import socket
 import sys
 from optparse import OptionParser
 
+try:
+    from scapy.all import Ether, IP, IPv6, TCP, UDP, ARP, ICMP, ICMPv6ND_RA
+    from scapy.all import ICMPv6NDOptSrcLLAddr, ICMPv6NDOptMTU
+    from scapy.all import ICMPv6NDOptPrefixInfo
+    from scapy.all import ICMPv6EchoRequest, ICMPv6EchoReply
+    SCAPY_PROTO = {
+        "Ether": Ether,
+        "IP": IP,
+        "IPv6": IPv6,
+        "TCP": TCP,
+        "UDP": UDP,
+        "ARP": ARP,
+        "ICMP": ICMP,
+        "ICMPv6ND_RA": ICMPv6ND_RA,
+        "ICMPv6NDOptSrcLLAddr": ICMPv6NDOptSrcLLAddr,
+        "ICMPv6NDOptMTU": ICMPv6NDOptMTU,
+        "ICMPv6NDOptPrefixInfo": ICMPv6NDOptPrefixInfo,
+        "ICMPv6EchoRequest": ICMPv6EchoRequest,
+        "ICMPv6EchoReply": ICMPv6EchoReply,
+    }
+
+    def scapy_parse(packet_def):
+        pkt_layers = packet_def.split("/")
+
+        pkt = None
+
+        for layer in pkt_layers:
+            # Word(...) match
+            lm = re.match(r'(\w+)\((.*?)\)', layer)
+            if not lm:
+                raise ValueError(
+                    f"Invalid definition {packet_def} at layer {layer}")
+
+            proto, proto_args_str = lm.groups()
+            if proto not in SCAPY_PROTO:
+                raise ValueError("Unable to construct a packet with {proto}.")
+
+            proto_args = {}
+            if proto_args_str:
+                kvp = re.findall(r'(\w)=(?:\'([^\']*)\'|([\w.]+))',
+                                 proto_args_str)
+                for key, str_type, n_type in kvp:
+                    proto_args[key] = str_type if str_type else eval(n_type)
+
+            layer_obj = SCAPY_PROTO[proto](**proto_args)
+            if pkt is None:
+                pkt = layer_obj
+            else:
+                pkt /= layer_obj
+
+        return pkt
+
+except:
+    def scapy_parse(packet_def):
+        raise RuntimeError("No scapy module while trying to parse scapy def.")
 
 usage = "usage: %prog [OPTIONS] OUT-INTERFACE HEX-BYTES \n \
          bytes in HEX-BYTES must be separated by space(s)"
@@ -50,8 +106,12 @@ if options.packet_type != "eth":
 
 # Strip '0x' prefixes from hex input, combine into a single string and
 # convert to bytes.
-hex_str = "".join([a[2:] if a.startswith("0x") else a for a in args[1:]])
-pkt = bytes.fromhex(hex_str)
+try:
+    hex_str = "".join([a[2:] if a.startswith("0x") else a for a in args[1:]])
+    pkt = bytes.fromhex(hex_str)
+except ValueError:
+    parsed_pkt_obj = scapy_parse(args[1])
+    pkt = bytes(parsed_pkt_obj)
 
 try:
     sockfd = socket.socket(socket.AF_PACKET, socket.SOCK_RAW)
-- 
2.47.1

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to