Hi,
I wrote a simple C++ application to test streaming to and from the
SplitStream NoC block (in preparation for using a custom block that has
1 input and 2 outputs).  The application works, but a couple of "D"
out-of-sequence errors appear at the start of each run, unless it is the
first run following the FPGA load, in which case there are no sequence
errors.  I am wondering if this is:
- an application bug
- a UHD bug
- an FPGA bug
- or, behavior I just need to live with and handle

Note that I have tested other NoC blocks (FFT, Window, & custom) with a
similar application and did not experience these sequence errors. However,
they were all single input, single output blocks.  I do not know if the
different port configuration for the SplitStream block is relevant or if it
is just incidental.

For reference, I am using an X310/UBX over a 1 GbE link with the latest UHD
master commit as of today (see the log file for the hash).  My FPGA build
used the latest from FPGA master.

It would seem to me that the block controller "clear()" function should
clean up the internals such as sequence numbers, but that does not seem to
be happening.  I'm wondering if there is something else that the
application should do to fully clear out any previous streaming.
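
To make the suspicion above concrete, here is a minimal sketch of how a stale counter could produce one error per stream at the start of a run. It is only a model of one possible mechanism, not UHD code and not a confirmed diagnosis: the 12-bit counter width, the assumption that a freshly created receiver expects the count to start at zero, and the names `SEQ_MODULUS` and `count_seq_errors` are all mine.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Illustrative model only; none of these names are UHD APIs.
constexpr std::uint16_t SEQ_MODULUS = 1 << 12; // assumed 12-bit sequence counter

// Counts the sequence errors a freshly created receiver would flag for a run
// of `num_packets` packets whose counter starts at `first_seq`.
// Assumption: a new receiver expects the counter to start at zero and
// resynchronizes to the sender after every mismatch.
std::size_t count_seq_errors(std::uint16_t first_seq, std::size_t num_packets)
{
    assert(first_seq < SEQ_MODULUS);
    std::uint16_t expected = 0;
    std::uint16_t seq = first_seq;
    std::size_t errors = 0;
    for (std::size_t i = 0; i < num_packets; i++) {
        if (seq != expected) {
            errors++; // this is where a "D" would be reported in the model
        }
        // Resynchronize, then let the sender advance modulo the counter size
        expected = static_cast<std::uint16_t>((seq + 1) % SEQ_MODULUS);
        seq = expected;
    }
    return errors;
}
```

In this model, `count_seq_errors(0, 4)` gives no errors (first run after an FPGA load), while `count_seq_errors(4, 4)` gives exactly one (a second run whose counter was left at 4). One such mismatch on each of the two output ports would be consistent with the two "D" characters in the log, but that is a guess.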

Note that I also wrote a similar GNU Radio application and saw the same
behavior.  Both the C++ application and the GNU Radio application show these
sequence errors at the start of each run.  I have attached the source code
for each application as well as a log file for each showing the command
terminal outputs.  In both logs, there is a "clean" run (initially after
loading the FPGA) followed by a run with sequence errors.

In the C++ application, there is just one SplitStream block (no RFNoC
graph) with one input port connected to a host tx_streamer and two output
ports connected to two host rx_streamers.  All streamers run in separate
threads once the streaming starts.  By default, only 1024 samples are sent,
in packets of 256 samples each.  The rx_streamers simply store the incoming
streams to separate files, which I have verified to contain the correct results.
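
As a standalone illustration of the default packetization described above (1024 samples in packets of 256, with end-of-burst set only on the last packet), here is a small sketch. The `packet_plan` struct and `plan_burst` function are hypothetical names used only for this illustration and are not UHD types; the attached application performs the equivalent bookkeeping inline in its send loop.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical description of one send() call; not a UHD type.
struct packet_plan {
    std::size_t offset;    // index of the first sample in the Tx buffer
    std::size_t num_samps; // number of samples carried by this packet
    bool start_of_burst;   // set on the first packet only
    bool end_of_burst;     // set on the last packet only
};

// Splits `total_samps` samples into packets of at most `spp` samples each.
std::vector<packet_plan> plan_burst(std::size_t total_samps, std::size_t spp)
{
    assert(spp > 0);
    std::vector<packet_plan> plan;
    std::size_t sent = 0;
    while (sent < total_samps) {
        const std::size_t n = std::min(spp, total_samps - sent);
        plan.push_back({sent, n, sent == 0, sent + n == total_samps});
        sent += n;
    }
    return plan;
}
```

With the defaults, `plan_burst(1024, 256)` yields four packets, and only the fourth has `end_of_burst` set, which matches the four `Tx: send()` lines in the logs.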

Please let me know if you have any comments / suggestions.
Rob Kossler

Attachment: splitstream_test.grc
Description: Binary data

kossler@kossler-ThinkPad-P51:~/uhd_nd/nd_apps/build_rfnoc$ ./splitstream_test.py
[INFO] [UHD] linux; GNU C++ version 7.3.0; Boost_106501; UHD_3.14.0.0-13-ge3fd16a8
[INFO] [X300] X300 initialization sequence...
[INFO] [X300] Maximum frame size: 1472 bytes.
[INFO] [X300] Radio 1x clock: 200 MHz
[INFO] [GPS] No GPSDO found
[INFO] [0/DmaFIFO_0] Initializing block control (NOC ID: 0xF1F0D00000000000)
[INFO] [0/DmaFIFO_0] BIST passed (Throughput: 1312 MB/s)
[INFO] [0/DmaFIFO_0] BIST passed (Throughput: 1301 MB/s)
[INFO] [0/Radio_0] Initializing block control (NOC ID: 0x12AD100000000001)
[INFO] [0/Radio_1] Initializing block control (NOC ID: 0x12AD100000000001)
[INFO] [0/DUC_0] Initializing block control (NOC ID: 0xD0C0000000000000)
[INFO] [0/DDC_0] Initializing block control (NOC ID: 0xDDC0000000000000)
[WARNING] [RFNOC] Can't find a block controller for key FFT, using default block controller!
[INFO] [0/FFT_0] Initializing block control (NOC ID: 0xFF70000000000000)
[WARNING] [RFNOC] Can't find a block controller for key SplitStream, using default block controller!
[INFO] [0/SplitStream_0] Initializing block control (NOC ID: 0x5757000000000000)
[INFO] [0/Window_0] Initializing block control (NOC ID: 0xD053000000000000)
[WARNING] [RFNOC] Can't find a block controller for key KeepOneInN, using default block controller!
[INFO] [0/KeepOneInN_0] Initializing block control (NOC ID: 0x0246000000000000)

kossler@kossler-ThinkPad-P51:~/uhd_nd/nd_apps/build_rfnoc$ ./splitstream_test.py
[INFO] [UHD] linux; GNU C++ version 7.3.0; Boost_106501; UHD_3.14.0.0-13-ge3fd16a8
[INFO] [X300] X300 initialization sequence...
[INFO] [X300] Maximum frame size: 1472 bytes.
[INFO] [X300] Radio 1x clock: 200 MHz
[INFO] [0/DmaFIFO_0] Initializing block control (NOC ID: 0xF1F0D00000000000)
[INFO] [0/DmaFIFO_0] BIST passed (Throughput: 1317 MB/s)
[INFO] [0/DmaFIFO_0] BIST passed (Throughput: 1309 MB/s)
[INFO] [0/Radio_0] Initializing block control (NOC ID: 0x12AD100000000001)
[INFO] [0/Radio_1] Initializing block control (NOC ID: 0x12AD100000000001)
[INFO] [0/DUC_0] Initializing block control (NOC ID: 0xD0C0000000000000)
[INFO] [0/DDC_0] Initializing block control (NOC ID: 0xDDC0000000000000)
[WARNING] [RFNOC] Can't find a block controller for key FFT, using default block controller!
[INFO] [0/FFT_0] Initializing block control (NOC ID: 0xFF70000000000000)
[WARNING] [RFNOC] Can't find a block controller for key SplitStream, using default block controller!
[INFO] [0/SplitStream_0] Initializing block control (NOC ID: 0x5757000000000000)
[INFO] [0/Window_0] Initializing block control (NOC ID: 0xD053000000000000)
[WARNING] [RFNOC] Can't find a block controller for key KeepOneInN, using default block controller!
[INFO] [0/KeepOneInN_0] Initializing block control (NOC ID: 0x0246000000000000)
overrun on chan 0
overrun on chan D1
Dkossler@kossler-ThinkPad-P51:~/uhd_nd/nd_apps/build_rfnoc$ 

kossler@kossler-ThinkPad-P51:~/uhd_nd/nd_apps/build_rfnoc$ ./splitstream_test

Creating the USRP device with: ...
[INFO] [UHD] linux; GNU C++ version 7.3.0; Boost_106501; UHD_3.14.0.0-13-ge3fd16a8
[INFO] [X300] X300 initialization sequence...
[INFO] [X300] Maximum frame size: 1472 bytes.
[INFO] [X300] Radio 1x clock: 200 MHz
[INFO] [GPS] No GPSDO found
[INFO] [0/DmaFIFO_0] Initializing block control (NOC ID: 0xF1F0D00000000000)
[INFO] [0/DmaFIFO_0] BIST passed (Throughput: 1292 MB/s)
[INFO] [0/DmaFIFO_0] BIST passed (Throughput: 1292 MB/s)
[INFO] [0/Radio_0] Initializing block control (NOC ID: 0x12AD100000000001)
[INFO] [0/Radio_1] Initializing block control (NOC ID: 0x12AD100000000001)
[INFO] [0/DUC_0] Initializing block control (NOC ID: 0xD0C0000000000000)
[INFO] [0/DDC_0] Initializing block control (NOC ID: 0xDDC0000000000000)
[WARNING] [RFNOC] Can't find a block controller for key FFT, using default block controller!
[INFO] [0/FFT_0] Initializing block control (NOC ID: 0xFF70000000000000)
[WARNING] [RFNOC] Can't find a block controller for key SplitStream, using default block controller!
[INFO] [0/SplitStream_0] Initializing block control (NOC ID: 0x5757000000000000)
[INFO] [0/Window_0] Initializing block control (NOC ID: 0xD053000000000000)
[WARNING] [RFNOC] Can't find a block controller for key KeepOneInN, using default block controller!
[INFO] [0/KeepOneInN_0] Initializing block control (NOC ID: 0x0246000000000000)

Clearing block: 0/SplitStream_0
Using rx streamer args: spp=256,block_id=0/SplitStream_0,block_port=0
Using rx streamer args: spp=256,block_id=0/SplitStream_0,block_port=1
Using tx streamer args: spp=256,block_id=0/SplitStream_0,block_port=0

Rx: Issueing start stream cmd
Tx: Streaming started for 1024 samps, spp = 256
Rx: Issueing start stream cmd
Tx: send() samps: 256, EOB: 0
Tx: send() samps: 256, EOB: 0
Tx: send() samps: 256, EOB: 0
Tx: send() samps: 256, EOB: 1
Tx: Streaming complete
Rx: recv() samps: 256, EOB: 0
Rx: recv() samps: 256, EOB: 0
Rx: recv() samps: 256, EOB: 0
Rx: recv() samps: 256, EOB: 0
Rx: recv() samps: 256, EOB: 0
Rx: recv() samps: 256, EOB: 0
Rx: recv() samps: 256, EOB: 1
Rx: recv() samps: 256, EOB: 1
Rx: Issueing stop stream cmd
Rx: Issueing stop stream cmd
Rx: Streaming complete
Rx: Streaming complete

Done!

kossler@kossler-ThinkPad-P51:~/uhd_nd/nd_apps/build_rfnoc$ ./splitstream_test 

Creating the USRP device with: ...
[INFO] [UHD] linux; GNU C++ version 7.3.0; Boost_106501; UHD_3.14.0.0-13-ge3fd16a8
[INFO] [X300] X300 initialization sequence...
[INFO] [X300] Maximum frame size: 1472 bytes.
[INFO] [X300] Radio 1x clock: 200 MHz
[INFO] [0/DmaFIFO_0] Initializing block control (NOC ID: 0xF1F0D00000000000)
[INFO] [0/DmaFIFO_0] BIST passed (Throughput: 1292 MB/s)
[INFO] [0/DmaFIFO_0] BIST passed (Throughput: 1315 MB/s)
[INFO] [0/Radio_0] Initializing block control (NOC ID: 0x12AD100000000001)
[INFO] [0/Radio_1] Initializing block control (NOC ID: 0x12AD100000000001)
[INFO] [0/DUC_0] Initializing block control (NOC ID: 0xD0C0000000000000)
[INFO] [0/DDC_0] Initializing block control (NOC ID: 0xDDC0000000000000)
[WARNING] [RFNOC] Can't find a block controller for key FFT, using default block controller!
[INFO] [0/FFT_0] Initializing block control (NOC ID: 0xFF70000000000000)
[WARNING] [RFNOC] Can't find a block controller for key SplitStream, using default block controller!
[INFO] [0/SplitStream_0] Initializing block control (NOC ID: 0x5757000000000000)
[INFO] [0/Window_0] Initializing block control (NOC ID: 0xD053000000000000)
[WARNING] [RFNOC] Can't find a block controller for key KeepOneInN, using default block controller!
[INFO] [0/KeepOneInN_0] Initializing block control (NOC ID: 0x0246000000000000)

Clearing block: 0/SplitStream_0
Using rx streamer args: spp=256,block_id=0/SplitStream_0,block_port=0
Using rx streamer args: spp=256,block_id=0/SplitStream_0,block_port=1
Using tx streamer args: spp=256,block_id=0/SplitStream_0,block_port=0

Rx: Issueing start stream cmd
Rx: Issueing start stream cmd
Tx: Streaming started for 1024 samps, spp = 256
Tx: send() samps: 256, EOB: 0
Tx: send() samps: 256, EOB: 0
Tx: send() samps: 256, EOB: 0
Tx: send() samps: 256, EOB: 1
Tx: Streaming complete
DRx: recv() samps: 0, EOB: 0
DRx: recv() samps: 0, EOB: 0
Rx: recv() samps: 256, EOB: 0
Rx: recv() samps: 256, EOB: 0
Rx: recv() samps: 256, EOB: 0
Rx: recv() samps: 256, EOB: 0
Rx: recv() samps: 256, EOB: 0
Rx: recv() samps: 256, EOB: 0
Rx: recv() samps: 256, EOB: 1
Rx: recv() samps: 256, EOB: 1
Rx: Issueing stop stream cmd
Rx: Issueing stop stream cmd
Rx: Streaming complete
Rx: Streaming complete

Done!

kossler@kossler-ThinkPad-P51:~/uhd_nd/nd_apps/build_rfnoc$ 

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
##################################################
# GNU Radio Python Flow Graph
# Title: Splitstream Test
# Generated: Thu Jan  3 16:48:19 2019
##################################################

if __name__ == '__main__':
    import ctypes
    import sys
    if sys.platform.startswith('linux'):
        try:
            x11 = ctypes.cdll.LoadLibrary('libX11.so')
            x11.XInitThreads()
        except:
            print "Warning: failed to XInitThreads()"

from PyQt4 import Qt
from gnuradio import analog
from gnuradio import blocks
from gnuradio import eng_notation
from gnuradio import gr
from gnuradio import qtgui
from gnuradio import uhd
from gnuradio.eng_option import eng_option
from gnuradio.filter import firdes
from optparse import OptionParser
import ettus
import sip
import sys
from gnuradio import qtgui


class splitstream_test(gr.top_block, Qt.QWidget):

    def __init__(self):
        gr.top_block.__init__(self, "Splitstream Test")
        Qt.QWidget.__init__(self)
        self.setWindowTitle("Splitstream Test")
        qtgui.util.check_set_qss()
        try:
            self.setWindowIcon(Qt.QIcon.fromTheme('gnuradio-grc'))
        except:
            pass
        self.top_scroll_layout = Qt.QVBoxLayout()
        self.setLayout(self.top_scroll_layout)
        self.top_scroll = Qt.QScrollArea()
        self.top_scroll.setFrameStyle(Qt.QFrame.NoFrame)
        self.top_scroll_layout.addWidget(self.top_scroll)
        self.top_scroll.setWidgetResizable(True)
        self.top_widget = Qt.QWidget()
        self.top_scroll.setWidget(self.top_widget)
        self.top_layout = Qt.QVBoxLayout(self.top_widget)
        self.top_grid_layout = Qt.QGridLayout()
        self.top_layout.addLayout(self.top_grid_layout)

        self.settings = Qt.QSettings("GNU Radio", "splitstream_test")
        self.restoreGeometry(self.settings.value("geometry").toByteArray())


        ##################################################
        # Variables
        ##################################################
        self.device3 = variable_uhd_device3_0 = ettus.device3(uhd.device_addr_t( ",".join(('type=x300', "")) ))
        self.samp_rate = samp_rate = 32000

        ##################################################
        # Blocks
        ##################################################
        self.uhd_rfnoc_split_stream_0 = ettus.rfnoc_generic(
            self.device3,
            uhd.stream_args( # TX Stream Args
                cpu_format="fc32",
                otw_format="sc16",
                args="gr_vlen={0},{1}".format(1, "" if 1 == 1 else "spp={0}".format(1)),
            ),
            uhd.stream_args( # RX Stream Args
                cpu_format="fc32",
                otw_format="sc16",
        	    channels=(0,1),
                args="gr_vlen={0},{1}".format(1, "" if 1 == 1 else "spp={0}".format(1)),
            ),
            "SplitStream", -1, -1,
        )
        self.qtgui_time_sink_x_0_0 = qtgui.time_sink_c(
        	1024, #size
        	samp_rate, #samp_rate
        	"", #name
        	1 #number of inputs
        )
        self.qtgui_time_sink_x_0_0.set_update_time(0.10)
        self.qtgui_time_sink_x_0_0.set_y_axis(-1, 1)

        self.qtgui_time_sink_x_0_0.set_y_label('Amplitude', "")

        self.qtgui_time_sink_x_0_0.enable_tags(-1, True)
        self.qtgui_time_sink_x_0_0.set_trigger_mode(qtgui.TRIG_MODE_FREE, qtgui.TRIG_SLOPE_POS, 0.0, 0, 0, "")
        self.qtgui_time_sink_x_0_0.enable_autoscale(False)
        self.qtgui_time_sink_x_0_0.enable_grid(False)
        self.qtgui_time_sink_x_0_0.enable_axis_labels(True)
        self.qtgui_time_sink_x_0_0.enable_control_panel(False)
        self.qtgui_time_sink_x_0_0.enable_stem_plot(False)

        if not True:
          self.qtgui_time_sink_x_0_0.disable_legend()

        labels = ['', '', '', '', '',
                  '', '', '', '', '']
        widths = [1, 1, 1, 1, 1,
                  1, 1, 1, 1, 1]
        colors = ["blue", "red", "green", "black", "cyan",
                  "magenta", "yellow", "dark red", "dark green", "blue"]
        styles = [1, 1, 1, 1, 1,
                  1, 1, 1, 1, 1]
        markers = [-1, -1, -1, -1, -1,
                   -1, -1, -1, -1, -1]
        alphas = [1.0, 1.0, 1.0, 1.0, 1.0,
                  1.0, 1.0, 1.0, 1.0, 1.0]

        for i in xrange(2):
            if len(labels[i]) == 0:
                if(i % 2 == 0):
                    self.qtgui_time_sink_x_0_0.set_line_label(i, "Re{{Data {0}}}".format(i/2))
                else:
                    self.qtgui_time_sink_x_0_0.set_line_label(i, "Im{{Data {0}}}".format(i/2))
            else:
                self.qtgui_time_sink_x_0_0.set_line_label(i, labels[i])
            self.qtgui_time_sink_x_0_0.set_line_width(i, widths[i])
            self.qtgui_time_sink_x_0_0.set_line_color(i, colors[i])
            self.qtgui_time_sink_x_0_0.set_line_style(i, styles[i])
            self.qtgui_time_sink_x_0_0.set_line_marker(i, markers[i])
            self.qtgui_time_sink_x_0_0.set_line_alpha(i, alphas[i])

        self._qtgui_time_sink_x_0_0_win = sip.wrapinstance(self.qtgui_time_sink_x_0_0.pyqwidget(), Qt.QWidget)
        self.top_grid_layout.addWidget(self._qtgui_time_sink_x_0_0_win)
        self.qtgui_time_sink_x_0 = qtgui.time_sink_c(
        	1024, #size
        	samp_rate, #samp_rate
        	"", #name
        	1 #number of inputs
        )
        self.qtgui_time_sink_x_0.set_update_time(0.10)
        self.qtgui_time_sink_x_0.set_y_axis(-1, 1)

        self.qtgui_time_sink_x_0.set_y_label('Amplitude', "")

        self.qtgui_time_sink_x_0.enable_tags(-1, True)
        self.qtgui_time_sink_x_0.set_trigger_mode(qtgui.TRIG_MODE_FREE, qtgui.TRIG_SLOPE_POS, 0.0, 0, 0, "")
        self.qtgui_time_sink_x_0.enable_autoscale(False)
        self.qtgui_time_sink_x_0.enable_grid(False)
        self.qtgui_time_sink_x_0.enable_axis_labels(True)
        self.qtgui_time_sink_x_0.enable_control_panel(False)
        self.qtgui_time_sink_x_0.enable_stem_plot(False)

        if not True:
          self.qtgui_time_sink_x_0.disable_legend()

        labels = ['', '', '', '', '',
                  '', '', '', '', '']
        widths = [1, 1, 1, 1, 1,
                  1, 1, 1, 1, 1]
        colors = ["blue", "red", "green", "black", "cyan",
                  "magenta", "yellow", "dark red", "dark green", "blue"]
        styles = [1, 1, 1, 1, 1,
                  1, 1, 1, 1, 1]
        markers = [-1, -1, -1, -1, -1,
                   -1, -1, -1, -1, -1]
        alphas = [1.0, 1.0, 1.0, 1.0, 1.0,
                  1.0, 1.0, 1.0, 1.0, 1.0]

        for i in xrange(2):
            if len(labels[i]) == 0:
                if(i % 2 == 0):
                    self.qtgui_time_sink_x_0.set_line_label(i, "Re{{Data {0}}}".format(i/2))
                else:
                    self.qtgui_time_sink_x_0.set_line_label(i, "Im{{Data {0}}}".format(i/2))
            else:
                self.qtgui_time_sink_x_0.set_line_label(i, labels[i])
            self.qtgui_time_sink_x_0.set_line_width(i, widths[i])
            self.qtgui_time_sink_x_0.set_line_color(i, colors[i])
            self.qtgui_time_sink_x_0.set_line_style(i, styles[i])
            self.qtgui_time_sink_x_0.set_line_marker(i, markers[i])
            self.qtgui_time_sink_x_0.set_line_alpha(i, alphas[i])

        self._qtgui_time_sink_x_0_win = sip.wrapinstance(self.qtgui_time_sink_x_0.pyqwidget(), Qt.QWidget)
        self.top_grid_layout.addWidget(self._qtgui_time_sink_x_0_win)
        self.blocks_throttle_0 = blocks.throttle(gr.sizeof_gr_complex*1, samp_rate,True)
        self.analog_sig_source_x_0 = analog.sig_source_c(samp_rate, analog.GR_COS_WAVE, 111, 1, 0)



        ##################################################
        # Connections
        ##################################################
        self.connect((self.analog_sig_source_x_0, 0), (self.blocks_throttle_0, 0))
        self.connect((self.blocks_throttle_0, 0), (self.uhd_rfnoc_split_stream_0, 0))
        self.connect((self.uhd_rfnoc_split_stream_0, 0), (self.qtgui_time_sink_x_0, 0))
        self.connect((self.uhd_rfnoc_split_stream_0, 1), (self.qtgui_time_sink_x_0_0, 0))

    def closeEvent(self, event):
        self.settings = Qt.QSettings("GNU Radio", "splitstream_test")
        self.settings.setValue("geometry", self.saveGeometry())
        event.accept()

    def get_variable_uhd_device3_0(self):
        return self.variable_uhd_device3_0

    def set_variable_uhd_device3_0(self, variable_uhd_device3_0):
        self.variable_uhd_device3_0 = variable_uhd_device3_0

    def get_samp_rate(self):
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        self.samp_rate = samp_rate
        self.qtgui_time_sink_x_0_0.set_samp_rate(self.samp_rate)
        self.qtgui_time_sink_x_0.set_samp_rate(self.samp_rate)
        self.blocks_throttle_0.set_sample_rate(self.samp_rate)
        self.analog_sig_source_x_0.set_sampling_freq(self.samp_rate)


def main(top_block_cls=splitstream_test, options=None):

    from distutils.version import StrictVersion
    if StrictVersion(Qt.qVersion()) >= StrictVersion("4.5.0"):
        style = gr.prefs().get_string('qtgui', 'style', 'raster')
        Qt.QApplication.setGraphicsSystem(style)
    qapp = Qt.QApplication(sys.argv)

    tb = top_block_cls()
    tb.start()
    tb.show()

    def quitting():
        tb.stop()
        tb.wait()
    qapp.connect(qapp, Qt.SIGNAL("aboutToQuit()"), quitting)
    qapp.exec_()


if __name__ == '__main__':
    main()
#include <uhd/utils/thread.hpp>
#include <uhd/utils/safe_main.hpp>
#include <uhd/device3.hpp>
#include <uhd/rfnoc/source_block_ctrl_base.hpp>
//#include <uhd/rfnoc/window_block_ctrl.hpp>
//#include <uhd/rfnoc/cmultram2fifo_block_ctrl.hpp>
#include <uhd/exception.hpp>
#include <boost/program_options.hpp>
#include <boost/format.hpp>
#include <boost/thread.hpp>
#include <boost/filesystem.hpp>
#include <iostream>
#include <fstream>
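#include <csignal>
#include <complex>
#include <atomic>
#include <string>
#include <vector>

namespace po = boost::program_options;

// Set from the SIGINT handler and polled by the streaming threads, so it is atomic
static std::atomic<bool> stop_signal_called(false);
void sig_int_handler(int) {
	stop_signal_called = true;
}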

template<typename samp_type> void recv_to_file(
    uhd::rx_streamer::sptr rx_stream,
    const std::vector<std::string> &file_vec,
    const unsigned long long num_requested_samples,
	uhd::time_spec_t start_time
) {
	uhd::set_thread_priority_safe();
	
	size_t num_chan = rx_stream->get_num_channels();
	if (file_vec.size() != num_chan) {
		throw std::runtime_error("Mismatched size between file vector and number of stream channels");
	}

	const size_t samps_per_buff = rx_stream->get_max_num_samps();

	// Allocate buffers to receive samples (one buffer per channel)
	std::vector<std::vector<samp_type> > buffs(num_chan, std::vector<samp_type>(samps_per_buff));
	
	// Create a vector of pointers to point to each of the channel buffers
	std::vector<samp_type*> buff_ptrs;
	for (size_t i = 0; i < num_chan; i++) buff_ptrs.push_back(&buffs[i].front());
	
	// Create a separate output file for each channel
	// (std::vector rather than a variable-length array, which is not standard C++)
	std::vector<std::ofstream> outfile(num_chan);
	for (size_t i=0; i<num_chan; i++) {
		std::string fname = file_vec[i];
		outfile[i].open(fname.c_str(), std::ofstream::binary);
	}
	

	uhd::rx_metadata_t md;
	
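	//setup streaming
	// NOTE: num_requested_samples is unsigned, so the condition below is always
	// true and the stream is always started in continuous mode.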
	uhd::stream_cmd_t stream_cmd((num_requested_samples >= 0)?
	                             uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS:
	                             uhd::stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_DONE
	                            );
	stream_cmd.num_samps = size_t(num_requested_samples);
	if (num_chan > 1) {
		stream_cmd.stream_now = false;
		stream_cmd.time_spec = start_time;
	} else {
		stream_cmd.stream_now = true;
	}

	std::cout << "Rx: Issueing start stream cmd\n";
	rx_stream->issue_stream_cmd(stream_cmd);

	unsigned long long num_total_samps = 0;

	while (not stop_signal_called and (num_requested_samples > num_total_samps or num_requested_samples == 0)) {
		size_t num_rx_samps = rx_stream->recv(buff_ptrs, samps_per_buff, md, 1.0);

		std::cout << ("Rx: recv() samps: "+std::to_string(num_rx_samps)+", EOB: "+std::to_string(md.end_of_burst)+"\n");
		switch (md.error_code) {
			case uhd::rx_metadata_t::ERROR_CODE_NONE:
				break;
			case uhd::rx_metadata_t::ERROR_CODE_OVERFLOW:
				// Tolerated: keep receiving rather than aborting the run
				break;
			default:
				std::string error = str(boost::format("Receiver error: %s") % md.strerror());
				throw std::runtime_error(error);
		}

		num_total_samps += num_rx_samps;
		
		// Write the data to the output files
		for (size_t i=0; i<num_chan; i++) {
			if (outfile[i].is_open()) outfile[i].write((const char*)buff_ptrs[i], num_rx_samps*sizeof(samp_type));
		}
	}

	std::cout << "Rx: Issueing stop stream cmd\n";
	stream_cmd.stream_mode = uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS;
	rx_stream->issue_stream_cmd(stream_cmd);

	// Run recv until nothing is left
	while (not md.end_of_burst and  md.error_code == uhd::rx_metadata_t::ERROR_CODE_NONE) {
		size_t num_post_samps = rx_stream->recv(buff_ptrs, samps_per_buff, md, 0.1);
		std::cout << ("Rx: flush samps: " + std::to_string(num_post_samps) + "\n");
	}

	// Close the output files
	for (size_t i=0; i<num_chan; i++){
		if (outfile[i].is_open()) outfile[i].close();
	}
	
	std::cout << "Rx: Streaming complete\n";
}


template<typename samp_type> void send_from_buff(
    uhd::tx_streamer::sptr tx_stream,
    std::vector<samp_type> &buff
)
{
	uhd::set_thread_priority_safe();

	uhd::tx_metadata_t md;
	md.has_time_spec = false;
	md.start_of_burst = true;

	const size_t tx_spp = tx_stream->get_max_num_samps();

	std::vector<samp_type *> buffs(1, &buff.front());

	unsigned long long samps_requested = buff.size();
	unsigned long long samps_sent = 0;

	std::cout << ("Tx: Streaming started for " + std::to_string(samps_requested) + " samps, spp = " + std::to_string(tx_spp) + "\n");
	while (not stop_signal_called and (samps_sent < samps_requested)) {
		unsigned long long samps_remaining = samps_requested - samps_sent;
		size_t num_this_time = (samps_remaining> tx_spp) ? tx_spp : samps_remaining;
		if (samps_remaining==num_this_time) md.end_of_burst = true;

		buffs[0] = &buff[samps_sent];
		tx_stream->send(buffs, num_this_time, md, 1);
		std::cout << ("Tx: send() samps: "+std::to_string(num_this_time)+", EOB: "+std::to_string(md.end_of_burst)+"\n");

		samps_sent += num_this_time;

		md.start_of_burst = false;
	}
	std::cout << "Tx: Streaming complete\n";
}

template<typename samp_type> std::vector<samp_type> read_file_to_buff(
    std::string fname,
    size_t bytes_per_samp)
{
	std::ifstream infile(fname.c_str(), std::ifstream::binary);

	//If file could be opened, continue
	if (infile.is_open()) {

		//Determine number of samples in file
		size_t num_file_bytes = boost::filesystem::file_size(fname);
		size_t num_tx_samps =  num_file_bytes / bytes_per_samp;

		//Check that file size matches expected
		if (num_tx_samps*bytes_per_samp != num_file_bytes) {
			throw std::runtime_error("File " + fname + ": size is not divisible by sample type");
		}

		//Read file into buffer
		std::vector<samp_type> ret(num_tx_samps);
		infile.read((char*)&ret.front(), num_file_bytes);
		if (infile.gcount() != static_cast<std::streamsize>(num_file_bytes)) {
			infile.close();
			throw std::runtime_error("File " + fname + ": didn't read correct number of bytes from file");
		}
		infile.close();

		//Print out on successful file read.
		std::cout << "\tFile " << fname << ": Num samps= " << num_tx_samps << std::endl;
		return (ret);
	}

	//If file could not be opened, report it and return an empty buffer.
	else {
		std::vector<samp_type> ret;
		std::cout << "\tFile " << fname << ": could not open file" <<std::endl;
		return (ret);
	}
}



int UHD_SAFE_MAIN(int argc, char *argv[])
{
	uhd::set_thread_priority_safe();

	//variables to be set by po
	std::string args, file, format, wirefmt, streamargs, block_id, file_tx, file_coeff;
	size_t total_num_samps, tx_spp, rx_spp;
	short const_real, const_imag;
	double total_time;
	bool single_rx_thread;
	
	//setup the program options
	po::options_description desc("Allowed options");
	desc.add_options()
	("help", "help message")
	("args", po::value<std::string>(&args)->default_value(""), "USRP device address args")

	("file", po::value<std::string>(&file)->default_value("usrp_samples.dat"), "name of the file to write binary samples to")
	("format", po::value<std::string>(&format)->default_value("sc16"), "File sample format: sc16, fc32, or fc64")
	("duration", po::value<double>(&total_time)->default_value(0), "total number of seconds to receive")
	("nsamps", po::value<size_t>(&total_num_samps)->default_value(1024), "total number of samples to receive")
	("tx-spp", po::value<size_t>(&tx_spp)->default_value(256), "Tx samples per packet")
	("rx-spp", po::value<size_t>(&rx_spp)->default_value(256), "Rx samples per packet")

	("const-real", po::value<short>(&const_real)->default_value(100),"Tx constant value (real)")
	("const-imag", po::value<short>(&const_imag)->default_value(-200),"Tx constant value (imag)")
	("file-tx", po::value<std::string>(&file_tx),"Optional file to use as source of Tx samples")
	("file-coeff", po::value<std::string>(&file_coeff),"Optional file to use as source of block coefficients")
	
	("single-rx-thread", po::value<bool>(&single_rx_thread)->default_value(false),"Use single rx thread for all channels")
	;
	po::variables_map vm;
	po::store(po::parse_command_line(argc, argv, desc), vm);
	po::notify(vm);

	//print the help message
	if (vm.count("help")) {
		std::cout << boost::format("UHD/RFNoC SplitStream test %s") % desc << std::endl;
		std::cout
		        << std::endl
		        << "This application streams data to the splitstream block and captures the two outputs.\n"
		        << std::endl;
		return ~0;
	}

	if (format != "sc16" and format != "fc32" and format != "fc64") {
		std::cout << "Invalid sample format: " << format << std::endl;
		return EXIT_FAILURE;
	}
	

	// Create vector of samples to send to the block
	std::vector<std::complex<short> > buff;
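	if (vm.count("file-tx")) {
		buff = read_file_to_buff<std::complex<short> >(file_tx,4);
		// An empty buffer means the file could not be opened (or had no samples)
		if (buff.empty()) {
			std::cout << "No Tx samples could be read from: " << file_tx << std::endl;
			return EXIT_FAILURE;
		}
	}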
	else {
		std::complex<short> val(const_real,const_imag);
		buff = std::vector<std::complex<short> >(total_num_samps,val);
	}


	/************************************************************************
	 * Create device and block controls
	 ***********************************************************************/
	std::cout << std::endl;
	std::cout << boost::format("Creating the USRP device with: %s...") % args << std::endl;
	uhd::device3::sptr usrp = uhd::device3::make(args);

	std::cout << std::endl;


	//uhd::rfnoc::graph::sptr rx_graph = usrp->create_graph("rfnoc_rx_to_file");
	usrp->clear();  // not sure what this does

	block_id = "SplitStream_0";

	if (not usrp->has_block(block_id)) {
		std::cout << "Block does not exist on current device: " << block_id << std::endl;
		return EXIT_FAILURE;
	}

	auto blk_ctrl = usrp->get_block_ctrl<uhd::rfnoc::block_ctrl_base>(block_id);

	std::cout << "Clearing block: " << blk_ctrl->get_block_id() << std::endl;
	blk_ctrl->clear();


	size_t Num_rx = 2;

	// Configure rx stream args and create a receive streamer
	uhd::stream_args_t rx_stream_args(format, "sc16");
	rx_stream_args.args["spp"] = std::to_string(rx_spp);

	std::vector<uhd::rx_streamer::sptr> rx_stream_vec;

	if (single_rx_thread) {
		std::vector<size_t> chan;
		for (size_t i=0; i<Num_rx; i++) {
			rx_stream_args.args["block_id"+std::to_string(i)] = blk_ctrl->get_block_id().to_string();
			rx_stream_args.args["block_port"+std::to_string(i)] = std::to_string(i);
			chan.push_back(i);
		}
		rx_stream_args.channels = chan;
		std::cout << "Using rx streamer args: " << rx_stream_args.args.to_string() << std::endl;

		rx_stream_vec.push_back(usrp->get_rx_stream(rx_stream_args));
		//std::cout << "Rx num channels: " << rx_stream_vec[0]->get_num_channels() << std::endl;
	} else {
		for (size_t i=0; i<Num_rx; i++) {
			rx_stream_args.args["block_id"] = blk_ctrl->get_block_id().to_string();
			rx_stream_args.args["block_port"] = std::to_string(i);
			std::cout << "Using rx streamer args: " << rx_stream_args.args.to_string() << std::endl;
			rx_stream_vec.push_back(usrp->get_rx_stream(rx_stream_args));
		}
	}


	// Configure tx stream args
	uhd::stream_args_t tx_stream_args("sc16", "sc16");
	tx_stream_args.args["spp"] = std::to_string(tx_spp);
	tx_stream_args.args["block_id"] = blk_ctrl->get_block_id().to_string();
	tx_stream_args.args["block_port"] = "0"; //std::to_string(block_port);
	std::cout << "Using tx streamer args: " << tx_stream_args.args.to_string() << std::endl;
	std::cout << std::endl;

	// create a transmit streamer
	uhd::tx_streamer::sptr tx_stream = usrp->get_tx_stream(tx_stream_args);

	if (total_num_samps == 0) {
		std::signal(SIGINT, &sig_int_handler);
		std::cout << "Press Ctrl + C to stop streaming..." << std::endl;
	}
	
	// Create two file names from the base filename provided
	std::vector<std::string> file_vec;
	for (size_t i=0; i<Num_rx; i++) file_vec.push_back(file + "." + std::to_string(i));


	uhd::time_spec_t start_time = usrp->get_tree()->access<uhd::time_spec_t>("/mboards/0/time/now").get() + 0.2;
	
	boost::thread_group tg;

	for (size_t i=0; i<rx_stream_vec.size(); i++) {
		auto rx_stream = rx_stream_vec[i];
		
		std::vector<std::string> file_tmp;
		if (single_rx_thread) {
			file_tmp = file_vec;
		} else {
			file_tmp = std::vector<std::string>(1,file_vec[i]);
		}

		// start the rx thread
		if (format == "fc64")
			tg.create_thread(boost::bind(&recv_to_file<std::complex<double> >,rx_stream,file_tmp,total_num_samps,start_time));
		else if (format == "fc32")
			tg.create_thread(boost::bind(&recv_to_file<std::complex<float> >,rx_stream,file_tmp,total_num_samps,start_time));
		else if (format == "sc16")
			tg.create_thread(boost::bind(&recv_to_file<std::complex<short> >,rx_stream,file_tmp,total_num_samps,start_time));
		else
			throw std::runtime_error("Unknown data format: " + format);
	}

	// start the tx thread
	tg.create_thread(boost::bind(&send_from_buff<std::complex<short> >,tx_stream,buff));

	// wait for both tx and rx threads to complete
	tg.join_all();

	// The following 'blk_ctrl->clear()' is causing an AssertionError (as program terminates - likely in
	// one of the destructors) so it is presently commented out.  No idea why??
	//    AssertionError: [0/SplitStream_0] Attempting to disconnect input port 0, which is not registered as connected!
	//std::cout << "Clearing block: " << blk_ctrl->get_block_id() << std::endl;
	//blk_ctrl->clear();

	//finished
	std::cout << std::endl << "Done!" << std::endl << std::endl;

	return EXIT_SUCCESS;
}