[~] Refactor

This commit is contained in:
Siarhei Siniak 2024-07-06 14:27:14 +03:00
parent 2f5a5d0e78
commit 75f41b03db
4 changed files with 156 additions and 0 deletions

@ -2,6 +2,11 @@ FROM python:latest
# Interactive Python tooling: the IPython shell and Jupyter.
RUN pip3 install ipython jupyter
# Shell and terminal utilities (zsh, htop, mc, git) from the distribution packages.
RUN apt-get update -yy && apt-get install -yy zsh htop mc git
# Vim key bindings for JupyterLab.
RUN pip3 install jupyterlab-vim
# TikTok scraping/downloading libraries; playwright is pinned to 1.44.0.
RUN pip3 install pyktok yt-dlp playwright==1.44.0 TikTokApi
# Data handling, browser cookie access, debugging and async helpers.
RUN pip3 install numpy pandas browser_cookie3 ipdb asgiref
# Install the operating-system packages that Playwright's browsers depend on.
RUN python3 -m playwright install-deps
# Download the browser binaries that Playwright drives.
RUN python3 -m playwright install
# Subsequent instructions and the container's default directory use /app.
WORKDIR /app

@ -5,4 +5,6 @@ mkdir -p tmp/cache/tiktok/jupyter
# Point the zsh history file and the Jupyter/IPython state directories at the
# project's tmp/cache/tiktok tree. Paths are quoted so a working directory
# containing spaces does not get split into several arguments.
ln -sf "$PWD/tmp/cache/tiktok/zsh/histfile" ~/.histfile
# -n: when the link already exists and points at a directory, replace the link
# itself instead of creating a nested link inside that directory on re-runs.
ln -sfn "$PWD/tmp/cache/tiktok/jupyter" ~/.jupyter
ln -sfn "$PWD/tmp/cache/tiktok/ipython" ~/.ipython
# Make sure the default profile directory exists, then link the project's
# IPython configuration into it.
ipython3 profile create
ln -sf "$PWD/docker/tiktok/ipython_config.py" ~/.ipython/profile_default/
# Replace this shell with the requested command. "$@" passes every argument
# through unchanged; an unquoted $@ would re-split arguments on whitespace.
exec "$@"

@ -0,0 +1,71 @@
# Code executed in the user namespace when the shell starts, in order:
#   1. switch autoreload to mode 2,
#   2. define ipython_update_shortcuts(),
#   3. call it to install the vim editing shortcut.
# `c` is the configuration object provided by IPython when it loads this file.
c.InteractiveShellApp.exec_lines = [
    '%autoreload 2',
    r'''
def ipython_update_shortcuts():
    """Replace the F2 editor shortcut with a vim-backed one.

    Removes any key binding registered for F2 and binds the two-key
    sequence "backslash, e" (outside vi insert mode) to a handler that
    opens the current input buffer in vim.
    """
    import functools
    import io
    import subprocess
    import tempfile

    import IPython
    import prompt_toolkit.document
    import prompt_toolkit.filters
    import prompt_toolkit.keys

    def ipython_edit_in_vim(*args, pt_app):
        """Edit the current input buffer in vim and load the result back.

        The key binding machinery passes the event positionally; it is
        accepted through *args and not used. The buffer is left untouched
        when vim exits with a non-zero status.
        """
        content = pt_app.app.current_buffer.document.text
        with tempfile.NamedTemporaryFile(
            suffix='.py',
            mode='w',
        ) as f:
            # Write through a separate handle that is closed before vim
            # starts, so the text is fully on disk when the editor opens it.
            with io.open(f.name, 'w') as f2:
                f2.write(content)
            result = subprocess.call([
                'vim',
                # Start with the cursor on the last line of the buffer.
                '+%d' % len(content.splitlines()),
                f.name,
            ])
            if result != 0:
                return
            # Re-open by name: the editor may have replaced the file, so the
            # original handle is not used for reading.
            with io.open(f.name, 'r') as f2:
                new_content = f2.read()
        pt_app.app.current_buffer.document = prompt_toolkit.document.Document(
            new_content,
            cursor_position=len(new_content.rstrip()),
        )

    shell = IPython.get_ipython()
    pt_app = shell.pt_app
    key_bindings = pt_app.key_bindings

    # Collect each distinct handler bound to F2. Removal is best effort: a
    # missing or duplicated F2 binding must not prevent the new shortcut
    # from being registered.
    f2_handlers = []
    for binding in key_bindings.bindings:
        if prompt_toolkit.keys.Keys.F2 not in binding.keys:
            continue
        if binding.handler not in f2_handlers:
            f2_handlers.append(binding.handler)
    for handler in f2_handlers:
        key_bindings.remove(handler)

    key_bindings.add(
        '\\', 'e', filter=~prompt_toolkit.filters.vi_insert_mode,
    )(
        functools.partial(
            ipython_edit_in_vim,
            pt_app=pt_app,
        )
    )
''',
    'ipython_update_shortcuts()',
]
# Load the autoreload extension that the '%autoreload 2' startup line needs.
c.InteractiveShellApp.extensions = ['autoreload']

# History: keep and load up to one hundred million entries; turn off the
# prompt's history search and inline autosuggestions.
c.InteractiveShell.history_load_length = 100_000_000
c.InteractiveShell.history_length = 100_000_000
c.InteractiveShell.autosuggestions_provider = None
c.InteractiveShell.enable_history_search = False

# Enable the automatic pdb setting of the interactive shell.
c.InteractiveShell.pdb = True

# Terminal editing: vi mode, no modal cursor, and no emacs bindings while in
# vi insert mode.
c.TerminalInteractiveShell.emacs_bindings_in_vi_insert_mode = False
c.TerminalInteractiveShell.modal_cursor = False
c.TerminalInteractiveShell.editing_mode = 'vi'

@ -0,0 +1,78 @@
import logging
import sys
import json
from typing import (
Optional,
Iterable,
)
logger = logging.getLogger(__name__)
# To change the level of the root logger instead, use:
#   logging.getLogger().setLevel(logging.INFO)


def logger_setup():
    """Give the module logger a stderr handler and the INFO level.

    Nothing happens when the logger already has at least one handler, so
    calling this more than once does not duplicate output.
    """
    if logger.handlers:
        return
    stderr_handler = logging.StreamHandler(sys.stderr)
    logger.addHandler(stderr_handler)
    logger.setLevel(logging.INFO)


# Configure logging as soon as the module is imported.
logger_setup()
async def tiktok_videos_links_get(
    query: Optional[str]=None,
    screenshot_path: Optional[str]=None,
    max_time: Optional[int | float]=None,
) -> Iterable[str]:
    """Collect TikTok video links from a page while scrolling it.

    Opens a TikTokApi session, optionally navigates to the search page for
    ``query``, optionally saves a screenshot, and then repeatedly scans the
    page HTML for video URLs while scrolling down, until ``max_time``
    seconds have passed.

    Args:
        query: Raw search text (not pre-encoded); it is URL-encoded before
            being placed into the search URL. When ``None`` no navigation
            is performed and the session's current page is scanned.
        screenshot_path: When given, a screenshot of the page is written
            to this path before scrolling starts.
        max_time: Scrolling budget in seconds; ``None`` means 10.

    Returns:
        A set with the distinct video URLs found in the page content.
    """
    import re
    import time
    import urllib.parse

    import TikTokApi
    # NOTE(review): pyktok is not referenced in this function; the import is
    # kept in case it is relied on for import-time effects -- confirm and drop.
    import pyktok

    if max_time is None:
        max_time = 10

    # Compiled once instead of on every loop iteration; dots are escaped so
    # they only match a literal ".".
    # NOTE(review): \w+ does not match a "." inside a user name -- confirm
    # whether such profile URLs need to be collected as well.
    video_link_re = re.compile(
        r'https://www\.tiktok\.com/@\w+/video/\d+'
    )

    async with TikTokApi.TikTokApi() as client:
        await client.create_sessions()
        session = client.sessions[0]

        if query is not None:
            # urlencode escapes spaces, "&", "#" and non-ASCII text that
            # would otherwise corrupt or truncate the query string.
            await session.page.goto(
                'https://www.tiktok.com/search?'
                + urllib.parse.urlencode({'q': query})
            )

        if screenshot_path is not None:
            await session.page.screenshot(
                path=screenshot_path,
            )

        links = set()
        # Monotonic clock: the time budget is not affected by wall-clock
        # adjustments while the loop runs.
        started_at = time.monotonic()

        while True:
            content = await session.page.content()
            links.update(video_link_re.findall(content))

            # Scroll down by 100 pixels so that more results get loaded.
            await session.page.mouse.wheel(0, 100)

            elapsed = time.monotonic() - started_at
            if elapsed > max_time:
                break

            logger.info(json.dumps(dict(
                total=len(links),
                elapsed=elapsed,
                scroll_y=await session.page.evaluate('window.scrollY'),
            )))

    return links