import logging import os import sys import json from typing import ( Any, Optional, Iterable, ) logger = logging.getLogger(__name__) #logging.getLogger().setLevel(logging.INFO) def logger_setup(): if len(logger.handlers) == 0: handler = logging.StreamHandler(sys.stderr) logger.addHandler(handler) logger.setLevel(logging.INFO) logger_setup() async def tiktok_videos_links_get( query: Optional[str]=None, screenshot_path: Optional[str]=None, max_time: Optional[int | float]=None, ) -> Iterable[str]: import datetime import TikTokApi import pyktok import asyncio import re if max_time is None: max_time = 10 async with TikTokApi.TikTokApi() as client: await client.create_sessions() session = client.sessions[0] if not query is None: await session.page.goto( 'https://www.tiktok.com/search?q=%s' % query ) if not screenshot_path is None: await session.page.screenshot( path=screenshot_path, ) links = set() started_at = datetime.datetime.now() while True: content = await session.page.content() new_links = re.compile( r'https://www.tiktok.com/@\w+/video/\d+' ).findall(content) old_size = len(links) for o in new_links: links.add(o) await session.page.mouse.wheel(0, 100) elapsed = ( datetime.datetime.now() - started_at ).total_seconds() if elapsed > max_time: break; if old_size < len(links): logger.info(json.dumps(dict( total=len(links), elapsed=elapsed, scroll_y=await session.page.evaluate('window.scrollY'), ))) return links def tiktok_videos_meta(links: Iterable[str]) -> Iterable[dict[str, Any]]: res = [] for o in links: parts = o.split('/') res.append(dict( url=o, fname='_'.join(parts[-3:]) +'.mp4', )) return res def tiktok_videos_fetch( meta: Iterable[dict[str, Any]] ) -> Iterable[dict[str, Any]]: import pyktok import tqdm stats = dict( saved=0, total=0, skipped=0, error=0, ) for o in tqdm.tqdm(meta): stats['total'] += 1 if not os.path.exists(o['fname']): try: pyktok.save_tiktok(o['url']) stats['saved'] += 1 except: stats['error'] += 1 else: stats['skipped'] += 1 return stats