From 6adf6106f1adb5c7c33830b76a95bd5d19002d8d Mon Sep 17 00:00:00 2001 From: Siarhei Siniak Date: Sat, 6 Jul 2024 19:42:12 +0300 Subject: [PATCH] [~] Refactor --- docker/tiktok/Dockerfile | 1 + docker/tiktok/Makefile | 1 + python/tasks/tiktok/__init__.py | 399 -------------------------------- python/tasks/tiktok/celery.py | 11 +- python/tasks/tiktok/config.py | 71 ++++++ python/tasks/tiktok/tasks.py | 351 ++++++++++++++++++++++++++++ 6 files changed, 432 insertions(+), 402 deletions(-) create mode 100644 python/tasks/tiktok/config.py create mode 100644 python/tasks/tiktok/tasks.py diff --git a/docker/tiktok/Dockerfile b/docker/tiktok/Dockerfile index 4b97fb5..e40bb30 100644 --- a/docker/tiktok/Dockerfile +++ b/docker/tiktok/Dockerfile @@ -10,6 +10,7 @@ RUN python3 -m playwright install RUN pip3 install tqdm RUN apt-get install -yy ffmpeg RUN pip3 install celery redis +RUN pip3 install dataclasses-json WORKDIR /app diff --git a/docker/tiktok/Makefile b/docker/tiktok/Makefile index f94d020..1b300d7 100644 --- a/docker/tiktok/Makefile +++ b/docker/tiktok/Makefile @@ -31,6 +31,7 @@ run: sudo docker-compose \ -f docker/tiktok/docker-compose.yml \ run \ + --use-aliases \ --rm tiktok jupyter: diff --git a/python/tasks/tiktok/__init__.py b/python/tasks/tiktok/__init__.py index a2a662f..e69de29 100644 --- a/python/tasks/tiktok/__init__.py +++ b/python/tasks/tiktok/__init__.py @@ -1,399 +0,0 @@ -import logging -import enum -import dataclasses -import multiprocessing -import traceback -import subprocess -import os -import sys -import json -from typing import ( - Any, - Literal, - Optional, - Iterable, -) -import celery - -logger = logging.getLogger(__name__) - -#logging.getLogger().setLevel(logging.INFO) - -class tiktok_config_t: - @dataclasses.dataclass - class res_t: - project_root: str='' - cache: str='' - videos: str='' - audios: str='' - celery_broker: str='' - celery_result_backend: str='' - celery_imports: Iterable[str]=tuple() - -def tiktok_config() -> tiktok_config_t.res_t: - res = tiktok_config_t.res_t( - project_root=os.path.abspath( - os.path.join( - os.path.dirname(__file__), - '..', '..', '..', - ), - ), - ) - - res.celery_broker = 'redis://@redis/1' - res.celery_result_backend = 'redis://@redis/2' - res.celery_imports = ['python.tasks.tiktok'] - res.cache = os.path.join( - res.project_root, - 'tmp/cache/tiktok', - ) - res.videos = os.path.join( - res.cache, - 'videos', - ) - res.audios = os.path.join( - res.cache, - 'audios', - ) - - os.makedirs(res.videos, exist_ok=True) - os.makedirs(res.audios, exist_ok=True) - - return res - -def logger_setup(): - if len(logger.handlers) == 0: - handler = logging.StreamHandler(sys.stderr) - logger.addHandler(handler) - logger.setLevel(logging.INFO) - -logger_setup() - -@celery.shared_task() -async def tiktok_videos_links_get( - query: Optional[str]=None, - screenshot_path: Optional[str]=None, - max_time: Optional[int | float]=None, - max_links: Optional[int]=None, -) -> Iterable[str]: - import datetime - import TikTokApi - import pyktok - import asyncio - import re - - if max_links is None: - max_links = 100 - - if max_time is None: - max_time = 10 - - async with TikTokApi.TikTokApi() as client: - await client.create_sessions() - - session = client.sessions[0] - - if not query is None: - await session.page.goto( - 'https://www.tiktok.com/search?q=%s' % query - ) - - if not screenshot_path is None: - await session.page.screenshot( - path=screenshot_path, - ) - - links = list() - links_set = set() - - started_at = datetime.datetime.now() - - while True: - content = await session.page.content() - new_links = re.compile( - r'https://www.tiktok.com/@\w+/video/\d+' - ).findall(content) - - old_size = len(links) - - for o in new_links: - if not o in links_set: - links_set.add(o) - links.append(o) - - await session.page.mouse.wheel(0, 100) - - elapsed = ( - datetime.datetime.now() - started_at - ).total_seconds() - - if elapsed > max_time: - break; - - if len(links_set) > max_links: - break - - if old_size < len(links): - logger.info(json.dumps(dict( - total=len(links), - elapsed=elapsed, - scroll_y=await session.page.evaluate('window.scrollY'), - ))) - - return list(links)[:max_links] - -@celery.shared_task() -def tiktok_videos_meta(links: Iterable[str]) -> Iterable[dict[str, Any]]: - res = [] - for o in links: - parts = o.split('/') - - res.append(dict( - url=o, - id=int(parts[-1]), - fname='_'.join(parts[-3:]) +'.mp4', - result_dir=tiktok_config().videos, - )) - - return res - -class tiktok_video_fetch_t: - class method_t(enum.Enum): - pyktok = 'pyktok' - tikcdn_io_curl = 'tikcdn.io-curl' - tikcdn_io_wget = 'tikcdn.io-wget' - -@celery.shared_task() -def tiktok_video_fetch( - id: int, - url: str, - fname: str, - result_dir: str, - method: Optional[tiktok_video_fetch_t.method_t]=None, - method_str: Optional[str]=None, -) -> None: - os.chdir(result_dir) - - if not method_str is None: - method = tiktok_video_fetch_t.method_t(method_str) - - if method is None: - method = tiktok_video_fetch_t.method_t.tikcdn_io_curl - - if method == tiktok_video_fetch_t.method_t.pyktok: - import pyktok - pyktok.save_tiktok(url) - elif method == tiktok_video_fetch_t.method_t.tikcdn_io_curl: - subprocess.check_call([ - 'curl', - '-v', - 'https://tikcdn.io/ssstik/%d' % id, - '-o', fname, - ]) - elif method == tiktok_video_fetch_t.method_t.tikcdn_io_wget: - subprocess.check_call([ - 'wget', - 'https://tikcdn.io/ssstik/%d' % id, - '-O', - fname, - ]) - else: - raise NotImplementedError - - mime_type = file_mime_type(fname) - - if mime_type in ['empty']: - raise RuntimeError('notdownloaded') - -def file_mime_type(path: str) -> Optional[str]: - if os.path.exists(path): - mime_type = subprocess.check_output([ - 'file', - '-b', path, - ]).strip().decode('utf-8') - return mime_type - else: - return None - -async def playwright_save(url: str): - import TikTokApi - - async with TikTokApi.TikTokApi() as client: - await client.create_sessions() - session = client.sessions[0] - page = session.page - - async with page.expect_download() as download_info: - await page.goto(url) - download = download_info.value - path = download.path() - download.save_as(path) - print(path) - -@celery.shared_task() -def tiktok_videos_fetch( - meta: Iterable[dict[str, Any]], - method: Optional[tiktok_video_fetch_t.method_t]=None, - method_str: Optional[str]=None, - force: Optional[bool]=None, -) -> Iterable[dict[str, Any]]: - import tqdm - - if force is None: - force = False - - stats = dict( - saved=0, - total=0, - skipped=0, - error=0, - ) - - with multiprocessing.Pool(processes=1) as pool: - for o in tqdm.tqdm(meta): - stats['total'] += 1 - path = os.path.join( - o['result_dir'], - o['fname'], - ) - - if ( - not os.path.exists(path) or - file_mime_type(path) in ['empty'] or - force - ): - try: - pool.apply( - tiktok_video_fetch, - kwds=dict( - id=o['id'], - url=o['url'], - fname=o['fname'], - method=method, - method_str=method_str, - result_dir=o['result_dir'], - ), - ) - stats['saved'] += 1 - except KeyboardInterrupt: - break - except: - logger.error(json.dumps(dict( - msg=traceback.format_exc(), - ))) - stats['error'] += 1 - else: - stats['skipped'] += 1 - - return stats - -@celery.shared_task() -def tiktok_videos_process(meta: Iterable[dict[str, Any]]) -> dict[str, Any]: - import tqdm - stats = dict( - saved=0, - total=0, - skipped=0, - error=0, - ) - - song = audio_get() - - for o in tqdm.tqdm(meta): - stats['total'] += 1 - - path = os.path.join( - o['result_dir'], - o['fname'], - ) - - try: - path_parts = os.path.splitext(path) - - processed_path = path_parts[0] + '-proc' + path_parts[1] - processed_path_tmp = path_parts[0] + '-proc.tmp' + path_parts[1] - - if not os.path.exists(path) or os.path.exists(processed_path): - stats['skipped'] += 1 - continue - - if os.path.exists(processed_path_tmp): - os.unlink(processed_path_tmp) - - ffmpeg = [ - 'ffmpeg', - '-i', path, - '-i', song.path_mp3, - '-shortest', - '-vf', - ','.join([ - 'setpts=1.1*PTS', - 'scale=trunc(iw/0.9):trunc(ow/a/2)*2', - ]), - '-sws_flags', 'bilinear', - '-map', '0:v:0', - '-map', '1:a:0', - processed_path_tmp, - ] - - subprocess.check_call( - ffmpeg, - stdin=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - stdout=subprocess.DEVNULL - ) - - os.rename(processed_path_tmp, processed_path) - - stats['saved'] += 1 - except KeyboardInterrupt: - break - except: - logger.error(json.dumps(dict( - msg=traceback.format_exc(), - ))) - stats['error'] += 1 - - return stats - -class audio_get_t: - @dataclasses.dataclass - class res_t: - file: str - file_mp3: str - path: str - path_mp3: str - url: str - -@celery.shared_task() -def audio_get() -> audio_get_t.res_t: - c = tiktok_config() - url = 'https://www.youtube.com/watch?v=dQw4w9WgXcQ' - file = 'song.dat' - file_mp3 = 'song.mp3' - - path = os.path.join(c.audios, file) - path_mp3 = os.path.join(c.audios, file_mp3) - - if not os.path.exists(path): - subprocess.check_call([ - 'yt-dlp', - '-f', 'bestaudio', - url, - '-o', path, - ]) - - if not os.path.exists(path_mp3): - subprocess.check_call([ - 'ffmpeg', - '-i', path, - path_mp3, - ]) - - return audio_get_t.res_t( - file=file, - file_mp3=file_mp3, - path=path, - path_mp3=path_mp3, - url=url, - ) diff --git a/python/tasks/tiktok/celery.py b/python/tasks/tiktok/celery.py index a2ac4e9..99a145e 100644 --- a/python/tasks/tiktok/celery.py +++ b/python/tasks/tiktok/celery.py @@ -1,14 +1,19 @@ -from . import tiktok_config +from .config import tiktok_config import celery +import redis c = tiktok_config() -CELERY_IMPORTS = c.celery_imports - app = celery.Celery( __name__, broker=c.celery_broker, result_backend=c.celery_result_backend, ) +app.autodiscover_tasks(c.celery_imports) + +redis = dict( + broker=redis.Redis(host='redis', db=int(c.celery_broker.split('/')[-1])), + result_backend=redis.Redis(host='redis', db=int(c.celery_result_backend.split('/')[-1])), +) diff --git a/python/tasks/tiktok/config.py b/python/tasks/tiktok/config.py new file mode 100644 index 0000000..73e1d18 --- /dev/null +++ b/python/tasks/tiktok/config.py @@ -0,0 +1,71 @@ +import logging +import enum +import dataclasses +import dataclasses_json +import multiprocessing +import traceback +import subprocess +import os +import sys +import json +from typing import ( + Any, + Literal, + Optional, + Iterable, +) + +logger = logging.getLogger(__name__) + +#logging.getLogger().setLevel(logging.INFO) + +class tiktok_config_t: + @dataclasses_json.dataclass_json + @dataclasses.dataclass + class res_t: + project_root: str='' + cache: str='' + videos: str='' + audios: str='' + celery_broker: str='' + celery_result_backend: str='' + celery_imports: Iterable[str]=tuple() + +def tiktok_config() -> tiktok_config_t.res_t: + res = tiktok_config_t.res_t( + project_root=os.path.abspath( + os.path.join( + os.path.dirname(__file__), + '..', '..', '..', + ), + ), + ) + + res.celery_broker = 'redis://redis:6379/1' + res.celery_result_backend = 'redis://redis:6379/2' + res.celery_imports = ['python.tasks.tiktok.tasks'] + res.cache = os.path.join( + res.project_root, + 'tmp/cache/tiktok', + ) + res.videos = os.path.join( + res.cache, + 'videos', + ) + res.audios = os.path.join( + res.cache, + 'audios', + ) + + os.makedirs(res.videos, exist_ok=True) + os.makedirs(res.audios, exist_ok=True) + + return res + +def logger_setup(): + if len(logger.handlers) == 0: + handler = logging.StreamHandler(sys.stderr) + logger.addHandler(handler) + logger.setLevel(logging.INFO) + +logger_setup() diff --git a/python/tasks/tiktok/tasks.py b/python/tasks/tiktok/tasks.py new file mode 100644 index 0000000..b803921 --- /dev/null +++ b/python/tasks/tiktok/tasks.py @@ -0,0 +1,351 @@ +import logging +import enum +import dataclasses +import dataclasses_json +import multiprocessing +import traceback +import subprocess +import os +import sys +import json +from typing import ( + Any, + Literal, + Optional, + Iterable, +) +import celery +from .config import tiktok_config + +#logging.getLogger().setLevel(logging.INFO) + +@celery.shared_task() +async def tiktok_videos_links_get( + query: Optional[str]=None, + screenshot_path: Optional[str]=None, + max_time: Optional[int | float]=None, + max_links: Optional[int]=None, +) -> Iterable[str]: + import datetime + import TikTokApi + import pyktok + import asyncio + import re + + if max_links is None: + max_links = 100 + + if max_time is None: + max_time = 10 + + async with TikTokApi.TikTokApi() as client: + await client.create_sessions() + + session = client.sessions[0] + + if not query is None: + await session.page.goto( + 'https://www.tiktok.com/search?q=%s' % query + ) + + if not screenshot_path is None: + await session.page.screenshot( + path=screenshot_path, + ) + + links = list() + links_set = set() + + started_at = datetime.datetime.now() + + while True: + content = await session.page.content() + new_links = re.compile( + r'https://www.tiktok.com/@\w+/video/\d+' + ).findall(content) + + old_size = len(links) + + for o in new_links: + if not o in links_set: + links_set.add(o) + links.append(o) + + await session.page.mouse.wheel(0, 100) + + elapsed = ( + datetime.datetime.now() - started_at + ).total_seconds() + + if elapsed > max_time: + break; + + if len(links_set) > max_links: + break + + if old_size < len(links): + logger.info(json.dumps(dict( + total=len(links), + elapsed=elapsed, + scroll_y=await session.page.evaluate('window.scrollY'), + ))) + + return list(links)[:max_links] + +@celery.shared_task() +def tiktok_videos_meta(links: Iterable[str]) -> Iterable[dict[str, Any]]: + res = [] + for o in links: + parts = o.split('/') + + res.append(dict( + url=o, + id=int(parts[-1]), + fname='_'.join(parts[-3:]) +'.mp4', + result_dir=tiktok_config().videos, + )) + + return res + +class tiktok_video_fetch_t: + class method_t(enum.Enum): + pyktok = 'pyktok' + tikcdn_io_curl = 'tikcdn.io-curl' + tikcdn_io_wget = 'tikcdn.io-wget' + +@celery.shared_task() +def tiktok_video_fetch( + id: int, + url: str, + fname: str, + result_dir: str, + method: Optional[tiktok_video_fetch_t.method_t]=None, + method_str: Optional[str]=None, +) -> None: + os.chdir(result_dir) + + if not method_str is None: + method = tiktok_video_fetch_t.method_t(method_str) + + if method is None: + method = tiktok_video_fetch_t.method_t.tikcdn_io_curl + + if method == tiktok_video_fetch_t.method_t.pyktok: + import pyktok + pyktok.save_tiktok(url) + elif method == tiktok_video_fetch_t.method_t.tikcdn_io_curl: + subprocess.check_call([ + 'curl', + '-v', + 'https://tikcdn.io/ssstik/%d' % id, + '-o', fname, + ]) + elif method == tiktok_video_fetch_t.method_t.tikcdn_io_wget: + subprocess.check_call([ + 'wget', + 'https://tikcdn.io/ssstik/%d' % id, + '-O', + fname, + ]) + else: + raise NotImplementedError + + mime_type = file_mime_type(fname) + + if mime_type in ['empty']: + raise RuntimeError('notdownloaded') + +def file_mime_type(path: str) -> Optional[str]: + if os.path.exists(path): + mime_type = subprocess.check_output([ + 'file', + '-b', path, + ]).strip().decode('utf-8') + return mime_type + else: + return None + +async def playwright_save(url: str): + import TikTokApi + + async with TikTokApi.TikTokApi() as client: + await client.create_sessions() + session = client.sessions[0] + page = session.page + + async with page.expect_download() as download_info: + await page.goto(url) + download = download_info.value + path = download.path() + download.save_as(path) + print(path) + +@celery.shared_task() +def tiktok_videos_fetch( + meta: Iterable[dict[str, Any]], + method: Optional[tiktok_video_fetch_t.method_t]=None, + method_str: Optional[str]=None, + force: Optional[bool]=None, +) -> Iterable[dict[str, Any]]: + import tqdm + + if force is None: + force = False + + stats = dict( + saved=0, + total=0, + skipped=0, + error=0, + ) + + with multiprocessing.Pool(processes=1) as pool: + for o in tqdm.tqdm(meta): + stats['total'] += 1 + path = os.path.join( + o['result_dir'], + o['fname'], + ) + + if ( + not os.path.exists(path) or + file_mime_type(path) in ['empty'] or + force + ): + try: + pool.apply( + tiktok_video_fetch, + kwds=dict( + id=o['id'], + url=o['url'], + fname=o['fname'], + method=method, + method_str=method_str, + result_dir=o['result_dir'], + ), + ) + stats['saved'] += 1 + except KeyboardInterrupt: + break + except: + logger.error(json.dumps(dict( + msg=traceback.format_exc(), + ))) + stats['error'] += 1 + else: + stats['skipped'] += 1 + + return stats + +@celery.shared_task() +def tiktok_videos_process(meta: Iterable[dict[str, Any]]) -> dict[str, Any]: + import tqdm + stats = dict( + saved=0, + total=0, + skipped=0, + error=0, + ) + + song = audio_get() + + for o in tqdm.tqdm(meta): + stats['total'] += 1 + + path = os.path.join( + o['result_dir'], + o['fname'], + ) + + try: + path_parts = os.path.splitext(path) + + processed_path = path_parts[0] + '-proc' + path_parts[1] + processed_path_tmp = path_parts[0] + '-proc.tmp' + path_parts[1] + + if not os.path.exists(path) or os.path.exists(processed_path): + stats['skipped'] += 1 + continue + + if os.path.exists(processed_path_tmp): + os.unlink(processed_path_tmp) + + ffmpeg = [ + 'ffmpeg', + '-i', path, + '-i', song.path_mp3, + '-shortest', + '-vf', + ','.join([ + 'setpts=1.1*PTS', + 'scale=trunc(iw/0.9):trunc(ow/a/2)*2', + ]), + '-sws_flags', 'bilinear', + '-map', '0:v:0', + '-map', '1:a:0', + processed_path_tmp, + ] + + subprocess.check_call( + ffmpeg, + stdin=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + stdout=subprocess.DEVNULL + ) + + os.rename(processed_path_tmp, processed_path) + + stats['saved'] += 1 + except KeyboardInterrupt: + break + except: + logger.error(json.dumps(dict( + msg=traceback.format_exc(), + ))) + stats['error'] += 1 + + return stats + +class audio_get_t: + @dataclasses.dataclass + @dataclasses_json.dataclass_json + class res_t: + file: str + file_mp3: str + path: str + path_mp3: str + url: str + +@celery.shared_task() +def audio_get() -> audio_get_t.res_t: + c = tiktok_config() + url = 'https://www.youtube.com/watch?v=dQw4w9WgXcQ' + file = 'song.dat' + file_mp3 = 'song.mp3' + + path = os.path.join(c.audios, file) + path_mp3 = os.path.join(c.audios, file_mp3) + + if not os.path.exists(path): + subprocess.check_call([ + 'yt-dlp', + '-f', 'bestaudio', + url, + '-o', path, + ]) + + if not os.path.exists(path_mp3): + subprocess.check_call([ + 'ffmpeg', + '-i', path, + path_mp3, + ]) + + return audio_get_t.res_t( + file=file, + file_mp3=file_mp3, + path=path, + path_mp3=path_mp3, + url=url, + ) +