# Ai/location_data/openeo_service.py
from __future__ import annotations
import json
import logging
import math
import os
import time
from dataclasses import dataclass
from datetime import date
from decimal import Decimal
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from config.proxy import apply_requests_proxy, build_proxy_url_from_proxychains_env
from .models import AnalysisGridCell
logger = logging.getLogger(__name__)
DEFAULT_OPENEO_BACKEND_URL = "https://openeofed.dataspace.copernicus.eu"
DEFAULT_OPENEO_PROVIDER = "openeo"
DEFAULT_OPENEO_PROXY_URL = "socks5h://host.docker.internal:10808"
DEFAULT_OPENEO_TIMEOUT_SECONDS = 600.0
DEFAULT_OPENEO_HTTP_RETRY_TOTAL = 5
DEFAULT_OPENEO_HTTP_RETRY_BACKOFF_FACTOR = 2.0
DEFAULT_OPENEO_PAYLOAD_ARCHIVE_DIR = "logs/openeo_payloads"
SENTINEL2_COLLECTION = "SENTINEL2_L2A"
SENTINEL1_COLLECTION = "SENTINEL1_GRD"
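# Sentinel-2 scene-classification (SCL) values accepted as valid observations:
# 4 = vegetation, 5 = not vegetated, 6 = water.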
VALID_SCL_CLASSES = (4, 5, 6)
METRIC_NAMES = (
"ndvi",
"ndwi",
"soil_vv",
"soil_vv_db",
)
CLUSTER_METRIC_NAMES = (
"ndvi",
"ndwi",
"soil_vv_db",
)
class OpenEOServiceError(Exception):
"""Base exception for openEO service failures."""
class OpenEOAuthenticationError(OpenEOServiceError):
"""Raised when authentication with the openEO backend fails."""
class OpenEOExecutionError(OpenEOServiceError):
"""Raised when a metric process graph can not be executed successfully."""
class TimeoutOverrideSession(requests.Session):
"""Requests session that enforces a minimum timeout for all outbound calls."""
def __init__(self, timeout_seconds: float):
super().__init__()
self.timeout_seconds = timeout_seconds
self.last_response_preview = ""
self.last_response_content_type = ""
self.last_response_url = ""
def request(self, method, url, **kwargs):
        timeout = kwargs.get("timeout")
        # Tuple timeouts (connect, read) pass through untouched; only missing or
        # too-small scalar timeouts are raised to the session-wide floor.
        if timeout is None:
            kwargs["timeout"] = self.timeout_seconds
        elif isinstance(timeout, (int, float)) and timeout < self.timeout_seconds:
            kwargs["timeout"] = self.timeout_seconds
request_log = {
"method": str(method).upper(),
"url": url,
"timeout": kwargs.get("timeout"),
"params": kwargs.get("params"),
"json": kwargs.get("json"),
"data": kwargs.get("data"),
"headers": _sanitize_headers(kwargs.get("headers")),
"proxy_url": _sanitize_proxy_url(self.proxies.get("https") or self.proxies.get("http")),
}
logger.info("openEO request payload: %s", _serialize_for_log(request_log))
started_at = time.monotonic()
try:
response = super().request(method, url, **kwargs)
except Exception as exc:
logger.exception(
"openEO request failed after %.3fs: %s",
time.monotonic() - started_at,
_serialize_for_log(
{
"method": str(method).upper(),
"url": url,
"error": repr(exc),
}
),
)
raise
logger.info(
"openEO response received after %.3fs: %s",
time.monotonic() - started_at,
_serialize_for_log(
{
"method": str(method).upper(),
"url": url,
"status_code": response.status_code,
"headers": _sanitize_headers(dict(response.headers)),
}
),
)
self.last_response_url = str(response.url)
self.last_response_content_type = str(response.headers.get("Content-Type", ""))
self.last_response_preview = response.text[:1000] if response.text else ""
return response
@dataclass(frozen=True)
class OpenEOConnectionSettings:
backend_url: str = DEFAULT_OPENEO_BACKEND_URL
auth_method: str = "client_credentials"
timeout_seconds: float = DEFAULT_OPENEO_TIMEOUT_SECONDS
client_id: str = ""
client_secret: str = ""
provider_id: str = ""
username: str = ""
password: str = ""
allow_interactive_oidc: bool = False
proxy_url: str = ""
http_retry_total: int = DEFAULT_OPENEO_HTTP_RETRY_TOTAL
http_retry_backoff_factor: float = DEFAULT_OPENEO_HTTP_RETRY_BACKOFF_FACTOR
@classmethod
def from_env(cls) -> "OpenEOConnectionSettings":
return cls(
backend_url=os.environ.get("OPENEO_BACKEND_URL", DEFAULT_OPENEO_BACKEND_URL).strip(),
auth_method=os.environ.get("OPENEO_AUTH_METHOD", "client_credentials").strip().lower(),
timeout_seconds=float(
os.environ.get("OPENEO_TIMEOUT_SECONDS", str(int(DEFAULT_OPENEO_TIMEOUT_SECONDS))).strip()
or str(int(DEFAULT_OPENEO_TIMEOUT_SECONDS))
),
client_id=os.environ.get("OPENEO_AUTH_CLIENT_ID", "").strip(),
client_secret=os.environ.get("OPENEO_AUTH_CLIENT_SECRET", "").strip(),
provider_id=os.environ.get("OPENEO_AUTH_PROVIDER_ID", "").strip(),
username=os.environ.get("OPENEO_USERNAME", "").strip(),
password=os.environ.get("OPENEO_PASSWORD", "").strip(),
allow_interactive_oidc=os.environ.get("OPENEO_ALLOW_INTERACTIVE_OIDC", "0").strip().lower()
in {"1", "true", "yes", "on"},
proxy_url=_resolve_openeo_proxy_url_from_env(),
http_retry_total=int(
os.environ.get("OPENEO_HTTP_RETRY_TOTAL", str(DEFAULT_OPENEO_HTTP_RETRY_TOTAL)).strip()
or str(DEFAULT_OPENEO_HTTP_RETRY_TOTAL)
),
http_retry_backoff_factor=float(
os.environ.get(
"OPENEO_HTTP_RETRY_BACKOFF_FACTOR",
str(DEFAULT_OPENEO_HTTP_RETRY_BACKOFF_FACTOR),
).strip()
or str(DEFAULT_OPENEO_HTTP_RETRY_BACKOFF_FACTOR)
),
)
def _resolve_openeo_proxy_url_from_env() -> str:
configured_proxy_url = os.environ.get("OPENEO_PROXY_URL", DEFAULT_OPENEO_PROXY_URL).strip()
if configured_proxy_url and configured_proxy_url != DEFAULT_OPENEO_PROXY_URL:
return configured_proxy_url
# Keep openEO traffic proxied even when process-wide proxychains is disabled.
derived_proxy_url = build_proxy_url_from_proxychains_env(require_enabled=False)
if derived_proxy_url:
return derived_proxy_url
return configured_proxy_url
def _sanitize_headers(headers: dict[str, Any] | None) -> dict[str, Any] | None:
if not headers:
return headers
return {key: _mask_sensitive_value(key, value) for key, value in headers.items()}
def _sanitize_proxy_url(proxy_url: str | None) -> str | None:
    # Redact credentials (scheme://user:pass@host) so logs never leak proxy secrets.
    if not proxy_url or "@" not in proxy_url or "://" not in proxy_url:
        return proxy_url
    scheme, _, rest = proxy_url.partition("://")
    _credentials, _, host = rest.rpartition("@")
    return f"{scheme}://***redacted***@{host}"
def _serialize_for_log(payload: Any) -> str:
return json.dumps(_mask_sensitive_payload(payload), ensure_ascii=True, default=str, sort_keys=True)
def _mask_sensitive_payload(value: Any, parent_key: str = "") -> Any:
if isinstance(value, dict):
return {str(key): _mask_sensitive_payload(item, str(key)) for key, item in value.items()}
if isinstance(value, list):
return [_mask_sensitive_payload(item, parent_key) for item in value]
if isinstance(value, tuple):
return [_mask_sensitive_payload(item, parent_key) for item in value]
return _mask_sensitive_value(parent_key, value)
def _mask_sensitive_value(key: str, value: Any) -> Any:
normalized_key = (key or "").lower()
if normalized_key in {
"authorization",
"access_token",
"refresh_token",
"id_token",
"client_secret",
"password",
}:
return "***redacted***"
return value
def is_openeo_auth_configured(settings: OpenEOConnectionSettings | None = None) -> bool:
settings = settings or OpenEOConnectionSettings.from_env()
if settings.auth_method == "client_credentials":
return bool(settings.client_id and settings.client_secret)
if settings.auth_method == "password":
return bool(settings.username and settings.password)
if settings.auth_method == "oidc":
return settings.allow_interactive_oidc
return False
def build_openeo_requests_session(settings: OpenEOConnectionSettings) -> requests.Session:
session = TimeoutOverrideSession(settings.timeout_seconds)
session.headers.setdefault("Accept", "application/json")
adapter = HTTPAdapter(max_retries=_build_openeo_http_retry(settings))
session.mount("http://", adapter)
session.mount("https://", adapter)
return apply_requests_proxy(session, settings.proxy_url)
def _build_openeo_http_retry(settings: OpenEOConnectionSettings) -> Retry:
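    # Retry every HTTP method (allowed_methods=None) on connection, read, and
    # retryable status errors, with exponential backoff scaled by backoff_factor.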
return Retry(
total=settings.http_retry_total,
connect=settings.http_retry_total,
read=settings.http_retry_total,
status=settings.http_retry_total,
backoff_factor=settings.http_retry_backoff_factor,
allowed_methods=None,
status_forcelist=(429, 500, 502, 503, 504),
raise_on_status=False,
)
def connect_openeo(settings: OpenEOConnectionSettings | None = None):
"""
Build an authenticated openEO connection using environment-driven configuration.
Preferred authentication mode in production is OIDC client credentials.
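    Required environment variables depend on OPENEO_AUTH_METHOD:
      - client_credentials: OPENEO_AUTH_CLIENT_ID and OPENEO_AUTH_CLIENT_SECRET
      - password: OPENEO_USERNAME and OPENEO_PASSWORD
      - oidc: OPENEO_ALLOW_INTERACTIVE_OIDC must be truthy (interactive flows only)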
"""
settings = settings or OpenEOConnectionSettings.from_env()
try:
import openeo
from openeo.rest.auth.oidc import (
OidcClientCredentialsAuthenticator,
OidcClientInfo,
OidcProviderInfo,
OidcResourceOwnerPasswordAuthenticator,
)
except ImportError as exc: # pragma: no cover - runtime dependency guard
raise OpenEOServiceError("The `openeo` Python client is required for remote sensing jobs.") from exc
session = build_openeo_requests_session(settings)
try:
connection = openeo.connect(
settings.backend_url,
session=session,
default_timeout=settings.timeout_seconds,
)
except requests.exceptions.JSONDecodeError as exc:
preview = (session.last_response_preview or "").strip()
content_type = session.last_response_content_type or "unknown"
response_url = session.last_response_url or settings.backend_url
raise OpenEOServiceError(
"openEO endpoint returned a non-JSON response while loading capabilities. "
f"url={response_url!r} content_type={content_type!r} preview={preview[:300]!r}. "
"This usually means the proxy returned an HTML page instead of the API response."
) from exc
def resolve_oidc_context(
provider_id: str | None,
client_id: str | None,
client_secret: str | None,
) -> tuple[str, OidcClientInfo]:
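        # Relies on private helpers of the openeo client (`_get_oidc_provider`)
        # to resolve provider metadata without starting an interactive flow.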
resolved_provider_id, _ = connection._get_oidc_provider(provider_id, parse_info=False)
providers_payload = connection.get("/credentials/oidc", expected_status=200).json()
provider_map = {provider["id"]: provider for provider in providers_payload["providers"]}
provider_data = provider_map.get(resolved_provider_id)
if not provider_data:
raise OpenEOAuthenticationError(
f"OIDC provider metadata for {resolved_provider_id!r} was not returned by the backend."
)
provider_info = OidcProviderInfo(
provider_id=provider_data["id"],
title=provider_data["title"],
issuer=provider_data["issuer"],
scopes=provider_data.get("scopes"),
default_clients=provider_data.get("default_clients"),
requests_session=session,
)
if not client_id:
raise OpenEOAuthenticationError(
"OPENEO_AUTH_CLIENT_ID must be configured for this openEO auth flow."
)
return resolved_provider_id, OidcClientInfo(
client_id=client_id,
client_secret=client_secret,
provider=provider_info,
)
try:
if settings.auth_method == "client_credentials":
if not settings.client_id or not settings.client_secret:
raise OpenEOAuthenticationError(
"OPENEO_AUTH_CLIENT_ID and OPENEO_AUTH_CLIENT_SECRET must be configured."
)
provider_id, client_info = resolve_oidc_context(
settings.provider_id or None,
settings.client_id,
settings.client_secret,
)
authenticator = OidcClientCredentialsAuthenticator(
client_info=client_info,
requests_session=session,
)
return connection._authenticate_oidc(
authenticator,
provider_id=provider_id,
store_refresh_token=False,
oidc_auth_renewer=authenticator,
)
if settings.auth_method == "password":
if not settings.username or not settings.password:
raise OpenEOAuthenticationError(
"OPENEO_USERNAME and OPENEO_PASSWORD must be configured for password auth."
)
provider_id, client_info = resolve_oidc_context(
settings.provider_id or None,
settings.client_id or None,
settings.client_secret or None,
)
authenticator = OidcResourceOwnerPasswordAuthenticator(
client_info=client_info,
username=settings.username,
password=settings.password,
requests_session=session,
)
return connection._authenticate_oidc(
authenticator,
provider_id=provider_id,
store_refresh_token=False,
)
if settings.auth_method == "oidc":
if not settings.allow_interactive_oidc:
raise OpenEOAuthenticationError(
"Interactive OIDC auth is disabled. Use client credentials in Celery workers."
)
auth_kwargs = {}
if settings.provider_id:
auth_kwargs["provider_id"] = settings.provider_id
return connection.authenticate_oidc(**auth_kwargs)
raise OpenEOAuthenticationError(f"Unsupported OPENEO_AUTH_METHOD: {settings.auth_method}")
except Exception as exc:
if isinstance(exc, OpenEOServiceError):
raise
raise OpenEOAuthenticationError(f"Failed to authenticate with openEO backend: {exc}") from exc
def build_feature_collection(cells: list[AnalysisGridCell]) -> dict[str, Any]:
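    """Wrap analysis grid cells into a GeoJSON FeatureCollection keyed by cell_code."""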
features = []
for cell in cells:
features.append(
{
"type": "Feature",
"id": cell.cell_code,
"properties": {
"cell_code": cell.cell_code,
"block_code": cell.block_code,
"soil_location_id": cell.soil_location_id,
},
"geometry": cell.geometry,
}
)
return {"type": "FeatureCollection", "features": features}
def build_spatial_extent(cells: list[AnalysisGridCell]) -> dict[str, float]:
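    """Return the WGS84 bounding box (west/south/east/north) covering all cell geometries."""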
if not cells:
raise ValueError("At least one analysis grid cell is required.")
west = None
east = None
south = None
north = None
for cell in cells:
for lon, lat in _iter_geometry_lon_lat_pairs(cell.geometry):
west = lon if west is None else min(west, lon)
east = lon if east is None else max(east, lon)
south = lat if south is None else min(south, lat)
north = lat if north is None else max(north, lat)
return {
"west": float(west),
"south": float(south),
"east": float(east),
"north": float(north),
}
def _iter_geometry_lon_lat_pairs(geometry: dict[str, Any] | None):
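    """Yield (lon, lat) pairs from a GeoJSON Polygon or MultiPolygon geometry."""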
geometry = dict(geometry or {})
geometry_type = geometry.get("type")
coordinates = geometry.get("coordinates") or []
if geometry_type == "Polygon":
for ring in coordinates:
for point in ring or []:
if len(point) >= 2:
yield point[0], point[1]
return
if geometry_type == "MultiPolygon":
for polygon in coordinates:
for ring in polygon or []:
for point in ring or []:
if len(point) >= 2:
yield point[0], point[1]
return
def build_empty_metric_payload() -> dict[str, Any]:
return {metric_name: None for metric_name in METRIC_NAMES}
def initialize_metric_result_map(cells: list[AnalysisGridCell]) -> dict[str, dict[str, Any]]:
return {cell.cell_code: build_empty_metric_payload() for cell in cells}
def compute_remote_sensing_metrics(
cells: list[AnalysisGridCell],
*,
temporal_start: date | str,
temporal_end: date | str,
selected_features: list[str] | None = None,
progress_callback=None,
connection=None,
) -> dict[str, Any]:
"""
Compute all requested remote sensing metrics in batch mode per metric.
Returns a normalized structure keyed by `cell_code`, plus execution metadata
that can be stored by Celery tasks and Django models.
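    All metric runners currently execute regardless of `selected_features`, which
    is only recorded in the request-summary log. Illustrative return shape (the
    values are hypothetical):
        {
            "results": {"CELL-0001": {"ndvi": 0.61, "ndwi": -0.18,
                                      "soil_vv": 0.041, "soil_vv_db": -13.87}},
            "metadata": {"backend": "openeo", "job_refs": {"ndvi": "j-123"},
                         "failed_metrics": []},
        }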
"""
if not cells:
return {
"results": {},
"metadata": {
"backend": DEFAULT_OPENEO_PROVIDER,
"collections_used": [],
"job_refs": {},
"failed_metrics": [],
},
}
connection = connection or connect_openeo()
feature_collection = build_feature_collection(cells)
spatial_extent = build_spatial_extent(cells)
log_openeo_request_summary(
cells=cells,
temporal_start=temporal_start,
temporal_end=temporal_end,
spatial_extent=spatial_extent,
selected_features=selected_features or list(METRIC_NAMES),
)
expected_feature_ids = [cell.cell_code for cell in cells]
results = initialize_metric_result_map(cells)
metadata = {
"backend": DEFAULT_OPENEO_PROVIDER,
"backend_url": DEFAULT_OPENEO_BACKEND_URL,
"collections_used": [
SENTINEL2_COLLECTION,
SENTINEL1_COLLECTION,
],
"job_refs": {},
"failed_metrics": [],
"payload_diagnostics": {},
}
metric_runners = [
("ndvi", compute_ndvi),
("ndwi", compute_ndwi),
("soil_vv", compute_soil_vv),
]
for metric_name, runner in metric_runners:
try:
if progress_callback is not None:
progress_callback(metric_name=metric_name, state="running", metadata=metadata)
metric_payload = runner(
connection=connection,
feature_collection=feature_collection,
spatial_extent=spatial_extent,
temporal_start=temporal_start,
temporal_end=temporal_end,
expected_feature_ids=expected_feature_ids,
)
merge_metric_results(results, metric_payload["results"])
metadata["job_refs"][metric_name] = metric_payload.get("job_ref")
metadata["payload_diagnostics"][metric_name] = metric_payload.get("payload_diagnostics", {})
if progress_callback is not None:
progress_callback(
metric_name=metric_name,
state="completed",
metadata=metadata,
metric_payload=metric_payload,
)
except Exception as exc:
if progress_callback is not None:
progress_callback(metric_name=metric_name, state="failed", metadata=metadata, error=str(exc))
raise OpenEOExecutionError(f"Failed to compute metric `{metric_name}`: {exc}") from exc
for cell_code, payload in results.items():
soil_vv = payload.get("soil_vv")
payload["soil_vv_db"] = linear_to_db(soil_vv)
return {"results": results, "metadata": metadata}
def log_openeo_request_summary(
*,
cells: list[AnalysisGridCell],
temporal_start: date | str,
temporal_end: date | str,
spatial_extent: dict[str, float],
selected_features: list[str],
) -> None:
start_date = _parse_date_value(temporal_start)
end_date = _parse_date_value(temporal_end)
logger.info(
"openEO request summary: %s",
_serialize_for_log(
{
"cell_count": len(cells),
"date_range_days": max((end_date - start_date).days, 0) + 1,
"area_m2": round(_estimate_extent_area_m2(spatial_extent), 2),
"metrics": selected_features,
"spatial_extent": spatial_extent,
"temporal_start": start_date.isoformat(),
"temporal_end": end_date.isoformat(),
}
),
)
def _estimate_extent_area_m2(spatial_extent: dict[str, float]) -> float:
west = float(spatial_extent["west"])
east = float(spatial_extent["east"])
south = float(spatial_extent["south"])
north = float(spatial_extent["north"])
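    # Equirectangular approximation: ~111,320 m per degree of latitude, with the
    # longitude scale shrunk by cos(mean latitude).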
mean_lat_rad = math.radians((south + north) / 2.0)
meters_per_degree_lat = 111_320.0
meters_per_degree_lon = 111_320.0 * math.cos(mean_lat_rad)
width_m = max(east - west, 0.0) * meters_per_degree_lon
height_m = max(north - south, 0.0) * meters_per_degree_lat
return max(width_m, 0.0) * max(height_m, 0.0)
def compute_ndvi(
*,
connection,
feature_collection,
spatial_extent,
temporal_start,
temporal_end,
expected_feature_ids: list[str] | None = None,
) -> dict[str, Any]:
cube = connection.load_collection(
SENTINEL2_COLLECTION,
spatial_extent=spatial_extent,
temporal_extent=[_normalize_date(temporal_start), _normalize_date(temporal_end)],
bands=["B03", "B04", "B08", "SCL"],
)
scl = cube.band("SCL")
invalid_mask = (scl != VALID_SCL_CLASSES[0]) & (scl != VALID_SCL_CLASSES[1]) & (scl != VALID_SCL_CLASSES[2])
red = cube.band("B04") * 0.0001
nir = cube.band("B08") * 0.0001
ndvi = ((nir - red) / (nir + red)).mask(invalid_mask.resample_cube_spatial(red))
aggregated, job_ref = _run_aggregate_spatial_job(
ndvi.mean_time().aggregate_spatial(geometries=feature_collection, reducer="mean"),
metric_name="ndvi",
)
payload_diagnostics = _log_raw_payload_summary(aggregated, metric_name="ndvi", job_ref=job_ref)
return {
"results": parse_aggregate_spatial_response(
aggregated,
"ndvi",
job_ref=job_ref,
expected_feature_ids=expected_feature_ids,
),
"job_ref": job_ref,
"payload_diagnostics": payload_diagnostics,
}
def compute_ndwi(
*,
connection,
feature_collection,
spatial_extent,
temporal_start,
temporal_end,
expected_feature_ids: list[str] | None = None,
) -> dict[str, Any]:
cube = connection.load_collection(
SENTINEL2_COLLECTION,
spatial_extent=spatial_extent,
temporal_extent=[_normalize_date(temporal_start), _normalize_date(temporal_end)],
bands=["B03", "B08", "SCL"],
)
scl = cube.band("SCL")
invalid_mask = (scl != VALID_SCL_CLASSES[0]) & (scl != VALID_SCL_CLASSES[1]) & (scl != VALID_SCL_CLASSES[2])
green = cube.band("B03") * 0.0001
nir = cube.band("B08") * 0.0001
ndwi = ((green - nir) / (green + nir)).mask(invalid_mask.resample_cube_spatial(green))
aggregated, job_ref = _run_aggregate_spatial_job(
ndwi.mean_time().aggregate_spatial(geometries=feature_collection, reducer="mean"),
metric_name="ndwi",
)
payload_diagnostics = _log_raw_payload_summary(aggregated, metric_name="ndwi", job_ref=job_ref)
return {
"results": parse_aggregate_spatial_response(
aggregated,
"ndwi",
job_ref=job_ref,
expected_feature_ids=expected_feature_ids,
),
"job_ref": job_ref,
"payload_diagnostics": payload_diagnostics,
}
def compute_soil_vv(
*,
connection,
feature_collection,
spatial_extent,
temporal_start,
temporal_end,
expected_feature_ids: list[str] | None = None,
) -> dict[str, Any]:
cube = connection.load_collection(
SENTINEL1_COLLECTION,
spatial_extent=spatial_extent,
temporal_extent=[_normalize_date(temporal_start), _normalize_date(temporal_end)],
bands=["VV"],
)
vv = cube.band("VV")
aggregated, job_ref = _run_aggregate_spatial_job(
vv.mean_time().aggregate_spatial(geometries=feature_collection, reducer="mean"),
metric_name="soil_vv",
)
payload_diagnostics = _log_raw_payload_summary(aggregated, metric_name="soil_vv", job_ref=job_ref)
return {
"results": parse_aggregate_spatial_response(
aggregated,
"soil_vv",
job_ref=job_ref,
expected_feature_ids=expected_feature_ids,
),
"job_ref": job_ref,
"payload_diagnostics": payload_diagnostics,
}
def _run_aggregate_spatial_job(process: Any, *, metric_name: str) -> tuple[Any, str | None]:
title = f"crop-logic-{metric_name}"
description = f"Remote sensing aggregate_spatial execution for metric `{metric_name}`."
logger.info(
"openEO process graph prepared: %s",
_serialize_for_log(
{
"metric_name": metric_name,
"title": title,
"description": description,
"process_graph": process.flat_graph() if hasattr(process, "flat_graph") else None,
}
),
)
if hasattr(process, "create_job"):
job = process.create_job(
title=title,
description=description,
out_format="JSON",
)
logger.info(
"openEO batch job created: %s",
_serialize_for_log({"metric_name": metric_name, "job_ref": _extract_job_ref(job)}),
)
started_job = job.start_and_wait()
if started_job is not None:
job = started_job
logger.info(
"openEO batch job finished: %s",
_serialize_for_log({"metric_name": metric_name, "job_ref": _extract_job_ref(job)}),
)
return _load_job_result_payload(job, metric_name=metric_name), _extract_job_ref(job)
logger.info("openEO process uses synchronous execution fallback for metric `%s`.", metric_name)
return process.execute(), None
def _load_job_result_payload(job: Any, *, metric_name: str) -> Any:
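    """Download batch-job results and return the first usable JSON payload."""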
results = job.get_results()
job_ref = _extract_job_ref(job)
if hasattr(results, "download_files"):
with TemporaryDirectory(prefix="openeo-job-") as temp_dir:
results.download_files(temp_dir)
downloaded_files = sorted(str(path.relative_to(temp_dir)) for path in Path(temp_dir).rglob("*") if path.is_file())
logger.info(
"openEO batch job files downloaded: %s",
_serialize_for_log({"job_ref": job_ref, "files": downloaded_files}),
)
payload, payload_path = _load_first_json_payload_with_source(Path(temp_dir), job_ref=job_ref)
if payload is not None:
if payload_path is not None:
_persist_raw_payload_file(
source_path=payload_path,
metric_name=metric_name,
job_ref=job_ref,
)
return payload
if hasattr(results, "get_metadata"):
metadata = results.get_metadata()
if isinstance(metadata, dict) and metadata.get("data") is not None:
_persist_raw_payload_value(
payload=metadata["data"],
metric_name=metric_name,
job_ref=job_ref,
)
return metadata["data"]
raise OpenEOExecutionError(
f"openEO batch job `{job_ref or 'unknown'}` completed but no JSON result payload could be loaded."
)
def _load_first_json_payload(directory: Path, *, job_ref: str | None = None) -> Any | None:
payload, _source_path = _load_first_json_payload_with_source(directory, job_ref=job_ref)
return payload
def _load_first_json_payload_with_source(
directory: Path,
*,
job_ref: str | None = None,
) -> tuple[Any | None, Path | None]:
asset_payload, asset_path = _load_stac_asset_payload(directory, job_ref=job_ref)
if asset_payload is not None:
return asset_payload, asset_path
for candidate in sorted(directory.rglob("*.json")):
payload = _read_json_file(candidate, job_ref=job_ref)
if payload is None:
continue
if _looks_like_stac_metadata_payload(payload):
continue
return payload, candidate
return None, None
def _load_stac_asset_payload(directory: Path, *, job_ref: str | None = None) -> tuple[Any | None, Path | None]:
for candidate in sorted(directory.rglob("*.json")):
payload = _read_json_file(candidate, job_ref=job_ref)
if not _looks_like_stac_metadata_payload(payload):
continue
for asset_name, asset_path in _iter_stac_asset_paths(payload, directory):
if asset_path.suffix.lower() != ".json":
continue
if not asset_path.exists():
logger.warning(
"openEO STAC asset file is missing: %s",
_serialize_for_log(
{
"job_ref": job_ref,
"stac_path": str(candidate),
"asset_name": asset_name,
"asset_path": str(asset_path),
}
),
)
continue
logger.info(
"openEO batch job selecting STAC asset payload: %s",
_serialize_for_log(
{
"job_ref": job_ref,
"stac_path": str(candidate),
"asset_name": asset_name,
"asset_path": str(asset_path),
}
),
)
return _read_json_file(asset_path, job_ref=job_ref), asset_path
return None, None
def _persist_raw_payload_file(
*,
source_path: Path,
metric_name: str,
job_ref: str | None,
) -> None:
archive_path = _build_payload_archive_path(
metric_name=metric_name,
job_ref=job_ref,
source_name=source_path.name,
)
if archive_path is None:
return
raw_bytes = source_path.read_bytes()
archive_path.parent.mkdir(parents=True, exist_ok=True)
archive_path.write_bytes(raw_bytes)
logger.info(
"openEO raw payload archived: %s",
_serialize_for_log(
{
"job_ref": job_ref,
"metric_name": metric_name,
"source_path": str(source_path),
"archive_path": str(archive_path),
}
),
)
def _persist_raw_payload_value(
*,
payload: Any,
metric_name: str,
job_ref: str | None,
) -> None:
archive_path = _build_payload_archive_path(
metric_name=metric_name,
job_ref=job_ref,
source_name="metadata.json",
)
if archive_path is None:
return
archive_path.parent.mkdir(parents=True, exist_ok=True)
archive_path.write_text(
json.dumps(payload, ensure_ascii=True, indent=2, sort_keys=False, default=str),
encoding="utf-8",
)
logger.info(
"openEO raw payload archived from metadata: %s",
_serialize_for_log(
{
"job_ref": job_ref,
"metric_name": metric_name,
"archive_path": str(archive_path),
}
),
)
def _build_payload_archive_path(
*,
metric_name: str,
job_ref: str | None,
source_name: str,
) -> Path | None:
archive_dir = str(os.environ.get("OPENEO_PAYLOAD_ARCHIVE_DIR", DEFAULT_OPENEO_PAYLOAD_ARCHIVE_DIR)).strip()
if not archive_dir:
return None
safe_job_ref = _sanitize_filename_component(job_ref or "unknown-job")
safe_metric_name = _sanitize_filename_component(metric_name or "unknown-metric")
safe_source_name = _sanitize_filename_component(source_name or "payload.json")
return Path(archive_dir) / f"{safe_job_ref}__{safe_metric_name}__{safe_source_name}"
def _sanitize_filename_component(value: str) -> str:
text = str(value or "").strip() or "unknown"
sanitized = "".join(character if character.isalnum() or character in {"-", "_", "."} else "_" for character in text)
return sanitized or "unknown"
def _iter_stac_asset_paths(payload: Any, directory: Path) -> list[tuple[str, Path]]:
if not isinstance(payload, dict):
return []
assets = payload.get("assets")
if not isinstance(assets, dict):
return []
resolved_paths: list[tuple[str, Path]] = []
for asset_name, asset_details in assets.items():
if not isinstance(asset_details, dict):
continue
href = asset_details.get("href")
if not href:
continue
raw_path = Path(str(href))
if raw_path.is_absolute():
resolved = directory / raw_path.name
else:
resolved = directory / raw_path
resolved_paths.append((str(asset_name), resolved))
return resolved_paths
def _looks_like_stac_metadata_path(path: Path) -> bool:
name = path.name.lower()
return name in {"item.json", "collection.json"} or name.endswith(".stac-item.json")
def _looks_like_stac_metadata_payload(payload: Any) -> bool:
return isinstance(payload, dict) and "assets" in payload and any(
key in payload for key in ("stac_version", "stac_extensions", "extent", "summaries")
)
def _read_json_file(path: Path, *, job_ref: str | None = None) -> Any:
raw_text = path.read_text(encoding="utf-8", errors="replace")
if not raw_text.strip():
logger.warning(
"openEO batch job JSON file is empty: %s",
_serialize_for_log({"job_ref": job_ref, "path": str(path), "preview": raw_text[:500]}),
)
return None
try:
return json.loads(raw_text)
except json.JSONDecodeError as exc:
logger.exception(
"openEO batch job JSON parsing failed: %s",
_serialize_for_log(
{
"job_ref": job_ref,
"path": str(path),
"error": str(exc),
"preview": raw_text[:1000],
}
),
)
raise OpenEOExecutionError(
f"Failed to parse openEO batch result file `{path.name}` for job `{job_ref or 'unknown'}`: {exc}"
) from exc
def _extract_job_ref(job: Any) -> str | None:
for attribute_name in ("job_id", "id"):
value = getattr(job, attribute_name, None)
if value:
return str(value)
if hasattr(job, "describe_job"):
metadata = job.describe_job()
if isinstance(metadata, dict) and metadata.get("id"):
return str(metadata["id"])
return None
def parse_aggregate_spatial_response(
payload: Any,
metric_name: str,
*,
job_ref: str | None = None,
expected_feature_ids: list[str] | None = None,
) -> dict[str, dict[str, Any]]:
"""
Parse different JSON shapes returned by openEO aggregate_spatial executions.
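    Supported shapes: a GeoJSON FeatureCollection, a mapping keyed by feature id
    (optionally wrapped in {"data": ...}), or a list ordered like the submitted
    feature ids.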
"""
if payload is None:
return {}
    if isinstance(payload, dict) and (payload.get("type") == "FeatureCollection" or "features" in payload):
        return _parse_feature_collection_results(payload, metric_name)
if isinstance(payload, dict):
return _parse_mapping_results(
payload,
metric_name,
job_ref=job_ref,
expected_feature_ids=expected_feature_ids,
)
if isinstance(payload, list):
return _parse_list_results(
payload,
metric_name,
job_ref=job_ref,
expected_feature_ids=expected_feature_ids,
)
raise OpenEOExecutionError(f"Unsupported openEO aggregate_spatial response type: {type(payload)!r}")
def _parse_feature_collection_results(payload: dict[str, Any], metric_name: str) -> dict[str, dict[str, Any]]:
results: dict[str, dict[str, Any]] = {}
for feature in payload.get("features", []):
        feature_id = str(
            feature.get("id")
            or (feature.get("properties") or {}).get("cell_code")
            or (feature.get("properties") or {}).get("id")
            or ""
        )
if not feature_id:
continue
properties = feature.get("properties") or {}
_log_feature_mismatch(feature_id, properties, metric_name)
value = _extract_aggregate_value(properties)
results[feature_id] = {metric_name: _coerce_float(value)}
return results
def _parse_mapping_results(
payload: dict[str, Any],
metric_name: str,
*,
job_ref: str | None = None,
expected_feature_ids: list[str] | None = None,
) -> dict[str, dict[str, Any]]:
if "data" in payload and isinstance(payload["data"], (dict, list)):
return parse_aggregate_spatial_response(
payload["data"],
metric_name,
job_ref=job_ref,
expected_feature_ids=expected_feature_ids,
)
results: dict[str, dict[str, Any]] = {}
for feature_id, value in payload.items():
if feature_id in {"type", "links", "meta"}:
continue
normalized_feature_id = _normalize_feature_id(
feature_id,
expected_feature_ids=expected_feature_ids,
)
if isinstance(value, dict):
_log_feature_mismatch(str(normalized_feature_id), value, metric_name)
results[str(normalized_feature_id)] = {metric_name: _coerce_float(_extract_aggregate_value(value))}
return results
def _parse_list_results(
payload: list[Any],
metric_name: str,
*,
job_ref: str | None = None,
expected_feature_ids: list[str] | None = None,
) -> dict[str, dict[str, Any]]:
results: dict[str, dict[str, Any]] = {}
for index, item in enumerate(payload):
if isinstance(item, dict):
feature_id = str(
item.get("id")
or item.get("cell_code")
or item.get("feature_id")
or _normalize_feature_id(index, expected_feature_ids=expected_feature_ids)
)
_log_feature_mismatch(feature_id, item, metric_name)
value = _extract_aggregate_value(item)
else:
feature_id = str(_normalize_feature_id(index, expected_feature_ids=expected_feature_ids))
value = _extract_aggregate_value(item)
results[feature_id] = {metric_name: _coerce_float(value)}
return results
def _normalize_feature_id(
raw_feature_id: Any,
*,
expected_feature_ids: list[str] | None = None,
) -> str:
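    """Map a purely numeric feature id back to the expected cell code at that index."""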
feature_id = str(raw_feature_id)
if not expected_feature_ids:
return feature_id
try:
index = int(feature_id)
except (TypeError, ValueError):
return feature_id
if index < 0 or index >= len(expected_feature_ids):
return feature_id
return str(expected_feature_ids[index])
def _log_raw_payload_summary(payload: Any, *, metric_name: str, job_ref: str | None = None) -> dict[str, Any]:
payload_cells = _extract_payload_cells(payload)
payload_keys_sample = [cell_code for cell_code, _raw in payload_cells[:5]]
available_features = sorted(_collect_payload_feature_names(payload))
returned_cell_count = len(payload_cells)
is_empty = returned_cell_count == 0
if is_empty:
logger.warning("openEO payload is empty for job_ref=%s", job_ref)
logger.info(
"openEO payload summary: %s",
_serialize_for_log(
{
"metric_name": metric_name,
"job_ref": job_ref,
"returned_cell_count": returned_cell_count,
"payload_keys_sample": payload_keys_sample,
"available_features": available_features,
}
),
)
return {
"returned_cell_count": returned_cell_count,
"payload_keys_sample": payload_keys_sample,
"available_features": available_features,
}
def _extract_payload_cells(payload: Any) -> list[tuple[str, Any]]:
if payload is None:
return []
if isinstance(payload, dict) and payload.get("type") == "FeatureCollection":
cells = []
for feature in payload.get("features", []):
feature_id = str(
feature.get("id")
or (feature.get("properties") or {}).get("cell_code")
or (feature.get("properties") or {}).get("id")
or ""
)
if feature_id:
cells.append((feature_id, feature.get("properties") or {}))
return cells
if isinstance(payload, dict) and "features" in payload and isinstance(payload["features"], list):
return _extract_payload_cells({"type": "FeatureCollection", "features": payload["features"]})
if isinstance(payload, dict) and "data" in payload and isinstance(payload["data"], (dict, list)):
return _extract_payload_cells(payload["data"])
if isinstance(payload, dict):
return [
(str(feature_id), value)
for feature_id, value in payload.items()
if feature_id not in {"type", "links", "meta", "data"}
]
if isinstance(payload, list):
cells = []
for index, item in enumerate(payload):
if isinstance(item, dict):
feature_id = str(item.get("id") or item.get("cell_code") or item.get("feature_id") or index)
else:
feature_id = str(index)
cells.append((feature_id, item))
return cells
return []
def _collect_payload_feature_names(payload: Any) -> set[str]:
names: set[str] = set()
for _cell_code, raw_value in _extract_payload_cells(payload):
if isinstance(raw_value, dict):
names.update(str(key) for key in raw_value.keys())
return names
def _log_feature_mismatch(cell_code: str, raw_value: dict[str, Any], metric_name: str) -> None:
available_keys = sorted(str(key) for key in raw_value.keys())
if not available_keys:
return
recognized_keys = set(CLUSTER_METRIC_NAMES) | {
metric_name,
"mean",
"value",
"result",
"average",
"id",
"cell_code",
}
if not any(key in recognized_keys for key in available_keys):
logger.warning("Feature mismatch for cell=%s, available_keys=%s", cell_code, available_keys)
def _extract_aggregate_value(value: Any) -> Any:
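    """Recursively unwrap shapes like {"mean": x}, {"value": [x]}, or [x] into a scalar."""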
if isinstance(value, dict):
for key in ("mean", "value", "result", "average"):
if key in value:
return _extract_aggregate_value(value[key])
if len(value) == 1:
return _extract_aggregate_value(next(iter(value.values())))
return None
if isinstance(value, list):
if not value:
return None
return _extract_aggregate_value(value[0])
return value
def merge_metric_results(target: dict[str, dict[str, Any]], updates: dict[str, dict[str, Any]]) -> None:
for cell_code, values in updates.items():
target.setdefault(cell_code, build_empty_metric_payload())
target[cell_code].update(values)
def linear_to_db(value: Any) -> float | None:
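    """Convert linear Sentinel-1 backscatter to decibels: dB = 10 * log10(linear)."""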
numeric = _coerce_float(value)
if numeric is None or numeric <= 0:
return None
return round(10.0 * math.log10(numeric), 6)
def infer_band_name(cube, preferred: tuple[str, ...]) -> str | None:
"""
Best-effort band name selection for collections with backend-specific naming.
"""
metadata = getattr(cube, "metadata", None)
if metadata is None:
return None
band_dimension = getattr(metadata, "band_dimension", None)
bands = getattr(band_dimension, "bands", None)
if not bands:
return None
available = []
for band in bands:
name = getattr(band, "name", None) or str(band)
available.append(name)
for candidate in preferred:
if candidate in available:
return candidate
return available[0] if available else None
def _coerce_float(value: Any) -> float | None:
if value is None:
return None
if isinstance(value, Decimal):
return float(value)
try:
return float(value)
except (TypeError, ValueError):
return None
def _normalize_date(value: date | str) -> str:
if isinstance(value, date):
return value.isoformat()
return str(value)
def _parse_date_value(value: date | str) -> date:
if isinstance(value, date):
return value
return date.fromisoformat(str(value))
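

if __name__ == "__main__":  # pragma: no cover - illustrative usage sketch
    # Minimal sketch of the pure helpers, assuming only the attributes that
    # build_feature_collection reads (cell_code, block_code, soil_location_id,
    # geometry). SimpleNamespace stands in for an AnalysisGridCell row so the
    # example runs without a database; the coordinates below are hypothetical.
    from types import SimpleNamespace

    demo_cell = SimpleNamespace(
        cell_code="CELL-0001",
        block_code="BLOCK-01",
        soil_location_id=1,
        geometry={
            "type": "Polygon",
            "coordinates": [
                [[51.00, 35.00], [51.01, 35.00], [51.01, 35.01], [51.00, 35.01], [51.00, 35.00]]
            ],
        },
    )
    print(json.dumps(build_feature_collection([demo_cell]), indent=2))
    print(build_spatial_extent([demo_cell]))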