yes for you Gary and the others files,
byows is the driver.
have a good night.
Patrick
On Saturday, December 8, 2018 at 3:40:43 PM UTC+1, Patrick Tranchant wrote:
>
> hello I am a newbie from France ( sorry for my english )
>
> I want to use weewx on a raspberry with a weather station built by myself
> (view on Magpi)
> I don't see my station on your website to configure :
>
> Weather Station Hardware Comparison: I don't found my weather station.
> Do you have a solution or how to configure Weewx
>
> thank you for your help
>
> Patrick
>
--
You received this message because you are subscribed to the Google Groups
"weewx-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
For more options, visit https://groups.google.com/d/optout.
#!/usr/bin/python
# Created by Pat O'Brien 2019 - https://obrienlabs.net
#
# This driver takes the sensors used in the Raspberry Pi
# build your own weather station project and converts it
# into a format that can be used with weewx.
#
# The Raspberry Pi project can be found here:
# https://projects.raspberrypi.org/en/projects/build-your-own-weather-station
#
import os # POB REMOVE
os.environ['GPIOZERO_PIN_FACTORY'] = os.environ.get('GPIOZERO_PIN_FACTORY', 'mock') # POB REMOVE
from gpiozero import Button
import time
import math
import bme280_sensor_2
import wind_direction_byo_5
import statistics
import ds18b20_therm
import datetime
import syslog
import weedb
import weewx.drivers
import weeutil.weeutil
import weewx.wxformulas
temp_probe = ds18b20_therm.DS18B20()
def logmsg(dst, msg):
syslog.syslog(dst, 'BYOWS: %s' % msg)
def loginf(msg):
logmsg(syslog.LOG_INFO, msg)
def logerror(msg):
logmsg(syslog.LOG_ERROR, msg)
def logdebug(msg):
logmsg(syslog.LOG_DEBUG, msg)
def loader(config_dict, engine):
station = BYOWS(**config_dict['BYOWS'])
return station
class BYOWS(weewx.drivers.AbstractDevice):
""" Driver for the Raspberry Pi Bring Your Own Weather Station. """
def __init__(self, **stn_dict) :
""" Initialize object. """
self.station_hardware = stn_dict.get('hardware')
# TODO CONVERT TO WEEWX OPTIONS?
self.interval = 60 # Measurements recorded every 1 minute
self.wind_count = 0 # Counts how many half-rotations
self.radius_cm = 9.0 # Radius of your anemometer
self.wind_interval = 5 # How often (secs) to sample speed
self.wind_gust = 0
self.cm_in_a_km = 100000.0
self.secs_in_an_hour = 3600
self.adjustment = 1.18
self.bucket_size = 0.2794 # mm
self.rain_count = 0
self.store_speeds = []
self.store_directions = []
self.wind_speed_sensor = Button(5)
self.wind_speed_sensor.when_pressed = self.spin() # is this a valid call for the below class function?
self.rain_sensor = Button(6)
self.rain_sensor.when_pressed = self.bucket_tipped() # is this a valid call for the below class function?
def hardware_name(self):
return self.station_hardware
# Every half-rotations, add 1 to count
def spin(self):
self.wind_count = self.wind_count + 1
#print("spin" + str(self.wind_count))
def calculate_speed(self, time_sec):
circumference_cm = (2 * math.pi) * self.radius_cm
rotations = self.wind_count / 2.0
# Calculate distance travelled by a cup in km
dist_km = (circumference_cm * rotations) / self.cm_in_a_km
# Speed = distance / time
km_per_sec = dist_km / time_sec
km_per_hour = km_per_sec * self.secs_in_an_hour
# Calculate Speed
final_speed = km_per_hour * self.adjustment
return final_speed
def bucket_tipped(self):
self.rain_count = self.rain_count + 1
#print (self.rain_count * self.bucket_size)
def reset_rainfall(self):
self.rain_count = 0
def reset_wind(self):
self.wind_count = 0
def reset_gust(self):
self.wind_gust = 0
#===============================================================================
# LOOP record decoding functions
#===============================================================================
def genLoopPackets(self):
""" Generator function that continuously returns loop packets """
for _packet in self.genPackets():
yield _packet
def genPackets(self):
""" Generate measurement packets. """
global temp_probe
while True:
start_time = time.time()
while time.time() - start_time <= self.interval:
wind_start_time = time.time()
self.reset_wind()
while time.time() - wind_start_time <= self.wind_interval:
self.store_directions.append( wind_direction_byo_5.get_value() )
final_speed = self.calculate_speed( self.wind_interval ) # Add this speed to the list
self.store_speeds.append( final_speed )
wind_average = wind_direction_byo_5.get_average( self.store_directions )
self.wind_gust = max( self.store_speeds )
wind_speed = statistics.mean( self.store_speeds )
rainfall = self.rain_count * self.bucket_size # mm. if units are US, do we need to convert to inch?
self.reset_rainfall()
self.store_speeds = []
self.store_directions = []
ground_temp = temp_probe.read_temp()
humidity, pressure, ambient_temp = bme280_sensor_2.read_all()
# Build the weewx loop packet
packet = { 'dateTime': int( time.time() ), 'usUnits': weewx.US }
packet['outTemp'] = float( ambient_temp )
packet['outHumidity'] = float( humidity )
packet['soilTemp1'] = float( ground_temp )
packet['pressure'] = float( pressure )
packet['rain'] = rainfall
packet['windDir'] = float( wind_average )
packet['windSpeed'] = float( wind_speed )
packet['windGust'] = float( self.wind_gust )
# Send to the loop
yield packet
# Delay reading sensors for the interval time?
#time.sleep( self.interval )
import math
from gpiozero import MCP3008
import time
adc = MCP3008(channel=0)
count = 0
values = []
volts = {0.4: 0.0,
1.4: 22.5,
1.2: 45.0,
2.8: 67.5,
2.7: 90.0,
2.9: 112.5,
2.2: 135.0,
2.5: 157.5,
1.8: 180.0,
2.0: 202.5,
0.7: 225.0,
0.8: 247.5,
0.1: 270.0,
0.3: 292.5,
0.2: 315.0,
0.6: 337.5}
def get_average(angles):
sin_sum = 0.0
cos_sum = 0.0
for angle in angles:
r = math.radians(angle)
sin_sum += math.sin(r)
cos_sum += math.cos(r)
flen = float(len(angles))
s = sin_sum / flen
c = cos_sum / flen
arc = math.degrees(math.atan(s / c))
average = 0.0
if s > 0 and c > 0:
average = arc
elif c < 0:
average = arc + 180
elif s < 0 and c > 0:
average = arc + 360
return 0.0 if average == 360 else average
def get_value(length=5):
data = []
print("Measuring wind direction for %d seconds..." % length)
start_time = time.time()
while time.time() - start_time <= length:
wind =round(adc.value*3.3,1)
if not wind in volts: # keep only good measurements
print('unknown value ' + str(wind))
else:
data.append(volts[wind])
return get_average(data)
#while True:
# wind =round(adc.value*3.3,1)
# if not wind in volts:
# print('unknown value' + str(wind) + ' ' + str(volts[wind]))
# else:
# print('found ' + str(wind) + ' ' + str(volts[wind]))
import bme280
import smbus2
from time import sleep
port = 1
address = 0x76
bus = smbus2.SMBus(port)
bme280.load_calibration_params(bus,address)
def read_all():
bme280_data = bme280.sample(bus,address)
return bme280_data.humidity, bme280_data.pressure, bme280_data.temperature
#!/usr/bin/python3
import os, glob, time
# add the lines below to /etc/modules (reboot to take effect)
# w1-gpio
# w1-therm
class DS18B20(object):
def __init__(self):
self.device_file = glob.glob("/sys/bus/w1/devices/28*")[0] + "/w1_slave"
def read_temp_raw(self):
f = open(self.device_file, "r")
lines = f.readlines()
f.close()
return lines
def crc_check(self, lines):
return lines[0].strip()[-3:] == "YES"
def read_temp(self):
temp_c = -255
attempts = 0
lines = self.read_temp_raw()
success = self.crc_check(lines)
while not success and attempts < 3:
time.sleep(.2)
lines = self.read_temp_raw()
success = self.crc_check(lines)
attempts += 1
if success:
temp_line = lines[1]
equal_pos = temp_line.find("t=")
if equal_pos != -1:
temp_string = temp_line[equal_pos+2:]
temp_c = float(temp_string)/1000.0
return temp_c
if __name__ == "__main__":
obj = DS18B20()
print("Temp: %s C" % obj.read_temp())