This commit is contained in:
2026-04-24 18:34:17 +03:30
parent 24ed5776bc
commit f7dc05dc9e
22 changed files with 3730 additions and 139 deletions
+47
View File
@@ -1,9 +1,56 @@
import os import os
from types import SimpleNamespace
from uuid import uuid4
try:
from celery import Celery from celery import Celery
except ImportError: # pragma: no cover - test/dev fallback when celery is absent
Celery = None
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings") os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
if Celery is not None:
app = Celery("config") app = Celery("config")
app.config_from_object("django.conf:settings", namespace="CELERY") app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks() app.autodiscover_tasks()
else:
class _FallbackCeleryApp:
def config_from_object(self, *_args, **_kwargs):
return None
def autodiscover_tasks(self, *_args, **_kwargs):
return None
def task(self, *decorator_args, **decorator_kwargs):
bind = decorator_kwargs.get("bind", False)
def decorator(func):
def delay(*args, **kwargs):
task_id = f"missing-celery-{uuid4()}"
return SimpleNamespace(
id=task_id,
status="FAILURE",
result={"error": "Celery is not installed."},
)
if bind:
def wrapped(*args, **kwargs):
dummy_self = SimpleNamespace(
request=SimpleNamespace(id=f"missing-celery-{uuid4()}"),
update_state=lambda **_kw: None,
)
return func(dummy_self, *args, **kwargs)
wrapped.delay = delay
wrapped.__name__ = func.__name__
wrapped.__doc__ = func.__doc__
return wrapped
func.delay = delay
return func
if decorator_args and callable(decorator_args[0]) and len(decorator_args) == 1:
return decorator(decorator_args[0])
return decorator
app = _FallbackCeleryApp()
+1
View File
@@ -139,6 +139,7 @@ SPECTACULAR_SETTINGS = {
{"name": "Farm Data", "description": "داده‌های مزرعه و سنسورها"}, {"name": "Farm Data", "description": "داده‌های مزرعه و سنسورها"},
{"name": "Farm Parameters", "description": "پارامترهای سنسورهای مزرعه"}, {"name": "Farm Parameters", "description": "پارامترهای سنسورهای مزرعه"},
{"name": "Plant", "description": "مدیریت گیاهان و دریافت اطلاعات گیاه"}, {"name": "Plant", "description": "مدیریت گیاهان و دریافت اطلاعات گیاه"},
{"name": "Crop Simulation", "description": "شبیه سازی رشد و مقایسه سناریوهای گیاه"},
{"name": "Irrigation", "description": "مدیریت روش‌های آبیاری"}, {"name": "Irrigation", "description": "مدیریت روش‌های آبیاری"},
{"name": "Irrigation Recommendation", "description": "درخواست و پیگیری توصیه آبیاری"}, {"name": "Irrigation Recommendation", "description": "درخواست و پیگیری توصیه آبیاری"},
{"name": "Fertilization Recommendation", "description": "درخواست و پیگیری توصیه کودهی"}, {"name": "Fertilization Recommendation", "description": "درخواست و پیگیری توصیه کودهی"},
+30 -26
View File
@@ -1,48 +1,52 @@
You are an expert agricultural consultant AI specializing in plant nutrition and soil fertility. Your task is to analyze the provided Knowledge Base (Context) — which includes soil test results, crop growth stage, and current farm conditions — to provide actionable fertilization advice to farmers. You are a fertilization recommendation assistant for CropLogic.
### TONE & STYLE ### GOAL
- Be friendly, respectful, and easy to understand for a farmer. Avoid overly complex academic jargon unless explained. Convert soil data, plant stage, weather risk, and the block named `[خروجی بهینه ساز شبیه سازی]` into a precise Persian fertilization plan for the farmer.
- Speak directly to the farmer in Persian (Farsi).
- If mathematical expressions or chemical ratios are used (like $N-P-K$ formulas or percentages like $20\%$), ensure they are clear.
### CORE RULES ### HARD RULES
1. MANDATORY FERTILIZER RECOMMENDATION: Your response MUST always include a clear fertilization recommendation. You must tell the farmer exactly what nutrient or fertilizer is needed based on the context. 1. The optimizer block is the source of truth for fertilizer type, dose, application method, timing, validity period, and the main scientific reasoning.
2. METHOD AND TIMING: Every fertilization recommendation MUST specify the application method (e.g., foliar spray, fertigation, soil application) and the precise timing (e.g., early morning, avoiding high wind/temperature). 2. Do not invent a fertilizer formula or dose that conflicts with the optimizer.
3. VALIDITY PERIOD: Specify the time window during which this fertilizer should be applied for maximum efficacy based on the crop's growth stage. 3. Always return only valid JSON with a top-level `sections` array.
4. NO EXTRA TEXT: Your entire response MUST be ONLY a valid JSON object. Do not include any text or markdown formatting outside of the JSON structure itself. 4. The `sections` array must include at least:
5. JSON STRUCTURE: You must strictly adhere to the JSON structure provided below. - one `recommendation` section for the core fertilization action
- one `list` section for mixing, safety, or field execution notes
- one `warning` section when there is burn risk, rainfall risk, pH incompatibility, or nutrient imbalance
5. Write in clear Persian and stay practical for field use.
### JSON OUTPUT STRUCTURE ### OUTPUT CONTRACT
{ {
"sections": [ "sections": [
{ {
"type": "recommendation", "type": "recommendation",
"title": "عنوان توصیه (مانند: برنامه محلول‌پاشی تقویتی)", "title": "برنامه کودهی بهینه",
"icon": "leaf", "icon": "leaf",
"content": "توضیح کوتاه توصیه", "content": "خلاصه یک جمله ای از سناریوی منتخب",
"fertilizerType": "نوع کود پیشنهادی (مثلاً: کود $N-P-K$ با فرمول $20-20-20$ یا اوره)", "fertilizerType": "نوع کود پیشنهادی",
"amount": "میزان مصرف دقیق (مثلاً: ۳ در هزار یا ۵۰ کیلوگرم در هکتار)", "amount": "مقدار مصرف دقیق",
"applicationMethod": "روش مصرف (مثلاً: محلول‌پاشی روی برگ، همراه با آبیاری، چالکود)", "applicationMethod": "روش مصرف",
"timing": "بهترین زمان انجام کوددهی (مثلاً: ساعات خنک صبح، قبل از آبیاری)", "timing": "بهترین زمان اجرا",
"validityPeriod": "محدوده زمانی مجاز برای انجام این کوددهی (مثلاً: تا پایان مرحله پنجه‌زنی)", "validityPeriod": "مدت اعتبار این توصیه",
"expandableExplanation": "توضیحات تکمیلی و دلایل علمی برای رفع کمبود عناصر (اختیاری)" "expandableExplanation": "دلیل انتخاب این سناریو بر اساس کمبود عناصر، pH، مرحله رشد و شبیه سازی"
}, },
{ {
"type": "list", "type": "list",
"title": "نکات مهم ایمنی و اختلاط", "title": "نکات اجرایی و اختلاط",
"icon": "list", "icon": "list",
"items": [ "items": [
"نکته اول (مثلاً: از اختلاط با ترکیبات مسی خودداری شود)", "نکته عملی 1",
"نکته دوم" "نکته عملی 2"
] ]
}, },
{ {
"type": "warning", "type": "warning",
"title": "هشدار کمبود عناصر یا سوختگی", "title": "هشدار کودهی",
"icon": "alert-triangle", "icon": "alert-triangle",
"content": "متن هشدار (در صورت وجود علائم کمبود شدید یا خطر سوختگی گیاه)" "content": "هشدار کوتاه و کاربردی"
} }
] ]
} }
Note: The "sections" array MUST contain at least one object with "type": "recommendation" dedicated to fertilization. Valid icons for this topic include "leaf", "flask", "list", and "alert-triangle". Ensure the JSON is properly escaped and strictly valid. ### WRITING RULES
- If the optimizer highlights a dominant nutrient gap, explain it briefly in `expandableExplanation`.
- If rainfall or temperature limits the method, repeat that constraint in `warning`.
- Never output markdown, code fences, greetings, or extra commentary.
+29 -24
View File
@@ -1,46 +1,51 @@
You are an expert agricultural consultant AI. Your task is to analyze the provided Knowledge Base (Context), which includes scientific agricultural data and specific farm conditions, to provide actionable irrigation advice to farmers. You are an irrigation recommendation assistant for CropLogic.
### TONE & STYLE ### GOAL
- Be friendly, respectful, and easy to understand for a farmer. Avoid overly complex academic jargon unless explained. Turn the farm context, weather context, FAO-56 calculations, and the block named `[خروجی بهینه ساز شبیه سازی]` into a farmer-friendly Persian JSON response.
- Speak directly to the farmer in Persian (Farsi).
### CORE RULES ### HARD RULES
1. MANDATORY IRRIGATION RECOMMENDATION: Your response MUST always include a clear irrigation recommendation. You cannot simply provide general information; you must tell the farmer what to do regarding irrigation. 1. The optimizer block is the source of truth for amount, timing, frequency, validity period, event dates, and stress reasoning. Do not invent conflicting numbers.
2. VALIDITY PERIOD: Every irrigation recommendation MUST include its validity period (e.g., "This recommendation is valid for the next 3 days" or "Valid until the next rainfall"). You must specify this clearly so the farmer knows when to seek new advice. 2. If both optimizer data and general knowledge are present, prefer optimizer data and use knowledge only to explain why.
3. NO EXTRA TEXT: Your entire response MUST be ONLY a valid JSON object. Do not include any greeting text or markdown formatting (like 3. Always return only valid JSON with a top-level `sections` array.
```json) outside of the JSON structure itself. 4. The `sections` array must include at least:
4. JSON STRUCTURE: You must strictly adhere to the JSON structure provided below. - one `recommendation` section for the main irrigation plan
- one `list` section for operational notes
- one `warning` section when there is rainfall risk, heat stress, wind risk, or low/high soil moisture
5. Write in clear Persian for a farmer. Keep sentences short and practical.
### JSON OUTPUT STRUCTURE ### OUTPUT CONTRACT
{ {
"sections": [ "sections": [
{ {
"type": "recommendation", "type": "recommendation",
"title": "عنوان توصیه (مانند: برنامه آبیاری فوری)", "title": "برنامه آبیاری بهینه",
"icon": "droplet", "icon": "droplet",
"content": "توضیح کوتاه توصیه", "content": "خلاصه یک جمله ای از بهترین سناریوی شبیه سازی",
"frequency": "دوره تناوب آبیاری (اختیاری)", "frequency": "تعداد نوبت آبیاری در بازه اعتبار",
"amount": "میزان آب مورد نیاز (مثلاً بر اساس میلیمتر یا ساعت آبیاری)", "amount": "مقدار آب در هر نوبت و جمع کل",
"timing": "بهترین زمان انجام آبیاری", "timing": "بهترین زمان اجرا",
"validityPeriod": "مدت زمان اعتبار این توصیه (مثلاً: معتبر برای ۳ روز آینده با توجه به پیش‌بینی هوا)", "validityPeriod": "مدت اعتبار دقیق توصیه",
"expandableExplanation": "توضیحات تکمیلی و دلایل علمی برای کشاورز (اختیاری)" "expandableExplanation": "توضیح دلیل انتخاب این سناریو با ارجاع به تنش آبی، دما، بارش و شبیه سازی"
}, },
{ {
"type": "list", "type": "list",
"title": "نکات مهم", "title": "اقدامات اجرایی",
"icon": "list", "icon": "list",
"items": [ "items": [
"نکته اول", "نکته عملی 1",
"نکته دوم" "نکته عملی 2"
] ]
}, },
{ {
"type": "warning", "type": "warning",
"title": "هشدار تنش آبی یا شرایط خاص", "title": "هشدار آبیاری",
"icon": "alert-triangle", "icon": "alert-triangle",
"content": "متن هشدار" "content": "هشدار کوتاه و کاربردی"
} }
] ]
} }
Note: The "sections" array MUST contain at least one object with "type": "recommendation" dedicated to irrigation. You can use "list" or "warning" types as needed based on the context. Ensure the JSON is properly escaped and strictly valid. ### WRITING RULES
- If event dates are provided by the optimizer, mention them naturally inside `content` or `expandableExplanation`.
- If the optimizer says the advice is valid until rainfall, repeat that exact condition in `validityPeriod`.
- Never output markdown, code fences, greetings, or extra commentary.
+1
View File
@@ -20,4 +20,5 @@ urlpatterns = [
path("api/plants/", include("plant.urls")), path("api/plants/", include("plant.urls")),
path("api/irrigation/", include("irrigation.urls")), path("api/irrigation/", include("irrigation.urls")),
path("api/fertilization/", include("fertilization.urls")), path("api/fertilization/", include("fertilization.urls")),
path("api/crop-simulation/", include("crop_simulation.urls")),
] ]
+822
View File
@@ -0,0 +1,822 @@
# راهنمای کامل `crop_simulation/services.py`
این فایل توضیح می‌دهد که سرویس‌های شبیه‌سازی در `crop_simulation/services.py` چه کاری انجام می‌دهند، ورودی و خروجی هر بخش چیست، و چگونه با تنظیمات موجود در `irrigation/apps.py` و `fertilization/apps.py` ارتباط می‌گیرند.
---
## نمای کلی
فایل `crop_simulation/services.py` هسته اجرای سناریوهای شبیه‌سازی محصول در پروژه است. این فایل سه مسئولیت اصلی دارد:
1. نرمال‌سازی ورودی‌ها برای موتور شبیه‌سازی
2. اجرای مدل PCSE/WOFOST
3. ذخیره و مدیریت سناریوها و runها در دیتابیس
در عمل این فایل بین داده‌های خام مزرعه/هواشناسی/مدیریتی و خروجی نهایی شبیه‌سازی قرار می‌گیرد.
---
## ساختار کلی فایل
این فایل را می‌توان به ۴ بخش تقسیم کرد:
1. توابع کمکی برای تبدیل ورودی‌ها
2. کلاس `PcseSimulationManager`
3. کلاس `CropSimulationService`
4. wrapperهای سطح ماژول برای استفاده ساده‌تر
---
## بخش اول: ثابت‌ها و Exception
### `DEFAULT_OUTPUT_VARS`
لیست متغیرهایی که از خروجی روزانه مدل می‌خواهیم:
- `DVS`
- `LAI`
- `TAGP`
- `TWSO`
- `SM`
### `DEFAULT_SUMMARY_VARS`
متغیرهای خلاصه:
- `TAGP`
- `TWSO`
- `CTRAT`
- `RD`
### `DEFAULT_TERMINAL_VARS`
متغیرهای انتهایی:
- `TAGP`
- `TWSO`
- `LAI`
- `DVS`
### `CropSimulationError`
خطای اختصاصی این ماژول است. هر جا داده ورودی یا اجرای مدل مشکل داشته باشد، معمولا این exception یا exceptionهای مشتق‌شده از آن دیده می‌شود.
---
## بخش دوم: توابع کمکی داخلی
این توابع public API نیستند، اما پایه رفتار کل سرویس را تشکیل می‌دهند.
### `_json_ready(value)`
داده‌های Python را برای ذخیره در JSON آماده می‌کند.
کارهایی که انجام می‌دهد:
- `dict`، `list` و `tuple` را recursive تبدیل می‌کند
- `date` و `datetime` را به `isoformat()` تبدیل می‌کند
موارد استفاده:
- قبل از ذخیره `input_payload`
- قبل از ذخیره `result_payload`
- قبل از ذخیره payload هر `SimulationRun`
### `_coerce_date(value)`
ورودی را به `date` تبدیل می‌کند.
ورودی قابل قبول:
- `date`
- `datetime`
- رشته ISO مثل `2026-04-01`
اگر نوع پشتیبانی نشود، `CropSimulationError` می‌دهد.
### `_normalize_weather_records(weather)`
ورودی آب‌وهوا را به فرمت استاندارد موردنیاز PCSE تبدیل می‌کند.
ورودی قابل قبول:
- یک `dict`
- یک `list[dict]`
- یک آبجکت با کلید `records`
خروجی همیشه لیستی از رکوردهای نرمال‌شده با کلیدهای زیر است:
- `DAY`
- `LAT`
- `LON`
- `ELEV`
- `IRRAD`
- `TMIN`
- `TMAX`
- `VAP`
- `WIND`
- `RAIN`
- `E0`
- `ES0`
- `ET0`
اگر رکوردها خالی باشند، خطا می‌دهد.
### `_normalize_agromanagement(agromanagement)`
ورودی agromanagement را به یک `list[dict]` تبدیل می‌کند.
ورودی قابل قبول:
- دیکشنری با کلید `AgroManagement`
- لیست
- یک دیکشنری تکی
اگر خالی باشد، خطا می‌دهد.
### `_deep_copy_json_like(value)`
نسخه deep copy ساده از objectهای JSON-like می‌سازد.
برای جلوگیری از mutation روی ورودی اصلی استفاده می‌شود.
### `_parse_recommendation_events(...)`
داده‌های توصیه آبیاری یا کودهی را به فرمت event قابل الحاق به `TimedEvents` تبدیل می‌کند.
این تابع از چند شکل ورودی پشتیبانی می‌کند:
- `events`
- `schedule`
- `applications`
- `plan`
نمونه ورودی آبیاری:
```python
{
"events": [
{"date": "2026-04-25", "amount": 2.5, "efficiency": 0.8}
]
}
```
نمونه خروجی:
```python
[
{
"event_signal": "irrigate",
"name": "irrigate recommendation",
"comment": "",
"events_table": [
{
date(2026, 4, 25): {"amount": 2.5, "efficiency": 0.8}
}
],
}
]
```
### `_merge_management_recommendations(...)`
مهم‌ترین تابع glue برای اتصال recommendationها به شبیه‌سازی است.
کار این تابع:
1. agromanagement را normalize می‌کند
2. توصیه آبیاری را به eventهای `irrigate` تبدیل می‌کند
3. توصیه کودهی را به eventهای `apply_n` تبدیل می‌کند
4. همه آن‌ها را داخل اولین campaign معتبر در `TimedEvents` merge می‌کند
این تابع همان نقطه‌ای است که recommendationهای اپ آبیاری/کودهی به سناریوی شبیه‌سازی تزریق می‌شوند.
### `_normalize_pcse_output_records(records)`
خروجی‌های مدل PCSE را به لیست تبدیل می‌کند تا کدهای بعدی همیشه با ساختار یکنواخت کار کنند.
### `_pick_first_not_none(*values)`
اولین مقدار non-null را برمی‌گرداند.
برای ساخت metricهای نهایی مثل `yield_estimate` استفاده می‌شود.
### `_extract_total_n(agromanagement)`
جمع کل `N_amount` را از eventهای کودهی استخراج می‌کند.
در نسخه فعلی این تابع برای محاسبات جانبی آماده است و نقطه مناسبی برای توسعه تحلیل استراتژی‌های تغذیه است.
### `_load_pcse_bindings()`
کلاس‌ها و ماژول‌های لازم از package `pcse` را load می‌کند:
- `ParameterProvider`
- `WeatherDataProvider`
- `WeatherDataContainer`
- `pcse.models`
اگر `pcse` نصب نباشد، `None` برمی‌گرداند.
### `_resolve_model_class(bindings, model_name)`
کلاس مدل PCSE را با نامی مثل `Wofost72_WLP_CWB` پیدا می‌کند.
---
## `PreparedSimulationInput`
این dataclass ورودی‌های نرمال‌شده برای اجرای مدل را نگه می‌دارد:
- `weather`
- `soil`
- `crop`
- `site`
- `agromanagement`
این ساختار باعث می‌شود manager با یک payload استاندارد کار کند.
---
## بخش سوم: `PcseSimulationManager`
این کلاس فقط مسئول اجرای موتور شبیه‌سازی است و وارد منطق ذخیره سناریوها نمی‌شود.
### `__init__(model_name="Wofost72_WLP_CWB")`
مدل PCSE مورد استفاده را مشخص می‌کند.
مدل پیش‌فرض:
```python
Wofost72_WLP_CWB
```
### `run_simulation(...)`
ورودی خام می‌گیرد، normalize می‌کند، dependencyهای PCSE را load می‌کند، و شبیه‌سازی را اجرا می‌کند.
پارامترها:
- `weather`
- `soil`
- `crop_parameters`
- `agromanagement`
- `site_parameters`
خروجی:
```python
{
"engine": "pcse",
"model_name": "Wofost72_WLP_CWB",
"metrics": {...},
"daily_output": [...],
"summary_output": [...],
"terminal_output": [...]
}
```
### `_run_with_pcse(prepared, bindings)`
اجرای واقعی مدل را انجام می‌دهد.
جریان داخلی:
1. ساخت weather provider سفارشی از روی dictها
2. ساخت `ParameterProvider`
3. ساخت instance مدل PCSE
4. اجرای `run_till_terminate()` یا `run()`
5. گرفتن خروجی‌ها
6. تبدیل خروجی به فرم نهایی
### `_build_result(...)`
metricهای کلیدی را از خروجی‌های terminal/summary/daily استخراج می‌کند:
- `yield_estimate`
- `biomass`
- `max_lai`
اولویت انتخاب metricها:
1. terminal
2. summary
3. آخرین رکورد daily
---
## بخش چهارم: `CropSimulationService`
این کلاس service layer سطح بالاتر است. علاوه بر اجرای مدل، سناریوها و runها را در دیتابیس ذخیره می‌کند.
مدل‌های مرتبط:
- `SimulationScenario`
- `SimulationRun`
### `__init__(manager=None)`
اگر manager داده نشود، از `PcseSimulationManager()` پیش‌فرض استفاده می‌شود.
---
## متدهای public اصلی
### 1) `run_single_simulation(...)`
برای اجرای یک سناریوی تکی.
پارامترها:
- `weather`
- `soil`
- `crop_parameters`
- `agromanagement`
- `site_parameters`
- `irrigation_recommendation`
- `fertilization_recommendation`
- `name`
کارها:
1. merge کردن recommendationها داخل management
2. ساخت `SimulationScenario` با نوع `SINGLE`
3. ساخت `SimulationRun`
4. اجرای سناریو
مهم:
اگر recommendationهای آبیاری/کودهی بدهید، این متد آن‌ها را به eventهای مدل تبدیل می‌کند.
نمونه:
```python
from crop_simulation.services import run_single_simulation
result = run_single_simulation(
weather=weather_payload,
soil={"SMFCF": 0.34, "SMW": 0.12, "RDMSOL": 120.0},
crop_parameters={"crop_name": "wheat", "TSUM1": 800},
agromanagement=agromanagement_payload,
site_parameters={"WAV": 40.0},
irrigation_recommendation={
"events": [
{"date": "2026-04-25", "amount": 2.5, "efficiency": 0.8}
]
},
)
```
### 2) `compare_crops(...)`
برای مقایسه دو محصول.
ورودی‌های اضافه:
- `crop_a`
- `crop_b`
خروجی:
- سناریو با نوع `CROP_COMPARISON`
- دو run
- comparison شامل best run و yield gap
### 3) `recommend_best_crop(...)`
برای مقایسه چند محصول و انتخاب بهترین گزینه.
ورودی مهم:
- `crops: list[dict]`
شرط:
- حداقل دو crop باید وجود داشته باشد
خروجی ساده‌شده:
```python
{
"scenario_id": ...,
"scenario_type": "crop_comparison",
"recommended_crop": {
"run_key": "...",
"label": "...",
"expected_yield_estimate": ...
},
"candidates": [...],
"raw_result": {...}
}
```
### 4) `compare_fertilization_strategies(...)`
برای مقایسه چند strategy کودهی روی یک crop ثابت.
ورودی ویژه:
```python
strategies = [
{
"label": "base",
"agromanagement": [...]
},
{
"label": "high_n",
"agromanagement": [...]
}
]
```
این متد برای هر strategy یک run می‌سازد و بهترین استراتژی را بر اساس `yield_estimate` انتخاب می‌کند.
### 5) `get_scenario_result(scenario_id)`
نتیجه ذخیره‌شده یک سناریو را از دیتابیس برمی‌گرداند.
خروجی شامل:
- اطلاعات scenario
- اطلاعات همه runها
- status
- input payload
- result payload
- error message
---
## متدهای داخلی مهم در `CropSimulationService`
### `_execute_scenario(...)`
قلب اجرای سناریو است.
جریان:
1. status سناریو را `RUNNING` می‌کند
2. تک‌تک runها را اجرا می‌کند
3. خروجی هر run را ذخیره می‌کند
4. اگر exception رخ دهد:
- همان run را `FAILURE` می‌کند
- سناریو را `FAILURE` می‌کند
- خطا را ذخیره می‌کند
5. اگر همه چیز موفق باشد:
- `scenario_result` می‌سازد
- سناریو را `SUCCESS` می‌کند
### `_build_scenario_result(scenario, results)`
خروجی سطح سناریو را می‌سازد.
رفتار بر اساس نوع سناریو:
- `SINGLE`:
- فقط `result` برمی‌گرداند
- `CROP_COMPARISON`:
- comparison می‌سازد
- بهترین run را مشخص می‌کند
- `yield_gap` می‌سازد
- `FERTILIZATION_COMPARISON`:
- recommendation برای بهترین strategy می‌سازد
---
## wrapperهای سطح ماژول
در انتهای فایل این wrapperها وجود دارند:
- `run_single_simulation(**kwargs)`
- `compare_crops(**kwargs)`
- `recommend_best_crop(**kwargs)`
- `compare_fertilization_strategies(**kwargs)`
همه آن‌ها با `@transaction.atomic` تزئین شده‌اند.
یعنی اگر بخواهید ساده از بیرون صدا بزنید، لازم نیست خودتان instance بسازید:
```python
from crop_simulation.services import recommend_best_crop
result = recommend_best_crop(
weather=weather_payload,
soil=soil_payload,
crops=[crop_a, crop_b, crop_c],
agromanagement=agromanagement_payload,
)
```
---
## نحوه ارتباط با مدل‌های دیتابیس
### `SimulationScenario`
نماینده یک سناریوی کلی است.
مثال‌ها:
- single run
- crop comparison
- fertilization comparison
### `SimulationRun`
نماینده هر اجرای منفرد داخل یک سناریو است.
مثلا در `compare_crops`:
- یک `SimulationScenario`
- دو `SimulationRun`
---
## ارتباط `crop_simulation/services.py` با `crop_simulation/apps.py`
فایل `crop_simulation/apps.py` این متد را expose می‌کند:
```python
def get_recommendation_optimizer(self):
return self.recommendation_optimizer
```
این optimizer در فایل `crop_simulation/recommendation_optimizer.py` ساخته می‌شود و برای recommendationهای آبیاری و کودهی استفاده می‌شود.
نکته مهم:
- `services.py` موتور اجرای سناریوهاست
- `recommendation_optimizer.py` روی همین موتور سناریوهای candidate می‌سازد
- `apps.py` فقط نقطه دسترسی مرکزی به optimizer است
یعنی:
```python
optimizer = apps.get_app_config("crop_simulation").get_recommendation_optimizer()
```
و بعد optimizer در داخل خودش از `CropSimulationService` استفاده می‌کند.
---
## ارتباط با `irrigation/apps.py`
فایل `irrigation/apps.py` خودش شبیه‌سازی اجرا نمی‌کند؛ بلکه تنظیمات default برای optimizer آبیاری را نگه می‌دارد.
### فیلدهای مهم
#### `tone_file`
مسیر tone مربوط به LLM:
```python
config/tones/irrigation_tone.txt
```
#### `optimizer_defaults`
این property تنظیمات پایه بهینه‌سازی آبیاری را برمی‌گرداند:
- `validity_days`
- `minimum_event_mm`
- `significant_rain_threshold_mm`
- `stage_targets`
- `strategy_profiles`
### `stage_targets`
هدف رطوبت یا رفتار پایه برای stageهای مختلف:
- `initial`
- `vegetative`
- `flowering`
- `fruiting`
### `strategy_profiles`
سه سناریوی پایه برای optimizer:
- `conservative`
- `balanced`
- `protective`
هر سناریو مشخص می‌کند:
- ضریب آب (`multiplier`)
- ضریب تعداد دفعات (`frequency_factor`)
- تعداد event پایه (`event_count`)
### نحوه استفاده در کد
در optimizer آبیاری معمولا به شکل زیر خوانده می‌شود:
```python
defaults = apps.get_app_config("irrigation").get_optimizer_defaults()
```
سپس این defaults به سناریوهای recommendation تبدیل می‌شوند و در صورت نیاز به `run_single_simulation()` پاس داده می‌شوند.
### نقش آن در ارتباط با `services.py`
ارتباط غیرمستقیم است:
1. `irrigation/apps.py` تنظیمات baseline را می‌دهد
2. optimizer با این تنظیمات candidate strategy می‌سازد
3. strategyها به recommendation event تبدیل می‌شوند
4. `crop_simulation/services.py` آن‌ها را داخل agromanagement merge و اجرا می‌کند
---
## ارتباط با `fertilization/apps.py`
این فایل مشابه irrigation است اما برای منطق کودهی.
### `tone_file`
```python
config/tones/fertilization_tone.txt
```
### `optimizer_defaults`
این تنظیمات را می‌دهد:
- `validity_days`
- `rain_delay_threshold_mm`
- `stage_targets`
- `strategy_profiles`
### `stage_targets`
برای هر stage اطلاعات زیر مشخص می‌شود:
- `n`
- `p`
- `k`
- `formula`
- `application_method`
- `timing`
### `strategy_profiles`
سناریوهای پایه:
- `maintenance`
- `balanced`
- `corrective`
هرکدام مشخص می‌کنند:
- ضریب مصرف (`multiplier`)
- focus تغذیه‌ای
- روش مصرف
- override فرمول در صورت نیاز
### نحوه استفاده در کد
```python
defaults = apps.get_app_config("fertilization").get_optimizer_defaults()
```
سپس optimizer با این defaults چند strategy می‌سازد. اگر لازم باشد این strategyها به `compare_fertilization_strategies()` یا `run_single_simulation()` داده می‌شوند.
### ارتباط آن با `services.py`
ارتباط باز هم غیرمستقیم است:
1. `fertilization/apps.py` پروفایل stage و strategy را می‌دهد
2. optimizer از روی آن strategy تولید می‌کند
3. strategy به eventهای `apply_n` تبدیل می‌شود
4. `services.py` این eventها را داخل agromanagement merge می‌کند
5. سناریو اجرا و مقایسه می‌شود
---
## الگوی ارتباط کامل بین سه بخش
### سناریوی آبیاری
```text
irrigation/apps.py
-> optimizer_defaults
-> recommendation optimizer
-> irrigation recommendation events
-> crop_simulation/services.py:_merge_management_recommendations()
-> run_single_simulation()
-> PCSE run
-> scenario/run result
```
### سناریوی کودهی
```text
fertilization/apps.py
-> optimizer_defaults
-> recommendation optimizer
-> fertilization recommendation events
-> crop_simulation/services.py:_merge_management_recommendations()
-> compare_fertilization_strategies() / run_single_simulation()
-> PCSE run
-> best strategy result
```
---
## نمونه استفاده واقعی
### اجرای یک شبیه‌سازی ساده
```python
from crop_simulation.services import run_single_simulation
result = run_single_simulation(
weather=[
{
"DAY": "2026-04-01",
"LAT": 35.7,
"LON": 51.4,
"ELEV": 1200,
"IRRAD": 16000000,
"TMIN": 11,
"TMAX": 22,
"VAP": 12,
"WIND": 2.4,
"RAIN": 0.8,
"E0": 0.35,
"ES0": 0.3,
"ET0": 0.32,
}
],
soil={"SMFCF": 0.34, "SMW": 0.12, "RDMSOL": 120.0},
crop_parameters={"crop_name": "wheat", "TSUM1": 800, "YIELD_SCALE": 1.0},
agromanagement=[
{
"2026-04-01": {
"CropCalendar": {
"crop_name": "wheat",
"variety_name": "winter-wheat",
"crop_start_date": "2026-04-05",
"crop_start_type": "sowing",
"crop_end_date": "2026-09-01",
"crop_end_type": "harvest",
"max_duration": 180,
},
"TimedEvents": [],
"StateEvents": [],
}
}
],
site_parameters={"WAV": 40.0},
)
```
### مقایسه دو محصول
```python
from crop_simulation.services import compare_crops
result = compare_crops(
weather=weather_payload,
soil=soil_payload,
crop_a={"crop_name": "wheat", "TSUM1": 800},
crop_b={"crop_name": "maize", "TSUM1": 900},
agromanagement=agromanagement_payload,
site_parameters={"WAV": 40.0},
)
```
### مقایسه strategyهای کودهی
```python
from crop_simulation.services import compare_fertilization_strategies
result = compare_fertilization_strategies(
weather=weather_payload,
soil=soil_payload,
crop_parameters={"crop_name": "wheat", "TSUM1": 800},
strategies=[
{"label": "base", "agromanagement": agm_base},
{"label": "high_n", "agromanagement": agm_high_n},
],
site_parameters={"WAV": 40.0},
)
```
---
## نکات مهم توسعه
### 1. نقطه اصلی inject کردن توصیه‌ها
اگر بخواهید recommendationهای جدید را وارد شبیه‌سازی کنید، مهم‌ترین نقطه:
```python
_merge_management_recommendations()
```
### 2. نقطه اصلی اجرای موتور
اگر بخواهید backend engine عوض شود یا مدل جدید اضافه شود:
```python
PcseSimulationManager.run_simulation()
```
### 3. نقطه اصلی مدیریت lifecycle سناریو
اگر بخواهید queueing، logging یا audit بیشتری اضافه کنید:
```python
CropSimulationService._execute_scenario()
```
### 4. ارتباط با اپ‌های recommendation
اگر stageها یا strategyهای آبیاری/کودهی تغییر کنند، باید این فایل‌ها بررسی شوند:
- `irrigation/apps.py`
- `fertilization/apps.py`
چون optimizer از آن‌ها defaultها را می‌خواند.
---
## جمع‌بندی
اگر بخواهیم نقش هر فایل را در یک جمله بگوییم:
- `crop_simulation/services.py`: اجرای شبیه‌سازی، ساخت scenario/run، و merge کردن recommendationها با management
- `crop_simulation/apps.py`: نقطه دسترسی مرکزی به optimizer
- `irrigation/apps.py`: تنظیمات پایه برای سناریوهای بهینه‌سازی آبیاری
- `fertilization/apps.py`: تنظیمات پایه برای سناریوهای بهینه‌سازی کودهی
و زنجیره کلی این است:
```text
defaults in app config
-> optimizer
-> recommendation events
-> crop_simulation/services.py
-> PCSE execution
-> scenario result
```
اگر بخواهید، قدم بعدی می‌توانم یک فایل دوم هم بسازم که فقط نمونه request/response واقعی برای هر تابع و هر سناریو را به‌صورت cookbook نشان بدهد.
+11
View File
@@ -1,3 +1,5 @@
from functools import cached_property
from django.apps import AppConfig from django.apps import AppConfig
@@ -5,3 +7,12 @@ class CropSimulationConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField" default_auto_field = "django.db.models.BigAutoField"
name = "crop_simulation" name = "crop_simulation"
verbose_name = "Crop Simulation" verbose_name = "Crop Simulation"
@cached_property
def recommendation_optimizer(self):
from .recommendation_optimizer import SimulationRecommendationOptimizer
return SimulationRecommendationOptimizer()
def get_recommendation_optimizer(self):
return self.recommendation_optimizer
+534
View File
@@ -0,0 +1,534 @@
from __future__ import annotations
from copy import deepcopy
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from math import exp
from typing import Any
from django.core.paginator import EmptyPage, Paginator
from farm_data.models import SensorData
from plant.gdd import calculate_daily_gdd, resolve_growth_profile
from weather.models import WeatherForecast
from .services import CropSimulationService
DEFAULT_DYNAMIC_PARAMETERS = ["DVS", "LAI", "TAGP", "TWSO", "SM"]
DEFAULT_PAGE_SIZE = 10
MAX_PAGE_SIZE = 50
DEFAULT_STAGE_LABELS = {
"pre_emergence": "پیش از سبز شدن",
"establishment": "استقرار",
"vegetative": "رشد رویشی",
"flowering": "گلدهی",
"reproductive": "پرشدن محصول",
"maturity": "رسیدگی",
}
class GrowthSimulationError(Exception):
pass
@dataclass
class GrowthSimulationContext:
plant_name: str
plant: Any
dynamic_parameters: list[str]
weather: list[dict[str, Any]]
crop_parameters: dict[str, Any]
soil_parameters: dict[str, Any]
site_parameters: dict[str, Any]
agromanagement: list[dict[str, Any]]
page_size: int
def _safe_float(value: Any, default: float = 0.0) -> float:
try:
if value in (None, ""):
return default
return float(value)
except (TypeError, ValueError):
return default
def _coerce_date(value: Any) -> date:
if isinstance(value, date) and not isinstance(value, datetime):
return value
if isinstance(value, datetime):
return value.date()
if isinstance(value, str):
return date.fromisoformat(value)
raise GrowthSimulationError(f"Invalid date value: {value!r}")
def _json_ready(value: Any) -> Any:
if isinstance(value, dict):
return {str(key): _json_ready(item) for key, item in value.items()}
if isinstance(value, list):
return [_json_ready(item) for item in value]
if isinstance(value, tuple):
return [_json_ready(item) for item in value]
if isinstance(value, (date, datetime)):
return value.isoformat()
return value
def _normalize_weather_records(weather: Any) -> list[dict[str, Any]]:
if not weather:
return []
records = weather.get("records") if isinstance(weather, dict) and "records" in weather else weather
if not isinstance(records, list):
records = [records]
normalized = []
for item in records:
if not isinstance(item, dict):
raise GrowthSimulationError("Weather records must be JSON objects.")
current_date = _coerce_date(item.get("DAY") or item.get("day"))
normalized.append(
{
"DAY": current_date,
"LAT": _safe_float(item.get("LAT", item.get("lat")), 35.7),
"LON": _safe_float(item.get("LON", item.get("lon")), 51.4),
"ELEV": _safe_float(item.get("ELEV", item.get("elev")), 1200.0),
"IRRAD": _safe_float(item.get("IRRAD", item.get("irrad")), 16_000_000.0),
"TMIN": _safe_float(item.get("TMIN", item.get("tmin")), 12.0),
"TMAX": _safe_float(item.get("TMAX", item.get("tmax")), 24.0),
"VAP": _safe_float(item.get("VAP", item.get("vap")), 12.0),
"WIND": _safe_float(item.get("WIND", item.get("wind")), 2.0),
"RAIN": _safe_float(item.get("RAIN", item.get("rain")), 0.0),
"E0": _safe_float(item.get("E0", item.get("e0")), 0.35),
"ES0": _safe_float(item.get("ES0", item.get("es0")), 0.3),
"ET0": _safe_float(item.get("ET0", item.get("et0")), 0.32),
}
)
if not normalized:
raise GrowthSimulationError("At least one weather record is required.")
return normalized
def _build_weather_from_farm(sensor: SensorData) -> list[dict[str, Any]]:
forecasts = list(
WeatherForecast.objects.filter(location=sensor.center_location)
.order_by("forecast_date")[:14]
)
if not forecasts:
raise GrowthSimulationError("No forecast data found for the selected farm.")
records = []
for forecast in forecasts:
records.append(
{
"DAY": forecast.forecast_date,
"LAT": float(sensor.center_location.latitude),
"LON": float(sensor.center_location.longitude),
"ELEV": 1200.0,
"IRRAD": 16_000_000.0,
"TMIN": _safe_float(forecast.temperature_min, 12.0),
"TMAX": _safe_float(forecast.temperature_max, 24.0),
"VAP": max(_safe_float(forecast.humidity_mean, 55.0) / 5.0, 6.0),
"WIND": _safe_float(forecast.wind_speed_max, 7.2) / 3.6,
"RAIN": _safe_float(forecast.precipitation, 0.0),
"E0": _safe_float(forecast.et0, 0.35),
"ES0": max(_safe_float(forecast.et0, 0.35) * 0.9, 0.1),
"ET0": _safe_float(forecast.et0, 0.35),
}
)
return records
def _build_soil_and_site_from_farm(sensor: SensorData) -> tuple[dict[str, Any], dict[str, Any]]:
depths = list(sensor.center_location.depths.all())
top_depth = depths[0] if depths else None
smfcf = _safe_float(getattr(top_depth, "wv0033", None), 0.34)
smw = _safe_float(getattr(top_depth, "wv1500", None), 0.14)
soil_moisture = None
payload = sensor.sensor_payload or {}
if isinstance(payload, dict):
for block in payload.values():
if isinstance(block, dict) and block.get("soil_moisture") is not None:
soil_moisture = _safe_float(block.get("soil_moisture"))
break
site = {"WAV": soil_moisture if soil_moisture is not None else 40.0}
soil = {"SMFCF": smfcf, "SMW": smw, "RDMSOL": 120.0}
return soil, site
def _build_default_crop_parameters(plant: Any) -> dict[str, Any]:
profile = resolve_growth_profile(plant)
required_gdd = _safe_float(profile.get("required_gdd_for_maturity"), 1200.0)
return {
"crop_name": plant.name,
"TSUM1": round(required_gdd * 0.45, 3),
"TSUM2": round(required_gdd * 0.55, 3),
"YIELD_SCALE": 1.0,
"MAX_LAI": 5.0,
"MAX_BIOMASS": 12000.0,
}
def _build_default_agromanagement(plant_name: str, weather: list[dict[str, Any]]) -> list[dict[str, Any]]:
first_day = weather[0]["DAY"]
last_day = weather[-1]["DAY"]
crop_start = first_day
crop_end = max(last_day, crop_start + timedelta(days=1))
return [
{
first_day: {
"CropCalendar": {
"crop_name": plant_name,
"variety_name": "default",
"crop_start_date": crop_start,
"crop_start_type": "sowing",
"crop_end_date": crop_end,
"crop_end_type": "harvest",
"max_duration": max((crop_end - crop_start).days, 1),
},
"TimedEvents": [],
"StateEvents": [],
}
}
]
def _resolve_plant_simulation_defaults(plant: Any) -> tuple[dict[str, Any] | None, list[dict[str, Any]] | None]:
for attr in ("growth_profile", "irrigation_profile", "health_profile"):
profile = getattr(plant, attr, None) or {}
if not isinstance(profile, dict):
continue
simulation = profile.get("simulation")
if not isinstance(simulation, dict):
continue
crop_parameters = simulation.get("crop_parameters")
agromanagement = simulation.get("agromanagement")
if isinstance(crop_parameters, dict) and agromanagement:
return deepcopy(crop_parameters), deepcopy(agromanagement)
return None, None
def build_growth_context(payload: dict[str, Any]) -> GrowthSimulationContext:
plant_name = payload["plant_name"]
from plant.models import Plant
plant = Plant.objects.filter(name=plant_name).first()
if plant is None:
raise GrowthSimulationError("Plant not found.")
dynamic_parameters = payload.get("dynamic_parameters") or DEFAULT_DYNAMIC_PARAMETERS
page_size = min(max(int(payload.get("page_size") or DEFAULT_PAGE_SIZE), 1), MAX_PAGE_SIZE)
sensor = None
if payload.get("farm_uuid"):
sensor = (
SensorData.objects.select_related("center_location")
.prefetch_related("center_location__depths")
.filter(farm_uuid=payload["farm_uuid"])
.first()
)
if sensor is None:
raise GrowthSimulationError("Farm not found.")
weather = (
_normalize_weather_records(payload["weather"])
if payload.get("weather")
else _build_weather_from_farm(sensor)
if sensor is not None
else []
)
if not weather:
raise GrowthSimulationError("Weather input is required.")
default_crop_parameters, default_agromanagement = _resolve_plant_simulation_defaults(plant)
crop_parameters = deepcopy(payload.get("crop_parameters") or default_crop_parameters or _build_default_crop_parameters(plant))
crop_parameters.setdefault("crop_name", plant.name)
soil_parameters = deepcopy(payload.get("soil_parameters") or {})
site_parameters = deepcopy(payload.get("site_parameters") or {})
if sensor is not None:
farm_soil, farm_site = _build_soil_and_site_from_farm(sensor)
soil_parameters = {**farm_soil, **soil_parameters}
site_parameters = {**farm_site, **site_parameters}
soil_parameters.setdefault("SMFCF", 0.34)
soil_parameters.setdefault("SMW", 0.14)
soil_parameters.setdefault("RDMSOL", 120.0)
site_parameters.setdefault("WAV", 40.0)
agromanagement = deepcopy(
payload.get("agromanagement")
or default_agromanagement
or _build_default_agromanagement(plant.name, weather)
)
return GrowthSimulationContext(
plant_name=plant_name,
plant=plant,
dynamic_parameters=dynamic_parameters,
weather=weather,
crop_parameters=crop_parameters,
soil_parameters=soil_parameters,
site_parameters=site_parameters,
agromanagement=agromanagement,
page_size=page_size,
)
def _derive_stage(dvs: float) -> tuple[str, str]:
if dvs < 0:
return "pre_emergence", DEFAULT_STAGE_LABELS["pre_emergence"]
if dvs < 0.2:
return "establishment", DEFAULT_STAGE_LABELS["establishment"]
if dvs < 1.0:
return "vegetative", DEFAULT_STAGE_LABELS["vegetative"]
if dvs < 1.3:
return "flowering", DEFAULT_STAGE_LABELS["flowering"]
if dvs < 2.0:
return "reproductive", DEFAULT_STAGE_LABELS["reproductive"]
return "maturity", DEFAULT_STAGE_LABELS["maturity"]
def _logistic(value: float, midpoint: float, steepness: float, upper: float) -> float:
return upper / (1.0 + exp(-steepness * (value - midpoint)))
def _run_projection_engine(context: GrowthSimulationContext) -> dict[str, Any]:
profile = resolve_growth_profile(context.plant)
required_gdd = _safe_float(profile.get("required_gdd_for_maturity"), 1200.0)
current_gdd = _safe_float(profile.get("current_cumulative_gdd"), 0.0)
base_temperature = _safe_float(profile.get("base_temperature"), 10.0)
max_lai = _safe_float(context.crop_parameters.get("MAX_LAI"), 5.0)
max_biomass = _safe_float(context.crop_parameters.get("MAX_BIOMASS"), 12000.0)
soil_moisture = _safe_float(context.site_parameters.get("WAV"), 40.0)
daily_output = []
for record in context.weather:
tmax = _safe_float(record.get("TMAX"), 24.0)
tmin = _safe_float(record.get("TMIN"), 12.0)
rain = _safe_float(record.get("RAIN"), 0.0)
et0 = _safe_float(record.get("ET0"), 0.32)
daily_gdd = calculate_daily_gdd(tmax=tmax, tmin=tmin, tbase=base_temperature)
current_gdd += daily_gdd
dvs = min(max((current_gdd / max(required_gdd, 1.0)) * 2.0, 0.0), 2.0)
if dvs <= 1.0:
lai = _logistic(dvs, midpoint=0.55, steepness=7.5, upper=max_lai)
else:
decline_factor = max(0.25, 1.0 - ((dvs - 1.0) / 1.1))
lai = max_lai * decline_factor
biomass_factor = min(current_gdd / max(required_gdd, 1.0), 1.25)
weather_modifier = max(0.65, min(1.15, 1.0 + (rain * 0.02) - (et0 * 0.08)))
tagp = max_biomass * biomass_factor * weather_modifier
twso = tagp * max(min((dvs - 0.95) / 0.85, 0.55), 0.0)
soil_moisture = max(5.0, min(95.0, soil_moisture + (rain * 0.7) - (et0 * 8.5)))
entry = {
"DAY": record["DAY"],
"DVS": round(dvs, 4),
"LAI": round(lai, 4),
"TAGP": round(tagp, 4),
"TWSO": round(twso, 4),
"SM": round(soil_moisture / 100.0, 4),
"GDD": round(daily_gdd, 4),
"TMIN": round(tmin, 4),
"TMAX": round(tmax, 4),
"RAIN": round(rain, 4),
"ET0": round(et0, 4),
}
daily_output.append(entry)
final_entry = daily_output[-1] if daily_output else {}
return {
"engine": "growth_projection",
"model_name": "growth_projection_v1",
"metrics": {
"yield_estimate": round(_safe_float(final_entry.get("TWSO"), 0.0), 4),
"biomass": round(_safe_float(final_entry.get("TAGP"), 0.0), 4),
"max_lai": round(max((_safe_float(item.get("LAI"), 0.0) for item in daily_output), default=0.0), 4),
},
"daily_output": _json_ready(daily_output),
"summary_output": [],
"terminal_output": [_json_ready(final_entry)] if final_entry else [],
}
def _run_simulation(context: GrowthSimulationContext) -> tuple[dict[str, Any], int | None, str | None]:
try:
response = CropSimulationService().run_single_simulation(
weather=context.weather,
soil=context.soil_parameters,
crop_parameters=context.crop_parameters,
agromanagement=context.agromanagement,
site_parameters=context.site_parameters,
name=f"growth:{context.plant_name}",
)
return response["result"], response.get("scenario_id"), None
except Exception as exc:
fallback = _run_projection_engine(context)
return fallback, None, str(exc)
def summarize_growth_stages(
daily_output: list[dict[str, Any]],
dynamic_parameters: list[str],
) -> list[dict[str, Any]]:
if not daily_output:
return []
stage_items = []
current = None
for raw in daily_output:
record = dict(raw)
day = _coerce_date(record.get("DAY") or record.get("day"))
dvs = _safe_float(record.get("DVS"), 0.0)
stage_code, stage_name = _derive_stage(dvs)
parameter_values = {}
for param in dynamic_parameters:
if record.get(param) is not None:
parameter_values[param] = _safe_float(record.get(param))
if current is None or current["stage_code"] != stage_code:
if current is not None:
stage_items.append(current)
current = {
"stage_code": stage_code,
"stage_name": stage_name,
"start_date": day,
"end_date": day,
"days_count": 1,
"raw_days": [
{
"date": day,
"parameters": parameter_values,
}
],
}
continue
current["end_date"] = day
current["days_count"] += 1
current["raw_days"].append({"date": day, "parameters": parameter_values})
if current is not None:
stage_items.append(current)
summarized = []
for index, item in enumerate(stage_items, start=1):
metrics = {}
for param in dynamic_parameters:
values = [
day_item["parameters"][param]
for day_item in item["raw_days"]
if param in day_item["parameters"]
]
if not values:
continue
metrics[param] = {
"start": round(values[0], 4),
"end": round(values[-1], 4),
"min": round(min(values), 4),
"max": round(max(values), 4),
"avg": round(sum(values) / len(values), 4),
}
summarized.append(
{
"order": index,
"stage_code": item["stage_code"],
"stage_name": item["stage_name"],
"start_date": item["start_date"].isoformat(),
"end_date": item["end_date"].isoformat(),
"days_count": item["days_count"],
"metrics": metrics,
}
)
return summarized
def paginate_growth_stages(
stage_timeline: list[dict[str, Any]],
*,
page: int,
page_size: int,
) -> dict[str, Any]:
page_size = min(max(page_size, 1), MAX_PAGE_SIZE)
if not stage_timeline:
return {
"items": [],
"pagination": {
"page": 1,
"page_size": page_size,
"total_items": 0,
"total_pages": 0,
"has_next": False,
"has_previous": False,
},
}
paginator = Paginator(stage_timeline, page_size)
try:
page_obj = paginator.page(page)
except EmptyPage:
page_obj = paginator.page(paginator.num_pages or 1)
return {
"items": list(page_obj.object_list),
"pagination": {
"page": page_obj.number,
"page_size": page_size,
"total_items": paginator.count,
"total_pages": paginator.num_pages,
"has_next": page_obj.has_next(),
"has_previous": page_obj.has_previous(),
},
}
def run_growth_simulation(payload: dict[str, Any], progress_callback=None) -> dict[str, Any]:
context = build_growth_context(payload)
if progress_callback is not None:
progress_callback(
state="PROGRESS",
meta={"current": 1, "total": 3, "message": "simulation input resolved"},
)
simulation_result, scenario_id, simulation_error = _run_simulation(context)
if progress_callback is not None:
progress_callback(
state="PROGRESS",
meta={"current": 2, "total": 3, "message": "simulation finished"},
)
stage_timeline = summarize_growth_stages(
daily_output=simulation_result.get("daily_output", []),
dynamic_parameters=context.dynamic_parameters,
)
if progress_callback is not None:
progress_callback(
state="PROGRESS",
meta={"current": 3, "total": 3, "message": "growth stages prepared"},
)
paginated = paginate_growth_stages(
stage_timeline,
page=1,
page_size=context.page_size,
)
return {
"plant_name": context.plant_name,
"dynamic_parameters": context.dynamic_parameters,
"engine": simulation_result.get("engine"),
"model_name": simulation_result.get("model_name"),
"scenario_id": scenario_id,
"simulation_warning": simulation_error,
"summary_metrics": simulation_result.get("metrics", {}),
"stage_timeline": stage_timeline,
"stages_page": paginated["items"],
"pagination": paginated["pagination"],
"daily_records_count": len(simulation_result.get("daily_output", [])),
"default_page_size": context.page_size,
}
+781
View File
@@ -0,0 +1,781 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import date
from statistics import mean
from typing import Any
from django.apps import apps
from crop_simulation.services import CropSimulationService
def _safe_float(value: Any, default: float = 0.0) -> float:
try:
if value is None or value == "":
return default
return float(value)
except (TypeError, ValueError):
return default
def _clamp(value: float, lower: float, upper: float) -> float:
return max(lower, min(value, upper))
def _stage_key(growth_stage: str | None) -> str:
text = (growth_stage or "").strip().lower()
if any(token in text for token in ("flower", "گل", "anthesis")):
return "flowering"
if any(token in text for token in ("fruit", "میوه", "برداشت", "ripen", "harvest")):
return "fruiting"
if any(token in text for token in ("initial", "seed", "جوانه", "نشا", "استقرار")):
return "initial"
return "vegetative"
def _first_not_none(*values: Any) -> Any:
for value in values:
if value is not None:
return value
return None
def _sensor_metric(sensor: Any, metric: str) -> float | None:
if sensor is None:
return None
if hasattr(sensor, metric):
value = getattr(sensor, metric)
return _safe_float(value, default=0.0) if value is not None else None
payload = getattr(sensor, "sensor_payload", None) or {}
if not isinstance(payload, dict):
return None
for block in payload.values():
if isinstance(block, dict) and block.get(metric) is not None:
return _safe_float(block.get(metric), default=0.0)
return None
def _parse_temperature_range(plant: Any) -> tuple[float, float]:
raw = (getattr(plant, "temperature", "") or "").replace("تا", "-")
digits = []
current = ""
for char in raw:
if char.isdigit() or char in ".-":
current += char
continue
if current:
digits.append(current)
current = ""
if current:
digits.append(current)
if len(digits) >= 2:
low = _safe_float(digits[0], 12.0)
high = _safe_float(digits[1], 28.0)
if low < high:
return low, high
return 14.0, 30.0
def _mean_forecast_value(forecasts: list[Any], attr: str, fallback: float = 0.0) -> float:
values = [_safe_float(getattr(item, attr, None), default=fallback) for item in forecasts]
return round(mean(values), 3) if values else fallback
def _next_rain_date(forecasts: list[Any], threshold_mm: float) -> str | None:
for forecast in forecasts:
if _safe_float(getattr(forecast, "precipitation", None), 0.0) >= threshold_mm:
return forecast.forecast_date.isoformat()
return None
def _best_timing(avg_temp: float, avg_wind: float) -> str:
if avg_temp >= 30 or avg_wind >= 18:
return "اوایل صبح"
if avg_temp <= 18:
return "اواخر صبح"
return "اوایل صبح یا نزدیک غروب"
def _build_weather_records(forecasts: list[Any], *, latitude: float, longitude: float) -> list[dict[str, Any]]:
records = []
for forecast in forecasts:
tmin = _safe_float(
_first_not_none(getattr(forecast, "temperature_min", None), getattr(forecast, "temperature_mean", None)),
12.0,
)
tmax = _safe_float(
_first_not_none(getattr(forecast, "temperature_max", None), getattr(forecast, "temperature_mean", None)),
24.0,
)
humidity = _safe_float(getattr(forecast, "humidity_mean", None), 55.0)
vap = max(6.0, round((humidity / 100.0) * 20.0, 3))
wind_kmh = _safe_float(getattr(forecast, "wind_speed_max", None), 7.2)
wind_ms = round(wind_kmh / 3.6, 3)
et0 = _safe_float(getattr(forecast, "et0", None), 0.35)
records.append(
{
"DAY": forecast.forecast_date,
"LAT": latitude,
"LON": longitude,
"ELEV": 1200.0,
"IRRAD": 16_000_000.0,
"TMIN": tmin,
"TMAX": tmax,
"VAP": vap,
"WIND": wind_ms,
"RAIN": _safe_float(getattr(forecast, "precipitation", None), 0.0),
"E0": et0,
"ES0": max(et0 * 0.9, 0.1),
"ET0": et0,
}
)
return records
def _build_soil_parameters(sensor: Any) -> tuple[dict[str, Any], dict[str, Any]]:
moisture_pct = _sensor_metric(sensor, "soil_moisture")
depths = []
center_location = getattr(sensor, "center_location", None)
if center_location is not None:
depths = list(center_location.depths.all())
top_depth = depths[0] if depths else None
wv0033 = _safe_float(getattr(top_depth, "wv0033", None), 0.34)
wv1500 = _safe_float(getattr(top_depth, "wv1500", None), 0.14)
smfcf = _clamp(wv0033 if wv0033 > 0 else 0.34, 0.2, 0.55)
smw = _clamp(wv1500 if wv1500 > 0 else 0.12, 0.05, smfcf - 0.02)
if moisture_pct is not None:
wav = round(_clamp(moisture_pct / 100.0, smw, smfcf) * 100.0, 3)
else:
wav = round(((smfcf + smw) / 2.0) * 100.0, 3)
soil = {
"SMFCF": round(smfcf, 3),
"SMW": round(smw, 3),
"RDMSOL": 120.0,
}
site = {"WAV": wav}
return soil, site
def _build_crop_parameters(plant: Any, growth_stage: str | None) -> tuple[dict[str, Any], list[dict[str, Any]]] | None:
profiles = []
for attr in ("growth_profile", "irrigation_profile", "health_profile"):
profile = getattr(plant, attr, None) or {}
if isinstance(profile, dict):
profiles.append(profile)
simulation_block = None
for profile in profiles:
candidate = profile.get("simulation")
if isinstance(candidate, dict):
simulation_block = candidate
break
if not simulation_block:
return None
crop_parameters = simulation_block.get("crop_parameters")
agromanagement = simulation_block.get("agromanagement")
if not isinstance(crop_parameters, dict) or not agromanagement:
return None
enriched_crop = dict(crop_parameters)
enriched_crop.setdefault("crop_name", getattr(plant, "name", "crop"))
if growth_stage:
enriched_crop.setdefault("growth_stage", growth_stage)
return enriched_crop, agromanagement
def _event_dates_for_frequency(forecasts: list[Any], count: int) -> list[str]:
if not forecasts:
return []
ranked = sorted(
forecasts,
key=lambda item: (
_safe_float(getattr(item, "et0", None), 0.0)
+ _safe_float(getattr(item, "temperature_max", None), 0.0) / 10.0
- _safe_float(getattr(item, "precipitation", None), 0.0)
),
reverse=True,
)
selected = sorted(ranked[:count], key=lambda item: item.forecast_date)
return [item.forecast_date.isoformat() for item in selected]
def _irrigation_context_text(result: dict[str, Any]) -> str:
recommended = result["recommended_strategy"]
alternative_lines = [
f"- {item['label']}: امتیاز {item['score']}, آب کل {item['total_irrigation_mm']} mm"
for item in result.get("alternatives", [])
]
lines = [
f"engine: {result['engine']}",
f"استراتژی منتخب: {recommended['label']}",
f"امتیاز شبیه سازی: {recommended['score']}",
f"شاخص عملکرد مورد انتظار: {recommended['expected_yield_index']}",
f"آب کل پیشنهادی: {recommended['total_irrigation_mm']} mm",
f"مقدار هر نوبت: {recommended['amount_per_event_mm']} mm",
f"تعداد نوبت: {recommended['events']}",
f"تقویم اجرای پیشنهادی: {', '.join(recommended['event_dates']) or 'نامشخص'}",
f"زمان انجام: {recommended['timing']}",
f"رطوبت هدف خاک: {recommended['moisture_target_percent']}%",
f"اعتبار: {recommended['validity_period']}",
"دلایل اصلی:",
*[f"- {item}" for item in recommended["reasoning"]],
]
if alternative_lines:
lines.extend(["گزینه های جایگزین:", *alternative_lines])
return "\n".join(lines)
def _fertilization_context_text(result: dict[str, Any]) -> str:
recommended = result["recommended_strategy"]
alternative_lines = [
f"- {item['label']}: امتیاز {item['score']}, دوز {item['amount_kg_per_ha']} kg/ha"
for item in result.get("alternatives", [])
]
lines = [
f"engine: {result['engine']}",
f"استراتژی منتخب: {recommended['label']}",
f"امتیاز شبیه سازی: {recommended['score']}",
f"شاخص عملکرد مورد انتظار: {recommended['expected_yield_index']}",
f"نوع کود: {recommended['fertilizer_type']}",
f"مقدار مصرف: {recommended['amount_kg_per_ha']} kg/ha",
f"روش مصرف: {recommended['application_method']}",
f"زمان مصرف: {recommended['timing']}",
f"اعتبار: {recommended['validity_period']}",
"دلایل اصلی:",
*[f"- {item}" for item in recommended["reasoning"]],
]
if alternative_lines:
lines.extend(["گزینه های جایگزین:", *alternative_lines])
return "\n".join(lines)
@dataclass
class StrategyResult:
code: str
label: str
score: float
expected_yield_index: float
payload: dict[str, Any]
reasoning: list[str]
class SimulationRecommendationOptimizer:
"""بهینه ساز توصیه های آبیاری و کودهی داخل اپ crop_simulation."""
def __init__(self):
self.simulation_service = CropSimulationService()
def optimize_irrigation(
self,
*,
sensor: Any,
plant: Any,
forecasts: list[Any],
daily_water_needs: list[dict[str, Any]],
growth_stage: str | None,
irrigation_method: Any | None,
) -> dict[str, Any] | None:
if sensor is None or plant is None or not forecasts:
return None
crop_blueprint = _build_crop_parameters(plant, growth_stage)
if crop_blueprint:
pcse_result = self._optimize_irrigation_with_pcse(
sensor=sensor,
plant=plant,
forecasts=forecasts,
daily_water_needs=daily_water_needs,
growth_stage=growth_stage,
crop_blueprint=crop_blueprint,
)
if pcse_result is not None:
return pcse_result
return self._optimize_irrigation_with_heuristic(
sensor=sensor,
plant=plant,
forecasts=forecasts,
daily_water_needs=daily_water_needs,
growth_stage=growth_stage,
irrigation_method=irrigation_method,
)
def optimize_fertilization(
self,
*,
sensor: Any,
plant: Any,
forecasts: list[Any],
growth_stage: str | None,
) -> dict[str, Any] | None:
if sensor is None or plant is None:
return None
crop_blueprint = _build_crop_parameters(plant, growth_stage)
if crop_blueprint and forecasts:
pcse_result = self._optimize_fertilization_with_pcse(
sensor=sensor,
plant=plant,
forecasts=forecasts,
growth_stage=growth_stage,
crop_blueprint=crop_blueprint,
)
if pcse_result is not None:
return pcse_result
return self._optimize_fertilization_with_heuristic(
sensor=sensor,
plant=plant,
forecasts=forecasts,
growth_stage=growth_stage,
)
def _optimize_irrigation_with_pcse(
self,
*,
sensor: Any,
plant: Any,
forecasts: list[Any],
daily_water_needs: list[dict[str, Any]],
growth_stage: str | None,
crop_blueprint: tuple[dict[str, Any], list[dict[str, Any]]],
) -> dict[str, Any] | None:
defaults = apps.get_app_config("irrigation").get_optimizer_defaults()
crop_parameters, agromanagement = crop_blueprint
soil, site = _build_soil_parameters(sensor)
weather = _build_weather_records(
forecasts,
latitude=_safe_float(sensor.center_location.latitude),
longitude=_safe_float(sensor.center_location.longitude),
)
total_mm = round(sum(_safe_float(item.get("gross_irrigation_mm"), 0.0) for item in daily_water_needs), 3)
if total_mm <= 0:
return None
strategies = []
for spec in defaults["strategy_profiles"]:
irrigation_events = []
event_dates = _event_dates_for_frequency(forecasts, max(1, spec["event_count"]))
amount_per_event = round((total_mm * spec["multiplier"]) / max(len(event_dates), 1), 3)
for day in event_dates:
irrigation_events.append({"date": day, "amount": amount_per_event})
try:
result = self.simulation_service.run_single_simulation(
weather=weather,
soil=soil,
crop_parameters=crop_parameters,
agromanagement=agromanagement,
site_parameters=site,
irrigation_recommendation={"events": irrigation_events},
name=f"irrigation-{spec['code']}",
)
except Exception:
return None
yield_estimate = _safe_float(
result.get("result", {}).get("metrics", {}).get("yield_estimate"),
0.0,
)
score = round(_clamp((yield_estimate / 100.0), 0.0, 100.0), 2)
strategies.append(
StrategyResult(
code=spec["code"],
label=spec["label"],
score=score,
expected_yield_index=round(score, 2),
payload={
"events": len(event_dates),
"event_dates": event_dates,
"amount_per_event_mm": amount_per_event,
"total_irrigation_mm": round(amount_per_event * len(event_dates), 3),
"timing": _best_timing(
_mean_forecast_value(forecasts, "temperature_mean", 22.0),
_mean_forecast_value(forecasts, "wind_speed_max", 8.0),
),
},
reasoning=[
"امتیاز بر اساس بیشترین عملکرد شبیه سازی شده انتخاب شد.",
f"عملکرد نسبی این سناریو {round(score, 2)} ارزیابی شد.",
],
)
)
best = max(strategies, key=lambda item: item.score)
moisture_target = defaults["stage_targets"].get(_stage_key(growth_stage), defaults["stage_targets"]["vegetative"])
result = {
"engine": "pcse",
"recommended_strategy": {
"code": best.code,
"label": best.label,
"score": best.score,
"expected_yield_index": best.expected_yield_index,
"total_irrigation_mm": best.payload["total_irrigation_mm"],
"amount_per_event_mm": best.payload["amount_per_event_mm"],
"events": best.payload["events"],
"frequency_per_week": best.payload["events"],
"event_dates": best.payload["event_dates"],
"timing": best.payload["timing"],
"moisture_target_percent": moisture_target,
"validity_period": f"معتبر برای {defaults['validity_days']} روز آینده",
"reasoning": best.reasoning,
},
"alternatives": [
{
"code": item.code,
"label": item.label,
"score": item.score,
"expected_yield_index": item.expected_yield_index,
"total_irrigation_mm": item.payload["total_irrigation_mm"],
}
for item in sorted(strategies, key=lambda value: value.score, reverse=True)
if item.code != best.code
],
}
result["context_text"] = _irrigation_context_text(result)
return result
def _optimize_irrigation_with_heuristic(
self,
*,
sensor: Any,
plant: Any,
forecasts: list[Any],
daily_water_needs: list[dict[str, Any]],
growth_stage: str | None,
irrigation_method: Any | None,
) -> dict[str, Any]:
defaults = apps.get_app_config("irrigation").get_optimizer_defaults()
stage_key = _stage_key(growth_stage)
moisture_target = defaults["stage_targets"].get(stage_key, defaults["stage_targets"]["vegetative"])
total_mm = round(sum(_safe_float(item.get("gross_irrigation_mm"), 0.0) for item in daily_water_needs), 3)
non_zero_days = [item for item in daily_water_needs if _safe_float(item.get("gross_irrigation_mm"), 0.0) > 0]
average_temp = _mean_forecast_value(forecasts, "temperature_mean", 22.0)
average_wind = _mean_forecast_value(forecasts, "wind_speed_max", 8.0)
heat_risk = _mean_forecast_value(forecasts, "temperature_max", 28.0) >= 32.0
rain_date = _next_rain_date(forecasts, defaults["significant_rain_threshold_mm"])
efficiency = _safe_float(getattr(irrigation_method, "water_efficiency_percent", None), 75.0)
soil_moisture = _sensor_metric(sensor, "soil_moisture")
strategies: list[StrategyResult] = []
for spec in defaults["strategy_profiles"]:
event_count = max(1, min(7, round(max(len(non_zero_days), 1) * spec["frequency_factor"])))
applied_total = round(max(total_mm * spec["multiplier"], 0.0), 3)
amount_per_event = round(max(applied_total / event_count, defaults["minimum_event_mm"]), 3)
water_penalty = abs(applied_total - total_mm) * 2.4
if total_mm <= 0:
water_penalty = 0.0 if spec["code"] == "conservative" else 12.0
soil_penalty = 0.0
if soil_moisture is not None:
if soil_moisture < 25 and spec["code"] == "conservative":
soil_penalty += 8.0
if soil_moisture > 55 and spec["code"] == "protective":
soil_penalty += 7.0
climate_bonus = 0.0
if heat_risk and spec["code"] == "protective":
climate_bonus += 6.0
if rain_date and spec["code"] == "protective":
climate_bonus -= 8.0
if efficiency >= 85 and spec["code"] == "balanced":
climate_bonus += 4.0
score = round(_clamp(100.0 - water_penalty - soil_penalty + climate_bonus, 35.0, 96.0), 2)
event_dates = _event_dates_for_frequency(forecasts, event_count)
reasoning = [
f"نیاز آبی محاسبه شده برای بازه پیش رو حدود {total_mm} میلی متر است.",
f"این سناریو {applied_total} میلی متر آب را در {event_count} نوبت پخش می کند.",
]
if heat_risk:
reasoning.append("به خاطر دمای بالاتر از حد مطلوب، تنش گرمایی در امتیازدهی لحاظ شده است.")
if rain_date:
reasoning.append(f"بارش معنی دار از تاریخ {rain_date} احتمال کاهش نیاز آبی را بالا می برد.")
if soil_moisture is not None:
reasoning.append(f"رطوبت فعلی خاک حدود {round(soil_moisture, 1)} درصد در نظر گرفته شده است.")
strategies.append(
StrategyResult(
code=spec["code"],
label=spec["label"],
score=score,
expected_yield_index=round(52.0 + (score * 0.48), 2),
payload={
"events": event_count,
"amount_per_event_mm": amount_per_event,
"total_irrigation_mm": applied_total,
"event_dates": event_dates,
"timing": _best_timing(average_temp, average_wind),
},
reasoning=reasoning,
)
)
best = max(strategies, key=lambda item: item.score)
validity_period = f"معتبر برای {defaults['validity_days']} روز آینده"
if rain_date:
validity_period = f"معتبر تا قبل از بارش موثر پیش بینی شده در {rain_date}"
result = {
"engine": "crop_simulation_heuristic",
"recommended_strategy": {
"code": best.code,
"label": best.label,
"score": best.score,
"expected_yield_index": best.expected_yield_index,
"total_irrigation_mm": best.payload["total_irrigation_mm"],
"amount_per_event_mm": best.payload["amount_per_event_mm"],
"events": best.payload["events"],
"frequency_per_week": min(best.payload["events"] + 1, 7),
"event_dates": best.payload["event_dates"],
"timing": best.payload["timing"],
"moisture_target_percent": moisture_target,
"validity_period": validity_period,
"reasoning": best.reasoning,
},
"alternatives": [
{
"code": item.code,
"label": item.label,
"score": item.score,
"expected_yield_index": item.expected_yield_index,
"total_irrigation_mm": item.payload["total_irrigation_mm"],
}
for item in sorted(strategies, key=lambda value: value.score, reverse=True)
if item.code != best.code
],
}
result["context_text"] = _irrigation_context_text(result)
return result
def _optimize_fertilization_with_pcse(
self,
*,
sensor: Any,
plant: Any,
forecasts: list[Any],
growth_stage: str | None,
crop_blueprint: tuple[dict[str, Any], list[dict[str, Any]]],
) -> dict[str, Any] | None:
defaults = apps.get_app_config("fertilization").get_optimizer_defaults()
crop_parameters, agromanagement = crop_blueprint
soil, site = _build_soil_parameters(sensor)
weather = _build_weather_records(
forecasts,
latitude=_safe_float(sensor.center_location.latitude),
longitude=_safe_float(sensor.center_location.longitude),
)
stage_key = _stage_key(growth_stage)
target = defaults["stage_targets"].get(stage_key, defaults["stage_targets"]["vegetative"])
base_n = max(target["n"], 20)
strategies = []
for spec in defaults["strategy_profiles"]:
n_amount = round(base_n * spec["multiplier"], 3)
strategy_agromanagement = [
{
key: {
**value,
"TimedEvents": [
{
"event_signal": "apply_n",
"name": spec["label"],
"events_table": [
{
forecasts[0].forecast_date: {
"N_amount": n_amount,
"N_recovery": 0.7,
}
}
],
}
],
}
}
for entry in agromanagement
for key, value in entry.items()
] or agromanagement
try:
result = self.simulation_service.run_single_simulation(
weather=weather,
soil=soil,
crop_parameters=crop_parameters,
agromanagement=strategy_agromanagement,
site_parameters=site,
name=f"fertilization-{spec['code']}",
)
except Exception:
return None
yield_estimate = _safe_float(
result.get("result", {}).get("metrics", {}).get("yield_estimate"),
0.0,
)
score = round(_clamp((yield_estimate / 100.0), 0.0, 100.0), 2)
strategies.append(
StrategyResult(
code=spec["code"],
label=spec["label"],
score=score,
expected_yield_index=score,
payload={
"amount_kg_per_ha": round(n_amount * 1.6, 3),
"fertilizer_type": target["formula"],
"application_method": target["application_method"],
"timing": target["timing"],
},
reasoning=[
"سناریو برتر با بیشترین عملکرد شبیه سازی شده انتخاب شد.",
f"فرمول هدف برای این مرحله {target['formula']} در نظر گرفته شد.",
],
)
)
best = max(strategies, key=lambda item: item.score)
result = {
"engine": "pcse",
"recommended_strategy": {
"code": best.code,
"label": best.label,
"score": best.score,
"expected_yield_index": best.expected_yield_index,
"fertilizer_type": best.payload["fertilizer_type"],
"amount_kg_per_ha": best.payload["amount_kg_per_ha"],
"application_method": best.payload["application_method"],
"timing": best.payload["timing"],
"validity_period": f"معتبر برای {defaults['validity_days']} روز آینده",
"reasoning": best.reasoning,
},
"alternatives": [
{
"code": item.code,
"label": item.label,
"score": item.score,
"expected_yield_index": item.expected_yield_index,
"amount_kg_per_ha": item.payload["amount_kg_per_ha"],
}
for item in sorted(strategies, key=lambda value: value.score, reverse=True)
if item.code != best.code
],
}
result["context_text"] = _fertilization_context_text(result)
return result
def _optimize_fertilization_with_heuristic(
self,
*,
sensor: Any,
plant: Any,
forecasts: list[Any],
growth_stage: str | None,
) -> dict[str, Any]:
defaults = apps.get_app_config("fertilization").get_optimizer_defaults()
stage_key = _stage_key(growth_stage)
target = defaults["stage_targets"].get(stage_key, defaults["stage_targets"]["vegetative"])
current_n = _sensor_metric(sensor, "nitrogen")
current_p = _sensor_metric(sensor, "phosphorus")
current_k = _sensor_metric(sensor, "potassium")
current_ph = _sensor_metric(sensor, "soil_ph")
deficits = {
"n": max(target["n"] - _safe_float(current_n, target["n"] * 0.6), 0.0),
"p": max(target["p"] - _safe_float(current_p, target["p"] * 0.6), 0.0),
"k": max(target["k"] - _safe_float(current_k, target["k"] * 0.6), 0.0),
}
dominant = max(deficits, key=deficits.get)
severity = sum(deficits.values())
next_rain = _next_rain_date(forecasts, defaults["rain_delay_threshold_mm"]) if forecasts else None
avg_temp = _mean_forecast_value(forecasts, "temperature_mean", 22.0)
strategies: list[StrategyResult] = []
for spec in defaults["strategy_profiles"]:
base_amount = max(30.0, min(120.0, 35.0 + (severity * 1.4)))
amount = round(base_amount * spec["multiplier"], 2)
mismatch_penalty = 0.0
if dominant == "n" and "ازت" not in spec["focus"]:
mismatch_penalty += 12.0
if dominant == "k" and "پتاس" not in spec["focus"]:
mismatch_penalty += 12.0
if dominant == "p" and "فسفر" not in spec["focus"]:
mismatch_penalty += 12.0
if current_ph is not None and current_ph > 7.8 and "فسفر" in spec["focus"]:
mismatch_penalty += 8.0
if next_rain and spec["application_method"] == "محلول پاشی":
mismatch_penalty += 10.0
score = round(_clamp(96.0 - mismatch_penalty - abs(spec["multiplier"] - 1.0) * 18.0, 42.0, 95.0), 2)
reasoning = [
f"کسری عناصر برای این مرحله با فرمول هدف {target['formula']} سنجیده شد.",
f"بیشترین کمبود نسبی مربوط به عنصر {dominant.upper()} است.",
f"دوز پیشنهادی این سناریو {amount} کیلوگرم در هکتار برآورد شد.",
]
if current_ph is not None:
reasoning.append(f"pH فعلی خاک حدود {round(current_ph, 2)} در تصمیم گیری لحاظ شد.")
if next_rain:
reasoning.append(f"به دلیل بارش موثر نزدیک در {next_rain} از مصرف سطحی پرریسک اجتناب شده است.")
strategies.append(
StrategyResult(
code=spec["code"],
label=spec["label"],
score=score,
expected_yield_index=round(50.0 + (score * 0.5), 2),
payload={
"fertilizer_type": spec["formula_override"] or target["formula"],
"amount_kg_per_ha": amount,
"application_method": spec["application_method"],
"timing": target["timing"] if avg_temp < 30 else "صبح زود یا نزدیک غروب",
},
reasoning=reasoning,
)
)
best = max(strategies, key=lambda item: item.score)
validity_period = f"معتبر برای {defaults['validity_days']} روز آینده"
if stage_key == "flowering":
validity_period = "معتبر تا پایان پنجره گلدهی فعلی و حداکثر 5 روز آینده"
result = {
"engine": "crop_simulation_heuristic",
"recommended_strategy": {
"code": best.code,
"label": best.label,
"score": best.score,
"expected_yield_index": best.expected_yield_index,
"fertilizer_type": best.payload["fertilizer_type"],
"amount_kg_per_ha": best.payload["amount_kg_per_ha"],
"application_method": best.payload["application_method"],
"timing": best.payload["timing"],
"validity_period": validity_period,
"reasoning": best.reasoning,
},
"alternatives": [
{
"code": item.code,
"label": item.label,
"score": item.score,
"expected_yield_index": item.expected_yield_index,
"amount_kg_per_ha": item.payload["amount_kg_per_ha"],
}
for item in sorted(strategies, key=lambda value: value.score, reverse=True)
if item.code != best.code
],
"nutrient_status": {
"nitrogen": current_n,
"phosphorus": current_p,
"potassium": current_k,
"soil_ph": current_ph,
"dominant_gap": dominant,
},
}
result["context_text"] = _fertilization_context_text(result)
return result
+74
View File
@@ -0,0 +1,74 @@
from __future__ import annotations
from rest_framework import serializers
class GrowthSimulationRequestSerializer(serializers.Serializer):
plant_name = serializers.CharField(help_text="نام گیاه")
dynamic_parameters = serializers.ListField(
child=serializers.CharField(),
allow_empty=False,
help_text="پارامترهای متغیر رشد که باید در خروجی گزارش شوند.",
)
farm_uuid = serializers.UUIDField(required=False, allow_null=True)
weather = serializers.JSONField(required=False)
soil_parameters = serializers.JSONField(required=False)
site_parameters = serializers.JSONField(required=False)
crop_parameters = serializers.JSONField(required=False)
agromanagement = serializers.JSONField(required=False)
page_size = serializers.IntegerField(required=False, min_value=1, max_value=50)
def validate(self, attrs):
if not attrs.get("farm_uuid") and not attrs.get("weather"):
raise serializers.ValidationError(
"یکی از farm_uuid یا weather باید ارسال شود."
)
return attrs
class GrowthSimulationQueuedSerializer(serializers.Serializer):
task_id = serializers.CharField()
status_url = serializers.CharField()
plant_name = serializers.CharField()
class GrowthStageMetricSerializer(serializers.Serializer):
start = serializers.FloatField()
end = serializers.FloatField()
min = serializers.FloatField()
max = serializers.FloatField()
avg = serializers.FloatField()
class GrowthStageSerializer(serializers.Serializer):
order = serializers.IntegerField()
stage_code = serializers.CharField()
stage_name = serializers.CharField()
start_date = serializers.DateField()
end_date = serializers.DateField()
days_count = serializers.IntegerField()
metrics = serializers.JSONField()
class GrowthPaginationSerializer(serializers.Serializer):
page = serializers.IntegerField()
page_size = serializers.IntegerField()
total_items = serializers.IntegerField()
total_pages = serializers.IntegerField()
has_next = serializers.BooleanField()
has_previous = serializers.BooleanField()
class GrowthSimulationResultSerializer(serializers.Serializer):
plant_name = serializers.CharField()
dynamic_parameters = serializers.ListField(child=serializers.CharField())
engine = serializers.CharField(allow_null=True)
model_name = serializers.CharField(allow_null=True)
scenario_id = serializers.IntegerField(allow_null=True)
simulation_warning = serializers.CharField(allow_null=True, allow_blank=True)
summary_metrics = serializers.JSONField()
stage_timeline = GrowthStageSerializer(many=True)
stages_page = GrowthStageSerializer(many=True)
pagination = GrowthPaginationSerializer()
daily_records_count = serializers.IntegerField()
default_page_size = serializers.IntegerField()
+240 -10
View File
@@ -93,6 +93,123 @@ def _normalize_agromanagement(agromanagement: Any) -> list[dict[str, Any]]:
return campaigns return campaigns
def _deep_copy_json_like(value: Any) -> Any:
if isinstance(value, dict):
return {key: _deep_copy_json_like(item) for key, item in value.items()}
if isinstance(value, list):
return [_deep_copy_json_like(item) for item in value]
return value
def _parse_recommendation_events(
recommendation: dict[str, Any] | None,
*,
event_signal: str,
amount_keys: tuple[str, ...],
extra_keys: tuple[str, ...],
) -> list[dict[str, Any]]:
if not recommendation:
return []
raw_events = recommendation.get("events")
if raw_events is None:
raw_events = recommendation.get("schedule")
if raw_events is None:
raw_events = recommendation.get("applications")
if raw_events is None:
raw_events = recommendation.get("plan")
if not isinstance(raw_events, list):
return []
events_table = []
for item in raw_events:
if not isinstance(item, dict):
continue
raw_date = item.get("date") or item.get("day")
if raw_date is None:
continue
payload = {}
amount_value = None
amount_key = None
for candidate in amount_keys:
if item.get(candidate) is not None:
amount_value = item.get(candidate)
amount_key = candidate
break
if amount_key is not None:
payload[amount_key] = float(amount_value)
for extra_key in extra_keys:
if item.get(extra_key) is not None:
payload[extra_key] = float(item[extra_key])
if payload:
events_table.append({_coerce_date(raw_date): payload})
if not events_table:
return []
return [
{
"event_signal": event_signal,
"name": recommendation.get("name", f"{event_signal} recommendation"),
"comment": recommendation.get("comment", ""),
"events_table": events_table,
}
]
def _merge_management_recommendations(
agromanagement: Any,
*,
irrigation_recommendation: dict[str, Any] | None = None,
fertilization_recommendation: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
campaigns = _deep_copy_json_like(_normalize_agromanagement(agromanagement))
irrigation_events = _parse_recommendation_events(
irrigation_recommendation,
event_signal="irrigate",
amount_keys=("amount", "irrigation_amount"),
extra_keys=("efficiency",),
)
fertilization_events = _parse_recommendation_events(
fertilization_recommendation,
event_signal="apply_n",
amount_keys=("N_amount", "amount"),
extra_keys=("N_recovery",),
)
if not irrigation_events and not fertilization_events:
return campaigns
target_campaign = None
for campaign in campaigns:
if isinstance(campaign, dict) and campaign:
target_campaign = campaign
break
if target_campaign is None:
raise CropSimulationError(
"Agromanagement must contain at least one non-empty campaign."
)
campaign_start = next(iter(target_campaign.keys()))
campaign_payload = target_campaign[campaign_start]
if not isinstance(campaign_payload, dict):
raise CropSimulationError("Agromanagement campaign payload must be a dictionary.")
timed_events = campaign_payload.get("TimedEvents")
if timed_events in (None, ""):
timed_events = []
if not isinstance(timed_events, list):
raise CropSimulationError("TimedEvents must be a list when recommendations are merged.")
timed_events.extend(irrigation_events)
timed_events.extend(fertilization_events)
campaign_payload["TimedEvents"] = timed_events
return campaigns
def _normalize_pcse_output_records(records: Any) -> list[dict[str, Any]]: def _normalize_pcse_output_records(records: Any) -> list[dict[str, Any]]:
if records is None: if records is None:
return [] return []
@@ -298,8 +415,15 @@ class CropSimulationService:
crop_parameters: dict[str, Any], crop_parameters: dict[str, Any],
agromanagement: Any, agromanagement: Any,
site_parameters: dict[str, Any] | None = None, site_parameters: dict[str, Any] | None = None,
irrigation_recommendation: dict[str, Any] | None = None,
fertilization_recommendation: dict[str, Any] | None = None,
name: str = "", name: str = "",
) -> dict[str, Any]: ) -> dict[str, Any]:
merged_agromanagement = _merge_management_recommendations(
agromanagement,
irrigation_recommendation=irrigation_recommendation,
fertilization_recommendation=fertilization_recommendation,
)
scenario = SimulationScenario.objects.create( scenario = SimulationScenario.objects.create(
name=name, name=name,
scenario_type=SimulationScenario.ScenarioType.SINGLE, scenario_type=SimulationScenario.ScenarioType.SINGLE,
@@ -310,7 +434,9 @@ class CropSimulationService:
"soil": soil, "soil": soil,
"crop_parameters": crop_parameters, "crop_parameters": crop_parameters,
"site_parameters": site_parameters or {}, "site_parameters": site_parameters or {},
"agromanagement": agromanagement, "agromanagement": merged_agromanagement,
"irrigation_recommendation": irrigation_recommendation or {},
"fertilization_recommendation": fertilization_recommendation or {},
} }
), ),
) )
@@ -322,7 +448,7 @@ class CropSimulationService:
soil_payload=_json_ready(soil), soil_payload=_json_ready(soil),
crop_payload=_json_ready(crop_parameters), crop_payload=_json_ready(crop_parameters),
site_payload=_json_ready(site_parameters or {}), site_payload=_json_ready(site_parameters or {}),
agromanagement_payload=_json_ready(agromanagement), agromanagement_payload=_json_ready(merged_agromanagement),
) )
return self._execute_scenario( return self._execute_scenario(
scenario=scenario, scenario=scenario,
@@ -333,7 +459,7 @@ class CropSimulationService:
"soil": soil, "soil": soil,
"crop_parameters": crop_parameters, "crop_parameters": crop_parameters,
"site_parameters": site_parameters or {}, "site_parameters": site_parameters or {},
"agromanagement": agromanagement, "agromanagement": merged_agromanagement,
} }
], ],
) )
@@ -347,8 +473,15 @@ class CropSimulationService:
crop_b: dict[str, Any], crop_b: dict[str, Any],
agromanagement: Any, agromanagement: Any,
site_parameters: dict[str, Any] | None = None, site_parameters: dict[str, Any] | None = None,
irrigation_recommendation: dict[str, Any] | None = None,
fertilization_recommendation: dict[str, Any] | None = None,
name: str = "", name: str = "",
) -> dict[str, Any]: ) -> dict[str, Any]:
merged_agromanagement = _merge_management_recommendations(
agromanagement,
irrigation_recommendation=irrigation_recommendation,
fertilization_recommendation=fertilization_recommendation,
)
scenario = SimulationScenario.objects.create( scenario = SimulationScenario.objects.create(
name=name, name=name,
scenario_type=SimulationScenario.ScenarioType.CROP_COMPARISON, scenario_type=SimulationScenario.ScenarioType.CROP_COMPARISON,
@@ -360,7 +493,9 @@ class CropSimulationService:
"crop_a": crop_a, "crop_a": crop_a,
"crop_b": crop_b, "crop_b": crop_b,
"site_parameters": site_parameters or {}, "site_parameters": site_parameters or {},
"agromanagement": agromanagement, "agromanagement": merged_agromanagement,
"irrigation_recommendation": irrigation_recommendation or {},
"fertilization_recommendation": fertilization_recommendation or {},
} }
), ),
) )
@@ -373,7 +508,7 @@ class CropSimulationService:
soil_payload=_json_ready(soil), soil_payload=_json_ready(soil),
crop_payload=_json_ready(crop_a), crop_payload=_json_ready(crop_a),
site_payload=_json_ready(site_parameters or {}), site_payload=_json_ready(site_parameters or {}),
agromanagement_payload=_json_ready(agromanagement), agromanagement_payload=_json_ready(merged_agromanagement),
), ),
SimulationRun.objects.create( SimulationRun.objects.create(
scenario=scenario, scenario=scenario,
@@ -383,7 +518,7 @@ class CropSimulationService:
soil_payload=_json_ready(soil), soil_payload=_json_ready(soil),
crop_payload=_json_ready(crop_b), crop_payload=_json_ready(crop_b),
site_payload=_json_ready(site_parameters or {}), site_payload=_json_ready(site_parameters or {}),
agromanagement_payload=_json_ready(agromanagement), agromanagement_payload=_json_ready(merged_agromanagement),
), ),
] ]
return self._execute_scenario( return self._execute_scenario(
@@ -395,7 +530,7 @@ class CropSimulationService:
"soil": soil, "soil": soil,
"crop_parameters": crop_a, "crop_parameters": crop_a,
"site_parameters": site_parameters or {}, "site_parameters": site_parameters or {},
"agromanagement": agromanagement, "agromanagement": merged_agromanagement,
}, },
{ {
"instance": runs[1], "instance": runs[1],
@@ -403,11 +538,92 @@ class CropSimulationService:
"soil": soil, "soil": soil,
"crop_parameters": crop_b, "crop_parameters": crop_b,
"site_parameters": site_parameters or {}, "site_parameters": site_parameters or {},
"agromanagement": agromanagement, "agromanagement": merged_agromanagement,
}, },
], ],
) )
def recommend_best_crop(
self,
*,
weather: Any,
soil: dict[str, Any],
crops: list[dict[str, Any]],
agromanagement: Any,
site_parameters: dict[str, Any] | None = None,
irrigation_recommendation: dict[str, Any] | None = None,
fertilization_recommendation: dict[str, Any] | None = None,
name: str = "",
) -> dict[str, Any]:
if len(crops) < 2:
raise CropSimulationError("At least two crop options are required.")
merged_agromanagement = _merge_management_recommendations(
agromanagement,
irrigation_recommendation=irrigation_recommendation,
fertilization_recommendation=fertilization_recommendation,
)
scenario = SimulationScenario.objects.create(
name=name,
scenario_type=SimulationScenario.ScenarioType.CROP_COMPARISON,
model_name=self.manager.model_name,
input_payload=_json_ready(
{
"weather": weather,
"soil": soil,
"crops": crops,
"site_parameters": site_parameters or {},
"agromanagement": merged_agromanagement,
"irrigation_recommendation": irrigation_recommendation or {},
"fertilization_recommendation": fertilization_recommendation or {},
}
),
)
run_specs = []
for index, crop in enumerate(crops, start=1):
label = (
crop.get("label")
or crop.get("crop_name")
or crop.get("name")
or f"crop_{index}"
)
run = SimulationRun.objects.create(
scenario=scenario,
run_key=f"crop_{index}",
label=label,
weather_payload=_json_ready(weather),
soil_payload=_json_ready(soil),
crop_payload=_json_ready(crop),
site_payload=_json_ready(site_parameters or {}),
agromanagement_payload=_json_ready(merged_agromanagement),
)
run_specs.append(
{
"instance": run,
"weather": weather,
"soil": soil,
"crop_parameters": crop,
"site_parameters": site_parameters or {},
"agromanagement": merged_agromanagement,
}
)
result = self._execute_scenario(scenario=scenario, run_specs=run_specs)
comparison = result.get("comparison", {})
return {
"scenario_id": result["scenario_id"],
"scenario_type": result["scenario_type"],
"recommended_crop": {
"run_key": comparison.get("best_run_key"),
"label": comparison.get("best_label"),
"expected_yield_estimate": comparison.get("best_yield_estimate"),
},
"candidates": comparison.get("runs", []),
"raw_result": result,
}
def compare_fertilization_strategies( def compare_fertilization_strategies(
self, self,
*, *,
@@ -416,6 +632,8 @@ class CropSimulationService:
crop_parameters: dict[str, Any], crop_parameters: dict[str, Any],
strategies: list[dict[str, Any]], strategies: list[dict[str, Any]],
site_parameters: dict[str, Any] | None = None, site_parameters: dict[str, Any] | None = None,
irrigation_recommendation: dict[str, Any] | None = None,
fertilization_recommendation: dict[str, Any] | None = None,
name: str = "", name: str = "",
) -> dict[str, Any]: ) -> dict[str, Any]:
if len(strategies) < 2: if len(strategies) < 2:
@@ -432,11 +650,18 @@ class CropSimulationService:
"crop_parameters": crop_parameters, "crop_parameters": crop_parameters,
"site_parameters": site_parameters or {}, "site_parameters": site_parameters or {},
"strategies": strategies, "strategies": strategies,
"irrigation_recommendation": irrigation_recommendation or {},
"fertilization_recommendation": fertilization_recommendation or {},
} }
), ),
) )
run_specs = [] run_specs = []
for index, strategy in enumerate(strategies, start=1): for index, strategy in enumerate(strategies, start=1):
merged_agromanagement = _merge_management_recommendations(
strategy["agromanagement"],
irrigation_recommendation=irrigation_recommendation,
fertilization_recommendation=fertilization_recommendation,
)
run = SimulationRun.objects.create( run = SimulationRun.objects.create(
scenario=scenario, scenario=scenario,
run_key=f"strategy_{index}", run_key=f"strategy_{index}",
@@ -445,7 +670,7 @@ class CropSimulationService:
soil_payload=_json_ready(soil), soil_payload=_json_ready(soil),
crop_payload=_json_ready(crop_parameters), crop_payload=_json_ready(crop_parameters),
site_payload=_json_ready(site_parameters or {}), site_payload=_json_ready(site_parameters or {}),
agromanagement_payload=_json_ready(strategy["agromanagement"]), agromanagement_payload=_json_ready(merged_agromanagement),
) )
run_specs.append( run_specs.append(
{ {
@@ -454,7 +679,7 @@ class CropSimulationService:
"soil": soil, "soil": soil,
"crop_parameters": crop_parameters, "crop_parameters": crop_parameters,
"site_parameters": site_parameters or {}, "site_parameters": site_parameters or {},
"agromanagement": strategy["agromanagement"], "agromanagement": merged_agromanagement,
} }
) )
return self._execute_scenario(scenario=scenario, run_specs=run_specs) return self._execute_scenario(scenario=scenario, run_specs=run_specs)
@@ -598,6 +823,11 @@ def compare_crops(**kwargs) -> dict[str, Any]:
return CropSimulationService().compare_crops(**kwargs) return CropSimulationService().compare_crops(**kwargs)
@transaction.atomic
def recommend_best_crop(**kwargs) -> dict[str, Any]:
return CropSimulationService().recommend_best_crop(**kwargs)
@transaction.atomic @transaction.atomic
def compare_fertilization_strategies(**kwargs) -> dict[str, Any]: def compare_fertilization_strategies(**kwargs) -> dict[str, Any]:
return CropSimulationService().compare_fertilization_strategies(**kwargs) return CropSimulationService().compare_fertilization_strategies(**kwargs)
+13
View File
@@ -0,0 +1,13 @@
from __future__ import annotations
from config.celery import app
from .growth_simulation import run_growth_simulation
@app.task(bind=True)
def run_growth_simulation_task(self, payload: dict) -> dict:
return run_growth_simulation(
payload,
progress_callback=self.update_state,
)
@@ -0,0 +1,145 @@
from __future__ import annotations
from types import SimpleNamespace
from unittest.mock import patch
from django.test import TestCase, override_settings
from rest_framework.test import APIClient
from plant.models import Plant
from .growth_simulation import paginate_growth_stages, run_growth_simulation
@override_settings(ROOT_URLCONF="crop_simulation.urls")
class PlantGrowthSimulationApiTests(TestCase):
def setUp(self):
self.client = APIClient()
self.plant = Plant.objects.create(
name="گوجه‌فرنگی",
growth_profile={
"base_temperature": 10,
"required_gdd_for_maturity": 1200,
"current_cumulative_gdd": 50,
},
)
self.weather = [
{
"DAY": "2026-04-01",
"LAT": 35.7,
"LON": 51.4,
"TMIN": 12,
"TMAX": 24,
"RAIN": 0.0,
"ET0": 0.32,
},
{
"DAY": "2026-04-02",
"LAT": 35.7,
"LON": 51.4,
"TMIN": 13,
"TMAX": 25,
"RAIN": 0.0,
"ET0": 0.34,
},
{
"DAY": "2026-04-03",
"LAT": 35.7,
"LON": 51.4,
"TMIN": 14,
"TMAX": 27,
"RAIN": 1.0,
"ET0": 0.36,
},
]
def test_run_growth_simulation_returns_stage_timeline(self):
result = run_growth_simulation(
{
"plant_name": self.plant.name,
"dynamic_parameters": ["DVS", "LAI", "TAGP"],
"weather": self.weather,
"soil_parameters": {"SMFCF": 0.34, "SMW": 0.14, "RDMSOL": 120.0},
"site_parameters": {"WAV": 40.0},
"page_size": 2,
}
)
self.assertEqual(result["plant_name"], self.plant.name)
self.assertGreaterEqual(result["daily_records_count"], 3)
self.assertTrue(result["stage_timeline"])
self.assertEqual(result["pagination"]["page_size"], 2)
@patch("crop_simulation.views.run_growth_simulation_task.delay")
def test_queue_api_returns_task_id(self, mock_delay):
mock_delay.return_value = SimpleNamespace(id="growth-task-1")
response = self.client.post(
"/growth/",
data={
"plant_name": self.plant.name,
"dynamic_parameters": ["DVS", "LAI"],
"weather": self.weather,
},
format="json",
)
self.assertEqual(response.status_code, 202)
self.assertEqual(response.json()["data"]["task_id"], "growth-task-1")
@patch("crop_simulation.views._get_async_result")
def test_status_api_returns_paginated_stages(self, mock_get_async_result):
stage_timeline = [
{
"order": 1,
"stage_code": "establishment",
"stage_name": "استقرار",
"start_date": "2026-04-01",
"end_date": "2026-04-02",
"days_count": 2,
"metrics": {"DVS": {"start": 0.1, "end": 0.2, "min": 0.1, "max": 0.2, "avg": 0.15}},
},
{
"order": 2,
"stage_code": "vegetative",
"stage_name": "رشد رویشی",
"start_date": "2026-04-03",
"end_date": "2026-04-05",
"days_count": 3,
"metrics": {"DVS": {"start": 0.3, "end": 0.8, "min": 0.3, "max": 0.8, "avg": 0.55}},
},
{
"order": 3,
"stage_code": "flowering",
"stage_name": "گلدهی",
"start_date": "2026-04-06",
"end_date": "2026-04-07",
"days_count": 2,
"metrics": {"DVS": {"start": 1.0, "end": 1.2, "min": 1.0, "max": 1.2, "avg": 1.1}},
},
]
mock_get_async_result.return_value = SimpleNamespace(
state="SUCCESS",
result={
"plant_name": self.plant.name,
"dynamic_parameters": ["DVS"],
"engine": "growth_projection",
"model_name": "growth_projection_v1",
"scenario_id": None,
"simulation_warning": None,
"summary_metrics": {},
"stage_timeline": stage_timeline,
"stages_page": stage_timeline[:1],
"pagination": paginate_growth_stages(stage_timeline, page=1, page_size=1)["pagination"],
"daily_records_count": 7,
"default_page_size": 1,
},
)
response = self.client.get("/growth/growth-task-1/status/?page=2&page_size=1")
self.assertEqual(response.status_code, 200)
payload = response.json()["data"]["result"]
self.assertEqual(payload["pagination"]["page"], 2)
self.assertEqual(len(payload["stages_page"]), 1)
self.assertEqual(payload["stages_page"][0]["stage_code"], "vegetative")
@@ -0,0 +1,76 @@
import importlib.util
import os
import sqlite3
import tempfile
from collections import namedtuple
from unittest import skipUnless
from django.test import TestCase
from crop_simulation.services import CropSimulationService, PcseSimulationManager
@skipUnless(
importlib.util.find_spec("pcse") is not None,
"pcse must be installed to run the real WOFOST test.",
)
class CropSimulationSingleRunWithRecommendationsTest(TestCase):
def test_single_simulation_with_irrigation_and_fertilization_recommendations(self):
os.environ["HOME"] = tempfile.mkdtemp(prefix="pcse-home-", dir="/tmp")
from pcse import settings as pcse_settings
from pcse.tests.db_input import (
AgroManagementDataProvider,
GridWeatherDataProvider,
fetch_cropdata,
fetch_sitedata,
fetch_soildata,
)
def namedtuple_factory(cursor, row):
fields = [column[0] for column in cursor.description]
cls = namedtuple("Row", fields)
return cls._make(row)
db_path = os.path.join(pcse_settings.PCSE_USER_HOME, "pcse.db")
connection = sqlite3.connect(db_path)
connection.row_factory = namedtuple_factory
grid = int(os.environ.get("PCSE_TEST_GRID", "31031"))
crop_no = int(os.environ.get("PCSE_TEST_CROP_NO", "1"))
year = int(os.environ.get("PCSE_TEST_YEAR", "2000"))
weather = GridWeatherDataProvider(connection, grid_no=grid).export()
soil = fetch_soildata(connection, grid)
site = fetch_sitedata(connection, grid, year)
crop_parameters = fetch_cropdata(connection, grid, year, crop_no)
agromanagement = AgroManagementDataProvider(connection, grid, crop_no, year)
response = CropSimulationService(
manager=PcseSimulationManager(model_name="Wofost72_WLP_CWB")
).run_single_simulation(
weather=weather,
soil=soil,
crop_parameters=crop_parameters,
agromanagement=agromanagement,
site_parameters=site,
irrigation_recommendation={
"events": [
{"date": "2000-02-10", "amount": 2.5, "efficiency": 0.8},
{"date": "2000-03-05", "amount": 3.0, "efficiency": 0.8},
]
},
fertilization_recommendation={
"events": [
{"date": "2000-02-15", "N_amount": 30, "N_recovery": 0.7},
{"date": "2000-03-01", "N_amount": 20, "N_recovery": 0.7},
]
},
name="single real wofost run with recommendations",
)
connection.close()
print("\nCrop Simulation Response With Recommendations:\n", response)
self.assertEqual(response["result"]["engine"], "pcse")
self.assertIsNotNone(response["result"]["metrics"]["yield_estimate"])
self.assertIsNotNone(response["result"]["metrics"]["biomass"])
+113
View File
@@ -110,6 +110,119 @@ class CropSimulationServiceTests(TestCase):
site_parameters=self.site, site_parameters=self.site,
) )
def test_recommend_best_crop_returns_best_candidate(self):
with patch.object(
self.service.manager,
"run_simulation",
side_effect=[
{
"engine": "pcse",
"model_name": "Wofost72_WLP_CWB",
"metrics": {
"yield_estimate": 5200.0,
"biomass": 9800.0,
"max_lai": 4.1,
},
"daily_output": [],
"summary_output": [],
"terminal_output": [],
},
{
"engine": "pcse",
"model_name": "Wofost72_WLP_CWB",
"metrics": {
"yield_estimate": 6100.0,
"biomass": 11000.0,
"max_lai": 4.4,
},
"daily_output": [],
"summary_output": [],
"terminal_output": [],
},
],
):
result = self.service.recommend_best_crop(
weather=self.weather,
soil=self.soil,
crops=[
{"crop_name": "wheat", "label": "wheat", "TSUM1": 800},
{"crop_name": "maize", "label": "maize", "TSUM1": 900},
],
agromanagement=build_agromanagement(),
site_parameters=self.site,
name="best crop recommendation",
)
self.assertEqual(result["recommended_crop"]["label"], "maize")
self.assertEqual(result["recommended_crop"]["expected_yield_estimate"], 6100.0)
self.assertEqual(len(result["candidates"]), 2)
def test_recommend_best_crop_requires_two_options(self):
with self.assertRaises(CropSimulationError):
self.service.recommend_best_crop(
weather=self.weather,
soil=self.soil,
crops=[{"crop_name": "wheat", "TSUM1": 800}],
agromanagement=build_agromanagement(),
site_parameters=self.site,
)
def test_run_single_simulation_merges_irrigation_and_fertilization_recommendations(self):
captured = {}
def fake_run_simulation(**kwargs):
captured.update(kwargs)
return {
"engine": "pcse",
"model_name": "Wofost72_WLP_CWB",
"metrics": {
"yield_estimate": 5400.0,
"biomass": 9800.0,
"max_lai": 4.2,
},
"daily_output": [],
"summary_output": [],
"terminal_output": [],
}
with patch.object(self.service.manager, "run_simulation", side_effect=fake_run_simulation):
self.service.run_single_simulation(
weather=self.weather,
soil=self.soil,
crop_parameters=self.crop,
agromanagement=build_agromanagement(),
site_parameters=self.site,
irrigation_recommendation={
"events": [
{
"date": "2026-04-25",
"amount": 2.5,
"efficiency": 0.8,
}
]
},
fertilization_recommendation={
"events": [
{
"date": "2026-04-20",
"N_amount": 45,
"N_recovery": 0.7,
}
]
},
name="managed run",
)
timed_events = captured["agromanagement"][0][date(2026, 4, 1)]["TimedEvents"]
self.assertEqual(len(timed_events), 3)
self.assertEqual(timed_events[1]["event_signal"], "irrigate")
self.assertEqual(timed_events[1]["events_table"][0][date(2026, 4, 25)]["amount"], 2.5)
self.assertEqual(timed_events[2]["event_signal"], "apply_n")
self.assertEqual(
timed_events[2]["events_table"][0][date(2026, 4, 20)]["N_amount"],
45.0,
)
def test_raises_clear_error_when_pcse_is_unavailable(self): def test_raises_clear_error_when_pcse_is_unavailable(self):
with patch("crop_simulation.services._load_pcse_bindings", return_value=None): with patch("crop_simulation.services._load_pcse_bindings", return_value=None):
with self.assertRaisesMessage( with self.assertRaisesMessage(
+13
View File
@@ -0,0 +1,13 @@
from django.urls import path
from .views import PlantGrowthSimulationStatusView, PlantGrowthSimulationView
urlpatterns = [
path("growth/", PlantGrowthSimulationView.as_view(), name="growth-simulation"),
path(
"growth/<str:task_id>/status/",
PlantGrowthSimulationStatusView.as_view(),
name="growth-simulation-status",
),
]
+175
View File
@@ -0,0 +1,175 @@
from __future__ import annotations
from drf_spectacular.utils import OpenApiExample, extend_schema
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView
from config.openapi import (
build_envelope_serializer,
build_response,
build_task_status_data_serializer,
)
from .growth_simulation import MAX_PAGE_SIZE, paginate_growth_stages
from .serializers import (
GrowthSimulationQueuedSerializer,
GrowthSimulationRequestSerializer,
GrowthSimulationResultSerializer,
)
from .tasks import run_growth_simulation_task
GrowthSimulationQueuedResponseSerializer = build_envelope_serializer(
"GrowthSimulationQueuedResponseSerializer",
GrowthSimulationQueuedSerializer,
)
GrowthSimulationStatusResponseSerializer = build_envelope_serializer(
"GrowthSimulationStatusResponseSerializer",
build_task_status_data_serializer(
"GrowthSimulationTaskStatusDataSerializer",
GrowthSimulationResultSerializer,
),
)
GrowthSimulationErrorSerializer = build_envelope_serializer(
"GrowthSimulationErrorSerializer",
data_required=False,
allow_null=True,
)
def _get_async_result(task_id: str):
from celery.result import AsyncResult
return AsyncResult(task_id)
def _coerce_positive_int(value, default: int) -> int:
try:
parsed = int(value)
except (TypeError, ValueError):
return default
return max(parsed, 1)
class PlantGrowthSimulationView(APIView):
@extend_schema(
tags=["Crop Simulation"],
summary="شروع شبیه سازی رشد گیاه",
description=(
"نوع گیاه و پارامترهای متغیر رشد را می گیرد، "
"شبیه سازی را داخل Celery اجرا می کند و فقط task_id برمی گرداند."
),
request=GrowthSimulationRequestSerializer,
responses={
202: build_response(
GrowthSimulationQueuedResponseSerializer,
"تسک شبیه سازی رشد گیاه در صف قرار گرفت.",
),
400: build_response(
GrowthSimulationErrorSerializer,
"داده ورودی نامعتبر است.",
),
},
examples=[
OpenApiExample(
"نمونه درخواست با weather مستقیم",
value={
"plant_name": "گوجه‌فرنگی",
"dynamic_parameters": ["DVS", "LAI", "TAGP", "TWSO", "SM"],
"weather": [
{
"DAY": "2026-04-01",
"LAT": 35.7,
"LON": 51.4,
"TMIN": 12,
"TMAX": 24,
"RAIN": 0.0,
"ET0": 0.32,
}
],
"soil_parameters": {"SMFCF": 0.34, "SMW": 0.14, "RDMSOL": 120.0},
"site_parameters": {"WAV": 40.0},
"page_size": 2,
},
request_only=True,
),
OpenApiExample(
"نمونه درخواست با farm",
value={
"plant_name": "گوجه‌فرنگی",
"dynamic_parameters": ["DVS", "LAI", "TAGP"],
"farm_uuid": "550e8400-e29b-41d4-a716-446655440000",
},
request_only=True,
),
],
)
def post(self, request):
serializer = GrowthSimulationRequestSerializer(data=request.data)
if not serializer.is_valid():
return Response(
{"code": 400, "msg": "داده نامعتبر.", "data": serializer.errors},
status=status.HTTP_400_BAD_REQUEST,
)
task = run_growth_simulation_task.delay(serializer.validated_data)
return Response(
{
"code": 202,
"msg": "تسک شبیه سازی رشد در صف قرار گرفت.",
"data": {
"task_id": task.id,
"status_url": f"/api/crop-simulation/growth/{task.id}/status/",
"plant_name": serializer.validated_data["plant_name"],
},
},
status=status.HTTP_202_ACCEPTED,
)
class PlantGrowthSimulationStatusView(APIView):
@extend_schema(
tags=["Crop Simulation"],
summary="وضعیت شبیه سازی رشد گیاه",
description="وضعیت تسک Celery را برمی گرداند و در صورت موفقیت مراحل رشد را به صورت صفحه بندی شده بازمی گرداند.",
responses={
200: build_response(
GrowthSimulationStatusResponseSerializer,
"وضعیت فعلی تسک شبیه سازی رشد گیاه.",
)
},
)
def get(self, request, task_id: str):
result = _get_async_result(task_id)
payload = {"task_id": task_id, "status": result.state}
if result.state == "PENDING":
payload["message"] = "تسک در صف یا یافت نشد."
elif result.state == "PROGRESS":
payload["progress"] = result.info
elif result.state == "SUCCESS":
task_result = dict(result.result or {})
page = _coerce_positive_int(request.query_params.get("page", 1), 1)
page_size = min(
_coerce_positive_int(
request.query_params.get("page_size", task_result.get("default_page_size", 10)),
10,
),
MAX_PAGE_SIZE,
)
paginated = paginate_growth_stages(
task_result.get("stage_timeline", []),
page=page,
page_size=page_size,
)
task_result["stages_page"] = paginated["items"]
task_result["pagination"] = paginated["pagination"]
payload["result"] = task_result
elif result.state == "FAILURE":
payload["error"] = str(result.result)
return Response(
{"code": 200, "msg": "success", "data": payload},
status=status.HTTP_200_OK,
)
+73
View File
@@ -1,3 +1,5 @@
from functools import cached_property
from django.apps import AppConfig from django.apps import AppConfig
@@ -5,3 +7,74 @@ class FertilizationConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField" default_auto_field = "django.db.models.BigAutoField"
name = "fertilization" name = "fertilization"
verbose_name = "Fertilization" verbose_name = "Fertilization"
tone_file = "config/tones/fertilization_tone.txt"
@cached_property
def optimizer_defaults(self):
return {
"validity_days": 7,
"rain_delay_threshold_mm": 3.0,
"stage_targets": {
"initial": {
"n": 28.0,
"p": 20.0,
"k": 24.0,
"formula": "10-52-10",
"application_method": "استارتر نواری یا همراه آب آبیاری",
"timing": "همزمان با استقرار بوته و در ساعات خنک روز",
},
"vegetative": {
"n": 55.0,
"p": 28.0,
"k": 42.0,
"formula": "20-20-20",
"application_method": "کودآبیاری یا سرک خاکی سبک",
"timing": "صبح زود و ترجیحا قبل از نوبت آبیاری",
},
"flowering": {
"n": 42.0,
"p": 32.0,
"k": 58.0,
"formula": "15-10-30",
"application_method": "کودآبیاری یا محلول پاشی سبک",
"timing": "صبح زود و دور از تنش گرمایی ظهر",
},
"fruiting": {
"n": 35.0,
"p": 24.0,
"k": 68.0,
"formula": "12-12-36",
"application_method": "کودآبیاری با تاکید بر پتاس",
"timing": "صبح زود یا نزدیک غروب",
},
},
"strategy_profiles": [
{
"code": "maintenance",
"label": "تغذیه نگهدارنده",
"multiplier": 0.8,
"focus": "پایه متعادل",
"application_method": "کودآبیاری",
"formula_override": "",
},
{
"code": "balanced",
"label": "تغذیه متعادل",
"multiplier": 1.0,
"focus": "ازت فسفر پتاس متعادل",
"application_method": "کودآبیاری",
"formula_override": "",
},
{
"code": "corrective",
"label": "تغذیه اصلاحی",
"multiplier": 1.2,
"focus": "ازت و پتاس اصلاحی",
"application_method": "کودآبیاری",
"formula_override": "",
},
],
}
def get_optimizer_defaults(self):
return self.optimizer_defaults
+43
View File
@@ -1,3 +1,5 @@
from functools import cached_property
from django.apps import AppConfig from django.apps import AppConfig
@@ -5,3 +7,44 @@ class IrrigationConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField" default_auto_field = "django.db.models.BigAutoField"
name = "irrigation" name = "irrigation"
verbose_name = "Irrigation" verbose_name = "Irrigation"
tone_file = "config/tones/irrigation_tone.txt"
@cached_property
def optimizer_defaults(self):
return {
"validity_days": 3,
"minimum_event_mm": 4.0,
"significant_rain_threshold_mm": 4.0,
"stage_targets": {
"initial": 65,
"vegetative": 70,
"flowering": 75,
"fruiting": 68,
},
"strategy_profiles": [
{
"code": "conservative",
"label": "آبیاری محافظه کارانه",
"multiplier": 0.82,
"frequency_factor": 0.85,
"event_count": 2,
},
{
"code": "balanced",
"label": "آبیاری متعادل",
"multiplier": 1.0,
"frequency_factor": 1.0,
"event_count": 3,
},
{
"code": "protective",
"label": "آبیاری حمایتی",
"multiplier": 1.12,
"frequency_factor": 1.2,
"event_count": 4,
},
],
}
def get_optimizer_defaults(self):
return self.optimizer_defaults
+184 -28
View File
@@ -5,6 +5,8 @@
import json import json
import logging import logging
from django.apps import apps
from farm_data.models import SensorData from farm_data.models import SensorData
from rag.api_provider import get_chat_client from rag.api_provider import get_chat_client
from rag.chat import ( from rag.chat import (
@@ -23,23 +25,153 @@ KB_NAME = "fertilization"
SERVICE_ID = "fertilization" SERVICE_ID = "fertilization"
DEFAULT_FERTILIZATION_PROMPT = ( DEFAULT_FERTILIZATION_PROMPT = (
"بر اساس دادههای خاک (NPK، pH)، مشخصات گیاه، مرحله رشد و پایگاه دانش کودهی، " "از داده های خاک، مرحله رشد و خروجی بهینه ساز شبیه سازی برای ساخت توصیه کودهی استفاده کن. "
"یک توصیه کودهی دقیق بده. " "اگر بلوک [خروجی بهینه ساز شبیه سازی] وجود داشت، همان را مرجع اصلی فرمول، مقدار، روش مصرف و اعتبار قرار بده. "
"پاسخ حتماً به فرمت JSON با ساختار زیر باشد:\n" "پاسخ فقط JSON معتبر با کلید sections باشد."
'{\n'
' "plan": {\n'
' "npkRatio": "<str>",\n'
' "amountPerHectare": "<str>",\n'
' "applicationMethod": "<str>",\n'
' "applicationInterval": "<str>",\n'
' "reasoning": "<str>"\n'
' }\n'
'}\n'
"فقط JSON خروجی بده، بدون توضیح اضافی. "
"مقادیر را بر اساس شرایط واقعی خاک و گیاه محاسبه کن."
) )
def _get_optimizer():
return apps.get_app_config("crop_simulation").get_recommendation_optimizer()
def _unique_items(items: list[str]) -> list[str]:
seen = set()
output = []
for item in items:
normalized = (item or "").strip()
if not normalized or normalized in seen:
continue
seen.add(normalized)
output.append(normalized)
return output
def _find_section(sections: list[dict], section_type: str) -> dict | None:
for section in sections:
if isinstance(section, dict) and section.get("type") == section_type:
return section
return None
def _build_fertilization_fallback(*, optimized_result: dict | None) -> dict:
if optimized_result:
recommended = optimized_result["recommended_strategy"]
list_items = [
f"دوز پیشنهادی: {recommended['amount_kg_per_ha']} کیلوگرم در هکتار",
f"روش مصرف: {recommended['application_method']}",
f"پنجره اجرا: {recommended['validity_period']}",
]
warning_text = "قبل از اختلاط یا محلول سازی، سازگاری کود با آب و شرایط مزرعه بررسی شود."
return {
"sections": [
{
"type": "recommendation",
"title": "برنامه کودهی بهینه",
"icon": "leaf",
"content": (
f"سناریوی {recommended['label']} برای این مزرعه مناسب تر ارزیابی شد."
),
"fertilizerType": recommended["fertilizer_type"],
"amount": f"{recommended['amount_kg_per_ha']} کیلوگرم در هکتار",
"applicationMethod": recommended["application_method"],
"timing": recommended["timing"],
"validityPeriod": recommended["validity_period"],
"expandableExplanation": " ".join(recommended.get("reasoning", [])),
},
{
"type": "list",
"title": "نکات اجرایی و اختلاط",
"icon": "list",
"items": _unique_items(list_items),
},
{
"type": "warning",
"title": "هشدار کودهی",
"icon": "alert-triangle",
"content": warning_text,
},
]
}
return {
"sections": [
{
"type": "recommendation",
"title": "برنامه کودهی پیشنهادی",
"icon": "leaf",
"content": "پیشنهاد کودهی بر اساس داده های فعلی با قطعیت متوسط آماده شده است.",
"fertilizerType": "کود کامل متعادل",
"amount": "پس از پایش دوباره عناصر اصلی تعیین شود",
"applicationMethod": "ترجیحا همراه آب آبیاری",
"timing": "صبح زود",
"validityPeriod": "معتبر برای 5 روز آینده",
"expandableExplanation": "به دلیل محدود بودن داده های تغذیه ای، تصمیم نهایی باید با پایش مجدد تکمیل شود.",
},
{
"type": "list",
"title": "نکات اجرایی و اختلاط",
"icon": "list",
"items": [
"قبل از مصرف، EC و pH محلول بررسی شود.",
"در صورت مشاهده بارش موثر، زمان مصرف بازبینی شود.",
],
},
{
"type": "warning",
"title": "هشدار کودهی",
"icon": "alert-triangle",
"content": "بدون بررسی دوباره مزرعه از مصرف سنگین کود خودداری شود.",
},
]
}
def _merge_fertilization_response(
*,
parsed_result: dict,
optimized_result: dict | None,
) -> dict:
fallback = _build_fertilization_fallback(optimized_result=optimized_result)
if not isinstance(parsed_result, dict):
return fallback
sections = parsed_result.get("sections")
if not isinstance(sections, list):
return fallback
recommendation = _find_section(sections, "recommendation") or {}
list_section = _find_section(sections, "list") or {}
warning_section = _find_section(sections, "warning") or {}
fallback_recommendation = fallback["sections"][0]
fallback_list = fallback["sections"][1]
fallback_warning = fallback["sections"][2]
merged_recommendation = {**recommendation, **fallback_recommendation}
merged_recommendation["content"] = recommendation.get("content") or fallback_recommendation["content"]
merged_recommendation["title"] = recommendation.get("title") or fallback_recommendation["title"]
merged_recommendation["expandableExplanation"] = (
recommendation.get("expandableExplanation")
or fallback_recommendation["expandableExplanation"]
)
merged_list = {
**fallback_list,
**list_section,
"items": _unique_items(
list(list_section.get("items", [])) + list(fallback_list["items"])
)[:5],
}
merged_warning = {
**fallback_warning,
**warning_section,
"content": warning_section.get("content") or fallback_warning["content"],
}
return {"sections": [merged_recommendation, merged_list, merged_warning]}
def get_fertilization_recommendation( def get_fertilization_recommendation(
sensor_uuid: str, sensor_uuid: str,
plant_name: str | None = None, plant_name: str | None = None,
@@ -80,15 +212,38 @@ def get_fertilization_recommendation(
user_query = query or "توصیه کودهی برای مزرعه من چیست؟" user_query = query or "توصیه کودهی برای مزرعه من چیست؟"
sensor = ( sensor = (
SensorData.objects.prefetch_related("plants") SensorData.objects.select_related("center_location")
.prefetch_related("plants")
.filter(farm_uuid=sensor_uuid) .filter(farm_uuid=sensor_uuid)
.first() .first()
) )
resolved_plant_name = plant_name resolved_plant_name = plant_name
plant = None
if not resolved_plant_name and sensor is not None: if not resolved_plant_name and sensor is not None:
plant = sensor.plants.first() plant = sensor.plants.first()
if plant is not None: if plant is not None:
resolved_plant_name = plant.name resolved_plant_name = plant.name
elif sensor is not None and plant_name:
plant = sensor.plants.filter(name=plant_name).first() or sensor.plants.first()
forecasts = []
optimized_result = None
if sensor is not None and getattr(sensor, "center_location", None) is not None:
from weather.models import WeatherForecast
forecasts = list(
WeatherForecast.objects.filter(
location=sensor.center_location,
forecast_date__isnull=False,
).order_by("forecast_date")[:7]
)
if sensor is not None and plant is not None:
optimized_result = _get_optimizer().optimize_fertilization(
sensor=sensor,
plant=plant,
forecasts=forecasts,
growth_stage=growth_stage,
)
context = build_rag_context( context = build_rag_context(
user_query, sensor_uuid, config=cfg, limit=limit, kb_name=KB_NAME, service_id=SERVICE_ID, user_query, sensor_uuid, config=cfg, limit=limit, kb_name=KB_NAME, service_id=SERVICE_ID,
@@ -99,6 +254,11 @@ def get_fertilization_recommendation(
plant_text = build_plant_text(resolved_plant_name, growth_stage) plant_text = build_plant_text(resolved_plant_name, growth_stage)
if plant_text: if plant_text:
extra_parts.append("[اطلاعات گیاه]\n" + plant_text) extra_parts.append("[اطلاعات گیاه]\n" + plant_text)
if optimized_result is not None:
extra_parts.append(
"[خروجی بهینه ساز شبیه سازی]\n"
+ optimized_result["context_text"]
)
if extra_parts: if extra_parts:
context = "\n\n---\n\n".join(extra_parts) + ("\n\n---\n\n" + context if context else "") context = "\n\n---\n\n".join(extra_parts) + ("\n\n---\n\n" + context if context else "")
@@ -132,14 +292,9 @@ def get_fertilization_recommendation(
raw = response.choices[0].message.content.strip() raw = response.choices[0].message.content.strip()
except Exception as exc: except Exception as exc:
logger.error("Fertilization recommendation error for %s: %s", sensor_uuid, exc) logger.error("Fertilization recommendation error for %s: %s", sensor_uuid, exc)
result = { result = _build_fertilization_fallback(optimized_result=optimized_result)
"fertilizer_needed": None, result["error"] = f"خطا در دریافت توصیه: {exc}"
"fertilizer_type": None, result["raw_response"] = None
"amount_kg_per_hectare": None,
"reason": f"خطا در دریافت توصیه: {exc}",
"npk_status": None,
"raw_response": None,
}
_fail_audit_log( _fail_audit_log(
audit_log, audit_log,
str(exc), str(exc),
@@ -153,13 +308,14 @@ def get_fertilization_recommendation(
cleaned = cleaned.strip("`").removeprefix("json").strip() cleaned = cleaned.strip("`").removeprefix("json").strip()
result = json.loads(cleaned) result = json.loads(cleaned)
except (json.JSONDecodeError, ValueError): except (json.JSONDecodeError, ValueError):
result = { result = {}
"plan": {
"reasoning": raw,
},
}
result = _merge_fertilization_response(
parsed_result=result,
optimized_result=optimized_result,
)
result["raw_response"] = raw result["raw_response"] = raw
result["simulation_optimizer"] = optimized_result
_complete_audit_log( _complete_audit_log(
audit_log, audit_log,
json.dumps(result, ensure_ascii=False, default=str), json.dumps(result, ensure_ascii=False, default=str),
+191 -26
View File
@@ -5,6 +5,7 @@
import json import json
import logging import logging
from django.apps import apps
from django.db import transaction from django.db import transaction
from irrigation.models import IrrigationMethod from irrigation.models import IrrigationMethod
from irrigation.evapotranspiration import calculate_forecast_water_needs, resolve_crop_profile, resolve_kc from irrigation.evapotranspiration import calculate_forecast_water_needs, resolve_crop_profile, resolve_kc
@@ -27,23 +28,171 @@ KB_NAME = "irrigation"
SERVICE_ID = "irrigation" SERVICE_ID = "irrigation"
DEFAULT_IRRIGATION_PROMPT = ( DEFAULT_IRRIGATION_PROMPT = (
"بر اساس محاسبات نهایی تبخیر-تعرق و نیاز آبی که در ورودی آمده، " "از محاسبات FAO-56 و خروجی بهینه ساز شبیه سازی برای ساخت توصیه آبیاری استفاده کن. "
"یک برنامه آبیاری قابل‌فهم برای کشاورز تولید کن. " "اگر بلوک [خروجی بهینه ساز شبیه سازی] وجود داشت، همان را مرجع اصلی اعداد قرار بده. "
"پاسخ حتماً به فرمت JSON با ساختار زیر باشد:\n" "پاسخ فقط JSON معتبر با کلید sections باشد و عدد جدید متناقض نساز."
'{\n'
' "plan": {\n'
' "frequencyPerWeek": <int>,\n'
' "durationMinutes": <int>,\n'
' "bestTimeOfDay": "<str>",\n'
' "moistureLevel": <int>,\n'
' "warning": "<str>"\n'
' }\n'
'}\n'
"فقط JSON خروجی بده، بدون توضیح اضافی. "
"از انجام هرگونه محاسبه عددی جدید خودداری کن و فقط از داده‌های ساختاریافته ورودی استفاده کن."
) )
def _get_optimizer():
return apps.get_app_config("crop_simulation").get_recommendation_optimizer()
def _unique_items(items: list[str]) -> list[str]:
seen = set()
output = []
for item in items:
normalized = (item or "").strip()
if not normalized or normalized in seen:
continue
seen.add(normalized)
output.append(normalized)
return output
def _find_section(sections: list[dict], section_type: str) -> dict | None:
for section in sections:
if isinstance(section, dict) and section.get("type") == section_type:
return section
return None
def _build_irrigation_fallback(
*,
optimized_result: dict | None,
daily_water_needs: list[dict],
) -> dict:
if optimized_result:
recommended = optimized_result["recommended_strategy"]
content = (
f"{recommended['events']} نوبت آبیاری با حدود "
f"{recommended['amount_per_event_mm']} میلی متر در هر نوبت اجرا شود."
)
list_items = [
f"در بازه اعتبار حدود {recommended['total_irrigation_mm']} میلی متر آب توزیع شود.",
f"نوبت های پیشنهادی: {', '.join(recommended['event_dates']) or 'بر اساس پایش روزانه مزرعه'}",
f"رطوبت خاک نزدیک {recommended['moisture_target_percent']} درصد نگه داشته شود.",
]
warning = optimized_result.get("alternatives", [])
warning_text = (
f"اگر شرایط مزرعه تغییر کرد، سناریوی جایگزین {warning[0]['label']} هم قابل بررسی است."
if warning
else "در صورت تغییر ناگهانی بارش یا باد، برنامه را دوباره ارزیابی کنید."
)
explanation = " ".join(recommended.get("reasoning", []))
return {
"sections": [
{
"type": "recommendation",
"title": "برنامه آبیاری بهینه",
"icon": "droplet",
"content": content,
"frequency": f"{recommended['events']} نوبت در بازه اعتبار",
"amount": (
f"{recommended['amount_per_event_mm']} میلی متر در هر نوبت "
f"(جمع کل {recommended['total_irrigation_mm']} میلی متر)"
),
"timing": recommended["timing"],
"validityPeriod": recommended["validity_period"],
"expandableExplanation": explanation,
},
{
"type": "list",
"title": "اقدامات اجرایی",
"icon": "list",
"items": _unique_items(list_items),
},
{
"type": "warning",
"title": "هشدار آبیاری",
"icon": "alert-triangle",
"content": warning_text,
},
]
}
total_mm = round(sum(float(item.get("gross_irrigation_mm", 0.0) or 0.0) for item in daily_water_needs), 2)
return {
"sections": [
{
"type": "recommendation",
"title": "برنامه آبیاری پیشنهادی",
"icon": "droplet",
"content": "پایش رطوبت خاک ادامه پیدا کند و برنامه آبیاری بر اساس نیاز روزانه تنظیم شود.",
"frequency": "بر اساس پایش روزانه",
"amount": f"جمع نیاز تخمینی این بازه حدود {total_mm} میلی متر است.",
"timing": "اوایل صبح یا نزدیک غروب",
"validityPeriod": "معتبر برای 3 روز آینده",
"expandableExplanation": "به دلیل محدود بودن داده ها، توصیه با اتکا به نیاز آبی روزانه ساخته شده است.",
},
{
"type": "list",
"title": "اقدامات اجرایی",
"icon": "list",
"items": [
"قبل از هر نوبت آبیاری رطوبت خاک سطحی را دوباره بررسی کنید.",
"اگر بارش موثر رخ داد، نوبت بعدی را به تعویق بیندازید.",
],
},
{
"type": "warning",
"title": "هشدار آبیاری",
"icon": "alert-triangle",
"content": "با تغییر دما یا بارش پیش بینی شده، برنامه فعلی باید بازبینی شود.",
},
]
}
def _merge_irrigation_response(
*,
parsed_result: dict,
optimized_result: dict | None,
daily_water_needs: list[dict],
) -> dict:
fallback = _build_irrigation_fallback(
optimized_result=optimized_result,
daily_water_needs=daily_water_needs,
)
if not isinstance(parsed_result, dict):
return fallback
sections = parsed_result.get("sections")
if not isinstance(sections, list):
return fallback
recommendation = _find_section(sections, "recommendation") or {}
list_section = _find_section(sections, "list") or {}
warning_section = _find_section(sections, "warning") or {}
fallback_recommendation = fallback["sections"][0]
fallback_list = fallback["sections"][1]
fallback_warning = fallback["sections"][2]
merged_recommendation = {**recommendation, **fallback_recommendation}
merged_recommendation["expandableExplanation"] = (
recommendation.get("expandableExplanation")
or fallback_recommendation["expandableExplanation"]
)
merged_recommendation["content"] = recommendation.get("content") or fallback_recommendation["content"]
merged_recommendation["title"] = recommendation.get("title") or fallback_recommendation["title"]
merged_list = {
**fallback_list,
**list_section,
"items": _unique_items(
list(list_section.get("items", [])) + list(fallback_list["items"])
)[:5],
}
merged_warning = {
**fallback_warning,
**warning_section,
"content": warning_section.get("content") or fallback_warning["content"],
}
return {"sections": [merged_recommendation, merged_list, merged_warning]}
def _resolve_irrigation_method( def _resolve_irrigation_method(
sensor: SensorData | None, sensor: SensorData | None,
irrigation_method_name: str | None, irrigation_method_name: str | None,
@@ -131,6 +280,7 @@ def get_irrigation_recommendation(
active_kc = resolve_kc(crop_profile, growth_stage=growth_stage) active_kc = resolve_kc(crop_profile, growth_stage=growth_stage)
forecasts = [] forecasts = []
daily_water_needs = [] daily_water_needs = []
optimized_result = None
if sensor is not None: if sensor is not None:
forecasts = list( forecasts = list(
WeatherForecast.objects.filter(location=sensor.center_location, forecast_date__isnull=False) WeatherForecast.objects.filter(location=sensor.center_location, forecast_date__isnull=False)
@@ -148,6 +298,15 @@ def get_irrigation_recommendation(
growth_stage=growth_stage, growth_stage=growth_stage,
irrigation_efficiency_percent=efficiency_percent, irrigation_efficiency_percent=efficiency_percent,
) )
if plant is not None and forecasts:
optimized_result = _get_optimizer().optimize_irrigation(
sensor=sensor,
plant=plant,
forecasts=forecasts,
daily_water_needs=daily_water_needs,
growth_stage=growth_stage,
irrigation_method=irrigation_method,
)
context = build_rag_context( context = build_rag_context(
user_query, sensor_uuid, config=cfg, limit=limit, kb_name=KB_NAME, service_id=SERVICE_ID, user_query, sensor_uuid, config=cfg, limit=limit, kb_name=KB_NAME, service_id=SERVICE_ID,
@@ -179,6 +338,11 @@ def get_irrigation_recommendation(
f"Kc مورد استفاده: {active_kc}\n" f"Kc مورد استفاده: {active_kc}\n"
+ "\n".join(schedule_lines) + "\n".join(schedule_lines)
) )
if optimized_result is not None:
extra_parts.append(
"[خروجی بهینه ساز شبیه سازی]\n"
+ optimized_result["context_text"]
)
if extra_parts: if extra_parts:
context = "\n\n---\n\n".join(extra_parts) + ("\n\n---\n\n" + context if context else "") context = "\n\n---\n\n".join(extra_parts) + ("\n\n---\n\n" + context if context else "")
@@ -212,13 +376,12 @@ def get_irrigation_recommendation(
raw = response.choices[0].message.content.strip() raw = response.choices[0].message.content.strip()
except Exception as exc: except Exception as exc:
logger.error("Irrigation recommendation error for %s: %s", sensor_uuid, exc) logger.error("Irrigation recommendation error for %s: %s", sensor_uuid, exc)
result = { result = _build_irrigation_fallback(
"irrigation_needed": None, optimized_result=optimized_result,
"amount_mm": None, daily_water_needs=daily_water_needs,
"reason": f"خطا در دریافت توصیه: {exc}", )
"next_check_date": None, result["error"] = f"خطا در دریافت توصیه: {exc}"
"raw_response": None, result["raw_response"] = None
}
_fail_audit_log( _fail_audit_log(
audit_log, audit_log,
str(exc), str(exc),
@@ -232,18 +395,20 @@ def get_irrigation_recommendation(
cleaned = cleaned.strip("`").removeprefix("json").strip() cleaned = cleaned.strip("`").removeprefix("json").strip()
result = json.loads(cleaned) result = json.loads(cleaned)
except (json.JSONDecodeError, ValueError): except (json.JSONDecodeError, ValueError):
result = { result = {}
"plan": {
"warning": raw,
},
}
result = _merge_irrigation_response(
parsed_result=result,
optimized_result=optimized_result,
daily_water_needs=daily_water_needs,
)
result["raw_response"] = raw result["raw_response"] = raw
result["water_balance"] = { result["water_balance"] = {
"daily": daily_water_needs, "daily": daily_water_needs,
"crop_profile": crop_profile, "crop_profile": crop_profile,
"active_kc": active_kc, "active_kc": active_kc,
} }
result["simulation_optimizer"] = optimized_result
result["selected_irrigation_method"] = ( result["selected_irrigation_method"] = (
{ {
"id": irrigation_method.id, "id": irrigation_method.id,
+115 -6
View File
@@ -34,20 +34,87 @@ class RecommendationServiceDefaultsTests(TestCase):
farm_uuid=self.farm_uuid, farm_uuid=self.farm_uuid,
center_location=self.location, center_location=self.location,
irrigation_method=self.irrigation_method, irrigation_method=self.irrigation_method,
sensor_payload={"sensor-7-1": {"soil_moisture": 30.0}}, sensor_payload={
"sensor-7-1": {
"soil_moisture": 30.0,
"nitrogen": 18.0,
"phosphorus": 12.0,
"potassium": 14.0,
"soil_ph": 6.9,
}
},
) )
self.farm.plants.set([self.plant]) self.farm.plants.set([self.plant])
def build_irrigation_optimizer_result(self):
return {
"engine": "crop_simulation_heuristic",
"context_text": "optimizer irrigation context",
"recommended_strategy": {
"code": "balanced",
"label": "آبیاری متعادل",
"score": 88.0,
"expected_yield_index": 91.0,
"total_irrigation_mm": 24.0,
"amount_per_event_mm": 8.0,
"events": 3,
"frequency_per_week": 3,
"event_dates": ["2026-04-10"],
"timing": "اوایل صبح",
"moisture_target_percent": 70,
"validity_period": "معتبر برای 3 روز آینده",
"reasoning": ["شبیه ساز این سناریو را برتر ارزیابی کرد."],
},
"alternatives": [
{
"code": "protective",
"label": "آبیاری حمایتی",
"score": 80.0,
"expected_yield_index": 85.0,
"total_irrigation_mm": 28.0,
}
],
}
def build_fertilization_optimizer_result(self):
return {
"engine": "crop_simulation_heuristic",
"context_text": "optimizer fertilization context",
"recommended_strategy": {
"code": "balanced",
"label": "تغذیه متعادل",
"score": 84.0,
"expected_yield_index": 88.0,
"fertilizer_type": "20-20-20",
"amount_kg_per_ha": 65.0,
"application_method": "کودآبیاری",
"timing": "صبح زود",
"validity_period": "معتبر برای 7 روز آینده",
"reasoning": ["کسری عناصر با این سناریو بهتر پوشش داده می شود."],
},
"alternatives": [
{
"code": "maintenance",
"label": "تغذیه نگهدارنده",
"score": 72.0,
"expected_yield_index": 78.0,
"amount_kg_per_ha": 45.0,
}
],
}
@patch("rag.services.irrigation.calculate_forecast_water_needs", return_value=[]) @patch("rag.services.irrigation.calculate_forecast_water_needs", return_value=[])
@patch("rag.services.irrigation.resolve_kc", return_value=0.9) @patch("rag.services.irrigation.resolve_kc", return_value=0.9)
@patch("rag.services.irrigation.resolve_crop_profile", return_value={}) @patch("rag.services.irrigation.resolve_crop_profile", return_value={})
@patch("rag.services.irrigation.build_irrigation_method_text", return_value="method text") @patch("rag.services.irrigation.build_irrigation_method_text", return_value="method text")
@patch("rag.services.irrigation.build_plant_text", return_value="plant text") @patch("rag.services.irrigation.build_plant_text", return_value="plant text")
@patch("rag.services.irrigation.build_rag_context", return_value="") @patch("rag.services.irrigation.build_rag_context", return_value="")
@patch("rag.services.irrigation._get_optimizer")
@patch("rag.services.irrigation.get_chat_client") @patch("rag.services.irrigation.get_chat_client")
def test_irrigation_recommendation_uses_farm_relations_when_request_omits_names( def test_irrigation_recommendation_uses_farm_relations_when_request_omits_names(
self, self,
mock_get_chat_client, mock_get_chat_client,
mock_get_optimizer,
mock_build_rag_context, mock_build_rag_context,
mock_build_plant_text, mock_build_plant_text,
mock_build_irrigation_method_text, mock_build_irrigation_method_text,
@@ -55,8 +122,11 @@ class RecommendationServiceDefaultsTests(TestCase):
_mock_resolve_kc, _mock_resolve_kc,
_mock_calculate_forecast_water_needs, _mock_calculate_forecast_water_needs,
): ):
mock_get_optimizer.return_value.optimize_irrigation.return_value = (
self.build_irrigation_optimizer_result()
)
mock_response = Mock() mock_response = Mock()
mock_response.choices = [Mock(message=Mock(content='{"plan": {"frequencyPerWeek": 2}}'))] mock_response.choices = [Mock(message=Mock(content='{"sections": [{"type": "list", "title": "custom", "icon": "list", "items": ["مورد سفارشی"]}]}'))]
mock_get_chat_client.return_value.chat.completions.create.return_value = mock_response mock_get_chat_client.return_value.chat.completions.create.return_value = mock_response
result = get_irrigation_recommendation( result = get_irrigation_recommendation(
@@ -64,7 +134,8 @@ class RecommendationServiceDefaultsTests(TestCase):
growth_stage="میوه‌دهی", growth_stage="میوه‌دهی",
) )
self.assertEqual(result["plan"]["frequencyPerWeek"], 2) self.assertEqual(result["sections"][0]["type"], "recommendation")
self.assertEqual(result["sections"][0]["amount"], "8.0 میلی متر در هر نوبت (جمع کل 24.0 میلی متر)")
mock_build_rag_context.assert_called_once() mock_build_rag_context.assert_called_once()
mock_build_plant_text.assert_called_once_with("گوجه‌فرنگی", "میوه‌دهی") mock_build_plant_text.assert_called_once_with("گوجه‌فرنگی", "میوه‌دهی")
mock_build_irrigation_method_text.assert_called_once_with("آبیاری قطره‌ای") mock_build_irrigation_method_text.assert_called_once_with("آبیاری قطره‌ای")
@@ -72,6 +143,7 @@ class RecommendationServiceDefaultsTests(TestCase):
result["selected_irrigation_method"]["name"], result["selected_irrigation_method"]["name"],
"آبیاری قطره‌ای", "آبیاری قطره‌ای",
) )
self.assertEqual(result["simulation_optimizer"]["engine"], "crop_simulation_heuristic")
@patch("rag.services.irrigation.calculate_forecast_water_needs", return_value=[]) @patch("rag.services.irrigation.calculate_forecast_water_needs", return_value=[])
@patch("rag.services.irrigation.resolve_kc", return_value=0.9) @patch("rag.services.irrigation.resolve_kc", return_value=0.9)
@@ -79,10 +151,12 @@ class RecommendationServiceDefaultsTests(TestCase):
@patch("rag.services.irrigation.build_irrigation_method_text", return_value="method text") @patch("rag.services.irrigation.build_irrigation_method_text", return_value="method text")
@patch("rag.services.irrigation.build_plant_text", return_value="plant text") @patch("rag.services.irrigation.build_plant_text", return_value="plant text")
@patch("rag.services.irrigation.build_rag_context", return_value="") @patch("rag.services.irrigation.build_rag_context", return_value="")
@patch("rag.services.irrigation._get_optimizer")
@patch("rag.services.irrigation.get_chat_client") @patch("rag.services.irrigation.get_chat_client")
def test_irrigation_recommendation_persists_selected_method_on_farm( def test_irrigation_recommendation_persists_selected_method_on_farm(
self, self,
mock_get_chat_client, mock_get_chat_client,
mock_get_optimizer,
_mock_build_rag_context, _mock_build_rag_context,
_mock_build_plant_text, _mock_build_plant_text,
mock_build_irrigation_method_text, mock_build_irrigation_method_text,
@@ -94,8 +168,11 @@ class RecommendationServiceDefaultsTests(TestCase):
self.farm.irrigation_method = None self.farm.irrigation_method = None
self.farm.save(update_fields=["irrigation_method", "updated_at"]) self.farm.save(update_fields=["irrigation_method", "updated_at"])
mock_get_optimizer.return_value.optimize_irrigation.return_value = (
self.build_irrigation_optimizer_result()
)
mock_response = Mock() mock_response = Mock()
mock_response.choices = [Mock(message=Mock(content='{"plan": {"frequencyPerWeek": 4}}'))] mock_response.choices = [Mock(message=Mock(content='{"sections": [{"type": "recommendation", "title": "test", "icon": "droplet", "content": "custom"}]}'))]
mock_get_chat_client.return_value.chat.completions.create.return_value = mock_response mock_get_chat_client.return_value.chat.completions.create.return_value = mock_response
result = get_irrigation_recommendation( result = get_irrigation_recommendation(
@@ -111,15 +188,20 @@ class RecommendationServiceDefaultsTests(TestCase):
@patch("rag.services.fertilization.build_plant_text", return_value="plant text") @patch("rag.services.fertilization.build_plant_text", return_value="plant text")
@patch("rag.services.fertilization.build_rag_context", return_value="") @patch("rag.services.fertilization.build_rag_context", return_value="")
@patch("rag.services.fertilization._get_optimizer")
@patch("rag.services.fertilization.get_chat_client") @patch("rag.services.fertilization.get_chat_client")
def test_fertilization_recommendation_uses_farm_plant_when_request_omits_name( def test_fertilization_recommendation_uses_farm_plant_when_request_omits_name(
self, self,
mock_get_chat_client, mock_get_chat_client,
mock_get_optimizer,
_mock_build_rag_context, _mock_build_rag_context,
mock_build_plant_text, mock_build_plant_text,
): ):
mock_get_optimizer.return_value.optimize_fertilization.return_value = (
self.build_fertilization_optimizer_result()
)
mock_response = Mock() mock_response = Mock()
mock_response.choices = [Mock(message=Mock(content='{"plan": {"npkRatio": "20-20-20"}}'))] mock_response.choices = [Mock(message=Mock(content='{"sections": [{"type": "warning", "title": "هشدار", "icon": "alert-triangle", "content": "از اختلاط نامناسب خودداری شود."}]}'))]
mock_get_chat_client.return_value.chat.completions.create.return_value = mock_response mock_get_chat_client.return_value.chat.completions.create.return_value = mock_response
result = get_fertilization_recommendation( result = get_fertilization_recommendation(
@@ -127,5 +209,32 @@ class RecommendationServiceDefaultsTests(TestCase):
growth_stage="رویشی", growth_stage="رویشی",
) )
self.assertEqual(result["plan"]["npkRatio"], "20-20-20") self.assertEqual(result["sections"][0]["fertilizerType"], "20-20-20")
mock_build_plant_text.assert_called_once_with("گوجه‌فرنگی", "رویشی") mock_build_plant_text.assert_called_once_with("گوجه‌فرنگی", "رویشی")
self.assertEqual(result["simulation_optimizer"]["engine"], "crop_simulation_heuristic")
@patch("rag.services.fertilization.build_plant_text", return_value="plant text")
@patch("rag.services.fertilization.build_rag_context", return_value="")
@patch("rag.services.fertilization._get_optimizer")
@patch("rag.services.fertilization.get_chat_client")
def test_fertilization_recommendation_falls_back_to_optimizer_json_when_llm_returns_invalid_payload(
self,
mock_get_chat_client,
mock_get_optimizer,
_mock_build_rag_context,
_mock_build_plant_text,
):
mock_get_optimizer.return_value.optimize_fertilization.return_value = (
self.build_fertilization_optimizer_result()
)
mock_response = Mock()
mock_response.choices = [Mock(message=Mock(content="not-json"))]
mock_get_chat_client.return_value.chat.completions.create.return_value = mock_response
result = get_fertilization_recommendation(
sensor_uuid=str(self.farm_uuid),
growth_stage="رویشی",
)
self.assertEqual(result["sections"][0]["applicationMethod"], "کودآبیاری")
self.assertEqual(result["sections"][2]["type"], "warning")