[~] Refactor
This commit is contained in:
parent
cffbe712c6
commit
6adf6106f1
@ -10,6 +10,7 @@ RUN python3 -m playwright install
|
||||
RUN pip3 install tqdm
|
||||
RUN apt-get install -yy ffmpeg
|
||||
RUN pip3 install celery redis
|
||||
RUN pip3 install dataclasses-json
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
|
@ -31,6 +31,7 @@ run:
|
||||
sudo docker-compose \
|
||||
-f docker/tiktok/docker-compose.yml \
|
||||
run \
|
||||
--use-aliases \
|
||||
--rm tiktok
|
||||
|
||||
jupyter:
|
||||
|
@ -1,399 +0,0 @@
|
||||
import logging
|
||||
import enum
|
||||
import dataclasses
|
||||
import multiprocessing
|
||||
import traceback
|
||||
import subprocess
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
from typing import (
|
||||
Any,
|
||||
Literal,
|
||||
Optional,
|
||||
Iterable,
|
||||
)
|
||||
import celery
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
#logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
class tiktok_config_t:
|
||||
@dataclasses.dataclass
|
||||
class res_t:
|
||||
project_root: str=''
|
||||
cache: str=''
|
||||
videos: str=''
|
||||
audios: str=''
|
||||
celery_broker: str=''
|
||||
celery_result_backend: str=''
|
||||
celery_imports: Iterable[str]=tuple()
|
||||
|
||||
def tiktok_config() -> tiktok_config_t.res_t:
|
||||
res = tiktok_config_t.res_t(
|
||||
project_root=os.path.abspath(
|
||||
os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
'..', '..', '..',
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
res.celery_broker = 'redis://@redis/1'
|
||||
res.celery_result_backend = 'redis://@redis/2'
|
||||
res.celery_imports = ['python.tasks.tiktok']
|
||||
res.cache = os.path.join(
|
||||
res.project_root,
|
||||
'tmp/cache/tiktok',
|
||||
)
|
||||
res.videos = os.path.join(
|
||||
res.cache,
|
||||
'videos',
|
||||
)
|
||||
res.audios = os.path.join(
|
||||
res.cache,
|
||||
'audios',
|
||||
)
|
||||
|
||||
os.makedirs(res.videos, exist_ok=True)
|
||||
os.makedirs(res.audios, exist_ok=True)
|
||||
|
||||
return res
|
||||
|
||||
def logger_setup():
|
||||
if len(logger.handlers) == 0:
|
||||
handler = logging.StreamHandler(sys.stderr)
|
||||
logger.addHandler(handler)
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
logger_setup()
|
||||
|
||||
@celery.shared_task()
|
||||
async def tiktok_videos_links_get(
|
||||
query: Optional[str]=None,
|
||||
screenshot_path: Optional[str]=None,
|
||||
max_time: Optional[int | float]=None,
|
||||
max_links: Optional[int]=None,
|
||||
) -> Iterable[str]:
|
||||
import datetime
|
||||
import TikTokApi
|
||||
import pyktok
|
||||
import asyncio
|
||||
import re
|
||||
|
||||
if max_links is None:
|
||||
max_links = 100
|
||||
|
||||
if max_time is None:
|
||||
max_time = 10
|
||||
|
||||
async with TikTokApi.TikTokApi() as client:
|
||||
await client.create_sessions()
|
||||
|
||||
session = client.sessions[0]
|
||||
|
||||
if not query is None:
|
||||
await session.page.goto(
|
||||
'https://www.tiktok.com/search?q=%s' % query
|
||||
)
|
||||
|
||||
if not screenshot_path is None:
|
||||
await session.page.screenshot(
|
||||
path=screenshot_path,
|
||||
)
|
||||
|
||||
links = list()
|
||||
links_set = set()
|
||||
|
||||
started_at = datetime.datetime.now()
|
||||
|
||||
while True:
|
||||
content = await session.page.content()
|
||||
new_links = re.compile(
|
||||
r'https://www.tiktok.com/@\w+/video/\d+'
|
||||
).findall(content)
|
||||
|
||||
old_size = len(links)
|
||||
|
||||
for o in new_links:
|
||||
if not o in links_set:
|
||||
links_set.add(o)
|
||||
links.append(o)
|
||||
|
||||
await session.page.mouse.wheel(0, 100)
|
||||
|
||||
elapsed = (
|
||||
datetime.datetime.now() - started_at
|
||||
).total_seconds()
|
||||
|
||||
if elapsed > max_time:
|
||||
break;
|
||||
|
||||
if len(links_set) > max_links:
|
||||
break
|
||||
|
||||
if old_size < len(links):
|
||||
logger.info(json.dumps(dict(
|
||||
total=len(links),
|
||||
elapsed=elapsed,
|
||||
scroll_y=await session.page.evaluate('window.scrollY'),
|
||||
)))
|
||||
|
||||
return list(links)[:max_links]
|
||||
|
||||
@celery.shared_task()
|
||||
def tiktok_videos_meta(links: Iterable[str]) -> Iterable[dict[str, Any]]:
|
||||
res = []
|
||||
for o in links:
|
||||
parts = o.split('/')
|
||||
|
||||
res.append(dict(
|
||||
url=o,
|
||||
id=int(parts[-1]),
|
||||
fname='_'.join(parts[-3:]) +'.mp4',
|
||||
result_dir=tiktok_config().videos,
|
||||
))
|
||||
|
||||
return res
|
||||
|
||||
class tiktok_video_fetch_t:
|
||||
class method_t(enum.Enum):
|
||||
pyktok = 'pyktok'
|
||||
tikcdn_io_curl = 'tikcdn.io-curl'
|
||||
tikcdn_io_wget = 'tikcdn.io-wget'
|
||||
|
||||
@celery.shared_task()
|
||||
def tiktok_video_fetch(
|
||||
id: int,
|
||||
url: str,
|
||||
fname: str,
|
||||
result_dir: str,
|
||||
method: Optional[tiktok_video_fetch_t.method_t]=None,
|
||||
method_str: Optional[str]=None,
|
||||
) -> None:
|
||||
os.chdir(result_dir)
|
||||
|
||||
if not method_str is None:
|
||||
method = tiktok_video_fetch_t.method_t(method_str)
|
||||
|
||||
if method is None:
|
||||
method = tiktok_video_fetch_t.method_t.tikcdn_io_curl
|
||||
|
||||
if method == tiktok_video_fetch_t.method_t.pyktok:
|
||||
import pyktok
|
||||
pyktok.save_tiktok(url)
|
||||
elif method == tiktok_video_fetch_t.method_t.tikcdn_io_curl:
|
||||
subprocess.check_call([
|
||||
'curl',
|
||||
'-v',
|
||||
'https://tikcdn.io/ssstik/%d' % id,
|
||||
'-o', fname,
|
||||
])
|
||||
elif method == tiktok_video_fetch_t.method_t.tikcdn_io_wget:
|
||||
subprocess.check_call([
|
||||
'wget',
|
||||
'https://tikcdn.io/ssstik/%d' % id,
|
||||
'-O',
|
||||
fname,
|
||||
])
|
||||
else:
|
||||
raise NotImplementedError
|
||||
|
||||
mime_type = file_mime_type(fname)
|
||||
|
||||
if mime_type in ['empty']:
|
||||
raise RuntimeError('notdownloaded')
|
||||
|
||||
def file_mime_type(path: str) -> Optional[str]:
|
||||
if os.path.exists(path):
|
||||
mime_type = subprocess.check_output([
|
||||
'file',
|
||||
'-b', path,
|
||||
]).strip().decode('utf-8')
|
||||
return mime_type
|
||||
else:
|
||||
return None
|
||||
|
||||
async def playwright_save(url: str):
|
||||
import TikTokApi
|
||||
|
||||
async with TikTokApi.TikTokApi() as client:
|
||||
await client.create_sessions()
|
||||
session = client.sessions[0]
|
||||
page = session.page
|
||||
|
||||
async with page.expect_download() as download_info:
|
||||
await page.goto(url)
|
||||
download = download_info.value
|
||||
path = download.path()
|
||||
download.save_as(path)
|
||||
print(path)
|
||||
|
||||
@celery.shared_task()
|
||||
def tiktok_videos_fetch(
|
||||
meta: Iterable[dict[str, Any]],
|
||||
method: Optional[tiktok_video_fetch_t.method_t]=None,
|
||||
method_str: Optional[str]=None,
|
||||
force: Optional[bool]=None,
|
||||
) -> Iterable[dict[str, Any]]:
|
||||
import tqdm
|
||||
|
||||
if force is None:
|
||||
force = False
|
||||
|
||||
stats = dict(
|
||||
saved=0,
|
||||
total=0,
|
||||
skipped=0,
|
||||
error=0,
|
||||
)
|
||||
|
||||
with multiprocessing.Pool(processes=1) as pool:
|
||||
for o in tqdm.tqdm(meta):
|
||||
stats['total'] += 1
|
||||
path = os.path.join(
|
||||
o['result_dir'],
|
||||
o['fname'],
|
||||
)
|
||||
|
||||
if (
|
||||
not os.path.exists(path) or
|
||||
file_mime_type(path) in ['empty'] or
|
||||
force
|
||||
):
|
||||
try:
|
||||
pool.apply(
|
||||
tiktok_video_fetch,
|
||||
kwds=dict(
|
||||
id=o['id'],
|
||||
url=o['url'],
|
||||
fname=o['fname'],
|
||||
method=method,
|
||||
method_str=method_str,
|
||||
result_dir=o['result_dir'],
|
||||
),
|
||||
)
|
||||
stats['saved'] += 1
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
except:
|
||||
logger.error(json.dumps(dict(
|
||||
msg=traceback.format_exc(),
|
||||
)))
|
||||
stats['error'] += 1
|
||||
else:
|
||||
stats['skipped'] += 1
|
||||
|
||||
return stats
|
||||
|
||||
@celery.shared_task()
|
||||
def tiktok_videos_process(meta: Iterable[dict[str, Any]]) -> dict[str, Any]:
|
||||
import tqdm
|
||||
stats = dict(
|
||||
saved=0,
|
||||
total=0,
|
||||
skipped=0,
|
||||
error=0,
|
||||
)
|
||||
|
||||
song = audio_get()
|
||||
|
||||
for o in tqdm.tqdm(meta):
|
||||
stats['total'] += 1
|
||||
|
||||
path = os.path.join(
|
||||
o['result_dir'],
|
||||
o['fname'],
|
||||
)
|
||||
|
||||
try:
|
||||
path_parts = os.path.splitext(path)
|
||||
|
||||
processed_path = path_parts[0] + '-proc' + path_parts[1]
|
||||
processed_path_tmp = path_parts[0] + '-proc.tmp' + path_parts[1]
|
||||
|
||||
if not os.path.exists(path) or os.path.exists(processed_path):
|
||||
stats['skipped'] += 1
|
||||
continue
|
||||
|
||||
if os.path.exists(processed_path_tmp):
|
||||
os.unlink(processed_path_tmp)
|
||||
|
||||
ffmpeg = [
|
||||
'ffmpeg',
|
||||
'-i', path,
|
||||
'-i', song.path_mp3,
|
||||
'-shortest',
|
||||
'-vf',
|
||||
','.join([
|
||||
'setpts=1.1*PTS',
|
||||
'scale=trunc(iw/0.9):trunc(ow/a/2)*2',
|
||||
]),
|
||||
'-sws_flags', 'bilinear',
|
||||
'-map', '0:v:0',
|
||||
'-map', '1:a:0',
|
||||
processed_path_tmp,
|
||||
]
|
||||
|
||||
subprocess.check_call(
|
||||
ffmpeg,
|
||||
stdin=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
stdout=subprocess.DEVNULL
|
||||
)
|
||||
|
||||
os.rename(processed_path_tmp, processed_path)
|
||||
|
||||
stats['saved'] += 1
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
except:
|
||||
logger.error(json.dumps(dict(
|
||||
msg=traceback.format_exc(),
|
||||
)))
|
||||
stats['error'] += 1
|
||||
|
||||
return stats
|
||||
|
||||
class audio_get_t:
|
||||
@dataclasses.dataclass
|
||||
class res_t:
|
||||
file: str
|
||||
file_mp3: str
|
||||
path: str
|
||||
path_mp3: str
|
||||
url: str
|
||||
|
||||
@celery.shared_task()
|
||||
def audio_get() -> audio_get_t.res_t:
|
||||
c = tiktok_config()
|
||||
url = 'https://www.youtube.com/watch?v=dQw4w9WgXcQ'
|
||||
file = 'song.dat'
|
||||
file_mp3 = 'song.mp3'
|
||||
|
||||
path = os.path.join(c.audios, file)
|
||||
path_mp3 = os.path.join(c.audios, file_mp3)
|
||||
|
||||
if not os.path.exists(path):
|
||||
subprocess.check_call([
|
||||
'yt-dlp',
|
||||
'-f', 'bestaudio',
|
||||
url,
|
||||
'-o', path,
|
||||
])
|
||||
|
||||
if not os.path.exists(path_mp3):
|
||||
subprocess.check_call([
|
||||
'ffmpeg',
|
||||
'-i', path,
|
||||
path_mp3,
|
||||
])
|
||||
|
||||
return audio_get_t.res_t(
|
||||
file=file,
|
||||
file_mp3=file_mp3,
|
||||
path=path,
|
||||
path_mp3=path_mp3,
|
||||
url=url,
|
||||
)
|
@ -1,14 +1,19 @@
|
||||
from . import tiktok_config
|
||||
from .config import tiktok_config
|
||||
|
||||
import celery
|
||||
import redis
|
||||
|
||||
c = tiktok_config()
|
||||
|
||||
CELERY_IMPORTS = c.celery_imports
|
||||
|
||||
app = celery.Celery(
|
||||
__name__,
|
||||
broker=c.celery_broker,
|
||||
result_backend=c.celery_result_backend,
|
||||
)
|
||||
|
||||
app.autodiscover_tasks(c.celery_imports)
|
||||
|
||||
redis = dict(
|
||||
broker=redis.Redis(host='redis', db=int(c.celery_broker.split('/')[-1])),
|
||||
result_backend=redis.Redis(host='redis', db=int(c.celery_result_backend.split('/')[-1])),
|
||||
)
|
||||
|
71
python/tasks/tiktok/config.py
Normal file
71
python/tasks/tiktok/config.py
Normal file
@ -0,0 +1,71 @@
|
||||
import logging
|
||||
import enum
|
||||
import dataclasses
|
||||
import dataclasses_json
|
||||
import multiprocessing
|
||||
import traceback
|
||||
import subprocess
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
from typing import (
|
||||
Any,
|
||||
Literal,
|
||||
Optional,
|
||||
Iterable,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
#logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
class tiktok_config_t:
|
||||
@dataclasses_json.dataclass_json
|
||||
@dataclasses.dataclass
|
||||
class res_t:
|
||||
project_root: str=''
|
||||
cache: str=''
|
||||
videos: str=''
|
||||
audios: str=''
|
||||
celery_broker: str=''
|
||||
celery_result_backend: str=''
|
||||
celery_imports: Iterable[str]=tuple()
|
||||
|
||||
def tiktok_config() -> tiktok_config_t.res_t:
|
||||
res = tiktok_config_t.res_t(
|
||||
project_root=os.path.abspath(
|
||||
os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
'..', '..', '..',
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
res.celery_broker = 'redis://redis:6379/1'
|
||||
res.celery_result_backend = 'redis://redis:6379/2'
|
||||
res.celery_imports = ['python.tasks.tiktok.tasks']
|
||||
res.cache = os.path.join(
|
||||
res.project_root,
|
||||
'tmp/cache/tiktok',
|
||||
)
|
||||
res.videos = os.path.join(
|
||||
res.cache,
|
||||
'videos',
|
||||
)
|
||||
res.audios = os.path.join(
|
||||
res.cache,
|
||||
'audios',
|
||||
)
|
||||
|
||||
os.makedirs(res.videos, exist_ok=True)
|
||||
os.makedirs(res.audios, exist_ok=True)
|
||||
|
||||
return res
|
||||
|
||||
def logger_setup():
|
||||
if len(logger.handlers) == 0:
|
||||
handler = logging.StreamHandler(sys.stderr)
|
||||
logger.addHandler(handler)
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
logger_setup()
|
351
python/tasks/tiktok/tasks.py
Normal file
351
python/tasks/tiktok/tasks.py
Normal file
@ -0,0 +1,351 @@
|
||||
import logging
|
||||
import enum
|
||||
import dataclasses
|
||||
import dataclasses_json
|
||||
import multiprocessing
|
||||
import traceback
|
||||
import subprocess
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
from typing import (
|
||||
Any,
|
||||
Literal,
|
||||
Optional,
|
||||
Iterable,
|
||||
)
|
||||
import celery
|
||||
from .config import tiktok_config
|
||||
|
||||
#logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
@celery.shared_task()
|
||||
async def tiktok_videos_links_get(
|
||||
query: Optional[str]=None,
|
||||
screenshot_path: Optional[str]=None,
|
||||
max_time: Optional[int | float]=None,
|
||||
max_links: Optional[int]=None,
|
||||
) -> Iterable[str]:
|
||||
import datetime
|
||||
import TikTokApi
|
||||
import pyktok
|
||||
import asyncio
|
||||
import re
|
||||
|
||||
if max_links is None:
|
||||
max_links = 100
|
||||
|
||||
if max_time is None:
|
||||
max_time = 10
|
||||
|
||||
async with TikTokApi.TikTokApi() as client:
|
||||
await client.create_sessions()
|
||||
|
||||
session = client.sessions[0]
|
||||
|
||||
if not query is None:
|
||||
await session.page.goto(
|
||||
'https://www.tiktok.com/search?q=%s' % query
|
||||
)
|
||||
|
||||
if not screenshot_path is None:
|
||||
await session.page.screenshot(
|
||||
path=screenshot_path,
|
||||
)
|
||||
|
||||
links = list()
|
||||
links_set = set()
|
||||
|
||||
started_at = datetime.datetime.now()
|
||||
|
||||
while True:
|
||||
content = await session.page.content()
|
||||
new_links = re.compile(
|
||||
r'https://www.tiktok.com/@\w+/video/\d+'
|
||||
).findall(content)
|
||||
|
||||
old_size = len(links)
|
||||
|
||||
for o in new_links:
|
||||
if not o in links_set:
|
||||
links_set.add(o)
|
||||
links.append(o)
|
||||
|
||||
await session.page.mouse.wheel(0, 100)
|
||||
|
||||
elapsed = (
|
||||
datetime.datetime.now() - started_at
|
||||
).total_seconds()
|
||||
|
||||
if elapsed > max_time:
|
||||
break;
|
||||
|
||||
if len(links_set) > max_links:
|
||||
break
|
||||
|
||||
if old_size < len(links):
|
||||
logger.info(json.dumps(dict(
|
||||
total=len(links),
|
||||
elapsed=elapsed,
|
||||
scroll_y=await session.page.evaluate('window.scrollY'),
|
||||
)))
|
||||
|
||||
return list(links)[:max_links]
|
||||
|
||||
@celery.shared_task()
|
||||
def tiktok_videos_meta(links: Iterable[str]) -> Iterable[dict[str, Any]]:
|
||||
res = []
|
||||
for o in links:
|
||||
parts = o.split('/')
|
||||
|
||||
res.append(dict(
|
||||
url=o,
|
||||
id=int(parts[-1]),
|
||||
fname='_'.join(parts[-3:]) +'.mp4',
|
||||
result_dir=tiktok_config().videos,
|
||||
))
|
||||
|
||||
return res
|
||||
|
||||
class tiktok_video_fetch_t:
|
||||
class method_t(enum.Enum):
|
||||
pyktok = 'pyktok'
|
||||
tikcdn_io_curl = 'tikcdn.io-curl'
|
||||
tikcdn_io_wget = 'tikcdn.io-wget'
|
||||
|
||||
@celery.shared_task()
|
||||
def tiktok_video_fetch(
|
||||
id: int,
|
||||
url: str,
|
||||
fname: str,
|
||||
result_dir: str,
|
||||
method: Optional[tiktok_video_fetch_t.method_t]=None,
|
||||
method_str: Optional[str]=None,
|
||||
) -> None:
|
||||
os.chdir(result_dir)
|
||||
|
||||
if not method_str is None:
|
||||
method = tiktok_video_fetch_t.method_t(method_str)
|
||||
|
||||
if method is None:
|
||||
method = tiktok_video_fetch_t.method_t.tikcdn_io_curl
|
||||
|
||||
if method == tiktok_video_fetch_t.method_t.pyktok:
|
||||
import pyktok
|
||||
pyktok.save_tiktok(url)
|
||||
elif method == tiktok_video_fetch_t.method_t.tikcdn_io_curl:
|
||||
subprocess.check_call([
|
||||
'curl',
|
||||
'-v',
|
||||
'https://tikcdn.io/ssstik/%d' % id,
|
||||
'-o', fname,
|
||||
])
|
||||
elif method == tiktok_video_fetch_t.method_t.tikcdn_io_wget:
|
||||
subprocess.check_call([
|
||||
'wget',
|
||||
'https://tikcdn.io/ssstik/%d' % id,
|
||||
'-O',
|
||||
fname,
|
||||
])
|
||||
else:
|
||||
raise NotImplementedError
|
||||
|
||||
mime_type = file_mime_type(fname)
|
||||
|
||||
if mime_type in ['empty']:
|
||||
raise RuntimeError('notdownloaded')
|
||||
|
||||
def file_mime_type(path: str) -> Optional[str]:
|
||||
if os.path.exists(path):
|
||||
mime_type = subprocess.check_output([
|
||||
'file',
|
||||
'-b', path,
|
||||
]).strip().decode('utf-8')
|
||||
return mime_type
|
||||
else:
|
||||
return None
|
||||
|
||||
async def playwright_save(url: str):
|
||||
import TikTokApi
|
||||
|
||||
async with TikTokApi.TikTokApi() as client:
|
||||
await client.create_sessions()
|
||||
session = client.sessions[0]
|
||||
page = session.page
|
||||
|
||||
async with page.expect_download() as download_info:
|
||||
await page.goto(url)
|
||||
download = download_info.value
|
||||
path = download.path()
|
||||
download.save_as(path)
|
||||
print(path)
|
||||
|
||||
@celery.shared_task()
|
||||
def tiktok_videos_fetch(
|
||||
meta: Iterable[dict[str, Any]],
|
||||
method: Optional[tiktok_video_fetch_t.method_t]=None,
|
||||
method_str: Optional[str]=None,
|
||||
force: Optional[bool]=None,
|
||||
) -> Iterable[dict[str, Any]]:
|
||||
import tqdm
|
||||
|
||||
if force is None:
|
||||
force = False
|
||||
|
||||
stats = dict(
|
||||
saved=0,
|
||||
total=0,
|
||||
skipped=0,
|
||||
error=0,
|
||||
)
|
||||
|
||||
with multiprocessing.Pool(processes=1) as pool:
|
||||
for o in tqdm.tqdm(meta):
|
||||
stats['total'] += 1
|
||||
path = os.path.join(
|
||||
o['result_dir'],
|
||||
o['fname'],
|
||||
)
|
||||
|
||||
if (
|
||||
not os.path.exists(path) or
|
||||
file_mime_type(path) in ['empty'] or
|
||||
force
|
||||
):
|
||||
try:
|
||||
pool.apply(
|
||||
tiktok_video_fetch,
|
||||
kwds=dict(
|
||||
id=o['id'],
|
||||
url=o['url'],
|
||||
fname=o['fname'],
|
||||
method=method,
|
||||
method_str=method_str,
|
||||
result_dir=o['result_dir'],
|
||||
),
|
||||
)
|
||||
stats['saved'] += 1
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
except:
|
||||
logger.error(json.dumps(dict(
|
||||
msg=traceback.format_exc(),
|
||||
)))
|
||||
stats['error'] += 1
|
||||
else:
|
||||
stats['skipped'] += 1
|
||||
|
||||
return stats
|
||||
|
||||
@celery.shared_task()
|
||||
def tiktok_videos_process(meta: Iterable[dict[str, Any]]) -> dict[str, Any]:
|
||||
import tqdm
|
||||
stats = dict(
|
||||
saved=0,
|
||||
total=0,
|
||||
skipped=0,
|
||||
error=0,
|
||||
)
|
||||
|
||||
song = audio_get()
|
||||
|
||||
for o in tqdm.tqdm(meta):
|
||||
stats['total'] += 1
|
||||
|
||||
path = os.path.join(
|
||||
o['result_dir'],
|
||||
o['fname'],
|
||||
)
|
||||
|
||||
try:
|
||||
path_parts = os.path.splitext(path)
|
||||
|
||||
processed_path = path_parts[0] + '-proc' + path_parts[1]
|
||||
processed_path_tmp = path_parts[0] + '-proc.tmp' + path_parts[1]
|
||||
|
||||
if not os.path.exists(path) or os.path.exists(processed_path):
|
||||
stats['skipped'] += 1
|
||||
continue
|
||||
|
||||
if os.path.exists(processed_path_tmp):
|
||||
os.unlink(processed_path_tmp)
|
||||
|
||||
ffmpeg = [
|
||||
'ffmpeg',
|
||||
'-i', path,
|
||||
'-i', song.path_mp3,
|
||||
'-shortest',
|
||||
'-vf',
|
||||
','.join([
|
||||
'setpts=1.1*PTS',
|
||||
'scale=trunc(iw/0.9):trunc(ow/a/2)*2',
|
||||
]),
|
||||
'-sws_flags', 'bilinear',
|
||||
'-map', '0:v:0',
|
||||
'-map', '1:a:0',
|
||||
processed_path_tmp,
|
||||
]
|
||||
|
||||
subprocess.check_call(
|
||||
ffmpeg,
|
||||
stdin=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
stdout=subprocess.DEVNULL
|
||||
)
|
||||
|
||||
os.rename(processed_path_tmp, processed_path)
|
||||
|
||||
stats['saved'] += 1
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
except:
|
||||
logger.error(json.dumps(dict(
|
||||
msg=traceback.format_exc(),
|
||||
)))
|
||||
stats['error'] += 1
|
||||
|
||||
return stats
|
||||
|
||||
class audio_get_t:
|
||||
@dataclasses.dataclass
|
||||
@dataclasses_json.dataclass_json
|
||||
class res_t:
|
||||
file: str
|
||||
file_mp3: str
|
||||
path: str
|
||||
path_mp3: str
|
||||
url: str
|
||||
|
||||
@celery.shared_task()
|
||||
def audio_get() -> audio_get_t.res_t:
|
||||
c = tiktok_config()
|
||||
url = 'https://www.youtube.com/watch?v=dQw4w9WgXcQ'
|
||||
file = 'song.dat'
|
||||
file_mp3 = 'song.mp3'
|
||||
|
||||
path = os.path.join(c.audios, file)
|
||||
path_mp3 = os.path.join(c.audios, file_mp3)
|
||||
|
||||
if not os.path.exists(path):
|
||||
subprocess.check_call([
|
||||
'yt-dlp',
|
||||
'-f', 'bestaudio',
|
||||
url,
|
||||
'-o', path,
|
||||
])
|
||||
|
||||
if not os.path.exists(path_mp3):
|
||||
subprocess.check_call([
|
||||
'ffmpeg',
|
||||
'-i', path,
|
||||
path_mp3,
|
||||
])
|
||||
|
||||
return audio_get_t.res_t(
|
||||
file=file,
|
||||
file_mp3=file_mp3,
|
||||
path=path,
|
||||
path_mp3=path_mp3,
|
||||
url=url,
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user