| happy to share. haven’t put in github because (a) i need to learn git; and (b) controller i use is now obsolete (but is a useful example). copy of data service code attached i have an ‘aquagauge' controller with serial interface to host and wireless connections to up to 8 sensors. i am using just one sensor at present, a pressure sensor secured to river bottom that is connected to a box that converts water column pressure to water depth with wireless transmitter to the controller (in principle much the same as ultrasonic sensor fixed above river measuring distance to water)
-- it follows the weewx acquisition data service pattern:
i could have jammed the measurements into an existing data_type in LOOP packet (e.g. ‘soilMoist3’) and interpreted that data_type downstream (e.g. soilMoist3 graph has label River Level) but instead i created a new data_type ‘riverLevel’ (i.e. a new database column) some interesting design considerations:
g-eddy You received this message because you are subscribed to the Google Groups "weewx-user" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To view this discussion on the web visit https://groups.google.com/d/msgid/weewx-user/638DA29F-AFC3-431E-8743-518ED26D4899%40gmail.com. |
# Copyright (c) 2015-20 Graham Eddy <[email protected]> # Distributed under the terms of the GNU Public License (GPLv3) """ aqua module provides weewx data service for Aquagauge controller.
AquagaugeSvc: class providing data service for Aquagauge data
Aquagauge: class facing Aquagauge controller serial interface
"""
import sys
if sys.version_info[0] < 3:
raise ImportError("requires python3")
import queue
import logging
import serial
import threading
import time
import weewx
import weewx.units
from weewx.engine import StdService
log = logging.getLogger(__name__)
version = "3.1"
class AquagaugeSvc(StdService):
"""service that inserts data from Aquagauge controller into LOOP packets
pseudo-driver for aquagauge controller, which supports up to 8 wireless
sensors, that inserts its values into next LOOP record
weewx.conf configuration:
[Aquagauge]
port = /dev/ttyUSB0 # serial port device; no default
speed = 2400 # baud; port speed; default=2400
open_attempts_max = 4 # max no open attempts; default=4
open_attempts_delay = 2 # secs; delay before re-open; default=2
open_attempts_delay_long = 1800 # secs; long attempts delay; def=1800
# if opening port fails, retry open up to {open_attempts_max} times,
# with {open_attempts_delay} secs between attempts. if even this
# fails, back off for a further {open_attempts_delay_long} secs
# before trying all over again. keep trying until port open
# succeeds. # read failure is handled by closing and re-opening the
# port.
[[ 0 ]] # sensor #0; default=disabled
data_type = outTemp # data_type in LOOP record to be written
unit = degree_C # unit of driver's provided value; def=raw
[[ 1 ]] ... # up to sensor #7...
# at least one sensor must be enabled i.e. have {data_type} defined
# otherwise the service exits.
# service also exits if a mandatory config parameter is absent or
# any provided config parameter is invalid
key implementation notes:
* port i/o confined to its own, separate thread
"""
def __init__(self, engine, config_dict):
super(AquagaugeSvc, self).__init__(engine, config_dict)
self.data_types = None # data_type, indexed by sensor_id
self.units = None # unit, indexed by sensor_id
self.acquirer = None
self.q = queue.Queue()
self.stop = threading.Event() # signal all threads to stop
try: # set up
log.info(f"{self.__class__.__name__}: starting (version {version})")
# configuration
aqua_dict = None
try:
aqua_dict = config_dict['Aquagauge'] # mandatory
self.data_types = [None for _i in range(Aquagauge.MAX_SENSORS)]
self.units = self.data_types[:]
sensor_count = 0
for sensor_id, sensor_dict in aqua_dict.items():
if isinstance(sensor_dict, dict):
sensor_id = int(sensor_id)
if 'data_type' in sensor_dict:
self.data_types[sensor_id] = sensor_dict['data_type']
sensor_count += 1
if 'unit' in sensor_dict:
self.units[sensor_id] = sensor_dict['unit']
if sensor_count <= 0:
log.error(f"{self.__class__.__name__}: no sensors enabled")
raise RuntimeError("no sensors")
# handle configuration error
except IndexError:
log.error(f"{self.__class__.__name__}: config: bad sensor_id")
raise
except KeyError as e:
log.error(f"{self.__class__.__name__}: config: lacking {e.args[0]}")
raise
except RuntimeError as e:
log.error(f"{self.__class__.__name__}: config: {e.args[0]}")
raise
except (TypeError, ValueError) as e:
log.error(f"{self.__class__.__name__}: config: bad value {e.args[0]}")
raise
# spawn acquirer thread
self.acquirer = Aquagauge(aqua_dict)
try:
t = threading.Thread(target=self.acquirer.run, args=(self.q, self.stop))
t.start()
except threading.ThreadError as e:
log.error(f"{self.__class__.__name__}: thread failed: {repr(e)}")
raise
# start listening to new packets
self.bind(weewx.NEW_LOOP_PACKET, self.new_loop_record)
log.info(f"{self.__class__.__name__} started (version {version}):"
f" {sensor_count} sensors enabled")
except (IndexError, KeyError, RuntimeError, TypeError, ValueError, threading.ThreadError):
log.error(f"{self.__class__.__name__}: not started (version {version})")
def new_loop_record(self, event):
"""handle LOOP record by inserting queued readings"""
readings_count = 0
try:
while not self.q.empty():
reading = self.q.get(block=False)
if reading is None:
# acquirer thread has indicated it is stopping
log.debug(f"{self.__class__.__name__}: acquirer thread stopped")
# weewx has no provision to unregister new_loop_record.
# polling empty queue on each LOOP cycle is harmless
break
self.update(reading, event.packet) # over-write prior is unlikely but ok
self.q.task_done()
readings_count += 1
except queue.Empty:
log.debug(f"{self.__class__.__name__}: queue.Empty")
pass # corner case that can be ignored
if readings_count > 0:
log.info(f"{self.__class__.__name__}: {readings_count} readings found")
else:
log.debug(f"{self.__class__.__name__}: {readings_count} readings found")
def update(self, reading, packet):
"""apply a reading to the packet"""
# update LOOP packet where sensor is enabled and has a value
for sensor_id, value in enumerate(reading):
if value is not None and self.data_types[sensor_id]:
if self.units[sensor_id]:
# convert to internal units
unit_group = weewx.units.obs_group_dict[self.data_types[sensor_id]]
vt = weewx.units.ValueTuple(value, self.units[sensor_id], unit_group)
vh = weewx.units.ValueHelper(vt,
converter=weewx.units.StdUnitConverters[weewx.US])
value = vh.raw
# insert value into packet
log.debug(f"{self.__class__.__name__}.update packet["
f"{self.data_types[sensor_id]}]={value}")
packet[self.data_types[sensor_id]] = value
def shutDown(self):
"""respond to request to shutdown gracefully"""
log.info(f"{self.__class__.__name__}: shutdown")
self.stop.set() # signal all threads to stop
self.acquirer.kill() # poke hard at acquirer thread
class Terminate(Exception):
"""thread has been requested to stop"""
class Aquagauge:
"""driver for acquiring data from Auqagauge controller via serial interface"""
# a reading is a list of observed values, indexed by sensor_id
#
# number of sensors supported by controller
MAX_SENSORS = 8
# input records are formatted as
# reading ::= [ key value ]* "i"
# key ::= "a"|"b"|"c"|"d"|"e"|"f"|"g"|"h"
# value ::= digit [ digit ]*
# digit ::= "0"|"1"|"2"|"3"|"4"|"5"|"6"|"7"|"8"|"9"
# where
# key is sensor_id offset by "a" e.g. c -> 2
# value is positive decimal integer
# there is no white space
KEYS = b'abcdefgh'
END_KEY = b'i'
DIGITS = b'0123456789'
def __init__(self, aqua_dict):
self.port = None
self.speed = None
self.open_attempts_max = None
self.open_attempts_delay = None
self.open_attempts_delay_long = None
try: # throws to service thread
self.port = aqua_dict['port'] # mandatory
self.speed = int(aqua_dict.get('speed', 2400))
self.open_attempts_max = int(aqua_dict.get('open_attempts_max', 4))
self.open_attempts_delay = float(aqua_dict.get('open_attempts_delay', 2.0))
self.open_attempts_delay_long = float(aqua_dict.get('open_attempts_delay_long',
1800.0))
except KeyError as e:
log.error(f"{self.__class__.__name__}: config: lacking {e.args[0]}")
raise
except ValueError as e:
log.error(f"{self.__class__.__name__}: config: bad value {e.args[0]}")
raise
self.f = None
self.timer = None
self.stop = None
def run(self, q, stop):
"""insert readings from Aquagauge driver onto queue"""
log.debug(f"{self.__class__.__name__} start")
self.stop = stop
try:
ch = self.getc()
while not stop.is_set():
reading = [None for _i in range(Aquagauge.MAX_SENSORS)]
while True: # process all fields of one record
# check for completion of a record
if ch == Aquagauge.END_KEY:
q.put(reading)
ch = self.getc()
break # record finished
# identify key
if ch not in Aquagauge.KEYS:
log.warning(f"{self.__class__.__name__}: invalid key {ch}")
ch = self.getc()
continue # field finished (failed)
sensor_id = Aquagauge.KEYS.index(ch)
ch = self.getc()
# calculate value
if ch not in Aquagauge.DIGITS:
log.warning(f"{self.__class__.__name__}: missing value")
continue # field finished (failed)
value = Aquagauge.DIGITS.index(ch)
while True:
ch = self.getc()
if ch not in Aquagauge.DIGITS:
break # value finished (success)
value = 10*value + Aquagauge.DIGITS.index(ch)
reading[sensor_id] = value
except Terminate as e:
log.debug(f"{self.__class__.__name__}: terminated", exc_info=e)
q.put(None) # signal service thread that we are stopping
log.debug(f"{self.__class__.__name__} finish")
def getc(self):
"""get next char, in robust fashion"""
# open device if not already open
if self.f is None:
self.open()
# before potential wait on read, check if we have been asked to stop
if self.stop.is_set():
raise Terminate("stop")
# try to read next char
try:
ch = self.f.read(1)
if ch:
return ch # finished (success)
# EOF, so drop through to error recovery
log.warning(f"{self.__class__.__name__}: EOF")
except serial.SerialException as e:
log.error(f"{self.__class__.__name__}: read error", exc_info=e)
# error recovery = close device and allow it to be re-opened
try:
self.f.close()
except serial.SerialException as e:
log.error(f"{self.__class__.__name__}: close error", exc_info=e)
self.f = None
def open(self):
"""open device, in robust fashion"""
while True: # try forever...
# have a few short attempts
for attempt in range(self.open_attempts_max):
# before potential wait on port open, check if we have been asked to stop
if self.stop.is_set():
raise Terminate("stop")
try:
# 8 bits, 1 stop bit, no parity, no xon/xoff, no rts/dtr
self.f = serial.Serial(port=self.port, baudrate=self.speed)
log.debug(f"{self.__class__.__name__}: {self.port}: open succeeded")
return # finished open (success)
except ValueError as e:
log.error(f"{self.__class__.__name__}: bad port config: {repr(e)}")
except serial.SerialException as e:
log.error(f"{self.__class__.__name__}: {e.args[1]}")
self.delay(self.open_attempts_delay)
# have a long delay and hope for external intervention
log.warning(f"{self.__class__.__name__}: long sleep"
f" waiting for port problem to be cleared")
self.delay(self.open_attempts_delay_long - self.open_attempts_delay)
def delay(self, secs):
"""delay for a while using cancellable timer, or sleep if timer fails"""
def nothing(): pass
try:
self.timer = threading.Timer(secs, nothing)
self.timer.start()
self.timer.join() # cancellable :-)
self.timer = None
except threading.ThreadError as e:
log.debug(f"{self.__class__.__name__}: timer failed {repr(e)}, call sleep")
time.sleep(secs)
def kill(self):
"""make best effort to kill thread"""
if self.timer:
self.timer.cancel()
You received this message because you are subscribed to the Google Groups "weewx-user" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To view this discussion on the web visit https://groups.google.com/d/msgid/weewx-user/638DA29F-AFC3-431E-8743-518ED26D4899%40gmail.com. |
