Signed-off-by: WATANABE Fumitaka <[email protected]>
Signed-off-by: Yuichi Ito <[email protected]>
---
 ryu/tests/switch/tester.py |   75 +++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 74 insertions(+), 1 deletion(-)

diff --git a/ryu/tests/switch/tester.py b/ryu/tests/switch/tester.py
index b43cb9f..29c8375 100644
--- a/ryu/tests/switch/tester.py
+++ b/ryu/tests/switch/tester.py
@@ -81,6 +81,8 @@ TARGET_RECEIVE_PORT = 1

 INTERVAL = 1  # sec
 WAIT_TIMER = 3  # sec
+CONTINUOUS_THREAD_INTVL = float(0.01)  # sec
+CONTINUOUS_PROGRESS_SPAN = 3  # sec

 # Default settings for 'ingress: packets'
 DEFAULT_DURATION_TIME = 30
@@ -230,6 +232,9 @@ class OfTester(app_manager.RyuApp):
         self.waiter = None
         self.send_msg_xids = []
         self.rcv_msgs = []
+        self.ingress_event = None
+        self.ingress_threads = []
+        self.thread_msg = None
         self.test_thread = hub.spawn(
             self._test_sequential_execute, test_dir)

@@ -252,7 +257,9 @@ class OfTester(app_manager.RyuApp):
     def close(self):
         if self.test_thread is not None:
             hub.kill(self.test_thread)
-            hub.joinall([self.test_thread])
+        if self.ingress_event:
+            self.ingress_event.set()
+        hub.joinall([self.test_thread])
         self._test_end('--- Test terminated ---')

     @set_ev_cls(ofp_event.EventOFPStateChange,
@@ -336,6 +343,7 @@ class OfTester(app_manager.RyuApp):

         if description:
             self.logger.info('%s', description)
+        self.thread_msg = None

         # Test execute.
         try:
@@ -381,6 +389,11 @@ class OfTester(app_manager.RyuApp):
         except Exception:
             result = [TEST_ERROR, RYU_INTERNAL_ERROR]
             result_type = RYU_INTERNAL_ERROR
+        finally:
+            self.ingress_event = None
+            for tid in self.ingress_threads:
+                hub.kill(tid)
+            self.ingress_threads = []

         # Output test result.
         self.logger.info('    %-100s %s', test.description, result[0])
@@ -635,6 +648,66 @@ class OfTester(app_manager.RyuApp):
         if not lookup:
             raise TestError(self.state)

+    def _continuous_packet_send(self, pkt):
+        assert self.ingress_event is None
+
+        pkt_data = pkt[KEY_PACKETS][KEY_DATA]
+        pktps = pkt[KEY_PACKETS][KEY_PKTPS]
+        duration_time = pkt[KEY_PACKETS][KEY_DURATION_TIME]
+
+        self.logger.debug("send_packet:[%s]", packet.Packet(pkt_data))
+        self.logger.debug("pktps:[%d]", pktps)
+        self.logger.debug("duration_time:[%d]", duration_time)
+
+        arg = {'pkt_data': pkt_data,
+               'thread_counter': 0,
+               'dot_span': int(CONTINUOUS_PROGRESS_SPAN /
+                               CONTINUOUS_THREAD_INTVL),
+               'packet_counter': float(0),
+               'packet_counter_inc': pktps * CONTINUOUS_THREAD_INTVL}
+
+        try:
+            self.ingress_event = hub.Event()
+            tid = hub.spawn(self._send_packet_thread, arg)
+            self.ingress_threads.append(tid)
+            self.ingress_event.wait(duration_time)
+            if self.thread_msg is not None:
+                raise self.thread_msg  # pylint: disable=E0702
+        finally:
+            sys.stdout.write("\r\n")
+            sys.stdout.flush()
+
+    def _send_packet_thread(self, arg):
+        """ Send several packets continuously. """
+        if self.ingress_event is None or self.ingress_event._cond:
+            return
+
+        # display dots to express progress of sending packets
+        if not arg['thread_counter'] % arg['dot_span']:
+            sys.stdout.write(".")
+            sys.stdout.flush()
+
+        arg['thread_counter'] += 1
+
+        # pile up float values and
+        # use integer portion as the number of packets this thread sends
+        arg['packet_counter'] += arg['packet_counter_inc']
+        count = int(arg['packet_counter'])
+        arg['packet_counter'] -= count
+
+        hub.sleep(CONTINUOUS_THREAD_INTVL)
+
+        tid = hub.spawn(self._send_packet_thread, arg)
+        self.ingress_threads.append(tid)
+        hub.sleep(0)
+        for _ in range(count):
+            try:
+                self.tester_sw.send_packet_out(arg['pkt_data'])
+            except Exception as err:
+                self.thread_msg = err
+                self.ingress_event.set()
+                break
+
     def _compare_flow(self, stats1, stats2):
         attr_list = ['cookie', 'priority', 'hard_timeout', 'idle_timeout',
                      'table_id', 'instructions', 'match']
--
1.7.10.4


------------------------------------------------------------------------------
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to