#!/usr/bin/python3 import traceback import time import sys import subprocess import logging msg = None def player_metadata(): for k in range(20): try: time.sleep(1.0) return subprocess.check_output(['playerctl', 'metadata']).decode('utf-8').strip() except: continue def eternal_oom(): import signal import os import re import time import io import pandas import numpy import subprocess import pprint self_pid = os.getpid() while True: with io.BytesIO(subprocess.check_output('ps -e -o pid,rss,user', shell=True)) as f: t1 = pandas.read_csv(f, sep='\s+', header=0) with io.BytesIO(subprocess.check_output('free', shell=True)) as f: t2 = pandas.read_csv(f, sep='\s+') t5 = subprocess.check_output('ps -e -o pid,args', shell=True).decode('utf-8').splitlines() t6 = pandas.DataFrame( [ re.compile(r'^\s*(\d+)\s(.*)$').search(o).groups() for o in t5[1:] ], columns=tuple(t5[0].split()) ).assign(PID=lambda x: x.PID.values.astype(numpy.int32)) t7 = pandas.merge(t1, t6, on='PID') t8 = t7.sort_values(by=['RSS'], ascending=False).assign(used=lambda x: (x.RSS / 1024).cumsum()) t11 = numpy.where( numpy.stack([ t8.PID.values != self_pid, t8.COMMAND.str.contains('freelancer').isin([False]) ], axis=0).prod(0) )[0] t9 = t8.iloc[t11] t4 = lambda : os.kill(t9.PID.iloc[0], signal.SIGKILL) t10 = lambda : t2.loc['Mem:', 'used'] > 3 * 1024 * 1024 if t10(): pprint.pprint(['Killing', t9.iloc[0], t2, t9]) t4() time.sleep(1) def resilient_vlc(stream=None): if stream is None: streams_path = os.path.join( os.environ['CACHE_PATH'], 'resilient-vlc-streams.json' ) if os.path.exists(streams_path): with io.open( streams_path, 'r' ) as f: stream = json.load(f) else: raise RuntimeError( 'not found, %s' % streams_path ) if isinstance(stream, str): stream = [stream] if len(stream) == 0: raise RuntimeError('no streams') import subprocess import time while True: print('new start') with subprocess.Popen([ 'cvlc', '--verbose', '2', *stream, ], stderr=subprocess.PIPE) as p: while p.returncode is None: t1 = p.stderr.readline().decode('utf-8') if len(t1) > 0: print(t1) if not all([ o in t1 for o in [ 'prefetch stream error', 'terror', 'main interface error', ] ]) and any([ o in t1 for o in [ 'pulse audio output debug: underflow' ] ]): print('shit') p.kill() while True: try: t2 = p.wait(timeout=1) print(t2) break except: print('shit') pass time.sleep(1.0) def eternal_firefox( tabs=None, profile=None, group_name=None, window_position=None, debug=None, ): import os import datetime import pprint import subprocess import time if debug is None: debug = False if tabs is None: raise RuntimeError('no tabs provided') if profile is None: raise RuntimeError('no profile provided') if group_name is None: raise RuntimeError('no group provided') if window_position is None: #window_position = '1,600,0,600,540' raise RuntimeError('no window-position provided') while True: os.system(r'''date''') with subprocess.Popen([ 'firefox', '-P', profile, *tabs, ]) as p: try: if debug: assert subprocess.check_call(['notify-send', '%s:Starting' % group_name]) == 0 #t3 = '' for k in range(300): t1 = subprocess.check_output(r''' swaymsg -t get_tree | jq -r '..|try select(.pid== %d)' ''' % p.pid, shell=True).decode('utf-8') if len(t1) > 10: break #time.sleep(0.1) #t1 = subprocess.check_output(['wmctrl', '-p', '-l']).decode('utf-8') #t4 = [o for o in t1.splitlines() if str(p.pid) in o] #if len(t4) == 1: # t3 = t4[0] # break #if t3 == '': # raise RuntimeError #t2 = t3.split()[0] #assert os.system('wmctrl -i -r %s -e %s' % (t2, window_position)) == 0 assert os.system(r''' swaymsg '[pid="{{PID}}"] move absolute position {{X}}px {{Y}}px' '''.replace('{{PID}}', str(p.pid)) \ .replace('{{X}}', str(window_position[1])) \ .replace('{{Y}}', str(window_position[2]))) == 0 #assert os.system('wmctrl -i -r %s -b add,below' % t2) == 0 if False: for tab in tabs[1:]: time.sleep(10) assert subprocess.check_call([ 'firefox', '-P', profile, '--new-tab', tab, ]) == 0 if debug: assert subprocess.check_call(['notify-send', '%s:Started' % group_name]) == 0 start = datetime.datetime.now() is_to_restart = lambda : (datetime.datetime.now() - start).total_seconds() >= 900 * 4 while not is_to_restart(): if not p.poll() is None: break time.sleep(10) if debug: assert subprocess.check_call(['notify-send', '%s:Closing' % group_name]) == 0 #assert os.system('wmctrl -i -c %s' % t2) == 0 assert os.system(r''' swaymsg '[pid="%d"] kill' ''' % (p.pid,)) == 0 except: import traceback import pprint pprint.pprint(traceback.format_exc()) finally: try: p.wait(20) except subprocess.TimeoutExpired: pprint.pprint([p.pid, '20 seconds timeout', 'kill']) p.kill() if debug: assert subprocess.check_call(['notify-send', '%s:Closed' % group_name]) == 0 def status(): return ' | '.join([ subprocess.check_output(o, shell=True).decode('utf-8').strip() for o in [ r''' free -h | \ grep -P Mem: | grep -Po '[\w\.\d]+' | tail -n +2 | head -n 3 | xargs echo -n; ''', r''' sensors | \ grep -Po '[\\\+\\\-\\\w][^\\\s]+C ' | head -n 5 | xargs echo -n ''', r''' ssh nartes@pizcool3070 free -h | \ grep -P Mem: | grep -Po '[\w\.\d]+' | tail -n +2 | head -n 3 | xargs echo -n; ''', r''' ssh nartes@pizcool3070 sensors | \ grep -Po '[\\\+\\\-\.0-9]+\s+C ' | head -n 1 ''', r''' date +'%Y-%m-%d %l:%M:%S %p'; ''', ] ]).replace('\n\r', '') try: if sys.argv[1] == 'media-play-pause': subprocess.check_call(['playerctl', 'play-pause']) msg = player_metadata() elif sys.argv[1] == 'media-next': subprocess.check_call(['playerctl', 'next']) msg = player_metadata() elif sys.argv[1] == 'media-prev': subprocess.check_call(['playerctl', 'previous']) msg = player_metadata() elif sys.argv[1] == 'media-lower-volume': subprocess.check_call([ 'pactl', 'set-sink-volume', '@DEFAULT_SINK@', '-5%' ]) msg = subprocess.check_output([ 'pactl', 'get-sink-volume', '@DEFAULT_SINK@' ]).decode('utf-8').strip() elif sys.argv[1] == 'media-raise-volume': subprocess.check_call([ 'pactl', 'set-sink-volume', '@DEFAULT_SINK@', '+5%' ]) msg = subprocess.check_output([ 'pactl', 'get-sink-volume', '@DEFAULT_SINK@' ]).decode('utf-8').strip() elif sys.argv[1] == 'status': sys.stdout.write(status()) sys.stdout.flush() elif sys.argv[1] == 'http-server': subprocess.check_call(r''' sudo docker run \ -p 80:80 \ -u root \ -it --entrypoint=/bin/bash \ -v $PWD:/app:ro \ nginx:latest \ -c 'echo "server{listen 80; root /app; location / {autoindex on;}}" > /etc/nginx/conf.d/default.conf; nginx -g "daemon off;"' ''', shell=True) elif sys.argv[1] == 'wl-screenshot': subprocess.check_call(r''' grim -g "$(slurp)" - | wl-copy ''', shell=True) elif sys.argv[1] == 'eternal-oom': eternal_oom() elif sys.argv[1] == 'resilient-vlc': resilient_vlc(sys.argv[2:]) elif sys.argv[1] == 'eternal-firefox': eternal_firefox( profile=sys.argv[2], group_name=sys.argv[3], window_position=json.loads(sys.argv[4]), debug=bool(sys.argv[5]), tabs=sys.argv[6:], ) else: raise NotImplementedError except: msg = 'not implemented\n%s' % traceback.format_exc() logging.error(msg) if not msg is None: subprocess.check_call([ 'notify-send', 'commands', msg[-128:] ])