import importlib
from typing import (
    Any,
)

import celery.app.task
import kombu.utils.json


class Task(celery.app.task.Task):
    """Celery task base class whose call result is returned as a JSON string.

    Objects exposing a ``to_dict`` method are wrapped in an envelope that
    records their module and qualified class name, so that ``_loads`` can
    rebuild an instance of the same class on the receiving side.
    """

    def __call__(self, *args, **kwargs):
        """Run the task through the parent class and serialize its result.

        Returns:
            The string produced by ``_dumps`` for the parent call's result.
        """
        res = super().__call__(*args, **kwargs)
        return self._dumps(res)

    @classmethod
    def _loads(cls, data_str: str) -> Any:
        """Decode a JSON string produced by ``_dumps``.

        Args:
            data_str: JSON text to decode.

        Returns:
            A rebuilt object when the decoded value is a dict tagged with
            ``type == 'dataclass_json'``; otherwise the decoded JSON value
            unchanged.

        Raises:
            KeyError: if a tagged envelope lacks ``module``, ``_class`` or
                ``data``.
            ImportError / AttributeError: if the named module or class
                cannot be resolved.
        """
        data = kombu.utils.json.loads(data_str)
        if not (isinstance(data, dict) and data.get('type') == 'dataclass_json'):
            return data

        # NOTE(security): the module and class to instantiate are taken from
        # the message payload itself. Anyone able to publish to the broker can
        # make the worker import a module and call an attribute of it; only
        # enable this serializer on trusted transports.
        target = importlib.import_module(data['module'])
        # ``_class`` is a ``__qualname__``, so nested classes are dotted.
        for attr_name in data['_class'].split('.'):
            target = getattr(target, attr_name)

        payload = data['data']
        # Prefer the class's own ``from_dict``: ``to_dict`` output may contain
        # nested objects flattened to dicts or renamed keys, which a bare
        # ``target(**payload)`` cannot restore. Fall back to the constructor
        # for classes that only provide ``to_dict``.
        from_dict = getattr(target, 'from_dict', None)
        if callable(from_dict):
            return from_dict(payload)
        return target(**payload)

    @classmethod
    def _dumps(cls, data: Any) -> str:
        """Encode ``data`` as a JSON string.

        Args:
            data: Value to serialize. If it has a ``to_dict`` attribute, the
                result of ``data.to_dict()`` is wrapped in an envelope with
                the object's module and qualified class name.

        Returns:
            JSON text produced by ``kombu.utils.json.dumps``.
        """
        if not hasattr(data, 'to_dict'):
            return kombu.utils.json.dumps(data)

        klass = data.__class__
        envelope = dict(
            type='dataclass_json',
            module=klass.__module__,
            _class=klass.__qualname__,
            data=data.to_dict(),
        )
        return kombu.utils.json.dumps(envelope)


def kombu_register_json_dataclass():
    """Register the ``'json-dataclass'`` serializer with kombu.

    ``Task._dumps`` is registered as the encoder and ``Task._loads`` as the
    decoder, under content type ``application/json``.
    """
    # Imported here, as in the original, rather than at module import time.
    import kombu.serialization

    kombu.serialization.register(
        'json-dataclass',
        Task._dumps,
        Task._loads,
        content_type='application/json',
        content_encoding='text',
    )