From cffbe712c609b15aeb247bd60564193014b4c1fc Mon Sep 17 00:00:00 2001 From: Siarhei Siniak Date: Sat, 6 Jul 2024 19:17:56 +0300 Subject: [PATCH] [~] Refactor --- docker/tiktok/Makefile | 6 ++++++ python/tasks/tiktok/__init__.py | 11 +++++++++++ python/tasks/tiktok/celery.py | 4 ++++ 3 files changed, 21 insertions(+) diff --git a/docker/tiktok/Makefile b/docker/tiktok/Makefile index d88754d..f94d020 100644 --- a/docker/tiktok/Makefile +++ b/docker/tiktok/Makefile @@ -20,6 +20,12 @@ celery-up: celery-stop: $(MAKE) c ARGS="stop redis celery" +logs: + $(MAKE) c ARGS="logs --tail=100 -f" + +celery-restart: + $(MAKE) c ARGS="restart celery" + run: cd ${PROJECT_ROOT} && \ sudo docker-compose \ diff --git a/python/tasks/tiktok/__init__.py b/python/tasks/tiktok/__init__.py index d2a1637..a2a662f 100644 --- a/python/tasks/tiktok/__init__.py +++ b/python/tasks/tiktok/__init__.py @@ -13,6 +13,7 @@ from typing import ( Optional, Iterable, ) +import celery logger = logging.getLogger(__name__) @@ -26,6 +27,8 @@ class tiktok_config_t: videos: str='' audios: str='' celery_broker: str='' + celery_result_backend: str='' + celery_imports: Iterable[str]=tuple() def tiktok_config() -> tiktok_config_t.res_t: res = tiktok_config_t.res_t( @@ -38,6 +41,8 @@ def tiktok_config() -> tiktok_config_t.res_t: ) res.celery_broker = 'redis://@redis/1' + res.celery_result_backend = 'redis://@redis/2' + res.celery_imports = ['python.tasks.tiktok'] res.cache = os.path.join( res.project_root, 'tmp/cache/tiktok', @@ -64,6 +69,7 @@ def logger_setup(): logger_setup() +@celery.shared_task() async def tiktok_videos_links_get( query: Optional[str]=None, screenshot_path: Optional[str]=None, @@ -136,6 +142,7 @@ async def tiktok_videos_links_get( return list(links)[:max_links] +@celery.shared_task() def tiktok_videos_meta(links: Iterable[str]) -> Iterable[dict[str, Any]]: res = [] for o in links: @@ -156,6 +163,7 @@ class tiktok_video_fetch_t: tikcdn_io_curl = 'tikcdn.io-curl' tikcdn_io_wget = 'tikcdn.io-wget' +@celery.shared_task() def tiktok_video_fetch( id: int, url: str, @@ -222,6 +230,7 @@ async def playwright_save(url: str): download.save_as(path) print(path) +@celery.shared_task() def tiktok_videos_fetch( meta: Iterable[dict[str, Any]], method: Optional[tiktok_video_fetch_t.method_t]=None, @@ -278,6 +287,7 @@ def tiktok_videos_fetch( return stats +@celery.shared_task() def tiktok_videos_process(meta: Iterable[dict[str, Any]]) -> dict[str, Any]: import tqdm stats = dict( @@ -355,6 +365,7 @@ class audio_get_t: path_mp3: str url: str +@celery.shared_task() def audio_get() -> audio_get_t.res_t: c = tiktok_config() url = 'https://www.youtube.com/watch?v=dQw4w9WgXcQ' diff --git a/python/tasks/tiktok/celery.py b/python/tasks/tiktok/celery.py index 0cbcd0f..a2ac4e9 100644 --- a/python/tasks/tiktok/celery.py +++ b/python/tasks/tiktok/celery.py @@ -4,7 +4,11 @@ import celery c = tiktok_config() +CELERY_IMPORTS = c.celery_imports + app = celery.Celery( __name__, broker=c.celery_broker, + result_backend=c.celery_result_backend, ) +