73 lines
2.3 KiB
Python
73 lines
2.3 KiB
Python
import json
import time
from http.client import HTTPException
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

from django.core.management.base import BaseCommand

from ingest.constants import API_KEY, API_TARGET_URL, REQUEST_INTERVAL_SECONDS, STATIC_SENSOR_PAYLOAD
|
|
|
|
|
|
class Command(BaseCommand):
    """Management command that POSTs ``STATIC_SENSOR_PAYLOAD`` to ``API_TARGET_URL``.

    By default the payload is re-sent forever, sleeping
    ``REQUEST_INTERVAL_SECONDS`` between attempts; ``--once`` sends a single
    request and exits. Network and HTTP failures are reported on stderr and
    do not stop the loop.
    """

    # NOTE: the text mentions 10 seconds, but the actual delay is whatever
    # REQUEST_INTERVAL_SECONDS is set to.
    help = "Send the static soil sensor payload to the upstream API every 10 seconds."

    def add_arguments(self, parser):
        """Register the ``--once`` flag on the command's argument parser."""
        parser.add_argument(
            "--once",
            action="store_true",
            help="Send the request once and exit.",
        )

    def handle(self, *args, **options):
        """Announce the target, then send the payload once or in an endless loop."""
        run_once = options["once"]

        self.stdout.write(
            self.style.SUCCESS(
                f"Starting sensor sender -> {API_TARGET_URL} (interval: {REQUEST_INTERVAL_SECONDS}s)"
            )
        )

        while True:
            self.send_payload()
            if run_once:
                break
            time.sleep(REQUEST_INTERVAL_SECONDS)

    def send_payload(self):
        """POST the JSON-encoded static payload and log the outcome.

        A successful response is logged to stdout together with its body (if
        any). HTTP error statuses, connection failures and lower-level
        transport failures (read timeouts, reset connections, truncated
        responses) are logged to stderr and swallowed so that the caller's
        send loop keeps running.
        """
        body = json.dumps(STATIC_SENSOR_PAYLOAD).encode("utf-8")
        request = Request(
            API_TARGET_URL,
            data=body,
            headers={
                "Content-Type": "application/json",
                "api_key": API_KEY,
            },
            method="POST",
        )

        try:
            with urlopen(request, timeout=15) as response:
                status = response.status
                response_body = response.read().decode("utf-8", errors="replace")
        except HTTPError as exc:
            # Reading the error body is itself network I/O and may fail; the
            # status code is still worth reporting in that case.
            try:
                error_body = exc.read().decode("utf-8", errors="replace")
            except (OSError, HTTPException):
                error_body = ""
            self._report_failure(f"Upstream error - status {exc.code}", error_body)
        except URLError as exc:
            self._report_failure(f"Connection error - {exc.reason}")
        except (OSError, HTTPException) as exc:
            # urlopen() wraps connect-time failures in URLError, but errors
            # raised while reading the response (socket timeout, connection
            # reset, incomplete read) surface unwrapped. Without this clause
            # they would terminate the long-running loop.
            self._report_failure(f"Transport error - {type(exc).__name__}: {exc}")
        else:
            # Logging happens outside the try body so that a failure writing
            # to stdout is not mistaken for a transport error.
            self.stdout.write(
                self.style.SUCCESS(
                    f"[{self._timestamp()}] Sent payload successfully - status {status}"
                )
            )
            if response_body:
                self.stdout.write(response_body)

    @staticmethod
    def _timestamp():
        """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
        return time.strftime("%Y-%m-%d %H:%M:%S")

    def _report_failure(self, message, detail=""):
        """Write a timestamped error line to stderr, followed by ``detail`` if non-empty."""
        self.stderr.write(self.style.ERROR(f"[{self._timestamp()}] {message}"))
        if detail:
            self.stderr.write(detail)
|