Files
Ai/location_data/soil_adapters.py
T
2026-04-29 01:27:29 +03:30

287 lines
8.4 KiB
Python

from __future__ import annotations
import hashlib
import math
import random
import time
from abc import ABC, abstractmethod
try:
import requests
except ImportError: # pragma: no cover - handled when live adapter is used
requests = None
SOILGRIDS_BASE = "https://rest.isric.org/soilgrids/v2.0/properties/query"
PROPERTIES = [
"bdod",
"cec",
"cfvo",
"clay",
"nitrogen",
"ocd",
"ocs",
"phh2o",
"sand",
"silt",
"soc",
"wv0010",
"wv0033",
"wv1500",
]
VALUES = ["Q0.5", "Q0.05", "Q0.95", "mean", "uncertainty"]
DEPTHS = ["0-5cm", "5-15cm", "15-30cm"]
DEPTH_INDEX = {depth: index for index, depth in enumerate(DEPTHS)}
def _clamp(value: float, lower: float, upper: float) -> float:
return max(lower, min(upper, value))
def _round_field(name: str, value: float) -> float:
if name in {"nitrogen", "soc", "ocs", "wv0010", "wv0033", "wv1500"}:
return round(value, 3)
return round(value, 2)
class BaseSoilDataAdapter(ABC):
source_name = "base"
@abstractmethod
def fetch_depth_fields(self, lon: float, lat: float, depth: str) -> dict:
"""Return normalized field values for a single soil depth."""
class SoilGridsAdapter(BaseSoilDataAdapter):
source_name = "soilgrids"
def __init__(self, base_url: str = SOILGRIDS_BASE, timeout: float = 60):
self.base_url = base_url
self.timeout = timeout
def fetch_depth_fields(self, lon: float, lat: float, depth: str) -> dict:
if requests is None:
raise RuntimeError("requests package is required for SoilGridsAdapter")
params = {
"lon": lon,
"lat": lat,
"depth": depth,
}
for prop in PROPERTIES:
params.setdefault("property", []).append(prop)
for value in VALUES:
params.setdefault("value", []).append(value)
response = requests.get(
self.base_url,
params=params,
headers={"accept": "application/json"},
timeout=self.timeout,
)
response.raise_for_status()
return self._parse_response_to_fields(response.json())
def _parse_response_to_fields(self, data: dict) -> dict:
fields = {prop: None for prop in PROPERTIES}
layers = data.get("properties", {}).get("layers", [])
for layer in layers:
name = layer.get("name")
if name not in fields:
continue
depths_list = layer.get("depths", [])
if not depths_list:
continue
values = depths_list[0].get("values", {})
mean_value = values.get("mean")
if mean_value is not None:
fields[name] = float(mean_value)
return fields
class MockSoilDataAdapter(BaseSoilDataAdapter):
source_name = "mock"
def __init__(
self,
delay_seconds: float = 0.8,
seed_namespace: str = "croplogic-soil",
):
self.delay_seconds = max(0.0, delay_seconds)
self.seed_namespace = seed_namespace
def fetch_depth_fields(self, lon: float, lat: float, depth: str) -> dict:
if depth not in DEPTH_INDEX:
raise ValueError(f"Unsupported soil depth: {depth}")
if self.delay_seconds:
time.sleep(self.delay_seconds)
depth_index = DEPTH_INDEX[depth]
texture_score = self._layered_noise(lon, lat, "texture")
organic_score = self._layered_noise(lon, lat, "organic")
moisture_score = self._layered_noise(lon, lat, "moisture")
mineral_score = self._layered_noise(lon, lat, "mineral")
stone_score = self._layered_noise(lon, lat, "stone")
ph_score = self._layered_noise(lon, lat, "ph")
sand, clay, silt = self._build_texture(
texture_score=texture_score,
organic_score=organic_score,
depth_index=depth_index,
)
soc = _clamp(
0.7
+ (organic_score * 1.9)
+ (clay * 0.012)
- (depth_index * 0.28)
+ ((1 - moisture_score) * 0.08),
0.45,
4.2,
)
nitrogen = _clamp(
0.04
+ (soc * 0.085)
+ ((1 - (sand / 100.0)) * 0.025)
+ ((2 - depth_index) * 0.008),
0.03,
0.42,
)
ocd = _clamp(
10.0 + (soc * 8.5) + (organic_score * 4.0) - (depth_index * 2.6),
7.0,
46.0,
)
ocs = _clamp(
1.0 + (soc * 1.55) - (depth_index * 0.28) + (organic_score * 0.12),
0.5,
8.5,
)
cec = _clamp(
7.0
+ (clay * 0.33)
+ (soc * 1.7)
+ ((1 - (sand / 100.0)) * 2.6)
+ (mineral_score * 1.4),
5.0,
38.0,
)
cfvo = _clamp(1.0 + (stone_score * 12.0) + (depth_index * 2.4), 0.0, 35.0)
bdod = _clamp(
1.06
+ (sand * 0.0038)
+ (depth_index * 0.06)
- (soc * 0.035)
+ (stone_score * 0.03),
0.95,
1.62,
)
phh2o = _clamp(
6.2
+ ((ph_score - 0.5) * 1.1)
+ (depth_index * 0.08)
- (organic_score * 0.12),
5.6,
8.1,
)
wv1500 = _clamp(
0.05
+ (clay * 0.0016)
+ (soc * 0.012)
- (sand * 0.0003)
+ (depth_index * 0.004),
0.05,
0.22,
)
wv0033 = _clamp(
wv1500 + 0.07 + (clay * 0.0015) + (soc * 0.01) - (sand * 0.0002),
wv1500 + 0.04,
0.38,
)
wv0010 = _clamp(
wv0033 + 0.03 + (soc * 0.006) + (moisture_score * 0.01),
wv0033 + 0.015,
0.48,
)
fields = {
"bdod": bdod,
"cec": cec,
"cfvo": cfvo,
"clay": clay,
"nitrogen": nitrogen,
"ocd": ocd,
"ocs": ocs,
"phh2o": phh2o,
"sand": sand,
"silt": silt,
"soc": soc,
"wv0010": wv0010,
"wv0033": wv0033,
"wv1500": wv1500,
}
return {name: _round_field(name, value) for name, value in fields.items()}
def _build_texture(
self,
texture_score: float,
organic_score: float,
depth_index: int,
) -> tuple[float, float, float]:
sand = _clamp(
30.0
+ (texture_score * 28.0)
+ ((organic_score - 0.5) * 3.5)
- (depth_index * 2.5),
18.0,
72.0,
)
clay = _clamp(
13.0
+ ((1 - texture_score) * 18.0)
+ (depth_index * 5.5)
+ ((organic_score - 0.5) * 2.0),
8.0,
42.0,
)
minimum_silt = 12.0
total = sand + clay
if total > 100.0 - minimum_silt:
excess = total - (100.0 - minimum_silt)
sand -= excess * 0.65
clay -= excess * 0.35
silt = 100.0 - sand - clay
return sand, clay, silt
def _layered_noise(self, lon: float, lat: float, key: str) -> float:
regional = self._smooth_noise(lon, lat, f"{key}:regional", scale=1.7)
local = self._smooth_noise(lon, lat, f"{key}:local", scale=0.32)
micro = self._smooth_noise(lon, lat, f"{key}:micro", scale=0.08)
return _clamp((regional * 0.55) + (local * 0.3) + (micro * 0.15), 0.0, 1.0)
def _smooth_noise(self, lon: float, lat: float, key: str, scale: float) -> float:
grid_x = lon / scale
grid_y = lat / scale
x0 = math.floor(grid_x)
y0 = math.floor(grid_y)
tx = grid_x - x0
ty = grid_y - y0
v00 = self._cell_noise(key, x0, y0)
v10 = self._cell_noise(key, x0 + 1, y0)
v01 = self._cell_noise(key, x0, y0 + 1)
v11 = self._cell_noise(key, x0 + 1, y0 + 1)
tx = tx * tx * (3.0 - (2.0 * tx))
ty = ty * ty * (3.0 - (2.0 * ty))
top = (v00 * (1 - tx)) + (v10 * tx)
bottom = (v01 * (1 - tx)) + (v11 * tx)
return (top * (1 - ty)) + (bottom * ty)
def _cell_noise(self, key: str, grid_x: int, grid_y: int) -> float:
seed_input = f"{self.seed_namespace}:{key}:{grid_x}:{grid_y}"
digest = hashlib.sha256(seed_input.encode("ascii")).digest()
seed = int.from_bytes(digest[:8], "big", signed=False)
return random.Random(seed).random()