diff --git a/python/tasks/tiktok/celery.py b/python/tasks/tiktok/celery.py index 50746e4..7a21704 100644 --- a/python/tasks/tiktok/celery.py +++ b/python/tasks/tiktok/celery.py @@ -1,15 +1,19 @@ from .config import tiktok_config +from .utils import kombu_register_json_dataclass import celery import redis c = tiktok_config() +kombu_register_json_dataclass() + app = celery.Celery( __name__, broker=c.celery_broker, result_backend=c.celery_result_backend, - accept_content=['pickle'], + accept_content=['json-dataclass', 'json',], + task_serializer='json-dataclass', ) app.autodiscover_tasks(c.celery_imports) diff --git a/python/tasks/tiktok/utils.py b/python/tasks/tiktok/utils.py index 191633a..a81f043 100644 --- a/python/tasks/tiktok/utils.py +++ b/python/tasks/tiktok/utils.py @@ -1,16 +1,14 @@ import celery.app.task import importlib -import kombu.serialization import kombu.utils.json from typing import ( Any, ) - class Task(celery.app.task.Task): def __call__(self, *args, **kwargs): res = super().__call__(*args, **kwargs) - return self._to_native(res) + return self._dumps(res) @classmethod def _loads(self, data_str: str) -> Any: @@ -42,10 +40,12 @@ class Task(celery.app.task.Task): else: return kombu.utils.json.dumps(data) -kombu.serialization.register( - 'json-dataclass', - Task._dumps, - Task._loads, - content_type='application/json', - content_encoding='text', -) +def kombu_register_json_dataclass(): + import kombu.serialization + kombu.serialization.register( + 'json-dataclass', + Task._dumps, + Task._loads, + content_type='application/json', + content_encoding='text', + )