[~] Refactor

This commit is contained in:
Siarhei Siniak 2024-07-06 21:06:09 +03:00
parent b2afd124a7
commit d7832d7574
5 changed files with 35 additions and 14 deletions

@ -13,6 +13,7 @@ RUN pip3 install celery redis
RUN pip3 install dataclasses-json
RUN pip3 install rpdb
RUN apt-get install -yy netcat-traditional
RUN apt-get install vim
WORKDIR /app

@ -60,11 +60,12 @@ def ipython_update_shortcuts():
''',
'ipython_update_shortcuts()',
]
c.IPCompleter.use_jedi = False
c.InteractiveShellApp.extensions = ['autoreload']
c.InteractiveShell.history_length = 100 * 1000 * 1000
c.InteractiveShell.history_load_length = 100 * 1000 * 1000
c.InteractiveShell.enable_history_search = False
c.InteractiveShell.autosuggestions_provider = None
#c.InteractiveShell.enable_history_search = False
#c.InteractiveShell.autosuggestions_provider = None
c.InteractiveShell.pdb = True
c.TerminalInteractiveShell.editing_mode = 'vi'
c.TerminalInteractiveShell.modal_cursor = False

@ -62,10 +62,13 @@ def tiktok_config() -> tiktok_config_t.res_t:
return res
def logger_setup():
def logger_setup(name: str) -> logging.Logger:
logger = logging.getLogger(name)
if len(logger.handlers) == 0:
handler = logging.StreamHandler(sys.stderr)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger_setup()
return logger
logger = logger_setup(__name__)

@ -15,12 +15,15 @@ from typing import (
Iterable,
)
import celery
from .config import tiktok_config
from .utils import Task
from .config import tiktok_config, logger_setup
from .utils import Task, shared_task
logger = logger_setup(__name__)
#logging.getLogger().setLevel(logging.INFO)
@celery.shared_task()
@shared_task()
async def tiktok_videos_links_get(
query: Optional[str]=None,
screenshot_path: Optional[str]=None,
@ -93,7 +96,7 @@ async def tiktok_videos_links_get(
return list(links)[:max_links]
@celery.shared_task()
@shared_task()
def tiktok_videos_meta(links: Iterable[str]) -> Iterable[dict[str, Any]]:
res = []
for o in links:
@ -114,7 +117,7 @@ class tiktok_video_fetch_t:
tikcdn_io_curl = 'tikcdn.io-curl'
tikcdn_io_wget = 'tikcdn.io-wget'
@celery.shared_task()
@shared_task()
def tiktok_video_fetch(
id: int,
url: str,
@ -181,7 +184,7 @@ async def playwright_save(url: str):
download.save_as(path)
print(path)
@celery.shared_task()
@shared_task()
def tiktok_videos_fetch(
meta: Iterable[dict[str, Any]],
method: Optional[tiktok_video_fetch_t.method_t]=None,
@ -238,7 +241,7 @@ def tiktok_videos_fetch(
return stats
@celery.shared_task()
@shared_task()
def tiktok_videos_process(meta: Iterable[dict[str, Any]]) -> dict[str, Any]:
import tqdm
stats = dict(
@ -317,7 +320,7 @@ class audio_get_t:
path_mp3: str
url: str
@celery.shared_task(base=Task)
@shared_task()
def audio_get() -> audio_get_t.res_t:
c = tiktok_config()
url = 'https://www.youtube.com/watch?v=dQw4w9WgXcQ'

@ -1,4 +1,6 @@
import celery.app.task
import asyncio
import inspect
import importlib
import kombu.utils.json
from typing import (
@ -6,10 +8,21 @@ from typing import (
Optional,
)
def shared_task(*args: Any, **kwargs: Any) -> Any:
return celery.shared_task(
*args,
base=Task,
**kwargs
)
class Task(celery.app.task.Task):
def __call__(self, *args, **kwargs):
def __call__(self, *args, **kwargs) -> Any:
res = super().__call__(*args, **kwargs)
return res
print([res, inspect.isawaitable(res)])
if inspect.isawaitable(res):
return asyncio.run(res)
else:
return res
@classmethod
def _loads(