262 lines
6.4 KiB
Python
262 lines
6.4 KiB
Python
import logging
|
|
import enum
|
|
import dataclasses
|
|
import multiprocessing
|
|
import traceback
|
|
import subprocess
|
|
import os
|
|
import sys
|
|
import json
|
|
from typing import (
|
|
Any,
|
|
Literal,
|
|
Optional,
|
|
Iterable,
|
|
)
|
|
|
|
# Module-level logger used by the helpers in this file; logger_setup()
# attaches a stderr handler to it and sets its level.
logger = logging.getLogger(__name__)

# Disabled: would set the level of the root logger to INFO.
#logging.getLogger().setLevel(logging.INFO)
|
|
|
|
class tiktok_config_t:
    """Namespace holding the result type returned by ``tiktok_config()``."""

    @dataclasses.dataclass
    class res_t:
        """Filesystem locations; every field defaults to an empty string."""

        # Root directory the other two paths are derived from.
        project_root: str = ''
        # Cache directory for TikTok data.
        cache: str = ''
        # Directory that receives downloaded videos.
        videos: str = ''
|
|
|
|
def tiktok_config() -> tiktok_config_t.res_t:
    """Resolve the TikTok working directories and make sure they exist.

    Returns:
        A ``tiktok_config_t.res_t`` where ``project_root`` is the absolute
        path three levels above the directory containing this file,
        ``cache`` is ``tmp/cache/tiktok`` under that root and ``videos`` is
        the ``videos`` sub-directory of the cache.

    Side effects:
        Creates the ``videos`` directory (including missing parents) on
        every call; an already existing directory is not an error.
    """
    here = os.path.dirname(__file__)
    project_root = os.path.abspath(os.path.join(here, '..', '..', '..'))
    cache_dir = os.path.join(project_root, 'tmp/cache/tiktok')
    videos_dir = os.path.join(cache_dir, 'videos')

    os.makedirs(videos_dir, exist_ok=True)

    return tiktok_config_t.res_t(
        project_root=project_root,
        cache=cache_dir,
        videos=videos_dir,
    )
|
|
|
|
def logger_setup():
    """Give the module logger a stderr handler and INFO level, once.

    Does nothing when the logger already has at least one handler, so
    repeated calls never attach a second handler.
    """
    if logger.handlers:
        return

    logger.addHandler(logging.StreamHandler(sys.stderr))
    logger.setLevel(logging.INFO)


# Configure logging as soon as the module is imported.
logger_setup()
|
|
|
|
async def tiktok_videos_links_get(
    query: Optional[str]=None,
    screenshot_path: Optional[str]=None,
    max_time: Optional[int | float]=None,
) -> Iterable[str]:
    """Collect TikTok video URLs by scrolling a page in a browser session.

    Opens a ``TikTokApi`` client, takes the page of its first session and
    repeatedly scans the page HTML for video links while scrolling down,
    until ``max_time`` seconds have passed.

    Args:
        query: Raw (not yet URL-encoded) search text.  When given, the page
            is navigated to the TikTok search results for it; otherwise the
            page the session opened with is scanned.
        screenshot_path: When given, a screenshot of the page is written to
            this path before scrolling starts.
        max_time: Scrolling budget in seconds; defaults to 10.

    Returns:
        The set of distinct ``https://www.tiktok.com/@<user>/video/<id>``
        URLs found in the page content.
    """
    import re
    import time
    import urllib.parse

    import TikTokApi

    if max_time is None:
        max_time = 10

    # Compiled once instead of on every scroll step; the dots are escaped so
    # that they only match a literal "." in the host name.
    link_re = re.compile(r'https://www\.tiktok\.com/@\w+/video/\d+')

    async with TikTokApi.TikTokApi() as client:
        await client.create_sessions()

        page = client.sessions[0].page

        if query is not None:
            # Percent-encode the query so spaces, "&", "#" and non-ASCII
            # text cannot corrupt the URL.
            await page.goto(
                'https://www.tiktok.com/search?q=%s' % (
                    urllib.parse.quote(query, safe=''),
                )
            )

        if screenshot_path is not None:
            await page.screenshot(
                path=screenshot_path,
            )

        links: set[str] = set()

        # A monotonic clock keeps the time budget immune to system clock
        # adjustments.
        started_at = time.monotonic()

        while True:
            content = await page.content()

            old_size = len(links)
            links.update(link_re.findall(content))

            await page.mouse.wheel(0, 100)

            elapsed = time.monotonic() - started_at

            if elapsed > max_time:
                break

            # Only report progress when this pass discovered new links.
            if old_size < len(links):
                logger.info(json.dumps(dict(
                    total=len(links),
                    elapsed=elapsed,
                    scroll_y=await page.evaluate('window.scrollY'),
                )))

    return links
|
|
|
|
def tiktok_videos_meta(links: Iterable[str]) -> Iterable[dict[str, Any]]:
    """Turn video links into download descriptors.

    Args:
        links: Video URLs whose last path segment is the numeric video id,
            e.g. ``https://www.tiktok.com/@user/video/123``.

    Returns:
        A list with one dict per link, in input order, with the keys
        ``url`` (the link itself), ``id`` (last segment as ``int``),
        ``fname`` (last three ``/``-separated segments joined by ``_`` plus
        ``.mp4``) and ``result_dir`` (the ``videos`` directory reported by
        ``tiktok_config()``).

    Raises:
        ValueError: If the last segment of a link is not an integer.
    """
    res: list[dict[str, Any]] = []

    # Resolved lazily and only once: tiktok_config() recomputes the paths
    # and calls os.makedirs on every call, so doing it per link is wasted
    # work.  Staying lazy keeps an empty input free of side effects.
    result_dir: Optional[str] = None

    for link in links:
        parts = link.split('/')
        video_id = int(parts[-1])
        fname = '_'.join(parts[-3:]) + '.mp4'

        if result_dir is None:
            result_dir = tiktok_config().videos

        res.append(dict(
            url=link,
            id=video_id,
            fname=fname,
            result_dir=result_dir,
        ))

    return res
|
|
|
|
class tiktok_video_fetch_t:
    """Namespace for the types used by ``tiktok_video_fetch()``."""

    class method_t(enum.Enum):
        """Download back-ends, selectable by member or by string value."""

        # Download through the ``pyktok`` package.
        pyktok = 'pyktok'
        # Download from tikcdn.io with the ``curl`` executable.
        tikcdn_io_curl = 'tikcdn.io-curl'
        # Download from tikcdn.io with the ``wget`` executable.
        tikcdn_io_wget = 'tikcdn.io-wget'
|
|
|
|
def tiktok_video_fetch(
    id: int,
    url: str,
    fname: str,
    result_dir: str,
    method: Optional[tiktok_video_fetch_t.method_t]=None,
    method_str: Optional[str]=None,
) -> None:
    """Download one TikTok video into ``result_dir``.

    Args:
        id: Numeric video id, used to build the tikcdn.io URL.
        url: Video page URL, used by the ``pyktok`` method.
        fname: File name (relative to ``result_dir``) the video is expected
            under after the download.
        result_dir: Directory the download runs in.
        method: Download back-end; defaults to ``tikcdn_io_curl``.
        method_str: String value of a ``method_t`` member; when given it
            takes precedence over ``method``.

    Raises:
        ValueError: If ``method_str`` is not a ``method_t`` value.
        subprocess.CalledProcessError: If ``curl`` / ``wget`` exits non-zero.
        RuntimeError: ``'notdownloaded'`` if ``fname`` is missing or empty
            after the download.
        NotImplementedError: For a method without an implementation.
    """
    if method_str is not None:
        method = tiktok_video_fetch_t.method_t(method_str)

    if method is None:
        method = tiktok_video_fetch_t.method_t.tikcdn_io_curl

    # The download runs with result_dir as the working directory, so fname is
    # resolved inside it.  The previous directory is restored afterwards so a
    # relative result_dir keeps working when this is called again in the
    # same process.
    previous_cwd = os.getcwd()
    os.chdir(result_dir)

    try:
        if method == tiktok_video_fetch_t.method_t.pyktok:
            import pyktok
            pyktok.save_tiktok(url)
        elif method == tiktok_video_fetch_t.method_t.tikcdn_io_curl:
            subprocess.check_call([
                'curl',
                '-v',
                'https://tikcdn.io/ssstik/%d' % id,
                '-o', fname,
            ])
        elif method == tiktok_video_fetch_t.method_t.tikcdn_io_wget:
            subprocess.check_call([
                'wget',
                'https://tikcdn.io/ssstik/%d' % id,
                '-O',
                fname,
            ])
        else:
            raise NotImplementedError

        mime_type = file_mime_type(fname)

        # None means the file was never created; treat that like an empty
        # file instead of silently reporting success.
        if mime_type is None or mime_type in ['empty']:
            raise RuntimeError('notdownloaded')
    finally:
        os.chdir(previous_cwd)
|
|
|
|
def file_mime_type(path: str) -> Optional[str]:
    """Describe a file's content with the ``file`` command-line utility.

    Args:
        path: File to inspect.

    Returns:
        The whitespace-stripped, UTF-8 decoded output of ``file -b <path>``
        (callers in this module compare it against ``'empty'``), or ``None``
        when ``path`` does not exist.

    Raises:
        subprocess.CalledProcessError: If ``file`` exits with a non-zero
            status.
    """
    if not os.path.exists(path):
        return None

    raw_output = subprocess.check_output(['file', '-b', path])

    return raw_output.strip().decode('utf-8')
|
|
|
|
async def playwright_save(url: str):
    """Open ``url`` in a TikTokApi browser page and capture the download.

    Navigates the page of the first TikTokApi session to ``url`` while
    waiting for a download, then prints the path of the downloaded file.

    Args:
        url: Address whose navigation is expected to start a download.
    """
    import TikTokApi

    async with TikTokApi.TikTokApi() as client:
        await client.create_sessions()
        session = client.sessions[0]
        page = session.page

        async with page.expect_download() as download_info:
            await page.goto(url)

        # In Playwright's async API the download handle and its methods are
        # awaitables; without ``await`` they are never resolved.
        download = await download_info.value
        path = await download.path()
        await download.save_as(path)
        print(path)
|
|
|
|
def tiktok_videos_fetch(
    meta: Iterable[dict[str, Any]],
    method: Optional[tiktok_video_fetch_t.method_t]=None,
    method_str: Optional[str]=None,
    force: Optional[bool]=None,
) -> dict[str, int]:
    """Download every video described by ``meta`` and report counters.

    Each download runs through ``tiktok_video_fetch`` in a single-worker
    process pool, one item at a time.

    Args:
        meta: Descriptors with the keys ``id``, ``url``, ``fname`` and
            ``result_dir``.
        method: Download back-end forwarded to ``tiktok_video_fetch``.
        method_str: String form of the back-end, forwarded as well.
        force: Download even when a non-empty file already exists;
            defaults to ``False``.

    Returns:
        A dict with the counters ``total``, ``saved``, ``skipped`` and
        ``error``.  A ``KeyboardInterrupt`` stops the loop early and the
        counters gathered so far are returned.
    """
    import tqdm

    if force is None:
        force = False

    stats = dict(
        saved=0,
        total=0,
        skipped=0,
        error=0,
    )

    with multiprocessing.Pool(processes=1) as pool:
        for o in tqdm.tqdm(meta):
            stats['total'] += 1
            path = os.path.join(
                o['result_dir'],
                o['fname'],
            )

            # ``force`` is checked first so a forced run does not spawn a
            # ``file`` subprocess just to discard its answer.
            needs_fetch = (
                force or
                not os.path.exists(path) or
                file_mime_type(path) in ['empty']
            )

            if not needs_fetch:
                stats['skipped'] += 1
                continue

            try:
                pool.apply(
                    tiktok_video_fetch,
                    kwds=dict(
                        id=o['id'],
                        url=o['url'],
                        fname=o['fname'],
                        method=method,
                        method_str=method_str,
                        result_dir=o['result_dir'],
                    ),
                )
            except KeyboardInterrupt:
                break
            except Exception:
                # Best effort: log the failure and continue with the next
                # video.  SystemExit and friends are no longer swallowed.
                logger.error(json.dumps(dict(
                    msg=traceback.format_exc(),
                )))
                stats['error'] += 1
            else:
                stats['saved'] += 1

    return stats
|