111 lines
2.8 KiB
Python
111 lines
2.8 KiB
Python
import argparse
|
|
import logging
|
|
import os
|
|
import queue
|
|
import sys
|
|
|
|
import confuse
|
|
|
|
from energy_usage.influx_client import InfluxClient
|
|
from energy_usage.mqtt_client import MqttClient
|
|
from energy_usage.sep import parse_sep, usage_to_datapoints
|
|
|
|
|
|
def parse_args(arg_str=None):
|
|
if arg_str is None:
|
|
arg_str = sys.argv[1:]
|
|
|
|
parser = argparse.ArgumentParser('energy-usage')
|
|
|
|
parser.add_argument(
|
|
'-d', '--debug', dest='debug', action='store_true',
|
|
help='Enable debug output')
|
|
parser.add_argument(
|
|
'-n', '--noop', dest='noop', action='store_true',
|
|
help="Don't make any modifications, just show what would be done")
|
|
|
|
args = parser.parse_args(arg_str)
|
|
|
|
return parser.prog, args
|
|
|
|
|
|
def load_config(args):
|
|
config = confuse.Configuration('energy-usage', 'energy_usage')
|
|
config.set_args(args)
|
|
|
|
validate_config(config)
|
|
|
|
return config
|
|
|
|
|
|
def validate_config(config):
|
|
required = [
|
|
(config['mqtt']['username'], str),
|
|
(config['mqtt']['password'], str),
|
|
(config['mqtt']['topic'], str)
|
|
]
|
|
|
|
for item in required:
|
|
key, datatype = item
|
|
key.get(datatype)
|
|
|
|
|
|
def setup_logging(prog, config):
|
|
logger = logging.getLogger()
|
|
|
|
if config['debug'].get(bool):
|
|
logger.setLevel(logging.DEBUG)
|
|
else:
|
|
logger.setLevel(logging.INFO)
|
|
|
|
handler = logging.StreamHandler(sys.stdout)
|
|
formatter = logging.Formatter('{0}[{1}]: %(levelname)s %(message)s'.format(prog, str(os.getpid())))
|
|
handler.setFormatter(formatter)
|
|
logger.addHandler(handler)
|
|
|
|
return logger
|
|
|
|
|
|
def main():
|
|
prog, args = parse_args()
|
|
|
|
config = load_config(args)
|
|
|
|
logger = setup_logging(prog, config)
|
|
|
|
msg_q = queue.Queue()
|
|
|
|
mqtt_client = MqttClient(
|
|
server=config['mqtt']['server'].get(str),
|
|
port=config['mqtt']['port'].get(int),
|
|
username=config['mqtt']['username'].get(str),
|
|
password=config['mqtt']['password'].get(str),
|
|
topic=config['mqtt']['topic'].get(str),
|
|
ca_certs=config['ca_certs'].get(str),
|
|
msg_q=msg_q)
|
|
mqtt_client.connect()
|
|
mqtt_client.run()
|
|
|
|
influx_client = InfluxClient(
|
|
server=config['influx']['server'].get(str),
|
|
port=config['influx']['port'].get(int),
|
|
database='default',
|
|
)
|
|
|
|
while True:
|
|
try:
|
|
msg = msg_q.get()
|
|
logger.debug(msg.topic + " " + str(msg.payload))
|
|
usage = parse_sep(msg.topic, msg.payload)
|
|
if usage:
|
|
usage_datapoints = usage_to_datapoints(usage)
|
|
logger.debug('Writing metrics to influx: %s', usage_datapoints)
|
|
if not config["noop"].get(bool):
|
|
influx_client.write_points(usage_datapoints, noop=config["noop"].get(bool))
|
|
except KeyboardInterrupt:
|
|
break
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|