From b2afd124a7505ba4a99eaf75356287db26f0f884 Mon Sep 17 00:00:00 2001 From: Siarhei Siniak Date: Sat, 6 Jul 2024 20:47:12 +0300 Subject: [PATCH] [~] Refactor --- docker/tiktok/Dockerfile | 2 ++ python/tasks/tiktok/celery.py | 8 +++-- python/tasks/tiktok/utils.py | 59 +++++++++++++++++++++++++++++------ 3 files changed, 57 insertions(+), 12 deletions(-) diff --git a/docker/tiktok/Dockerfile b/docker/tiktok/Dockerfile index e40bb30..bb8c9d3 100644 --- a/docker/tiktok/Dockerfile +++ b/docker/tiktok/Dockerfile @@ -11,6 +11,8 @@ RUN pip3 install tqdm RUN apt-get install -yy ffmpeg RUN pip3 install celery redis RUN pip3 install dataclasses-json +RUN pip3 install rpdb +RUN apt-get install -yy netcat-traditional WORKDIR /app diff --git a/python/tasks/tiktok/celery.py b/python/tasks/tiktok/celery.py index 7a21704..27452ed 100644 --- a/python/tasks/tiktok/celery.py +++ b/python/tasks/tiktok/celery.py @@ -6,16 +6,18 @@ import redis c = tiktok_config() -kombu_register_json_dataclass() - app = celery.Celery( __name__, broker=c.celery_broker, result_backend=c.celery_result_backend, - accept_content=['json-dataclass', 'json',], + accept_content=['json-dataclass'], task_serializer='json-dataclass', + result_serializer='json-dataclass', ) +kombu_register_json_dataclass() + + app.autodiscover_tasks(c.celery_imports) redis = dict( diff --git a/python/tasks/tiktok/utils.py b/python/tasks/tiktok/utils.py index a81f043..c6ea154 100644 --- a/python/tasks/tiktok/utils.py +++ b/python/tasks/tiktok/utils.py @@ -3,16 +3,22 @@ import importlib import kombu.utils.json from typing import ( Any, + Optional, ) class Task(celery.app.task.Task): def __call__(self, *args, **kwargs): res = super().__call__(*args, **kwargs) - return self._dumps(res) + return res @classmethod - def _loads(self, data_str: str) -> Any: - data = kombu.utils.json.loads(data_str) + def _loads( + cls, + data_str: Optional[str]=None, + data: Optional[Any]=None, + ) -> Any: + if not data_str is None: + data = kombu.utils.json.loads(data_str) if isinstance(data, dict) and data.get('type') == 'dataclass_json': module_name = data['module'] @@ -26,19 +32,54 @@ class Task(celery.app.task.Task): return c(**data['data']) else: - return data + if isinstance(data, list): + return [ + cls._loads(data=o) + for o in data + ] + elif isinstance(data, dict): + return { + k : cls._loads(data=v) + for k, v in data.items() + } + else: + return data @classmethod - def _dumps(self, data: Any) -> str: + def _dumps( + cls, + data: Any, + need_native: Optional[bool]=None, + ) -> Any: + if need_native is None: + need_native = False + + native = None if hasattr(data, 'to_dict'): - return kombu.utils.json.dumps(dict( + native = dict( type='dataclass_json', module=data.__class__.__module__, _class=data.__class__.__qualname__, data=data.to_dict(), - )) + ) else: - return kombu.utils.json.dumps(data) + if isinstance(data, (list, tuple)): + native = [ + cls._dumps(o, need_native=True,) + for o in data + ] + elif isinstance(data, dict): + native = { + k : cls._dumps(v, need_native=True,) + for k, v in data.items() + } + else: + native = data + + if not need_native: + return kombu.utils.json.dumps(native) + else: + return native def kombu_register_json_dataclass(): import kombu.serialization @@ -47,5 +88,5 @@ def kombu_register_json_dataclass(): Task._dumps, Task._loads, content_type='application/json', - content_encoding='text', + content_encoding='utf-8', )