57 lines
1.9 KiB
Python
57 lines
1.9 KiB
Python
|
|
import os
from functools import wraps
from types import SimpleNamespace
from uuid import uuid4
|
||
|
|
|
||
|
|
try:
    from celery import Celery
except ImportError:  # pragma: no cover - test/dev fallback when celery is absent
    Celery = None

# Point Django at the project settings unless the environment already chose
# a settings module; ``setdefault`` never overrides an existing value.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")

if Celery is not None:
    app = Celery("config")
    # Load configuration from the Django settings object, using the
    # ``CELERY`` namespace for the setting names.
    app.config_from_object("django.conf:settings", namespace="CELERY")
    app.autodiscover_tasks()
else:

    class _FallbackCeleryApp:
        """Minimal stand-in for the Celery app used when Celery is not installed.

        It exposes only the methods this module and task modules rely on
        (``config_from_object``, ``autodiscover_tasks`` and ``task``) so that
        modules decorated with ``@app.task`` can still be imported. Decorated
        functions stay directly callable; attempts to enqueue them return a
        failure stub instead of raising.
        """

        def config_from_object(self, *_args, **_kwargs):
            """Accept and ignore any configuration source; always return None."""
            return None

        def autodiscover_tasks(self, *_args, **_kwargs):
            """Accept and ignore any discovery arguments; always return None."""
            return None

        def task(self, *decorator_args, **decorator_kwargs):
            """Mimic ``Celery.task`` as a bare or parameterised decorator.

            Supports both ``@app.task`` and ``@app.task(...)``. The only
            option that is honoured is ``bind``; every other option is
            accepted and ignored.

            The decorated callable gains two attributes:

            * ``delay(*args, **kwargs)``
            * ``apply_async(args=None, kwargs=None, **options)``

            Both ignore their arguments, never run the task, and return a
            ``SimpleNamespace`` with ``id`` (``"missing-celery-<uuid4>"``),
            ``status`` (``"FAILURE"``) and ``result``
            (``{"error": "Celery is not installed."}``).

            With ``bind=True`` the returned wrapper calls the function with a
            dummy ``self`` as first argument. That object provides
            ``request.id`` (a fresh ``"missing-celery-<uuid4>"`` per call) and
            a no-op ``update_state``.

            Returns:
                The decorated callable when used bare, otherwise a decorator.
            """
            bind = decorator_kwargs.get("bind", False)

            def decorator(func):
                def delay(*_args, **_kwargs):
                    # Nothing can be queued without Celery: report a failed
                    # result object rather than raising at the call site.
                    return SimpleNamespace(
                        id=f"missing-celery-{uuid4()}",
                        status="FAILURE",
                        result={"error": "Celery is not installed."},
                    )

                def apply_async(args=None, kwargs=None, **_options):
                    # Same outcome as ``delay``; the signature follows the
                    # ``args=/kwargs=`` calling convention of real tasks.
                    return delay()

                if not bind:
                    # Unbound tasks keep their identity; only the enqueue
                    # entry points are attached.
                    func.delay = delay
                    func.apply_async = apply_async
                    return func

                # ``wraps`` copies __module__, __name__, __qualname__,
                # __doc__ and __dict__, and records ``__wrapped__``.
                @wraps(func)
                def wrapped(*args, **kwargs):
                    dummy_self = SimpleNamespace(
                        request=SimpleNamespace(id=f"missing-celery-{uuid4()}"),
                        # Accept positional as well as keyword arguments so
                        # any call style a task uses is a harmless no-op.
                        update_state=lambda *_a, **_kw: None,
                    )
                    return func(dummy_self, *args, **kwargs)

                wrapped.delay = delay
                wrapped.apply_async = apply_async
                return wrapped

            # Bare usage: ``@app.task`` passes the function itself as the
            # single positional argument.
            if len(decorator_args) == 1 and callable(decorator_args[0]):
                return decorator(decorator_args[0])
            return decorator

    app = _FallbackCeleryApp()
|