"""openEO-backed remote sensing metrics for analysis grid cells."""

from __future__ import annotations

import math
import os
from dataclasses import dataclass
from datetime import date
from decimal import Decimal
from typing import Any

import requests

from config.proxy import apply_requests_proxy, build_proxy_url_from_proxychains_env

from .models import AnalysisGridCell

DEFAULT_OPENEO_BACKEND_URL = "https://openeofed.dataspace.copernicus.eu"
DEFAULT_OPENEO_PROVIDER = "openeo"
DEFAULT_OPENEO_PROXY_URL = "socks5h://host.docker.internal:10808"

SENTINEL2_COLLECTION = "SENTINEL2_L2A"
SENTINEL3_LST_COLLECTION = "SENTINEL3_SLSTR_L2_LST"
SENTINEL1_COLLECTION = "SENTINEL1_GRD"
COPERNICUS_DEM_COLLECTION = "COPERNICUS_30"

# Sentinel-2 scene classification (SCL) classes treated as valid observations:
# 4 = vegetation, 5 = not vegetated, 6 = water.
VALID_SCL_CLASSES = (4, 5, 6)

METRIC_NAMES = (
    "ndvi",
    "ndwi",
    "lst_c",
    "soil_vv",
    "soil_vv_db",
    "dem_m",
    "slope_deg",
)


class OpenEOServiceError(Exception):
    """Base exception for openEO service failures."""


class OpenEOAuthenticationError(OpenEOServiceError):
    """Raised when authentication with the openEO backend fails."""


class OpenEOExecutionError(OpenEOServiceError):
    """Raised when a metric process graph cannot be executed successfully."""


class TimeoutOverrideSession(requests.Session):
    """Requests session that enforces a minimum timeout for all outbound calls."""

    def __init__(self, timeout_seconds: float):
        super().__init__()
        self.timeout_seconds = timeout_seconds

    def request(self, method, url, **kwargs):
        timeout = kwargs.get("timeout")
        if timeout is None or timeout < self.timeout_seconds:
            kwargs["timeout"] = self.timeout_seconds
        return super().request(method, url, **kwargs)


@dataclass(frozen=True)
class OpenEOConnectionSettings:
    backend_url: str = DEFAULT_OPENEO_BACKEND_URL
    auth_method: str = "client_credentials"
    timeout_seconds: float = 60.0
    client_id: str = ""
    client_secret: str = ""
    provider_id: str = ""
    username: str = ""
    password: str = ""
    allow_interactive_oidc: bool = False
    proxy_url: str = ""

    @classmethod
    def from_env(cls) -> "OpenEOConnectionSettings":
        return cls(
            backend_url=os.environ.get("OPENEO_BACKEND_URL", DEFAULT_OPENEO_BACKEND_URL).strip(),
            auth_method=os.environ.get("OPENEO_AUTH_METHOD", "client_credentials").strip().lower(),
            timeout_seconds=float(os.environ.get("OPENEO_TIMEOUT_SECONDS", "60").strip() or "60"),
            client_id=os.environ.get("OPENEO_AUTH_CLIENT_ID", "").strip(),
            client_secret=os.environ.get("OPENEO_AUTH_CLIENT_SECRET", "").strip(),
            provider_id=os.environ.get("OPENEO_AUTH_PROVIDER_ID", "").strip(),
            username=os.environ.get("OPENEO_USERNAME", "").strip(),
            password=os.environ.get("OPENEO_PASSWORD", "").strip(),
            allow_interactive_oidc=os.environ.get("OPENEO_ALLOW_INTERACTIVE_OIDC", "0").strip().lower()
            in {"1", "true", "yes", "on"},
            proxy_url=_resolve_openeo_proxy_url_from_env(),
        )


def _resolve_openeo_proxy_url_from_env() -> str:
    configured_proxy_url = os.environ.get("OPENEO_PROXY_URL", DEFAULT_OPENEO_PROXY_URL).strip()
    if configured_proxy_url and configured_proxy_url != DEFAULT_OPENEO_PROXY_URL:
        return configured_proxy_url
    # Keep openEO traffic proxied even when process-wide proxychains is disabled.
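    # Resolution order: an explicit OPENEO_PROXY_URL override wins, then a proxy
    # URL derived from the proxychains environment, then the default above.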
    derived_proxy_url = build_proxy_url_from_proxychains_env(require_enabled=False)
    if derived_proxy_url:
        return derived_proxy_url
    return configured_proxy_url


def is_openeo_auth_configured(settings: OpenEOConnectionSettings | None = None) -> bool:
    settings = settings or OpenEOConnectionSettings.from_env()
    if settings.auth_method == "client_credentials":
        return bool(settings.client_id and settings.client_secret)
    if settings.auth_method == "password":
        return bool(settings.username and settings.password)
    if settings.auth_method == "oidc":
        return settings.allow_interactive_oidc
    return False


def build_openeo_requests_session(settings: OpenEOConnectionSettings) -> requests.Session:
    session = TimeoutOverrideSession(settings.timeout_seconds)
    return apply_requests_proxy(session, settings.proxy_url)


def connect_openeo(settings: OpenEOConnectionSettings | None = None):
    """
    Build an authenticated openEO connection using environment-driven configuration.

    The preferred authentication mode in production is OIDC client credentials.
    """
    settings = settings or OpenEOConnectionSettings.from_env()
    try:
        import openeo
        from openeo.rest.auth.oidc import (
            OidcClientCredentialsAuthenticator,
            OidcClientInfo,
            OidcProviderInfo,
            OidcResourceOwnerPasswordAuthenticator,
        )
    except ImportError as exc:  # pragma: no cover - runtime dependency guard
        raise OpenEOServiceError("The `openeo` Python client is required for remote sensing jobs.") from exc

    session = build_openeo_requests_session(settings)
    connection = openeo.connect(
        settings.backend_url,
        session=session,
        default_timeout=settings.timeout_seconds,
    )

    def resolve_oidc_context(
        provider_id: str | None,
        client_id: str | None,
        client_secret: str | None,
    ) -> tuple[str, OidcClientInfo]:
        # `_get_oidc_provider` and `_authenticate_oidc` below are private APIs of
        # the openeo client, used to reuse its provider resolution while keeping
        # the proxied session in place.
        resolved_provider_id, _ = connection._get_oidc_provider(provider_id, parse_info=False)
        providers_payload = connection.get("/credentials/oidc", expected_status=200).json()
        provider_map = {provider["id"]: provider for provider in providers_payload["providers"]}
        provider_data = provider_map.get(resolved_provider_id)
        if not provider_data:
            raise OpenEOAuthenticationError(
                f"OIDC provider metadata for {resolved_provider_id!r} was not returned by the backend."
            )
        provider_info = OidcProviderInfo(
            provider_id=provider_data["id"],
            title=provider_data["title"],
            issuer=provider_data["issuer"],
            scopes=provider_data.get("scopes"),
            default_clients=provider_data.get("default_clients"),
            requests_session=session,
        )
        if not client_id:
            raise OpenEOAuthenticationError(
                "OPENEO_AUTH_CLIENT_ID must be configured for this openEO auth flow."
            )
        return resolved_provider_id, OidcClientInfo(
            client_id=client_id,
            client_secret=client_secret,
            provider=provider_info,
        )

    try:
        if settings.auth_method == "client_credentials":
            if not settings.client_id or not settings.client_secret:
                raise OpenEOAuthenticationError(
                    "OPENEO_AUTH_CLIENT_ID and OPENEO_AUTH_CLIENT_SECRET must be configured."
                )
            provider_id, client_info = resolve_oidc_context(
                settings.provider_id or None,
                settings.client_id,
                settings.client_secret,
            )
            authenticator = OidcClientCredentialsAuthenticator(
                client_info=client_info,
                requests_session=session,
            )
            return connection._authenticate_oidc(
                authenticator,
                provider_id=provider_id,
                store_refresh_token=False,
                oidc_auth_renewer=authenticator,
            )
        if settings.auth_method == "password":
            if not settings.username or not settings.password:
                raise OpenEOAuthenticationError(
                    "OPENEO_USERNAME and OPENEO_PASSWORD must be configured for password auth."
                )
            provider_id, client_info = resolve_oidc_context(
                settings.provider_id or None,
                settings.client_id or None,
                settings.client_secret or None,
            )
            authenticator = OidcResourceOwnerPasswordAuthenticator(
                client_info=client_info,
                username=settings.username,
                password=settings.password,
                requests_session=session,
            )
            return connection._authenticate_oidc(
                authenticator,
                provider_id=provider_id,
                store_refresh_token=False,
            )
        if settings.auth_method == "oidc":
            if not settings.allow_interactive_oidc:
                raise OpenEOAuthenticationError(
                    "Interactive OIDC auth is disabled. Use client credentials in Celery workers."
                )
            auth_kwargs = {}
            if settings.provider_id:
                auth_kwargs["provider_id"] = settings.provider_id
            return connection.authenticate_oidc(**auth_kwargs)
        raise OpenEOAuthenticationError(f"Unsupported OPENEO_AUTH_METHOD: {settings.auth_method}")
    except Exception as exc:
        if isinstance(exc, OpenEOServiceError):
            raise
        raise OpenEOAuthenticationError(f"Failed to authenticate with openEO backend: {exc}") from exc


def build_feature_collection(cells: list[AnalysisGridCell]) -> dict[str, Any]:
    features = []
    for cell in cells:
        features.append(
            {
                "type": "Feature",
                "id": cell.cell_code,
                "properties": {
                    "cell_code": cell.cell_code,
                    "block_code": cell.block_code,
                    "soil_location_id": cell.soil_location_id,
                },
                "geometry": cell.geometry,
            }
        )
    return {"type": "FeatureCollection", "features": features}


def build_spatial_extent(cells: list[AnalysisGridCell]) -> dict[str, float]:
    if not cells:
        raise ValueError("At least one analysis grid cell is required.")
    west = None
    east = None
    south = None
    north = None
    for cell in cells:
        # Assumes Polygon geometries; only the exterior ring is inspected.
        coordinates = ((cell.geometry or {}).get("coordinates") or [[]])[0]
        for lon, lat in coordinates:
            west = lon if west is None else min(west, lon)
            east = lon if east is None else max(east, lon)
            south = lat if south is None else min(south, lat)
            north = lat if north is None else max(north, lat)
    if west is None or east is None or south is None or north is None:
        raise ValueError("No cell geometry contained usable coordinates.")
    return {
        "west": float(west),
        "south": float(south),
        "east": float(east),
        "north": float(north),
    }


def build_empty_metric_payload() -> dict[str, Any]:
    return {metric_name: None for metric_name in METRIC_NAMES}


def initialize_metric_result_map(cells: list[AnalysisGridCell]) -> dict[str, dict[str, Any]]:
    return {cell.cell_code: build_empty_metric_payload() for cell in cells}


def compute_remote_sensing_metrics(
    cells: list[AnalysisGridCell],
    *,
    temporal_start: date | str,
    temporal_end: date | str,
    connection=None,
) -> dict[str, Any]:
    """
    Compute all requested remote sensing metrics, one batch execution per metric.

    Returns a normalized structure keyed by `cell_code`, plus execution metadata
    that can be stored by Celery tasks and Django models.
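    Illustrative return shape (hypothetical cell code and metric values):

        {
            "results": {"A1": {"ndvi": 0.42, "soil_vv": 0.05, "soil_vv_db": -13.0103, ...}},
            "metadata": {"backend": "openeo", "slope_supported": True, ...},
        }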
""" if not cells: return { "results": {}, "metadata": { "backend": DEFAULT_OPENEO_PROVIDER, "collections_used": [], "slope_supported": False, "job_refs": {}, "failed_metrics": [], }, } connection = connection or connect_openeo() feature_collection = build_feature_collection(cells) spatial_extent = build_spatial_extent(cells) results = initialize_metric_result_map(cells) metadata = { "backend": DEFAULT_OPENEO_PROVIDER, "backend_url": DEFAULT_OPENEO_BACKEND_URL, "collections_used": [ SENTINEL2_COLLECTION, SENTINEL3_LST_COLLECTION, SENTINEL1_COLLECTION, COPERNICUS_DEM_COLLECTION, ], "slope_supported": True, "job_refs": {}, "failed_metrics": [], } metric_runners = [ ("ndvi", compute_ndvi), ("ndwi", compute_ndwi), ("lst_c", compute_lst_c), ("soil_vv", compute_soil_vv), ("dem_m", compute_dem_m), ("slope_deg", compute_slope_deg), ] for metric_name, runner in metric_runners: try: metric_payload = runner( connection=connection, feature_collection=feature_collection, spatial_extent=spatial_extent, temporal_start=temporal_start, temporal_end=temporal_end, ) merge_metric_results(results, metric_payload["results"]) metadata["job_refs"][metric_name] = metric_payload.get("job_ref") if metric_name == "slope_deg" and not metric_payload.get("supported", True): metadata["slope_supported"] = False except Exception as exc: if metric_name == "slope_deg": metadata["slope_supported"] = False metadata["failed_metrics"].append( {"metric": metric_name, "error": str(exc), "non_fatal": True} ) continue raise OpenEOExecutionError(f"Failed to compute metric `{metric_name}`: {exc}") from exc for cell_code, payload in results.items(): soil_vv = payload.get("soil_vv") payload["soil_vv_db"] = linear_to_db(soil_vv) return {"results": results, "metadata": metadata} def compute_ndvi(*, connection, feature_collection, spatial_extent, temporal_start, temporal_end) -> dict[str, Any]: cube = connection.load_collection( SENTINEL2_COLLECTION, spatial_extent=spatial_extent, temporal_extent=[_normalize_date(temporal_start), _normalize_date(temporal_end)], bands=["B03", "B04", "B08", "SCL"], ) scl = cube.band("SCL") invalid_mask = (scl != VALID_SCL_CLASSES[0]) & (scl != VALID_SCL_CLASSES[1]) & (scl != VALID_SCL_CLASSES[2]) red = cube.band("B04") * 0.0001 nir = cube.band("B08") * 0.0001 ndvi = ((nir - red) / (nir + red)).mask(invalid_mask.resample_cube_spatial(red)) aggregated = ndvi.mean_time().aggregate_spatial(geometries=feature_collection, reducer="mean").execute() return {"results": parse_aggregate_spatial_response(aggregated, "ndvi")} def compute_ndwi(*, connection, feature_collection, spatial_extent, temporal_start, temporal_end) -> dict[str, Any]: cube = connection.load_collection( SENTINEL2_COLLECTION, spatial_extent=spatial_extent, temporal_extent=[_normalize_date(temporal_start), _normalize_date(temporal_end)], bands=["B03", "B08", "SCL"], ) scl = cube.band("SCL") invalid_mask = (scl != VALID_SCL_CLASSES[0]) & (scl != VALID_SCL_CLASSES[1]) & (scl != VALID_SCL_CLASSES[2]) green = cube.band("B03") * 0.0001 nir = cube.band("B08") * 0.0001 ndwi = ((green - nir) / (green + nir)).mask(invalid_mask.resample_cube_spatial(green)) aggregated = ndwi.mean_time().aggregate_spatial(geometries=feature_collection, reducer="mean").execute() return {"results": parse_aggregate_spatial_response(aggregated, "ndwi")} def compute_lst_c(*, connection, feature_collection, spatial_extent, temporal_start, temporal_end) -> dict[str, Any]: cube = connection.load_collection( SENTINEL3_LST_COLLECTION, spatial_extent=spatial_extent, 
        temporal_extent=[_normalize_date(temporal_start), _normalize_date(temporal_end)],
    )
    band_name = infer_band_name(cube, preferred=("LST", "LST_in", "band_0"))
    lst_k = cube.band(band_name) if band_name else cube
    lst_c = lst_k - 273.15
    aggregated = lst_c.mean_time().aggregate_spatial(geometries=feature_collection, reducer="mean").execute()
    return {"results": parse_aggregate_spatial_response(aggregated, "lst_c")}


def compute_soil_vv(*, connection, feature_collection, spatial_extent, temporal_start, temporal_end) -> dict[str, Any]:
    cube = connection.load_collection(
        SENTINEL1_COLLECTION,
        spatial_extent=spatial_extent,
        temporal_extent=[_normalize_date(temporal_start), _normalize_date(temporal_end)],
        bands=["VV"],
    )
    vv = cube.band("VV")
    aggregated = vv.mean_time().aggregate_spatial(geometries=feature_collection, reducer="mean").execute()
    return {"results": parse_aggregate_spatial_response(aggregated, "soil_vv")}


def compute_dem_m(*, connection, feature_collection, spatial_extent, temporal_start, temporal_end) -> dict[str, Any]:
    cube = connection.load_collection(
        COPERNICUS_DEM_COLLECTION,
        spatial_extent=spatial_extent,
        temporal_extent=[_normalize_date(temporal_start), _normalize_date(temporal_end)],
    )
    band_name = infer_band_name(cube, preferred=("DEM", "elevation", "band_0"))
    dem = cube.band(band_name) if band_name else cube
    aggregated = dem.aggregate_spatial(geometries=feature_collection, reducer="mean").execute()
    return {"results": parse_aggregate_spatial_response(aggregated, "dem_m")}


def compute_slope_deg(*, connection, feature_collection, spatial_extent, temporal_start, temporal_end) -> dict[str, Any]:
    cube = connection.load_collection(
        COPERNICUS_DEM_COLLECTION,
        spatial_extent=spatial_extent,
        temporal_extent=[_normalize_date(temporal_start), _normalize_date(temporal_end)],
    )
    band_name = infer_band_name(cube, preferred=("DEM", "elevation", "band_0"))
    dem = cube.band(band_name) if band_name else cube
    try:
        # Assumes the backend's slope process returns radians; convert to degrees.
        slope_rad = dem.slope()
        slope_deg = slope_rad * (180.0 / math.pi)
        aggregated = slope_deg.aggregate_spatial(geometries=feature_collection, reducer="mean").execute()
        return {
            "results": parse_aggregate_spatial_response(aggregated, "slope_deg"),
            "supported": True,
        }
    except Exception:
        # Not every backend exposes a slope process; report it as unsupported.
        return {
            "results": {feature["id"]: {"slope_deg": None} for feature in feature_collection.get("features", [])},
            "supported": False,
        }


def parse_aggregate_spatial_response(payload: Any, metric_name: str) -> dict[str, dict[str, Any]]:
    """
    Parse the different JSON shapes returned by openEO `aggregate_spatial` executions.
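    Examples, using synthetic payloads that mirror the branches below:

    >>> parse_aggregate_spatial_response({"C1": [[0.42]]}, "ndvi")
    {'C1': {'ndvi': 0.42}}
    >>> parse_aggregate_spatial_response([{"id": "C1", "mean": -13.2}], "soil_vv")
    {'C1': {'soil_vv': -13.2}}
    >>> parse_aggregate_spatial_response(None, "ndvi")
    {}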
""" if payload is None: return {} if isinstance(payload, dict) and payload.get("type") == "FeatureCollection": return _parse_feature_collection_results(payload, metric_name) if isinstance(payload, dict) and "features" in payload: return _parse_feature_collection_results(payload, metric_name) if isinstance(payload, dict): return _parse_mapping_results(payload, metric_name) if isinstance(payload, list): return _parse_list_results(payload, metric_name) raise OpenEOExecutionError(f"Unsupported openEO aggregate_spatial response type: {type(payload)!r}") def _parse_feature_collection_results(payload: dict[str, Any], metric_name: str) -> dict[str, dict[str, Any]]: results: dict[str, dict[str, Any]] = {} for feature in payload.get("features", []): feature_id = str( feature.get("id") or (feature.get("properties") or {}).get("cell_code") or (feature.get("properties") or {}).get("id") ) if not feature_id: continue properties = feature.get("properties") or {} value = _extract_aggregate_value(properties) results[feature_id] = {metric_name: _coerce_float(value)} return results def _parse_mapping_results(payload: dict[str, Any], metric_name: str) -> dict[str, dict[str, Any]]: if "data" in payload and isinstance(payload["data"], (dict, list)): return parse_aggregate_spatial_response(payload["data"], metric_name) results: dict[str, dict[str, Any]] = {} for feature_id, value in payload.items(): if feature_id in {"type", "links", "meta"}: continue results[str(feature_id)] = {metric_name: _coerce_float(_extract_aggregate_value(value))} return results def _parse_list_results(payload: list[Any], metric_name: str) -> dict[str, dict[str, Any]]: results: dict[str, dict[str, Any]] = {} for index, item in enumerate(payload): if isinstance(item, dict): feature_id = str(item.get("id") or item.get("cell_code") or item.get("feature_id") or index) value = _extract_aggregate_value(item) else: feature_id = str(index) value = item results[feature_id] = {metric_name: _coerce_float(value)} return results def _extract_aggregate_value(value: Any) -> Any: if isinstance(value, dict): for key in ("mean", "value", "result", "average"): if key in value: return _extract_aggregate_value(value[key]) if len(value) == 1: return _extract_aggregate_value(next(iter(value.values()))) return None if isinstance(value, list): if not value: return None return _extract_aggregate_value(value[0]) return value def merge_metric_results(target: dict[str, dict[str, Any]], updates: dict[str, dict[str, Any]]) -> None: for cell_code, values in updates.items(): target.setdefault(cell_code, build_empty_metric_payload()) target[cell_code].update(values) def linear_to_db(value: Any) -> float | None: numeric = _coerce_float(value) if numeric is None or numeric <= 0: return None return round(10.0 * math.log10(numeric), 6) def infer_band_name(cube, preferred: tuple[str, ...]) -> str | None: """ Best-effort band name selection for collections with backend-specific naming. 
""" metadata = getattr(cube, "metadata", None) if metadata is None: return None band_dimension = getattr(metadata, "band_dimension", None) bands = getattr(band_dimension, "bands", None) if not bands: return None available = [] for band in bands: name = getattr(band, "name", None) or str(band) available.append(name) for candidate in preferred: if candidate in available: return candidate return available[0] if available else None def _coerce_float(value: Any) -> float | None: if value is None: return None if isinstance(value, Decimal): return float(value) try: return float(value) except (TypeError, ValueError): return None def _normalize_date(value: date | str) -> str: if isinstance(value, date): return value.isoformat() return str(value)