diff --git a/.env.example b/.env.example index 447d119..9a435fb 100644 --- a/.env.example +++ b/.env.example @@ -43,7 +43,9 @@ PROXYCHAINS_CHAIN_MODE=strict_chain # openEO remote sensing auth OPENEO_BACKEND_URL=https://openeofed.dataspace.copernicus.eu -OPENEO_TIMEOUT_SECONDS=60 +OPENEO_TIMEOUT_SECONDS=600 +OPENEO_HTTP_RETRY_TOTAL=5 +OPENEO_HTTP_RETRY_BACKOFF_FACTOR=2.0 # Use `password` when filling OPENEO_USERNAME/OPENEO_PASSWORD instead of client credentials. OPENEO_AUTH_METHOD=client_credentials OPENEO_AUTH_CLIENT_ID= diff --git a/TESTING.md b/TESTING.md new file mode 100644 index 0000000..c750162 --- /dev/null +++ b/TESTING.md @@ -0,0 +1,121 @@ +# Local Test Workflow + +This project can be tested in a local virtualenv that mirrors the Python package source used by the Docker images. + +## 1. Create the virtualenv + +```bash +python3 -m venv .venv-test +source .venv-test/bin/activate +``` + +## 2. Install dependencies with the Docker mirror + +### Option A: full project install + +Use this when your local Python version matches Docker (`3.10`). + +The development Docker image installs packages from `https://mirror-pypi.runflare.com/simple`. + +```bash +source .venv-test/bin/activate +pip --isolated install \ + --prefer-binary \ + --index-url https://mirror-pypi.runflare.com/simple \ + --trusted-host mirror-pypi.runflare.com \ + -c constraints.txt \ + -r requirements.txt +``` + +### Option B: focused openEO test install + +This path was verified on a host running Python `3.14`, where the full pinned requirements cannot be installed because the project pins an older `numpy` release that targets the Docker image's Python `3.10`. 
+ +```bash +source .venv-test/bin/activate +pip --isolated install \ + --prefer-binary \ + --index-url https://mirror-pypi.runflare.com/simple \ + --trusted-host mirror-pypi.runflare.com \ + Django \ + requests \ + PySocks \ + openeo \ + python-dotenv \ + Pillow +``` + +If the mirror is temporarily slow in your environment, retry the same command once more before changing indexes. + +## 3. Run the focused openEO test file + +### Standard Django command + +```bash +source .venv-test/bin/activate +python manage.py test location_data.test_openeo_service +``` + +### Minimal runner used on a non-Docker local setup + +This avoids loading the whole project URL tree and only boots the apps needed by `location_data.test_openeo_service`. + +```bash +source .venv-test/bin/activate +python - <<'PY' +import sys, types +import django +from django.conf import settings +from django.urls import path + +url_module = types.ModuleType("local_test_urls") +url_module.urlpatterns = [path("__test__/", lambda request: None)] +sys.modules["local_test_urls"] = url_module + +settings.configure( + SECRET_KEY="test-secret", + USE_TZ=True, + DATABASES={"default": {"ENGINE": "django.db.backends.sqlite3", "NAME": ":memory:"}}, + INSTALLED_APPS=[ + "django.contrib.auth", + "django.contrib.contenttypes", + "location_data", + ], + MIDDLEWARE=[], + ROOT_URLCONF="local_test_urls", + DEFAULT_AUTO_FIELD="django.db.models.BigAutoField", +) + +django.setup() +from django.test.utils import get_runner + +runner = get_runner(settings)() +failures = runner.run_tests(["location_data.test_openeo_service"]) +raise SystemExit(bool(failures)) +PY +``` + +## 4. Run the broader location-data suite + +```bash +source .venv-test/bin/activate +python manage.py test location_data +``` + +## 5. Run a syntax-only validation + +This is useful when dependencies are still being installed or when you want a very fast sanity check. 
+ +```bash +python3 -m py_compile \ + location_data/openeo_service.py \ + location_data/tasks.py \ + location_data/test_openeo_service.py \ + config/proxy.py +``` + +## Notes + +- The openEO request timeout is configured through `OPENEO_TIMEOUT_SECONDS`. +- The current default in code and `.env.example` is `600` seconds. +- openEO request logging is emitted from `location_data.openeo_service`, including request payloads, process graphs, downloaded batch result files, and parse failures. diff --git a/docs/location_data_remote_sensing_failure_report.md b/docs/location_data_remote_sensing_failure_report.md new file mode 100644 index 0000000..a80c20f --- /dev/null +++ b/docs/location_data_remote_sensing_failure_report.md @@ -0,0 +1,446 @@ +# گزارش تحلیل خطای `location_data` برای Remote Sensing Run + +این سند بر اساس payload دریافتی از endpoint زیر تهیه شده است: + +- `GET /api/location-data/remote-sensing/runs/{task_id}/status/` + +نمونه run بررسی‌شده: + +- `task_id`: `02959179-63b2-4388-bb2e-a4d79176eca2` +- `run.id`: `6` +- بازه زمانی: `2026-04-09` تا `2026-05-09` +- وضعیت API: `failed` +- وضعیت Celery: `RETRY` + +--- + +## 1) خلاصه خیلی کوتاه + +این run تا مرحله واکشی metricها و ذخیره observationها جلو رفته، اما در مرحله ساخت subdivision داده‌محور شکست خورده است. + +خطای نهایی: + +- `هیچ observation قابل استفاده‌ای برای خوشه‌بندی باقی نماند.` + +این خطا در کد دقیقاً زمانی رخ می‌دهد که سیستم بعد از بارگذاری observationها، هیچ ردیفی را برای clustering قابل استفاده تشخیص ندهد. 
+ +مسیر کد: + +- `location_data/tasks.py` +- `location_data/data_driven_subdivision.py` + +--- + +## 2) جدول مدل‌ها / جدول‌های درگیر + +| مدل | جدول Django | نقش | +|---|---|---| +| `SoilLocation` | `location_data_soillocation` | اطلاعات location و farm boundary | +| `BlockSubdivision` | `location_data_blocksubdivision` | تقسیم‌بندی بلوک/مزرعه | +| `RemoteSensingRun` | `location_data_remotesensingrun` | وضعیت اجرای pipeline سنجش‌ازدور | +| `AnalysisGridCell` | `location_data_analysisgridcell` | سلول‌های شبکه تحلیلی | +| `AnalysisGridObservation` | `location_data_analysisgridobservation` | مقادیر metricها برای هر cell | +| `RemoteSensingSubdivisionResult` | `location_data_remotesensingsubdivisionresult` | نتیجه clustering و subdivision نهایی | +| `RemoteSensingClusterAssignment` | `location_data_remotesensingclusterassignment` | نسبت هر cell به cluster | + +--- + +## 3) جدول معنی فیلدهای سطح اول response + +| فیلد | مقدار نمونه | توضیح | +|---|---|---| +| `code` | `200` | پاسخ HTTP موفق از API؛ لزوماً به معنای موفق بودن task نیست | +| `msg` | `success` | پیام wrapper API | +| `data.status` | `failed` | وضعیت client-facing run | +| `data.source` | `database` | payload از داده‌های ذخیره‌شده در DB ساخته شده | +| `data.task_id` | UUID | شناسه Celery task | +| `data.run` | object | snapshot اصلی run | +| `data.task` | object | جزئیات stageها، timestamps و وضعیت Celery | + +--- + +## 4) جدول معنی فیلدهای `run` + +| فیلد | مقدار نمونه | توضیح | +|---|---|---| +| `id` | `6` | شناسه رکورد run در DB | +| `block_code` | `""` | خالی یعنی کل مزرعه، نه یک بلوک خاص | +| `chunk_size_sqm` | `900` | اندازه هر cell به متر مربع | +| `temporal_start` | `2026-04-09` | شروع بازه تحلیل | +| `temporal_end` | `2026-05-09` | پایان بازه تحلیل | +| `status` | `failure` | مقدار خام DB | +| `status_label` | `failed` | نسخه نرمال‌شده برای client | +| `pipeline_status` | `failed` | تکرار client-facing status | +| `stage` | `observations_persisted` | آخرین stage ذخیره‌شده در metadata | +| 
`selected_features` | `ndvi`, `ndwi`, `lst_c`, `soil_vv_db`, `dem_m`, `slope_deg` | featureهایی که pipeline فکر می‌کند برای clustering لازم‌اند | +| `requested_cluster_count` | `null` | تعداد cluster صریح از کاربر نیامده و الگوریتم باید تصمیم بگیرد | +| `error_message` | متن فارسی خطا | خطای نهایی ثبت‌شده روی run | +| `started_at` | timestamp | زمان شروع واقعی run | +| `finished_at` | timestamp | زمان ثبت failure روی run | +| `created_at` | timestamp | زمان ایجاد رکورد run | +| `updated_at` | timestamp | آخرین به‌روزرسانی رکورد run | + +--- + +## 5) جدول معنی فیلدهای `run.metadata` + +| فیلد | توضیح | +|---|---| +| `scope` | scope اجرای تحلیل؛ در این نمونه `all_blocks` | +| `stage` | stage فعلی/آخر ذخیره‌شده | +| `task_id` | شناسه Celery task | +| `pipeline.name` | نام pipeline | +| `pipeline.version` | نسخه pipeline | +| `farm_uuid` | شناسه مزرعه | +| `timestamps` | زمان ورود به هر stage | +| `status_label` | وضعیت client-facing ذخیره‌شده در metadata | +| `requested_via` | مبدا درخواست؛ در این نمونه `api` | +| `stage_details` | جزئیات هر stage | +| `failure_reason` | دلیل نهایی ثبت failure | +| `selected_features` | featureهای مورد استفاده برای clustering | +| `requested_cluster_count` | cluster count درخواستی، اگر وجود داشته باشد | + +--- + +## 6) جدول معنی stageها در این run + +| stage | وضعیت مشاهده‌شده | معنی | +|---|---|---| +| `queued` | completed | task در صف قرار گرفته | +| `preparing_analysis_grid` | completed | آماده‌سازی grid تحلیلی | +| `analysis_grid_ready` | completed | grid سلول‌ها آماده بوده | +| `analysis_cells_selected` | completed | 12 سلول برای پردازش انتخاب شده‌اند | +| `fetching_remote_metrics` | completed | metricها از openEO درخواست شده‌اند | +| `remote_metrics_fetched` | completed | jobهای openEO تمام شده‌اند | +| `observations_persisted` | failed | observationها ذخیره شده‌اند ولی pipeline بعد از این مرحله fail شده | +| `failed` | completed | stage شکست نهایی ثبت شده | + +نکته مهم: + +- در payload فعلی، `run.stage` هنوز 
`observations_persisted` است اما `data.status` برابر `failed` است. +- این یعنی status و stage با هم perfectly sync نیستند. + +--- + +## 7) جدول معنی `stage_details` + +### 7.1) `preparing_analysis_grid` + +| فیلد | توضیح | +|---|---| +| `block_code` | بلوک هدف | +| `temporal_extent.start_date` | شروع بازه | +| `temporal_extent.end_date` | پایان بازه | + +### 7.2) `analysis_grid_ready` + +| فیلد | توضیح | +|---|---| +| `grid_summary.created` | آیا grid همین run ساخته شده یا از قبل وجود داشته | +| `grid_summary.block_code` | کد بلوک | +| `grid_summary.total_count` | تعداد کل cellها | +| `grid_summary.created_count` | تعداد cell تازه ایجادشده | +| `grid_summary.chunk_size_sqm` | اندازه cell | +| `grid_summary.existing_count` | تعداد cellهای موجود از قبل | + +### 7.3) `analysis_cells_selected` + +| فیلد | توضیح | +|---|---| +| `force_refresh` | آیا cache نادیده گرفته شده | +| `total_cell_count` | تعداد کل cellها | +| `existing_cell_count` | تعداد cellهایی که داده از قبل داشته‌اند | +| `cell_count_to_process` | تعداد cellهایی که باید پردازش شوند | + +### 7.4) `fetching_remote_metrics` + +| فیلد | توضیح | +|---|---| +| `target_cells` | لیست cellهای هدف | +| `requested_cell_count` | تعداد cellهای هدف | +| `metric_progress.total_metrics` | تعداد metricهایی که pipeline انتظار دارد | +| `metric_progress.completed_metric_count` | تعداد metricهای کامل‌شده | +| `metric_progress.completed_metrics` | metricهای کامل‌شده واقعی | +| `metric_progress.failed_metrics` | metricهای fail شده | +| `metric_progress.states` | وضعیت هر metric از نگاه progress tracker | + +### 7.5) `remote_metrics_fetched` + +| فیلد | توضیح | +|---|---| +| `failed_metric_count` | تعداد metric fail شده | +| `service_metadata.backend` | backend مورد استفاده | +| `service_metadata.job_refs` | job id هر metric در openEO | +| `service_metadata.backend_url` | آدرس backend | +| `service_metadata.failed_metrics` | خطاهای metric-level | +| `service_metadata.collections_used` | collectionهای مصرف‌شده | + +### 7.6) 
`observations_persisted` + +| فیلد | توضیح | +|---|---| +| `created_count` | تعداد observation ساخته‌شده | +| `updated_count` | تعداد observation به‌روزشده | + +--- + +## 8) جدول معنی `task` + +| فیلد | مقدار نمونه | توضیح | +|---|---|---| +| `current_stage` | `observations_persisted` | stage فعلی/آخرین stage ثبت‌شده | +| `current_stage_details` | `created_count=12`, `updated_count=0` | جزئیات stage جاری | +| `timestamps` | object | timeline کامل stageها | +| `stages` | list | نسخه ترتیبی از stageها | +| `metric_progress` | object | وضعیت پیشرفت metricها | +| `failure_reason` | متن فارسی | خطای ثبت‌شده | +| `celery.state` | `RETRY` | Celery task هنوز final نشده و در retry است | +| `celery.ready` | `false` | نتیجه نهایی هنوز آماده نیست | +| `celery.successful` | `false` | task هنوز success نشده | +| `celery.failed` | `false` | task هنوز fail نهایی نشده | +| `celery.info` | متن خطا | پیام retry فعلی Celery | + +--- + +## 9) دلیل دقیق خطا + +دلیل مستقیم خطا از خود کد: + +- در `location_data/data_driven_subdivision.py` اگر بعد از ساخت dataset، هیچ observation قابل استفاده برای clustering باقی نماند، این exception پرتاب می‌شود: + - `هیچ observation قابل استفاده‌ای برای خوشه‌بندی باقی نماند.` + +این یعنی pipeline به این نقطه رسیده: + +1. cellها ساخته شده‌اند +2. metricها از openEO درخواست شده‌اند +3. observationها در DB ذخیره شده‌اند +4. اما در زمان ساخت dataset برای clustering، ردیف قابل مصرفی پیدا نشده است + +### معنی عملی این خطا + +یعنی برای همه observationهای این run، سیستم نتوانسته feature vector قابل استفاده بسازد. 
+ +در کد فعلی، observation وقتی از clustering حذف می‌شود که: + +- همه featureهای انتخاب‌شده برای آن observation برابر `None` باشند + +پس این خطا معمولاً یعنی: + +- یا تمام metricهای لازم برای همه cellها `null` شده‌اند +- یا feature set انتخابی با داده‌هایی که واقعاً ذخیره شده‌اند mismatch دارد +- یا persistence / selection طوری انجام شده که clustering به داده قابل استفاده دسترسی پیدا نکرده است + +--- + +## 10) محتمل‌ترین علت‌ها برای همین run + +### علت 1: featureهای حذف‌شده هنوز در pipeline زنده هستند + +در payload می‌بینیم: + +- `selected_features` هنوز شامل `dem_m` و `slope_deg` است +- اما در `location_data/openeo_service.py` این metricها از محاسبه حذف شده‌اند + +نتیجه: + +- API و metadata هنوز فکر می‌کنند این دو feature بخشی از clustering هستند +- ولی openEO دیگر آن‌ها را تولید نمی‌کند +- بنابراین در observationها این فیلدها عملاً `None` می‌مانند + +نکته مهم: + +- فقط `None` بودن `dem_m` و `slope_deg` به‌تنهایی برای تولید این خطا کافی نیست +- اما این mismatch یکی از مهم‌ترین نشانه‌های ناسازگاری pipeline است + +### علت 2: metricهای اصلی احتمالاً برای همه cellها مقدار قابل استفاده نداشته‌اند + +چون این خطا فقط وقتی رخ می‌دهد که هیچ observation usable باقی نماند، محتمل است که برای همه 12 cell: + +- `ndvi` +- `ndwi` +- `lst_c` +- `soil_vv_db` + +هم عملاً `None` شده باشند یا آن‌طور که clustering انتظار دارد قابل مصرف نبوده باشند. + +این حالت ممکن است از اینجا آمده باشد: + +- داده خام provider برای این محدوده/بازه تهی یا نامعتبر بوده +- masking یا aggregation مقدار usable تولید نکرده +- یا در persistence، featureهای محاسبه‌شده به‌درستی به ستون‌های clustering نرسیده‌اند + +### علت 3: progress tracker با metricهای واقعی sync نیست + +در payload: + +- `completed_metrics` شامل `soil_vv` است +- ولی `states` شامل `soil_vv_db` به حالت `pending` است + +یعنی tracker با metric واقعی اجراشده sync نیست: + +- metric واقعی remote: `soil_vv` +- metric مشتق‌شده محلی: `soil_vv_db` + +این باعث می‌شود progress ظاهراً ناقص دیده شود، حتی وقتی داده remote کامل شده است. 
+ +### علت 4: وضعیت DB و Celery با هم متناقض‌اند + +در payload: + +- `data.status = failed` +- ولی `task.celery.state = RETRY` + +یعنی: + +- از نظر DB، run شکست خورده ثبت شده +- از نظر Celery، task هنوز در حال retry است و failure نهایی نشده + +این برای client گیج‌کننده است، چون معلوم نیست باید run را تمام‌شده و fail شده بداند یا منتظر retry بماند. + +--- + +## 11) مشکلات فعلی `location_data` که از کد مشخص هستند + +### مشکل 1: `DEFAULT_CLUSTER_FEATURES` هنوز metricهای حذف‌شده را نگه داشته + +فایل: + +- `location_data/data_driven_subdivision.py` + +وضعیت فعلی: + +- `dem_m` +- `slope_deg` + +هنوز داخل `DEFAULT_CLUSTER_FEATURES` هستند. + +اثر: + +- `selected_features` اشتباه در run metadata +- progress tracker اشتباه +- clustering contract قدیمی باقی می‌ماند + +### مشکل 2: serializer و summary هنوز `dem_m` و `slope_deg` را به API برمی‌گردانند + +فایل‌ها: + +- `location_data/serializers.py` +- `location_data/views.py` + +اثر: + +- response API هنوز metricهای حذف‌شده را نمایش می‌دهد +- summary میانگین `dem_m_mean` و `slope_deg_mean` می‌سازد +- مصرف‌کننده API فکر می‌کند این metricها هنوز پشتیبانی می‌شوند + +### مشکل 3: `_upsert_grid_observations` هنوز فیلدهای حذف‌شده را persist می‌کند + +فایل: + +- `location_data/tasks.py` + +اثر: + +- ستون‌های `dem_m` و `slope_deg` همچنان نوشته می‌شوند، اما عملاً با `None` +- metadata هنوز `slope_supported` را ذخیره می‌کند، در حالی که openEO service دیگر این مفهوم را برنمی‌گرداند + +### مشکل 4: progress metricها با metricهای واقعی اجراشده mismatch دارد + +فایل: + +- `location_data/tasks.py` + +نمونه mismatch: + +- remote metric واقعی: `soil_vv` +- feature مورد انتظار برای clustering: `soil_vv_db` + +اثر: + +- `completed_metrics` و `states` با هم ناسازگار می‌شوند +- `soil_vv_db` در response به شکل `pending` دیده می‌شود، با اینکه از `soil_vv` مشتق می‌شود + +### مشکل 5: stage و status همیشه هماهنگ نیستند + +در payload فعلی: + +- `run.status_label = failed` +- `run.stage = observations_persisted` + +اثر: + +- client نمی‌فهمد خطا دقیقاً در 
کدام stage رخ داده +- `task.current_stage` هم گمراه‌کننده می‌شود + +### مشکل 6: DB failure و Celery retry همزمان به client داده می‌شود + +اثر: + +- API برای یک run همزمان پیام `failed` و `RETRY` می‌دهد +- از نگاه UX، کاربر نمی‌داند باید retry خودکار را صبر کند یا خطا را final بداند + +--- + +## 12) جمع‌بندی عملی برای همین خطا + +بر اساس payload و کد، نتیجه عملی این است: + +1. openEO jobها برای `ndvi`, `ndwi`, `lst_c`, `soil_vv` اجرا شده‌اند +2. 12 observation در DB ساخته شده‌اند +3. pipeline وارد مرحله clustering شده +4. clustering هیچ observation usable پیدا نکرده +5. run در DB به حالت `failed` رفته +6. اما Celery هنوز task را در حالت `RETRY` نگه داشته + +--- + +## 13) پیشنهادهای اصلاح + +### اصلاح ضروری + +1. `dem_m` و `slope_deg` را از این نقاط هم حذف کنید: + - `location_data/data_driven_subdivision.py` + - `location_data/serializers.py` + - `location_data/views.py` + - `location_data/tasks.py` + +2. `DEFAULT_CLUSTER_FEATURES` را با metricهای واقعی sync کنید: + - `ndvi` + - `ndwi` + - `lst_c` + - `soil_vv_db` + +3. progress tracker را طوری اصلاح کنید که: + - completion `soil_vv` به completion `soil_vv_db` هم map شود + - metricهای حذف‌شده اصلاً در state دیده نشوند + +4. منطق status endpoint را طوری تنظیم کنید که: + - اگر Celery در `RETRY` است، status client نهایی `failed` نباشد + - یا حداقل یک فیلد صریح مثل `final_status=false` برگردد + +5. 
قبل از clustering، یک diagnostic بهتر ذخیره شود: + - تعداد observationهای usable + - تعداد observationهای all-null + - تعداد null برای هر feature + +### اصلاح پیشنهادی برای debug بهتر + +در `stage_details` این موارد اضافه شود: + +- `usable_observation_count` +- `all_features_missing_cell_codes` +- `feature_null_counts` +- `selected_features_effective` + +--- + +## 14) نتیجه نهایی + +این run به احتمال زیاد به خاطر ناسازگاری بین featureهای مورد انتظار pipeline و featureهای واقعاً تولید/ذخیره‌شده شکست خورده است، و علاوه بر آن چند inconsistency مهم در `location_data` وجود دارد: + +- metricهای حذف‌شده هنوز در feature contract حضور دارند +- progress report با metricهای واقعی sync نیست +- DB status و Celery status با هم تناقض دارند +- stage نهایی برای failure به شکل واضح و قابل اتکا به client نمایش داده نمی‌شود + +اگر بخواهید، مرحله بعدی می‌تواند این باشد که همین موارد را در کد هم اصلاح کنیم، نه فقط مستندسازی. diff --git a/integration_tests/location_data_urls.py b/integration_tests/location_data_urls.py new file mode 100644 index 0000000..6dfae36 --- /dev/null +++ b/integration_tests/location_data_urls.py @@ -0,0 +1,6 @@ +from django.urls import include, path + + +urlpatterns = [ + path("api/location-data/", include("location_data.urls")), +] diff --git a/integration_tests/test_location_data_remote_sensing_flow.py b/integration_tests/test_location_data_remote_sensing_flow.py new file mode 100644 index 0000000..bd80699 --- /dev/null +++ b/integration_tests/test_location_data_remote_sensing_flow.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +from datetime import date +from types import SimpleNamespace +from unittest.mock import patch +import uuid + +from django.test import override_settings + +from farm_data.models import SensorData +from integration_tests.base import IntegrationAPITestCase +from location_data.models import AnalysisGridCell, AnalysisGridObservation, RemoteSensingRun + + +@override_settings(ROOT_URLCONF="integration_tests.location_data_urls") 
+class RemoteSensingApiFlowTests(IntegrationAPITestCase): + def setUp(self) -> None: + super().setUp() + self.farm_uuid = uuid.UUID("11111111-1111-1111-1111-111111111111") + self.farm = SensorData.objects.create( + farm_uuid=self.farm_uuid, + center_location=self.primary_location, + sensor_payload={}, + ) + self.temporal_end = date(2026, 5, 9) + self.temporal_start = date(2026, 4, 9) + + @patch("location_data.views.timezone.localdate", return_value=date(2026, 5, 10)) + @patch("location_data.views.run_remote_sensing_analysis_task.delay") + def test_remote_sensing_post_and_status_flow(self, mock_delay, _mock_localdate) -> None: + task_id = uuid.UUID("9e7f3aaa-6d34-4428-a196-478af7a2d7f6") + mock_delay.return_value = SimpleNamespace(id=str(task_id)) + + enqueue_response = self.client.post( + "/api/location-data/remote-sensing/", + data={ + "farm_uuid": str(self.farm_uuid), + "force_refresh": False, + }, + format="json", + ) + + self.assertEqual(enqueue_response.status_code, 202, enqueue_response.json()) + enqueue_payload = enqueue_response.json()["data"] + self.assertEqual(enqueue_payload["status"], "processing") + self.assertEqual(enqueue_payload["source"], "processing") + self.assertEqual(enqueue_payload["task_id"], str(task_id)) + self.assertEqual(enqueue_payload["temporal_extent"]["start_date"], "2026-04-09") + self.assertEqual(enqueue_payload["temporal_extent"]["end_date"], "2026-05-09") + + run = RemoteSensingRun.objects.get(pk=enqueue_payload["run"]["id"]) + self.assertEqual(run.status, RemoteSensingRun.STATUS_PENDING) + self.assertEqual(run.temporal_start, self.temporal_start) + self.assertEqual(run.temporal_end, self.temporal_end) + self.assertEqual(run.metadata["task_id"], str(task_id)) + mock_delay.assert_called_once() + + status_response = self.client.get(f"/api/location-data/remote-sensing/runs/{task_id}/status/") + self.assertEqual(status_response.status_code, 200, status_response.json()) + status_payload = status_response.json()["data"] + 
self.assertEqual(status_payload["status"], "pending") + self.assertEqual(status_payload["run"]["pipeline_status"], "pending") + + cell = AnalysisGridCell.objects.create( + soil_location=self.primary_location, + block_code="", + cell_code="cell-1", + chunk_size_sqm=900, + geometry=self.primary_boundary, + centroid_lat=f"{self.primary_lat:.6f}", + centroid_lon=f"{self.primary_lon:.6f}", + ) + AnalysisGridObservation.objects.create( + cell=cell, + run=run, + temporal_start=self.temporal_start, + temporal_end=self.temporal_end, + ndvi=0.61, + ndwi=0.22, + lst_c=24.5, + soil_vv=0.13, + soil_vv_db=-8.860566, + dem_m=1550.0, + slope_deg=4.2, + metadata={"backend_name": "openeo"}, + ) + run.status = RemoteSensingRun.STATUS_SUCCESS + run.metadata = {**(run.metadata or {}), "stage": "completed", "selected_features": ["ndvi", "ndwi"]} + run.save(update_fields=["status", "metadata", "updated_at"]) + + completed_response = self.client.get(f"/api/location-data/remote-sensing/runs/{task_id}/status/") + self.assertEqual(completed_response.status_code, 200, completed_response.json()) + completed_payload = completed_response.json()["data"] + self.assertEqual(completed_payload["status"], "completed") + self.assertEqual(completed_payload["summary"]["cell_count"], 1) + self.assertEqual(completed_payload["summary"]["ndvi_mean"], 0.61) + self.assertEqual(len(completed_payload["cells"]), 1) + self.assertEqual(completed_payload["cells"][0]["cell_code"], "cell-1") diff --git a/location_data/data_driven_subdivision.py b/location_data/data_driven_subdivision.py index c509591..d08b576 100644 --- a/location_data/data_driven_subdivision.py +++ b/location_data/data_driven_subdivision.py @@ -1,6 +1,8 @@ from __future__ import annotations from dataclasses import dataclass +import json +import logging from typing import Any from django.db import transaction @@ -21,18 +23,22 @@ DEFAULT_CLUSTER_FEATURES = [ "ndwi", "lst_c", "soil_vv_db", - "dem_m", - "slope_deg", ] SUPPORTED_CLUSTER_FEATURES = 
tuple(DEFAULT_CLUSTER_FEATURES) DEFAULT_RANDOM_STATE = 42 DEFAULT_MAX_K = 10 +logger = logging.getLogger(__name__) + class DataDrivenSubdivisionError(Exception): """Raised when remote-sensing-driven subdivision can not be computed.""" +class EmptyObservationDatasetError(DataDrivenSubdivisionError): + """Raised when upstream persistence completes without usable clustering features.""" + + @dataclass class ClusteringDataset: observations: list[AnalysisGridObservation] @@ -70,6 +76,8 @@ def create_remote_sensing_subdivision_result( dataset = build_clustering_dataset( observations=observations, selected_features=selected_features, + run=run, + location=location, ) if not dataset.observations: raise DataDrivenSubdivisionError("هیچ observation قابل استفاده‌ای برای خوشه‌بندی باقی نماند.") @@ -164,6 +172,8 @@ def build_clustering_dataset( *, observations: list[AnalysisGridObservation], selected_features: list[str] | None = None, + run: RemoteSensingRun | None = None, + location: SoilLocation | None = None, ) -> ClusteringDataset: selected_features = list(selected_features or DEFAULT_CLUSTER_FEATURES) invalid_features = [ @@ -176,6 +186,22 @@ def build_clustering_dataset( "ویژگی‌های نامعتبر برای خوشه‌بندی: " + ", ".join(sorted(invalid_features)) ) + log_context = _build_clustering_log_context( + observations=observations, + selected_features=selected_features, + run=run, + location=location, + ) + logger.info( + "Preparing clustering dataset: %s", + _serialize_log_payload( + { + **log_context, + "total_observations": len(observations), + "non_null_counts": _count_non_null_features(observations), + } + ), + ) raw_rows: list[list[float | None]] = [] raw_maps: list[dict[str, float | None]] = [] usable_observations: list[AnalysisGridObservation] = [] @@ -193,6 +219,11 @@ def build_clustering_dataset( if value is None: missing_value_counts[feature_name] += 1 if all(value is None for value in feature_map.values()): + logger.debug( + "Skipping observation cell=%s: all clustering 
features are null | context=%s", + observation.cell.cell_code, + _serialize_log_payload(log_context), + ) skipped_cell_codes.append(observation.cell.cell_code) skipped_reasons["all_features_missing"].append(observation.cell.cell_code) continue @@ -201,21 +232,42 @@ def build_clustering_dataset( raw_maps.append(feature_map) raw_rows.append([feature_map[feature_name] for feature_name in selected_features]) + logger.info( + "Clustering dataset filtered observations: %s", + _serialize_log_payload( + { + **log_context, + "remaining_observations": len(usable_observations), + "removed_observations": len(observations) - len(usable_observations), + } + ) + ) + + zero_usable_feature_names = [ + feature_name for feature_name, missing_count in missing_value_counts.items() if missing_count == len(observations) + ] + if zero_usable_feature_names and len(zero_usable_feature_names) < len(selected_features): + for feature_name in zero_usable_feature_names: + logger.warning( + "Feature %s has zero usable values in dataset | context=%s", + feature_name, + _serialize_log_payload(log_context), + ) + if not usable_observations: - return ClusteringDataset( - observations=[], - selected_features=selected_features, - raw_feature_rows=[], - raw_feature_maps=[], - skipped_cell_codes=skipped_cell_codes, - used_cell_codes=[], - imputed_matrix=[], - scaled_matrix=[], - imputer_statistics={feature_name: None for feature_name in selected_features}, - scaler_means={feature_name: 0.0 for feature_name in selected_features}, - scaler_scales={feature_name: 1.0 for feature_name in selected_features}, - missing_value_counts=missing_value_counts, - skipped_reasons=skipped_reasons, + error_context = { + **log_context, + "total_observations": len(observations), + "removed_observations": len(observations), + "null_counts_per_feature": missing_value_counts, + "selected_features": selected_features, + } + logger.error( + "No usable observations available for clustering: %s", + 
_serialize_log_payload(error_context), + ) + raise EmptyObservationDatasetError( + "Upstream processing completed but no usable feature values were persisted." ) try: @@ -487,3 +539,41 @@ def _coerce_float(value: Any) -> float | None: return float(value) except (TypeError, ValueError): return None + + +def _count_non_null_features(observations: list[AnalysisGridObservation]) -> dict[str, int]: + counts = {feature_name: 0 for feature_name in DEFAULT_CLUSTER_FEATURES} + for observation in observations: + for feature_name in DEFAULT_CLUSTER_FEATURES: + if _coerce_float(getattr(observation, feature_name, None)) is not None: + counts[feature_name] += 1 + return counts + + +def _build_clustering_log_context( + *, + observations: list[AnalysisGridObservation], + selected_features: list[str], + run: RemoteSensingRun | None, + location: SoilLocation | None, +) -> dict[str, Any]: + first_observation = observations[0] if observations else None + observation_metadata = dict(getattr(first_observation, "metadata", {}) or {}) + resolved_run = run or getattr(first_observation, "run", None) + resolved_location = location or getattr(getattr(first_observation, "cell", None), "soil_location", None) + temporal_start = getattr(resolved_run, "temporal_start", None) or getattr(first_observation, "temporal_start", None) + temporal_end = getattr(resolved_run, "temporal_end", None) or getattr(first_observation, "temporal_end", None) + return { + "run_id": getattr(resolved_run, "id", None), + "job_ref": observation_metadata.get("job_refs", {}), + "region_id": getattr(resolved_location, "id", None), + "date_range": { + "temporal_start": temporal_start.isoformat() if hasattr(temporal_start, "isoformat") else temporal_start, + "temporal_end": temporal_end.isoformat() if hasattr(temporal_end, "isoformat") else temporal_end, + }, + "selected_features": selected_features, + } + + +def _serialize_log_payload(payload: dict[str, Any]) -> str: + return json.dumps(payload, ensure_ascii=True, 
default=str, sort_keys=True) diff --git a/location_data/openeo_service.py b/location_data/openeo_service.py index d5ad2ef..831caad 100644 --- a/location_data/openeo_service.py +++ b/location_data/openeo_service.py @@ -1,27 +1,38 @@ from __future__ import annotations +import json +import logging import math import os +import time from dataclasses import dataclass from datetime import date from decimal import Decimal +from pathlib import Path +from tempfile import TemporaryDirectory from typing import Any import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry from config.proxy import apply_requests_proxy, build_proxy_url_from_proxychains_env from .models import AnalysisGridCell +logger = logging.getLogger(__name__) + DEFAULT_OPENEO_BACKEND_URL = "https://openeofed.dataspace.copernicus.eu" DEFAULT_OPENEO_PROVIDER = "openeo" DEFAULT_OPENEO_PROXY_URL = "socks5h://host.docker.internal:10808" +DEFAULT_OPENEO_TIMEOUT_SECONDS = 600.0 +DEFAULT_OPENEO_HTTP_RETRY_TOTAL = 5 +DEFAULT_OPENEO_HTTP_RETRY_BACKOFF_FACTOR = 2.0 SENTINEL2_COLLECTION = "SENTINEL2_L2A" SENTINEL3_LST_COLLECTION = "SENTINEL3_SLSTR_L2_LST" SENTINEL1_COLLECTION = "SENTINEL1_GRD" -COPERNICUS_DEM_COLLECTION = "COPERNICUS_30" VALID_SCL_CLASSES = (4, 5, 6) METRIC_NAMES = ( @@ -30,8 +41,12 @@ METRIC_NAMES = ( "lst_c", "soil_vv", "soil_vv_db", - "dem_m", - "slope_deg", +) +CLUSTER_METRIC_NAMES = ( + "ndvi", + "ndwi", + "lst_c", + "soil_vv_db", ) @@ -53,19 +68,67 @@ class TimeoutOverrideSession(requests.Session): def __init__(self, timeout_seconds: float): super().__init__() self.timeout_seconds = timeout_seconds + self.last_response_preview = "" + self.last_response_content_type = "" + self.last_response_url = "" def request(self, method, url, **kwargs): timeout = kwargs.get("timeout") if timeout is None or timeout < self.timeout_seconds: kwargs["timeout"] = self.timeout_seconds - return super().request(method, url, **kwargs) + + request_log = { + "method": 
str(method).upper(), + "url": url, + "timeout": kwargs.get("timeout"), + "params": kwargs.get("params"), + "json": kwargs.get("json"), + "data": kwargs.get("data"), + "headers": _sanitize_headers(kwargs.get("headers")), + "proxy_url": _sanitize_proxy_url(self.proxies.get("https") or self.proxies.get("http")), + } + logger.info("openEO request payload: %s", _serialize_for_log(request_log)) + + started_at = time.monotonic() + try: + response = super().request(method, url, **kwargs) + except Exception as exc: + logger.exception( + "openEO request failed after %.3fs: %s", + time.monotonic() - started_at, + _serialize_for_log( + { + "method": str(method).upper(), + "url": url, + "error": repr(exc), + } + ), + ) + raise + + logger.info( + "openEO response received after %.3fs: %s", + time.monotonic() - started_at, + _serialize_for_log( + { + "method": str(method).upper(), + "url": url, + "status_code": response.status_code, + "headers": _sanitize_headers(dict(response.headers)), + } + ), + ) + self.last_response_url = str(response.url) + self.last_response_content_type = str(response.headers.get("Content-Type", "")) + self.last_response_preview = response.text[:1000] if response.text else "" + return response @dataclass(frozen=True) class OpenEOConnectionSettings: backend_url: str = DEFAULT_OPENEO_BACKEND_URL auth_method: str = "client_credentials" - timeout_seconds: float = 60.0 + timeout_seconds: float = DEFAULT_OPENEO_TIMEOUT_SECONDS client_id: str = "" client_secret: str = "" provider_id: str = "" @@ -73,13 +136,18 @@ class OpenEOConnectionSettings: password: str = "" allow_interactive_oidc: bool = False proxy_url: str = "" + http_retry_total: int = DEFAULT_OPENEO_HTTP_RETRY_TOTAL + http_retry_backoff_factor: float = DEFAULT_OPENEO_HTTP_RETRY_BACKOFF_FACTOR @classmethod def from_env(cls) -> "OpenEOConnectionSettings": return cls( backend_url=os.environ.get("OPENEO_BACKEND_URL", DEFAULT_OPENEO_BACKEND_URL).strip(), auth_method=os.environ.get("OPENEO_AUTH_METHOD", 
"client_credentials").strip().lower(), - timeout_seconds=float(os.environ.get("OPENEO_TIMEOUT_SECONDS", "60").strip() or "60"), + timeout_seconds=float( + os.environ.get("OPENEO_TIMEOUT_SECONDS", str(int(DEFAULT_OPENEO_TIMEOUT_SECONDS))).strip() + or str(int(DEFAULT_OPENEO_TIMEOUT_SECONDS)) + ), client_id=os.environ.get("OPENEO_AUTH_CLIENT_ID", "").strip(), client_secret=os.environ.get("OPENEO_AUTH_CLIENT_SECRET", "").strip(), provider_id=os.environ.get("OPENEO_AUTH_PROVIDER_ID", "").strip(), @@ -88,6 +156,17 @@ class OpenEOConnectionSettings: allow_interactive_oidc=os.environ.get("OPENEO_ALLOW_INTERACTIVE_OIDC", "0").strip().lower() in {"1", "true", "yes", "on"}, proxy_url=_resolve_openeo_proxy_url_from_env(), + http_retry_total=int( + os.environ.get("OPENEO_HTTP_RETRY_TOTAL", str(DEFAULT_OPENEO_HTTP_RETRY_TOTAL)).strip() + or str(DEFAULT_OPENEO_HTTP_RETRY_TOTAL) + ), + http_retry_backoff_factor=float( + os.environ.get( + "OPENEO_HTTP_RETRY_BACKOFF_FACTOR", + str(DEFAULT_OPENEO_HTTP_RETRY_BACKOFF_FACTOR), + ).strip() + or str(DEFAULT_OPENEO_HTTP_RETRY_BACKOFF_FACTOR) + ), ) @@ -104,6 +183,46 @@ def _resolve_openeo_proxy_url_from_env() -> str: return configured_proxy_url +def _sanitize_headers(headers: dict[str, Any] | None) -> dict[str, Any] | None: + if not headers: + return headers + return {key: _mask_sensitive_value(key, value) for key, value in headers.items()} + + +def _sanitize_proxy_url(proxy_url: str | None) -> str | None: + if not proxy_url: + return proxy_url + return proxy_url + + +def _serialize_for_log(payload: Any) -> str: + return json.dumps(_mask_sensitive_payload(payload), ensure_ascii=True, default=str, sort_keys=True) + + +def _mask_sensitive_payload(value: Any, parent_key: str = "") -> Any: + if isinstance(value, dict): + return {str(key): _mask_sensitive_payload(item, str(key)) for key, item in value.items()} + if isinstance(value, list): + return [_mask_sensitive_payload(item, parent_key) for item in value] + if isinstance(value, tuple): + 
return [_mask_sensitive_payload(item, parent_key) for item in value] + return _mask_sensitive_value(parent_key, value) + + +def _mask_sensitive_value(key: str, value: Any) -> Any: + normalized_key = (key or "").lower() + if normalized_key in { + "authorization", + "access_token", + "refresh_token", + "id_token", + "client_secret", + "password", + }: + return "***redacted***" + return value + + def is_openeo_auth_configured(settings: OpenEOConnectionSettings | None = None) -> bool: settings = settings or OpenEOConnectionSettings.from_env() @@ -118,9 +237,26 @@ def is_openeo_auth_configured(settings: OpenEOConnectionSettings | None = None) def build_openeo_requests_session(settings: OpenEOConnectionSettings) -> requests.Session: session = TimeoutOverrideSession(settings.timeout_seconds) + session.headers.setdefault("Accept", "application/json") + adapter = HTTPAdapter(max_retries=_build_openeo_http_retry(settings)) + session.mount("http://", adapter) + session.mount("https://", adapter) return apply_requests_proxy(session, settings.proxy_url) +def _build_openeo_http_retry(settings: OpenEOConnectionSettings) -> Retry: + return Retry( + total=settings.http_retry_total, + connect=settings.http_retry_total, + read=settings.http_retry_total, + status=settings.http_retry_total, + backoff_factor=settings.http_retry_backoff_factor, + allowed_methods=None, + status_forcelist=(429, 500, 502, 503, 504), + raise_on_status=False, + ) + + def connect_openeo(settings: OpenEOConnectionSettings | None = None): """ Build an authenticated openEO connection using environment-driven configuration. 
@@ -140,11 +276,21 @@ def connect_openeo(settings: OpenEOConnectionSettings | None = None): raise OpenEOServiceError("The `openeo` Python client is required for remote sensing jobs.") from exc session = build_openeo_requests_session(settings) - connection = openeo.connect( - settings.backend_url, - session=session, - default_timeout=settings.timeout_seconds, - ) + try: + connection = openeo.connect( + settings.backend_url, + session=session, + default_timeout=settings.timeout_seconds, + ) + except requests.exceptions.JSONDecodeError as exc: + preview = (session.last_response_preview or "").strip() + content_type = session.last_response_content_type or "unknown" + response_url = session.last_response_url or settings.backend_url + raise OpenEOServiceError( + "openEO endpoint returned a non-JSON response while loading capabilities. " + f"url={response_url!r} content_type={content_type!r} preview={preview[:300]!r}. " + "This usually means the proxy returned an HTML page instead of the API response." 
+ ) from exc def resolve_oidc_context( provider_id: str | None, @@ -295,6 +441,8 @@ def compute_remote_sensing_metrics( *, temporal_start: date | str, temporal_end: date | str, + selected_features: list[str] | None = None, + progress_callback=None, connection=None, ) -> dict[str, Any]: """ @@ -309,7 +457,6 @@ def compute_remote_sensing_metrics( "metadata": { "backend": DEFAULT_OPENEO_PROVIDER, "collections_used": [], - "slope_supported": False, "job_refs": {}, "failed_metrics": [], }, @@ -318,6 +465,14 @@ def compute_remote_sensing_metrics( connection = connection or connect_openeo() feature_collection = build_feature_collection(cells) spatial_extent = build_spatial_extent(cells) + log_openeo_request_summary( + cells=cells, + temporal_start=temporal_start, + temporal_end=temporal_end, + spatial_extent=spatial_extent, + selected_features=selected_features or list(METRIC_NAMES), + ) + expected_feature_ids = [cell.cell_code for cell in cells] results = initialize_metric_result_map(cells) metadata = { "backend": DEFAULT_OPENEO_PROVIDER, @@ -326,11 +481,10 @@ def compute_remote_sensing_metrics( SENTINEL2_COLLECTION, SENTINEL3_LST_COLLECTION, SENTINEL1_COLLECTION, - COPERNICUS_DEM_COLLECTION, ], - "slope_supported": True, "job_refs": {}, "failed_metrics": [], + "payload_diagnostics": {}, } metric_runners = [ @@ -338,29 +492,32 @@ def compute_remote_sensing_metrics( ("ndwi", compute_ndwi), ("lst_c", compute_lst_c), ("soil_vv", compute_soil_vv), - ("dem_m", compute_dem_m), - ("slope_deg", compute_slope_deg), ] for metric_name, runner in metric_runners: try: + if progress_callback is not None: + progress_callback(metric_name=metric_name, state="running", metadata=metadata) metric_payload = runner( connection=connection, feature_collection=feature_collection, spatial_extent=spatial_extent, temporal_start=temporal_start, temporal_end=temporal_end, + expected_feature_ids=expected_feature_ids, ) merge_metric_results(results, metric_payload["results"]) 
metadata["job_refs"][metric_name] = metric_payload.get("job_ref") - if metric_name == "slope_deg" and not metric_payload.get("supported", True): - metadata["slope_supported"] = False - except Exception as exc: - if metric_name == "slope_deg": - metadata["slope_supported"] = False - metadata["failed_metrics"].append( - {"metric": metric_name, "error": str(exc), "non_fatal": True} + metadata["payload_diagnostics"][metric_name] = metric_payload.get("payload_diagnostics", {}) + if progress_callback is not None: + progress_callback( + metric_name=metric_name, + state="completed", + metadata=metadata, + metric_payload=metric_payload, ) - continue + except Exception as exc: + if progress_callback is not None: + progress_callback(metric_name=metric_name, state="failed", metadata=metadata, error=str(exc)) raise OpenEOExecutionError(f"Failed to compute metric `{metric_name}`: {exc}") from exc for cell_code, payload in results.items(): @@ -370,7 +527,54 @@ def compute_remote_sensing_metrics( return {"results": results, "metadata": metadata} -def compute_ndvi(*, connection, feature_collection, spatial_extent, temporal_start, temporal_end) -> dict[str, Any]: +def log_openeo_request_summary( + *, + cells: list[AnalysisGridCell], + temporal_start: date | str, + temporal_end: date | str, + spatial_extent: dict[str, float], + selected_features: list[str], +) -> None: + start_date = _parse_date_value(temporal_start) + end_date = _parse_date_value(temporal_end) + logger.info( + "openEO request summary: %s", + _serialize_for_log( + { + "cell_count": len(cells), + "date_range_days": max((end_date - start_date).days, 0) + 1, + "area_m2": round(_estimate_extent_area_m2(spatial_extent), 2), + "metrics": selected_features, + "spatial_extent": spatial_extent, + "temporal_start": start_date.isoformat(), + "temporal_end": end_date.isoformat(), + } + ), + ) + + +def _estimate_extent_area_m2(spatial_extent: dict[str, float]) -> float: + west = float(spatial_extent["west"]) + east = 
float(spatial_extent["east"]) + south = float(spatial_extent["south"]) + north = float(spatial_extent["north"]) + mean_lat_rad = math.radians((south + north) / 2.0) + meters_per_degree_lat = 111_320.0 + meters_per_degree_lon = 111_320.0 * math.cos(mean_lat_rad) + width_m = max(east - west, 0.0) * meters_per_degree_lon + height_m = max(north - south, 0.0) * meters_per_degree_lat + return max(width_m, 0.0) * max(height_m, 0.0) + + +def compute_ndvi( + *, + connection, + feature_collection, + spatial_extent, + temporal_start, + temporal_end, + expected_feature_ids: list[str] | None = None, +) -> dict[str, Any]: cube = connection.load_collection( SENTINEL2_COLLECTION, spatial_extent=spatial_extent, @@ -382,11 +586,32 @@ def compute_ndvi(*, connection, feature_collection, spatial_extent, temporal_sta red = cube.band("B04") * 0.0001 nir = cube.band("B08") * 0.0001 ndvi = ((nir - red) / (nir + red)).mask(invalid_mask.resample_cube_spatial(red)) - aggregated = ndvi.mean_time().aggregate_spatial(geometries=feature_collection, reducer="mean").execute() - return {"results": parse_aggregate_spatial_response(aggregated, "ndvi")} + aggregated, job_ref = _run_aggregate_spatial_job( + ndvi.mean_time().aggregate_spatial(geometries=feature_collection, reducer="mean"), + metric_name="ndvi", + ) + payload_diagnostics = _log_raw_payload_summary(aggregated, metric_name="ndvi", job_ref=job_ref) + return { + "results": parse_aggregate_spatial_response( + aggregated, + "ndvi", + job_ref=job_ref, + expected_feature_ids=expected_feature_ids, + ), + "job_ref": job_ref, + "payload_diagnostics": payload_diagnostics, + } -def compute_ndwi(*, connection, feature_collection, spatial_extent, temporal_start, temporal_end) -> dict[str, Any]: +def compute_ndwi( + *, + connection, + feature_collection, + spatial_extent, + temporal_start, + temporal_end, + expected_feature_ids: list[str] | None = None, +) -> dict[str, Any]: cube = connection.load_collection( SENTINEL2_COLLECTION, 
spatial_extent=spatial_extent, @@ -398,11 +623,32 @@ def compute_ndwi(*, connection, feature_collection, spatial_extent, temporal_sta green = cube.band("B03") * 0.0001 nir = cube.band("B08") * 0.0001 ndwi = ((green - nir) / (green + nir)).mask(invalid_mask.resample_cube_spatial(green)) - aggregated = ndwi.mean_time().aggregate_spatial(geometries=feature_collection, reducer="mean").execute() - return {"results": parse_aggregate_spatial_response(aggregated, "ndwi")} + aggregated, job_ref = _run_aggregate_spatial_job( + ndwi.mean_time().aggregate_spatial(geometries=feature_collection, reducer="mean"), + metric_name="ndwi", + ) + payload_diagnostics = _log_raw_payload_summary(aggregated, metric_name="ndwi", job_ref=job_ref) + return { + "results": parse_aggregate_spatial_response( + aggregated, + "ndwi", + job_ref=job_ref, + expected_feature_ids=expected_feature_ids, + ), + "job_ref": job_ref, + "payload_diagnostics": payload_diagnostics, + } -def compute_lst_c(*, connection, feature_collection, spatial_extent, temporal_start, temporal_end) -> dict[str, Any]: +def compute_lst_c( + *, + connection, + feature_collection, + spatial_extent, + temporal_start, + temporal_end, + expected_feature_ids: list[str] | None = None, +) -> dict[str, Any]: cube = connection.load_collection( SENTINEL3_LST_COLLECTION, spatial_extent=spatial_extent, @@ -411,11 +657,32 @@ def compute_lst_c(*, connection, feature_collection, spatial_extent, temporal_st band_name = infer_band_name(cube, preferred=("LST", "LST_in", "LST", "band_0")) lst_k = cube.band(band_name) if band_name else cube lst_c = lst_k - 273.15 - aggregated = lst_c.mean_time().aggregate_spatial(geometries=feature_collection, reducer="mean").execute() - return {"results": parse_aggregate_spatial_response(aggregated, "lst_c")} + aggregated, job_ref = _run_aggregate_spatial_job( + lst_c.mean_time().aggregate_spatial(geometries=feature_collection, reducer="mean"), + metric_name="lst_c", + ) + payload_diagnostics = 
_log_raw_payload_summary(aggregated, metric_name="lst_c", job_ref=job_ref) + return { + "results": parse_aggregate_spatial_response( + aggregated, + "lst_c", + job_ref=job_ref, + expected_feature_ids=expected_feature_ids, + ), + "job_ref": job_ref, + "payload_diagnostics": payload_diagnostics, + } -def compute_soil_vv(*, connection, feature_collection, spatial_extent, temporal_start, temporal_end) -> dict[str, Any]: +def compute_soil_vv( + *, + connection, + feature_collection, + spatial_extent, + temporal_start, + temporal_end, + expected_feature_ids: list[str] | None = None, +) -> dict[str, Any]: cube = connection.load_collection( SENTINEL1_COLLECTION, spatial_extent=spatial_extent, @@ -423,46 +690,216 @@ def compute_soil_vv(*, connection, feature_collection, spatial_extent, temporal_ bands=["VV"], ) vv = cube.band("VV") - aggregated = vv.mean_time().aggregate_spatial(geometries=feature_collection, reducer="mean").execute() - return {"results": parse_aggregate_spatial_response(aggregated, "soil_vv")} - - -def compute_dem_m(*, connection, feature_collection, spatial_extent, temporal_start, temporal_end) -> dict[str, Any]: - cube = connection.load_collection( - COPERNICUS_DEM_COLLECTION, - spatial_extent=spatial_extent, - temporal_extent=[_normalize_date(temporal_start), _normalize_date(temporal_end)], + aggregated, job_ref = _run_aggregate_spatial_job( + vv.mean_time().aggregate_spatial(geometries=feature_collection, reducer="mean"), + metric_name="soil_vv", ) - band_name = infer_band_name(cube, preferred=("DEM", "elevation", "band_0")) - dem = cube.band(band_name) if band_name else cube - aggregated = dem.aggregate_spatial(geometries=feature_collection, reducer="mean").execute() - return {"results": parse_aggregate_spatial_response(aggregated, "dem_m")} + payload_diagnostics = _log_raw_payload_summary(aggregated, metric_name="soil_vv", job_ref=job_ref) + return { + "results": parse_aggregate_spatial_response( + aggregated, + "soil_vv", + job_ref=job_ref, + 
def _run_aggregate_spatial_job(process: Any, *, metric_name: str) -> tuple[Any, str | None]:
    """
    Execute an aggregate_spatial process and return ``(payload, job_ref)``.

    When ``process`` exposes ``create_job`` it is run as a batch job: the job
    is created with JSON output, started and waited for, and its result
    payload is loaded.  Otherwise ``process.execute()`` is called directly and
    the job reference is ``None``.
    """
    job_title = f"crop-logic-{metric_name}"
    job_description = f"Remote sensing aggregate_spatial execution for metric `{metric_name}`."

    graph = process.flat_graph() if hasattr(process, "flat_graph") else None
    prepared_details = {
        "metric_name": metric_name,
        "title": job_title,
        "description": job_description,
        "process_graph": graph,
    }
    logger.info("openEO process graph prepared: %s", _serialize_for_log(prepared_details))

    # Objects without batch support fall back to a synchronous call.
    if not hasattr(process, "create_job"):
        logger.info("openEO process uses synchronous execution fallback for metric `%s`.", metric_name)
        return process.execute(), None

    job = process.create_job(title=job_title, description=job_description, out_format="JSON")
    created_details = {"metric_name": metric_name, "job_ref": _extract_job_ref(job)}
    logger.info("openEO batch job created: %s", _serialize_for_log(created_details))

    # start_and_wait may hand back a job object; prefer it when it does.
    finished = job.start_and_wait()
    if finished is not None:
        job = finished
    finished_details = {"metric_name": metric_name, "job_ref": _extract_job_ref(job)}
    logger.info("openEO batch job finished: %s", _serialize_for_log(finished_details))

    payload = _load_job_result_payload(job)
    return payload, _extract_job_ref(job)
def _load_first_json_payload(directory: Path, *, job_ref: str | None = None) -> Any | None:
    """
    Return the first usable JSON payload found under ``directory``.

    A payload referenced by a STAC asset wins when one can be loaded.
    Otherwise every ``*.json`` file below ``directory`` is tried in sorted
    order, skipping files that yield ``None`` and files that look like STAC
    metadata.  ``None`` is returned when nothing qualifies.
    """
    from_stac_asset = _load_stac_asset_payload(directory, job_ref=job_ref)
    if from_stac_asset is not None:
        return from_stac_asset

    for json_path in sorted(directory.rglob("*.json")):
        parsed = _read_json_file(json_path, job_ref=job_ref)
        if parsed is not None and not _looks_like_stac_metadata_payload(parsed):
            return parsed
    return None
"asset_path": str(asset_path), + } + ), + ) + return _read_json_file(asset_path, job_ref=job_ref) + return None + + +def _iter_stac_asset_paths(payload: Any, directory: Path) -> list[tuple[str, Path]]: + if not isinstance(payload, dict): + return [] + assets = payload.get("assets") + if not isinstance(assets, dict): + return [] + resolved_paths: list[tuple[str, Path]] = [] + for asset_name, asset_details in assets.items(): + if not isinstance(asset_details, dict): + continue + href = asset_details.get("href") + if not href: + continue + raw_path = Path(str(href)) + if raw_path.is_absolute(): + resolved = directory / raw_path.name + else: + resolved = directory / raw_path + resolved_paths.append((str(asset_name), resolved)) + return resolved_paths + + +def _looks_like_stac_metadata_path(path: Path) -> bool: + name = path.name.lower() + return name in {"item.json", "collection.json"} or name.endswith(".stac-item.json") + + +def _looks_like_stac_metadata_payload(payload: Any) -> bool: + return isinstance(payload, dict) and "assets" in payload and any( + key in payload for key in ("stac_version", "stac_extensions", "extent", "summaries") + ) + + +def _read_json_file(path: Path, *, job_ref: str | None = None) -> Any: + raw_text = path.read_text(encoding="utf-8", errors="replace") + if not raw_text.strip(): + logger.warning( + "openEO batch job JSON file is empty: %s", + _serialize_for_log({"job_ref": job_ref, "path": str(path), "preview": raw_text[:500]}), + ) + return None try: - slope_rad = dem.slope() - slope_deg = slope_rad * (180.0 / math.pi) - aggregated = slope_deg.aggregate_spatial(geometries=feature_collection, reducer="mean").execute() - return { - "results": parse_aggregate_spatial_response(aggregated, "slope_deg"), - "supported": True, - } - except Exception: - return { - "results": {feature["id"]: {"slope_deg": None} for feature in feature_collection.get("features", [])}, - "supported": False, - } + return json.loads(raw_text) + except json.JSONDecodeError 
as exc: + logger.exception( + "openEO batch job JSON parsing failed: %s", + _serialize_for_log( + { + "job_ref": job_ref, + "path": str(path), + "error": str(exc), + "preview": raw_text[:1000], + } + ), + ) + raise OpenEOExecutionError( + f"Failed to parse openEO batch result file `{path.name}` for job `{job_ref or 'unknown'}`: {exc}" + ) from exc -def parse_aggregate_spatial_response(payload: Any, metric_name: str) -> dict[str, dict[str, Any]]: +def _extract_job_ref(job: Any) -> str | None: + for attribute_name in ("job_id", "id"): + value = getattr(job, attribute_name, None) + if value: + return str(value) + if hasattr(job, "describe_job"): + metadata = job.describe_job() + if isinstance(metadata, dict) and metadata.get("id"): + return str(metadata["id"]) + return None + + +def parse_aggregate_spatial_response( + payload: Any, + metric_name: str, + *, + job_ref: str | None = None, + expected_feature_ids: list[str] | None = None, +) -> dict[str, dict[str, Any]]: """ Parse different JSON shapes returned by openEO aggregate_spatial executions. 
""" @@ -476,10 +913,20 @@ def parse_aggregate_spatial_response(payload: Any, metric_name: str) -> dict[str return _parse_feature_collection_results(payload, metric_name) if isinstance(payload, dict): - return _parse_mapping_results(payload, metric_name) + return _parse_mapping_results( + payload, + metric_name, + job_ref=job_ref, + expected_feature_ids=expected_feature_ids, + ) if isinstance(payload, list): - return _parse_list_results(payload, metric_name) + return _parse_list_results( + payload, + metric_name, + job_ref=job_ref, + expected_feature_ids=expected_feature_ids, + ) raise OpenEOExecutionError(f"Unsupported openEO aggregate_spatial response type: {type(payload)!r}") @@ -495,36 +942,174 @@ def _parse_feature_collection_results(payload: dict[str, Any], metric_name: str) if not feature_id: continue properties = feature.get("properties") or {} + _log_feature_mismatch(feature_id, properties, metric_name) value = _extract_aggregate_value(properties) results[feature_id] = {metric_name: _coerce_float(value)} return results -def _parse_mapping_results(payload: dict[str, Any], metric_name: str) -> dict[str, dict[str, Any]]: +def _parse_mapping_results( + payload: dict[str, Any], + metric_name: str, + *, + job_ref: str | None = None, + expected_feature_ids: list[str] | None = None, +) -> dict[str, dict[str, Any]]: if "data" in payload and isinstance(payload["data"], (dict, list)): - return parse_aggregate_spatial_response(payload["data"], metric_name) + return parse_aggregate_spatial_response( + payload["data"], + metric_name, + job_ref=job_ref, + expected_feature_ids=expected_feature_ids, + ) results: dict[str, dict[str, Any]] = {} for feature_id, value in payload.items(): if feature_id in {"type", "links", "meta"}: continue - results[str(feature_id)] = {metric_name: _coerce_float(_extract_aggregate_value(value))} + normalized_feature_id = _normalize_feature_id( + feature_id, + expected_feature_ids=expected_feature_ids, + ) + if isinstance(value, dict): + 
_log_feature_mismatch(str(normalized_feature_id), value, metric_name) + results[str(normalized_feature_id)] = {metric_name: _coerce_float(_extract_aggregate_value(value))} return results -def _parse_list_results(payload: list[Any], metric_name: str) -> dict[str, dict[str, Any]]: +def _parse_list_results( + payload: list[Any], + metric_name: str, + *, + job_ref: str | None = None, + expected_feature_ids: list[str] | None = None, +) -> dict[str, dict[str, Any]]: results: dict[str, dict[str, Any]] = {} for index, item in enumerate(payload): if isinstance(item, dict): - feature_id = str(item.get("id") or item.get("cell_code") or item.get("feature_id") or index) + feature_id = str( + item.get("id") + or item.get("cell_code") + or item.get("feature_id") + or _normalize_feature_id(index, expected_feature_ids=expected_feature_ids) + ) + _log_feature_mismatch(feature_id, item, metric_name) value = _extract_aggregate_value(item) else: - feature_id = str(index) + feature_id = str(_normalize_feature_id(index, expected_feature_ids=expected_feature_ids)) value = item results[feature_id] = {metric_name: _coerce_float(value)} return results +def _normalize_feature_id( + raw_feature_id: Any, + *, + expected_feature_ids: list[str] | None = None, +) -> str: + feature_id = str(raw_feature_id) + if not expected_feature_ids: + return feature_id + try: + index = int(feature_id) + except (TypeError, ValueError): + return feature_id + if index < 0 or index >= len(expected_feature_ids): + return feature_id + return str(expected_feature_ids[index]) + + +def _log_raw_payload_summary(payload: Any, *, metric_name: str, job_ref: str | None = None) -> dict[str, Any]: + payload_cells = _extract_payload_cells(payload) + payload_keys_sample = [cell_code for cell_code, _raw in payload_cells[:5]] + available_features = sorted(_collect_payload_feature_names(payload)) + returned_cell_count = len(payload_cells) + is_empty = returned_cell_count == 0 + + if is_empty: + logger.warning("openEO payload is 
empty for job_ref=%s", job_ref) + + logger.info( + "openEO payload summary: %s", + _serialize_for_log( + { + "metric_name": metric_name, + "job_ref": job_ref, + "returned_cell_count": returned_cell_count, + "payload_keys_sample": payload_keys_sample, + "available_features": available_features, + } + ), + ) + return { + "returned_cell_count": returned_cell_count, + "payload_keys_sample": payload_keys_sample, + "available_features": available_features, + } + + +def _extract_payload_cells(payload: Any) -> list[tuple[str, Any]]: + if payload is None: + return [] + if isinstance(payload, dict) and payload.get("type") == "FeatureCollection": + cells = [] + for feature in payload.get("features", []): + feature_id = str( + feature.get("id") + or (feature.get("properties") or {}).get("cell_code") + or (feature.get("properties") or {}).get("id") + or "" + ) + if feature_id: + cells.append((feature_id, feature.get("properties") or {})) + return cells + if isinstance(payload, dict) and "features" in payload and isinstance(payload["features"], list): + return _extract_payload_cells({"type": "FeatureCollection", "features": payload["features"]}) + if isinstance(payload, dict) and "data" in payload and isinstance(payload["data"], (dict, list)): + return _extract_payload_cells(payload["data"]) + if isinstance(payload, dict): + return [ + (str(feature_id), value) + for feature_id, value in payload.items() + if feature_id not in {"type", "links", "meta", "data"} + ] + if isinstance(payload, list): + cells = [] + for index, item in enumerate(payload): + if isinstance(item, dict): + feature_id = str(item.get("id") or item.get("cell_code") or item.get("feature_id") or index) + else: + feature_id = str(index) + cells.append((feature_id, item)) + return cells + return [] + + +def _collect_payload_feature_names(payload: Any) -> set[str]: + names: set[str] = set() + for _cell_code, raw_value in _extract_payload_cells(payload): + if isinstance(raw_value, dict): + names.update(str(key) for 
key in raw_value.keys()) + return names + + +def _log_feature_mismatch(cell_code: str, raw_value: dict[str, Any], metric_name: str) -> None: + available_keys = sorted(str(key) for key in raw_value.keys()) + if not available_keys: + return + recognized_keys = set(CLUSTER_METRIC_NAMES) | { + metric_name, + "mean", + "value", + "result", + "average", + "id", + "cell_code", + } + if not any(key in recognized_keys for key in available_keys): + logger.warning("Feature mismatch for cell=%s, available_keys=%s", cell_code, available_keys) + + def _extract_aggregate_value(value: Any) -> Any: if isinstance(value, dict): for key in ("mean", "value", "result", "average"): @@ -589,3 +1174,9 @@ def _normalize_date(value: date | str) -> str: if isinstance(value, date): return value.isoformat() return str(value) + + +def _parse_date_value(value: date | str) -> date: + if isinstance(value, date): + return value + return date.fromisoformat(str(value)) diff --git a/location_data/serializers.py b/location_data/serializers.py index f05a0c9..7e56ce9 100644 --- a/location_data/serializers.py +++ b/location_data/serializers.py @@ -165,8 +165,6 @@ class RemoteSensingCellObservationSerializer(serializers.ModelSerializer): "lst_c", "soil_vv", "soil_vv_db", - "dem_m", - "slope_deg", "metadata", ] @@ -177,8 +175,6 @@ class RemoteSensingSummarySerializer(serializers.Serializer): ndwi_mean = serializers.FloatField(allow_null=True) lst_c_mean = serializers.FloatField(allow_null=True) soil_vv_db_mean = serializers.FloatField(allow_null=True) - dem_m_mean = serializers.FloatField(allow_null=True) - slope_deg_mean = serializers.FloatField(allow_null=True) class RemoteSensingRunSerializer(serializers.ModelSerializer): @@ -189,10 +185,12 @@ class RemoteSensingRunSerializer(serializers.ModelSerializer): requested_cluster_count = serializers.SerializerMethodField() def get_status_label(self, obj): - return obj.normalized_status + metadata_status = (obj.metadata or {}).get("status_label") + return 
metadata_status or obj.normalized_status def get_pipeline_status(self, obj): - return obj.normalized_status + metadata_status = (obj.metadata or {}).get("status_label") + return metadata_status or obj.normalized_status def get_stage(self, obj): return (obj.metadata or {}).get("stage") @@ -290,6 +288,7 @@ class RemoteSensingRunStatusResponseSerializer(serializers.Serializer): source = serializers.CharField() run = RemoteSensingRunSerializer() task_id = serializers.UUIDField(allow_null=True, required=False) + task = serializers.JSONField(required=False) location = SoilLocationResponseSerializer(required=False) block_code = serializers.CharField(allow_blank=True, required=False) chunk_size_sqm = serializers.IntegerField(allow_null=True, required=False) diff --git a/location_data/tasks.py b/location_data/tasks.py index 355a494..61f4daf 100644 --- a/location_data/tasks.py +++ b/location_data/tasks.py @@ -42,6 +42,17 @@ else: logger = logging.getLogger(__name__) +REMOTE_SENSING_TASK_MAX_RETRIES = 5 +REMOTE_SENSING_TASK_RETRY_DELAY_SECONDS = 60 +REMOTE_SENSING_TASK_RETRY_BACKOFF_MAX_SECONDS = 600 +PERSISTED_OBSERVATION_FEATURES = ( + "ndvi", + "ndwi", + "lst_c", + "soil_vv", + "soil_vv_db", +) + def run_remote_sensing_analysis( *, @@ -122,58 +133,83 @@ def run_remote_sensing_analysis( ) if not cells_to_process: - _record_run_stage( - run, - "using_cached_observations", - {"source": "database"}, - ) observations = _load_observations( location=location, block_code=resolved_block_code, temporal_start=start_date, temporal_end=end_date, ) - subdivision_result = _ensure_subdivision_result( - location=location, - run=run, - subdivision=subdivision, - block_code=resolved_block_code, + if not _has_usable_observations( observations=observations, - cluster_count=cluster_count, - selected_features=selected_features, - ) - _record_run_stage( - run, - "clustering_completed", - _build_clustering_stage_metadata(subdivision_result), - ) - summary = { - "status": "completed", - "source": 
"database", - "run_id": run.id, - "processed_cell_count": 0, - "created_observation_count": 0, - "updated_observation_count": 0, - "existing_observation_count": len(all_cells), - "failed_metric_count": 0, - "chunk_size_sqm": grid_summary["chunk_size_sqm"], - "block_code": resolved_block_code, - "cell_count": len(all_cells), - "subdivision_result_id": getattr(subdivision_result, "id", None), - "cluster_count": getattr(subdivision_result, "cluster_count", 0), - } - _mark_run_success(run, summary) - return summary + selected_features=selected_features or list(DEFAULT_CLUSTER_FEATURES), + ): + logger.warning( + "Cached observations are fully null, refetching remote metrics for run_id=%s", + run.id, + ) + _record_run_stage( + run, + "using_cached_observations", + {"source": "database", "usable": False, "refetching": True}, + ) + cells_to_process = all_cells + else: + _record_run_stage( + run, + "using_cached_observations", + {"source": "database", "usable": True, "refetching": False}, + ) + subdivision_result = _ensure_subdivision_result( + location=location, + run=run, + subdivision=subdivision, + block_code=resolved_block_code, + observations=observations, + cluster_count=cluster_count, + selected_features=selected_features, + ) + _record_run_stage( + run, + "clustering_completed", + _build_clustering_stage_metadata(subdivision_result), + ) + summary = { + "status": "completed", + "source": "database", + "run_id": run.id, + "processed_cell_count": 0, + "created_observation_count": 0, + "updated_observation_count": 0, + "existing_observation_count": len(all_cells), + "failed_metric_count": 0, + "chunk_size_sqm": grid_summary["chunk_size_sqm"], + "block_code": resolved_block_code, + "cell_count": len(all_cells), + "subdivision_result_id": getattr(subdivision_result, "id", None), + "cluster_count": getattr(subdivision_result, "cluster_count", 0), + } + _mark_run_success(run, summary) + return summary _record_run_stage( run, "fetching_remote_metrics", - 
{"requested_cell_count": len(cells_to_process)}, + _build_remote_metric_stage_details( + cells=cells_to_process, + selected_features=selected_features, + ), + ) + progress_callback = _build_remote_metric_progress_callback( + run=run, + cells=cells_to_process, + selected_features=selected_features, ) remote_payload = compute_remote_sensing_metrics( cells_to_process, temporal_start=start_date, temporal_end=end_date, + selected_features=selected_features or list(DEFAULT_CLUSTER_FEATURES), + progress_callback=progress_callback, ) _record_run_stage( run, @@ -242,7 +278,11 @@ def run_remote_sensing_analysis( raise -@app.task(bind=True, max_retries=3, default_retry_delay=60) +@app.task( + bind=True, + max_retries=REMOTE_SENSING_TASK_MAX_RETRIES, + default_retry_delay=REMOTE_SENSING_TASK_RETRY_DELAY_SECONDS, +) def run_remote_sensing_analysis_task( self, soil_location_id: int, @@ -287,17 +327,30 @@ def run_remote_sensing_analysis_task( ) raise except (OpenEOExecutionError, OpenEOServiceError, RequestException, DataDrivenSubdivisionError) as exc: + retry_count = self.request.retries + 1 + countdown = min( + REMOTE_SENSING_TASK_RETRY_DELAY_SECONDS * (2 ** self.request.retries), + REMOTE_SENSING_TASK_RETRY_BACKOFF_MAX_SECONDS, + ) + _mark_run_retrying( + run_id=run_id, + task_id=self.request.id, + error_message=str(exc), + retry_count=retry_count, + retry_delay_seconds=countdown, + ) logger.warning( "Transient remote sensing failure, retrying task", extra={ "task_id": self.request.id, "soil_location_id": soil_location_id, "block_code": block_code, - "retry_count": self.request.retries, + "retry_count": retry_count, + "retry_delay_seconds": countdown, "error": str(exc), }, ) - raise self.retry(exc=exc) + raise self.retry(exc=exc, countdown=countdown) def _normalize_temporal_date(value: Any, field_name: str): @@ -442,8 +495,20 @@ def _mark_run_success( def _mark_run_failure(run: RemoteSensingRun, error_message: str) -> None: metadata = dict(run.metadata or {}) + failed_stage = 
str(metadata.get("stage") or "").strip() or None + stage_details = dict(metadata.get("stage_details") or {}) metadata["status_label"] = "failed" + metadata["stage"] = "failed" + metadata["failed_stage"] = failed_stage metadata["failure_reason"] = error_message[:4000] + metadata["stage_details"] = { + **stage_details, + "failed": { + "failed_stage": failed_stage, + "error_message": error_message[:4000], + "failed_stage_details": stage_details.get(failed_stage, {}) if failed_stage else {}, + }, + } metadata["timestamps"] = { **dict(metadata.get("timestamps") or {}), "failed_at": timezone.now().isoformat(), @@ -467,6 +532,51 @@ def _mark_run_failure(run: RemoteSensingRun, error_message: str) -> None: ) +def _mark_run_retrying( + *, + run_id: int | None, + task_id: str, + error_message: str, + retry_count: int, + retry_delay_seconds: int, +) -> None: + run = None + if run_id is not None: + run = RemoteSensingRun.objects.filter(pk=run_id).first() + if run is None and task_id: + run = RemoteSensingRun.objects.filter(metadata__task_id=str(task_id)).first() + if run is None: + return + + metadata = dict(run.metadata or {}) + stage_details = dict(metadata.get("stage_details") or {}) + failed_stage = ( + str(metadata.get("failed_stage") or metadata.get("stage") or "").strip() or None + ) + metadata["status_label"] = "retrying" + metadata["stage"] = "retrying" + metadata["failed_stage"] = failed_stage + metadata.pop("failure_reason", None) + metadata["stage_details"] = { + **stage_details, + "retrying": { + "retry_count": retry_count, + "retry_delay_seconds": retry_delay_seconds, + "last_error": error_message[:4000], + "failed_stage": failed_stage, + "failed_stage_details": stage_details.get(failed_stage, {}) if failed_stage else {}, + }, + } + metadata["timestamps"] = { + **dict(metadata.get("timestamps") or {}), + "retrying_at": timezone.now().isoformat(), + } + run.status = RemoteSensingRun.STATUS_RUNNING + run.error_message = "" + run.metadata = metadata + 
run.save(update_fields=["status", "error_message", "metadata", "updated_at"]) + + def _load_grid_cells(location: SoilLocation, block_code: str) -> list[AnalysisGridCell]: queryset = AnalysisGridCell.objects.filter(soil_location=location) queryset = queryset.filter(block_code=block_code or "") @@ -513,6 +623,17 @@ def _select_cells_for_processing( return [cell for cell in all_cells if cell.id not in existing_ids] +def _has_usable_observations( + *, + observations: list[AnalysisGridObservation], + selected_features: list[str], +) -> bool: + for observation in observations: + if any(getattr(observation, feature_name, None) is not None for feature_name in selected_features): + return True + return False + + def _upsert_grid_observations( *, cells: list[AnalysisGridCell], @@ -521,19 +642,47 @@ def _upsert_grid_observations( temporal_end, metric_payload: dict[str, Any], ) -> dict[str, int]: + result_by_cell = metric_payload.get("results", {}) + payload_diagnostics = metric_payload["metadata"].get("payload_diagnostics", {}) + payload_cell_codes = sorted(str(cell_code) for cell_code in result_by_cell.keys()) + db_cell_codes = [cell.cell_code for cell in cells] + matched_cell_codes = sorted(set(db_cell_codes) & set(payload_cell_codes)) + unmatched_db_cell_codes = sorted(set(db_cell_codes) - set(payload_cell_codes)) + unmatched_payload_cell_codes = sorted(set(payload_cell_codes) - set(db_cell_codes)) + available_features = _collect_available_features( + result_by_cell=result_by_cell, + payload_diagnostics=payload_diagnostics, + ) + payload_keys_sample = payload_cell_codes[:5] + metadata_template = { "backend_name": metric_payload["metadata"].get("backend"), "backend_url": metric_payload["metadata"].get("backend_url"), "collections_used": metric_payload["metadata"].get("collections_used", []), - "slope_supported": metric_payload["metadata"].get("slope_supported", False), "job_refs": metric_payload["metadata"].get("job_refs", {}), "failed_metrics": 
metric_payload["metadata"].get("failed_metrics", []), + "payload_diagnostics": payload_diagnostics, "run_id": run.id, } - result_by_cell = metric_payload.get("results", {}) + + logger.info( + "Remote sensing payload/DB cell comparison: %s", + { + "run_id": run.id, + "db_cell_count": len(db_cell_codes), + "payload_cell_count": len(payload_cell_codes), + "matched_cell_count": len(matched_cell_codes), + "unmatched_db_cell_codes": unmatched_db_cell_codes, + "unmatched_payload_cell_codes": unmatched_payload_cell_codes, + }, + ) + if not matched_cell_codes: + logger.error("No payload cells matched DB cell_codes for run_id=%s", run.id) created_count = 0 updated_count = 0 + usable_observation_count = 0 + fully_null_observation_count = 0 with transaction.atomic(): for cell in cells: values = result_by_cell.get(cell.cell_code, {}) @@ -544,10 +693,19 @@ def _upsert_grid_observations( "lst_c": values.get("lst_c"), "soil_vv": values.get("soil_vv"), "soil_vv_db": values.get("soil_vv_db"), - "dem_m": values.get("dem_m"), - "slope_deg": values.get("slope_deg"), "metadata": metadata_template, } + persisted_values = [defaults[feature_name] for feature_name in PERSISTED_OBSERVATION_FEATURES] + usable_values = [defaults[feature_name] for feature_name in DEFAULT_CLUSTER_FEATURES] + if all(value is None for value in persisted_values): + fully_null_observation_count += 1 + logger.warning( + "Persisting empty observation for cell=%s, run_id=%s", + cell.cell_code, + run.id, + ) + if any(value is not None for value in usable_values): + usable_observation_count += 1 observation, created = AnalysisGridObservation.objects.update_or_create( cell=cell, temporal_start=temporal_start, @@ -558,7 +716,179 @@ def _upsert_grid_observations( created_count += 1 else: updated_count += 1 - return {"created_count": created_count, "updated_count": updated_count} + + summary = { + "created_count": created_count, + "updated_count": updated_count, + "total_observation_count": len(cells), + 
"usable_observation_count": usable_observation_count, + "fully_null_observation_count": fully_null_observation_count, + "matched_cell_count": len(matched_cell_codes), + "matched_cell_codes": matched_cell_codes, + "unmatched_db_cell_codes": unmatched_db_cell_codes, + "unmatched_payload_cell_codes": unmatched_payload_cell_codes, + "payload_keys_sample": payload_keys_sample, + "available_features": available_features, + } + logger.info("Grid observation upsert summary: %s", summary) + if usable_observation_count == 0: + diagnostics = { + "job_ref": metadata_template["job_refs"], + "total_cells": len(cells), + "matched_cells": len(matched_cell_codes), + "payload_keys_sample": payload_keys_sample, + "available_features": available_features, + } + logger.error("All persisted observations are empty for run_id=%s", run.id) + _store_empty_observation_diagnostics(run=run, diagnostics=diagnostics) + summary["empty_observation_diagnostics"] = diagnostics + + return summary + + +def _collect_available_features( + *, + result_by_cell: dict[str, dict[str, Any]], + payload_diagnostics: dict[str, Any], +) -> list[str]: + available = { + feature_name + for values in result_by_cell.values() + for feature_name, value in (values or {}).items() + if value is not None + } + for metric_diagnostics in payload_diagnostics.values(): + available.update(metric_diagnostics.get("available_features", [])) + return sorted(str(feature_name) for feature_name in available) + + +def _store_empty_observation_diagnostics(*, run: RemoteSensingRun, diagnostics: dict[str, Any]) -> None: + metadata = dict(run.metadata or {}) + metadata["diagnostics"] = { + **dict(metadata.get("diagnostics") or {}), + "empty_observations": diagnostics, + } + run.metadata = metadata + run.save(update_fields=["metadata", "updated_at"]) + + +def _build_remote_metric_stage_details( + *, + cells: list[AnalysisGridCell], + selected_features: list[str] | None, + active_metric: str | None = None, + completed_metrics: list[str] | 
None = None, + failed_metrics: list[dict[str, Any]] | None = None, + metric_states: list[dict[str, Any]] | None = None, +) -> dict[str, Any]: + features = list(selected_features or DEFAULT_CLUSTER_FEATURES) + completed = list(completed_metrics or []) + failed = list(failed_metrics or []) + states = metric_states or [ + { + "metric": metric_name, + "status": ( + "completed" + if metric_name in completed + else "failed" + if any(item.get("metric") == metric_name for item in failed) + else "running" + if metric_name == active_metric + else "pending" + ), + } + for metric_name in features + ] + return { + "requested_cell_count": len(cells), + "target_cells": [ + { + "cell_code": cell.cell_code, + "block_code": cell.block_code, + "centroid_lat": str(cell.centroid_lat), + "centroid_lon": str(cell.centroid_lon), + "chunk_size_sqm": cell.chunk_size_sqm, + } + for cell in cells + ], + "metric_progress": { + "total_metrics": len(features), + "completed_metric_count": len(completed), + "active_metric": active_metric, + "completed_metrics": completed, + "failed_metrics": failed, + "states": states, + }, + } + + +def _normalize_progress_metric_name(metric_name: str, features: list[str]) -> str: + derived_metric_map = { + "soil_vv": "soil_vv_db", + } + normalized = derived_metric_map.get(metric_name, metric_name) + if normalized in features: + return normalized + return metric_name + + +def _resolve_progress_job_ref(candidate: str, job_refs: dict[str, Any]) -> Any: + if candidate in job_refs: + return job_refs.get(candidate) + source_metric_map = { + "soil_vv_db": "soil_vv", + } + return job_refs.get(source_metric_map.get(candidate, candidate)) + + +def _build_remote_metric_progress_callback( + *, + run: RemoteSensingRun, + cells: list[AnalysisGridCell], + selected_features: list[str] | None, +): + features = list(selected_features or DEFAULT_CLUSTER_FEATURES) + completed_metrics: list[str] = [] + failed_metrics: list[dict[str, Any]] = [] + + def callback(*, metric_name: str, 
state: str, metadata: dict[str, Any], metric_payload=None, error: str = "") -> None: + progress_metric_name = _normalize_progress_metric_name(metric_name, features) + if state == "completed" and progress_metric_name not in completed_metrics: + completed_metrics.append(progress_metric_name) + if state == "failed": + failed_entry = {"metric": progress_metric_name, "error": error} + if not any( + item.get("metric") == progress_metric_name and item.get("error") == error + for item in failed_metrics + ): + failed_metrics.append(failed_entry) + + stage_details = _build_remote_metric_stage_details( + cells=cells, + selected_features=features, + active_metric=progress_metric_name if state == "running" else None, + completed_metrics=completed_metrics, + failed_metrics=failed_metrics, + metric_states=[ + { + "metric": candidate, + "status": ( + "completed" + if candidate in completed_metrics + else "failed" + if any(item.get("metric") == candidate for item in failed_metrics) + else "running" + if candidate == progress_metric_name and state == "running" + else "pending" + ), + "job_ref": _resolve_progress_job_ref(candidate, metadata.get("job_refs", {})), + } + for candidate in features + ], + ) + _record_run_stage(run, "fetching_remote_metrics", stage_details) + + return callback def _ensure_subdivision_result( diff --git a/location_data/test_data_driven_subdivision.py b/location_data/test_data_driven_subdivision.py index 37f75ad..ab1bb89 100644 --- a/location_data/test_data_driven_subdivision.py +++ b/location_data/test_data_driven_subdivision.py @@ -2,7 +2,11 @@ from datetime import date from django.test import TestCase -from location_data.data_driven_subdivision import sync_block_subdivision_with_result +from location_data.data_driven_subdivision import ( + EmptyObservationDatasetError, + build_clustering_dataset, + sync_block_subdivision_with_result, +) from location_data.models import ( AnalysisGridCell, AnalysisGridObservation, @@ -133,3 +137,39 @@ class 
DataDrivenSubdivisionSyncTests(TestCase): self.subdivision.metadata["data_driven_subdivision"]["cluster_count"], 2, ) + + def test_build_clustering_dataset_raises_clear_error_when_all_selected_features_are_null(self): + cell = AnalysisGridCell.objects.create( + soil_location=self.location, + block_subdivision=self.subdivision, + block_code="block-1", + cell_code="cell-null", + chunk_size_sqm=900, + geometry=self.boundary, + centroid_lat="35.689200", + centroid_lon="51.389200", + ) + observation = AnalysisGridObservation.objects.create( + cell=cell, + run=self.run, + temporal_start=date(2025, 1, 1), + temporal_end=date(2025, 1, 31), + metadata={"job_refs": {"ndvi": "job-1"}}, + ) + + with self.assertLogs("location_data.data_driven_subdivision", level="ERROR") as captured: + with self.assertRaisesRegex( + EmptyObservationDatasetError, + "Upstream processing completed but no usable feature values were persisted.", + ): + build_clustering_dataset( + observations=[observation], + selected_features=["ndvi", "ndwi", "lst_c", "soil_vv_db"], + run=self.run, + location=self.location, + ) + + joined = "\n".join(captured.output) + self.assertIn("No usable observations available for clustering", joined) + self.assertIn('"run_id": {}'.format(self.run.id), joined) + self.assertIn('"region_id": {}'.format(self.location.id), joined) diff --git a/location_data/test_openeo_service.py b/location_data/test_openeo_service.py index 4fd67a8..34d59f7 100644 --- a/location_data/test_openeo_service.py +++ b/location_data/test_openeo_service.py @@ -1,15 +1,25 @@ from decimal import Decimal from io import StringIO import os +from pathlib import Path +from tempfile import TemporaryDirectory from unittest.mock import Mock, patch from django.core.management import call_command from django.test import SimpleTestCase +import requests from config.proxy import resolve_requests_proxy_url from location_data.openeo_service import ( OpenEOConnectionSettings, + OpenEOServiceError, + OpenEOExecutionError, 
+ _log_raw_payload_summary, + _load_first_json_payload, _resolve_openeo_proxy_url_from_env, + _run_aggregate_spatial_job, + log_openeo_request_summary, + build_openeo_requests_session, build_empty_metric_payload, connect_openeo, is_openeo_auth_configured, @@ -53,6 +63,49 @@ class OpenEOServiceParsingTests(SimpleTestCase): self.assertEqual(result["cell-1"]["lst_c"], 12.4) self.assertEqual(result["cell-2"]["lst_c"], 15.1) + def test_parse_mapping_results_maps_numeric_keys_to_expected_feature_ids(self): + payload = { + "0": {"mean": 12.4}, + "1": {"mean": 15.1}, + } + + result = parse_aggregate_spatial_response( + payload, + "lst_c", + expected_feature_ids=["cell-1", "cell-2"], + ) + + self.assertEqual(result["cell-1"]["lst_c"], 12.4) + self.assertEqual(result["cell-2"]["lst_c"], 15.1) + + def test_parse_list_results_maps_positional_payload_to_expected_feature_ids(self): + payload = [{"mean": 0.61}, {"mean": 0.47}] + + result = parse_aggregate_spatial_response( + payload, + "ndvi", + expected_feature_ids=["cell-1", "cell-2"], + ) + + self.assertEqual(result["cell-1"]["ndvi"], 0.61) + self.assertEqual(result["cell-2"]["ndvi"], 0.47) + + def test_log_raw_payload_summary_warns_for_empty_payload(self): + with self.assertLogs("location_data.openeo_service", level="WARNING") as captured: + summary = _log_raw_payload_summary({}, metric_name="ndvi", job_ref="job-1") + + self.assertEqual(summary["returned_cell_count"], 0) + self.assertIn("openEO payload is empty for job_ref=job-1", "\n".join(captured.output)) + + def test_parse_logs_feature_mismatch_for_unexpected_keys(self): + payload = {"cell-1": {"foo": 12.4}} + + with self.assertLogs("location_data.openeo_service", level="WARNING") as captured: + result = parse_aggregate_spatial_response(payload, "lst_c", job_ref="job-2") + + self.assertEqual(result["cell-1"]["lst_c"], 12.4) + self.assertIn("Feature mismatch for cell=cell-1, available_keys=['foo']", "\n".join(captured.output)) + def test_linear_to_db(self): 
self.assertEqual(linear_to_db(10.0), 10.0) self.assertEqual(linear_to_db(Decimal("1.0")), 0.0) @@ -74,8 +127,32 @@ class OpenEOServiceParsingTests(SimpleTestCase): self.assertEqual(target["cell-2"]["ndwi"], 0.2) self.assertIn("soil_vv_db", target["cell-2"]) + def test_log_openeo_request_summary_logs_expected_fields(self): + cell = Mock() + cell.cell_code = "cell-1" + with self.assertLogs("location_data.openeo_service", level="INFO") as captured: + log_openeo_request_summary( + cells=[cell], + temporal_start="2026-04-08", + temporal_end="2026-05-08", + spatial_extent={"west": 49.9995, "south": 49.9995, "east": 50.0005, "north": 50.0005}, + selected_features=["ndvi", "ndwi"], + ) + + joined = "\n".join(captured.output) + self.assertIn("openEO request summary", joined) + self.assertIn('"cell_count": 1', joined) + self.assertIn('"date_range_days": 31', joined) + self.assertIn('"metrics": ["ndvi", "ndwi"]', joined) + class OpenEOConnectionTests(SimpleTestCase): + def test_default_openeo_timeout_is_ten_minutes(self): + with patch.dict(os.environ, {}, clear=True): + settings = OpenEOConnectionSettings.from_env() + + self.assertEqual(settings.timeout_seconds, 600.0) + def test_default_openeo_proxy_url_uses_proxychains_endpoint_without_wrapping_process(self): with patch.dict( os.environ, @@ -140,22 +217,154 @@ class OpenEOConnectionTests(SimpleTestCase): def test_connect_openeo_applies_proxy_to_session(self): connection = Mock() - connection.authenticate_oidc_resource_owner_password_credentials.return_value = connection + connection._get_oidc_provider.return_value = ("provider-1", None) + connection.get.return_value.json.return_value = { + "providers": [ + { + "id": "provider-1", + "title": "Provider 1", + "issuer": "https://issuer.example.com", + "scopes": ["openid"], + "default_clients": [], + } + ] + } openeo_module = Mock() openeo_module.connect.return_value = connection + oidc_module = Mock() + oidc_module.OidcClientCredentialsAuthenticator.return_value = Mock() + 
oidc_module.OidcClientInfo.side_effect = lambda **kwargs: kwargs + oidc_module.OidcProviderInfo.side_effect = lambda **kwargs: kwargs + oidc_module.OidcResourceOwnerPasswordAuthenticator.return_value = Mock() + connection._authenticate_oidc.return_value = connection settings = OpenEOConnectionSettings( backend_url="https://openeofed.dataspace.copernicus.eu", auth_method="password", timeout_seconds=123, + client_id="client-id", username="user@example.com", password="secret", proxy_url="socks5h://127.0.0.1:10808", ) - with patch.dict("sys.modules", {"openeo": openeo_module}): + with patch.dict( + "sys.modules", + { + "openeo": openeo_module, + "openeo.rest": Mock(), + "openeo.rest.auth": Mock(), + "openeo.rest.auth.oidc": oidc_module, + }, + ): connect_openeo(settings) self.assertEqual(openeo_module.connect.call_args.kwargs["default_timeout"], 123) session = openeo_module.connect.call_args.kwargs["session"] self.assertEqual(session.proxies["https"], "socks5h://127.0.0.1:10808") self.assertFalse(session.trust_env) + + def test_timeout_override_session_logs_request_payload_before_dispatch(self): + response = Mock(status_code=200, headers={"Content-Type": "application/json"}) + response.text = "{}" + response.url = "https://openeofed.dataspace.copernicus.eu/result" + with patch.object(requests.Session, "request", return_value=response) as request_mock: + with self.assertLogs("location_data.openeo_service", level="INFO") as captured: + session = build_openeo_requests_session(OpenEOConnectionSettings(proxy_url="socks5h://127.0.0.1:10808")) + session.request( + "post", + "https://openeofed.dataspace.copernicus.eu/result", + json={"process": {"foo": "bar"}}, + headers={"Authorization": "Bearer secret"}, + ) + + request_mock.assert_called_once() + self.assertTrue(any("openEO request payload" in line for line in captured.output)) + self.assertTrue(any("***redacted***" in line for line in captured.output)) + + def 
test_connect_openeo_raises_clear_error_for_html_capabilities_response(self): + settings = OpenEOConnectionSettings( + backend_url="https://openeofed.dataspace.copernicus.eu", + timeout_seconds=600, + ) + bad_response = Mock() + bad_response.url = "https://openeofed.dataspace.copernicus.eu/" + bad_response.headers = {"Content-Type": "text/html"} + bad_response.text = "proxy page" + + session = build_openeo_requests_session(settings) + session.last_response_url = bad_response.url + session.last_response_content_type = "text/html" + session.last_response_preview = bad_response.text + + openeo_module = Mock() + openeo_module.connect.side_effect = requests.exceptions.JSONDecodeError("Expecting value", "", 0) + + oidc_module = Mock() + with patch("location_data.openeo_service.build_openeo_requests_session", return_value=session): + with patch.dict( + "sys.modules", + { + "openeo": openeo_module, + "openeo.rest": Mock(), + "openeo.rest.auth": Mock(), + "openeo.rest.auth.oidc": oidc_module, + }, + ): + with self.assertRaisesRegex(OpenEOServiceError, "non-JSON response"): + connect_openeo(settings) + + def test_build_openeo_requests_session_mounts_retrying_adapters(self): + session = build_openeo_requests_session( + OpenEOConnectionSettings( + timeout_seconds=120, + http_retry_total=5, + http_retry_backoff_factor=2.0, + ) + ) + + https_adapter = session.get_adapter("https://openeofed.dataspace.copernicus.eu") + self.assertEqual(https_adapter.max_retries.total, 5) + self.assertEqual(https_adapter.max_retries.connect, 5) + self.assertEqual(https_adapter.max_retries.backoff_factor, 2.0) + self.assertIsNone(https_adapter.max_retries.allowed_methods) + + def test_run_aggregate_spatial_job_prefers_batch_job_results(self): + process = Mock() + job = Mock(job_id="job-123") + process.create_job.return_value = job + job.start_and_wait.return_value = job + results = Mock() + job.get_results.return_value = results + + def write_json(target_dir): + Path(target_dir, 
"result.json").write_text('{"cell-1": {"mean": 0.5}}', encoding="utf-8") + + results.download_files.side_effect = write_json + + payload, job_ref = _run_aggregate_spatial_job(process, metric_name="ndvi") + + self.assertEqual(payload, {"cell-1": {"mean": 0.5}}) + self.assertEqual(job_ref, "job-123") + process.execute.assert_not_called() + + def test_load_first_json_payload_prefers_stac_asset_data_over_metadata(self): + with TemporaryDirectory() as temp_dir: + Path(temp_dir, "item.json").write_text( + ( + '{"stac_version":"1.0.0","assets":{"timeseries.json":{"href":"timeseries.json"}},' + '"extent":{"spatial":{},"temporal":{}}}' + ), + encoding="utf-8", + ) + Path(temp_dir, "timeseries.json").write_text('{"cell-1": {"mean": 0.5}}', encoding="utf-8") + + payload = _load_first_json_payload(Path(temp_dir), job_ref="job-asset") + + self.assertEqual(payload, {"cell-1": {"mean": 0.5}}) + + def test_load_first_json_payload_raises_clear_error_for_invalid_json(self): + with TemporaryDirectory() as temp_dir: + Path(temp_dir, "result.json").write_text("not-json", encoding="utf-8") + with self.assertRaises(OpenEOExecutionError): + with self.assertLogs("location_data.openeo_service", level="ERROR"): + _load_first_json_payload(Path(temp_dir), job_ref="job-123") diff --git a/location_data/test_remote_sensing_api.py b/location_data/test_remote_sensing_api.py index 89e5830..6a4c16c 100644 --- a/location_data/test_remote_sensing_api.py +++ b/location_data/test_remote_sensing_api.py @@ -201,6 +201,210 @@ class RemoteSensingApiTests(TestCase): self.assertEqual(payload["run"]["stage"], "completed") self.assertEqual(payload["run"]["selected_features"], ["ndvi"]) + @patch("location_data.views._get_remote_sensing_async_result") + def test_run_status_endpoint_returns_detailed_task_progress(self, mock_async_result): + mock_async_result.return_value = SimpleNamespace( + state="STARTED", + result=None, + info={"message": "fetching_remote_metrics"}, + ready=lambda: False, + successful=lambda: 
False, + failed=lambda: False, + ) + task_id = "e723ba3e-c53c-401b-b3a0-5f7013c7b401" + run = RemoteSensingRun.objects.create( + soil_location=self.location, + block_subdivision=self.subdivision, + block_code="", + chunk_size_sqm=900, + temporal_start=self.temporal_start, + temporal_end=self.temporal_end, + status=RemoteSensingRun.STATUS_RUNNING, + metadata={ + "task_id": task_id, + "stage": "fetching_remote_metrics", + "selected_features": ["ndvi", "ndwi"], + "timestamps": { + "queued_at": "2026-05-10T08:00:00Z", + "started_at": "2026-05-10T08:00:03Z", + "fetching_remote_metrics_at": "2026-05-10T08:00:12Z", + }, + "stage_details": { + "fetching_remote_metrics": { + "requested_cell_count": 2, + "metric_progress": { + "total_metrics": 2, + "completed_metric_count": 1, + "active_metric": "ndwi", + "completed_metrics": ["ndvi"], + "failed_metrics": [], + "states": [ + {"metric": "ndvi", "status": "completed"}, + {"metric": "ndwi", "status": "running"}, + ], + }, + } + }, + }, + ) + + response = self.client.get(f"/remote-sensing/runs/{task_id}/status/") + + self.assertEqual(response.status_code, 200) + payload = response.json()["data"] + self.assertEqual(payload["status"], "running") + self.assertEqual(payload["task"]["current_stage"], "fetching_remote_metrics") + self.assertEqual(payload["task"]["metric_progress"]["active_metric"], "ndwi") + self.assertEqual(payload["task"]["stages"][-1]["status"], "running") + self.assertEqual(payload["task"]["celery"]["state"], "STARTED") + self.assertEqual(payload["task"]["celery"]["info"]["message"], "fetching_remote_metrics") + self.assertEqual(payload["run"]["id"], run.id) + + @patch("location_data.views._get_remote_sensing_async_result") + def test_run_status_endpoint_returns_retrying_status_when_celery_is_retrying(self, mock_async_result): + mock_async_result.return_value = SimpleNamespace( + state="RETRY", + result="temporary openEO timeout", + info="temporary openEO timeout", + ready=lambda: False, + successful=lambda: 
False, + failed=lambda: False, + ) + task_id = "e723ba3e-c53c-401b-b3a0-5f7013c7b401" + run = RemoteSensingRun.objects.create( + soil_location=self.location, + block_subdivision=self.subdivision, + block_code="", + chunk_size_sqm=900, + temporal_start=self.temporal_start, + temporal_end=self.temporal_end, + status=RemoteSensingRun.STATUS_RUNNING, + metadata={ + "task_id": task_id, + "stage": "retrying", + "status_label": "retrying", + "failed_stage": "observations_persisted", + "timestamps": { + "failed_at": "2026-05-10T08:10:00Z", + "retrying_at": "2026-05-10T08:11:00Z", + }, + "stage_details": { + "retrying": { + "retry_count": 2, + "retry_delay_seconds": 120, + "last_error": "temporary openEO timeout", + "failed_stage": "observations_persisted", + "failed_stage_details": {"created_count": 12, "updated_count": 0}, + } + }, + }, + ) + + response = self.client.get(f"/remote-sensing/runs/{task_id}/status/") + + self.assertEqual(response.status_code, 200) + payload = response.json()["data"] + self.assertEqual(payload["status"], "retrying") + self.assertEqual(payload["run"]["pipeline_status"], "retrying") + self.assertEqual(payload["task"]["current_stage"], "retrying") + self.assertEqual(payload["task"]["retry"]["retry_count"], 2) + self.assertEqual(payload["task"]["last_error"], "temporary openEO timeout") + self.assertNotIn("failure_reason", payload["task"]) + self.assertEqual(payload["task"]["celery"]["state"], "RETRY") + + @patch("location_data.views._get_remote_sensing_async_result") + def test_run_status_endpoint_overrides_stale_failed_db_state_when_celery_is_retrying(self, mock_async_result): + mock_async_result.return_value = SimpleNamespace( + state="RETRY", + result="temporary openEO timeout", + info="temporary openEO timeout", + ready=lambda: False, + successful=lambda: False, + failed=lambda: False, + ) + task_id = "e723ba3e-c53c-401b-b3a0-5f7013c7b401" + run = RemoteSensingRun.objects.create( + soil_location=self.location, + 
block_subdivision=self.subdivision, + block_code="", + chunk_size_sqm=900, + temporal_start=self.temporal_start, + temporal_end=self.temporal_end, + status=RemoteSensingRun.STATUS_FAILURE, + error_message="temporary openEO timeout", + metadata={ + "task_id": task_id, + "stage": "failed", + "status_label": "failed", + "failed_stage": "observations_persisted", + "failure_reason": "temporary openEO timeout", + "timestamps": {"failed_at": "2026-05-10T08:10:00Z"}, + "stage_details": { + "failed": { + "failed_stage": "observations_persisted", + "error_message": "temporary openEO timeout", + "failed_stage_details": {"created_count": 12, "updated_count": 0}, + } + }, + }, + ) + + response = self.client.get(f"/remote-sensing/runs/{task_id}/status/") + + self.assertEqual(response.status_code, 200) + payload = response.json()["data"] + self.assertEqual(payload["status"], "retrying") + self.assertEqual(payload["run"]["status"], RemoteSensingRun.STATUS_FAILURE) + self.assertEqual(payload["run"]["status_label"], "retrying") + self.assertEqual(payload["run"]["pipeline_status"], "retrying") + self.assertEqual(payload["run"]["stage"], "retrying") + self.assertEqual(payload["task"]["current_stage"], "retrying") + self.assertEqual(payload["task"]["retry"]["failed_stage"], "observations_persisted") + self.assertEqual(payload["task"]["stages"][-1]["name"], "retrying") + self.assertEqual(payload["task"]["stages"][-1]["status"], "running") + self.assertNotIn("failure_reason", payload["task"]) + self.assertEqual(payload["task"]["celery"]["state"], "RETRY") + self.assertEqual(payload["run"]["id"], run.id) + + def test_run_status_endpoint_returns_failed_task_details(self): + task_id = "e723ba3e-c53c-401b-b3a0-5f7013c7b401" + run = RemoteSensingRun.objects.create( + soil_location=self.location, + block_subdivision=self.subdivision, + block_code="", + chunk_size_sqm=900, + temporal_start=self.temporal_start, + temporal_end=self.temporal_end, + status=RemoteSensingRun.STATUS_FAILURE, + 
error_message="openEO timeout", + metadata={ + "task_id": task_id, + "stage": "failed", + "failed_stage": "observations_persisted", + "failure_reason": "openEO timeout", + "timestamps": {"failed_at": "2026-05-10T08:10:00Z"}, + "stage_details": { + "failed": { + "failed_stage": "observations_persisted", + "error_message": "openEO timeout", + "failed_stage_details": {"created_count": 12, "updated_count": 0}, + } + }, + }, + ) + + response = self.client.get(f"/remote-sensing/runs/{task_id}/status/") + + self.assertEqual(response.status_code, 200) + payload = response.json()["data"] + self.assertEqual(payload["status"], "failed") + self.assertEqual(payload["task"]["current_stage"], "failed") + self.assertEqual(payload["task"]["failed_stage"], "observations_persisted") + self.assertEqual(payload["task"]["failure_reason"], "openEO timeout") + self.assertEqual(payload["task"]["current_stage_details"]["failed_stage"], "observations_persisted") + self.assertEqual(payload["task"]["stages"][-1]["status"], "failed") + self.assertEqual(payload["run"]["id"], run.id) + def test_run_result_endpoint_returns_paginated_assignments(self): run = RemoteSensingRun.objects.create( soil_location=self.location, diff --git a/location_data/test_remote_sensing_tasks.py b/location_data/test_remote_sensing_tasks.py new file mode 100644 index 0000000..82a6382 --- /dev/null +++ b/location_data/test_remote_sensing_tasks.py @@ -0,0 +1,183 @@ +from datetime import date +from unittest.mock import Mock, patch + +from django.test import TestCase + +from location_data.models import AnalysisGridCell, AnalysisGridObservation, RemoteSensingRun, SoilLocation +from location_data.tasks import _upsert_grid_observations, run_remote_sensing_analysis + + +class RemoteSensingTaskDiagnosticsTests(TestCase): + def setUp(self): + self.boundary = { + "type": "Polygon", + "coordinates": [ + [ + [51.3890, 35.6890], + [51.3900, 35.6890], + [51.3900, 35.6900], + [51.3890, 35.6900], + [51.3890, 35.6890], + ] + ], + } + 
self.location = SoilLocation.objects.create( + latitude="35.689200", + longitude="51.389000", + farm_boundary=self.boundary, + ) + self.run = RemoteSensingRun.objects.create( + soil_location=self.location, + block_code="", + chunk_size_sqm=900, + temporal_start=date(2026, 4, 9), + temporal_end=date(2026, 5, 9), + status=RemoteSensingRun.STATUS_PENDING, + metadata={}, + ) + self.cell = AnalysisGridCell.objects.create( + soil_location=self.location, + block_code="", + cell_code="cell-1", + chunk_size_sqm=900, + geometry=self.boundary, + centroid_lat="35.689200", + centroid_lon="51.389200", + ) + + def test_upsert_logs_and_stores_diagnostics_for_empty_observations(self): + metric_payload = { + "results": {}, + "metadata": { + "backend": "openeo", + "backend_url": "https://openeofed.dataspace.copernicus.eu", + "collections_used": ["SENTINEL2_L2A"], + "job_refs": {"ndvi": "job-1"}, + "failed_metrics": [], + "payload_diagnostics": { + "ndvi": { + "returned_cell_count": 0, + "payload_keys_sample": [], + "available_features": ["mean"], + } + }, + }, + } + + with self.assertLogs("location_data.tasks", level="WARNING") as captured: + summary = _upsert_grid_observations( + cells=[self.cell], + run=self.run, + temporal_start=date(2026, 4, 9), + temporal_end=date(2026, 5, 9), + metric_payload=metric_payload, + ) + + log_output = "\n".join(captured.output) + self.assertIn("Persisting empty observation for cell=cell-1, run_id=", log_output) + self.assertIn("No payload cells matched DB cell_codes for run_id=", log_output) + self.assertIn("All persisted observations are empty for run_id=", log_output) + + self.assertEqual(summary["total_observation_count"], 1) + self.assertEqual(summary["usable_observation_count"], 0) + self.assertEqual(summary["fully_null_observation_count"], 1) + self.assertEqual(summary["matched_cell_count"], 0) + self.assertEqual(summary["payload_keys_sample"], []) + self.assertEqual(summary["available_features"], ["mean"]) + + observation = 
AnalysisGridObservation.objects.get(cell=self.cell) + self.assertIsNone(observation.ndvi) + self.assertIsNone(observation.ndwi) + self.assertIsNone(observation.lst_c) + self.assertIsNone(observation.soil_vv) + self.assertIsNone(observation.soil_vv_db) + + self.run.refresh_from_db() + diagnostics = self.run.metadata["diagnostics"]["empty_observations"] + self.assertEqual(diagnostics["job_ref"], {"ndvi": "job-1"}) + self.assertEqual(diagnostics["total_cells"], 1) + self.assertEqual(diagnostics["matched_cells"], 0) + self.assertEqual(diagnostics["payload_keys_sample"], []) + self.assertEqual(diagnostics["available_features"], ["mean"]) + + def test_run_remote_sensing_analysis_refetches_when_cached_observations_are_empty(self): + AnalysisGridObservation.objects.create( + cell=self.cell, + run=self.run, + temporal_start=date(2026, 4, 9), + temporal_end=date(2026, 5, 9), + metadata={}, + ) + subdivision_result = Mock( + id=99, + cluster_count=1, + selected_features=["ndvi", "ndwi", "lst_c", "soil_vv_db"], + metadata={"used_cell_count": 1, "skipped_cell_count": 0, "kmeans_params": {}}, + skipped_cell_codes=[], + ) + remote_payload = { + "results": { + "cell-1": { + "ndvi": 0.52, + "ndwi": 0.21, + "lst_c": None, + "soil_vv": 10.0, + "soil_vv_db": 10.0, + } + }, + "metadata": { + "backend": "openeo", + "backend_url": "https://openeofed.dataspace.copernicus.eu", + "collections_used": ["SENTINEL2_L2A", "SENTINEL1_GRD"], + "job_refs": {"ndvi": "job-1"}, + "failed_metrics": [], + "payload_diagnostics": { + "ndvi": { + "returned_cell_count": 1, + "payload_keys_sample": ["0"], + "available_features": ["mean"], + } + }, + }, + } + + with patch( + "location_data.tasks.create_or_get_analysis_grid_cells", + return_value={ + "created": False, + "block_code": "", + "total_count": 1, + "created_count": 0, + "chunk_size_sqm": 900, + "existing_count": 1, + }, + ), patch( + "location_data.tasks.compute_remote_sensing_metrics", + return_value=remote_payload, + ) as compute_mock, patch( + 
"location_data.tasks._ensure_subdivision_result", + return_value=subdivision_result, + ): + summary = run_remote_sensing_analysis( + soil_location_id=self.location.id, + block_code="", + temporal_start=date(2026, 4, 9), + temporal_end=date(2026, 5, 9), + run_id=self.run.id, + ) + + compute_mock.assert_called_once() + self.assertEqual(summary["source"], "openeo") + self.assertEqual(summary["processed_cell_count"], 1) + + observation = AnalysisGridObservation.objects.get(cell=self.cell) + self.assertEqual(observation.ndvi, 0.52) + self.assertEqual(observation.ndwi, 0.21) + self.assertEqual(observation.soil_vv, 10.0) + self.assertEqual(observation.soil_vv_db, 10.0) + + self.run.refresh_from_db() + cached_details = self.run.metadata["stage_details"]["using_cached_observations"] + self.assertEqual(cached_details["source"], "database") + self.assertFalse(cached_details["usable"]) + self.assertTrue(cached_details["refetching"]) diff --git a/location_data/views.py b/location_data/views.py index d290252..b4dbf8b 100644 --- a/location_data/views.py +++ b/location_data/views.py @@ -1,4 +1,5 @@ from datetime import timedelta +from typing import Any from django.apps import apps from django.core.paginator import EmptyPage, Paginator @@ -49,6 +50,21 @@ from .serializers import ( from .tasks import run_remote_sensing_analysis_task MAX_REMOTE_SENSING_PAGE_SIZE = 200 +REMOTE_SENSING_RUN_STAGE_ORDER = ( + "queued", + "running", + "preparing_analysis_grid", + "analysis_grid_ready", + "analysis_cells_selected", + "using_cached_observations", + "fetching_remote_metrics", + "remote_metrics_fetched", + "observations_persisted", + "clustering_completed", + "completed", + "failed", + "retrying", +) SoilLocationPayloadSerializer = inline_serializer( name="SoilLocationPayloadSerializer", @@ -636,13 +652,19 @@ class RemoteSensingRunStatusView(APIView): def _build_remote_sensing_run_status_payload(run: RemoteSensingRun, *, page: int, page_size: int) -> dict: run_data = 
RemoteSensingRunSerializer(run).data task_id = (run.metadata or {}).get("task_id") + task_data = _build_remote_sensing_task_payload(run) + effective_status = _apply_live_retry_state_override(run_data, task_data) + status_payload = { + "status": effective_status or run_data["status_label"], + "source": "database", + "run": run_data, + "task_id": task_id, + "task": task_data, + } if run.status in {RemoteSensingRun.STATUS_PENDING, RemoteSensingRun.STATUS_RUNNING}: - return { - "status": run_data["status_label"], - "source": "database", - "run": run_data, - "task_id": task_id, - } + return status_payload + if run.status == RemoteSensingRun.STATUS_FAILURE: + return status_payload location = _get_location_by_lat_lon(run.soil_location.latitude, run.soil_location.longitude, prefetch=True) observations = _get_remote_sensing_observations( @@ -654,10 +676,7 @@ def _build_remote_sensing_run_status_payload(run: RemoteSensingRun, *, page: int subdivision_result = getattr(run, "subdivision_result", None) response_payload = { - "status": run_data["status_label"], - "source": "database", - "run": run_data, - "task_id": task_id, + **status_payload, "location": SoilLocationResponseSerializer(location).data, "block_code": run.block_code, "chunk_size_sqm": run.chunk_size_sqm, @@ -705,6 +724,180 @@ def _build_remote_sensing_run_status_payload(run: RemoteSensingRun, *, page: int return response_payload +def _get_remote_sensing_async_result(task_id: str): + try: + from celery.result import AsyncResult + except ImportError: # pragma: no cover - fallback when Celery is absent + return None + + try: + return AsyncResult(task_id) + except Exception: # pragma: no cover - depends on Celery backend configuration + return None + + +def _serialize_task_value(value): + if value is None or isinstance(value, (str, int, float, bool)): + return value + if isinstance(value, dict): + return {str(key): _serialize_task_value(item) for key, item in value.items()} + if isinstance(value, (list, tuple)): + 
return [_serialize_task_value(item) for item in value] + return str(value) + + +def _build_remote_sensing_task_payload(run: RemoteSensingRun) -> dict: + metadata = dict(run.metadata or {}) + timestamps = dict(metadata.get("timestamps") or {}) + stage_details = dict(metadata.get("stage_details") or {}) + current_stage = metadata.get("stage") + failed_stage = metadata.get("failed_stage") + task_payload = { + "current_stage": current_stage, + "current_stage_details": stage_details.get(current_stage, {}), + "timestamps": timestamps, + "stages": _build_remote_sensing_stage_entries( + current_stage=current_stage, + stage_details=stage_details, + timestamps=timestamps, + run_status=run.status, + ), + } + if failed_stage: + task_payload["failed_stage"] = failed_stage + + metric_progress = (stage_details.get("fetching_remote_metrics") or {}).get("metric_progress") + if metric_progress: + task_payload["metric_progress"] = metric_progress + + retry_context = stage_details.get("retrying") + if retry_context: + task_payload["retry"] = retry_context + task_payload["last_error"] = retry_context.get("last_error") + + failure_reason = None + if metadata.get("stage") == "failed" or run.status == RemoteSensingRun.STATUS_FAILURE: + failure_reason = metadata.get("failure_reason") or run.error_message + if failure_reason: + task_payload["failure_reason"] = failure_reason + + task_id = metadata.get("task_id") + celery_payload = _build_remote_sensing_celery_payload(str(task_id)) if task_id else None + if celery_payload is not None: + task_payload["celery"] = celery_payload + + return task_payload + + +def _apply_live_retry_state_override(run_data: dict[str, Any], task_data: dict[str, Any]) -> str | None: + celery_payload = task_data.get("celery") or {} + if celery_payload.get("state") != "RETRY": + return None + + retry_context = dict(task_data.get("retry") or {}) + if not retry_context: + retry_context = { + "retry_count": None, + "retry_delay_seconds": None, + "last_error": 
task_data.get("failure_reason") or celery_payload.get("info"), + "failed_stage": task_data.get("failed_stage"), + "failed_stage_details": ( + task_data.get("current_stage_details", {}) + if task_data.get("current_stage") == "failed" + else {} + ), + } + + task_data["retry"] = retry_context + task_data["last_error"] = retry_context.get("last_error") or celery_payload.get("info") + task_data["current_stage"] = "retrying" + task_data["current_stage_details"] = retry_context + task_data.pop("failure_reason", None) + _upsert_retrying_stage_entry(task_data, retry_context) + + run_data["status_label"] = "retrying" + run_data["pipeline_status"] = "retrying" + run_data["stage"] = "retrying" + return "retrying" + + +def _upsert_retrying_stage_entry(task_data: dict[str, Any], retry_context: dict[str, Any]) -> None: + stages = list(task_data.get("stages") or []) + retrying_entry = { + "name": "retrying", + "status": "running", + "entered_at": (task_data.get("timestamps") or {}).get("retrying_at"), + "details": retry_context, + } + for index, entry in enumerate(stages): + if entry.get("name") == "retrying": + stages[index] = retrying_entry + task_data["stages"] = stages + return + stages.append(retrying_entry) + task_data["stages"] = stages + + +def _build_remote_sensing_stage_entries( + *, + current_stage: str | None, + stage_details: dict, + timestamps: dict, + run_status: str, +) -> list[dict]: + stage_names = [] + for stage_name in REMOTE_SENSING_RUN_STAGE_ORDER: + if stage_name == current_stage or stage_name in stage_details or f"{stage_name}_at" in timestamps: + stage_names.append(stage_name) + if current_stage and current_stage not in stage_names: + stage_names.append(current_stage) + + entries = [] + for stage_name in stage_names: + if run_status == RemoteSensingRun.STATUS_FAILURE and stage_name == current_stage: + stage_status = "failed" + elif stage_name == current_stage and run_status == RemoteSensingRun.STATUS_PENDING: + stage_status = "pending" + elif stage_name == 
current_stage and run_status == RemoteSensingRun.STATUS_RUNNING: + stage_status = "running" + else: + stage_status = "completed" + entries.append( + { + "name": stage_name, + "status": stage_status, + "entered_at": timestamps.get(f"{stage_name}_at"), + "details": stage_details.get(stage_name, {}), + } + ) + return entries + + +def _build_remote_sensing_celery_payload(task_id: str) -> dict | None: + async_result = _get_remote_sensing_async_result(task_id) + if async_result is None: + return None + + try: + payload = { + "state": str(async_result.state), + "ready": bool(async_result.ready()), + "successful": bool(async_result.successful()) if async_result.ready() else False, + "failed": bool(async_result.failed()) if async_result.ready() else False, + } + except Exception: # pragma: no cover - depends on Celery backend configuration + return None + + info = getattr(async_result, "info", None) + if info not in (None, {}): + payload["info"] = _serialize_task_value(info) + + if async_result.failed(): + payload["error"] = _serialize_task_value(async_result.result) + + return payload + + def _get_location_by_lat_lon(lat, lon, *, prefetch: bool = False): lat_rounded = round(lat, 6) lon_rounded = round(lon, 6) @@ -894,8 +1087,6 @@ def _build_remote_sensing_summary(observations): ndwi_mean=Avg("ndwi"), lst_c_mean=Avg("lst_c"), soil_vv_db_mean=Avg("soil_vv_db"), - dem_m_mean=Avg("dem_m"), - slope_deg_mean=Avg("slope_deg"), ) summary = { "cell_count": observations.count(), @@ -903,8 +1094,6 @@ def _build_remote_sensing_summary(observations): "ndwi_mean": _round_or_none(aggregates.get("ndwi_mean")), "lst_c_mean": _round_or_none(aggregates.get("lst_c_mean")), "soil_vv_db_mean": _round_or_none(aggregates.get("soil_vv_db_mean")), - "dem_m_mean": _round_or_none(aggregates.get("dem_m_mean")), - "slope_deg_mean": _round_or_none(aggregates.get("slope_deg_mean")), } return summary @@ -916,8 +1105,6 @@ def _empty_remote_sensing_summary(): "ndwi_mean": None, "lst_c_mean": None, 
"soil_vv_db_mean": None, - "dem_m_mean": None, - "slope_deg_mean": None, } diff --git a/logs/app.log.2026-04-07 b/logs/app.log.2026-04-07 deleted file mode 100644 index 33b3a0f..0000000 --- a/logs/app.log.2026-04-07 +++ /dev/null @@ -1 +0,0 @@ -2026-04-07 07:48:04,983 [INFO] django.utils.autoreload: Watching for file changes with StatReloader