here is my service.  In this version I took your advice and decided to use 
the weewx-sdr driver completely unmodified and put all my stuff in a 
service.  Sorry the weewx-sdr driver (the original, not my version) reads 
its input from AsyncReader (not my modification).  That uses a thread.  
That's what I was referring to.  But since I already had my (slightly 
modified) version of AsyncReader from before I spoke to you, I reused it in 
the service (see the attachment).  It has a queue.  It stores lines of 
input from the arduino over usb and discards any which are not json.  This 
is completely separate from the weewx-sdr and its AsyncReader.  Plus this 
AsyncReader can also be set up as a writer which enables me to have the 
option of a simple time server on the raspberry pi to set the RTC on the 
arduino if needed.  

Anyway I could always get rid of the AsyncReader and just use 
serial.readline().

Perhaps the example in the weewx customization guide is for a one-shot read 
of the indoors sensors.  Instead I made it loop over the lines of input in 
the queue and use an accumulator to average them.

The callback for NEW_ARCHIVE_RECORD for my service would (I think) be 
called before StdArchive.new_archive_record().  My service only has access 
to the event and event.record.  Does it also have access to 
StdArchive.old_accumulator ?
In StdArchive.new_archive_record(event)   it gets the event which was 
modified by my service so the record includes my data.

*So my question is* what about old_accumulator in 
StdArchive.new_archive_record() ?  Is it strictly optional ?  That 
accumulator only has the outside data not the data added by my 
new_archive_record() callback.
Of course my data is already averaged.  
In StdArchive.new_archive_record() the event.record has more observation 
types than the old_accumulator.  Does this pose a problem ?

what do you mean by augmenting?    does this involve choosing 
self.record_generation = hardware ?
On Wednesday, April 26, 2023 at 6:12:50 PM UTC-7 gjr80 wrote:

> Yes, that section covers it fairly well. The two main things to watch are 
> not delaying the main WeeWX engine loop and ensuring the data you add to 
> the loop packet follows the unit system used in the loop packet. Delaying 
> the main loop is often associated with accessing data via the internet or 
> some other network. A common approach is to develop the data service 
> without using its own thread, this makes debugging much easier. If delay is 
> an issue you can move the service to its own thread later. Unit consistency 
> is achieved by checking the usUnits field in the loop packet and then 
> converting your service' data to the correct units. This is important for 
> obs whose units that vary across unit systems (eg temperature) but not an 
> issue if the same unit is used across all unit systems (eg wind direction). 
> A robust, well written data service will not assume the loop packet always 
> uses the same unit system.
>
> Gary
>
> On Thursday, 27 April 2023 at 01:24:52 UTC+10 [email protected] wrote:
>
>> Thank you I expect you are referring to the Customization guide 
>> "Customizing the WeeWX service engine; Adding a second data source"; I will 
>> try doing it that way; Thanks again;
>>
>> On Tuesday, April 25, 2023 at 10:16:18 PM UTC-7 gjr80 wrote:
>>
>>> I'm not sure if you are providing a running commentary or seeking help. 
>>> If the latter then I have no idea where to start. Personally, I think your 
>>> architecture is way too complex; you appear to be running a highly modified 
>>> driver that seeks to amalgamate data from two sources. I imagine you will 
>>> strike all sorts of corner cases depending on what arrives when. I would 
>>> also question the utility of reading indoor obs every seven seconds, seems 
>>> way too frequent to me. All told I doubt you are going to find too many 
>>> folks here to help.
>>>
>>> Why not run a standard sdr driver to feed WeeWX with loop packets from 
>>> the Atlas with a simple, non-threaded data service bound to the new loop 
>>> packet arrival to read the 'indoor data' (I assume this is pressure, 
>>> temperature and humidity) and augment the loop packet. Far more modular and 
>>> easier to test/develop (and get help), you will be running a standard sdr 
>>> driver, and since you are already getting your hands dirty modifying the 
>>> sdr driver, writing a small data service to handle the Arduino input should 
>>> be a walk in the park. If the Arduino is serially connected to the RPi you 
>>> should not have great latency in accessing data, so a suitably short 
>>> timeout on the serial reads should provide you with you indoor data without 
>>> blocking the main WeeWX thread. Once proved, the serial read could be moved 
>>> to a thread if you really had the need.
>>>
>>> Gary
>>>
>>

-- 
You received this message because you are subscribed to the Google Groups 
"weewx-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/weewx-user/c1706af5-d27a-4f7c-b979-88cd26de9064n%40googlegroups.com.
"""
It is important that the function download_total_power() does not delay very long
because it will sit right in the main loop of the WeeWX engine. 
If it iss going to cause a delay of more than a couple seconds
you might want to put it in a separate thread and 
feed the results to AddElectricity through a queue.

Why not run a standard sdr driver to feed WeeWX with loop packets from the Atlas
with a simple, non-threaded data service bound to the new loop packet arrival
to read the indoor data (pressure, temperature and humidity) and augment the loop packet.
Far more modular and easier to test/develop (and get help),
you will be running a standard sdr driver,
and since you are already getting your hands dirty modifying the sdr driver,
writing a small data service to handle the Arduino input should be a walk in the park.
If the Arduino is serially connected to the RPi
you should not have great latency in accessing data,
so a suitably short timeout on the serial reads should provide you with your
indoor data without blocking the main WeeWX thread.
Once proved, the serial read could be moved to a thread if you really had the need.
"""

import weewx
from weewx.engine import StdService

# thp stands for indoors; temp; humidity; pressure;

#DEFAULT_FNAME_THPUSB = '/dev/ttyACM0'
DEFAULT_FNAME_THPUSB = '/dev/ttyACMwa'
# RTC_PERIOD_STX  = 1000 * 20      #  server TX signal every 20 sec;
WRITE_ATTEMPT_MAX = 4


#####################################################################################

# fixme check all timeouts
class AsyncReader(threading.Thread):
    # when the timeserver is not used time is just a blank or sentinel or placeholder value
    TS_TIME1 = re.compile('\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d')
    TS_TIME2 = re.compile('"time": 0')
    TS_TIME3 = re.compile('YYYY-MM-DD HH:MM:SS')

    def __init__(self, fd, queue1, label, timeserver_used):
        threading.Thread.__init__(self)
        self._fd = fd
        self._queue = queue1
        self._running = False
        self._read_mode = True
        # fixme remove timeserver
        # self._timeserver_used = timeserver_used
        self._timeserver_used = False
        self._write_attempt = 0
        self.setDaemon(True)
        self.setName(label)

    def run_read_mode(self):
        # fixme I think this should be longer than the loop time for the arduino
        timeout = 12
        # select(rlist,wlist,xlist,timeout); wait until ready for reading (rlist);
        ready,_,_ = select.select([self._fd], [], [], timeout)
        if not ready:
            return
        line = self._fd.readline().rstrip().decode()
        # fixme verbose
        logdbg("read line = '%s'" % (line))
        if not line:
            return
        # we are not processing any lines other than json so just keep only json;
        # to see the debugging info comment this line out;
        if not line.startswith("{"):
            return
        if self._timeserver_used:
            if line == "request_datetime":
                self._read_mode = False
                self._write_attempt = 0
                return
        else:
            line = self.insert_time(line)
        self._queue.put(line)

    def run_write_mode(self):
        timeout = 12
        # select(rlist,wlist,xlist,timeout); wait until ready for writing (wlist);
        _,ready,_ = select.select([], [self._fd], [], timeout)
        if not ready:
            self._write_attempt += 1
            if self._write_attempt >= self.WRITE_ATTEMPT_MAX:
                logdbg("AsyncReader write timeout")
                self._read_mode = True
            return
        # int(1677178029.12345) seconds since epoch
        time_now_epoch = int(time.time())
        # b'1677178029\n'
        buf = bytes(f'{time_now_epoch}\n', 'ascii')
        self._fd.write(buf)
        self._fd.flush()
        loginf("AsyncReader wrote timestamp={} buf={}".format(time_now_epoch, buf))
        self._read_mode = True

    def run(self):
        logdbg(f"start AsyncReader for {self.getName()} timeserver_used = {self.timeserver_used}")
        self._running = True
        while self._running:
            if self._read_mode:
                self.run_read_mode()
            else:
                self.run_write_mode()

    def stop_running(self):
        self._running = False

    def insert_time(self, line):
        # float; seconds since epoch;
        time_now_epoch = time.time()
        time_now_str = f'\"time\": {time_now_epoch}'
        # substitute time_now_str for ALL matched blank time(s)
        line = re.sub(AsyncReader.TS_TIME2, time_now_str, line)
        return line
        

class AddAtlasInside(StdService):

    def __init__(self, engine, config_dict):

      # Initialize my superclass first:
      super(AddAtlasInside, self).__init__(engine, config_dict)

      # Bind to any new archive record events:
      self.bind(weewx.NEW_ARCHIVE_RECORD, self.new_archive_record)

      # ttyACM0 for wpa == Temperature; Humidity; Pressure sensor
      self._fname_thpusb = config_dict.get('fname_thpusb', DEFAULT_FNAME_THPUSB)

      archive_dict = config_dict.get('StdArchive', {})
      self.archive_interval = to_int(archive_dict.get('archive_interval', 300))

      self.loop_hilo = to_bool(archive_dict.get('looop_hilo', True))
      
      # stdout_queue is used by AsyncReader; 
      self.stdout_queue = queue.Queue()

      self._fptr_thpusb = None
      self.wpa_reader = None
      try:
          # self._fptr_thpusb = open(self._fname_thpusb, "rb")
          # timeout = 5    # changed 4/24/23 
          timeout = 14
          self._fptr_thpusb = serial.Serial(port=self._fname_thpusb,
                                            baudrate=115200,
                                            timeout=timeout)

          self.wpa_reader = AsyncReader(self._fptr_thpusb,
                                        self.stdout_queue,
                                        "wpa-reader-thread",
                                        timeserver_used = True)
          self.wpa_reader.start()
      except (OSError, ValueError) as e:
          raise weewx.WeeWxIOError("failed to start process '%s': %s" %
                                   (cmd, e))
      
    def shutDown(self):

        # originally close() was before the shutdown of the readers
        self.wpa_reader.stop_running()
        self.wpa_reader.join(0.5)

        # originally this was before the shutdown of the readers
        self._fptr_thpusb.close()
        
        if self.wpa_reader.is_alive():
            loginf('timed out waiting for %s' % self.wpa_reader.getName())
        else:
            loginf('completed %s' % self.wpa_reader.getName())
        self.wpa_reader = None

        loginf('shutdown complete')


    def _new_accumulator(self, timestamp):
      start_ts = weeutil.weeutil.startOfInterval(timestamp, self.archive_interval)
      end_ts = start_ts + self.archive_interval
      new_accumulator = weewx.accum.Accum(weeutil.weeutil.TimeSpan(start_ts, end_ts))
      return new_accumulator
        
    def download_inside_weather_record(self):

    # problem; in StdArchive.new_archive_record
    # old_accumulator and accumulator do not have the new inside data yet;
        
    def new_archive_record(self, event):
        done = False
        accumulator = self._new_accumulator(event.packet['dateTime'])
        while not done:
            try:
                line = self.wpa_reader.stdout_queue.get(True, 3)
                packet = json.loads(line)
                # fixme event.record already has time
                packet['dateTime'] = packet['time']
                del packet['time']
                # packet['usUnits'] = weewx.US
                if 'usUnits' in pkt:
                    packet['usUnits'] = pkt.pop('usUnits', 0)
                self.accumulator.addRecord(packet, self.loop_hilo)
            except queue.Empty as exc:
                done = True
            except json.JSONDecodeError as exc:
                # fixme
                pass
            except weewx.accum.OutOfSpan as exc:
                # fixme
                pass
        accumulated_record = self.accumulator.getRecord()
        event.record.update(accumulated_record)


#        total_power = download_total_power()
#        event.record['electricity'] = total_power


# eee eof

Reply via email to