Files
SensorHub/ingest/management/commands/send_sensor_data.py
T
2026-04-05 00:57:50 +03:30

73 lines
2.3 KiB
Python

import json
import time
from http.client import HTTPException
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

from django.core.management.base import BaseCommand

from ingest.constants import API_KEY, API_TARGET_URL, REQUEST_INTERVAL_SECONDS, STATIC_SENSOR_PAYLOAD


class Command(BaseCommand):
    """Periodically POST the static sensor payload to the upstream API.

    The command serialises ``STATIC_SENSOR_PAYLOAD`` as JSON, sends it to
    ``API_TARGET_URL`` and then sleeps ``REQUEST_INTERVAL_SECONDS`` before
    sending again. Pass ``--once`` to send a single request and exit.
    A failed request is reported on stderr and does not stop the loop.
    """

    # Built from the constant so the help text cannot drift from the
    # interval the loop actually uses.
    help = (
        "Send the static soil sensor payload to the upstream API every "
        f"{REQUEST_INTERVAL_SECONDS} seconds."
    )

    # Seconds to wait on the upstream socket before giving up on a request.
    request_timeout = 15

    def add_arguments(self, parser):
        """Register the ``--once`` flag on the command's argument parser."""
        parser.add_argument(
            "--once",
            action="store_true",
            help="Send the request once and exit.",
        )

    def handle(self, *args, **options):
        """Send the payload once, or repeatedly until interrupted.

        Args:
            *args: Positional arguments from Django; unused.
            **options: Parsed command-line options. ``options["once"]``
                selects single-shot mode.
        """
        run_once = options["once"]
        self.stdout.write(
            self.style.SUCCESS(
                f"Starting sensor sender -> {API_TARGET_URL} (interval: {REQUEST_INTERVAL_SECONDS}s)"
            )
        )
        try:
            while True:
                self.send_payload()
                if run_once:
                    break
                time.sleep(REQUEST_INTERVAL_SECONDS)
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the endless loop, so end
            # quietly instead of dumping a traceback.
            self.stdout.write("Sensor sender stopped.")

    def send_payload(self):
        """POST the static payload once and report the outcome.

        A success line (followed by the response body, if any) is written to
        stdout; failures are written to stderr. Network and protocol errors
        are reported rather than raised so the loop in ``handle`` keeps
        running.
        """
        body = json.dumps(STATIC_SENSOR_PAYLOAD).encode("utf-8")
        request = Request(
            API_TARGET_URL,
            data=body,
            headers={
                "Content-Type": "application/json",
                "api_key": API_KEY,
            },
            method="POST",
        )

        # Only the network round-trip lives in the ``try`` so that a failure
        # while writing to stdout is never mistaken for an upstream failure.
        try:
            with urlopen(request, timeout=self.request_timeout) as response:
                status = response.status
                response_body = response.read().decode("utf-8", errors="replace")
        except HTTPError as exc:
            # Reading the error body is itself a network read and may fail;
            # the status line is still worth reporting in that case.
            try:
                error_body = exc.read().decode("utf-8", errors="replace")
            except (OSError, HTTPException):
                error_body = ""
            self._report_failure(f"Upstream error - status {exc.code}")
            if error_body:
                self.stderr.write(error_body)
        except URLError as exc:
            self._report_failure(f"Connection error - {exc.reason}")
        except (OSError, HTTPException) as exc:
            # urlopen wraps only failures raised while sending the request in
            # URLError. Timeouts, resets or truncated replies that occur
            # while waiting for or reading the response surface as plain
            # OSError / HTTPException and must not kill the sender loop.
            self._report_failure(f"Connection error - {exc}")
        else:
            self.stdout.write(
                self.style.SUCCESS(
                    f"[{self._timestamp()}] Sent payload successfully - status {status}"
                )
            )
            if response_body:
                self.stdout.write(response_body)

    @staticmethod
    def _timestamp():
        """Return the current local time formatted for log lines."""
        return time.strftime("%Y-%m-%d %H:%M:%S")

    def _report_failure(self, message):
        """Write a timestamped, error-styled line to stderr."""
        self.stderr.write(self.style.ERROR(f"[{self._timestamp()}] {message}"))