[~] add scrap-yt-music

This commit is contained in:
Siarhei Siniak 2024-05-04 15:11:34 +03:00
parent 2fb6f3893f
commit 6abf28632e

@ -1,5 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import asyncio
import enum import enum
import threading
import collections import collections
import datetime import datetime
import functools import functools
@ -1509,6 +1511,213 @@ def pm_service(argv):
print('') print('')
def scrap_yt_music(argv):
parser = optparse.OptionParser()
parser.add_option(
'--verbose',
dest='verbose',
type=str,
default=None,
help='true,false',
)
parser.add_option(
'-l',
'--library_path',
dest='library_path',
type=str,
default=None,
)
options, args = parser.parse_args(argv)
if options.library_path is None:
options.library_path = os.path.abspath(os.path.curdir)
if options.verbose is None:
options.verbose = False
else:
val = json.loads(options.verbose)
assert isinstance(val, bool)
options.verbose = val
import aiohttp.web
def http_events(context, res_cb):
data = []
async def handle(request):
data.append(request.rel_url.query.copy())
res_cb(event=data[-1], events=data)
if len(data) > 128:
del data[:128]
return aiohttp.web.Response(text='ok')
async def serve():
logging.info('http_events starting')
app = aiohttp.web.Application()
app.add_routes([aiohttp.web.get('/status', handle)])
runner = aiohttp.web.AppRunner(app, handle_signals=False,)
await runner.setup()
site = aiohttp.web.TCPSite(runner, host='127.0.0.1', port=8877)
await site.start()
logging.info('http_events started')
while True:
await asyncio.sleep(1)
if context['shutdown']:
break
await runner.cleanup()
logging.info('http_events done')
asyncio.run(serve())
#aiohttp.web.run_app(
# app,
# host='127.0.0.1',
# port=8877,
# handle_signals=False,
#)
#while True:
# data.append(
# subprocess.check_output(r'''
# nc -w 1 -l 127.0.0.1 8877 | head -n 1
# ''', shell=True,)
# )
def audio_recorder(context):
current_name = None
p = None
try:
while True:
with context['track_cv']:
context['track_cv'].wait(1)
if context['track_name'] != current_name:
logging.info('audio_record, track changed, started')
if not p is None:
logging.info('audio_record, track changed, terminating')
p.terminate()
p.wait()
p = None
logging.info('audio_record, track changed, terminated')
current_name = context['track_name']
if context['shutdown']:
if not p is None:
p.terminate()
break
if p is None and not current_name is None:
output_name = os.path.join(
options.library_path,
'%s.mp3' % current_name
)
logging.info('audio_record, new recording')
p = subprocess.Popen(
['sox', '-d', output_name],
stdout=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
logging.info(json.dumps(dict(output_name=output_name)))
except:
logging.error(traceback.format_exc())
finally:
if not p is None:
p.terminate()
context = dict(
http_on_event=lambda *args, **kwargs: None,
shutdown=False,
workers=[],
track_cv=threading.Condition(),
main_cv=threading.Condition(),
track_name=None,
)
context['workers'].extend([
threading.Thread(
target=functools.partial(
http_events,
context=context,
res_cb=lambda *args, **kwargs: context['http_on_event'](*args, **kwargs),
)
),
threading.Thread(
target=functools.partial(
audio_recorder,
context=context,
)
),
])
def http_on_event(event, events):
if 'title' in event:
with context['track_cv']:
context['track_name'] = str(event['title'])[:128].replace('\n', '')
logging.info(event)
context['http_on_event'] = http_on_event
print(r'''
Open Youtube Music,
and launch the following JS script:
```js
if (timer !== undefined)
{
clearInterval(timer);
}
timer = setInterval(
function(){
fetch(
'http://127.0.0.1:8877/status?title=' +
encodeURI($$('.ytmusic-player-bar.middle-controls')[0].innerText)
);
},
5000
);
```
''')
for w in context['workers']:
w.start()
#context['main_cv'] = threading.Condition()
def on_interrupt(*args, **kwargs):
logging.info('on_interrupt')
with context['main_cv']:
context['main_cv'].notify()
signal.signal(
signal.SIGINT,
on_interrupt,
)
signal.signal(
signal.SIGTERM,
on_interrupt,
)
with context['main_cv']:
context['main_cv'].wait()
with context['main_cv']:
context['shutdown'] = True
context['main_cv'].notify()
with context['track_cv']:
context['track_cv'].notify()
for o in context['workers']:
o.join()
def desktop_services(argv): def desktop_services(argv):
parser = optparse.OptionParser() parser = optparse.OptionParser()
parser.add_option( parser.add_option(
@ -2996,6 +3205,8 @@ def commands_cli():
desktop_services(sys.argv[2:]) desktop_services(sys.argv[2:])
elif sys.argv[1] == 'pm-service': elif sys.argv[1] == 'pm-service':
pm_service(sys.argv[2:]) pm_service(sys.argv[2:])
elif sys.argv[1] == 'scrap-yt-music':
scrap_yt_music(sys.argv[2:])
else: else:
raise NotImplementedError raise NotImplementedError
except SystemExit: except SystemExit: