Here is my service. In this version I took your advice: I use the
weewx-sdr driver completely unmodified and put all my own code in a
service. Sorry, to clarify: the weewx-sdr driver (the original, not my
version) reads its input through AsyncReader (not my modification), and
that uses a thread. That's what I was referring to. But since I already had
my (slightly modified) version of AsyncReader from before I spoke to you, I
reused it in the service (see the attachment). It has a queue: it stores
lines of input from the Arduino over USB and discards any that are not
JSON. This is completely separate from weewx-sdr and its AsyncReader. My
AsyncReader can also be set up as a writer, which gives me the option of a
simple time server on the Raspberry Pi to set the RTC on the Arduino if
needed.
Anyway I could always get rid of the AsyncReader and just use
serial.readline().
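
For comparison, here is a rough sketch of what the non-threaded serial.readline() version might look like. The function name read_json_lines and the sample field names are made up for illustration. The only assumption about the port is that readline() returns bytes and returns b'' when nothing arrives before the timeout, which is how pyserial behaves when the port is opened with a short timeout. The demo uses io.BytesIO in place of a real port.

```python
import io
import json


def read_json_lines(port, max_lines=10):
    """Read up to max_lines lines from a serial-like object, keeping only JSON objects.

    `port` is anything with a readline() method that returns bytes, for example
    a pyserial Serial opened with a short timeout (hypothetical usage).
    """
    packets = []
    for _ in range(max_lines):
        raw = port.readline()
        if not raw:
            # timeout or end of input: stop so the caller is not delayed further
            break
        line = raw.decode("ascii", errors="replace").strip()
        if not line.startswith("{"):
            # debugging chatter from the Arduino, not JSON
            continue
        try:
            packets.append(json.loads(line))
        except json.JSONDecodeError:
            # truncated or garbled line: skip it
            continue
    return packets


# stand-in for the serial port, so the sketch runs without hardware
fake_port = io.BytesIO(
    b'debug: boot\n'
    b'{"temp": 21.5, "humidity": 40}\n'
    b'{broken\n'
    b'{"temp": 21.7, "humidity": 41}\n'
)
packets = read_json_lines(fake_port)
print(packets)
```

The max_lines cap is there so a chatty Arduino cannot hold up the main loop indefinitely.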
Perhaps the example in the weewx customization guide is for a one-shot read
of the indoors sensors. Instead I made it loop over the lines of input in
the queue and use an accumulator to average them.
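
To make the idea concrete, here is a minimal sketch of draining the queue and averaging the readings. It uses plain dictionaries as a stand-in for weewx.accum.Accum so it runs on its own; the function name average_queued_packets and the sample field values are only for illustration, not what the attachment does.

```python
import json
import queue


def average_queued_packets(line_queue, timeout=0.1):
    """Drain JSON lines from line_queue and return the mean of each numeric field."""
    sums = {}
    counts = {}
    while True:
        try:
            line = line_queue.get(True, timeout)
        except queue.Empty:
            # nothing more queued: stop looping
            break
        try:
            packet = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(packet, dict):
            continue
        for key, value in packet.items():
            # only average real numbers (bool is excluded on purpose)
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                continue
            sums[key] = sums.get(key, 0.0) + value
            counts[key] = counts.get(key, 0) + 1
    return {key: sums[key] / counts[key] for key in sums}


q = queue.Queue()
for text in ('{"inTemp": 70.0, "inHumidity": 40}',
             'not json',
             '{"inTemp": 72.0, "inHumidity": 42}'):
    q.put(text)
averages = average_queued_packets(q)
print(averages)
```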
The callback for NEW_ARCHIVE_RECORD for my service would (I think) be
called before StdArchive.new_archive_record(). My service only has access
to the event and event.record. Does it also have access to
StdArchive.old_accumulator?
StdArchive.new_archive_record(event) then gets the event that was
modified by my service, so the record includes my data.
*So my question is*: what about old_accumulator in
StdArchive.new_archive_record()? Is it strictly optional? That
accumulator only has the outside data, not the data added by my
new_archive_record() callback.
Of course my data is already averaged.
In StdArchive.new_archive_record() the event.record has more observation
types than the old_accumulator. Does this pose a problem?
What do you mean by augmenting? Does this involve choosing
self.record_generation = hardware?
On Wednesday, April 26, 2023 at 6:12:50 PM UTC-7 gjr80 wrote:
> Yes, that section covers it fairly well. The two main things to watch are
> not delaying the main WeeWX engine loop and ensuring the data you add to
> the loop packet follows the unit system used in the loop packet. Delaying
> the main loop is often associated with accessing data via the internet or
> some other network. A common approach is to develop the data service
> without using its own thread, this makes debugging much easier. If delay is
> an issue you can move the service to its own thread later. Unit consistency
> is achieved by checking the usUnits field in the loop packet and then
> converting your service's data to the correct units. This is important for
> obs whose units vary across unit systems (eg temperature) but not an
> issue if the same unit is used across all unit systems (eg wind direction).
> A robust, well written data service will not assume the loop packet always
> uses the same unit system.
>
> Gary
>
> On Thursday, 27 April 2023 at 01:24:52 UTC+10 [email protected] wrote:
>
>> Thank you. I expect you are referring to the Customization guide section
>> "Customizing the WeeWX service engine; Adding a second data source". I will
>> try doing it that way. Thanks again.
>>
>> On Tuesday, April 25, 2023 at 10:16:18 PM UTC-7 gjr80 wrote:
>>
>>> I'm not sure if you are providing a running commentary or seeking help.
>>> If the latter then I have no idea where to start. Personally, I think your
>>> architecture is way too complex; you appear to be running a highly modified
>>> driver that seeks to amalgamate data from two sources. I imagine you will
>>> strike all sorts of corner cases depending on what arrives when. I would
>>> also question the utility of reading indoor obs every seven seconds, seems
>>> way too frequent to me. All told I doubt you are going to find too many
>>> folks here to help.
>>>
>>> Why not run a standard sdr driver to feed WeeWX with loop packets from
>>> the Atlas with a simple, non-threaded data service bound to the new loop
>>> packet arrival to read the 'indoor data' (I assume this is pressure,
>>> temperature and humidity) and augment the loop packet. Far more modular and
>>> easier to test/develop (and get help), you will be running a standard sdr
>>> driver, and since you are already getting your hands dirty modifying the
>>> sdr driver, writing a small data service to handle the Arduino input should
>>> be a walk in the park. If the Arduino is serially connected to the RPi you
>>> should not have great latency in accessing data, so a suitably short
>>> timeout on the serial reads should provide you with your indoor data without
>>> blocking the main WeeWX thread. Once proved, the serial read could be moved
>>> to a thread if you really had the need.
>>>
>>> Gary
>>>
>>
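
As I read the advice quoted above, "augmenting" means adding the indoor fields to the packet that is already passing through the engine, after converting them to whatever unit system that packet declares in usUnits. A rough sketch of that reading follows. The constants mirror weewx.US, weewx.METRIC and weewx.METRICWX; the function name augment_packet and the assumption that the Arduino reports temperature in Celsius are mine.

```python
# same values as weewx.US, weewx.METRIC and weewx.METRICWX
US = 0x01
METRIC = 0x10
METRICWX = 0x11


def augment_packet(packet, in_temp_c, in_humidity):
    """Add indoor readings to a loop packet, matching the packet's unit system.

    Assumes (hypothetically) that the Arduino reports temperature in Celsius.
    """
    unit_system = packet.get("usUnits")
    if unit_system == US:
        # the US unit system uses degrees Fahrenheit
        packet["inTemp"] = in_temp_c * 9.0 / 5.0 + 32.0
    elif unit_system in (METRIC, METRICWX):
        packet["inTemp"] = in_temp_c
    else:
        raise ValueError("unknown unit system: %r" % (unit_system,))
    # humidity is a percentage in every unit system, so no conversion is needed
    packet["inHumidity"] = in_humidity
    return packet


us_packet = augment_packet({"dateTime": 1682557970, "usUnits": US}, 20.0, 45.0)
metric_packet = augment_packet({"dateTime": 1682557970, "usUnits": METRIC}, 20.0, 45.0)
print(us_packet["inTemp"], metric_packet["inTemp"])
```

Pressure would need the same treatment, since its unit also differs between the US and metric systems.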
"""
It is important that the function download_total_power() does not delay very long
because it will sit right in the main loop of the WeeWX engine.
If it is going to cause a delay of more than a couple seconds
you might want to put it in a separate thread and
feed the results to AddElectricity through a queue.
Why not run a standard sdr driver to feed WeeWX with loop packets from the Atlas
with a simple, non-threaded data service bound to the new loop packet arrival
to read the indoor data (pressure, temperature and humidity) and augment the loop packet.
Far more modular and easier to test/develop (and get help),
you will be running a standard sdr driver,
and since you are already getting your hands dirty modifying the sdr driver,
writing a small data service to handle the Arduino input should be a walk in the park.
If the Arduino is serially connected to the RPi
you should not have great latency in accessing data,
so a suitably short timeout on the serial reads should provide you with your
indoor data without blocking the main WeeWX thread.
Once proved, the serial read could be moved to a thread if you really had the need.
"""
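import json
import logging
import queue
import re
import select
import threading
import time

import serial

import weeutil.weeutil
import weewx
import weewx.accum
from weeutil.weeutil import to_bool, to_int
from weewx.engine import StdService

log = logging.getLogger(__name__)


def logdbg(msg):
    log.debug(msg)


def loginf(msg):
    log.info(msg)


# thp stands for indoors; temp; humidity; pressure;
#DEFAULT_FNAME_THPUSB = '/dev/ttyACM0'
DEFAULT_FNAME_THPUSB = '/dev/ttyACMwa'
# RTC_PERIOD_STX = 1000 * 20 # server TX signal every 20 sec;
WRITE_ATTEMPT_MAX = 4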
#####################################################################################
# fixme check all timeouts


class AsyncReader(threading.Thread):
    """Read lines from the arduino in a background thread and queue the json ones."""

    # when the timeserver is not used time is just a blank or sentinel or placeholder value
    TS_TIME1 = re.compile(r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d')
    TS_TIME2 = re.compile(r'"time": 0')
    TS_TIME3 = re.compile(r'YYYY-MM-DD HH:MM:SS')

    def __init__(self, fd, queue1, label, timeserver_used):
        threading.Thread.__init__(self)
        self._fd = fd
        self._queue = queue1
        self._running = False
        self._read_mode = True
        # fixme remove timeserver
        # self._timeserver_used = timeserver_used
        self._timeserver_used = False
        self._write_attempt = 0
        self.daemon = True
        self.name = label

    def run_read_mode(self):
        # fixme I think this should be longer than the loop time for the arduino
        timeout = 12
        # select(rlist,wlist,xlist,timeout); wait until ready for reading (rlist);
        ready, _, _ = select.select([self._fd], [], [], timeout)
        if not ready:
            return
        # errors='replace' keeps a garbled byte from killing the reader thread
        line = self._fd.readline().rstrip().decode(errors='replace')
        # fixme verbose
        logdbg("read line = '%s'" % line)
        if not line:
            return
        # the time request is not json, so it has to be checked before the json filter
        if self._timeserver_used and line == "request_datetime":
            self._read_mode = False
            self._write_attempt = 0
            return
        # we are not processing any lines other than json so just keep only json;
        # to see the debugging info comment this line out;
        if not line.startswith("{"):
            return
        if self._timeserver_used:
            line = self.insert_time(line)
        self._queue.put(line)

    def run_write_mode(self):
        timeout = 12
        # select(rlist,wlist,xlist,timeout); wait until ready for writing (wlist);
        _, ready, _ = select.select([], [self._fd], [], timeout)
        if not ready:
            self._write_attempt += 1
            # WRITE_ATTEMPT_MAX is a module-level constant, not a class attribute
            if self._write_attempt >= WRITE_ATTEMPT_MAX:
                logdbg("AsyncReader write timeout")
                self._read_mode = True
            return
        # int(1677178029.12345) seconds since epoch
        time_now_epoch = int(time.time())
        # b'1677178029\n'
        buf = bytes(f'{time_now_epoch}\n', 'ascii')
        self._fd.write(buf)
        self._fd.flush()
        loginf("AsyncReader wrote timestamp={} buf={}".format(time_now_epoch, buf))
        self._read_mode = True

    def run(self):
        logdbg(f"start AsyncReader for {self.name} timeserver_used = {self._timeserver_used}")
        self._running = True
        while self._running:
            if self._read_mode:
                self.run_read_mode()
            else:
                self.run_write_mode()

    def stop_running(self):
        self._running = False

    def insert_time(self, line):
        # float; seconds since epoch;
        time_now_epoch = time.time()
        time_now_str = f'"time": {time_now_epoch}'
        # substitute time_now_str for ALL matched blank time(s)
        return AsyncReader.TS_TIME2.sub(time_now_str, line)


class AddAtlasInside(StdService):
    """Add the averaged indoor readings from the arduino to each archive record."""

    def __init__(self, engine, config_dict):
        # Initialize my superclass first:
        super(AddAtlasInside, self).__init__(engine, config_dict)
        # Bind to any new archive record events:
        self.bind(weewx.NEW_ARCHIVE_RECORD, self.new_archive_record)
        # ttyACM0 for wpa == Temperature; Humidity; Pressure sensor
        self._fname_thpusb = config_dict.get('fname_thpusb', DEFAULT_FNAME_THPUSB)
        archive_dict = config_dict.get('StdArchive', {})
        self.archive_interval = to_int(archive_dict.get('archive_interval', 300))
        self.loop_hilo = to_bool(archive_dict.get('loop_hilo', True))
        # stdout_queue is used by AsyncReader;
        self.stdout_queue = queue.Queue()
        self._fptr_thpusb = None
        self.wpa_reader = None
        try:
            # self._fptr_thpusb = open(self._fname_thpusb, "rb")
            # timeout = 5 # changed 4/24/23
            timeout = 14
            self._fptr_thpusb = serial.Serial(port=self._fname_thpusb,
                                              baudrate=115200,
                                              timeout=timeout)
            self.wpa_reader = AsyncReader(self._fptr_thpusb,
                                          self.stdout_queue,
                                          "wpa-reader-thread",
                                          timeserver_used=True)
            self.wpa_reader.start()
        except (OSError, ValueError) as e:
            raise weewx.WeeWxIOError("failed to open serial port '%s': %s" %
                                     (self._fname_thpusb, e))

    def shutDown(self):
        # stop the reader thread before closing the port it reads from;
        # originally close() was before the shutdown of the readers
        if self.wpa_reader is not None:
            self.wpa_reader.stop_running()
            self.wpa_reader.join(0.5)
        if self._fptr_thpusb is not None:
            self._fptr_thpusb.close()
        if self.wpa_reader is not None:
            if self.wpa_reader.is_alive():
                loginf('timed out waiting for %s' % self.wpa_reader.name)
            else:
                loginf('completed %s' % self.wpa_reader.name)
            self.wpa_reader = None
        loginf('shutdown complete')

    def _new_accumulator(self, timestamp):
        start_ts = weeutil.weeutil.startOfInterval(timestamp, self.archive_interval)
        end_ts = start_ts + self.archive_interval
        new_accumulator = weewx.accum.Accum(weeutil.weeutil.TimeSpan(start_ts, end_ts))
        return new_accumulator

    def download_inside_weather_record(self):
        # problem; in StdArchive.new_archive_record
        # old_accumulator and accumulator do not have the new inside data yet;
        # fixme not implemented; pass keeps the empty method valid python
        pass

    def new_archive_record(self, event):
        # NEW_ARCHIVE_RECORD events carry event.record, not event.packet
        accumulator = self._new_accumulator(event.record['dateTime'])
        done = False
        while not done:
            try:
                line = self.stdout_queue.get(True, 3)
                packet = json.loads(line)
                # fixme event.record already has time
                packet['dateTime'] = packet.pop('time')
                # addRecord() needs usUnits; assumption: the arduino values are already
                # in the record's unit system, convert here if they are not
                packet.setdefault('usUnits', event.record['usUnits'])
                accumulator.addRecord(packet, self.loop_hilo)
            except queue.Empty:
                done = True
            except (json.JSONDecodeError, KeyError):
                # fixme
                pass
            except weewx.accum.OutOfSpan:
                # fixme
                pass
        accumulated_record = accumulator.getRecord()
        # keep the record's own dateTime and usUnits; an accumulator that received
        # no packets could otherwise overwrite usUnits with None
        for key, value in accumulated_record.items():
            if key not in ('dateTime', 'usUnits'):
                event.record[key] = value
        # total_power = download_total_power()
        # event.record['electricity'] = total_power
# eee eof