""" SEP protocol parser """ import datetime import json def parse_sep(topic, payload_str): """ Credit to https://gist.github.com/ndfred/b373eeafc4f5b0870c1b8857041289a9 :param topic: str Topic SEP payload was received from :param payload_str: str SEP payload message as a string :return: """ def _get_metric(path, type=int): value = payload try: for hop in path: value = value[hop] if type is int: return int(value, 16) else: return type(value) except KeyError: return None payload = json.loads(payload_str) timestamp = datetime.datetime.fromtimestamp(payload["gmtime"], tz=datetime.timezone.utc) electricity_consumption = _get_metric(["elecMtr", "0702", "04", "00"]) electricity_daily_consumption = _get_metric(["elecMtr", "0702", "04", "01"]) electricity_weekly_consumption = _get_metric(["elecMtr", "0702", "04", "30"]) electricity_monthly_consumption = _get_metric(["elecMtr", "0702", "04", "40"]) electricity_multiplier = _get_metric(["elecMtr", "0702", "03", "01"]) electricity_divisor = _get_metric(["elecMtr", "0702", "03", "02"]) electricity_meter = _get_metric(["elecMtr", "0702", "00", "00"]) electricity_mpan = _get_metric(["elecMtr", "0702", "03", "07"], str) gas_daily_consumption = _get_metric(["gasMtr", "0702", "0C", "01"]) gas_weekly_consumption = _get_metric(["gasMtr", "0702", "0C", "30"]) gas_monthly_consumption = _get_metric(["gasMtr", "0702", "0C", "40"]) gas_multiplier = _get_metric(["gasMtr", "0702", "03", "01"]) gas_divisor = _get_metric(["gasMtr", "0702", "03", "02"]) gas_meter = _get_metric(["gasMtr", "0702", "00", "00"]) gas_mpan = _get_metric(["gasMtr", "0702", "03", "07"], str) device_gid = _get_metric(["gid"], str) assert(int(payload["elecMtr"]["0702"]["03"]["00"], 16) == 0) # kWh assert(int(payload["gasMtr"]["0702"]["03"]["01"], 16) == 1) # m3 assert(int(payload["gasMtr"]["0702"]["03"]["12"], 16) == 0) # kWh data = { 'timestamp': timestamp, 'tags': { 'topic': topic, 'gid': device_gid, }, 'electricity': { 'tags': { 'mpan': electricity_mpan, }, 'metrics': { 'draw': electricity_consumption * electricity_multiplier / electricity_divisor, 'consumption_daily': electricity_daily_consumption * electricity_multiplier / electricity_divisor, 'consumption_weekly': electricity_weekly_consumption * electricity_multiplier / electricity_divisor, 'consumption_monthly': electricity_monthly_consumption * electricity_multiplier / electricity_divisor, 'meter_reading': electricity_meter * electricity_multiplier / electricity_divisor, }, }, 'gas': { 'tags': { 'mpan': gas_mpan, }, 'metrics': { 'consumption_daily': gas_daily_consumption * gas_multiplier / gas_divisor, 'consumption_weekly': gas_weekly_consumption * gas_multiplier / gas_divisor, 'consumption_monthly': gas_monthly_consumption * gas_multiplier / gas_divisor, 'meter_reading': gas_meter * gas_multiplier / gas_divisor, }, }, } return data def usage_to_datapoints(usage): datapoints = [] for utility in ['electricity', 'gas']: datapoints.append({ "measurement": utility, "tags": {**usage['tags'], **usage[utility]['tags']}, "time": usage['timestamp'].isoformat(), "fields": usage[utility]['metrics'], }) return datapoints