commit 2b5c99e0cd3a93d1a3cdb006d42020e3e2efee8a Author: Ben Roberts Date: Sun Sep 6 00:10:50 2020 +0100 Initial version diff --git a/energy_usage/__init__.py b/energy_usage/__init__.py new file mode 100644 index 0000000..72bdd01 --- /dev/null +++ b/energy_usage/__init__.py @@ -0,0 +1 @@ +VERSION = "0.1" diff --git a/energy_usage/config_default.yaml b/energy_usage/config_default.yaml new file mode 100644 index 0000000..35d304c --- /dev/null +++ b/energy_usage/config_default.yaml @@ -0,0 +1,6 @@ +--- +ca_certs: /etc/ssl/certs/ca-certificates.crt +debug: false +mqtt: + server: glowmqtt.energyhive.com + port: 8883 diff --git a/energy_usage/influx_client.py b/energy_usage/influx_client.py new file mode 100644 index 0000000..937df60 --- /dev/null +++ b/energy_usage/influx_client.py @@ -0,0 +1,17 @@ +from influxdb import InfluxDBClient + + +class InfluxClient: + def __init__(self, server, port, database): + self.server = server + self.port = port + self.database = database + + self.client = InfluxDBClient( + host=self.server, + port=self.port, + database=self.database) + + def write_points(self, body): + self.client.write_points(body) + diff --git a/energy_usage/main.py b/energy_usage/main.py new file mode 100644 index 0000000..9c85a43 --- /dev/null +++ b/energy_usage/main.py @@ -0,0 +1,107 @@ +import argparse +import logging +import os +import queue +import sys + +import confuse + +from energy_usage.influx_client import InfluxClient +from energy_usage.mqtt_client import MqttClient +from energy_usage.sep import parse_sep, usage_to_datapoints + + +def parse_args(arg_str=None): + if arg_str is None: + arg_str = sys.argv[1:] + + parser = argparse.ArgumentParser('energy-usage') + + parser.add_argument( + '-d', '--debug', dest='debug', action='store_true', + help='Enable debug output') + parser.add_argument( + '-n', '--noop', dest='noop', action='store_true', + help="Don't make any modifications, just show what would be done") + + args = parser.parse_args(arg_str) + + return parser.prog, args + + +def load_config(args): + config = confuse.Configuration('energy-usage', 'energy_usage') + config.set_args(args) + + validate_config(config) + + return config + + +def validate_config(config): + required = [ + (config['mqtt']['username'], str), + (config['mqtt']['password'], str), + (config['mqtt']['topic'], str) + ] + + for item in required: + key, datatype = item + key.get(datatype) + + +def setup_logging(prog, config): + logger = logging.getLogger() + + if config['debug'].get(bool): + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) + + handler = logging.StreamHandler(sys.stdout) + formatter = logging.Formatter('{0}[{1}]: %(levelname)s %(message)s'.format(prog, str(os.getpid()))) + handler.setFormatter(formatter) + logger.addHandler(handler) + + return logger + + +def main(): + prog, args = parse_args() + + config = load_config(args) + + logger = setup_logging(prog, config) + + msg_q = queue.Queue() + + mqtt_client = MqttClient( + server=config['mqtt']['server'].get(str), + port=config['mqtt']['port'].get(int), + username=config['mqtt']['username'].get(str), + password=config['mqtt']['password'].get(str), + topic=config['mqtt']['topic'].get(str), + ca_certs=config['ca_certs'].get(str), + msg_q=msg_q) + mqtt_client.connect() + mqtt_client.run() + + influx_client = InfluxClient( + server=config['influx']['server'].get(str), + port=config['influx']['port'].get(int), + database='default', + ) + + while True: + try: + msg = msg_q.get() + logger.debug(msg.topic + " " + str(msg.payload)) + usage = parse_sep(msg.topic, msg.payload) + usage_datapoints = usage_to_datapoints(usage) + influx_client.write_points(usage_datapoints) + except KeyboardInterrupt: + break + + +if __name__ == '__main__': + main() diff --git a/energy_usage/mqtt_client.py b/energy_usage/mqtt_client.py new file mode 100644 index 0000000..ff002c4 --- /dev/null +++ b/energy_usage/mqtt_client.py @@ -0,0 +1,46 @@ +import logging + +import paho.mqtt.client as mqtt + +class MqttClient: + def __init__(self, server, port, username, password, topic, ca_certs, msg_q): + self.username = username + self.password = password + self.topic = topic + self.ca_certs = ca_certs + self.server = server + self.port = port + self.msg_q = msg_q + + self.logger = logging.getLogger(__name__) + + self.client = mqtt.Client() + self.client.on_connect = self.on_connect + self.client.on_message = self.on_message + + def __del__(self): + self.client.loop_stop() + + def connect(self): + self.client.username_pw_set(self.username, password=self.password) + self.client.tls_set(ca_certs=self.ca_certs) + self.client.connect(self.server, self.port, 60) + + def run(self): + # Blocking call that processes network traffic, dispatches callbacks and + # handles reconnecting. + # Other loop*() functions are available that give a threaded interface and a + # manual interface. + self.client.loop_start() + + # The callback for when the client receives a CONNACK response from the server. + def on_connect(self, client, userdata, flags, rc): + self.logger.info("Connected with result code " + str(rc)) + + # Subscribing in on_connect() means that if we lose the connection and + # reconnect then subscriptions will be renewed. + client.subscribe(self.topic) + + # The callback for when a PUBLISH message is received from the server. + def on_message(self, client, userdata, msg): + self.msg_q.put(msg) diff --git a/energy_usage/sep.py b/energy_usage/sep.py new file mode 100644 index 0000000..f6af82a --- /dev/null +++ b/energy_usage/sep.py @@ -0,0 +1,84 @@ +""" SEP protocol parser +""" +import datetime +import json + + +def parse_sep(topic, payload_str): + """ + + Credit to https://gist.github.com/ndfred/b373eeafc4f5b0870c1b8857041289a9 + + :param topic: str Topic SEP payload was received from + :param payload_str: str SEP payload message as a string + :return: + """ + payload = json.loads(payload_str) + timestamp = datetime.datetime.fromtimestamp(payload["gmtime"], tz=datetime.timezone.utc) + electricity_consumption = int(payload["elecMtr"]["0702"]["04"]["00"], 16) + electricity_daily_consumption = int(payload["elecMtr"]["0702"]["04"]["01"], 16) + electricity_weekly_consumption = int(payload["elecMtr"]["0702"]["04"]["30"], 16) + electricity_monthly_consumption = int(payload["elecMtr"]["0702"]["04"]["40"], 16) + electricity_multiplier = int(payload["elecMtr"]["0702"]["03"]["01"], 16) + electricity_divisor = int(payload["elecMtr"]["0702"]["03"]["02"], 16) + electricity_meter = int(payload["elecMtr"]["0702"]["00"]["00"], 16) + electricity_mpan = payload["elecMtr"]["0702"]["03"]["07"] + gas_daily_consumption = int(payload["gasMtr"]["0702"]["0C"]["01"], 16) + gas_weekly_consumption = int(payload["gasMtr"]["0702"]["0C"]["30"], 16) + gas_monthly_consumption = int(payload["gasMtr"]["0702"]["0C"]["40"], 16) + gas_multiplier = int(payload["gasMtr"]["0702"]["03"]["01"], 16) + gas_divisor = int(payload["gasMtr"]["0702"]["03"]["02"], 16) + gas_meter = int(payload["gasMtr"]["0702"]["00"]["00"], 16) + gas_mpan = payload["gasMtr"]["0702"]["03"]["07"] + device_gid = payload["gid"] + + assert(int(payload["elecMtr"]["0702"]["03"]["00"], 16) == 0) # kWh + assert(int(payload["gasMtr"]["0702"]["03"]["01"], 16) == 1) # m3 + assert(int(payload["gasMtr"]["0702"]["03"]["12"], 16) == 0) # kWh + + data = { + 'timestamp': timestamp, + 'tags': { + 'topic': topic, + 'gid': device_gid, + }, + 'electricity': { + 'tags': { + 'mpan': electricity_mpan, + }, + 'metrics': { + 'draw': electricity_consumption * electricity_multiplier / electricity_divisor, + 'consumption_daily': electricity_daily_consumption * electricity_multiplier / electricity_divisor, + 'consumption_weekly': electricity_weekly_consumption * electricity_multiplier / electricity_divisor, + 'consumption_monthly': electricity_monthly_consumption * electricity_multiplier / electricity_divisor, + 'meter_reading': electricity_meter * electricity_multiplier / electricity_divisor, + }, + }, + 'gas': { + 'tags': { + 'mpan': gas_mpan, + }, + 'metrics': { + 'consumption_daily': gas_daily_consumption * gas_multiplier / gas_divisor, + 'consumption_weekly': gas_weekly_consumption * gas_multiplier / gas_divisor, + 'consumption_monthly': gas_monthly_consumption * gas_multiplier / gas_divisor, + 'meter_reading': gas_meter * gas_multiplier / gas_divisor, + }, + }, + } + + return data + + +def usage_to_datapoints(usage): + datapoints = [] + + for utility in ['electricity', 'gas']: + datapoints.append({ + "measurement": utility, + "tags": {**usage['tags'], **usage[utility]['tags']}, + "time": usage['timestamp'].isoformat(), + "fields": usage[utility]['metrics'], + }) + + return datapoints diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..cef196a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +confuse~=1.3.0 +paho-mqtt~=1.5.0