happy to share. haven’t put in github because (a) i need to learn git; and (b) controller i use is now obsolete (but is a useful example). copy of data service code attached

i have an ‘aquagauge' controller with serial interface to host and wireless connections to up to 8 sensors. i am using just one sensor at present, a pressure sensor secured to river bottom that is connected to a box that converts water column pressure to water depth with wireless transmitter to the controller (in principle much the same as ultrasonic sensor fixed above river measuring distance to water)

it follows the weewx acquisition data service pattern:
  • external port (interface to controller) firewalled on other side of a thread to protect main weewx thread
  • service starts by spawning acquisition thread and then binding a callback from weewx for arriving LOOP packets
  • acquisition thread opens port and loops through reading measurements from port and putting them on a queue
  • callback gets any measurements from queue and writes them into current LOOP packet
i could have jammed the measurements into an existing data_type in LOOP packet (e.g. ‘soilMoist3’) and interpreted that data_type downstream (e.g. soilMoist3 graph has label River Level) but instead i created a new data_type ‘riverLevel’ (i.e. a new database column)

some interesting design considerations:
  • open/read failure on port - robustly retry? when to give up, and what to do then?
  • multiple measurements waiting on queue - take last? average them?
  • sensor measurement range - accurately covers [min,max]?
g-eddy

--
You received this message because you are subscribed to the Google Groups "weewx-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
To view this discussion on the web visit https://groups.google.com/d/msgid/weewx-user/638DA29F-AFC3-431E-8743-518ED26D4899%40gmail.com.
# Copyright (c) 2015-20 Graham Eddy <[email protected]>
# Distributed under the terms of the GNU Public License (GPLv3)
"""
aqua module provides weewx data service for Aquagauge controller.

AquagaugeSvc: class providing data service for Aquagauge data
Aquagauge: class facing Aquagauge controller serial interface
"""

import sys
if sys.version_info[0] < 3:
    raise ImportError("requires python3")
import queue
import logging
import serial
import threading
import time

import weewx
import weewx.units
from weewx.engine import StdService

log = logging.getLogger(__name__)
version = "3.1"


class AquagaugeSvc(StdService):
    """service that inserts data from Aquagauge controller into LOOP packets

    pseudo-driver for aquagauge controller, which supports up to 8 wireless
    sensors, that inserts its values into next LOOP record

    weewx.conf configuration:
    [Aquagauge]
        port = /dev/ttyUSB0         # serial port device; no default
        speed = 2400                # baud; port speed; default=2400
        open_attempts_max = 4       # max no open attempts; default=4
        open_attempts_delay = 2     # secs; delay before re-open; default=2
        open_attempts_delay_long = 1800 # secs; long attempts delay; def=1800
            # if opening port fails, retry open up to {open_attempts_max} times,
            # with {open_attempts_delay} secs between attempts. if even this
            # fails, back off for a further {open_attempts_delay_long} secs
            # before trying all over again. keep trying until port open
            # succeeds. # read failure is handled by closing and re-opening the
            # port.
        [[ 0 ]]                     # sensor #0; default=disabled
            data_type = outTemp     # data_type in LOOP record to be written
            unit = degree_C         # unit of driver's provided value; def=raw
        [[ 1 ]] ...                 # up to sensor #7...
            # at least one sensor must be enabled i.e. have {data_type} defined
            # otherwise the service exits.
            # service also exits if a mandatory config parameter is absent or
            # any provided config parameter is invalid

    key implementation notes:
    * port i/o confined to its own, separate thread
    """

    def __init__(self, engine, config_dict):
        super(AquagaugeSvc, self).__init__(engine, config_dict)

        self.data_types = None              # data_type, indexed by sensor_id
        self.units = None                   # unit, indexed by sensor_id
        self.acquirer = None
        self.q = queue.Queue()
        self.stop = threading.Event()       # signal all threads to stop

        try:                        # set up
            log.info(f"{self.__class__.__name__}: starting (version {version})")

            # configuration
            aqua_dict = None
            try:
                aqua_dict = config_dict['Aquagauge']    # mandatory

                self.data_types = [None for _i in range(Aquagauge.MAX_SENSORS)]
                self.units = self.data_types[:]
                sensor_count = 0
                for sensor_id, sensor_dict in aqua_dict.items():
                    if isinstance(sensor_dict, dict):
                        sensor_id = int(sensor_id)
                        if 'data_type' in sensor_dict:
                            self.data_types[sensor_id] = sensor_dict['data_type']
                            sensor_count += 1
                        if 'unit' in sensor_dict:
                            self.units[sensor_id] = sensor_dict['unit']
                if sensor_count <= 0:
                    log.error(f"{self.__class__.__name__}: no sensors enabled")
                    raise RuntimeError("no sensors")

            # handle configuration error
            except IndexError:
                log.error(f"{self.__class__.__name__}: config: bad sensor_id")
                raise
            except KeyError as e:
                log.error(f"{self.__class__.__name__}: config: lacking {e.args[0]}")
                raise
            except RuntimeError as e:
                log.error(f"{self.__class__.__name__}: config: {e.args[0]}")
                raise
            except (TypeError, ValueError) as e:
                log.error(f"{self.__class__.__name__}: config: bad value {e.args[0]}")
                raise

            # spawn acquirer thread
            self.acquirer = Aquagauge(aqua_dict)
            try:
                t = threading.Thread(target=self.acquirer.run, args=(self.q, self.stop))
                t.start()
            except threading.ThreadError as e:
                log.error(f"{self.__class__.__name__}: thread failed: {repr(e)}")
                raise

            # start listening to new packets
            self.bind(weewx.NEW_LOOP_PACKET, self.new_loop_record)
            log.info(f"{self.__class__.__name__} started (version {version}):"
                     f" {sensor_count} sensors enabled")

        except (IndexError, KeyError, RuntimeError, TypeError, ValueError, threading.ThreadError):
            log.error(f"{self.__class__.__name__}: not started (version {version})")

    def new_loop_record(self, event):
        """handle LOOP record by inserting queued readings"""

        readings_count = 0
        try:
            while not self.q.empty():
                reading = self.q.get(block=False)
                if reading is None:
                    # acquirer thread has indicated it is stopping
                    log.debug(f"{self.__class__.__name__}: acquirer thread stopped")
                    # weewx has no provision to unregister new_loop_record.
                    # polling empty queue on each LOOP cycle is harmless
                    break

                self.update(reading, event.packet)  # over-write prior is unlikely but ok
                self.q.task_done()
                readings_count += 1
        except queue.Empty:
            log.debug(f"{self.__class__.__name__}: queue.Empty")
            pass        # corner case that can be ignored
        if readings_count > 0:
            log.info(f"{self.__class__.__name__}: {readings_count} readings found")
        else:
            log.debug(f"{self.__class__.__name__}: {readings_count} readings found")

    def update(self, reading, packet):
        """apply a reading to the packet"""

        # update LOOP packet where sensor is enabled and has a value
        for sensor_id, value in enumerate(reading):
            if value is not None and self.data_types[sensor_id]:
                if self.units[sensor_id]:
                    # convert to internal units
                    unit_group = weewx.units.obs_group_dict[self.data_types[sensor_id]]
                    vt = weewx.units.ValueTuple(value, self.units[sensor_id], unit_group)
                    vh = weewx.units.ValueHelper(vt,
                        converter=weewx.units.StdUnitConverters[weewx.US])
                    value = vh.raw

                # insert value into packet
                log.debug(f"{self.__class__.__name__}.update packet["
                          f"{self.data_types[sensor_id]}]={value}")
                packet[self.data_types[sensor_id]] = value

    def shutDown(self):
        """respond to request to shutdown gracefully"""
        log.info(f"{self.__class__.__name__}: shutdown")

        self.stop.set()         # signal all threads to stop
        self.acquirer.kill()    # poke hard at acquirer thread


class Terminate(Exception):
    """thread has been requested to stop"""


class Aquagauge:
    """driver for acquiring data from Auqagauge controller via serial interface"""

    # a reading is a list of observed values, indexed by sensor_id
    #
    # number of sensors supported by controller
    MAX_SENSORS = 8

    # input records are formatted as
    #   reading ::= [ key value ]* "i"
    #   key ::= "a"|"b"|"c"|"d"|"e"|"f"|"g"|"h"
    #   value ::= digit [ digit ]*
    #   digit ::= "0"|"1"|"2"|"3"|"4"|"5"|"6"|"7"|"8"|"9"
    # where
    #   key is sensor_id offset by "a" e.g. c -> 2
    #   value is positive decimal integer
    #   there is no white space
    KEYS = b'abcdefgh'
    END_KEY = b'i'
    DIGITS = b'0123456789'

    def __init__(self, aqua_dict):

        self.port = None
        self.speed = None
        self.open_attempts_max = None
        self.open_attempts_delay = None
        self.open_attempts_delay_long = None

        try:  # throws to service thread
            self.port = aqua_dict['port']       # mandatory
            self.speed = int(aqua_dict.get('speed', 2400))
            self.open_attempts_max = int(aqua_dict.get('open_attempts_max', 4))
            self.open_attempts_delay = float(aqua_dict.get('open_attempts_delay', 2.0))
            self.open_attempts_delay_long = float(aqua_dict.get('open_attempts_delay_long',
                                                                1800.0))
        except KeyError as e:
            log.error(f"{self.__class__.__name__}: config: lacking {e.args[0]}")
            raise
        except ValueError as e:
            log.error(f"{self.__class__.__name__}: config: bad value {e.args[0]}")
            raise

        self.f = None
        self.timer = None
        self.stop = None

    def run(self, q, stop):
        """insert readings from Aquagauge driver onto queue"""
        log.debug(f"{self.__class__.__name__} start")

        self.stop = stop

        try:
            ch = self.getc()
            while not stop.is_set():
                reading = [None for _i in range(Aquagauge.MAX_SENSORS)]

                while True:         # process all fields of one record

                    # check for completion of a record
                    if ch == Aquagauge.END_KEY:
                        q.put(reading)
                        ch = self.getc()
                        break       # record finished

                    # identify key
                    if ch not in Aquagauge.KEYS:
                        log.warning(f"{self.__class__.__name__}: invalid key {ch}")
                        ch = self.getc()
                        continue    # field finished (failed)
                    sensor_id = Aquagauge.KEYS.index(ch)
                    ch = self.getc()

                    # calculate value
                    if ch not in Aquagauge.DIGITS:
                        log.warning(f"{self.__class__.__name__}: missing value")
                        continue    # field finished (failed)
                    value = Aquagauge.DIGITS.index(ch)
                    while True:
                        ch = self.getc()
                        if ch not in Aquagauge.DIGITS:
                            break   # value finished (success)
                        value = 10*value + Aquagauge.DIGITS.index(ch)
                    reading[sensor_id] = value

        except Terminate as e:
            log.debug(f"{self.__class__.__name__}: terminated", exc_info=e)
            q.put(None)         # signal service thread that we are stopping
        log.debug(f"{self.__class__.__name__} finish")

    def getc(self):
        """get next char, in robust fashion"""

        # open device if not already open
        if self.f is None:
            self.open()

        # before potential wait on read, check if we have been asked to stop
        if self.stop.is_set():
            raise Terminate("stop")

        # try to read next char
        try:
            ch = self.f.read(1)
            if ch:
                return ch       # finished (success)
            # EOF, so drop through to error recovery
            log.warning(f"{self.__class__.__name__}: EOF")
        except serial.SerialException as e:
            log.error(f"{self.__class__.__name__}: read error", exc_info=e)

        # error recovery = close device and allow it to be re-opened
        try:
            self.f.close()
        except serial.SerialException as e:
            log.error(f"{self.__class__.__name__}: close error", exc_info=e)
        self.f = None

    def open(self):
        """open device, in robust fashion"""

        while True:             # try forever...

            # have a few short attempts
            for attempt in range(self.open_attempts_max):
                # before potential wait on port open, check if we have been asked to stop
                if self.stop.is_set():
                    raise Terminate("stop")

                try:
                    # 8 bits, 1 stop bit, no parity, no xon/xoff, no rts/dtr
                    self.f = serial.Serial(port=self.port, baudrate=self.speed)
                    log.debug(f"{self.__class__.__name__}: {self.port}: open succeeded")
                    return      # finished open (success)
                except ValueError as e:
                    log.error(f"{self.__class__.__name__}: bad port config: {repr(e)}")
                except serial.SerialException as e:
                    log.error(f"{self.__class__.__name__}: {e.args[1]}")
                self.delay(self.open_attempts_delay)

            # have a long delay and hope for external intervention
            log.warning(f"{self.__class__.__name__}: long sleep"
                        f" waiting for port problem to be cleared")
            self.delay(self.open_attempts_delay_long - self.open_attempts_delay)

    def delay(self, secs):
        """delay for a while using cancellable timer, or sleep if timer fails"""

        def nothing(): pass

        try:
            self.timer = threading.Timer(secs, nothing)
            self.timer.start()
            self.timer.join()       # cancellable :-)
            self.timer = None
        except threading.ThreadError as e:
            log.debug(f"{self.__class__.__name__}: timer failed {repr(e)}, call sleep")
            time.sleep(secs)

    def kill(self):
        """make best effort to kill thread"""

        if self.timer:
            self.timer.cancel()

On 7 Jul 2020, at 10:29 am, weather list <[email protected]> wrote:

I’d be very interested in the details of your setup for the depth sensor if you’re willing to share.

On 5 Jul, 2020, at 23:16, Graham Eddy <[email protected]> wrote:

i have added external river depth sensor (pressure not ultrasonic but same principle) - wrote a data service that inserts its readings into weewx loop records; defined new database column for it to persist.
g-eddy

On 6 Jul 2020, at 9:25 am, Wes Witt <[email protected]> wrote:

Is there an extension for adding snow data to weewx? i'm considering adding an ultrasonic sensor to my weather station which would provide snow depth data. how have people implemented this before? just add a database field for snowdepth? what about snowrate?

just thinking that i may not need to reinvent the wheel if someone has already done this.

-Wes

--
You received this message because you are subscribed to the Google Groups "weewx-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
To view this discussion on the web visit https://groups.google.com/d/msgid/weewx-user/638DA29F-AFC3-431E-8743-518ED26D4899%40gmail.com.

Reply via email to