Hi all,
My lab is doing experiments involving sweeping instrument outputs and reading
back data. This is currently done using Keysight's proprietary Labber software
that uses instrument drivers written in python to configure connections. The
driver files can be found here https://github.com/Labber-software/Drivers . If
I can port the driver for the Agilent 5230 network analyzer (attached) I may be
able to switch from Keysight to sigrok.
I'm hoping to avoid rewriting the existing drivers in C. Does sigrok have any
support built in for drivers in python? Has anyone done a similar port that I
can look at for reference?
Thanks,
Theo
#!/usr/bin/env python
from VISA_Driver import VISA_Driver
import numpy as np
import os.path
__version__ = "0.0.1"
class Error(Exception):
pass
class Driver(VISA_Driver):
""" This class implements the Agilent 5230 PNA driver"""
def performOpen(self, options={}):
"""Perform the operation of opening the instrument connection"""
# init meas param dict
self.dMeasParam = {}
# calling the generic VISA open to make sure we have a connection
VISA_Driver.performOpen(self, options=options)
# do perform get value for acquisition mode
def performSetValue(self, quant, value, sweepRate=0.0, options={}):
"""Perform the Set Value instrument operation. This function should
return the actual value set by the instrument"""
if self.isFinalCall(options) and self.getValue('Sweep type') ==
'Lorentzian':
# get parameters
centerFreq = self.getValue('Center frequency')
qEst = self.getValue('Q Value')
thetaMax = self.getValue('Maximum Angle')
numPoints = self.getValue('# of points')
# calculate distribution
frequencies = self.calcLorentzianDistr(thetaMax, numPoints,
qEst, centerFreq)
data = []
for freq in frequencies:
data.append('1')
data.append('1')
data.append(str(freq))
data.append(str(freq))
dataset = ','.join(data)
self.writeAndLog('SENS:SEGM:LIST SSTOP, %s, %s' % (numPoints,
dataset))
# update visa commands for triggers
if quant.name in ('S11 - Enabled', 'S21 - Enabled', 'S12 - Enabled',
'S22 - Enabled'):
if self.getModel() in ('E5071C',):
# new trace handling, use trace numbers, set all at once
lParam = ['S11', 'S21', 'S12', 'S22']
dParamValue = dict()
for param in lParam:
dParamValue[param] = self.getValue('%s - Enabled' % param)
dParamValue[quant.name[:3]] = value
# add parameters, if enabled
self.dMeasParam = dict()
for (param, enabled) in dParamValue.items():
if enabled:
nParam = len(self.dMeasParam)+1
self.writeAndLog(":CALC:PAR%d:DEF %s" %
(nParam, param))
self.dMeasParam[param] = nParam
# set number of visible traces
self.writeAndLog(":CALC:PAR:COUN %d" % len(self.dMeasParam))
else:
# get updated list of measurements in use
self.getActiveMeasurements()
param = quant.name[:3]
# old-type handling of traces
if param in self.dMeasParam:
# clear old measurements for this parameter
for name in self.dMeasParam[param]:
self.writeAndLog("CALC:PAR:DEL '%s'" % name)
# create new measurement, if enabled is true
if value:
newName = 'LabC_%s' % param
self.writeAndLog("CALC:PAR:EXT '%s','%s'" % (newName,
param))
# show on PNA screen
iTrace = 1 + ['S11', 'S21', 'S12', 'S22'].index(param)
# sPrev = self.askAndLog('DISP:WIND:CAT?')
# if sPrev.find('EMPTY')>0:
# # no previous traces
# iTrace = 1
# else:
# # previous traces, add new
# lTrace = sPrev[1:-1].split(',')
# iTrace = int(lTrace[-1]) + 1
self.writeAndLog("DISP:WIND:TRAC%d:FEED '%s'" % (iTrace,
newName))
# add to dict with list of measurements
self.dMeasParam[param] = [newName]
elif quant.name in ('Wait for new trace',):
# do nothing
pass
elif quant.name in ('Range type',):
# change range if single point
if value == 'Single frequency':
self.writeAndLog(':SENS:FREQ:SPAN 0')
self.writeAndLog(':SENS:SWE:POIN 1')
elif quant.name in ('Sweep type'):
# if linear:
if self.getValue('Sweep type') == 'Linear':
self.writeAndLog(':SENS:SWE:TYPE LIN')
#if log:
elif self.getValue('Sweep type') == 'Log':
self.writeAndLog(':SENS:SWE:TYPE LOG')
# if Lorentzian:
elif self.getValue('Sweep type') == 'Lorentzian':
# prepare VNA for segment sweep
self.writeAndLog(':SENS:SWE:TYPE SEGM')
self.writeAndLog('DISP:WIND:TABL SEGM')
else:
# run standard VISA case
value = VISA_Driver.performSetValue(self, quant, value, sweepRate,
options)
return value
def performGetValue(self, quant, options={}):
"""Perform the Get Value instrument operation"""
# check type of quantity
if quant.name in ('S11 - Enabled', 'S21 - Enabled', 'S12 - Enabled',
'S22 - Enabled'):
# update list of channels in use
self.getActiveMeasurements()
# get selected parameter
param = quant.name[:3]
value = (param in self.dMeasParam)
elif quant.name in ('S11 - Value', 'S21 - Value', 'S12 - Value', 'S22 -
Value'):
# read trace, return averaged data
data = self.readValueFromOther(quant.name[:3])
return np.mean(data['y'])
elif quant.name in ('S11', 'S21', 'S12', 'S22'):
# check if channel is on
if quant.name not in self.dMeasParam:
# get active measurements again, in case they changed
self.getActiveMeasurements()
if quant.name in self.dMeasParam:
if self.getModel() in ('E5071C',):
# new trace handling, use trace numbers
self.writeAndLog("CALC:PAR%d:SEL" %
self.dMeasParam[quant.name])
else:
# old parameter handing, select parameter (use last in list)
sName = self.dMeasParam[quant.name][-1]
self.writeAndLog("CALC:PAR:SEL '%s'" % sName)
# if not in continous mode, trig from computer
bWaitTrace = self.getValue('Wait for new trace')
bAverage = self.getValue('Average')
# wait for trace, either in averaging or normal mode
if bWaitTrace:
if bAverage:
# set channels 1-4 to set event when average complete
(bit 1 start)
self.writeAndLog(':SENS:AVER:CLE;:STAT:OPER:AVER1:ENAB
30;:ABOR;:SENS:AVER:CLE;')
else:
self.writeAndLog(':ABOR;:INIT:CONT OFF;:INIT:IMM;')
self.writeAndLog('*OPC')
# wait some time before first check
self.wait(0.03)
bDone = False
while (not bDone) and (not self.isStopped()):
# check if done
if bAverage:
sAverage = self.askAndLog('STAT:OPER:AVER1:COND?')
bDone = int(sAverage)>0
else:
stb = int(self.askAndLog('*ESR?'))
bDone = (stb & 1) > 0
if not bDone:
self.wait(0.1)
# if stopped, don't get data
if self.isStopped():
self.writeAndLog('*CLS;:INIT:CONT ON;')
return []
# get data as float32, convert to numpy array
if self.getModel() in ('E5071C',):
# new trace handling, use trace numbers
self.write(':FORM:DATA REAL32;:CALC:SEL:DATA:SDAT?',
bCheckError=False)
else:
# old parameter handing
self.write(':FORM REAL,32;CALC:DATA? SDATA',
bCheckError=False)
sData = self.read(ignore_termination=True)
if bWaitTrace and not bAverage:
self.writeAndLog(':INIT:CONT ON;')
# strip header to find # of points
i0 = sData.find(b'#')
nDig = int(sData[i0+1:i0+2])
nByte = int(sData[i0+2:i0+2+nDig])
nData = int(nByte/4)
nPts = int(nData/2)
# get data to numpy array
vData = np.frombuffer(sData[(i0+2+nDig):(i0+2+nDig+nByte)],
dtype='>f', count=nData)
# data is in I0,Q0,I1,Q1,I2,Q2,.. format, convert to complex
mC = vData.reshape((nPts,2))
vComplex = mC[:,0] + 1j*mC[:,1]
# get start/stop frequencies
centerFreq = self.readValueFromOther('Center frequency')
sweepType = self.readValueFromOther('Sweep type')
# if log scale, take log of start/stop frequencies
logX = (sweepType == 'Log')
lorX = (sweepType == 'Lorentzian')
if lorX:
qEst = self.getValue('Q Value')
thetaMax = self.getValue('Maximum Angle')
numPoints = self.getValue('# of points')
value = quant.getTraceDict(vComplex,
x=self.calcLorentzianDistr(thetaMax, numPoints, qEst, centerFreq))
else:
span = self.readValueFromOther('Span')
startFreq = centerFreq - (span/2)
stopFreq = centerFreq + (span/2)
value = quant.getTraceDict(vComplex, x0=startFreq,
x1=stopFreq,
logX=logX)
else:
# not enabled, return empty array
value = quant.getTraceDict([])
elif quant.name in ('Wait for new trace',):
# do nothing, return local value
value = quant.getValue()
else:
# for all other cases, call VISA driver
value = VISA_Driver.performGetValue(self, quant, options)
return value
def getActiveMeasurements(self):
"""Retrieve and a list of measurement/parameters currently active"""
# proceed depending on model
if self.getModel() in ('E5071C',):
# in this case, meas param is just a trace number
self.dMeasParam = {}
# get number or traces
nTrace = int(self.askAndLog(":CALC:PAR:COUN?"))
# get active trace names, one by one
for n in range(nTrace):
sParam = self.askAndLog(":CALC:PAR%d:DEF?" % (n+1))
self.dMeasParam[sParam] = (n+1)
else:
sAll = self.askAndLog("CALC:PAR:CAT:EXT?")
# strip "-characters
sAll = sAll[1:-1]
# parse list, format is channel, parameter, ...
self.dMeasParam = {}
lAll = sAll.split(',')
nMeas = len(lAll)//2
for n in range(nMeas):
sName = lAll[2*n]
sParam = lAll[2*n + 1]
if sParam not in self.dMeasParam:
# create list with current name
self.dMeasParam[sParam] = [sName,]
else:
# add to existing list
self.dMeasParam[sParam].append(sName)
# helper function to calculate Lorentzian frequency distribution
def calcLorentzianDistr(self, thetaMax, numPoints, qEst, centerFreq):
theta = np.linspace(-thetaMax, thetaMax, numPoints)
freq = np.multiply(centerFreq, (1 - np.multiply(1 / (2*qEst),
np.tan(np.divide(theta, 2)))))
return freq
if __name__ == '__main__':
pass
# Instrument driver configuration file.
[General settings]
# The name is shown in all the configuration windows
name: Agilent Network Analyzer
# The version string should be updated whenever changes are made to this config
file
version: 2.0
# Name of folder containing the code defining a custom driver. Do not define
this item
# or leave it blank for any standard driver based on the built-in VISA
interface.
driver_path: Agilent_NetworkAnalyzer
# default interface and address
interface: TCPIP
[Model and options]
# The option section allow instruments with different options to use the same
driver
# List of models supported by this driver
model_str_1: N5230
model_str_2: E8364B
model_str_3: E5071C
model_str_4: E5063A
model_str_5: N5232
model_str_6: N5222
model_str_7: N5231
# Check instrument model id at startup (True or False). Default is False
check_model: True
# Valid model strings returned by the instrument. Default value = model_str
model_id_1: N5230
model_id_2: E8364B
model_id_3: E5071C
model_id_4: E5063A
model_id_5: N5232
model_id_6: N5222
model_id_7: N5231
# General VISA settings for the instrument.
[VISA settings]
# Enable or disable communication over the VISA protocol (True or False)
# If False, the driver will not perform any operations (unless there is a
custom driver).
use_visa = True
# Reset the interface (not the instrument) at startup (True or False). Default
is False
reset: True
# Time (in seconds) before the timing out while waiting for an instrument
response. Default is 5
timeout: 10
# Query instrument errors (True or False). If True, every command sent to the
device will
# be followed by an error query. This is useful when testing new setups, but
may degrade
# performance by slowing down the instrument communication.
query_instr_errors: False
# Bit mask for checking status byte errors (default is 255, include all errors)
# The bits signal the following errors:
# 0: Operation
# 1: Request control
# 2: Query error
# 3: Device error
# 4: Execution error
# 5: Command error
# 6: User request
# 7: Power on
error_bit_mask: 255
# SCPI string to be used when querying for instrument error messages
error_cmd:
# Initialization commands are sent to the instrument when starting the driver
# *RST will reset the device, *CLS clears the interface
init: :INIT:CONT ON;:FORM:BORD NORM;
# Boolean string values (used for sending True/False to instrument), default is
1 and 0
#str_true: ON
#str_false: OFF
# Final commands sent to the instrument when closing the driver
final:
# Define quantities in sections. This list is a selection of allowed keywords,
# see the manual for a full list of options
# datatype: The datatype should be one of DOUBLE, BOOLEAN, COMBO,
# STRING, COMPLEX, VECTOR, VECTOR_COMPLEX, PATH or BUTTON.
# unit: Quantity unit
# set_cmd: Command used to send data to the instrument. Put <*> where
the value should appear.
# get_cmd: Command used to get the data from the instrument. Default is
set_cmd?
# def_value: Default value
# low_lim: Lowest allowable value. Defaults to -INF
# high_lim: Highest allowable values. Defaults to +INF
# combo_def_1: First option in a pull-down combo box. Only used when
datatype=COMBO
# combo_def_2: Second option in a pull-down combo box. Only used when
datatype=COMBO
# ...
# combo_def_n: nth option in a pull-down combo box. Only used when
datatype=COMBO
# state_quant: Quantity that determines this control's visibility
# state_value_1: Value of "state_quant" for which the control is visible
# state_value_2: Value of "state_quant" for which the control is visible
# ...
# state_value_n: Value of "state_quant" for which the control is visible
# permission: Sets read/writability, options are BOTH, READ, WRITE or
NONE. Default is BOTH
# group: Name of the group where the control belongs.
# section: Name of the section where the control belongs.
[S11 - Enabled]
datatype: BOOLEAN
def_value: False
group: Signals
[S21 - Enabled]
datatype: BOOLEAN
def_value: False
group: Signals
[S12 - Enabled]
datatype: BOOLEAN
def_value: False
group: Signals
[S22 - Enabled]
datatype: BOOLEAN
def_value: False
group: Signals
[S11]
x_name: Frequency
x_unit: Hz
datatype: VECTOR_COMPLEX
permission: READ
group: Signals
[S21]
x_name: Frequency
x_unit: Hz
datatype: VECTOR_COMPLEX
permission: READ
group: Signals
[S12]
x_name: Frequency
x_unit: Hz
datatype: VECTOR_COMPLEX
permission: READ
group: Signals
[S22]
x_name: Frequency
x_unit: Hz
datatype: VECTOR_COMPLEX
permission: READ
group: Signals
[S11 - Value]
datatype: COMPLEX
unit: Hz
permission: READ
group: Signals
[S21 - Value]
datatype: COMPLEX
unit: Hz
permission: READ
group: Signals
[S12 - Value]
datatype: COMPLEX
unit: Hz
permission: READ
group: Signals
[S22 - Value]
datatype: COMPLEX
unit: Hz
permission: READ
group: Signals
[Output enabled]
datatype: BOOLEAN
def_value: False
set_cmd: :OUTP
group: Output
[Output power]
datatype: DOUBLE
def_value: -30.0
unit: dBm
set_cmd: :SOUR:POW
group: Output
[IF bandwidth]
datatype: DOUBLE
def_value: 10E3
unit: Hz
set_cmd: :SENS:BWID
group: Acquisition
[Average]
datatype: BOOLEAN
def_value: False
set_cmd: :SENS:AVER
group: Acquisition
[# of averages]
datatype: DOUBLE
def_value: 10
low_lim: 1
high_lim: 65536
set_cmd: :SENS:AVER:COUN
group: Acquisition
[Wait for new trace]
datatype: BOOLEAN
def_value: False
group: Acquisition
[Range type]
datatype: COMBO
def_value: Start - Stop
combo_def_1: Center - Span
combo_def_2: Start - Stop
combo_def_3: Single frequency
permission: WRITE
group: Horizontal
[Start frequency]
datatype: DOUBLE
def_value: 4E9
unit: Hz
set_cmd: :SENS:FREQ:STAR
state_quant: Range type
state_value_1: Start - Stop
group: Horizontal
[Stop frequency]
datatype: DOUBLE
def_value: 12E9
unit: Hz
set_cmd: :SENS:FREQ:STOP
state_quant: Range type
state_value_1: Start - Stop
group: Horizontal
[Center frequency]
datatype: DOUBLE
def_value: 8E9
unit: Hz
set_cmd: :SENS:FREQ:CENT
state_quant: Range type
state_value_1: Center - Span
state_value_2: Single frequency
group: Horizontal
[Span]
datatype: DOUBLE
def_value: 8E9
unit: Hz
set_cmd: :SENS:FREQ:SPAN
state_quant: Range type
state_value_1: Center - Span
group: Horizontal
[# of points]
datatype: DOUBLE
def_value: 201
set_cmd: :SENS:SWE:POIN
state_quant: Range type
state_value_1: Center - Span
state_value_2: Start - Stop
group: Horizontal
[Sweep type]
datatype: COMBO
def_value: Linear
combo_def_1: Linear
combo_def_2: Log
combo_def_3: Lorentzian
group: Horizontal
[Q Value]
datatype: DOUBLE
def_value: 1.0
group: Horizontal
state_quant: Sweep type
state_value_1: Lorentzian
[Maximum Angle]
datatype: DOUBLE
unit: rad
def_value: 0.0
group: Horizontal
state_quant: Sweep type
state_value_1: Lorentzian
[Sweep mode]
datatype: COMBO
def_value: Stepped
combo_def_1: Stepped
combo_def_2: Analog
cmd_def_1: STEP
cmd_def_2: ANAL
set_cmd: :SENS:SWE:GEN
group: Horizontal
_______________________________________________
sigrok-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/sigrok-devel