freelance-project-34-market.../dotfiles/.local/bin/commands
2024-09-30 23:29:37 +03:00

3606 lines
105 KiB
Python
Executable File

#!/usr/bin/env python3
import asyncio
import collections
import datetime
import enum
import functools
import io
import json
import logging
import optparse
import os
import pprint
import re
import select
import shutil
import signal
import socket
import subprocess
import sys
import tempfile
import threading
import time
import traceback
from typing import (Iterable, Literal, Optional,)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def custom_notify(
    title: Optional[str]=None,
    msg: Optional[str]=None,
    timeout: Optional[int]=None,
) -> None:
    """Show a desktop notification.

    On macOS the notification is displayed through ``osascript`` (both texts
    are passed through ``custom_translate`` first); on every other platform
    ``notify-send`` is called with the timeout in milliseconds and the last
    128 characters of ``msg``.

    Args:
        title: notification title, ``'commands'`` when omitted; must be a
            non-empty string.
        msg: notification text; must be a non-empty string.
        timeout: display time in seconds, 5 when omitted; must be at least
            0.5. Only forwarded to ``notify-send``.

    Raises:
        AssertionError: when any argument fails validation.
        subprocess.CalledProcessError: when the notifier command fails.
    """
    timeout_ms = int((5 if timeout is None else timeout) * 1000)
    assert isinstance(timeout_ms, int) and timeout_ms >= 500

    title = 'commands' if title is None else title
    assert isinstance(title, str) and len(title) > 0
    assert isinstance(msg, str) and len(msg) > 0

    if sys.platform != 'darwin':
        subprocess.check_call(
            ['notify-send', '-t', '%d' % timeout_ms, title, msg[-128:]]
        )
        return

    # Only texts made of this restricted alphabet are accepted by the check.
    def is_osascript_safe(a, b):
        return re.compile(
            r'^[a-zA-Z0-9\<\>\/\(\)\s\.\,\:]*$'
        ).match(b) is not None

    osascript_translate = functools.partial(
        custom_translate,
        check=is_osascript_safe,
    )

    script = 'display notification "%s" with title "%s"' % (
        osascript_translate(msg),
        osascript_translate(title),
    )
    subprocess.check_call(['osascript', '-e', script])
def intercept_output(
    current_subprocess,
    return_aggregated=None,
    transform_callback=None,
    real_time=None,
    timeout=None,
    need_lines=None,
):
    """Stream the stdout of a running subprocess as a generator of chunks.

    The subprocess stdout is polled every 100 ms. Every piece of data read is
    remembered, optionally transformed and echoed to this process' stdout,
    and yielded to the caller.

    Args:
        current_subprocess: a ``subprocess.Popen`` whose ``stdout`` is a
            buffered pipe (``peek`` is called on it).
        return_aggregated: when truthy, a final chunk with the whole output
            and the return code is yielded after the loop ends.
        transform_callback: optional ``bytes -> bytes | None``; a non-None
            result replaces the data echoed to stdout (empty result echoes
            nothing). It does not affect the yielded data.
        real_time: when truthy, an empty data chunk is yielded on every poll
            that produced no data.
        timeout: optional number of seconds after which polling stops.
        need_lines: when truthy, complete lines are yielded instead of raw
            data chunks.

    Yields:
        * ``dict(aggregated=False, line=<bytes ending with b'\\n'>)`` for
          every complete line when ``need_lines`` is set;
        * ``dict(data=<bytes>, aggregated=False)`` for every raw chunk
          otherwise (and for idle polls when ``real_time`` is set);
        * ``dict(data=<all bytes>, aggregated=True, returncode=...)`` once at
          the end when ``return_aggregated`` is set.
    """
    if real_time is None:
        real_time = False

    start_timestamp = datetime.datetime.now()

    if not return_aggregated:
        return_aggregated = False

    poller = select.poll()
    poller.register(current_subprocess.stdout, select.POLLIN)

    output = []
    buffer = collections.deque()
    buffer_lines = collections.deque()

    last_data = None

    # Keep polling until the process has exited AND at least one read
    # (possibly the empty EOF read) has happened.
    # NOTE(review): data still sitting in the pipe when the process exits
    # right after a read is not drained by this loop.
    while not (
        current_subprocess.poll() is not None and
        last_data is not None
    ):
        if timeout is not None and \
            (datetime.datetime.now() - start_timestamp).total_seconds() > timeout:
            break

        events = poller.poll(100)
        if len(events) == 1 and (events[0][1] & select.POLLIN) > 0 and \
            not (isinstance(last_data, bytes) and len(last_data) == 0):
            data = current_subprocess.stdout.peek()
            # consume exactly what has been peeked
            current_subprocess.stdout.read(len(data))
            last_data = data
            output.append(data)

            if need_lines:
                buffer.append(data)

                if b'\n' in data:
                    newline_pos = data.rfind(b'\n')
                    # everything buffered so far up to the last newline forms
                    # complete lines; the remainder stays buffered
                    buffer_lines.extend([
                        o + b'\n'
                        for o in b''.join(
                            list(buffer)[:-1] + [
                                data[:newline_pos]
                            ],
                        ).splitlines()
                    ])
                    buffer.clear()
                    buffer.append(data[newline_pos + 1:])

                while len(buffer_lines) > 0:
                    yield dict(
                        # was misspelled as `aggegated`, which made
                        # chunk['aggregated'] fail for line chunks
                        aggregated=False,
                        line=buffer_lines.popleft(),
                    )
            else:
                yield dict(
                    data=data,
                    aggregated=False,
                )

            echoed = data
            if transform_callback is not None:
                transformed = transform_callback(data)
                if transformed is not None:
                    echoed = transformed

            if len(echoed) > 0:
                os.write(sys.stdout.fileno(), echoed)
        elif real_time:
            yield dict(
                data=b'',
                aggregated=False,
            )

    if return_aggregated:
        yield dict(
            data=b''.join(output),
            aggregated=True,
            returncode=current_subprocess.poll(),
        )
def player_metadata():
    """Return ``'<artist> - <title>'`` of the current track via ``playerctl``.

    The query is attempted up to 20 times with a one second pause after each
    failed attempt (the pause used to sit after the ``return`` statement and
    never ran, so retries were fired back-to-back).

    Returns:
        The formatted string, or ``None`` when ``playerctl`` is not installed
        or every attempt failed.
    """
    for _ in range(20):
        try:
            metadata = {
                key: subprocess.check_output(
                    ['playerctl', 'metadata', key]
                ).decode('utf-8').strip()
                for key in ['artist', 'title']
            }
            return '%s - %s' % (metadata['artist'], metadata['title'])
        except FileNotFoundError:
            # playerctl itself is missing, retrying cannot help
            return None
        except (subprocess.CalledProcessError, OSError, UnicodeDecodeError):
            # e.g. no player is running yet, give it a moment
            time.sleep(1.0)

    return None
def memory_stats():
    """Return total and used memory of the machine.

    On linux the second line of ``free`` output is parsed; used memory is
    the sum of its ``used`` and ``shared`` columns. On darwin the total comes
    from ``sysctl hw.memsize`` and used memory is
    ``(pages_active + pages_wired_down) * vm.pagesize`` from ``vm_stat``,
    both divided by 1024.

    Returns:
        ``dict(mem_total=..., mem_used=...)``
        (presumably KiB on both platforms - ``free`` default unit; verify).

    Raises:
        NotImplementedError: on any other platform.
    """
    if sys.platform == 'linux':
        free_lines = subprocess.check_output(
            ['free']
        ).decode('utf-8').splitlines()

        # second line: "Mem: total used free shared buff/cache available"
        mem_fields = free_lines[1].strip().split()
        mem_total = int(mem_fields[1])
        mem_used = int(mem_fields[2]) + int(mem_fields[4])

        return dict(
            mem_total=mem_total,
            mem_used=mem_used,
        )
    elif sys.platform == 'darwin':
        def sysctl_value(name, custom_cast=int):
            # `sysctl -n` prints only the value of exactly this key, unlike
            # `sysctl -a | grep name`, which also matches longer key names
            # (e.g. hw.memsize_usable) and breaks the integer conversion
            return custom_cast(
                subprocess.check_output(
                    ['sysctl', '-n', name],
                ).decode('utf-8').strip()
            )

        vm_pagesize = sysctl_value('vm.pagesize')
        mem_total = sysctl_value('hw.memsize')

        vm_stat_output = subprocess.check_output('vm_stat').decode('utf-8')
        pairs = [o.split(':') for o in vm_stat_output.splitlines() if ':' in o]
        name_regex = re.compile(r'^[a-zA-Z0-9\_\-\s]+$')
        value_regex = re.compile(r'^\s*\d+\.\s*$')
        # "Pages wired down:   123." -> pages_wired_down = 123
        counters = {
            o[0].replace(' ', '_').replace('-', '_').lower():
                int(o[1].strip().rstrip('.'))
            for o in pairs
            if len(o) == 2 and len(o[0]) > 0
            and value_regex.match(o[1]) is not None
            and name_regex.match(o[0]) is not None
        }
        mem_used = (
            counters['pages_active'] +
            counters['pages_wired_down']
        ) * vm_pagesize

        return dict(
            mem_total=mem_total / 1024,
            mem_used=mem_used / 1024,
        )
    else:
        raise NotImplementedError
def chrome(
    argv
):
    """Launch ``google-chrome-stable`` on wayland with a chosen profile dir.

    Args:
        argv: list of strings; ``--user_data_dir`` selects the profile
            directory (``$HOME/.config/google-chrome`` when omitted), every
            remaining argument is forwarded to chrome.

    Raises:
        AssertionError: when ``argv`` is not a list of strings.
        NotImplementedError: on platforms other than linux.
    """
    assert isinstance(argv, list) and all([isinstance(o, str) for o in argv])

    parser = optparse.OptionParser()
    parser.add_option(
        '--user_data_dir', dest='user_data_dir', default=None, type=str,
    )
    options, args = parser.parse_args(argv)

    user_data_dir = options.user_data_dir
    if user_data_dir is None:
        user_data_dir = os.path.join(
            os.environ['HOME'], '.config', 'google-chrome',
        )
        options.user_data_dir = user_data_dir

    if sys.platform != 'linux':
        raise NotImplementedError

    command = [
        'google-chrome-stable',
        '--enable-features=useOzonePlatform',
        '--ozone-platform=wayland',
        '--process-per-site',
        '--user-data-dir=%s' % user_data_dir,
    ]
    command.extend(args)
    subprocess.check_call(command)
def eternal_oom(argv):
    """Watchdog that kills the heaviest process on memory or CPU overload.

    Two modes:

    * ``--one_shot_clean``: send SIGTERM once to renderer processes of the
      apps given with ``--one_shot_app`` (``chrome`` and ``teams`` by
      default) and return.
    * default: print the current state, wait for Enter, then every
      ``--period`` seconds compare used memory with ``--memory_limit`` and
      the rolling mean of total CPU usage with ``--cpu_limit``; when a limit
      is exceeded the top process (by RSS or by CPU) gets SIGKILL.

    Args:
        argv: list of strings with the options
            ``--cpu_wait`` (seconds the CPU mean must stay high, >= 10),
            ``--mean_size`` (number of samples in the CPU mean, >= 16),
            ``--one_shot_clean``, ``--one_shot_app`` (repeatable),
            ``--cpu`` (JSON value, falsy disables the CPU check),
            ``--period`` (seconds, >= 1),
            ``--memory_limit`` (same unit as ``memory_stats()``),
            ``--cpu_limit`` (in cores), ``--debug``.

    Raises:
        AssertionError: when ``argv`` or an option value is out of range.
    """
    import signal
    import re
    import time
    import pprint

    assert isinstance(argv, list) and all([isinstance(o, str) for o in argv])
    parser = optparse.OptionParser()
    parser.add_option(
        '--cpu_wait',
        dest='cpu_wait',
        default=None,
        type=float,
    )
    parser.add_option(
        '--mean_size',
        dest='mean_size',
        default=None,
        type=int,
    )
    parser.add_option(
        '--one_shot_clean',
        dest='one_shot_clean',
        action='store_true',
        default=None,
    )
    parser.add_option(
        '--cpu',
        dest='cpu_json',
        type=str,
        default=None,
    )
    parser.add_option(
        '--one_shot_app',
        dest='one_shot_app',
        default=[],
        action='append',
    )
    parser.add_option(
        '--period',
        dest='period',
        default=None,
        type=float,
    )
    parser.add_option(
        '--memory_limit',
        dest='memory_limit',
        default=None,
        type=float,
    )
    parser.add_option(
        '--cpu_limit',
        dest='cpu_limit',
        default=None,
        type=float,
    )
    parser.add_option(
        '--debug',
        dest='debug',
        action='store_true',
        default=False,
    )
    options, args = parser.parse_args(argv)

    if not options.cpu_json is None:
        options.cpu = json.loads(options.cpu_json)
    else:
        options.cpu = True

    self_pid = os.getpid()

    if isinstance(options.one_shot_clean, bool) and options.one_shot_clean:
        if len(options.one_shot_app) == 0:
            options.one_shot_app = ['chrome', 'teams']

        # app -> (pgrep pattern, regex of command lines that must be spared)
        config = dict(
            chrome=(r'chrome.*type=renderer', r'^.*--extension-process.*$'),
            teams=(r'teams.*type=renderer', None),
        )

        for app in options.one_shot_app:
            p = config[app]

            try:
                t1 = subprocess.check_output([
                    'pgrep', '-a', '-f', p[0]
                ]).decode('utf-8')
            except Exception:
                # pgrep exits non-zero when nothing matches
                continue
            t2 = t1.splitlines()
            if not p[1] is None:
                t3 = [o for o in t2 if re.compile(p[1]).match(o) is None]
            else:
                t3 = t2
            t4 = [
                int(o.split()[0]) for o in t3
            ]

            for pid in t4:
                if pid == self_pid:
                    raise NotImplementedError

                os.kill(pid, signal.SIGTERM)

            logging.info(json.dumps(dict(
                apps=options.one_shot_app,
                count=len(t4),
                processes=[
                    o.split()[:3] for o in t3
                ],
            )))

            if len(t4) > 0:
                print(
                    '\n'.join([
                        str(o.split()[:3]) for o in t3
                    ])
                )
        return

    if options.period is None:
        options.period = 1
    if options.memory_limit is None:
        options.memory_limit = 3 * 1024 * 1024
    if options.cpu_limit is None:
        options.cpu_limit = 0.6 * os.cpu_count()
    if options.cpu_wait is None:
        options.cpu_wait = 10
    if options.mean_size is None:
        options.mean_size = 30

    if isinstance(options.memory_limit, float):
        options.memory_limit = int(options.memory_limit)

    assert isinstance(options.memory_limit, int) \
        and options.memory_limit < memory_stats()['mem_total'] * 0.95 \
        and options.memory_limit > 512 * 1024

    assert isinstance(options.cpu_limit, float) \
        and options.cpu_limit > 0.2 * os.cpu_count() and \
        options.cpu_limit < os.cpu_count() * 0.95

    assert options.period >= 1

    assert options.cpu_wait >= 10
    assert options.mean_size >= 16

    def pandas_data_frame(lines, groups_regex, header_regex, extra_columns):
        """Parse text lines into a dict ``column name -> list of values``.

        The first line gives the column names (groups of ``header_regex``),
        every other line gives one row (groups of ``groups_regex``);
        ``extra_columns`` maps a column name to ``row dict -> value`` and
        adds or overrides that column.
        """
        header = re.compile(header_regex).search(lines[0]).groups()
        rows = [
            re.compile(groups_regex).search(row).groups()
            for row in lines[1:]
        ]
        columns = {
            column: []
            for column in header
        }
        for row in rows:
            for value, column in zip(row, header):
                columns[column].append(value)
        for column, transformation in extra_columns.items():
            columns[column] = [
                transformation(
                    {
                        k : v[index]
                        for k, v in columns.items()
                    }
                )
                for index in range(len(rows))
            ]

        return columns

    def pandas_merge(left, right, on):
        """Inner-join two column dicts on column ``on``.

        Columns of ``left`` get the ``_x`` suffix, columns of ``right`` the
        ``_y`` suffix; for duplicated keys only the first row is used.
        Rows keep the order of ``left``.
        """
        index = {}
        input_data_frames = [
            ('left', left),
            ('right', right),
        ]
        for index_name, data_frame in input_data_frames:
            current_index = {}
            for row_index, value in enumerate(data_frame[on]):
                if not value in current_index:
                    current_index[value] = []
                current_index[value].append(row_index)

            index[index_name] = current_index

        merged_data_frame = dict(
            header=[
                column + '_x'
                for column in left
            ] + [
                column + '_y'
                for column in right
            ],
            columns={},
        )

        for column in merged_data_frame['header']:
            merged_data_frame['columns'][column] = []

        common_values = {
            left_value
            for left_value in index['left']
            if left_value in index['right']
        }
        common_rows = sorted(
            [
                dict(
                    left_row_index=index['left'][value][0],
                    right_row_index=index['right'][value][0],
                )
                for value in common_values
            ],
            key=lambda x: x['left_row_index'],
        )
        for common_row in common_rows:
            row = sum([
                [
                    values[
                        common_row['%s_row_index' % index_name]
                    ]
                    for column, values in data_frame.items()
                ]
                for index_name, data_frame in input_data_frames
            ], [])
            for column, value in zip(merged_data_frame['header'], row):
                merged_data_frame['columns'][column].append(value)

        return merged_data_frame['columns']

    def pandas_sort_values(data_frame, by, ascending):
        """Return the column dict sorted by the single column ``by[0]``, descending."""
        assert len(by) == 1
        assert ascending is False
        t1 = [
            o['row_index']
            for o in sorted(
                [
                    dict(
                        row_index=row_index,
                        value=value
                    )
                    for row_index, value in enumerate(data_frame[by[0]])
                ],
                key=lambda x: x['value']
            )[::-1]
        ]

        return {
            column : [
                values[row_index]
                for row_index in t1
            ]
            for column, values in data_frame.items()
        }

    def pandas_filter_values(data_frame, condition):
        """Keep only the rows for which ``condition(row dict)`` is truthy."""
        shape = [
            len(data_frame),
        ]
        if shape[0] > 0:
            shape.append(
                len(list(data_frame.values())[0])
            )

        t1 = [
            row_index
            for row_index in range(shape[1])
            if condition(
                {
                    column : values[row_index]
                    for column, values in data_frame.items()
                }
            )
        ]

        return {
            column : [
                values[row_index]
                for row_index in t1
            ]
            for column, values in data_frame.items()
        }

    def pandas_row(data_frame, row_index):
        """Return row ``row_index`` as a dict ``column -> value``."""
        return {
            column : values[row_index]
            for column, values in data_frame.items()
        }

    def pandas_shape(data_frame):
        """Return ``[columns_count, rows_count]`` of a column dict."""
        columns_count = len(data_frame)
        if columns_count > 0:
            rows_count = len(data_frame[
                next(iter(data_frame.keys()))
            ])
        else:
            rows_count = 0

        return [
            columns_count,
            rows_count,
        ]

    def ps_regex(groups_cnt):
        """Build a regex matching ``groups_cnt`` whitespace separated fields."""
        assert groups_cnt >= 1
        return ''.join([
            r'^\s*',
            r'([^\s]+)\s+' * (groups_cnt - 1),
            r'([^\s]+)\s*$',
        ])

    def oom_get_processes(extra_filter=None,):
        """Collect the process table from ``ps``.

        Returns ``dict(by_mem=..., by_cpu=..., total_cpu=...)`` where the
        two tables are sorted descending by RSS / CPU and ``total_cpu`` is
        the sum of per-process CPU percentages divided by 100. This process
        and commands containing ``freelancer`` are excluded, as well as rows
        rejected by ``extra_filter``.
        """
        with io.BytesIO(
            subprocess.check_output(
                'ps -e -o pid,rss,user,%cpu',
                shell=True
            )
        ) as f:
            t1 = pandas_data_frame(
                f.read().decode('utf-8').splitlines(),
                ps_regex(4),
                ps_regex(4),
                dict(
                    PID=lambda row: int(row['PID']),
                    RSS=lambda row: int(row['RSS']),
                    CPU=lambda row: float(row['%CPU']),
                ),
            )
            del t1['%CPU']
            assert set(t1.keys()) == set(['PID', 'RSS', 'USER', 'CPU'])

        t5 = subprocess.check_output(
            'ps -e -o pid,args',
            shell=True
        ).decode('utf-8').splitlines()
        t6 = pandas_data_frame(
            t5,
            r'^\s*(\d+)\s(.*)$',
            r'^\s+(\w+)\s+(\w+)\s*$',
            dict(
                PID=lambda row: int(row['PID'])
            ),
        )

        if not 'COMMAND' in t6:
            # darwin names the column ARGS
            if sys.platform == 'darwin' and 'ARGS' in t6:
                t6['COMMAND'] = t6['ARGS']
                del t6['ARGS']
            else:
                raise NotImplementedError

        assert set(t6.keys()) == set(['PID', 'COMMAND'])
        t11 = pandas_merge(t1, t6, on='PID')
        if extra_filter is None:
            extra_filter = lambda *args : True

        t7 = pandas_filter_values(
            t11,
            lambda row: \
                row['PID_x'] != self_pid and \
                not 'freelancer' in row['COMMAND_y'] and \
                extra_filter(row)
        )

        t8 = pandas_sort_values(
            t7,
            by=['RSS_x'],
            ascending=False
        )
        t9 = pandas_sort_values(
            t7,
            by=['CPU_x'],
            ascending=False
        )
        t10 = sum(t9['CPU_x'], 0.0) / 100
        if options.debug:
            pprint.pprint([t9['CPU_x'][:10], t10 * 100])

        return dict(
            by_mem=t8,
            by_cpu=t9,
            total_cpu=t10,
        )

    def oom_display_rows(current_dataframe):
        """Print up to the first five rows of a process table."""
        print('\n'.join([
            (
                lambda row: \
                    '% 8d\t% 6.3f GiB\t% 5.2f %%\t% 10s\t%s' % (
                        row['PID_x'],
                        row['RSS_x'] / 1024 / 1024,
                        row['CPU_x'],
                        row['USER_x'],
                        row['COMMAND_y'],
                    )
            )(
                pandas_row(current_dataframe, k)
            )
            for k in range(
                0,
                min(
                    5,
                    pandas_shape(current_dataframe)[1],
                )
            )
        ]))

    def oom_kill(pid):
        """Send SIGKILL to ``pid``; failures are logged and notified."""
        assert isinstance(pid, int)

        try:
            logging.info('%s oom_kill, pid %d' % (
                datetime.datetime.now().isoformat(),
                pid,
            ))
            os.kill(pid, signal.SIGKILL)
        except Exception:
            logging.error(traceback.format_exc())
            custom_notify(
                msg='oom_kill, failed to kill pid %d' % pid
            )

    def oom_status():
        """Print a one-line status: mean cpu / limit, used memory / limit."""
        print(
            '\r%s %6.2f / %.2f %%, %6.2f / %.2f GiB' % (
                datetime.datetime.now().isoformat(),
                oom_mean_cpu() / os.cpu_count() * 100,
                options.cpu_limit / os.cpu_count() * 100,
                memory_stats()['mem_used'] / 1024 / 1024,
                options.memory_limit / 1024 / 1024,
            ),
            end=''
        )

    def first_check():
        """Show the current headroom and wait for Enter before monitoring."""
        current_memory_stats = memory_stats()

        t11 = oom_get_processes()
        t8 = t11['by_mem']

        if current_memory_stats['mem_used'] > options.memory_limit:
            oom_display_rows(t8)

        if t11['total_cpu'] > options.cpu_limit:
            oom_display_rows(t11['by_cpu'])

        free_before_oom = (
            options.memory_limit - current_memory_stats['mem_used']
        )

        print(
            'available %5.2f %% out of %5.2f %% of cpu limit before OOC' % (
                (options.cpu_limit - t11['total_cpu']) * 100 / os.cpu_count(),
                options.cpu_limit * 100 / os.cpu_count(),
            )
        )

        print(
            '%5.2f GiB [%5.2f %%] out of %5.2f GiB of free memory before OOM' % (
                free_before_oom / 1024 / 1024,
                free_before_oom / options.memory_limit * 100,
                options.memory_limit / 1024 / 1024,
            )
        )

        del t8
        del t11

        print('press Enter to start monitoring: ...', end='')
        input()
        print('\nstarted...')

    first_check()

    last_total_cpu = []
    last_cpu_high = None

    def oom_add_cpu(total_cpu):
        """Append a CPU sample, keeping only the newest ``mean_size`` samples."""
        if options.debug:
            pprint.pprint([total_cpu, last_total_cpu])
        last_total_cpu.append(total_cpu)
        if len(last_total_cpu) > options.mean_size:
            # drop the OLDEST samples; the previous slice `[-mean_size:]`
            # removed the newest ones and left the mean on stale data
            del last_total_cpu[:-options.mean_size]

    def oom_mean_cpu():
        """Mean of the collected CPU samples (0 when there are none)."""
        return sum(last_total_cpu) / (len(last_total_cpu) + 1e-8)

    def oom_cpu_high(cpu_limit=None):
        """True once the CPU mean stayed above the limit for ``cpu_wait`` seconds."""
        if cpu_limit is None:
            cpu_limit = options.cpu_limit

        nonlocal last_cpu_high

        if oom_mean_cpu() > cpu_limit:
            if last_cpu_high is None:
                last_cpu_high = datetime.datetime.now().timestamp()

            if datetime.datetime.now().timestamp() - last_cpu_high > options.cpu_wait:
                last_cpu_high = None
                del last_total_cpu[:]
                return True

        return False

    mem_used = None
    mem_stat = None

    def oom_mem_high(memory_limit=None):
        """True when the last measured used memory exceeds the limit."""
        nonlocal mem_used

        if memory_limit is None:
            memory_limit = options.memory_limit

        return mem_used > memory_limit

    while True:
        mem_stat = memory_stats()
        mem_used = mem_stat['mem_used']

        # While used memory is below the midpoint between the limit and the
        # total, only chrome renderers (and non-chrome processes) are kill
        # candidates; above it every process is.
        if options.memory_limit < mem_stat['mem_total'] and not oom_mem_high(
            mem_stat['mem_total'] - (
                mem_stat['mem_total'] - options.memory_limit
            ) / 2
        ):
            extra_filters = lambda row: (
                'chrome' in row['COMMAND_y'] and '--type=renderer' in row['COMMAND_y']
                or not 'chrome' in row['COMMAND_y']
            )
        else:
            extra_filters = None

        t11 = oom_get_processes(extra_filters)

        oom_add_cpu(t11['total_cpu'])

        t8 = t11['by_mem']

        t9 = t8
        t4 = lambda : oom_kill(t9['PID_x'][0])

        oom_status()

        if oom_mem_high():
            print('\n', end='')
            pprint.pprint([
                'Killing [OOM]',
                pandas_row(t9, 0),
                mem_used,
            ])
            t4()

        if options.cpu and oom_cpu_high():
            oom_display_rows(t11['by_cpu'])
            print('\n', end='')
            pprint.pprint([
                'Killing [CPU]',
                pandas_row(t11['by_cpu'], 0),
                [options.cpu_limit, oom_mean_cpu(), t11['total_cpu']],
            ])
            oom_kill(t11['by_cpu']['PID_x'][0])
        time.sleep(options.period)
def resilient_vlc(stream=None):
    """Play streams with ``cvlc`` forever, restarting it when it stalls or exits.

    The stderr of ``cvlc --verbose 2`` is watched line by line; on a pulse
    audio underflow message the player is killed. After the player has
    terminated (killed or on its own) it is started again one second later.

    Args:
        stream: a stream URL, a list of URLs, or ``None`` to load the list
            from ``$CACHE_PATH/resilient-vlc-streams.json``.

    Raises:
        RuntimeError: when the streams file is missing or no stream is given.
    """
    if stream is None:
        streams_path = os.path.join(
            os.environ['CACHE_PATH'],
            'resilient-vlc-streams.json'
        )

        if os.path.exists(streams_path):
            with io.open(
                streams_path,
                'r'
            ) as f:
                stream = json.load(f)
        else:
            raise RuntimeError(
                'not found, %s' % streams_path
            )

    if isinstance(stream, str):
        stream = [stream]

    if len(stream) == 0:
        raise RuntimeError('no streams')

    import subprocess
    import time

    while True:
        print('new start')
        with subprocess.Popen([
            'cvlc', '--verbose', '2', *stream,
        ], stderr=subprocess.PIPE) as p:
            # poll() refreshes the return code; reading `p.returncode` alone
            # never notices that vlc has exited on its own and spins forever
            # on empty reads
            while p.poll() is None:
                t1 = p.stderr.readline().decode('utf-8')
                if len(t1) > 0:
                    print(t1)
                # NOTE(review): `not all(...)` is true unless a single line
                # contains all three messages, so in practice only the
                # underflow check decides - confirm the intent.
                if not all([
                    o in t1
                    for o in [
                        'prefetch stream error',
                        'terror',
                        'main interface error',
                    ]
                ]) and any([
                    o in t1
                    for o in [
                        'pulse audio output debug: underflow'
                    ]
                ]):
                    print('shit')
                    p.kill()
                    while True:
                        try:
                            t2 = p.wait(timeout=1)
                            print(t2)
                            break
                        except subprocess.TimeoutExpired:
                            print('shit')
        time.sleep(1.0)
def sway_sock():
    """Return the path of the most recently modified sway IPC socket.

    Sockets are looked up as ``/run/user/<uid>/sway-ipc.<uid>*.sock`` where
    ``<uid>`` is the owner of ``$HOME``.

    Raises:
        IndexError: when no socket matches.
    """
    import glob

    uid = os.stat(os.environ['HOME']).st_uid
    pattern = os.path.join(
        '/run', 'user', '%d' % uid, 'sway-ipc.%d*.sock' % uid,
    )
    candidates = glob.glob(pattern)
    mtimes = [os.stat(path).st_mtime for path in candidates]

    # stable sort of the indices by mtime, the newest one ends up last
    by_age = sorted(range(len(candidates)), key=lambda i: mtimes[i])
    return candidates[by_age[-1]]
def eternal_firefox(
    tabs=None,
    profile=None,
    group_name=None,
    window_position=None,
    debug=None,
):
    """Keep a firefox window with the given tabs running under sway.

    Firefox is started with the profile and tabs, its window is moved to the
    requested workspace (and optionally floated, resized and positioned),
    and after an hour - or earlier when firefox exits - the window is closed
    and everything starts over. Ctrl-C closes the window and stops the loop.

    Args:
        tabs: list of URLs to open.
        profile: firefox profile name (``-P``).
        group_name: label used in debug notifications.
        window_position: sequence ``(workspace, x, y, width, height)``; empty
            strings for ``x`` and ``y`` keep the window tiled.
        debug: when truthy, ``notify-send`` reports every stage.

    Raises:
        RuntimeError: when a required argument is missing.
    """
    import os
    import datetime
    import pprint
    import subprocess
    import time

    if debug is None:
        debug = False
    if tabs is None:
        raise RuntimeError('no tabs provided')
    if profile is None:
        raise RuntimeError('no profile provided')
    if group_name is None:
        raise RuntimeError('no group provided')
    if window_position is None:
        #window_position = '1,600,0,600,540'
        raise RuntimeError('no window-position provided')

    while True:
        os.system(r'''date''')
        with subprocess.Popen([
            'firefox',
            '-P', profile,
            *tabs,
        ]) as p:
            try:
                if debug:
                    assert subprocess.check_call(['notify-send', '%s:Starting' % group_name]) == 0

                # wait until sway knows a window owned by the firefox pid
                for k in range(300):
                    t1 = subprocess.check_output(r'''
swaymsg -t get_tree | jq -r '..|try select(.pid== %d)'
''' % p.pid, shell=True).decode('utf-8')
                    if len(t1) > 10:
                        break
                    #time.sleep(0.1)

                def reposition():
                    """Move the window to its workspace and apply its geometry."""
                    t1 = lambda s: \
                        s \
                        .replace('{{PID}}', str(p.pid)) \
                        .replace('{{X}}', str(window_position[1])) \
                        .replace('{{Y}}', str(window_position[2])) \
                        .replace('{{W}}', str(window_position[3])) \
                        .replace('{{H}}', str(window_position[4])) \
                        .replace('{{WORKSPACE}}', str(window_position[0]))

                    assert os.system(t1(r'''
swaymsg '[pid="{{PID}}"] move window to workspace {{WORKSPACE}}'
''')) == 0

                    if window_position[1] != '' and window_position[2] != '':
                        # the `&&` after `floating enable` was missing, which
                        # turned the following swaymsg command into extra
                        # arguments of the first one
                        assert os.system(t1(r'''
swaymsg '[pid="{{PID}}"] floating enable' && \
swaymsg '[pid="{{PID}}"] resize set width {{W}}px height {{H}}px' && \
swaymsg '[pid="{{PID}}"] move absolute position {{X}}px {{Y}}px'
''')) == 0
                    else:
                        assert os.system(t1(r'''
swaymsg '[pid="{{PID}}"] floating disable'
''')) == 0

                # disabled: opening the remaining tabs one by one
                if False:
                    for tab in tabs[1:]:
                        time.sleep(10)
                        assert subprocess.check_call([
                            'firefox',
                            '-P', profile,
                            '--new-tab',
                            tab,
                        ]) == 0

                reposition()

                if debug:
                    assert subprocess.check_call(['notify-send', '%s:Started' % group_name]) == 0

                start = datetime.datetime.now()
                is_to_restart = lambda : (datetime.datetime.now() - start).total_seconds() >= 900 * 4
                polling_count = 0

                while not is_to_restart():
                    if polling_count == 0:
                        reposition()

                    if not p.poll() is None:
                        break
                    time.sleep(10)
                    polling_count += 1

                if debug:
                    assert subprocess.check_call(['notify-send', '%s:Closing' % group_name]) == 0

                assert os.system(r'''
swaymsg '[pid="%d"] kill'
''' % (p.pid,)) == 0

            except KeyboardInterrupt:
                assert os.system(r'''
swaymsg '[pid="%d"] kill'
''' % (p.pid,)) == 0
                break
            except Exception:
                # best effort: report the failure and restart firefox
                import traceback
                import pprint
                pprint.pprint(traceback.format_exc())
            finally:
                try:
                    p.wait(20)
                except subprocess.TimeoutExpired:
                    pprint.pprint([p.pid, '20 seconds timeout', 'kill'])
                    p.kill()
                if debug:
                    assert subprocess.check_call(['notify-send', '%s:Closed' % group_name]) == 0
def resilient_ethernet(ip_addr, ethernet_device):
    """Ping ``ip_addr`` forever and bounce ``ethernet_device`` when it fails.

    Runs a root shell loop (through ``sudo``): every 10 seconds the address
    is pinged; when the ping fails the link is set down and up again.

    Args:
        ip_addr: address to ping.
        ethernet_device: network interface name passed to ``ip link set``.

    Raises:
        subprocess.CalledProcessError: when the shell command fails.
    """
    command = r'''
sudo sh -c '\
while true; \
do ping -c 3 -w 3 -W 1 {{IP_ADDR}} || (\
ip link set {{ETHERNET_DEVICE}} down; \
ip link set {{ETHERNET_DEVICE}} up; \
sleep 4; true;\
); \
sleep 10; clear; date; \
done'
'''.replace(
        '{{IP_ADDR}}',
        ip_addr
    ).replace(
        # the placeholder used to be spelled with three closing braces and
        # therefore never matched the template
        '{{ETHERNET_DEVICE}}',
        ethernet_device
    )

    subprocess.check_call(
        command,
        shell=True
    )
def http_server(argv):
    """Serve the current directory over HTTP with nginx.

    By default nginx runs inside a docker container (``nginx:latest``) with
    the current directory mounted read-only; with ``--no_docker`` a
    configuration is generated under ``/opt/nginx/<token>/`` and the process
    is replaced by ``/usr/sbin/nginx``.

    Unless ``--public`` is given, the files are reachable only under
    ``/<prefix><token>/`` (the token is random when not provided) and every
    other location is denied.

    Options in ``argv``: ``--public``, ``--force`` (allow removing existing
    generated directories), ``--token``, ``--port`` (default 80),
    ``--no_docker``, ``-H/--header`` (repeatable ``Name value`` response
    header), ``--host`` (default 127.0.0.1), ``--prefix``.

    Raises:
        AssertionError: on invalid ``argv``, port, header format, or when
            generated directories exist and ``--force`` is not given.
        RuntimeError: when the host is not a pingable IPv4 address.
    """
    assert isinstance(argv, list) and all([isinstance(o, str) for o in argv])
    parser = optparse.OptionParser()
    parser.add_option(
        '--public',
        dest='public',
        action='store_true',
        default=False,
    )
    parser.add_option(
        '--force',
        dest='force',
        action='store_true',
        default=False,
    )
    parser.add_option(
        '--token',
        dest='token',
        type=str,
        default=None,
    )
    parser.add_option(
        '--port',
        dest='port',
        type='int',
        default=80,
    )
    parser.add_option(
        '--no_docker',
        dest='docker',
        action='store_false',
        default=None,
    )
    parser.add_option(
        '-H', '--header',
        dest='response_headers',
        type='str',
        action='append',
        default=[],
    )
    parser.add_option(
        '--host',
        dest='host',
        type='str',
        default='127.0.0.1',
    )
    parser.add_option(
        '--prefix',
        dest='prefix',
        type='str',
        default=None,
    )
    options, args = parser.parse_args(argv)

    assert options.port >= 1

    # the host must parse as an IPv4 address and answer a ping
    try:
        assert not socket.inet_aton(options.host) is None
        subprocess.check_call([
            'ping', '-w', '1',
            options.host
        ])
    except:
        raise RuntimeError('invalid ip address %s' % options.host)

    # docker is the default unless --no_docker was given
    if options.docker is None:
        options.docker = True

    index_section = 'autoindex on;'

    if options.public:
        options.token = 'public'

        location_section = 'location / {%s}' % index_section
    else:
        if options.token is None:
            options.token = os.urandom(16).hex()

    if options.docker:
        # paths inside the container
        DATA_DIR = '/usr/share/nginx'
        APP_DIR = '/app'
        CONF_DIR = '/etc/nginx'
    else:
        # per-token directories on the host
        DATA_DIR = '/opt/nginx/%s/data/' % options.token
        CONF_DIR = '/opt/nginx/%s/conf/' % options.token
        APP_DIR = os.path.abspath(os.path.curdir)

    if not options.public:
        if not options.prefix is None:
            path = options.prefix + options.token
        else:
            path = options.token

        logger.info(
            'access url is http://%s:%d/%s/' % (
                options.host,
                options.port,
                path,
            )
        )

        # every header must look like "<Name> <value>"
        assert all([
            not re.compile(
                r'^[A-Za-z-]+ [a-z0-9A-Z-\.]+$'
            ).match(o) is None
            for o in options.response_headers
        ])

        # deny everything except the secret location aliased to APP_DIR
        location_section = (
            'location / {'
            'deny all;'
            '}'
            'location /%s/ {'
            'alias %s/;'
            '%s'
            '%s'
            '}'
        ) % (
            path,
            APP_DIR,
            '\n'.join([
                'add_header %s;' % o
                for o in options.response_headers
            ]),
            index_section
        )

    if options.docker:
        # write the server config inside the container, then run nginx in
        # the foreground
        subprocess.check_call(
            r'''
sudo docker run \
-p %s:%d:80 \
-u root \
-it --entrypoint=/bin/bash \
-v $PWD:%s:ro \
--log-driver none \
nginx:latest \
-c 'echo "server{listen 80; charset UTF-8; root /app; %s}" > /etc/nginx/conf.d/default.conf; nginx -g "daemon off;"'
''' % (
                options.host,
                options.port,
                APP_DIR,
                location_section,
            ),
            shell=True
        )
    else:
        # existing generated directories are removed only with --force
        if os.path.exists(CONF_DIR):
            assert options.force
            shutil.rmtree(CONF_DIR)

        if os.path.exists(DATA_DIR):
            assert options.force
            shutil.rmtree(DATA_DIR)

        os.makedirs(CONF_DIR, exist_ok=True)
        os.makedirs(
            os.path.join(
                CONF_DIR,
                'conf.d',
            ),
            exist_ok=True
        )
        os.makedirs(DATA_DIR, exist_ok=True)

        # main nginx config, includes conf.d/*.conf from CONF_DIR
        with io.open(
            os.path.join(CONF_DIR, 'nginx.conf'),
            'w'
        ) as f:
            f.write(r'''
events {
multi_accept on;
worker_connections 32;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
expires off;
open_file_cache off;
log_format main
'[$time_local][$remote_addr, $http_x_forwarded_for, $t1, $server_name]'
'[$request_length,$bytes_sent,$request_time]'
'[$status][$request]'
'[$http_user_agent][$http_referer]';
access_log /dev/null combined;
access_log /dev/stderr main;
include %s/conf.d/*.conf;
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
}
''' % CONF_DIR)

        # server block with the location section built above
        with io.open(
            os.path.join(
                CONF_DIR,
                'conf.d',
                'default.conf',
            ),
            'w',
        ) as f:
            f.write(r'''
server {
server_name %s;
listen %d;
charset UTF-8;
client_max_body_size 2K;
set $t1 $remote_addr;
if ($http_x_forwarded_for)
{
set $t1 $http_x_forwarded_for;
}
root %s;
%s
}
''' % (
                options.host,
                options.port,
                DATA_DIR,
                location_section,
            ))

        # flush buffered output before the process image is replaced
        sys.stderr.flush()
        sys.stdout.flush()

        os.execv(
            '/usr/sbin/nginx',
            [
                '/usr/sbin/nginx',
                '-c',
                os.path.join(CONF_DIR, 'nginx.conf'),
                '-p', DATA_DIR,
                '-g', 'daemon off;',
            ],
        )
def pass_ssh_osx(argv):
    """Fetch a ``pass`` secret from a remote host over ssh into the clipboard.

    The positional arguments are passed to ``ssh``. With ``--list`` the
    remote password store is listed. Otherwise the user picks one of the
    ``--pass_option`` entries, the entry is decrypted remotely with ``gpg``
    (the pinentry dialog is shown, the decrypted text is hidden), the secret
    is copied to the local clipboard and cleared again after 10 seconds.

    Options in ``argv``: ``--list``, ``--pass_option`` (repeatable secret
    path), ``--clipboard_copy`` (copy command, ``wl-copy`` on linux and
    ``pbcopy`` on darwin by default), ``--debug``.

    Raises:
        AssertionError: on invalid ``argv``, unsupported platform, missing
            ``--pass_option`` or failed remote command.
        RuntimeError: when no ssh arguments are given or gpg output is not
            in the expected shape.
    """
    assert isinstance(argv, list) and all([isinstance(o, str) for o in argv])

    parser = optparse.OptionParser()
    parser.add_option(
        '--list',
        dest='list',
        default=False,
        action='store_true',
    )
    parser.add_option(
        '--pass_option',
        dest='pass_option',
        action='append',
        default=[],
        type=str,
        help='pass secret path, like --pass_option google.com/login/password/v1',
    )
    parser.add_option(
        '--clipboard_copy',
        dest='clipboard_copy',
        default=None,
        type=str,
    )
    parser.add_option(
        '--debug',
        dest='debug',
        action='store_true',
        default=False,
    )
    assert sys.platform in ['darwin', 'linux']
    options, args = parser.parse_args(argv)

    if options.clipboard_copy is None:
        if sys.platform == 'linux':
            options.clipboard_copy = 'wl-copy'
        elif sys.platform == 'darwin':
            options.clipboard_copy = 'pbcopy'
        else:
            raise NotImplementedError

    if len(args) == 0:
        raise RuntimeError('ssh_command is required')

    if options.debug:
        print(options.pass_option)
        pprint.pprint(args)

    reset_gpg_agent = r'''
gpgconf --kill gpg-agent && \
gpgconf --reload gpg-agent
'''

    if not options.list:
        t1 = options.pass_option
        assert len(t1) > 0

        print(
            'select on of pass names\n%s' % '\n'.join([
                '%d: %s' % (k, v)
                for k, v in enumerate(t1)
            ])
        )

        # Only malformed or out-of-range answers are retried; Ctrl-C and a
        # closed stdin propagate instead of being swallowed forever.
        while True:
            try:
                t3 = int(input())
            except ValueError:
                continue
            if t3 >= 0 and t3 < len(t1):
                break

        # the gpg agent is reset before and after, the exit code of gpg is
        # appended as "[<code>]"
        command = r'''
%s
gpg \
--pinentry-mode=ask \
-q -u $(cat ~/.password-store/.gpg-id) \
--decrypt \
~/.password-store/%s.gpg && \
echo -n '['$?']' && \
%s
''' % (
            reset_gpg_agent,
            t1[t3],
            reset_gpg_agent,
        )
    else:
        command = 'pass list | less -R'

    ssh_command = [
        'ssh', '-C',
        '-o', 'ConnectTimeout 10',
        '-o', 'ServerAliveInterval 1',
        *args,
        '-t',
        command,
    ]

    if options.debug:
        pprint.pprint(
            dict(
                ssh_command=ssh_command,
            )
        )

    if options.list:
        subprocess.check_call(ssh_command)
    else:
        def clipboard_set(text):
            """Pipe ``text`` into the clipboard copy command."""
            with subprocess.Popen([
                options.clipboard_copy,
            ], stdin=subprocess.PIPE) as p:
                p.stdin.write(text.encode('utf-8'))
                p.stdin.flush()
                p.stdin.close()

                p.wait(1)
                assert p.poll() == 0

        with subprocess.Popen(
            ssh_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        ) as p:
            password = None
            last_chunk = None

            hide_password = False
            pinentry_delimeter = b'\x1b>'

            def transform_callback(data):
                """Echo remote output only up to the end of the pinentry dialog."""
                nonlocal hide_password
                nonlocal pinentry_delimeter

                data2 = None

                if not last_chunk is None:
                    data = last_chunk['data'] + data

                if hide_password:
                    data2 = b''
                elif pinentry_delimeter in data:
                    hide_password = True
                    pos = data.rfind(pinentry_delimeter)
                    if pos == -1:
                        data2 = data
                    else:
                        data2 = data[:pos + len(pinentry_delimeter)]
                elif data == b'':
                    #return b'\r\n'
                    return b''
                else:
                    data2 = None

                return data2

            for chunk in intercept_output(
                current_subprocess=p,
                return_aggregated=True,
                transform_callback=transform_callback,
                real_time=True,
                #timeout=10,
            ):
                if chunk['aggregated']:
                    last_chunk = chunk
                    break

            assert not last_chunk is None
            assert last_chunk['returncode'] == 0

            if options.debug:
                pprint.pprint(last_chunk['data'])

            # success: output ends with "[0]" and contains the pinentry
            # delimiter; the secret is on the line before "[0]"
            if last_chunk['data'].endswith('\r\n[0]'.encode('utf-8')) and \
                last_chunk['data'].rfind(pinentry_delimeter) != -1:
                last_line = last_chunk['data'].splitlines()[-2]
            else:
                raise RuntimeError(
                    'gpg failure %s' % str(
                        last_chunk['data'][
                            max(last_chunk['data'].find(pinentry_delimeter), -128):
                        ]
                    )
                )

            pos2 = last_line.rfind(pinentry_delimeter)
            if pos2 == -1:
                last_line2 = last_line
            else:
                last_line2 = last_line[
                    pos2 + len(pinentry_delimeter):
                ]

            password = last_line2.decode('utf-8').rstrip('\r\n')

        assert not password is None

        clipboard_set(password)
        try:
            get_time = lambda : datetime.datetime.now().timestamp()
            start = get_time()
            while True:
                cur = get_time()
                remains = 10 - (cur - start)
                if remains <= 1e-8:
                    break
                else:
                    print('\r%5.2fs remains' % remains, end='')
                time.sleep(0.1)
        finally:
            # the secret must not stay in the clipboard, even when the
            # countdown is interrupted
            clipboard_set('')
            print('\rcleared cliboard\n', end='')
def player_v1(folder_url, item_id):
    """Play the mp3 files linked from an HTTP folder listing, one by one.

    The listing at ``folder_url`` is downloaded with ``curl``; every
    ``href="...mp3"`` link is probed with ``ffprobe`` and played with
    ``ffplay``, starting at index ``item_id``. Progress is shown with
    ``tqdm``.

    Args:
        folder_url: URL of the folder listing.
        item_id: index of the first track to play.

    Raises:
        AssertionError: when ``ffprobe`` or ``ffplay`` exits with an error.
    """
    import urllib.parse
    import re
    import subprocess
    import os
    import tqdm

    listing = subprocess.check_output(['curl', '-s', folder_url]).decode('utf-8')
    mp3_href = re.compile(r"href=\"(.*\.mp3)\"")
    track_urls = [
        '%s/%s' % (folder_url, o.group(1))
        for o in mp3_href.finditer(listing)
    ]

    with tqdm.tqdm(
        total=len(track_urls),
    ) as progress_bar:
        progress_bar.update(item_id)
        for k in range(item_id, len(track_urls)):
            track_url = track_urls[k]
            track_name = urllib.parse.unquote(os.path.split(track_url)[1])
            progress_bar.set_description('%03d %s' % (k, track_name))
            with subprocess.Popen(
                ['ffprobe', '-hide_banner', '-i', track_url],
                stderr=subprocess.PIPE,
                stdout=subprocess.PIPE,
            ) as p:
                # communicate() drains both pipes; wait() alone can deadlock
                # once ffprobe fills a pipe buffer
                p.communicate()
                assert p.returncode == 0
            with subprocess.Popen(
                ['ffplay', '-hide_banner', '-nodisp', '-autoexit', '-loop', '1', track_url],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            ) as p:
                p.wait()
                assert p.returncode == 0
            progress_bar.update(1)
def numpy_linspace(a, b, count):
    """Return ``count`` evenly spaced values from ``a`` to ``b`` inclusive.

    Pure python stand-in for ``numpy.linspace(a, b, count)``. The step is
    ``(b - a) / (count - 1)`` so that the spacing is uniform up to and
    including ``b`` (it used to be ``(b - a) / count`` with the last point
    forced to ``b``, which made the last gap larger than the others).

    Args:
        a: first value.
        b: last value.
        count: number of values.

    Returns:
        A list of ``count`` values; ``[a]`` for ``count == 1`` and ``[]``
        for ``count <= 0``.
    """
    if count <= 0:
        return []
    if count == 1:
        return [a]

    step = (b - a) / (count - 1)
    steps = [a + i * step for i in range(count)]
    # pin the end points exactly, free of rounding errors
    steps[0] = a
    steps[-1] = b

    return steps
def pm_service(argv):
    """Keep a mac asleep until one of the selected input events wakes it.

    In a loop: the machine is put to sleep, then ``log stream`` is followed
    until a ``powerd`` line reports a wake-up. When the wake-up source is one
    of the selected events the function returns, otherwise the machine is
    put to sleep again.

    Options in ``argv``:
        ``--events`` (repeatable): ``pb`` (``AppleACPIButton``), ``tp``
            (``AppleMultitouchDevice``), ``kb`` (``eventType:29``);
            ``pb`` when omitted.
        ``--verbose``: JSON boolean, enables logging of the matched lines.

    Raises:
        AssertionError: on an unknown event, a non-boolean ``--verbose`` or
            a platform other than darwin.
    """
    parser = optparse.OptionParser()
    parser.add_option(
        '--events', dest='events', default=[], action='append',
        help='pb,tp,kb',
    )
    parser.add_option(
        '--verbose', dest='verbose', type=str, default=None,
        help='true,false',
    )
    options, args = parser.parse_args(argv)

    if options.verbose is not None:
        parsed_verbose = json.loads(options.verbose)
        assert isinstance(parsed_verbose, bool)
        options.verbose = parsed_verbose
    else:
        options.verbose = False

    if not options.events:
        options.events.extend([
            'pb',
            #'tp', 'kb'
        ])

    known_events = ['pb', 'tp', 'kb']
    assert all([o in known_events for o in options.events])

    assert sys.platform == 'darwin'

    # a powerd line is a wake-up report when it contains one of these
    wake_markers = (
        'TurnedOn',
        'PrevIdle',
        'PMRD: kIOMessageSystemWillPowerOn',
    )
    # marker of the wake-up source in the line -> event name
    event_sources = (
        ('AppleMultitouchDevice', 'tp'),
        ('AppleACPIButton', 'pb'),
        ('eventType:29', 'kb'),
    )

    wu = 0

    while True:
        subprocess.check_call(
            ['osascript', '-e', 'tell application "Finder" to sleep']
        )
        subprocess.check_call(
            ['pmset', 'sleepnow'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        wu += 1

        # example of a log line this loop reacts to
        sample = (
            r'2024-03-19 22:00:36.808589+0300 0x4caa7 Default 0x0 102'
            r'0 powerd: [com.apple.powerd:assertions] Process WindowServer.156 TurnedOn '
            r'UserIsActive "com.apple.iohideventsystem.queue.tickle serviceID:10000267f '
            r'service:AppleMultitouchDevice product:Apple Internal Keyboard / Trackpad '
            r'eventType:11" age:00:00:00 id:38654742889 [System: PrevIdle DeclUser kDisp]'
        )

        action = None

        with subprocess.Popen(['log', 'stream'], stdout=subprocess.PIPE) as p:
            for chunk in intercept_output(
                p,
                return_aggregated=False,
                need_lines=True,
                transform_callback=lambda x: b'',
            ):
                line = chunk['line'].decode('utf-8')
                cmd = line if 'powerd' in line else None

                if cmd is None:
                    continue

                if options.verbose:
                    logging.error(json.dumps(dict(line=cmd)))

                if not any(marker in cmd for marker in wake_markers):
                    continue

                if any(
                    source in cmd and event in options.events
                    for source, event in event_sources
                ):
                    action = 'wake-up'
                else:
                    action = 'sleep'
                break

        if options.verbose:
            logging.error(json.dumps(dict(cmd=cmd, action=action,)))
        else:
            print('\r%s wu : %d, la : %s' % (
                datetime.datetime.now().isoformat(),
                wu,
                action
            ), end='')

        if action == 'wake-up':
            break
        if action != 'sleep':
            raise NotImplementedError

    print('')
def scrap_yt_music(argv: list[str]) -> None:
    """Record the track currently playing in YouTube Music with ``sox``.

    Two worker threads are started:

    * ``http_events`` serves ``GET /status`` on ``127.0.0.1:8877``; the JS
      snippet printed at start-up calls it once per second with the current
      track title (or without a title while playback is paused).
    * ``audio_recorder`` runs ``sox -d <library_path>/<title>.mp3`` and
      restarts it every time the reported title changes.

    The main thread blocks until SIGINT or SIGTERM, then asks both workers
    to shut down and joins them.

    Options:
        --verbose: JSON boolean literal (``true`` / ``false``).
        -l, --library_path: output directory, defaults to the current one.
    """
    parser = optparse.OptionParser()
    parser.add_option(
        '--verbose',
        dest='verbose',
        type=str,
        default=None,
        help='true,false',
    )
    parser.add_option(
        '-l',
        '--library_path',
        dest='library_path',
        type=str,
        default=None,
    )
    options, args = parser.parse_args(argv)

    if options.library_path is None:
        options.library_path = os.path.abspath(os.path.curdir)

    if options.verbose is None:
        options.verbose = False
    else:
        val = json.loads(options.verbose)
        assert isinstance(val, bool)
        options.verbose = val

    # Local import: aiohttp is only needed by this command.
    import aiohttp.web

    def http_events(context, res_cb):
        """Serve the status endpoint until ``context['shutdown']`` is set."""
        data = []

        async def handle(request):
            data.append(request.rel_url.query.copy())
            res_cb(event=data[-1], events=data)

            # Keep the in-memory history bounded.
            if len(data) > 128:
                del data[:128]

            return aiohttp.web.Response(text='ok')

        async def serve():
            logging.info('http_events starting')
            app = aiohttp.web.Application()
            app.add_routes([aiohttp.web.get('/status', handle)])
            runner = aiohttp.web.AppRunner(app, handle_signals=False,)
            await runner.setup()
            site = aiohttp.web.TCPSite(runner, host='127.0.0.1', port=8877)
            await site.start()
            logging.info('http_events started')

            # Poll the shutdown flag once per second.
            while True:
                await asyncio.sleep(1)
                if context['shutdown']:
                    break

            await runner.cleanup()
            logging.info('http_events done')

        asyncio.run(serve())

    def audio_recorder(context):
        """Keep one ``sox`` recording running per reported track title."""
        current_name = None
        p = None

        try:
            while True:
                # Read the shared title while holding the lock, but do the
                # slow process handling below without it so the HTTP
                # handler is never blocked by ``p.wait()``.
                with context['track_cv']:
                    context['track_cv'].wait(1)
                    reported_name = context['track_name']

                if reported_name != current_name:
                    logging.info('audio_record, track changed, started')
                    if p is not None:
                        logging.info('audio_record, track changed, terminating')
                        p.terminate()
                        p.wait()
                        p = None
                        logging.info('audio_record, track changed, terminated')
                    current_name = reported_name

                if context['shutdown']:
                    if p is not None:
                        p.terminate()
                    break

                if p is None and current_name is not None:
                    # The title comes from the web page; a path separator in
                    # it (e.g. "AC/DC") must not select a sub-directory.
                    output_name = os.path.join(
                        options.library_path,
                        '%s.mp3' % current_name.replace(os.sep, '_')
                    )

                    logging.info('audio_record, new recording')
                    p = subprocess.Popen(
                        ['sox', '-d', output_name],
                        stdout=subprocess.DEVNULL,
                        stdin=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL,
                    )
                    logging.info(json.dumps(dict(output_name=output_name)))
        except Exception:
            logging.error(traceback.format_exc())
        finally:
            if p is not None:
                p.terminate()

    context = dict(
        http_on_event=lambda *args, **kwargs: None,
        shutdown=False,
        workers=[],
        track_cv=threading.Condition(),
        main_cv=threading.Condition(),
        track_name=None,
    )

    context['workers'].extend([
        threading.Thread(
            target=functools.partial(
                http_events,
                context=context,
                # Looked up at call time, so the real handler installed
                # below is the one that gets used.
                res_cb=lambda *args, **kwargs: context['http_on_event'](*args, **kwargs),
            )
        ),
        threading.Thread(
            target=functools.partial(
                audio_recorder,
                context=context,
            )
        ),
    ])

    def http_on_event(event, events):
        """Publish the title from a status request as the current track."""
        with context['track_cv']:
            if 'title' in event and event['title'].strip() != '':
                context['track_name'] = str(event['title'])[:128].replace('\n', '')
            else:
                context['track_name'] = None

        logging.info(event)

    context['http_on_event'] = http_on_event

    print(r'''
https://github.com/ExistentialAudio/BlackHole/wiki/Multi-Output-Device#5-set-audio-output-to-multi-output-device
Open Youtube Music,
and launch the following JS script:
```js
(function(){
let timer = null;
let is_first = true;
let last_play = null;
function last_play_update() {last_play = (new Date());}
let should_stop = () => (((new Date()).valueOf() - last_play.valueOf()) > 5 * 1000);
timer = setInterval(() => {
let is_playing = () => $$('#play-pause-button')[0].title == 'Pause';
let title = () => encodeURIComponent(
$$('.ytmusic-player-bar.middle-controls')[0].innerText
);
let update_status = (query) => fetch('http://127.0.0.1:8877/status?' + query);
if (is_playing())
{
last_play_update();
is_first = false;
update_status('title=' + title());
}
else if (!is_first)
{
update_status('')
if (should_stop())
{
console.log('should stop');
clearInterval(timer);
}
}
}, 1000);
})();
```
''')

    for w in context['workers']:
        w.start()

    def on_interrupt(*args, **kwargs):
        logging.info('on_interrupt')
        with context['main_cv']:
            context['main_cv'].notify()

    signal.signal(
        signal.SIGINT,
        on_interrupt,
    )
    signal.signal(
        signal.SIGTERM,
        on_interrupt,
    )

    # Sleep until one of the signal handlers wakes the main thread up.
    with context['main_cv']:
        context['main_cv'].wait()

    with context['main_cv']:
        context['shutdown'] = True
        context['main_cv'].notify()

    with context['track_cv']:
        context['track_cv'].notify()

    for o in context['workers']:
        o.join()
def desktop_services(argv):
# Entry point of the sway desktop helper.  Depending on the options below it
# either performs a one-shot backlight / cpufreq action and returns, or keeps
# running and supervises ibus-daemon, swayidle and the optional cpufreq and
# battery services until SIGINT/SIGTERM or until every service has exited.
parser = optparse.OptionParser()
# Image handed to swaylock (-i) and to swaybg (--image).
parser.add_option(
'--background_image',
dest='background_image',
default=None,
type=str,
)
# Only the value 0 is acted upon: it launches the sudo loop that rewrites the
# intel_pstate settings.
# NOTE(review): the help text says "max pct 30, every 4 seconds" while the
# loop further down writes max_perf_pct=40 and sleeps 10 seconds.
parser.add_option(
'--cpufreq',
dest='cpufreq',
default=None,
type=int,
help='0 - mac book air (no turbo boost, max pct 30, every 4 seconds',
)
# One-shot action: switch the governor/fan preset through Cpufreq and return.
parser.add_option(
'--cpufreq-action',
dest='cpufreq_action',
default=None,
choices=[
'performance',
'powersave',
],
#type=str,
)
# Only the value 0 is accepted (asserted before the Battery service starts).
parser.add_option(
'--battery',
dest='battery',
default=None,
type=int,
help='0 - battery check with sleep <10%, every 10 seconds',
)
# One-shot action: raise the backlight of the selected --backlight-type.
parser.add_option(
'--backlight-increase',
dest='backlight_increase',
default=False,
action='store_true',
help='increase keyboard backlight',
)
# Repeatable; the collected list is passed as `types` to Backlight.change.
parser.add_option(
'--backlight-type',
dest='backlight_type',
default=[],
action='append',
help='backlight type, like keyboard, output',
)
# One-shot action: lower the backlight of the selected --backlight-type.
parser.add_option(
'--backlight-decrease',
dest='backlight_decrease',
default=False,
action='store_true',
help='decrease keyboard backlight',
)
# Makes start_swayidle create a Backlight instance that follows DPMS changes.
parser.add_option(
'--backlight_service',
dest='backlight_service',
action='store_true',
default=False,
help='enable backlight_service',
)
options, args = parser.parse_args(argv)
class VLC:
    """Queries about a running VLC instance under sway."""

    @classmethod
    def vlc_is_playing_fullscreen(cls):
        """Tell whether VLC is both fullscreen and currently playing.

        The sway layout tree (``swaymsg -t get_tree``) is searched for a
        node whose ``fullscreen_mode`` is 1 and whose window class is
        ``vlc``; the playback state comes from ``playerctl -p vlc status``.
        A failure of either command is logged and counts as "no".
        """
        import json
        import subprocess

        tree = []
        try:
            raw_tree = subprocess.check_output(['swaymsg', '-t', 'get_tree'])
            tree = json.loads(raw_tree.decode('utf-8'))
        except:
            logging.error(traceback.format_exc())

        def iter_nodes(node):
            # Depth-first walk over every dict, list and leaf value.
            yield node
            if isinstance(node, dict):
                for child in node.values():
                    yield from iter_nodes(child)
            elif isinstance(node, list):
                for child in node:
                    yield from iter_nodes(child)

        def is_fullscreen_vlc(node):
            return (
                isinstance(node, dict) and
                'fullscreen_mode' in node and
                node['fullscreen_mode'] == 1 and
                'window_properties' in node and
                'class' in node['window_properties'] and
                node['window_properties']['class'] == 'vlc'
            )

        fullscreen_windows = [
            node
            for node in iter_nodes(tree)
            if is_fullscreen_vlc(node)
        ]

        is_playing = False
        try:
            player_status = subprocess.check_output(
                ['playerctl', '-p', 'vlc', 'status'],
                timeout=1,
            )
            is_playing = player_status.decode('utf-8').strip() == 'Playing'
        except:
            logging.error(traceback.format_exc())

        return len(fullscreen_windows) > 0 and is_playing
class Battery:
    """Battery watchdog with a ``subprocess.Popen``-like service interface.

    ``poll``/``terminate``/``wait``/``kill`` mirror the ``Popen`` methods the
    service supervisor calls, so an instance can sit in the same list as real
    child processes.  ``check`` is rate limited to once per ``period``
    seconds; it suspends the machine below 10% and sends a desktop
    notification below 15% while the battery is discharging.
    """

    def __init__(self, should_start=None,):
        if should_start is None:
            should_start = False

        assert isinstance(should_start, bool)

        self.last_check = None
        self.period = 10
        self.is_running = should_start

    def check_is_needed(self):
        """Return True (and restart the timer) when ``period`` has elapsed."""
        now = datetime.datetime.now(tz=datetime.timezone.utc)

        is_needed = (
            self.last_check is None or
            (now - self.last_check).total_seconds() >= self.period
        )

        if is_needed:
            self.last_check = now

        return is_needed

    def run(self):
        """Check the battery every ``period`` seconds until ``terminate()``."""
        self.is_running = True
        while self.is_running:
            self.check()
            time.sleep(self.period)

    def terminate(self):
        self.is_running = False

    def kill(self):
        """Same as ``terminate``; present because the cleanup code calls it."""
        self.terminate()

    def wait(self, *args, **kwargs):
        # There is nothing to wait for once the service has been stopped;
        # waiting for a running one is not supported.
        if self.is_running:
            raise NotImplementedError

    def poll(self):
        """Return None while running and 0 afterwards, like ``Popen.poll``."""
        if self.is_running:
            return None
        else:
            return 0

    def check(self):
        """Read ``upower -d`` and react to a low, discharging battery.

        Every failure (missing tool, timeout, unexpected output) is logged
        and otherwise ignored so the surrounding service loop keeps running.
        """
        try:
            if not self.check_is_needed():
                return

            report = subprocess.check_output(
                ['upower', '-d'],
                timeout=1,
            ).decode('utf-8')
            report_lines = report.splitlines()

            percentage_lines = [
                o for o in report_lines if 'percentage' in o.lower()
            ]
            state_lines = [
                o for o in report_lines if 'state' in o.lower()
            ]

            # "percentage: 87%" -> 87.0 (the first reported device wins)
            percentage = float(
                percentage_lines[0].split(':')[1].strip()[:-1]
            )
            discharging = any(
                'discharging' in o.lower() for o in state_lines
            )

            if percentage < 10 and discharging:
                logging.error(json.dumps(dict(
                    msg='too low', t3=percentage, t5=discharging
                )))
                subprocess.check_call(['systemctl', 'suspend'])
            elif percentage < 15 and discharging:
                msg = 'battery near low'
                logging.error(json.dumps(dict(
                    msg=msg, t3=percentage, t5=discharging
                )))
                subprocess.check_call([
                    'notify-send', '-t', '%d' % (5 * 1000), msg
                ])

            print(
                '\r%s % 5.2f%% %s' % (
                    datetime.datetime.now().isoformat(),
                    percentage,
                    str(discharging)
                ),
                end=''
            )
        except Exception:
            logging.error(traceback.format_exc())
class Backlight:
    """Keyboard / panel backlight control through the ``light`` utility.

    An instance remembers whether any sway output has DPMS enabled; every
    ``check()`` compares that with the current state and switches the
    backlights off when the outputs go to sleep, restoring the saved levels
    when they wake up again.
    """

    class Direction(enum.Enum):
        increase = 'increase'
        decrease = 'decrease'
        absolute = 'absolute'
        get_state = 'get_state'

    class Mode(enum.Enum):
        light = 'light'

    def __init__(self):
        self.state = []
        self.dpms = Backlight.dpms_get()

    @classmethod
    def dpms_get(cls):
        """Return True if any sway output reports ``dpms`` as enabled.

        Any failure (no sway, timeout, unexpected JSON) also yields True.
        """
        try:
            raw_outputs = subprocess.check_output(
                ['swaymsg', '-r', '-t', 'get_outputs'],
                timeout=1
            )
            outputs = json.loads(raw_outputs.decode('utf-8'))
            summary = [
                dict(
                    id=o['id'],
                    name=o['name'],
                    dpms=o['dpms'],
                )
                for o in outputs
            ]

            return any([o['dpms'] for o in summary])
        except Exception:
            return True

    def check(self):
        """Follow a DPMS change by saving/zeroing or restoring the levels."""
        try:
            new_dpms = Backlight.dpms_get()
            if new_dpms != self.dpms:
                logging.info(json.dumps(dict(
                    module='backlight',
                    action='new_dpms',
                    dpms=self.dpms,
                    new_dpms=new_dpms,
                )))
                if new_dpms:
                    Backlight.enable(self.state)
                else:
                    # Remember the current levels before switching off.
                    self.state = Backlight.change(
                        Backlight.Direction.get_state,
                    )
                    logging.info(json.dumps(dict(
                        state=pprint.pformat(
                            self.state,
                            width=int(1e+8),
                            compact=True,
                        ),
                        action='disable',
                    )))
                    Backlight.disable()
                self.dpms = new_dpms
        except Exception:
            logging.error(traceback.format_exc())

    @classmethod
    def get_state(cls):
        raise NotImplementedError

    @classmethod
    def set_state(cls):
        raise NotImplementedError

    @classmethod
    def disable(cls):
        """Fade every default device down to zero."""
        return cls.change(
            cls.Direction.absolute,
            0,
        )

    @classmethod
    def enable(cls, state,):
        """Restore the levels recorded in ``state`` (a ``change`` result)."""
        return [
            cls.change(
                direction=cls.Direction.absolute,
                value=device_state['value'],
                device_name=device_state['device_name'],
            )
            for device_state in state
        ]

    @classmethod
    def change(
        cls,
        direction,
        value=None,
        devices=None,
        device_name=None,
        types=None,
    ):
        """Change or read the level of the selected backlight devices.

        Args:
            direction: a ``Backlight.Direction`` member.
            value: step size (increase/decrease) or target level
                (absolute); defaults to 20.0, negative values become 0.
            devices: device names (``smc_kbd``, ``intel_backlight``).
            device_name: a single additional device name.
            types: ``keyboard`` / ``output``; only allowed when no device
                was named.  ``None`` selects both types.

        Returns:
            One dict per device with ``mode``, ``device_path``,
            ``device_name`` and the level read back in ``value``.

        Raises:
            NotImplementedError: unsupported platform, unknown type, or a
                device that ``light -L`` does not list.
        """
        assert isinstance(direction, Backlight.Direction)

        state = []
        devices_all = dict(
            smc_kbd=dict(
                sysfs_path='sysfs/leds/smc::kbd_backlight',
            ),
            intel_backlight=dict(
                sysfs_path='sysfs/backlight/intel_backlight',
            ),
        )

        # Work on a copy: the caller's list must not grow as a side effect.
        devices = [] if devices is None else list(devices)

        if device_name is not None:
            devices.append(device_name)

        if len(devices) == 0:
            if types is None:
                types = [
                    'keyboard',
                    'output',
                ]

            for current_type in types:
                if current_type == 'keyboard':
                    devices.append('smc_kbd')
                elif current_type == 'output':
                    devices.append('intel_backlight')
                else:
                    raise NotImplementedError
        else:
            assert types is None

        devices2 = list(set(devices))

        if sys.platform != 'linux':
            raise NotImplementedError

        assert all([
            o in devices_all
            for o in devices2
        ])

        # The first line of `light -L` is a header, the rest are device paths.
        leds = [
            o.strip()
            for o in subprocess.check_output(
                ['light', '-L'],
                timeout=1,
            ).decode('utf-8').splitlines()[1:]
        ]

        for current_device_name in devices2:
            sysfs_path = devices_all[current_device_name]['sysfs_path']

            if sysfs_path not in leds:
                raise NotImplementedError

            if value is None:
                value = 20.0

            value2 = max(float(value), 0.0)
            assert isinstance(value2, float) and value2 >= -1e-8

            def get_current(sysfs_path=sysfs_path):
                return float(subprocess.check_output([
                    'light', '-G',
                    '-s', sysfs_path,
                ], timeout=1).decode('utf-8'))

            if direction != cls.Direction.get_state:
                old_value = get_current()

                if direction == cls.Direction.decrease:
                    target = max(old_value - value2, 0)
                elif direction == cls.Direction.increase:
                    target = min(old_value + value2, 100)
                elif direction == cls.Direction.absolute:
                    target = min(max(0, value2), 100)
                else:
                    raise NotImplementedError

                # Fade in ten steps instead of jumping to the target.
                for current_value in numpy_linspace(old_value, target, 10):
                    subprocess.check_call(
                        [
                            'light', '-v', '3',
                            '-s', sysfs_path,
                            '-S', '%f' % current_value,
                        ],
                        stderr=subprocess.PIPE,
                        stdout=subprocess.PIPE
                    )
                    time.sleep(0.05)

            state.append(
                dict(
                    mode=cls.Mode.light,
                    device_path=sysfs_path,
                    device_name=current_device_name,
                    value=get_current(),
                )
            )

        return state
class Cpufreq:
    """Governor and fan presets for machines exposing ``applesmc.768``."""

    class _ClassProperty:
        """Read-only attribute computed from the owning class on access.

        Replaces ``@classmethod`` stacked on ``@property``, which was
        deprecated in Python 3.11 and no longer works in 3.13.
        """

        def __init__(self, fget):
            self.fget = fget

        def __get__(self, instance, owner=None):
            if owner is None:
                owner = type(instance)
            return self.fget(owner)

    @_ClassProperty
    def profile(cls) -> Literal['applesmc.768']:
        """Name of the detected hardware profile.

        Raises:
            NotImplementedError: the applesmc platform device is absent.
        """
        if os.path.exists('/sys/bus/platform/devices/applesmc.768'):
            return 'applesmc.768'
        else:
            raise NotImplementedError

    @classmethod
    def powersave(cls):
        """Select the ``powersave`` governor and set fan1 to 2000."""
        if cls.profile == 'applesmc.768':
            # The governor is written twice (other value first) exactly as
            # the original script did.
            subprocess.check_call(r'''
echo performance | tee /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor;
echo powersave | tee /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor;
echo 1 > /sys/bus/platform/devices/applesmc.768/fan1_manual;
echo 2000 > /sys/bus/platform/devices/applesmc.768/fan1_output;
''', shell=True)
        else:
            raise NotImplementedError

    @classmethod
    def performance(cls):
        """Select the ``performance`` governor and set fan1 to 6500."""
        if cls.profile == 'applesmc.768':
            subprocess.check_call(r'''
echo powersave | tee /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor;
echo performance | tee /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor;
echo 1 > /sys/bus/platform/devices/applesmc.768/fan1_manual;
echo 6500 > /sys/bus/platform/devices/applesmc.768/fan1_output;
''', shell=True)
        else:
            raise NotImplementedError
# One-shot modes: perform the requested action and leave the function without
# starting any long-running service.
# NOTE(review): --backlight-type defaults to [] and Backlight.change only
# substitutes its default types for None, so without --backlight-type this
# call appears to select no device at all - confirm that this is intended.
if options.backlight_increase or options.backlight_decrease:
if options.backlight_increase:
direction = Backlight.Direction.increase
elif options.backlight_decrease:
direction = Backlight.Direction.decrease
else:
raise NotImplementedError
Backlight.change(
direction=direction,
types=options.backlight_type,
)
return
elif not options.cpufreq_action is None:
if options.cpufreq_action == 'performance':
Cpufreq.performance()
elif options.cpufreq_action == 'powersave':
Cpufreq.powersave()
else:
raise NotImplementedError
return
else:
pass
# Service mode from here on.  SWAYSOCK is taken from sway_sock() (defined
# elsewhere in this file) so the swaymsg calls of the child processes use it.
os.environ['SWAYSOCK'] = sway_sock()
# The input-method and sway/wayland variables must already be present in the
# environment.
assert all([
env_name in os.environ
for env_name in [
'GTK_IM_MODULE',
'XMODIFIERS',
'QT_IM_MODULE',
'I3SOCK',
'SWAYSOCK',
'WAYLAND_DISPLAY',
]
]) and os.environ['SWAYSOCK'] == sway_sock()
# Objects offering poll()/terminate()/wait()/kill(); check() is optional.
services = []
# Set by the signal handler and polled by the service loop.
shutdown = False
def on_interrupt(*args, **kwargs):
logging.info('blah')
nonlocal shutdown
shutdown = True
signal.signal(
signal.SIGINT,
on_interrupt,
)
signal.signal(
signal.SIGTERM,
on_interrupt,
)
try:
if options.cpufreq == 0:
logging.info('launching cpufreq, need sudo')
# Ask for the sudo password up front, while a terminal is still attached.
subprocess.check_call(['sudo', 'whoami'])
# Root shell loop that re-applies the intel_pstate / cpufreq limits every
# 10 seconds for as long as /proc/<pid of this process> exists.
# NOTE(review): `[[ -a ... ]]` is a bash construct but the loop runs under
# `sh -c` - confirm that sh is bash on the target system.
services.append(
subprocess.Popen(
r'''
exec sudo sh -c 'echo cpufreq, user; whoami;
while [[ -a /proc/{pid} ]]; do
echo passive > /sys/devices/system/cpu/intel_pstate/status;
echo 1 > /sys/devices/system/cpu/intel_pstate/no_turbo;
echo 40 > /sys/devices/system/cpu/intel_pstate/max_perf_pct;
for cpu_path in /sys/devices/system/cpu/cpu?; do
echo 900000 > $cpu_path/cpufreq/scaling_max_freq;
echo schedutil > $cpu_path/cpufreq/scaling_governor;
done;
sleep 10;
done;'
'''.format(pid=os.getpid()),
shell=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
)
)
class start_swayidle:
    """Supervise ``swayidle`` and implement its idle / lock / sleep hooks.

    ``swayidle`` is started with hooks that only print a marker and, for
    most of them, then block on ``read``:

        t1  timeout 300        t2  lock            t6  before-sleep
        t4  timeout 900        t3  unlock          t7  after-resume
        t5  resume (after either timeout)

    ``check()`` parses the markers from the process output, runs the real
    action for each one and unblocks the hook by writing a newline to
    swayidle's stdin (``release_lock``).

    ``poll``/``terminate``/``wait``/``kill`` forward to the child process so
    an instance can be stored next to ``subprocess.Popen`` objects.

    Uses names from the enclosing scope: ``options``, ``Backlight``,
    ``VLC``, ``intercept_output`` and ``custom_notify``.
    """

    class event_t(enum.Enum):
        idle_state = 'idle state'
        active_state = 'active state'

    def __init__(self):
        swaylock_cmd = [
            'swaylock', '-f', '-d',
        ]
        if options.background_image is not None:
            swaylock_cmd.extend(
                [
                    '-i',
                    '"%s"' % options.background_image,
                ]
            )

        self.commands = dict(
            swaylock_cmd2=' '.join(swaylock_cmd),
            timeout1='echo timeout1; swaymsg "output * dpms off";',
            lock='echo lock; pkill --signal SIGUSR1 swayidle;',
            unlock='echo unlock; pkill --signal SIGINT swaylock; swaymsg "output * dpms on";',
            unlock2='pkill --signal SIGINT swaylock;',
            resume='echo resume; swaymsg "output * dpms on";',
            before_sleep='echo before_sleep; loginctl lock-session;',
            after_resume='echo after_resume; pkill --signal SIGUSR1 swayidle;',
        )
        self.last_force_idle = None
        self.commands.update(
            timeout2='echo timeout2; {swaylock_cmd};'.format(
                swaylock_cmd=self.commands['swaylock_cmd2']
            )
        )
        self.swayidle = subprocess.Popen(
            r'''
exec swayidle -d -w \
timeout 300 'echo t1; read;' \
resume 'echo t5; ' \
timeout 900 'echo t4; read;' \
resume 'echo t5; ' \
lock 'echo t2; read;' \
unlock 'echo t3;' \
before-sleep 'echo t6; read;' \
after-resume 'echo t7; read;' 2>&1
''',
            shell=True,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            restore_signals=False,
            preexec_fn=lambda: os.setpgrp(),
        )
        self.output = intercept_output(
            self.swayidle,
            real_time=True,
            transform_callback=lambda x: [logging.info(x), b''][-1],
        )
        self.events = []
        self.last_skip_loop = None
        self.data = []
        if options.backlight_service:
            self.backlight = Backlight()
        else:
            self.backlight = None
        self.bg = None
        self.bg_terminate = False

    def skip_loop_long_ago(self):
        """True at most once per 30 seconds (throttle for stale events)."""
        if self.last_skip_loop is None or (
            datetime.datetime.now() - self.last_skip_loop
        ).total_seconds() >= 30:
            self.last_skip_loop = datetime.datetime.now()
            return True
        else:
            return False

    def background_check(self):
        """(Re)start ``swaybg`` when it is not running and an image is set."""
        if (
            self.bg is None or
            self.bg.poll() is not None
        ) and not self.bg_terminate:
            if options.background_image is not None:
                self.bg = subprocess.Popen([
                    'swaybg',
                    '--output',
                    '*',
                    '--image',
                    options.background_image,
                    '--mode',
                    'fill',
                ])

    def background_terminate(self, *args, **kwargs):
        if self.bg is not None:
            self.bg_terminate = True
            self.bg.terminate(*args, **kwargs)

    def poll(self):
        return self.swayidle.poll()

    def release_lock(self):
        """Unblock the hook that is waiting in ``read``."""
        self.swayidle.stdin.write(b'\n')
        self.swayidle.stdin.flush()

    def force_idle(self):
        """True at most once per 10 seconds (throttle for hook actions)."""
        if self.last_force_idle is None or (
            datetime.datetime.now() - self.last_force_idle
        ).total_seconds() >= 10:
            self.last_force_idle = datetime.datetime.now()
            return True
        else:
            return False

    def terminate(self, *args, **kwargs):
        self.background_terminate()
        return self.swayidle.terminate(*args, **kwargs)

    def wait(self, *args, **kwargs):
        return self.swayidle.wait(*args, **kwargs)

    def kill(self):
        return self.swayidle.kill()

    def dpms(self, direction):
        assert direction in ['on', 'off']
        raise NotImplementedError

    def _dpms_is_on(self):
        """DPMS state, also available when the backlight service is off."""
        if self.backlight is not None:
            return self.backlight.dpms
        # Without --backlight_service there is no cached state to consult.
        return Backlight.dpms_get()

    @staticmethod
    def _retry(cb, cnt=None):
        """Call ``cb`` until it returns 0 while sway answers ``swaymsg``.

        ``cnt`` is only reported in the log, the loop itself is unbounded.
        Every failed attempt is followed by a 0.5 second pause.
        """
        if cnt is None:
            cnt = 10

        i = 0
        while True:
            logging.info('retry i = %d, cnt = %d' % (i, cnt))

            sway_ready = subprocess.call(
                ['swaymsg', '-t', 'get_version']
            ) == 0

            if sway_ready and cb() == 0:
                break

            time.sleep(0.5)
            i += 1

    def _collect_new_events(self):
        """Drain the pending swayidle output and return the known markers."""
        event_t = self.event_t
        known_events = [
            't1', 't2', 't3', 't4',
            't5', 't6', 't7',
            event_t.idle_state.value,
            event_t.active_state.value,
        ]
        new_events = []

        while self.output is not None:
            chunk = next(self.output)

            if chunk['aggregated']:
                # Final chunk: the process output is exhausted.
                self.output = None
                continue

            if len(chunk['data']) == 0:
                break

            self.data.append(chunk)

            if b'\n' not in chunk['data']:
                continue

            total = b''.join([
                o['data']
                for o in self.data
            ]).decode('utf-8')
            sep_pos = total.rfind('\n')
            lines = total[:sep_pos].splitlines()
            # Keep the unfinished tail for the next round.
            self.data = [
                dict(
                    data=total[sep_pos:].encode('utf-8'),
                    aggregated=False,
                )
            ]

            for line in lines:
                if event_t.idle_state.value in line:
                    line = event_t.idle_state.value
                elif event_t.active_state.value in line:
                    line = event_t.active_state.value

                if line in known_events:
                    new_events.append(line)

        return new_events

    def check(self):
        """Process pending swayidle events, then run the periodic checks."""
        event_t = self.event_t
        new_events = self._collect_new_events()

        if (
            len(new_events) > 0 or (
                len(self.events) > 0 and
                self.skip_loop_long_ago()
            )
        ):
            self.events.extend(new_events)

            only_timeouts = all([
                o in ['t1', 't4']
                for o in self.events
            ])

            # Do not blank/lock the screen while VLC plays fullscreen.
            skip_loop = bool(
                only_timeouts and
                VLC.vlc_is_playing_fullscreen() and
                self._dpms_is_on()
            )

            if skip_loop:
                logging.info(
                    'skip loop, %s' % (
                        [
                            json.dumps(self.events),
                            True,
                            True,
                            self.events,
                            new_events,
                        ],
                    )
                )
            elif (
                len(new_events) == 0 and
                len(self.events) > 1 and
                only_timeouts
            ):
                self.events = ['t4']
            elif len(self.events) > 1 and (
                self.events == ['t1', 't4', 't5', 't5'] or
                self.events == ['t1', 't5', 't5'] or
                self.events == ['t1', 't5']
            ):
                # A timeout that was already resumed: just unblock the hooks.
                for o in new_events:
                    self.release_lock()

                self.events = []

            for o in self.events:
                if skip_loop:
                    self.release_lock()
                    continue

                if o == 't1':
                    logging.info('started t1')
                    if self.force_idle():
                        subprocess.check_call(self.commands['timeout1'], shell=True)
                    logging.info('done t1')
                    self.release_lock()
                elif o == 't2':
                    logging.info('started lock')
                    if self.force_idle():
                        custom_notify(
                            title='swayidle',
                            msg='loginctl lock started',
                        )
                        while True:
                            if subprocess.call(
                                self.commands['lock'], shell=True
                            ) != 0:
                                continue
                            # A failing swaylock is tolerated on purpose.
                            subprocess.call(
                                self.commands['timeout2'], shell=True
                            )
                            if subprocess.call(
                                self.commands['timeout1'], shell=True
                            ) != 0:
                                continue
                            break
                    logging.info('done lock')
                    self.release_lock()
                elif o == 't3':
                    pass
                elif o == 't4':
                    logging.info('started t4')
                    if self.force_idle():
                        subprocess.check_call(self.commands['lock'], shell=True)
                        subprocess.call(self.commands['timeout2'], shell=True)
                        subprocess.check_call(self.commands['timeout1'], shell=True)
                    logging.info('done t4')
                    self.release_lock()
                elif o == 't5':
                    logging.info('started timeout resume')
                    if self.force_idle():
                        subprocess.check_call(self.commands['lock'], shell=True)
                    self._retry(
                        lambda: subprocess.call(self.commands['resume'], shell=True),
                    )
                    logging.info('done timeout resume')
                elif o == 't6':
                    logging.info('started before-sleep')
                    if self.force_idle():
                        subprocess.call(self.commands['timeout2'], shell=True)
                        subprocess.check_call(self.commands['timeout1'], shell=True)
                    logging.info('done before-sleep')
                    self.release_lock()
                elif o == 't7':
                    logging.info('started after-resume')
                    while True:
                        if subprocess.call(
                            self.commands['resume'],
                            shell=True
                        ) == 0:
                            break
                        else:
                            time.sleep(0.5)
                    logging.info('done after-resume')
                    self.release_lock()
                elif o in [
                    event_t.idle_state.value,
                    event_t.active_state.value,
                ]:
                    logging.info(json.dumps(dict(o=o)))
                else:
                    logging.error(json.dumps(dict(o=o)))
                    raise NotImplementedError

            if not skip_loop:
                pprint.pprint(self.events)
                del self.events[:]

        if self.backlight is not None:
            self.backlight.check()

        self.background_check()
# Always-on services: the input method daemon and the swayidle supervisor.
services.extend([
subprocess.Popen(
['ibus-daemon'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
),
start_swayidle(),
])
if not options.battery is None:
assert options.battery in [0]
logging.info('launching battery')
services.append(
Battery(
should_start=True,
)
)
# Supervisor loop, one pass every 0.1 seconds: stop on a signal or once every
# service reports an exit status, otherwise give each service that defines
# check() a chance to do its periodic work.
while True:
if shutdown:
logging.info('shutdown')
break
if all([not o.poll() is None for o in services]):
logging.info('done')
break
for o in services:
if hasattr(o, 'check'):
o.check()
time.sleep(0.1)
except:
logging.error(traceback.format_exc())
finally:
# Stop every service, escalating to kill() when terminate()/wait() fails
# or the 10 second wait times out.
# NOTE(review): this calls kill() on every service type; Battery as defined
# above has no kill() method - confirm that this path cannot be reached for it.
for o in services:
try:
o.terminate()
o.wait(timeout=10)
except:
logging.error(traceback.format_exc())
logging.error('killed %s' % str(o.__dict__))
o.kill()
def suspend_timer(argv):
    """Wait until the wall clock reaches HH:MM, then suspend the machine.

    The target time is ``argv[0]``; when ``argv`` is empty it is read from
    standard input.  While waiting, a countdown is redrawn every second.
    The comparison only looks at today's hour and minute, so a target that
    is not later than the current time suspends immediately.
    """
    import datetime
    import subprocess
    import time

    if len(argv) == 0:
        print("enter HH:MM")
        raw_target = input().strip()
    else:
        raw_target = argv[0]

    target = datetime.datetime.strptime(raw_target, "%H:%M").time()
    target_minutes = target.hour * 60 + target.minute

    now = datetime.datetime.now()
    while (now.hour, now.minute) < (target.hour, target.minute):
        # Remaining time, split into whole hours and minutes.
        hours_left, minutes_left = divmod(
            target_minutes - (now.hour * 60 + now.minute),
            60,
        )
        print("\r%s, %02d:%02d" % (
            now,
            hours_left,
            minutes_left,
        ), end="")
        time.sleep(1)
        now = datetime.datetime.now()

    print("suspend computer at %s" % now.isoformat())
    subprocess.check_call(["systemctl", "suspend"])
def gnome_shortcuts(argv: list[str]) -> None:
    """List or add GNOME custom keyboard shortcuts through ``gsettings``.

    Options:
        -l, --list: pretty-print name, binding and command of every custom
            shortcut.
        -a, --add NAME COMMAND BINDING: register a new custom shortcut under
            the first unused ``customN`` path.

    Raises:
        NotImplementedError: neither ``--list`` nor ``--add`` was given.
        subprocess.CalledProcessError: a ``gsettings`` call failed.
    """
    import ast

    media_keys_schema = 'org.gnome.settings-daemon.plugins.media-keys'
    binding_schema = media_keys_schema + '.custom-keybinding'
    binding_path = (
        '/org/gnome/settings-daemon/plugins/media-keys'
        '/custom-keybindings/custom%d/'
    )

    parser = optparse.OptionParser()
    parser.add_option(
        '-a', '--add',
        action='store_true',
        default=None,
    )
    parser.add_option(
        '-l', '--list',
        action='store_true',
        default=None,
    )
    options, args = parser.parse_args(argv)

    def gsettings_get(schema, key):
        """Read one key and parse the printed GVariant literal."""
        raw = subprocess.check_output([
            'gsettings', 'get', schema, key,
        ]).decode('utf-8').strip()

        # An empty string array is printed with a type annotation.
        if raw == '@as []':
            return []

        # Strings and string lists use Python-compatible literal syntax;
        # unlike swapping the quotes for JSON, this also copes with values
        # that contain quote characters.
        return ast.literal_eval(raw)

    def commands_ids() -> list[str]:
        return gsettings_get(media_keys_schema, 'custom-keybindings')

    def add_command(name, command, binding):
        existing = commands_ids()

        # Take the first free slot instead of assuming that the existing
        # entries are exactly custom0..customN-1.
        command_id = 0
        while binding_path % command_id in existing:
            command_id += 1

        new_path = binding_path % command_id
        target = '%s:%s' % (binding_schema, new_path)

        for cmd in [
            (
                'gsettings', 'set', media_keys_schema,
                'custom-keybindings', '[%s]' % ','.join([
                    "'%s'" % o
                    for o in existing + [new_path]
                ]),
            ),
            ('gsettings', 'set', target, 'name', name),
            ('gsettings', 'set', target, 'command', command),
            ('gsettings', 'set', target, 'binding', binding),
        ]:
            subprocess.check_call(cmd)

    if options.list:
        shortcuts = [
            {
                k: gsettings_get('%s:%s' % (binding_schema, o), k)
                for k in ['name', 'binding', 'command']
            }
            for o in commands_ids()
        ]

        pprint.pprint(shortcuts)
    elif options.add:
        add_command(*args)
    else:
        raise NotImplementedError
def socat_ssh(argv):
    """Expose a command run over ssh as a local TCP port using ``socat``.

    ``socat`` listens on ``127.0.0.1:<local_port>`` and, for every
    connection, executes a generated script that runs
    ``<ssh command> <gateway_command>``.  ``socat`` is restarted whenever it
    exits.  SIGINT forces a restart; SIGTERM stops ``socat`` and returns.

    Options:
        --local_port: local TCP port to listen on (required).
        --gateway_command: shell command executed on the ssh host
            (required).
        --ssh_key: key file, added with ``ssh-add`` and passed via ``-i``.
        --ssh_host: host appended to the ssh command.
        --ssh_command: replacement for the default ``ssh -T -C``.
        --socat_verbose: pass ``-v`` to socat.
        --target_port: accepted but not used by this function.
    """
    parser = optparse.OptionParser()
    parser.add_option(
        '--local_port',
        dest='local_port',
        default=None,
        type=int,
    )
    parser.add_option(
        '--ssh_key',
        dest='ssh_key',
        default=None,
        type=str,
    )
    parser.add_option(
        '--socat_verbose',
        dest='socat_verbose',
        action='store_true',
        default=False,
    )
    parser.add_option(
        '--ssh_host',
        dest='ssh_host',
        default=None,
        type=str,
    )
    parser.add_option(
        '--target_port',
        dest='target_port',
        default=None,
        type=int,
    )
    parser.add_option(
        '--ssh_command',
        dest='ssh_command',
        default=None,
        type=str,
    )
    parser.add_option(
        '--gateway_command',
        dest='gateway_command',
        default=None,
        type=str,
        help=(
            'a shell command that forwards ssh socket data '
            'somewhere else, like '
            'busybox nc 127.0.0.1 $(cat remote-ssh.port)'
        ),
    )
    options, args = parser.parse_args(argv)

    # Both values are interpolated into command lines below; fail with a
    # usage message instead of an obscure TypeError.
    if options.local_port is None:
        parser.error('--local_port is required')
    if options.gateway_command is None:
        parser.error('--gateway_command is required')

    if options.ssh_command is None:
        ssh_command = ['ssh', '-T', '-C']
    else:
        ssh_command = options.ssh_command.split()

    if options.ssh_key is not None:
        subprocess.check_call(['ssh-add', options.ssh_key])
        ssh_command.extend([
            '-i', options.ssh_key,
        ])

    if options.ssh_host is not None:
        ssh_command.extend([options.ssh_host])

    restart = False
    shutdown = False

    def on_interrupt(sig, *args, **kwargs):
        nonlocal restart, shutdown
        if sig == signal.SIGTERM:
            shutdown = True
        else:
            restart = True

    socat_command = ['socat']
    if options.socat_verbose:
        socat_command.extend(['-v'])
    socat_command.extend([
        'tcp-listen:%d,fork,bind=127.0.0.1' % (
            options.local_port,
        ),
    ])

    signal.signal(
        signal.SIGINT,
        on_interrupt,
    )
    signal.signal(
        signal.SIGTERM,
        on_interrupt,
    )

    p = None
    # The temporary script is removed automatically when the block exits.
    with tempfile.NamedTemporaryFile(suffix='.sh', mode='w') as gateway:
        gateway.write(
            r'''
exec %s
''' % ' '.join(
                ssh_command + [options.gateway_command]
            )
        )
        gateway.flush()

        try:
            while not shutdown:
                if p is None:
                    p = subprocess.Popen(
                        socat_command + [
                            'EXEC:sh %s' % gateway.name,
                        ]
                    )

                time.sleep(1)

                if restart:
                    try:
                        p.terminate()
                        p.wait(10)
                    except Exception:
                        p.kill()
                    restart = False

                if p.poll() is not None:
                    p = None
        finally:
            if p is not None:
                p.terminate()
def share_wifi(argv):
    """Run ``create_ap`` to share a wired connection as a Wi-Fi access point.

    The password is read from the terminal without echo; when it is left
    empty a random one is generated with ``pwgen``.  The password is shown
    as a QR code (``qrencode``) before the access point starts.

    The access point is restarted on SIGINT, after an error, and whenever
    more than ``--restart-delay`` seconds passed between two iterations of
    the one-second supervision loop.  SIGTERM stops the access point and
    returns.

    Options:
        --to-wifi: wireless interface for the access point (required).
        --from-eth: interface providing the uplink (required).
        --ap-name: network name (required).
        --channel: optional Wi-Fi channel.
        --restart-delay: seconds, at least 1, default 2.
    """
    import getpass

    parser = optparse.OptionParser()
    parser.add_option(
        '--to-wifi',
        dest='to_wifi',
        default=None,
        type=str,
    )
    parser.add_option(
        '--from-eth',
        dest='from_eth',
        default=None,
        type=str,
    )
    parser.add_option(
        '--channel',
        dest='channel',
        default=None,
        type=int,
    )
    parser.add_option(
        '--ap-name',
        dest='ap_name',
        default=None,
        type=str,
    )
    parser.add_option(
        '--restart-delay',
        dest='restart_delay',
        default=None,
        type=int,
    )
    options, args = parser.parse_args(argv)

    if options.restart_delay is None:
        options.restart_delay = 2

    assert not options.to_wifi is None
    assert not options.from_eth is None
    assert not options.ap_name is None
    assert options.restart_delay >= 1

    print('enter password:')
    # getpass reads the line verbatim; the former `read -s` / `echo -n $PW`
    # shell pipeline needed a bash-like /bin/sh and mangled whitespace.
    pw = getpass.getpass(prompt='')

    if len(pw) == 0:
        # Use the generated password (it was previously discarded).
        pw = subprocess.check_output(
            ['pwgen', '-syn', '20', '1'],
        ).decode('utf-8').strip()

    with subprocess.Popen(
        ['qrencode', '-t', 'UTF8'],
        stdin=subprocess.PIPE
    ) as p:
        p.stdin.write(pw.encode('utf-8'))
        p.stdin.flush()
        p.stdin.close()
        try:
            p.wait(5)
        except Exception as exception:
            p.kill()
            raise exception

    last_timestamp = datetime.datetime.now()
    hostapd = None
    restart = False
    shutdown = False

    def on_interrupt(sig, *args, **kwargs):
        # Both flags live in the enclosing function.
        nonlocal restart, shutdown

        if sig == signal.SIGINT:
            restart = True
        elif sig == signal.SIGTERM:
            shutdown = True
        else:
            raise NotImplementedError

    signal.signal(
        signal.SIGINT,
        on_interrupt,
    )
    signal.signal(
        signal.SIGTERM,
        on_interrupt,
    )

    hostapd_args = [
        'create_ap',
        '--hostapd-timestamps',
        options.to_wifi,
        options.from_eth,
        options.ap_name,
        pw,
    ]
    if options.channel is not None:
        hostapd_args.extend(['-c', '%d' % options.channel])

    while True:
        try:
            if hostapd is None:
                if shutdown:
                    # Nothing is running, do not start a new instance.
                    break
                print('\n%s, start new' % last_timestamp)
                hostapd = subprocess.Popen(hostapd_args)
            else:
                if restart or shutdown:
                    print('\n%s, shutdown current' % last_timestamp)
                    os.kill(
                        hostapd.pid,
                        signal.SIGINT
                    )
                    try:
                        hostapd.wait(20)
                    except Exception:
                        hostapd.terminate()
                    restart = False

                if hostapd.poll() is not None:
                    hostapd = None

                if shutdown:
                    break

            # A long gap between iterations (for example after a system
            # suspend) triggers a restart on the next pass.
            if (
                datetime.datetime.now() - last_timestamp
            ).total_seconds() > options.restart_delay:
                restart = True

            last_timestamp = datetime.datetime.now()
        except Exception:
            print(traceback.format_exc())
            restart = True
        finally:
            time.sleep(1)
def status(argv):
    """Write a one-line status text built from shell command outputs.

    Every command from ``--sh`` and from the ``sh`` entry of the
    ``--config`` JSON file is run, followed by the built-in memory and date
    commands.  The stripped outputs are joined with `` | `` and written to
    standard output without a trailing newline; a command that fails or
    exceeds the timeout contributes ``fail <index>``.

    Options:
        --sh: shell command, may be repeated.
        --timeout: per-command timeout in seconds (default 0.5, at most 4).
        --config: path of a JSON file; a problem reading it is logged and
            ignored.
    """
    import inspect
    import textwrap

    assert isinstance(argv, list) and all([isinstance(o, str) for o in argv])

    class c1(optparse.IndentedHelpFormatter):
        """Help formatter that keeps the line breaks of option help texts."""

        def format_option(self, *args, **kwargs):
            f1 = lambda text, width: '\n'.join([
                textwrap.fill('\t' + o, width, replace_whitespace=False)
                for o in text.splitlines()
            ]).splitlines()

            # Re-compile the stock implementation with textwrap.wrap swapped
            # for the line preserving variant.  The executed text is standard
            # library source, not external input.
            t1 = inspect.getsource(optparse.IndentedHelpFormatter.format_option)
            t2 = '\n'.join([o[4:] for o in t1.splitlines()[:]]).replace(
                'textwrap.wrap', 'f1',
            ).replace('format_option', 'f2')

            # An explicit namespace is required: since Python 3.13 every
            # locals() call inside a function returns a separate snapshot,
            # so a function defined via exec(..., locals()) cannot be read
            # back through a second locals() call.
            namespace = dict()
            exec(t2, dict(f1=f1), namespace)
            return namespace['f2'](self, *args, **kwargs)

    parser = optparse.OptionParser(
        formatter=c1(
            width=None,
        ),
    )
    parser.add_option(
        '--sh',
        dest='sh',
        default=[],
        action='append',
        type=str,
    )
    parser.add_option(
        '--timeout',
        dest='timeout',
        default=None,
        type=float,
    )
    parser.add_option(
        '--config',
        dest='config',
        default=None,
        type=str,
        help=''.join([
            '.json file with array of strings, each is a shell command ',
            'that outputs a separate status text value, ',
            'like\n',
            r'''
ping -w 1 -i 0.02 <hostname> -c 3 | tail -n 2| head -n 1 | grep -Po $'time\\s+.*$'
sensors -j | jq -r '.\"coretemp-isa-0000\".\"Package id 0\".temp1_input|tostring + \" C\"'
printf '%d RPM' $(cat /sys/devices/platform/applesmc.768/fan1_input)
printf '% 3.0f%%' $(upower -d | grep -Po 'percentage:\\s+\\d+(\\.\\d+)?%' | grep -Po '\\d+(\\.\\d+)?' | head -n 1)
'''.strip()
        ])
    )
    options, args = parser.parse_args(argv)

    if options.timeout is None:
        options.timeout = 0.5

    timeout2 = max(options.timeout, 0.0)
    assert timeout2 >= 0.0 and timeout2 <= 4

    config = dict()
    try:
        if options.config is not None:
            with io.open(options.config, 'r') as f:
                config.update(
                    json.load(f)
                )
    except Exception:
        logging.error(traceback.format_exc())

    options.sh.extend(
        config.get('sh', [])
    )

    t1 = []

    for sh_index, o in enumerate([
        *options.sh,
        *[
            r'''
A=$(free -h | grep -P Mem: | grep -Po '[\w\.\d]+');
echo -n $A | awk '{print $2, $7}';
''',
            r'''
date +'%Y-%m-%d %l:%M:%S %p';
''',
        ],
    ]):
        try:
            t1.append(
                subprocess.check_output(
                    o,
                    shell=True,
                    timeout=timeout2,
                ).decode('utf-8').strip()
            )
        except Exception:
            t1.append('fail %d' % sh_index)

    # NOTE(review): this only removes the two-character sequence "\n\r";
    # confirm whether single newlines were meant to be removed as well.
    t3 = ' | '.join(t1).replace('\n\r', '')

    sys.stdout.write(t3)
    sys.stdout.flush()
def custom_translate(current_string, check, none_char=None,):
    """Filter or replace the characters of ``current_string``.

    ``check(code_point, char)`` is consulted for every character:

    * ``True`` keeps the character,
    * ``False`` replaces it with ``none_char`` (``'.'`` by default),
    * a string is inserted in place of the character,
    * any other result removes the character.
    """
    placeholder = '.' if none_char is None else none_char

    class _Table:
        # Mapping object handed to str.translate, which looks up each
        # character by its code point.
        def __getitem__(self, code_point):
            char = chr(code_point)
            verdict = check(code_point, char)

            if isinstance(verdict, str):
                return verdict

            if isinstance(verdict, bool):
                return char if verdict else placeholder

            return None

    return current_string.translate(_Table())
def media_keys(argv):
    """Handle a media key: control the player or the default audio sink.

    The command is taken from ``--command`` or from the single positional
    argument.  Player commands go to ``mocp`` when a ``mocp`` process of the
    current user exists and to ``playerctl`` otherwise; volume commands use
    ``pactl`` on ``@DEFAULT_SINK@``.

    Returns:
        ``dict(msg=...)`` where ``msg`` is the track description after a
        player command, or the ``pactl get-sink-volume`` output after a
        volume command.

    Raises:
        AssertionError: ``argv`` is not a list of strings, more than one
            positional argument was given, or the command is unknown.
    """
    assert isinstance(argv, list) and all([isinstance(o, str) for o in argv])

    parser = optparse.OptionParser()
    parser.add_option(
        '--command',
        dest='command',
        type=str,
        default=None,
    )
    options, args = parser.parse_args(argv)

    if options.command is None and len(args) > 0:
        assert len(args) == 1
        options.command = args[0]

    # command -> sub-command of each supported player
    player_commands = {
        'media-play-pause': dict(mocp='--toggle-pause', playerctl='play-pause'),
        'media-next': dict(mocp='--next', playerctl='next'),
        'media-prev': dict(mocp='--previous', playerctl='previous'),
    }
    # command -> pactl arguments
    volume_commands = {
        'media-lower-volume': ['set-sink-volume', '@DEFAULT_SINK@', '-5%'],
        'media-raise-volume': ['set-sink-volume', '@DEFAULT_SINK@', '+5%'],
        'media-toggle-volume': ['set-sink-mute', '@DEFAULT_SINK@', 'toggle'],
    }

    assert (
        options.command in player_commands or
        options.command in volume_commands
    )

    msg = None
    mode = None

    is_mocp = lambda: \
        subprocess.call([
            'pgrep',
            '-u', os.environ['USER'],
            'mocp',
        ], stdout=subprocess.PIPE) == 0

    def mocp_info() -> str:
        """Return the ``Title`` field of ``mocp -i`` (at most 128 chars)."""
        fields = {}
        for line in subprocess.check_output(
            ['mocp', '-i']
        ).decode('utf-8').splitlines():
            # Split at the first colon only, so titles that contain colons
            # stay complete and lines without one do not break parsing.
            key, _, value = line.partition(':')
            fields[key] = value

        return fields['Title'].strip()[:128]

    if is_mocp():
        mode = 'mocp'
    else:
        mode = 'playerctl'

    if options.command in player_commands:
        action = player_commands[options.command]

        if mode == 'mocp':
            subprocess.check_call(['mocp', action['mocp']])
            msg = mocp_info()
        elif mode == 'playerctl':
            subprocess.check_call(['playerctl', action['playerctl']])
            msg = player_metadata()
        else:
            raise NotImplementedError
    elif options.command in volume_commands:
        subprocess.check_call(
            ['pactl'] + volume_commands[options.command]
        )
        msg = subprocess.check_output([
            'pactl',
            'get-sink-volume',
            '@DEFAULT_SINK@'
        ]).decode('utf-8').strip()
    else:
        raise NotImplementedError

    logging.info(
        json.dumps(
            dict(
                command=options.command,
                msg=msg,
                mode=mode,
            ),
            ensure_ascii=False
        )
    )

    return dict(
        msg=msg,
    )
def commands_cli() -> None:
    """Command-line entry point: run the sub-command named by ``sys.argv[1]``.

    Logging is switched to INFO and mirrored to stderr.  Any sub-command
    whose name starts with ``media`` is handed to ``media_keys`` and its
    ``msg`` is shown as a notification; every other name is looked up in
    the table below.  ``SystemExit`` is swallowed; any other exception
    (including an unknown or missing sub-command) is logged together with
    its traceback and shown as a notification as well.
    """
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    logger.setLevel(logging.INFO)
    root_logger.addHandler(logging.StreamHandler(sys.stderr))

    # Each entry is a thunk so that arguments are only read from sys.argv,
    # and the target only resolved, for the sub-command actually requested.
    handlers = {
        'status': lambda: status(sys.argv[2:]),
        'http-server': lambda: http_server(sys.argv[2:]),
        'pass-ssh-osx': lambda: pass_ssh_osx(sys.argv[2:]),
        'wl-screenshot': lambda: subprocess.check_call(r'''
grim -g "$(slurp)" - | wl-copy
''', shell=True),
        'chrome': lambda: chrome(sys.argv[2:]),
        'eternal-oom': lambda: eternal_oom(sys.argv[2:]),
        'resilient-vlc': lambda: resilient_vlc(sys.argv[2:]),
        'eternal-firefox': lambda: eternal_firefox(
            profile=sys.argv[2],
            group_name=sys.argv[3],
            window_position=json.loads(sys.argv[4]),
            debug=json.loads(sys.argv[5]),
            tabs=sys.argv[6:],
        ),
        'resilient-ethernet': lambda: resilient_ethernet(
            ip_addr=sys.argv[2],
            ethernet_device=sys.argv[3],
        ),
        'player': lambda: player_v1(
            folder_url=sys.argv[2],
            item_id=int(sys.argv[3]),
        ),
        'share-wifi': lambda: share_wifi(sys.argv[2:]),
        'socat-ssh': lambda: socat_ssh(sys.argv[2:]),
        'gnome-shortcuts': lambda: gnome_shortcuts(sys.argv[2:]),
        'sway_sock': lambda: print(sway_sock()),
        'suspend-timer': lambda: suspend_timer(sys.argv[2:]),
        'desktop-services': lambda: desktop_services(sys.argv[2:]),
        'pm-service': lambda: pm_service(sys.argv[2:]),
        'scrap-yt-music': lambda: scrap_yt_music(sys.argv[2:]),
    }

    msg : Optional[str] = None

    try:
        sub_command = sys.argv[1]

        if sub_command.startswith('media'):
            # media_keys receives the sub-command name itself as well.
            msg = media_keys(sys.argv[1:]).get('msg')
        else:
            run = handlers.get(sub_command)
            if run is None:
                raise NotImplementedError
            run()
    except SystemExit:
        pass
    except:
        msg = 'not implemented\n%s' % traceback.format_exc()
        logging.error(msg)

    if msg is not None:
        custom_notify(msg=msg)
# Script entry point: dispatch the requested sub-command only when this file
# is executed directly, not when it is imported as a module.
if __name__ == '__main__':
    commands_cli()