Review at https://gerrit.osmocom.org/6829

fake_trx: use DATAMSG classes for DATA messages

The DATAMSG API, which was introduced and extended a few commits
earlier, provides all the methods required to create, validate,
generate and parse DATA messages. Let's use it now.

Change-Id: Ibc99126dc05d873c1ba538a5f4e74866de563f56
---
M src/target/fake_trx/burst_gen.py
M src/target/fake_trx/burst_send.py
M src/target/fake_trx/data_if.py
M src/target/fake_trx/trx_sniff.py
4 files changed, 114 insertions(+), 157 deletions(-)


  git pull ssh://gerrit.osmocom.org:29418/osmocom-bb refs/changes/29/6829/1

diff --git a/src/target/fake_trx/burst_gen.py b/src/target/fake_trx/burst_gen.py
index 04b7f47..ced2de3 100755
--- a/src/target/fake_trx/burst_gen.py
+++ b/src/target/fake_trx/burst_gen.py
@@ -4,7 +4,7 @@
 # Auxiliary tool to generate and send random bursts via TRX DATA
 # interface, which may be useful for fuzzing and testing
 #
-# (C) 2017 by Vadim Yanitskiy <axilira...@gmail.com>
+# (C) 2017-2018 by Vadim Yanitskiy <axilira...@gmail.com>
 #
 # All Rights Reserved
 #
@@ -22,7 +22,6 @@
 # with this program; if not, write to the Free Software Foundation, Inc.,
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 
-import random
 import signal
 import getopt
 import sys
@@ -30,6 +29,7 @@
 from rand_burst_gen import RandBurstGen
 from data_if import DATAInterface
 from gsm_shared import *
+from data_msg import *
 
 COPYRIGHT = \
        "Copyright (C) 2017 by Vadim Yanitskiy <axilira...@gmail.com>\n" \
@@ -70,38 +70,56 @@
                # Init random burst generator
                burst_gen = RandBurstGen()
 
+               # Init an empty DATA message
+               if self.conn_mode == "TRX":
+                       msg = DATAMSG_L12TRX()
+               elif self.conn_mode == "L1":
+                       msg = DATAMSG_TRX2L1()
+
                # Generate a random frame number or use provided one
-               if self.fn is None:
-                       fn = random.randint(0, GSM_HYPERFRAME)
-               else:
-                       fn = self.fn
+               fn_init = msg.rand_fn() if self.fn is None else self.fn
 
                # Send as much bursts as required
                for i in range(self.burst_count):
+                       # Randomize the message header
+                       msg.rand_hdr()
+
+                       # Increase and set frame number
+                       msg.fn = (fn_init + i) % GSM_HYPERFRAME
+
+                       # Set timeslot number
+                       if self.tn is not None:
+                               msg.tn = self.tn
+
+                       # Set transmit power level
+                       if self.pwr is not None:
+                               msg.pwr = self.pwr
+
+                       # TODO: also set TRX2L1 specific fields
+
                        # Generate a random burst
                        if self.burst_type == "NB":
-                               buf = burst_gen.gen_nb()
+                               burst = burst_gen.gen_nb()
                        elif self.burst_type == "FB":
-                               buf = burst_gen.gen_fb()
+                               burst = burst_gen.gen_fb()
                        elif self.burst_type == "SB":
-                               buf = burst_gen.gen_sb()
+                               burst = burst_gen.gen_sb()
                        elif self.burst_type == "AB":
-                               buf = burst_gen.gen_ab()
+                               burst = burst_gen.gen_ab()
 
-                       print("[i] Sending %d/%d %s burst (fn=%u) to %s..."
+                       # Convert to soft-bits in case of TRX -> L1 message
+                       if self.conn_mode == "L1":
+                               burst = msg.ubit2sbit(burst)
+
+                       # Set burst
+                       msg.burst = burst
+
+                       print("[i] Sending %d/%d %s burst %s to %s..."
                                % (i + 1, self.burst_count, self.burst_type,
-                                       fn, self.conn_mode))
+                                       msg.desc_hdr(), self.conn_mode))
 
-                       # Send to TRX or L1
-                       if self.conn_mode == "TRX":
-                               self.data_if.send_trx_msg(buf,
-                                       self.tn, fn, self.pwr)
-                       elif self.conn_mode == "L1":
-                               self.data_if.send_l1_msg(buf,
-                                       self.tn, fn, self.pwr)
-
-                       # Increase frame number (for count > 1)
-                       fn = (fn + 1) % GSM_HYPERFRAME
+                       # Send message
+                       self.data_if.send_msg(msg)
 
                self.shutdown()
 
diff --git a/src/target/fake_trx/burst_send.py b/src/target/fake_trx/burst_send.py
index ee8e51a..4dbe984 100755
--- a/src/target/fake_trx/burst_send.py
+++ b/src/target/fake_trx/burst_send.py
@@ -3,7 +3,7 @@
 
 # Auxiliary tool to send existing bursts via TRX DATA interface
 #
-# (C) 2017 by Vadim Yanitskiy <axilira...@gmail.com>
+# (C) 2017-2018 by Vadim Yanitskiy <axilira...@gmail.com>
 #
 # All Rights Reserved
 #
@@ -21,13 +21,13 @@
 # with this program; if not, write to the Free Software Foundation, Inc.,
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 
-import random
 import signal
 import getopt
 import sys
 
 from data_if import DATAInterface
 from gsm_shared import *
+from data_msg import *
 
 COPYRIGHT = \
        "Copyright (C) 2017 by Vadim Yanitskiy <axilira...@gmail.com>\n" \
@@ -74,40 +74,61 @@
                        print("[i] Reading bursts from stdin...")
                        src = sys.stdin
 
+               # Init an empty DATA message
+               if self.conn_mode == "TRX":
+                       msg = DATAMSG_L12TRX()
+               elif self.conn_mode == "L1":
+                       msg = DATAMSG_TRX2L1()
+
                # Generate a random frame number or use provided one
-               if self.fn is None:
-                       fn = random.randint(0, GSM_HYPERFRAME)
-               else:
-                       fn = self.fn
+               fn = msg.rand_fn() if self.fn is None else self.fn
 
                # Read the burst source line-by-line
                for line in src:
                        # Strip spaces
-                       burst = line.strip()
-                       buf = []
+                       burst_str = line.strip()
+                       burst = []
 
                        # Check length
-                       if len(burst) != 148:
+                       if len(burst_str) != 148:
                                print("[!] Dropping burst due to length != 148")
                                continue
 
-                       print("[i] Sending a burst (fn=%u) to %s..."
-                               % (fn, self.conn_mode))
+                       # Randomize the message header
+                       msg.rand_hdr()
+
+                       # Set frame number
+                       msg.fn = fn
+
+                       # Set timeslot number
+                       if self.tn is not None:
+                               msg.tn = self.tn
+
+                       # Set transmit power level
+                       if self.pwr is not None:
+                               msg.pwr = self.pwr
+
+                       # TODO: also set TRX2L1 specific fields
 
                        # Parse a string
-                       for bit in burst:
+                       for bit in burst_str:
                                if bit == "1":
-                                       buf.append(1)
+                                       burst.append(1)
                                else:
-                                       buf.append(0)
+                                       burst.append(0)
 
-                       # Send to TRX or L1
-                       if self.conn_mode == "TRX":
-                               self.data_if.send_trx_msg(buf,
-                                       self.tn, fn, self.pwr)
-                       elif self.conn_mode == "L1":
-                               self.data_if.send_l1_msg(buf,
-                                       self.tn, fn, self.pwr)
+                       # Convert to soft-bits in case of TRX -> L1 message
+                       if self.conn_mode == "L1":
+                               burst = msg.ubit2sbit(burst)
+
+                       # Set burst
+                       msg.burst = burst
+
+                       print("[i] Sending a burst %s to %s..."
+                               % (msg.desc_hdr(), self.conn_mode))
+
+                       # Send message
+                       self.data_if.send_msg(msg)
 
                        # Increase frame number (for count > 1)
                        fn = (fn + 1) % GSM_HYPERFRAME
diff --git a/src/target/fake_trx/data_if.py b/src/target/fake_trx/data_if.py
index 0f373ab..8b0cd8e 100644
--- a/src/target/fake_trx/data_if.py
+++ b/src/target/fake_trx/data_if.py
@@ -4,7 +4,7 @@
 # Virtual Um-interface (fake transceiver)
 # DATA interface implementation
 #
-# (C) 2017 by Vadim Yanitskiy <axilira...@gmail.com>
+# (C) 2017-2018 by Vadim Yanitskiy <axilira...@gmail.com>
 #
 # All Rights Reserved
 #
@@ -22,85 +22,18 @@
 # with this program; if not, write to the Free Software Foundation, Inc.,
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 
-import random
-
 from udp_link import UDPLink
-from gsm_shared import *
+from data_msg import *
 
 class DATAInterface(UDPLink):
 
-       def send_l1_msg(self, burst,
-               tn = None, fn = None, rssi = None):
-               # Generate random timeslot index if not preset
-               if tn is None:
-                       tn = random.randint(0, 7)
+       def send_msg(self, msg):
+               # Validate a message
+               if not msg.validate():
+                       raise ValueError("Message incomplete or incorrect")
 
-               # Generate random frame number if not preset
-               if fn is None:
-                       fn = random.randint(0, GSM_HYPERFRAME)
-
-               # Generate random RSSI if not preset
-               if rssi is None:
-                       rssi = -random.randint(-75, -50)
-
-               # Prepare a buffer for header and burst
-               buf = []
-
-               # Put timeslot index
-               buf.append(tn)
-
-               # Put frame number
-               buf.append((fn >> 24) & 0xff)
-               buf.append((fn >> 16) & 0xff)
-               buf.append((fn >>  8) & 0xff)
-               buf.append((fn >>  0) & 0xff)
-
-               # Put RSSI
-               buf.append(rssi)
-
-               # HACK: put fake TOA value
-               buf += [0x00] * 2
-
-               # Put burst
-               buf += burst
-
-               # Put two unused bytes
-               buf += [0x00] * 2
+               # Generate TRX message
+               payload = msg.gen_msg()
 
                # Send message
-               self.send(bytearray(buf))
-
-       def send_trx_msg(self, burst,
-               tn = None, fn = None, pwr = None):
-               # Generate random timeslot index if not preset
-               if tn is None:
-                       tn = random.randint(0, 7)
-
-               # Generate random frame number if not preset
-               if fn is None:
-                       fn = random.randint(0, GSM_HYPERFRAME)
-
-               # Generate random power level if not preset
-               if pwr is None:
-                       pwr = random.randint(0, 34)
-
-               # Prepare a buffer for header and burst
-               buf = []
-
-               # Put timeslot index
-               buf.append(tn)
-
-               # Put frame number
-               buf.append((fn >> 24) & 0xff)
-               buf.append((fn >> 16) & 0xff)
-               buf.append((fn >>  8) & 0xff)
-               buf.append((fn >>  0) & 0xff)
-
-               # Put transmit power level
-               buf.append(pwr)
-
-               # Put burst
-               buf += burst
-
-               # Send message
-               self.send(bytearray(buf))
+               self.send(payload)
diff --git a/src/target/fake_trx/trx_sniff.py b/src/target/fake_trx/trx_sniff.py
index cf3ab9c..91c2c4a 100755
--- a/src/target/fake_trx/trx_sniff.py
+++ b/src/target/fake_trx/trx_sniff.py
@@ -27,6 +27,8 @@
 
 import scapy.all
 
+from data_msg import *
+
 COPYRIGHT = \
        "Copyright (C) 2018 by Vadim Yanitskiy <axilira...@gmail.com>\n" \
        "License GPLv2+: GNU GPL version 2 or later " \
@@ -98,32 +100,39 @@
                trx = udp.payload
 
                # Convert to bytearray
-               trx = bytearray(str(trx))
-
-               # Parse GSM TDMA specific data
-               fn = (trx[1] << 24) | (trx[2] << 16) | (trx[3] << 8) | trx[4]
-               tn = trx[0]
+               msg_raw = bytearray(str(trx))
 
                # Determine a burst direction (L1 <-> TRX)
                l12trx = udp.sport > udp.dport
 
+               # Create an empty DATA message
+               msg = DATAMSG_L12TRX() if l12trx else DATAMSG_TRX2L1()
+
+               # Attempt to parse the payload as a DATA message
+               try:
+                       msg.parse_msg(msg_raw)
+               except:
+                       print("[!] Failed to parse message, dropping...")
+                       self.cnt_burst_dropped_num += 1
+                       return
+
                # Poke burst pass filter
-               rc = self.burst_pass_filter(l12trx, fn, tn)
+               rc = self.burst_pass_filter(l12trx, msg.fn, msg.tn)
                if rc is False:
                        self.cnt_burst_dropped_num += 1
                        return
 
                # Debug print
-               print("[i] %s burst: fn=%u, tn=%d" \
-                       % ("L1 -> TRX" if l12trx else "TRX -> L1", fn, tn))
+               print("[i] %s burst: %s" \
+                       % ("L1 -> TRX" if l12trx else "TRX -> L1", msg.desc_hdr()))
 
                # Poke burst handler
-               rc = self.burst_handle(trx, l12trx, fn, tn)
+               rc = self.burst_handle(l12trx, msg_raw, msg)
                if rc is False:
                        self.shutdown()
 
                # Poke burst counter
-               rc = self.burst_count(fn, tn)
+               rc = self.burst_count(msg.fn, msg.tn)
                if rc is True:
                        self.shutdown()
 
@@ -149,46 +158,22 @@
                # Burst passed ;)
                return True
 
-       def burst_handle(self, trx_burst, l12trx, fn, tn):
+       def burst_handle(self, l12trx, msg_raw, msg):
                if self.print_bursts:
-                       self.burst_dump_bits(sys.stdout, trx_burst, l12trx)
-                       sys.stdout.flush()
+                       print(msg.burst)
 
                if self.output_file is not None:
                        # TLV: tag defines burst direction (one byte, BE)
                        self.output_file.write('\x01' if l12trx else '\x02')
 
                        # TLV: length of value (one byte, BE)
-                       length = len(trx_burst)
+                       length = len(msg_raw)
                        self.output_file.write(chr(length))
 
                        # TLV: raw value
-                       self.output_file.write(trx_burst)
+                       self.output_file.write(msg_raw)
 
                return True
-
-       def burst_dump_bits(self, dst, trx_burst, l12trx):
-               # Split out burst header
-               if l12trx:
-                       burst = trx_burst[6:]
-               else:
-                       burst = trx_burst[8:]
-
-               # Print normal bits: 0 or 1
-               for i in range(0, 148):
-                       # Convert bits to chars
-                       if l12trx:
-                               # Convert bits as is
-                               bit = '1' if burst[i] else '0'
-                       else:
-                               # Convert trx bits {254..0} to sbits {-127..127}
-                               bit = -127 if burst[i] == 255 else 127 - burst[i]
-                               # Convert sbits {-127..127} to ubits {0..1}
-                               bit = '1' if bit < 0 else '0'
-
-                       # Write a normal bit
-                       dst.write(bit)
-               dst.write("\n")
 
        def burst_count(self, fn, tn):
                # Update frame counter

-- 
To view, visit https://gerrit.osmocom.org/6829
To unsubscribe, visit https://gerrit.osmocom.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ibc99126dc05d873c1ba538a5f4e74866de563f56
Gerrit-PatchSet: 1
Gerrit-Project: osmocom-bb
Gerrit-Branch: master
Gerrit-Owner: Harald Welte <lafo...@gnumonks.org>

Reply via email to