freelance-project-34-market.../python/tasks/tiktok/__init__.py
2024-07-06 14:30:27 +03:00

82 lines
1.9 KiB
Python

import logging
import sys
import json
from typing import (
Optional,
Iterable,
)
# Module-level logger used by the functions in this module.
logger = logging.getLogger(__name__)


def logger_setup() -> None:
    """Attach a stderr handler to the module logger and set its level to INFO.

    The handler is installed only when the logger has no handlers yet, so
    calling this function more than once does not add duplicate handlers.
    """
    if logger.handlers:
        # Already configured: leave the existing handlers and level alone.
        return

    handler = logging.StreamHandler(sys.stderr)
    logger.addHandler(handler)
    # NOTE(review): the level is set only on first configuration; confirm
    # that repeated calls are not expected to force INFO again.
    logger.setLevel(logging.INFO)


# Configure logging once, as an import-time side effect of this module.
logger_setup()
def _tiktok_search_url(query: str) -> str:
    """Return the TikTok search-page URL for *query*, percent-encoded."""
    import urllib.parse

    # Encode every reserved character (safe='') so that spaces, '&', '#',
    # '?' and non-ASCII text stay inside the ``q`` parameter instead of
    # corrupting the URL.
    return 'https://www.tiktok.com/search?q=%s' % urllib.parse.quote(
        query,
        safe='',
    )


async def tiktok_videos_links_get(
    query: Optional[str]=None,
    screenshot_path: Optional[str]=None,
    max_time: Optional[int | float]=None,
) -> Iterable[str]:
    """Collect TikTok video links by scrolling a page for a limited time.

    Opens a ``TikTokApi`` client, creates its sessions and works with the
    page of the first session (presumably a Playwright page -- confirm
    against the TikTokApi version in use). The page content is scanned
    repeatedly for video URLs while the page is scrolled down with the
    mouse wheel.

    Args:
        query: Raw (not URL-encoded) search text. When given, the page is
            navigated to the TikTok search results for it; when ``None``,
            no navigation is performed and the session's current page is
            scanned as is.
        screenshot_path: When given, a screenshot of the page is written to
            this path before scrolling starts.
        max_time: Time budget for the scan loop, in seconds. Defaults to 10
            when ``None``. The loop always performs at least one pass.

    Returns:
        A set of unique video URLs of the form
        ``https://www.tiktok.com/@<user>/video/<id>``.
    """
    import re
    import time
    import TikTokApi

    if max_time is None:
        max_time = 10

    # Compiled once, outside the scroll loop. Literal dots in the host are
    # escaped, and the handle part accepts '.' because TikTok usernames may
    # contain periods (plain ``\w+`` silently dropped those videos).
    video_link_re = re.compile(
        r'https://www\.tiktok\.com/@[\w.]+/video/\d+'
    )

    links: set[str] = set()

    async with TikTokApi.TikTokApi() as client:
        await client.create_sessions()
        session = client.sessions[0]
        page = session.page

        if query is not None:
            await page.goto(_tiktok_search_url(query))

        if screenshot_path is not None:
            await page.screenshot(
                path=screenshot_path,
            )

        # Monotonic clock: the time budget must not be affected by
        # wall-clock adjustments (NTP sync, DST changes).
        started_at = time.monotonic()

        while True:
            content = await page.content()

            old_size = len(links)
            links.update(video_link_re.findall(content))

            # Scroll down a little so that more results can appear in the
            # page content on the next pass.
            await page.mouse.wheel(0, 100)

            elapsed = time.monotonic() - started_at

            if elapsed > max_time:
                break

            # Report progress only when this pass discovered new links.
            if old_size < len(links):
                logger.info(json.dumps(dict(
                    total=len(links),
                    elapsed=elapsed,
                    scroll_y=await page.evaluate('window.scrollY'),
                )))

    return links