from __future__ import annotations import hashlib import math import random import time from abc import ABC, abstractmethod try: import requests except ImportError: # pragma: no cover - handled when live adapter is used requests = None SOILGRIDS_BASE = "https://rest.isric.org/soilgrids/v2.0/properties/query" PROPERTIES = [ "bdod", "cec", "cfvo", "clay", "nitrogen", "ocd", "ocs", "phh2o", "sand", "silt", "soc", "wv0010", "wv0033", "wv1500", ] VALUES = ["Q0.5", "Q0.05", "Q0.95", "mean", "uncertainty"] DEPTHS = ["0-5cm", "5-15cm", "15-30cm"] DEPTH_INDEX = {depth: index for index, depth in enumerate(DEPTHS)} def _clamp(value: float, lower: float, upper: float) -> float: return max(lower, min(upper, value)) def _round_field(name: str, value: float) -> float: if name in {"nitrogen", "soc", "ocs", "wv0010", "wv0033", "wv1500"}: return round(value, 3) return round(value, 2) class BaseSoilDataAdapter(ABC): source_name = "base" @abstractmethod def fetch_depth_fields(self, lon: float, lat: float, depth: str) -> dict: """Return normalized field values for a single soil depth.""" class SoilGridsAdapter(BaseSoilDataAdapter): source_name = "soilgrids" def __init__(self, base_url: str = SOILGRIDS_BASE, timeout: float = 60): self.base_url = base_url self.timeout = timeout def fetch_depth_fields(self, lon: float, lat: float, depth: str) -> dict: if requests is None: raise RuntimeError("requests package is required for SoilGridsAdapter") params = { "lon": lon, "lat": lat, "depth": depth, } for prop in PROPERTIES: params.setdefault("property", []).append(prop) for value in VALUES: params.setdefault("value", []).append(value) response = requests.get( self.base_url, params=params, headers={"accept": "application/json"}, timeout=self.timeout, ) response.raise_for_status() return self._parse_response_to_fields(response.json()) def _parse_response_to_fields(self, data: dict) -> dict: fields = {prop: None for prop in PROPERTIES} layers = data.get("properties", {}).get("layers", []) for layer in layers: name = layer.get("name") if name not in fields: continue depths_list = layer.get("depths", []) if not depths_list: continue values = depths_list[0].get("values", {}) mean_value = values.get("mean") if mean_value is not None: fields[name] = float(mean_value) return fields class MockSoilDataAdapter(BaseSoilDataAdapter): source_name = "mock" def __init__( self, delay_seconds: float = 0.8, seed_namespace: str = "croplogic-soil", ): self.delay_seconds = max(0.0, delay_seconds) self.seed_namespace = seed_namespace def fetch_depth_fields(self, lon: float, lat: float, depth: str) -> dict: if depth not in DEPTH_INDEX: raise ValueError(f"Unsupported soil depth: {depth}") if self.delay_seconds: time.sleep(self.delay_seconds) depth_index = DEPTH_INDEX[depth] texture_score = self._layered_noise(lon, lat, "texture") organic_score = self._layered_noise(lon, lat, "organic") moisture_score = self._layered_noise(lon, lat, "moisture") mineral_score = self._layered_noise(lon, lat, "mineral") stone_score = self._layered_noise(lon, lat, "stone") ph_score = self._layered_noise(lon, lat, "ph") sand, clay, silt = self._build_texture( texture_score=texture_score, organic_score=organic_score, depth_index=depth_index, ) soc = _clamp( 0.7 + (organic_score * 1.9) + (clay * 0.012) - (depth_index * 0.28) + ((1 - moisture_score) * 0.08), 0.45, 4.2, ) nitrogen = _clamp( 0.04 + (soc * 0.085) + ((1 - (sand / 100.0)) * 0.025) + ((2 - depth_index) * 0.008), 0.03, 0.42, ) ocd = _clamp( 10.0 + (soc * 8.5) + (organic_score * 4.0) - (depth_index * 2.6), 7.0, 46.0, ) ocs = _clamp( 1.0 + (soc * 1.55) - (depth_index * 0.28) + (organic_score * 0.12), 0.5, 8.5, ) cec = _clamp( 7.0 + (clay * 0.33) + (soc * 1.7) + ((1 - (sand / 100.0)) * 2.6) + (mineral_score * 1.4), 5.0, 38.0, ) cfvo = _clamp(1.0 + (stone_score * 12.0) + (depth_index * 2.4), 0.0, 35.0) bdod = _clamp( 1.06 + (sand * 0.0038) + (depth_index * 0.06) - (soc * 0.035) + (stone_score * 0.03), 0.95, 1.62, ) phh2o = _clamp( 6.2 + ((ph_score - 0.5) * 1.1) + (depth_index * 0.08) - (organic_score * 0.12), 5.6, 8.1, ) wv1500 = _clamp( 0.05 + (clay * 0.0016) + (soc * 0.012) - (sand * 0.0003) + (depth_index * 0.004), 0.05, 0.22, ) wv0033 = _clamp( wv1500 + 0.07 + (clay * 0.0015) + (soc * 0.01) - (sand * 0.0002), wv1500 + 0.04, 0.38, ) wv0010 = _clamp( wv0033 + 0.03 + (soc * 0.006) + (moisture_score * 0.01), wv0033 + 0.015, 0.48, ) fields = { "bdod": bdod, "cec": cec, "cfvo": cfvo, "clay": clay, "nitrogen": nitrogen, "ocd": ocd, "ocs": ocs, "phh2o": phh2o, "sand": sand, "silt": silt, "soc": soc, "wv0010": wv0010, "wv0033": wv0033, "wv1500": wv1500, } return {name: _round_field(name, value) for name, value in fields.items()} def _build_texture( self, texture_score: float, organic_score: float, depth_index: int, ) -> tuple[float, float, float]: sand = _clamp( 30.0 + (texture_score * 28.0) + ((organic_score - 0.5) * 3.5) - (depth_index * 2.5), 18.0, 72.0, ) clay = _clamp( 13.0 + ((1 - texture_score) * 18.0) + (depth_index * 5.5) + ((organic_score - 0.5) * 2.0), 8.0, 42.0, ) minimum_silt = 12.0 total = sand + clay if total > 100.0 - minimum_silt: excess = total - (100.0 - minimum_silt) sand -= excess * 0.65 clay -= excess * 0.35 silt = 100.0 - sand - clay return sand, clay, silt def _layered_noise(self, lon: float, lat: float, key: str) -> float: regional = self._smooth_noise(lon, lat, f"{key}:regional", scale=1.7) local = self._smooth_noise(lon, lat, f"{key}:local", scale=0.32) micro = self._smooth_noise(lon, lat, f"{key}:micro", scale=0.08) return _clamp((regional * 0.55) + (local * 0.3) + (micro * 0.15), 0.0, 1.0) def _smooth_noise(self, lon: float, lat: float, key: str, scale: float) -> float: grid_x = lon / scale grid_y = lat / scale x0 = math.floor(grid_x) y0 = math.floor(grid_y) tx = grid_x - x0 ty = grid_y - y0 v00 = self._cell_noise(key, x0, y0) v10 = self._cell_noise(key, x0 + 1, y0) v01 = self._cell_noise(key, x0, y0 + 1) v11 = self._cell_noise(key, x0 + 1, y0 + 1) tx = tx * tx * (3.0 - (2.0 * tx)) ty = ty * ty * (3.0 - (2.0 * ty)) top = (v00 * (1 - tx)) + (v10 * tx) bottom = (v01 * (1 - tx)) + (v11 * tx) return (top * (1 - ty)) + (bottom * ty) def _cell_noise(self, key: str, grid_x: int, grid_y: int) -> float: seed_input = f"{self.seed_namespace}:{key}:{grid_x}:{grid_y}" digest = hashlib.sha256(seed_input.encode("ascii")).digest() seed = int.from_bytes(digest[:8], "big", signed=False) return random.Random(seed).random()