Files
Ai/config/celery.py
T
2026-04-24 18:34:17 +03:30

57 lines
1.9 KiB
Python

import os
from types import SimpleNamespace
from uuid import uuid4
try:
from celery import Celery
except ImportError: # pragma: no cover - test/dev fallback when celery is absent
Celery = None
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
if Celery is not None:
app = Celery("config")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
else:
class _FallbackCeleryApp:
def config_from_object(self, *_args, **_kwargs):
return None
def autodiscover_tasks(self, *_args, **_kwargs):
return None
def task(self, *decorator_args, **decorator_kwargs):
bind = decorator_kwargs.get("bind", False)
def decorator(func):
def delay(*args, **kwargs):
task_id = f"missing-celery-{uuid4()}"
return SimpleNamespace(
id=task_id,
status="FAILURE",
result={"error": "Celery is not installed."},
)
if bind:
def wrapped(*args, **kwargs):
dummy_self = SimpleNamespace(
request=SimpleNamespace(id=f"missing-celery-{uuid4()}"),
update_state=lambda **_kw: None,
)
return func(dummy_self, *args, **kwargs)
wrapped.delay = delay
wrapped.__name__ = func.__name__
wrapped.__doc__ = func.__doc__
return wrapped
func.delay = delay
return func
if decorator_args and callable(decorator_args[0]) and len(decorator_args) == 1:
return decorator(decorator_args[0])
return decorator
app = _FallbackCeleryApp()