From 032a77c0b1a2e0e2c247f511c0dc6d2555e691fb Mon Sep 17 00:00:00 2001 From: Matheus Date: Fri, 31 Oct 2025 22:09:32 -0300 Subject: feat:Worker gravando no banco O worker recebe um pacote JSON e escreve no banco. --- compose.yaml | 1 + mosquitto-config/mosquitto.conf | 2 +- mqtt_client/main.py | 56 ++++++++++++++++++++++++++++++++++++----- 3 files changed, 52 insertions(+), 7 deletions(-) diff --git a/compose.yaml b/compose.yaml index 549379a..9af60c6 100644 --- a/compose.yaml +++ b/compose.yaml @@ -30,6 +30,7 @@ services: - mosquitto depends_on: - mosquitto + - postgres postgres: image: postgres:13 container_name: postgres diff --git a/mosquitto-config/mosquitto.conf b/mosquitto-config/mosquitto.conf index 7e941f9..11a428a 100644 --- a/mosquitto-config/mosquitto.conf +++ b/mosquitto-config/mosquitto.conf @@ -1,3 +1,3 @@ listener 1883 0.0.0.0 allow_anonymous true -# password_file /mosquitto/config/pass-file +password_file /mosquitto/config/pass-file diff --git a/mqtt_client/main.py b/mqtt_client/main.py index 109ca49..f460520 100644 --- a/mqtt_client/main.py +++ b/mqtt_client/main.py @@ -2,9 +2,13 @@ import paho.mqtt.client as mqtt from paho.mqtt.enums import MQTTProtocolVersion import json from time import sleep -from database import engine -from models import Base +from database import engine, SessionLocal +from models import Base, Reading import os +from time import sleep + +print("Sleeping for 10 seconds to wait for db setup...") +sleep(10) print("Creating ORM SQL Tables..") Base.metadata.create_all(bind=engine) @@ -16,13 +20,53 @@ def on_connect(client, userdata, flags, reason_code, properties): # Me inscrevo em todos os tópicos sobre clima client.subscribe("weather/#") +# TODO: Avisar o time do ESP para usar este formato: +# Tópico MQTT: /weather/ +# Corpo: +# { +# "sensor": , +# "unit": ", +# "reading_values: [ +# , +# . . . +# ], +# "reading_timestamps": [ +# , +# . . . +# ] +# } # Simplesmente imprime a mensagem como texto. def on_message(client, userdata, msg): payload = json.loads(msg.payload) - print(msg.topic) - print(f"Value: {payload['value']}") - print(f"Unit: {payload['unit']}") - print(f"Timestamp: {payload['timestamp']}") + topic = msg.topic.split('/')[-1] + if topic.isnumeric(): + stationId = int(topic) + else: + stationId = None + + try: + sensor = int(payload["sensor"]) + measure = int(payload["unit"]) + readings = [] + # É para reading_values e reading_timestamps terem o mesmo tamanho, + # Mas não dá para saber. + for i in range(0, min(len(payload["reading_values"]), len(payload["reading_timestamps"]))): + readings.append({ + "value": float(payload["reading_values"][i]), + "timestamp": int(payload["reading_timestamps"][i]) + }) + + except: + print(f"ERRO! Leitura mal formatada {payload}") + return + + session = SessionLocal() + session.begin() + for reading in readings: + print(reading) + session.add(Reading(sensor_device_id=stationId, measure_id=measure, value=reading['value'])) + session.commit() + session.close() try: user_name = os.environ["MQTT_CLIENT_USER"] -- cgit v1.2.3