summaryrefslogtreecommitdiff
path: root/mqtt_client/main.py
blob: f460520ca984b52124e36dd482c7f9bb738d8008 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import paho.mqtt.client as mqtt
from paho.mqtt.enums import MQTTProtocolVersion
import json
from time import sleep
from database import engine, SessionLocal
from models import Base, Reading
import os
from time import sleep

print("Sleeping for 10 seconds to wait for db setup...")
sleep(10)

print("Creating ORM SQL Tables..")
Base.metadata.create_all(bind=engine)
print("Tables created successfully.")


def on_connect(client, userdata, flags, reason_code, properties):
    print(f"Conectado: {reason_code}")
    # Me inscrevo em todos os tópicos sobre clima
    client.subscribe("weather/#")

# TODO: Avisar o time do ESP para usar este formato:
# Tópico MQTT: /weather/<stationid>
# Corpo:
# {
#   "sensor": <sensor device id>,
#   "unit": <measure id>",
#   "reading_values: [
#        <valor da medida, pode ser inteiro ou decimal>,
#       . . .
#    ],
#   "reading_timestamps": [
#        <timestamp das medidas>,
#       . . .
#    ]
# }
# Simplesmente imprime a mensagem como texto.
def on_message(client, userdata, msg):
    payload = json.loads(msg.payload)
    topic = msg.topic.split('/')[-1]
    if topic.isnumeric():
        stationId = int(topic)
    else:
        stationId = None

    try:
        sensor = int(payload["sensor"])
        measure = int(payload["unit"])
        readings = []
        # É para reading_values e reading_timestamps terem o mesmo tamanho,
        # Mas não dá para saber.
        for i in range(0, min(len(payload["reading_values"]), len(payload["reading_timestamps"]))):
            readings.append({
                "value": float(payload["reading_values"][i]),
                "timestamp": int(payload["reading_timestamps"][i])
            })

    except:
        print(f"ERRO! Leitura mal formatada {payload}")
        return

    session = SessionLocal()
    session.begin()
    for reading in readings:
        print(reading)
        session.add(Reading(sensor_device_id=stationId, measure_id=measure, value=reading['value']))
    session.commit()
    session.close()

try:
    user_name = os.environ["MQTT_CLIENT_USER"]
    user_pass = os.environ["MQTT_CLIENT_PASSWORD"]
except KeyError:
    print("credentials not supplied in environment variables. Going unauthenticated...")
    user_name = None
    user_pass = None

mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=MQTTProtocolVersion.MQTTv5)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.username_pw_set(user_name, user_pass)

connected = False

while(not connected):
    try:
        mqttc.connect("mosquitto", 1883, 60, '', 0, True)
        connected = True
    except ConnectionRefusedError:
        print("Failed to connect. Retrying...")
        sleep(5)

mqttc.loop_forever()