Try this version.
On Wed, Mar 18, 2020 at 4:14 AM Thomas Keffer <[email protected]> wrote:
> He's right. The extension has not been completely ported over to Python 3.
>
>
> On Tue, Mar 17, 2020 at 10:45 PM Gert Andersen <[email protected]>
> wrote:
>
>> Hi Vince
>>
>> Thanks for looking.
>>
>> I have downloaded mqtt.py from the link you mentioned and installed it.
>> Still syntax error at line 215
>>
>> If you look at line 215, you have:
>> *except KeyError, e:*
>> and I think it should be
>> *except KeyError as e:*
>> under Python 3
>>
>> Even if I change that line, I just get another error.
>>
>> So, I might do something wrong, but I can't see what. If you got this
>> working, you must have done something differently
>>
>> Gert
>>
>> On Tuesday, March 17, 2020 at 10:50:41 PM UTC+1, Vince Skahan wrote:
>>>
>>>
>>> Mar 17 22:05:25 weewx4Test weewx[7169] CRITICAL __main__: **** name
>>>> 'queue' is not defined
>>>>
>>>>
>>>
>>> You're not working off the latest version on github it seems
>>>
>>> The top of the old file has a line saying simply "import queue".
>>> The top of the new version has python3 and python2 supported
>>>
>>> Download the current file
>>> https://raw.githubusercontent.com/matthewwall/weewx-mqtt/master/bin/user/mqtt.py
>>> and install it in place of your messed up one. It should work.
>>>
>>>
>>>
>>> --
>> You received this message because you are subscribed to the Google Groups
>> "weewx-development" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to [email protected].
>> To view this discussion on the web visit
>> https://groups.google.com/d/msgid/weewx-development/a6fa5d8a-dd64-47ed-bf7a-1c7286f52499%40googlegroups.com
>> <https://groups.google.com/d/msgid/weewx-development/a6fa5d8a-dd64-47ed-bf7a-1c7286f52499%40googlegroups.com?utm_medium=email&utm_source=footer>
>> .
>>
>
--
You received this message because you are subscribed to the Google Groups
"weewx-development" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To view this discussion on the web visit
https://groups.google.com/d/msgid/weewx-development/CAPq0zEB_eJF6p89p3TQEF_2GfQPWo-noXRRf_u2BsFM5T6BmNA%40mail.gmail.com.
# Copyright 2013 Matthew Wall
# Distributed under the terms of the GNU Public License (GPLv3)
"""
Upload data to MQTT server
This service requires the python bindings for mqtt:
pip install paho-mqtt
Minimal configuration:
[StdRestful]
[[MQTT]]
server_url = mqtt://username:password@localhost:1883/
topic = weather
unit_system = METRIC
Use of the inputs map to customer name, format, or units:
[StdRestful]
[[MQTT]]
...
unit_system = METRIC # default to metric
[[[inputs]]]
[[[[outTemp]]]]
name = inside_temperature # use a label other than outTemp
format = %.2f # two decimal places of precision
units = degree_F # convert outTemp to F, others in C
[[[[windSpeed]]]]
units = knot # convert the wind speed to knots
Use of TLS to encrypt connection to broker. The TLS options will be passed to
Paho client tls_set method. Refer to Paho client documentation for details:
https://eclipse.org/paho/clients/python/docs/
[StdRestful]
[[MQTT]]
...
[[[tls]]]
# CA certificates file (mandatory)
ca_certs = /etc/ssl/certs/ca-certificates.crt
# PEM encoded client certificate file (optional)
certfile = /home/user/.ssh/id.crt
# private key file (optional)
keyfile = /home/user/.ssh/id.key
# Certificate requirements imposed on the broker (optional).
# Options are 'none', 'optional' or 'required'.
# Default is 'required'.
cert_reqs = required
# SSL/TLS protocol (optional).
# Options include sslv1, sslv2, sslv23, tls, tlsv1.
# Default is 'tlsv1'
# Not all options are supported by all systems.
tls_version = tlsv1
# Allowable encryption ciphers (optional).
# To specify multiple cyphers, delimit with commas and enclose
# in quotes.
#ciphers =
"""
try:
import queue as Queue
except ImportError:
import Queue
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
import paho.mqtt.client as mqtt
import random
import sys
import time
try:
import cjson as json
setattr(json, 'dumps', json.encode)
setattr(json, 'loads', json.decode)
except (ImportError, AttributeError):
try:
import simplejson as json
except ImportError:
import json
import weewx
import weewx.restx
import weewx.units
from weeutil.weeutil import to_int, to_bool, accumulateLeaves
VERSION = "0.20"
if weewx.__version__ < "3":
raise weewx.UnsupportedFeature("weewx 3 is required, found %s" %
weewx.__version__)
try:
# weewx4 logging
import weeutil.logger
import logging
log = logging.getLogger(__name__)
def logdbg(msg):
log.debug(msg)
def loginf(msg):
log.info(msg)
def logerr(msg):
log.error(msg)
except ImportError:
# old-style weewx logging
import syslog
def logmsg(level, msg):
syslog.syslog(level, 'restx: MQTT: %s' % msg)
def logdbg(msg):
logmsg(syslog.LOG_DEBUG, msg)
def loginf(msg):
logmsg(syslog.LOG_INFO, msg)
def logerr(msg):
logmsg(syslog.LOG_ERR, msg)
def _compat(d, old_label, new_label):
if old_label in d and new_label not in d:
d.setdefault(new_label, d[old_label])
d.pop(old_label)
def _obfuscate_password(url):
parts = urlparse(url)
if parts.password is not None:
# split out the host portion manually. We could use
# parts.hostname and parts.port, but then you'd have to check
# if either part is None. The hostname would also be lowercased.
host_info = parts.netloc.rpartition('@')[-1]
parts = parts._replace(netloc='{}:xxx@{}'.format(
parts.username, host_info))
url = parts.geturl()
return url
# some unit labels are rather lengthy. this reduces them to something shorter.
UNIT_REDUCTIONS = {
'degree_F': 'F',
'degree_C': 'C',
'inch': 'in',
'mile_per_hour': 'mph',
'mile_per_hour2': 'mph',
'km_per_hour': 'kph',
'km_per_hour2': 'kph',
'meter_per_second': 'mps',
'meter_per_second2': 'mps',
'degree_compass': None,
'watt_per_meter_squared': 'Wpm2',
'uv_index': None,
'percent': None,
'unix_epoch': None,
}
# return the units label for an observation
def _get_units_label(obs, unit_system):
(unit_type, _) = weewx.units.getStandardUnitType(unit_system, obs)
return UNIT_REDUCTIONS.get(unit_type, unit_type)
# get the template for an observation based on the observation key
def _get_template(obs_key, overrides, append_units_label, unit_system):
tmpl_dict = dict()
if append_units_label:
label = _get_units_label(obs_key, unit_system)
if label is not None:
tmpl_dict['name'] = "%s_%s" % (obs_key, label)
for x in ['name', 'format', 'units']:
if x in overrides:
tmpl_dict[x] = overrides[x]
return tmpl_dict
class MQTT(weewx.restx.StdRESTbase):
def __init__(self, engine, config_dict):
"""This service recognizes standard restful options plus the following:
Required parameters:
server_url: URL of the broker, e.g., something of the form
mqtt://username:password@localhost:1883/
Default is None
Optional parameters:
unit_system: one of US, METRIC, or METRICWX
Default is None; units will be those of data in the database
topic: the MQTT topic under which to post
Default is 'weather'
append_units_label: should units label be appended to name
Default is True
obs_to_upload: Which observations to upload. Possible values are
none or all. When none is specified, only items in the inputs list
will be uploaded. When all is specified, all observations will be
uploaded, subject to overrides in the inputs list.
Default is all
inputs: dictionary of weewx observation names with optional upload
name, format, and units
Default is None
tls: dictionary of TLS parameters used by the Paho client to establish
a secure connection with the broker.
Default is None
"""
super(MQTT, self).__init__(engine, config_dict)
loginf("service version is %s" % VERSION)
try:
site_dict = config_dict['StdRESTful']['MQTT']
site_dict = accumulateLeaves(site_dict, max_level=1)
site_dict['server_url']
except KeyError as e:
logerr("Data will not be uploaded: Missing option %s" % e)
return
# for backward compatibility: 'units' is now 'unit_system'
_compat(site_dict, 'units', 'unit_system')
site_dict.setdefault('client_id', '')
site_dict.setdefault('topic', 'weather')
site_dict.setdefault('append_units_label', True)
site_dict.setdefault('augment_record', True)
site_dict.setdefault('obs_to_upload', 'all')
site_dict.setdefault('retain', False)
site_dict.setdefault('qos', 0)
site_dict.setdefault('aggregation', 'individual,aggregate')
usn = site_dict.get('unit_system', None)
if usn is not None:
site_dict['unit_system'] = weewx.units.unit_constants[usn]
if 'tls' in config_dict['StdRESTful']['MQTT']:
site_dict['tls'] = dict(config_dict['StdRESTful']['MQTT']['tls'])
if 'inputs' in config_dict['StdRESTful']['MQTT']:
site_dict['inputs'] = dict(config_dict['StdRESTful']['MQTT']['inputs'])
site_dict['append_units_label'] = to_bool(site_dict.get('append_units_label'))
site_dict['augment_record'] = to_bool(site_dict.get('augment_record'))
site_dict['retain'] = to_bool(site_dict.get('retain'))
site_dict['qos'] = to_int(site_dict.get('qos'))
binding = site_dict.pop('binding', 'archive')
loginf("binding to %s" % binding)
# if we are supposed to augment the record with data from weather
# tables, then get the manager dict to do it. there may be no weather
# tables, so be prepared to fail.
try:
if site_dict.get('augment_record'):
_manager_dict = weewx.manager.get_manager_dict_from_config(
config_dict, 'wx_binding')
site_dict['manager_dict'] = _manager_dict
except weewx.UnknownBinding:
pass
self.archive_queue = Queue.Queue()
self.archive_thread = MQTTThread(self.archive_queue, **site_dict)
self.archive_thread.start()
if 'archive' in binding:
self.bind(weewx.NEW_ARCHIVE_RECORD, self.new_archive_record)
if 'loop' in binding:
self.bind(weewx.NEW_LOOP_PACKET, self.new_loop_packet)
if 'topic' in site_dict:
loginf("topic is %s" % site_dict['topic'])
if usn is not None:
loginf("desired unit system is %s" % usn)
loginf("data will be uploaded to %s" %
_obfuscate_password(site_dict['server_url']))
if 'tls' in site_dict:
loginf("network encryption/authentication will be attempted")
def new_archive_record(self, event):
self.archive_queue.put(event.record)
def new_loop_packet(self, event):
self.archive_queue.put(event.packet)
class TLSDefaults(object):
def __init__(self):
import ssl
# Paho acceptable TLS options
self.TLS_OPTIONS = [
'ca_certs', 'certfile', 'keyfile',
'cert_reqs', 'tls_version', 'ciphers'
]
# map for Paho acceptable TLS cert request options
self.CERT_REQ_OPTIONS = {
'none': ssl.CERT_NONE,
'optional': ssl.CERT_OPTIONAL,
'required': ssl.CERT_REQUIRED
}
# Map for Paho acceptable TLS version options. Some options are
# dependent on the OpenSSL install so catch exceptions
self.TLS_VER_OPTIONS = dict()
try:
self.TLS_VER_OPTIONS['sslv2'] = ssl.PROTOCOL_SSLv2
except AttributeError:
pass
try:
self.TLS_VER_OPTIONS['sslv3'] = ssl.PROTOCOL_SSLv3
except AttributeError:
pass
self.TLS_VER_OPTIONS['sslv23'] = ssl.PROTOCOL_SSLv23
self.TLS_VER_OPTIONS['tlsv1'] = ssl.PROTOCOL_TLSv1
try:
self.TLS_VER_OPTIONS['tls'] = ssl.PROTOCOL_TLS
except AttributeError:
pass
class MQTTThread(weewx.restx.RESTThread):
def __init__(self, queue, server_url,
client_id='', topic='', unit_system=None, skip_upload=False,
augment_record=True, retain=False, aggregation='individual',
inputs={}, obs_to_upload='all', append_units_label=True,
manager_dict=None, tls=None,
post_interval=None, stale=None,
log_success=True, log_failure=True,
timeout=60, max_tries=3, retry_wait=5,
max_backlog=sys.maxsize):
super(MQTTThread, self).__init__(queue,
protocol_name='MQTT',
manager_dict=manager_dict,
post_interval=post_interval,
max_backlog=max_backlog,
stale=stale,
log_success=log_success,
log_failure=log_failure,
max_tries=max_tries,
timeout=timeout,
retry_wait=retry_wait)
self.server_url = server_url
self.client_id = client_id
self.topic = topic
self.upload_all = True if obs_to_upload.lower() == 'all' else False
self.append_units_label = append_units_label
self.tls_dict = {}
if tls is not None:
# we have TLS options so construct a dict to configure Paho TLS
dflts = TLSDefaults()
for opt in tls:
if opt == 'cert_reqs':
if tls[opt] in dflts.CERT_REQ_OPTIONS:
self.tls_dict[opt] = dflts.CERT_REQ_OPTIONS.get(tls[opt])
elif opt == 'tls_version':
if tls[opt] in dflts.TLS_VER_OPTIONS:
self.tls_dict[opt] = dflts.TLS_VER_OPTIONS.get(tls[opt])
elif opt in dflts.TLS_OPTIONS:
self.tls_dict[opt] = tls[opt]
logdbg("TLS parameters: %s" % self.tls_dict)
self.inputs = inputs
self.unit_system = unit_system
self.augment_record = augment_record
self.retain = retain
self.aggregation = aggregation
self.templates = dict()
self.skip_upload = skip_upload
def filter_data(self, record):
# if uploading everything, we must check the upload variables list
# every time since variables may come and go in a record. use the
# inputs to override any generic template generation.
if self.upload_all:
for f in record:
if f not in self.templates:
self.templates[f] = _get_template(f,
self.inputs.get(f, {}),
self.append_units_label,
record['usUnits'])
# otherwise, create the list of upload variables once, based on the
# user-specified list of inputs.
elif not self.templates:
for f in self.inputs:
self.templates[f] = _get_template(f, self.inputs[f],
self.append_units_label,
record['usUnits'])
# loop through the templates, populating them with data from the record
data = dict()
for k in self.templates:
try:
v = float(record.get(k))
name = self.templates[k].get('name', k)
fmt = self.templates[k].get('format', '%s')
to_units = self.templates[k].get('units')
if to_units is not None:
(from_unit, from_group) = weewx.units.getStandardUnitType(
record['usUnits'], k)
from_t = (v, from_unit, from_group)
v = weewx.units.convert(from_t, to_units)[0]
s = fmt % v
data[name] = s
except (TypeError, ValueError):
pass
# FIXME: generalize this
if 'latitude' in data and 'longitude' in data:
parts = [str(data['latitude']), str(data['longitude'])]
if 'altitude_meter' in data:
parts.append(str(data['altitude_meter']))
elif 'altitude_foot' in data:
parts.append(str(data['altitude_foot']))
data['position'] = ','.join(parts)
return data
def process_record(self, record, dbm):
import socket
if self.augment_record and dbm is not None:
record = self.get_record(record, dbm)
if self.unit_system is not None:
record = weewx.units.to_std_system(record, self.unit_system)
data = self.filter_data(record)
if weewx.debug >= 2:
logdbg("data: %s" % data)
if self.skip_upload:
loginf("skipping upload")
return
url = urlparse(self.server_url)
for _count in range(self.max_tries):
try:
client_id = self.client_id
if not client_id:
pad = "%032x" % random.getrandbits(128)
client_id = 'weewx_%s' % pad[:8]
mc = mqtt.Client(client_id=client_id)
if url.username is not None and url.password is not None:
mc.username_pw_set(url.username, url.password)
# if we have TLS opts configure TLS on our broker connection
if len(self.tls_dict) > 0:
mc.tls_set(**self.tls_dict)
mc.connect(url.hostname, url.port)
mc.loop_start()
if self.aggregation.find('aggregate') >= 0:
tpc = self.topic + '/loop'
(res, mid) = mc.publish(tpc, json.dumps(data),
retain=self.retain, qos=self.qos)
if res != mqtt.MQTT_ERR_SUCCESS:
logerr("publish failed for %s: %s" % (tpc, res))
if self.aggregation.find('individual') >= 0:
for key in data:
tpc = self.topic + '/' + key
(res, mid) = mc.publish(tpc, data[key],
retain=self.retain)
if res != mqtt.MQTT_ERR_SUCCESS:
logerr("publish failed for %s: %s" % (tpc, res))
mc.loop_stop()
mc.disconnect()
return
except (socket.error, socket.timeout, socket.herror) as e:
logdbg("Failed upload attempt %d: %s" % (_count+1, e))
time.sleep(self.retry_wait)
else:
raise weewx.restx.FailedPost("Failed upload after %d tries" %
(self.max_tries,))