freelance-project-34-market.../python/tasks/tiktok/__init__.py
2024-07-06 15:13:45 +03:00

171 lines
3.9 KiB
Python

import logging
import dataclasses
import traceback
import subprocess
import os
import sys
import json
from typing import (
Any,
Literal,
Optional,
Iterable,
)
# Module-level logger; its handler and level are attached by logger_setup().
logger = logging.getLogger(__name__)
# Disabled: would set the level of the root logger (not just this module's
# logger) to INFO.
#logging.getLogger().setLevel(logging.INFO)
class tiktok_config_t:
    """Namespace for the types associated with ``tiktok_config()``."""

    @dataclasses.dataclass
    class res_t:
        """Directory paths used by the TikTok tasks.

        Every field defaults to an empty string; ``tiktok_config()`` fills
        them in with real paths.
        """

        # Base directory the other paths are derived from.
        project_root: str = dataclasses.field(default='')
        # Cache directory for TikTok data.
        cache: str = dataclasses.field(default='')
        # Directory for downloaded videos.
        videos: str = dataclasses.field(default='')
def tiktok_config() -> tiktok_config_t.res_t:
    """Compute the TikTok working directories and make sure they exist.

    Returns:
        A ``tiktok_config_t.res_t`` where ``project_root`` is the absolute
        path obtained by applying three ``..`` components to this file's
        path, ``cache`` is ``<project_root>/tmp/cache/tiktok`` and
        ``videos`` is ``<cache>/videos``.

    Side effects:
        Creates the ``videos`` directory (and any missing parents); an
        already existing directory is not an error.
    """
    root_dir = os.path.abspath(os.path.join(__file__, '..', '..', '..'))
    cache_dir = os.path.join(root_dir, 'tmp/cache/tiktok')
    videos_dir = os.path.join(cache_dir, 'videos')

    # Creating the deepest directory also creates the cache directory.
    os.makedirs(videos_dir, exist_ok=True)

    return tiktok_config_t.res_t(
        project_root=root_dir,
        cache=cache_dir,
        videos=videos_dir,
    )
def logger_setup():
    """Give the module logger a stderr handler and the INFO level, once.

    Does nothing when the logger already has at least one handler, so
    calling it repeatedly does not stack duplicate handlers.
    """
    # NOTE(review): assumes the level is only set together with the first
    # handler (i.e. inside the guard) -- confirm against the original
    # indentation of this function.
    if logger.handlers:
        return

    logger.addHandler(logging.StreamHandler(sys.stderr))
    logger.setLevel(logging.INFO)
# Configure the module logger as soon as this module is imported.
logger_setup()
async def tiktok_videos_links_get(
    query: Optional[str]=None,
    screenshot_path: Optional[str]=None,
    max_time: Optional[int | float]=None,
) -> Iterable[str]:
    """Scroll a TikTok page and collect the video links found in its HTML.

    A TikTokApi session is created and its first session's page is used.
    The page HTML is scanned for video links, then the mouse wheel is
    scrolled by 100 and the scan repeats until ``max_time`` seconds have
    passed. At least one scan is always performed.

    Args:
        query: Raw (not URL-encoded) search text. When given, the page is
            navigated to the TikTok search results for it; special
            characters are percent-encoded here. When ``None`` the page
            is left wherever the session opened it.
        screenshot_path: When given, a screenshot of the page is written
            to this path before scrolling starts.
        max_time: Time budget for scrolling, in seconds. Defaults to 10.

    Returns:
        A set of unique links of the form
        ``https://www.tiktok.com/@<user>/video/<digits>``.
    """
    import datetime
    import re
    import urllib.parse

    import TikTokApi

    if max_time is None:
        max_time = 10

    # Compiled once instead of on every scroll step. Dots in the host are
    # escaped, and the user name may contain periods as well as word
    # characters.
    video_link_re = re.compile(
        r'https://www\.tiktok\.com/@[\w.]+/video/\d+'
    )

    links: set[str] = set()

    async with TikTokApi.TikTokApi() as client:
        await client.create_sessions()
        page = client.sessions[0].page

        if query is not None:
            # Percent-encode so that '&', '#', '+', spaces etc. stay part
            # of the search text instead of altering the URL structure.
            await page.goto(
                'https://www.tiktok.com/search?q=%s' % (
                    urllib.parse.quote(query, safe=''),
                )
            )

        if screenshot_path is not None:
            await page.screenshot(
                path=screenshot_path,
            )

        started_at = datetime.datetime.now()

        while True:
            content = await page.content()

            old_size = len(links)
            links.update(video_link_re.findall(content))

            await page.mouse.wheel(0, 100)

            elapsed = (
                datetime.datetime.now() - started_at
            ).total_seconds()

            if elapsed > max_time:
                break

            # Only report progress when this step discovered new links.
            if old_size < len(links):
                logger.info(json.dumps(dict(
                    total=len(links),
                    elapsed=elapsed,
                    scroll_y=await page.evaluate('window.scrollY'),
                )))

    return links
def tiktok_videos_meta(links: Iterable[str]) -> Iterable[dict[str, Any]]:
    """Turn video links into metadata records.

    Each link is split on ``/``. The last component is parsed as the
    integer video id, and the last three components joined with ``_``
    plus ``.mp4`` form the file name.

    Args:
        links: Video URLs whose final path component is the numeric id.

    Returns:
        A list with one dict per link, holding the keys ``url``, ``id``
        and ``fname``.

    Raises:
        ValueError: If the last component of a link is not an integer.
    """
    def describe(link: str) -> dict[str, Any]:
        pieces = link.split('/')
        return {
            'url': link,
            'id': int(pieces[-1]),
            'fname': '_'.join(pieces[-3:]) +'.mp4',
        }

    return [describe(link) for link in links]
def tiktok_videos_fetch(
    meta: Iterable[dict[str, Any]],
    method: Optional[Literal['pyktok', 'tikcdn.io']]=None,
) -> dict[str, int]:
    """Download the videos described by ``meta`` and return counters.

    A record whose ``fname`` already exists on disk is skipped. A failure
    on one record is logged (JSON with the traceback) and counted, and
    the loop moves on to the next record.

    Args:
        meta: Records with the keys ``url``, ``id`` and ``fname``, as
            produced by ``tiktok_videos_meta``.
        method: ``'pyktok'`` (default) downloads through
            ``pyktok.save_tiktok``; ``'tikcdn.io'`` downloads
            ``https://tikcdn.io/ssstik/<id>`` with ``curl`` into
            ``fname``.

    Returns:
        A dict with the integer counters ``saved``, ``total``,
        ``skipped`` and ``error``.

    Raises:
        ValueError: If ``method`` is not one of the supported values.
    """
    if method is None:
        method = 'pyktok'

    # Reject unknown methods up front: otherwise nothing would be
    # downloaded while every record is still counted as saved.
    if method not in ('pyktok', 'tikcdn.io'):
        raise ValueError('unsupported method: %r' % (method,))

    import tqdm

    # pyktok is only required by the method that actually uses it.
    if method == 'pyktok':
        import pyktok

    stats = dict(
        saved=0,
        total=0,
        skipped=0,
        error=0,
    )

    for o in tqdm.tqdm(meta):
        stats['total'] += 1

        if os.path.exists(o['fname']):
            stats['skipped'] += 1
            continue

        try:
            if method == 'pyktok':
                pyktok.save_tiktok(o['url'])
            else:
                # Download next to the target and rename on success, so an
                # interrupted transfer never leaves a truncated file that
                # later runs would skip as "already fetched".
                partial = o['fname'] + '.part'
                try:
                    subprocess.check_call([
                        'curl',
                        # Make HTTP errors fail instead of saving the
                        # error page as the video.
                        '--fail',
                        'https://tikcdn.io/ssstik/%d' % o['id'],
                        '-o', partial,
                    ])
                    os.replace(partial, o['fname'])
                finally:
                    if os.path.exists(partial):
                        os.remove(partial)

            stats['saved'] += 1
        except Exception:
            # Exception (not a bare except) so that Ctrl-C / SystemExit
            # still stop the whole run.
            logger.error(json.dumps(dict(
                msg=traceback.format_exc(),
            )))
            stats['error'] += 1

    return stats