Enable sanity test for CAN protocols using can-utils.

Signed-off-by: Liow, Shi Jie Donavan <[email protected]>
Signed-off-by: Yeoh Ee Peng <[email protected]>
---
 lib/oeqa/runtime/cases/can.py | 144 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 144 insertions(+)
 create mode 100644 lib/oeqa/runtime/cases/can.py

diff --git a/lib/oeqa/runtime/cases/can.py b/lib/oeqa/runtime/cases/can.py
new file mode 100644
index 0000000..481cc97
--- /dev/null
+++ b/lib/oeqa/runtime/cases/can.py
@@ -0,0 +1,144 @@
+from oeqa.runtime.case import OERuntimeTestCase
+from oeqa.runtime.decorator.package import OEHasPackage
+import threading
+import time
+import os
+script_path = os.path.dirname(os.path.realpath(__file__))
+files_path = os.path.join(script_path, '../files/')
+
+class CanTest(OERuntimeTestCase):
+    """Sanity test for raw CAN send/receive on the target using can-utils.
+
+    One frame is sent with cangen on iface_tx and captured with candump
+    on iface_rx; the captured line is then checked for the expected CAN
+    ID and payload.
+    """
+    can_testapp_dir = '/usr/bin/'  # prefix of the cangen/candump commands
+    iface_rx = 'can1'
+    iface_tx = 'can0'
+    canId = '7FF'
+    canFrameFormat = 'sff'  # 'sff' or 'eff', see canRawTx
+    canFD = 'off'  # 'on' or 'off'
+    canData = '11223344deadbeef'
+
+    def _runChecked(self, cmd):
+        """Run cmd on the target, assert status 0 and return its output."""
+        status, output = self.target.run(cmd)
+        msg = 'status and output: %s and %s' % (status, output)
+        self.assertEqual(status, 0, msg=msg)
+        return output
+
+    def canCheckInterface(self, canIface):
+        """Assert that ifconfig on the target reports canIface."""
+        status, output = self.target.run("ifconfig -a {}".format(canIface))
+        msg = 'status and output: %s and %s' % (status, output)
+        self.assertIn(canIface, output, msg=msg)
+
+    def canInterfaceBringUp(self, canIface, canFD, bitrate):
+        """Reconfigure canIface at the given bitrate and bring it up.
+
+        The link is set down, configured (CAN FD with a 4000000 data
+        bitrate when canFD is "on"), given restart-ms 100, set up and
+        finally checked for "state UP" in the ip link output.
+        """
+        if canFD == "on":
+            mode = "bitrate {} dbitrate 4000000 fd on".format(bitrate)
+        else:
+            mode = "bitrate {} fd off".format(bitrate)
+        self._runChecked("ip link set down {}".format(canIface))
+        self._runChecked("ip link set {} type can {}".format(canIface, mode))
+        self._runChecked(
+            "ip link set {} type can restart-ms 100".format(canIface))
+        self._runChecked("ip link set up {}".format(canIface))
+        status, output = self.target.run("ip link show {}".format(canIface))
+        msg = 'status and output: %s and %s' % (status, output)
+        self.assertIn("state UP", output, msg=msg)
+
+    def canRawTx(self, canIfaceTx, canFd, canFrameFormat, canId, TxData,
+                 can_testapp_dir):
+        """Send one frame with cangen and assert that the command succeeded.
+
+        canFrameFormat must be "sff" or "eff" and canFd "on" or "off"; any
+        other combination fails the test instead of running no command.
+        """
+        frame_opts = {
+            ("sff", "off"): "-L 8",
+            ("sff", "on"): "-L 64 -fb",
+            ("eff", "off"): "-L 8 -e",
+            ("eff", "on"): "-L 64 -e -fb",
+        }
+        opts = frame_opts.get((canFrameFormat, canFd))
+        bad_mode = 'unsupported CAN mode: %s, fd %s' % (canFrameFormat, canFd)
+        self.assertIsNotNone(opts, msg=bad_mode)
+        # Presumably gives the receiving candump time to start listening.
+        time.sleep(1)
+        cmd = "%scangen %s -I %s %s -D %s -n 1 -v -v" % (
+            can_testapp_dir, canIfaceTx, canId, opts, TxData)
+        self._runChecked(cmd)
+
+    def canRawRx(self, canIfaceRx, canId, canFrameFormat, can_testapp_dir,
+                 canRawFilter=None):
+        """Capture one frame for canId with candump.
+
+        canRawFilter is the integer mask used in the candump filter; when
+        omitted it is 0x7FF for "sff" and 0x1FFFFFFF otherwise. Returns
+        the (status, output) result of the candump command.
+        """
+        if canRawFilter is None:
+            if canFrameFormat == "sff":
+                canRawFilter = 0x7FF
+            else:
+                canRawFilter = 0x1FFFFFFF
+        cmd = "%scandump %s,%s:%s -n 1" % (
+            can_testapp_dir, canIfaceRx, canId, hex(canRawFilter))
+        return self.target.run(cmd)
+
+    def canBcmDataVerification(self, iface_rx, canFrameFormat, canId, TxData,
+                               RxData):
+        """Assert that candump output RxData contains canId and TxData.
+
+        Spaces in RxData and letter case are ignored. canFrameFormat is
+        accepted but not used.
+        """
+        RxData = RxData.replace(' ', '').lower()
+        TxData = TxData.lower()
+
+        # Verify CAN ID: expected right after the interface name
+        iface_rx_can_id = '%s%s' % (iface_rx.lower(), canId.lower())
+        self.assertIn(iface_rx_can_id, RxData)
+
+        # Verify CAN Data
+        self.assertIn(TxData, RxData)
+
+    @OEHasPackage(['can-utils'])
+    def test_can_raw_can_send_and_receive_data(self):
+        """Send one raw frame on iface_tx and verify it on iface_rx."""
+        self.canCheckInterface(self.iface_rx)
+        self.canCheckInterface(self.iface_tx)
+        # bring up CAN TX interface, then CAN RX interface
+        self.canInterfaceBringUp(self.iface_tx, self.canFD, bitrate=1000000)
+        self.canInterfaceBringUp(self.iface_rx, self.canFD, bitrate=1000000)
+
+        tx_errors = []
+
+        def sendFrame():
+            # An assertion raised inside a worker thread does not fail the
+            # test, so record it for the main thread to report.
+            try:
+                self.canRawTx(self.iface_tx, self.canFD,
+                              self.canFrameFormat, self.canId,
+                              self.canData, self.can_testapp_dir)
+            except Exception as err:
+                tx_errors.append(err)
+
+        cantx_thread = threading.Thread(target=sendFrame)
+        cantx_thread.start()
+        status, output = self.canRawRx(self.iface_rx, self.canId,
+                                       self.canFrameFormat,
+                                       self.can_testapp_dir)
+        cantx_thread.join()
+        self.assertFalse(tx_errors, msg='CAN TX failed: %s' % tx_errors)
+        msg = 'status and output: %s and %s' % (status, output)
+        self.assertEqual(status, 0, msg=msg)
+        self.canBcmDataVerification(self.iface_rx, self.canFrameFormat,
+                                    self.canId, self.canData, output)
-- 
2.7.4

-=-=-=-=-=-=-=-=-=-=-=-
Links: You receive all messages sent to this group.

View/Reply Online (#6587): 
https://lists.yoctoproject.org/g/meta-intel/message/6587
Mute This Topic: https://lists.yoctoproject.org/mt/74689650/21656
Group Owner: [email protected]
Unsubscribe: https://lists.yoctoproject.org/g/meta-intel/unsub  
[[email protected]]
-=-=-=-=-=-=-=-=-=-=-=-

Reply via email to