freelance-project-34-market.../python/tasks/tiktok/__init__.py
2024-07-06 17:43:37 +03:00

365 lines
8.8 KiB
Python

import logging
import enum
import dataclasses
import multiprocessing
import traceback
import subprocess
import os
import sys
import json
from typing import (
Any,
Literal,
Optional,
Iterable,
)
# Module-level logger; logger_setup() attaches a stderr handler to it.
logger = logging.getLogger(__name__)
#logging.getLogger().setLevel(logging.INFO)
class tiktok_config_t:
    """Namespace holding the result type returned by tiktok_config()."""

    @dataclasses.dataclass
    class res_t:
        """Filesystem locations used by the TikTok tasks.

        Every field defaults to an empty string.
        """

        # Root directory of the project.
        project_root: str = ''
        # Cache directory of the TikTok tasks.
        cache: str = ''
        # Directory for video files.
        videos: str = ''
        # Directory for audio files.
        audios: str = ''
def tiktok_config() -> tiktok_config_t.res_t:
    """Resolve the TikTok cache layout and make sure its directories exist.

    The project root is three directory levels above the directory that
    contains this file; the cache lives in ``tmp/cache/tiktok`` below it,
    with ``videos`` and ``audios`` sub-directories.

    Returns:
        A ``tiktok_config_t.res_t`` with all four paths filled in. The
        ``videos`` and ``audios`` directories are created when missing.
    """
    here = os.path.dirname(__file__)
    project_root = os.path.abspath(os.path.join(here, '..', '..', '..'))

    cache_dir = os.path.join(project_root, 'tmp/cache/tiktok')
    videos_dir = os.path.join(cache_dir, 'videos')
    audios_dir = os.path.join(cache_dir, 'audios')

    # Creating the leaf directories also creates the cache directory itself.
    for directory in (videos_dir, audios_dir):
        os.makedirs(directory, exist_ok=True)

    return tiktok_config_t.res_t(
        project_root=project_root,
        cache=cache_dir,
        videos=videos_dir,
        audios=audios_dir,
    )
def logger_setup():
    """Attach a stderr handler to the module logger unless it already has one."""
    if logger.handlers:
        return

    logger.addHandler(logging.StreamHandler(sys.stderr))
    # NOTE(review): assumed to belong to the first-time setup together with
    # the handler; confirm the level should not be reset on every call.
    logger.setLevel(logging.INFO)


# Configure the module logger once, at import time.
logger_setup()
async def tiktok_videos_links_get(
    query: Optional[str]=None,
    screenshot_path: Optional[str]=None,
    max_time: Optional[int | float]=None,
) -> Iterable[str]:
    """Collect TikTok video links from a browser page by scrolling it.

    A TikTokApi session is created; when *query* is given its page is pointed
    at the TikTok search results for it. The page content is then scanned for
    video URLs repeatedly while the mouse wheel scrolls the page down, until
    *max_time* seconds have elapsed.

    Args:
        query: Search text. It is percent-encoded before being placed in the
            URL, so pass the raw text. When ``None`` no navigation is done.
        screenshot_path: When given, a screenshot of the page is written
            there before scrolling starts.
        max_time: Scrolling time budget in seconds; defaults to 10.

    Returns:
        A set of unique ``https://www.tiktok.com/@<user>/video/<id>`` links.
    """
    import re
    import time
    import urllib.parse

    import TikTokApi

    if max_time is None:
        max_time = 10

    # Compiled once instead of on every loop pass. Dots of the host name are
    # escaped, and periods are accepted inside the user handle.
    video_link_re = re.compile(
        r'https://www\.tiktok\.com/@[\w.]+/video/\d+'
    )

    async with TikTokApi.TikTokApi() as client:
        await client.create_sessions()

        page = client.sessions[0].page

        if query is not None:
            # Encode the query so spaces, '&', '#', ... cannot break the URL.
            await page.goto(
                'https://www.tiktok.com/search?q=%s' % urllib.parse.quote(
                    query,
                    safe='',
                )
            )

        if screenshot_path is not None:
            await page.screenshot(
                path=screenshot_path,
            )

        links: set[str] = set()

        # A monotonic clock is immune to wall-clock adjustments.
        started_at = time.monotonic()

        while True:
            content = await page.content()

            old_size = len(links)
            links.update(video_link_re.findall(content))

            # Scroll down by 100 so that more results get loaded.
            await page.mouse.wheel(0, 100)

            elapsed = time.monotonic() - started_at

            if elapsed > max_time:
                break

            # Report progress only when this pass discovered new links.
            if old_size < len(links):
                logger.info(json.dumps(dict(
                    total=len(links),
                    elapsed=elapsed,
                    scroll_y=await page.evaluate('window.scrollY'),
                )))

        return links
def tiktok_videos_meta(links: Iterable[str]) -> Iterable[dict[str, Any]]:
    """Build download metadata for TikTok video links.

    Args:
        links: URLs whose last path component is the numeric video id, e.g.
            ``https://www.tiktok.com/@user/video/123``. A query string or a
            trailing slash is ignored when the id and file name are derived.

    Returns:
        A list with one dict per link with the keys ``url`` (the link as
        given), ``id`` (int), ``fname`` (the last three path components
        joined with ``_`` plus ``.mp4``) and ``result_dir`` (the configured
        videos directory).

    Raises:
        ValueError: when the last path component of a link is not an integer.
    """
    # Resolved once: tiktok_config() touches the filesystem on every call.
    result_dir = tiktok_config().videos

    res = []

    for link in links:
        parts = link.split('?', 1)[0].rstrip('/').split('/')

        res.append(dict(
            url=link,
            id=int(parts[-1]),
            fname='_'.join(parts[-3:]) + '.mp4',
            result_dir=result_dir,
        ))

    return res
class tiktok_video_fetch_t:
    """Namespace holding the option types of tiktok_video_fetch()."""

    class method_t(enum.Enum):
        """Download backends selectable in tiktok_video_fetch()."""

        # Download through the pyktok package.
        pyktok = 'pyktok'
        # Download from tikcdn.io using curl.
        tikcdn_io_curl = 'tikcdn.io-curl'
        # Download from tikcdn.io using wget.
        tikcdn_io_wget = 'tikcdn.io-wget'
def tiktok_video_fetch(
    id: int,
    url: str,
    fname: str,
    result_dir: str,
    method: Optional[tiktok_video_fetch_t.method_t]=None,
    method_str: Optional[str]=None,
) -> None:
    """Download a single TikTok video into *result_dir*.

    Args:
        id: Numeric video id, used by the tikcdn.io backends.
        url: Video page URL, used by the pyktok backend.
        fname: File name the download is expected under in *result_dir*.
        result_dir: Directory the download runs in.
        method: Download backend; defaults to ``tikcdn_io_curl``.
        method_str: String value of the backend; overrides *method* when set.

    Raises:
        ValueError: when *method_str* is not a valid backend value.
        subprocess.CalledProcessError: when curl/wget exits with an error.
        RuntimeError: ``'notdownloaded'`` when the expected file is missing
            or empty after the download.
    """
    # Resolve the backend before any side effect takes place.
    if method_str is not None:
        method = tiktok_video_fetch_t.method_t(method_str)

    if method is None:
        method = tiktok_video_fetch_t.method_t.tikcdn_io_curl

    # The download runs with result_dir as working directory (relative fname
    # for curl/wget, and pyktok.save_tiktok() gets no target path). The
    # previous directory is restored so the change does not leak to the
    # caller's process.
    previous_cwd = os.getcwd()
    os.chdir(result_dir)

    try:
        if method == tiktok_video_fetch_t.method_t.pyktok:
            import pyktok

            pyktok.save_tiktok(url)
        elif method == tiktok_video_fetch_t.method_t.tikcdn_io_curl:
            subprocess.check_call([
                'curl',
                '-v',
                'https://tikcdn.io/ssstik/%d' % id,
                '-o', fname,
            ])
        elif method == tiktok_video_fetch_t.method_t.tikcdn_io_wget:
            subprocess.check_call([
                'wget',
                'https://tikcdn.io/ssstik/%d' % id,
                '-O',
                fname,
            ])
        else:
            raise NotImplementedError

        mime_type = file_mime_type(fname)
    finally:
        os.chdir(previous_cwd)

    # A missing file (None) is as much a failed download as an empty one.
    if mime_type is None or mime_type in ['empty']:
        raise RuntimeError('notdownloaded')
def file_mime_type(path: str) -> Optional[str]:
    """Describe *path* with the ``file`` command line utility.

    Returns:
        The stripped, UTF-8 decoded output of ``file -b <path>``, or ``None``
        when *path* does not exist.
    """
    if not os.path.exists(path):
        return None

    raw_output = subprocess.check_output([
        'file',
        '-b', path,
    ])

    return raw_output.strip().decode('utf-8')
async def playwright_save(url: str):
    """Open *url* in a TikTokApi browser session and capture the download.

    The path of the downloaded file is printed to stdout.

    Args:
        url: Address whose navigation is expected to trigger a download.
    """
    import TikTokApi

    async with TikTokApi.TikTokApi() as client:
        await client.create_sessions()

        page = client.sessions[0].page

        async with page.expect_download() as download_info:
            await page.goto(url)

        # With the async Playwright API the download object, its path() and
        # save_as() are awaitables; without awaiting nothing gets resolved.
        download = await download_info.value
        path = await download.path()

        # NOTE(review): saves the download onto its own path, as the
        # original code did; confirm whether a separate target was intended.
        await download.save_as(path)

        print(path)
def tiktok_videos_fetch(
    meta: Iterable[dict[str, Any]],
    method: Optional[tiktok_video_fetch_t.method_t]=None,
    method_str: Optional[str]=None,
    force: Optional[bool]=None,
) -> dict[str, Any]:
    """Download every video described in *meta* that is not yet on disk.

    Each download is executed through tiktok_video_fetch() in a
    single-process worker pool. A failing download is logged and counted;
    the loop goes on with the next entry. KeyboardInterrupt stops the loop.

    Args:
        meta: Dicts with the keys ``id``, ``url``, ``fname`` and
            ``result_dir``.
        method: Download backend forwarded to tiktok_video_fetch().
        method_str: Backend string value forwarded to tiktok_video_fetch().
        force: Download even when a non-empty file is already present;
            defaults to False.

    Returns:
        Counters ``saved``, ``total``, ``skipped`` and ``error``.
    """
    import tqdm

    if force is None:
        force = False

    stats = dict(
        saved=0,
        total=0,
        skipped=0,
        error=0,
    )

    with multiprocessing.Pool(processes=1) as pool:
        for o in tqdm.tqdm(meta):
            stats['total'] += 1

            path = os.path.join(
                o['result_dir'],
                o['fname'],
            )

            # ``force`` goes first, so the ``file`` subprocess is not spawned
            # when the download happens anyway.
            needs_fetch = (
                force or
                not os.path.exists(path) or
                file_mime_type(path) in ['empty']
            )

            if not needs_fetch:
                stats['skipped'] += 1
                continue

            try:
                pool.apply(
                    tiktok_video_fetch,
                    kwds=dict(
                        id=o['id'],
                        url=o['url'],
                        fname=o['fname'],
                        method=method,
                        method_str=method_str,
                        result_dir=o['result_dir'],
                    ),
                )
            except KeyboardInterrupt:
                break
            except Exception:
                # Best effort: record the failure and keep downloading.
                logger.error(json.dumps(dict(
                    msg=traceback.format_exc(),
                )))
                stats['error'] += 1
            else:
                stats['saved'] += 1

    return stats
def tiktok_videos_process(meta: Iterable[dict[str, Any]]) -> dict[str, Any]:
    """Re-encode downloaded videos with ffmpeg, replacing their audio track.

    For every entry the source ``<result_dir>/<fname>`` is converted into a
    sibling file with ``-proc`` inserted before the extension. The video
    stream of the source is combined with the audio returned by audio_get(),
    applying the ``setpts`` and ``scale`` filters. Entries whose source is
    missing or whose processed file already exists are skipped. A failing
    conversion is logged and counted; KeyboardInterrupt stops the loop.

    Args:
        meta: Dicts with the keys ``result_dir`` and ``fname``.

    Returns:
        Counters ``saved``, ``total``, ``skipped`` and ``error``.
    """
    import tqdm

    stats = dict(
        saved=0,
        total=0,
        skipped=0,
        error=0,
    )

    song = audio_get()

    for o in tqdm.tqdm(meta):
        stats['total'] += 1

        path = os.path.join(
            o['result_dir'],
            o['fname'],
        )

        try:
            path_parts = os.path.splitext(path)

            processed_path = path_parts[0] + '-proc' + path_parts[1]

            print(processed_path)

            if not os.path.exists(path) or os.path.exists(processed_path):
                stats['skipped'] += 1
                continue

            ffmpeg = [
                'ffmpeg',
                '-i', path,
                '-i', song.path_mp3,
                '-shortest',
                '-vf',
                ','.join([
                    'setpts=1.1*PTS',
                    'scale=trunc(iw/0.9):trunc(ow/a/2)*2',
                ]),
                '-sws_flags', 'bilinear',
                '-map', '0:v:0',
                '-map', '1:a:0',
                processed_path,
            ]

            try:
                subprocess.check_call(ffmpeg)
            except BaseException:
                # A failed or interrupted run may leave a partial file behind
                # which later runs would take for a finished one; drop it.
                try:
                    if os.path.exists(processed_path):
                        os.remove(processed_path)
                except OSError:
                    pass

                raise

            stats['saved'] += 1
        except KeyboardInterrupt:
            break
        except Exception:
            # Best effort: record the failure and keep processing.
            logger.error(json.dumps(dict(
                msg=traceback.format_exc(),
            )))
            stats['error'] += 1

    return stats
class audio_get_t:
    """Namespace holding the result type returned by audio_get()."""

    @dataclasses.dataclass
    class res_t:
        """Names, paths and source URL of the audio track; all fields are required."""

        # File name of the downloaded audio.
        file: str
        # File name of the mp3 version.
        file_mp3: str
        # Path of the downloaded audio.
        path: str
        # Path of the mp3 version.
        path_mp3: str
        # URL the audio comes from.
        url: str
def audio_get() -> audio_get_t.res_t:
    """Make sure the audio track is available in the audio cache directory.

    The audio is downloaded with ``yt-dlp`` into ``song.dat`` and converted
    with ``ffmpeg`` into ``song.mp3``; each step is skipped when its output
    file already exists.

    Returns:
        An ``audio_get_t.res_t`` with the file names, their paths inside the
        audio cache directory and the source URL.

    Raises:
        subprocess.CalledProcessError: when ``yt-dlp`` or ``ffmpeg`` fails.
    """
    c = tiktok_config()

    url = 'https://www.youtube.com/watch?v=dQw4w9WgXcQ'
    file = 'song.dat'
    file_mp3 = 'song.mp3'

    path = os.path.join(c.audios, file)
    path_mp3 = os.path.join(c.audios, file_mp3)

    if not os.path.exists(path):
        subprocess.check_call([
            'yt-dlp',
            '-f', 'bestaudio',
            url,
            '-o', path,
        ])

    if not os.path.exists(path_mp3):
        try:
            subprocess.check_call([
                'ffmpeg',
                '-i', path,
                path_mp3,
            ])
        except BaseException:
            # A failed or interrupted conversion may leave a partial mp3
            # which later calls would reuse as if it were complete; drop it.
            try:
                if os.path.exists(path_mp3):
                    os.remove(path_mp3)
            except OSError:
                pass

            raise

    return audio_get_t.res_t(
        file=file,
        file_mp3=file_mp3,
        path=path,
        path_mp3=path_mp3,
        url=url,
    )