1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
import paho.mqtt.client as mqtt
from paho.mqtt.enums import MQTTProtocolVersion
import json
from time import sleep
from database import engine, SessionLocal
from models import Base, Reading
import os
from time import sleep
from datetime import datetime
print("Sleeping for 10 seconds to wait for db setup...")
sleep(10)
print("Creating ORM SQL Tables..")
Base.metadata.create_all(bind=engine)
print("Tables created successfully.")
def on_connect(client, userdata, flags, reason_code, properties):
print(f"Conectado: {reason_code}")
# Me inscrevo em todos os tópicos sobre clima
client.subscribe("weather/#")
# TODO: Avisar o time do ESP para usar este formato:
# Tópico MQTT: /weather/<stationid>
# Corpo:
# {
# "sensor": <sensor device id>,
# "unit": <measure id>",
# "reading_values: [
# <valor da medida, pode ser inteiro ou decimal>,
# . . .
# ],
# "reading_timestamps": [
# <timestamp das medidas>,
# . . .
# ]
# }
# Simplesmente imprime a mensagem como texto.
def on_message(client, userdata, msg):
payload = json.loads(msg.payload)
topic = msg.topic.split('/')[-1]
print(f"GOT MESSAGE {payload}")
if topic.isnumeric():
stationId = int(topic)
else:
stationId = None
try:
sensor = int(payload["sensor"])
measure = int(payload["unit"])
readings = []
# É para reading_values e reading_timestamps terem o mesmo tamanho,
# Mas não dá para saber.
for i in range(0, min(len(payload["reading_values"]), len(payload["reading_timestamps"]))):
readings.append({
"value": float(payload["reading_values"][i]),
"timestamp": int(payload["reading_timestamps"][i])
})
except:
print(f"ERRO! Leitura mal formatada {payload}")
return
session = SessionLocal()
session.begin()
for reading in readings:
print(reading)
session.add(Reading(sensor_device_id=stationId, measure_id=measure, value=reading['value'], time=datetime.fromtimestamp(reading['timestamp'],)))
session.commit()
session.close()
try:
user_name = os.environ["MQTT_CLIENT_USER"]
user_pass = os.environ["MQTT_CLIENT_PASSWORD"]
except KeyError:
print("credentials not supplied in environment variables. Going unauthenticated...")
user_name = None
user_pass = None
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=MQTTProtocolVersion.MQTTv5)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
#mqttc.username_pw_set(user_name, user_pass)
connected = False
while(not connected):
try:
mqttc.connect("mosquitto", 1883, 60, '', 0, True)
connected = True
except ConnectionRefusedError:
print("Failed to connect. Retrying...")
sleep(5)
mqttc.loop_forever()
|