here is the hopefully final version. seems to have finally fixed gaps.
what happened with the version in the previous post was ...
The NEW_LOOP_PACKET callback in the service I wrote waits too long to get
the next line of inside (temp press humid) data. Meanwhile the outside
data (from weewx-sdr from the Atlas) gets backed up. You can see this by
setting "log_lines=1" which is an option specific to weewx-sdr. Because
there was a lot of data including duplicates from the Atlas and because the
arduino was too slow to keep up with it I emulated the AsyncReader thread
from weewx-sdr. My version of AsyncReader keeps only the last valid json
line. It can return that line *instantly* when processing
NEW_LOOP_PACKET. That means none of the Atlas lines back up and none of
them get skipped.
So it is completely non blocking and this should be what GJR described as
close as I could do.
If you want the arduino code, just make anything that prints a json version
of the indoors data such as
{"inTemp": 60.0, "inHumidity": 40, etc. }
these must be the only lines from the arduino beginning with "{".
Everything else is skipped so the arduino can produce as much print
statement debugging as you want.
On Monday, May 1, 2023 at 9:59:36 PM UTC-7 William Garber wrote:
>
> here is another version that blocks less.
> On Sunday, April 30, 2023 at 11:05:37 PM UTC-7 William Garber wrote:
>
>> Until now the Atlas emitted NEW_LOOP_PACKET events about every 7 seconds,
>> so I had my indoor weather data arduino (wpa) set to emit at the same
>> interval. I just sped that up to every 3 seconds (just the wpa emitter).
>> You can "$ cat /dev/ttyACMwa" which prints the serial data output over usb
>> from the arduino (wpa) directly to the linux console. That shows (the new
>> interval) is 3 seconds. I will run it over a long time to see if there are
>> any long intervals. But what I suspect is happening is ... I have three
>> weather stations. I built two with Adafruit parts plus the indoors part of
>> this one (wpa/Atlas). The "merged" reports are run as a fourth instance of
>> weewx which is not the server, just "wee_reports_merge" which is just
>> "wee_report" with the configuration file for the merged report. The worst
>> thing is that this was being run every 90 seconds (DUH). I slowed it down
>> to every 10 minutes. The archive interval for Atlas/wpa is 5 minutes.
>>
>> The hard part was reading the serial port to get the inside data (one
>> line of json data).
>> (1) flush input buffer
>> (2) read discard first line in case it is partial
>> (3) read next line or timeout
>> the readline() commands are blocking with timeout. Not sure how to do
>> non blocking but see this discussion:
>>
>> https://stackoverflow.com/questions/1093598/pyserial-how-to-read-the-last-line-sent-from-a-serial-device
>> they all seem to think that at least part of the procedure is blocking
>> (not all !)
>>
>> it works so far. will leave it to run overnight service
>> attached. service processes inside data in NEW_LOOP_PACKET as you
>> described. weewx uses service plus original unmodified weewx-sdr.py driver
>> for outside data.
>>
>> On Sunday, April 30, 2023 at 4:47:56 PM UTC-7 gjr80 wrote:
>>
>>> On Sunday, 30 April 2023 at 13:42:56 UTC+10 [email protected] wrote:
>>>
>>> Just one question please :-). Suppose the read of the arduino could
>>> possibly take a relatively long time, and you want to have a timeout after
>>> which it gives up and saves None/NULL for the indoor data.
>>>
>>> What is the max timeout that would be reasonable relative to the archive
>>> interval ?
>>>
>>>
>>> I would be more concerned about the loop packet interval than the
>>> archive interval. Your initial post indicated the Atlas emits packets every
>>> 7 seconds so the SDR driver will be emitting loop packets every 7 odd
>>> seconds. If you have a plain vanilla WeeWX install WeeWX will not be doing
>>> much else during the loop packet interval other than calculating derived
>>> obs so dwelling for up to, say, a second should have no significant effect.
>>>
>>> What happens if the delay goes past the end of the main archive interval
>>> the event you are handling was in?
>>>
>>>
>>> Delaying past the end of the archive interval will not have a
>>> significant impact (within reason). Say your service delays 20 seconds past
>>> the end of the archive period, when the driver gets it's turn again it will
>>> emit another loop packet which will cause an archive record to be generated
>>> by WeeWX and ultimately the report cycle is run maybe 20+ seconds later
>>> than usual (note the exact behaviour of the driver is very much driver
>>> dependent; some drivers may skip loop packets, others may emit a loop
>>> packet immediately and yet others may delay emitting a loop packets - the
>>> SDR driver is threaded and I believe it is the former). So really you will
>>> probably only noticed delayed report output.
>>>
>>> Where you will probably get more problems from delaying the WeeWX main
>>> loop is in the generation/processing of loop packets. As mentioned above
>>> driver behaviour varies from driver to driver. For example, the vantage
>>> driver obtains loop packets every 2 odd seconds; if a loop packet is missed
>>> it is gone forever. Other drivers poll the hardware much less frequently,
>>> say every 50 odd seconds, in that case there could be an entire minute
>>> might go by with no data. The consequences of a missed loop packet depends
>>> on the system config. A vantage station with a five minute archive interval
>>> would see around 120 loop packets per archive period, so the loss of one
>>> loop packet will have no real impact. Consider the second system with loop
>>> packets arriving every, say, 50 seconds; if it had an archive interval of
>>> one minute, you could conceivably see no loop packets in an archive
>>> interval and hence no archive record is generated and no report cycle
>>> occurs.
>>>
>>> Remember loop packet data is accumulated by WeeWX and many obs in the
>>> resulting archive record are simply the average value of the obs from all
>>> loop packets seen during the archive interval. So for slow changing obs,
>>> such as air temperature/atmospheric pressure, losing the odd loop packet
>>> during an archive interval will have no real impact on the archive record
>>> data provided there are numerous other loop packets received in the archive
>>> interval. This may not be the case for rapidly changing obs such as wind
>>> speed and direction or cumulative obs such as rainfall, in these cases it
>>> may be important not to lose loop packets.
>>>
>>> One way to deal with sources that have significant latency is to place
>>> the code that interacts with the source in it's own thread, this usually
>>> entails one or more queues to pass data to the driver/service which adds
>>> complexity. The non-threaded approach has simplicity, but risks delaying
>>> the WeeWX main loop.
>>>
>>>
>>> I think what is happening is I usually get very fast reads from the
>>> arduino but once in a while I get a really slow one. Could be
>>> something independent like me reading the database while weewx is trying to
>>> write it?
>>>
>>>
>>> Possible I guess, if WeeWX was doing some substantial report generation
>>> at the time, otherwise not likely. Do the slow response occasions align
>>> with WeeWX report generation? What happens if you take WeeWX out of the
>>> equation and run a simple python script to poll the Arduino every so many
>>> seconds and output the time taken to obtain a response?
>>>
>>> Gary
>>>
>>
--
You received this message because you are subscribed to the Google Groups
"weewx-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To view this discussion on the web visit
https://groups.google.com/d/msgid/weewx-user/c0232918-d130-4097-a280-a0841edd316cn%40googlegroups.com.
"""
the previous weewx_atlas.py adafruit version was a
modified weewx-sdr driver with AsyncReader with a queue
which merged stdout from rtl_433 over usb and wpa arduino over usb;
deprecated 4/28/23 at advice of GJR (Gary) on weewx blog;
the original unmodified driver is now used and my stuff was moved to this new service;
the new service has same name as original driver weewx_atlas.py (this file);
the new custom service is AddAtlasInside; see [Engine][[Services]] section at end under
data_services = user.weewx_atlas.AddAtlasInside,
AddAtlasInside processes NEW_LOOP_PACKET events only;
(A) get_wpa_arduino_data();
read line from usb serial port and json.loads();
return packeti which is a json dict of inside weather;
does not need to contain time or usunits;
(B) new_loop_packet();
packeto = event.packet.copy();
packeti = get_wpa_arduino_data();
a few edits to packeti;
event.packet.update(packeti);
(update is a standard dict() member that merges two dicts);
(C) WeeWX engine accumulates (averages) packets when running callback for NEW_ARCHIVE_RECORD;
GJR comments;
It is important that the function download_total_power() does not delay very long
because it will sit right in the main loop of the WeeWX engine.
If it is going to cause a delay of more than a couple seconds
you might want to put it in a separate thread and
feed the results to AddElectricity through a queue.
Why not run a standard sdr driver to feed WeeWX with loop packets from the Atlas
with a simple, non-threaded data service bound to the new loop packet arrival
to read the indoor data (pressure, temperature and humidity) and augment the loop packet.
Far more modular and easier to test/develop (and get help),
you will be running a standard sdr driver,
and since you are already getting your hands dirty modifying the sdr driver,
writing a small data service to handle the Arduino input should be a walk in the park.
If the Arduino is serially connected to the RPi
you should not have great latency in accessing data,
so a suitably short timeout on the serial reads should provide you with your
indoor data without blocking the main WeeWX thread.
Once proved, the serial read could be moved to a thread if you really had the need.
"""
from __future__ import with_statement
import threading
import select
import signal
import sys
import time
from datetime import datetime
import argparse
import logging
import logging.handlers
import serial
import weewx.units
import weecfg
import weeutil.logger
import weeutil.weeutil
from weeutil.weeutil import to_int
from weewx.engine import StdService, StdEngine
try:
import cjson as json
setattr(json, 'dumps', json.encode)
setattr(json, 'loads', json.decode)
except (ImportError, AttributeError):
try:
import simplejson as json
except ImportError:
import json
log = logging.getLogger(__name__)
SERVICE_VERSION = '0.00'
DEFAULT_CONFIG='/etc/weewx/weewx_atlas.conf'
# thp stands for indoors; temp; humidity; pressure;
# DEFAULT_FNAME_USB_THP = '/dev/ttyACM0'
DEFAULT_FNAME_USB_THP = '/dev/ttyACMwa'
DEFAULT_DEBUG = 1
READ_ATTEMPT_MAX = 140
class AsyncReader(threading.Thread):
"""
read all input from wpa (weather port Atlas) arduino;
reject lines that do not begin with "{" i.e. keep only json data;
keep only self._last_line == most recent line of json data;
"""
def __init__(self, fd, label):
threading.Thread.__init__(self)
self._fd = fd
self._last_line = None
self._lock_last_line = threading.Lock()
self._partial = ''
self._attempt = 0
self._running = False
self.setDaemon(True)
self.setName(label)
def read_all_with_error(self):
"""read whole input buffer; return empty if error or timeout;"""
# block until one more char or timeout
tmp = self._fd.read(1).decode()
if len(tmp) != 1:
log.error('read_all_with_error() timeout;')
return None
# small block to collect data; buffer should not overflow in this time;
# zero works okay
# time.sleep(0.100)
# time.sleep(0.010)
nwait = self._fd.inWaiting()
tmp += self._fd.read(nwait).decode()
if len(tmp) != 1 + nwait:
log.error('read_all_with_error() timeout;')
return None
return tmp
def get_last_line_of_json_data(self):
"""
main iteration;
read entire chunk of all data waiting in input buffer received from arduino;
blocking with timeout of serial port; only blocks on reading first char;
split and loop over lines;
store last line which begins with '{' in self._last_line;
if final line in split list does not end with newline it is a partial line
so save for next iteration to be concatenated with rest of line;
"""
buf = self.read_all_with_error()
if not buf:
self._partial = ''
return
line_list = buf.splitlines(keepends = True)
if not line_list:
self._partial = ''
return
# concatenate partial last line on previous call + line_list[0]
line_list[0] = self._partial + line_list[0]
# find line == last line beginning with '{' and ending with newline;
line = None
self._partial = ''
for jjj, tmp in enumerate(line_list):
if tmp[0] == '{':
if tmp[-1] == '\n':
# found a valid line; keep;
line = tmp
elif jjj == len(line_list) - 1:
# found partial last line;
self._partial = tmp
if line:
log.debug('after attempt = %d line = %s', self._attempt, line)
with self._lock_last_line:
self._last_line = line
self._attempt = 0
return
self._attempt += 1
def get_last_line_of_json_data1(self):
"""this may be all you need;"""
timeout = 12
# select(rlist,wlist,xlist,timeout); wait until ready for reading (rlist);
ready,_,_ = select.select([self._fd], [], [], timeout)
if not ready:
return
line = self._fd.readline().rstrip().decode()
log.debug("read line = '%s'", line)
if not line:
return
if not line.startswith("{"):
return
self._last_line = line
def get_last_line(self):
"""return last line found; lock checks it is not currently being saved;"""
with self._lock_last_line:
# strings are immutable so you do not need a copy
line = self._last_line
return line
def run(self):
"""iterate while self._running;"""
log.debug("start AsyncReader for %s", self.getName())
self._running = True
while self._running:
self.get_last_line_of_json_data()
def stop_running(self):
"""will halt thread at end of newx get_last_line_of_json_data;"""
self._running = False
class AddAtlasInside(StdService):
"""
service that processes NEW_LOOP_PACKET events;
(1) get_wpa_arduino_data() returns partial packet of indoor data packeti;
(2) new_loop_packet() input is packeto (outdoor data packet); update (merge) with packeti;
"""
def __init__(self, engine, config_dict):
super().__init__(engine, config_dict)
# Bind to any weewx.NEW_LOOP_PACKET events
self.bind(weewx.NEW_LOOP_PACKET, self.new_loop_packet)
archive_dict = config_dict.get('StdArchive', {})
self._archive_interval = to_int(archive_dict.get('archive_interval', 300))
log.debug('_archive_interval = %d', self._archive_interval)
# e.g. serial usb port == ttyACM0 or ttyACMwa
sdr_dict = config_dict.get('SDR', {})
self._fname_usb_thp = sdr_dict.get('fname_usb_thp', DEFAULT_FNAME_USB_THP)
log.debug('_fname_usb_thp = %s', self._fname_usb_thp)
# open serial port
self._fptr_usb_thp = None
self.wpa_reader = None
try:
# timeout = 14
# timeout = 10
timeout = 5
self._fptr_usb_thp = serial.Serial(port=self._fname_usb_thp,
baudrate=115200,
timeout=timeout)
self.wpa_reader = AsyncReader(self._fptr_usb_thp, "wpa-reader-thread")
self.wpa_reader.start()
except (OSError, ValueError) as exc:
log.debug('startup failed')
log.debug('exc= %s', str(exc))
raise weewx.WeeWxIOError(f"failed to start AddAtlasInside; {exc}")
log.debug('AddAtlasInside startup passed')
def shutDown(self):
"""close _fptr_usb_thp ttyACM* serial port;"""
log.debug('AddAtlasInside.shutDown()')
self.wpa_reader.stop_running()
self.wpa_reader.join(0.5)
self._fptr_usb_thp.close()
if self.wpa_reader.is_alive():
log.info('timed out waiting for %s', self.wpa_reader.getName())
else:
log.info('completed %s', self.wpa_reader.getName())
self.wpa_reader = None
log.info('shutdown complete')
def new_loop_packet(self, event):
"""
callback bound to NEW_LOOP_PACKET events; this should be VERY FAST;
(1) packeto = just a copy of event.packet for reference; this is outside weather data;
(2) line = wpa_reader.get_last_line() instantly returns last line of json data or None;
(3) packeti = json.loads(line); do nothing on fail; this is inside weather data;
(4) make minor changes to packeti;
(5) event.packet.update(packeti) merges outside data with packeti;
NEW_LOOP_PACKET event.packet should already have these two (key: value) pairs;
"dateTime": 12345678.12345, "usUnits": 1,
"""
log.debug('new_loop_packet() begin ===============================================')
time_packet1 = event.packet['dateTime']
time_packet2 = datetime.utcfromtimestamp(time_packet1).strftime('%Y-%m-%d %H:%M:%S')
log.debug('time_packet1=%d time_packet2=%s', time_packet1, time_packet2)
interval_t1 = weeutil.weeutil.startOfInterval(time_packet1, self._archive_interval)
interval_t2 = interval_t1 + self._archive_interval
log.debug('interval_t1=%d interval_t2=%d', interval_t1, interval_t2)
# inside; this should be instant;
line = self.wpa_reader.get_last_line()
if not line:
return
try:
packeti = json.loads(line)
except json.JSONDecodeError as exc:
log.error('new_loop_packet() json.loads failed line = %s', line)
log.error('new_loop_packet() JSONDecodeError exc = %s', str(exc))
return
# outside
packeto = event.packet.copy()
# minor changes to packeti
# for this specific weather station we assign arbitrary caseTemp1 caseHumid1 cpuTemp1;
if 'outTemp' in packeto.keys():
packeti['caseTemp1' ] = packeto['outTemp']
packeti['cpuTemp1' ] = packeto['outTemp']
if 'outHumidity' in packeto.keys():
packeti['caseHumid1'] = packeto['outHumidity']
# remove aaa_millis which was just for debugging;
if 'aaa_millis' in packeti.keys():
del packeti['aaa_millis']
# event.packet = merge packeti with packeto
event.packet.update(packeti)
log.debug('new_loop_packet(); packet inside; packeti=%s', packeti)
log.debug('new_loop_packet(); packet outside; packeto=%s', packeto)
log.debug('new_loop_packet(); final augmented; event.packet=%s', event.packet)
# fake test packet
FAKE_PACKET = {
'dateTime' : int(time.time()),
'usUnits' : weewx.US,
'windSpeed' : 50.0,
'windDir' : 50.0,
'outTemp' : 50.0,
'outHumidity' : 50.0,
'lux' : 50.0,
'UV' : 50.0,
'rain_total' : 50.0,
'caseTemp1' : 50.0,
'caseHumid1' : 50.0,
'cpuTemp1' : 50.0,
'signal2' : 50.0,
'signal3' : 50.0,
'signal4' : 50.0,
'signal5' : 50.0,
'signal6' : 50.0,
'signal7' : 50.0,
}
def process_options_get_config_dict():
"""
parse command args and
(1) handle simplest options then exit or
(2) return config_dict and continue
"""
usage = """weewx_atlas [--debug] [--help] [--version]"""
print('argparse.ArgumentParser')
parser = argparse.ArgumentParser(description=usage)
parser.add_argument('--version',
dest='version',
action='store_true',
help='display driver version')
parser.add_argument('--debug',
dest='debug',
default=DEFAULT_DEBUG,
action='store_true',
help='display diagnostic information while running')
parser.add_argument('--config',
default=DEFAULT_CONFIG,
action='store',
help='configuration file with sensor map')
print('parser.parse_args')
options = parser.parse_args()
if options.version:
print(f"weewx_atlas version {SERVICE_VERSION}")
sys.exit(1)
if options.debug:
print('log.setLevel(logging.DEBUG)')
log.setLevel(logging.DEBUG)
print(f'{options.config=}')
log.debug('options.config = %s', str(options.config))
log.debug('reading config_dict')
_, config_dict = weecfg.read_config(options.config)
archive_interval_demo_use = True
archive_interval_demo = 30 # seconds
archive_dict = config_dict.get('StdArchive', {})
archive_interval_in = archive_dict.get('archive_interval', None)
if archive_interval_demo_use:
log.debug('using archive_interval_demo; alter config;')
archive_dict['archive_interval'] = str(archive_interval_demo)
log.debug('archive_interval_in = %s', str(archive_interval_in))
log.debug('archive_interval_out = %s', str(archive_interval_demo))
else:
log.debug('using archive interval from options.config')
log.debug('archive_interval_out = %s', str(archive_interval_in))
return config_dict
def main():
"""
test service; not used during weewx run; to run test do;
PYTHONPATH=$PYTHONPATH:/usr/share/weewx python /usr/share/weewx/user/weewx_atlas.py [OPTIONS]
"""
# signal handling
# usb port close is probably already done by station destructor; just in case;
svc = None
def testing_handle_interrupt(signum, stack):
"""catch keyboard interrupts; clean up; close usb port;"""
log.error('Handling interrupt. Closing port.')
print(f'{signum=}')
print(f'{stack=}')
svc.shutDown()
log.error('closed port. exiting.')
sys.exit(0)
signal.signal(signal.SIGTERM, testing_handle_interrupt)
signal.signal(signal.SIGINT, testing_handle_interrupt)
log.addHandler(logging.StreamHandler(sys.stdout))
log.addHandler(logging.handlers.SysLogHandler(address="/dev/log"))
print('log.setLevel INFO')
log.setLevel(logging.INFO)
config_dict = process_options_get_config_dict()
log.debug('StdEngine constructor')
eng = StdEngine(config_dict)
log.debug('AddAtlasInside service constructor')
svc = AddAtlasInside(eng, config_dict)
while True:
log.debug('main loop begin %s', 'M' * 70)
# log.debug('sleeping archive_interval= %d', archive_interval)
# time.sleep(archive_interval)
event = weewx.Event(weewx.NEW_LOOP_PACKET)
event.packet = FAKE_PACKET.copy()
event.packet['dateTime'] = int(time.time())
log.debug('begin event.packet = %s', str(event.packet))
svc.new_loop_packet(event)
log.debug('end event.packet = %s', str(event.packet))
log.debug('main loop end %s', 'M' * 70)
if __name__ == '__main__':
main()
# eee eof