#!/usr/bin/env python3 import functools import re import tempfile import signal import datetime import sys import select import subprocess import pprint import socket import optparse import os import json import traceback import time import sys import io import subprocess import logging def custom_notify( title=None, msg=None ): if title is None: title = 'commands' assert isinstance(title, str) and len(title) > 0 assert isinstance(msg, str) and len(msg) > 0 if sys.platform == 'darwin': osascript_translate = functools.partial( custom_translate, check=lambda a, b: not re.compile( r'^[a-zA-Z0-9\<\>\/\(\)\s\.\,\:]*$' )\ .match(b) is None, ) subprocess.check_call([ 'osascript', '-e', 'display notification "%s" with title "%s"' % ( osascript_translate(msg), osascript_translate(title), ) ]) else: subprocess.check_call([ 'notify-send', title, msg[-128:] ]) def intercept_output( current_subprocess, return_aggregated=None, transform_callback=None, real_time=None, timeout=None, ): if real_time is None: real_time = False start_timestamp = datetime.datetime.now() if not return_aggregated: return_aggregated = False t1 = select.poll() t1.register(current_subprocess.stdout, select.POLLIN) print([current_subprocess, current_subprocess.poll()]) output = [] last_data = None while not ( not current_subprocess.poll() is None and \ not last_data is None ): if not timeout is None and \ (datetime.datetime.now() - start_timestamp).total_seconds() > timeout: break t2 = t1.poll(100) if len(t2) == 1 and (t2[0][1] & select.POLLIN) > 0 and \ not (isinstance(last_data, bytes) and len(last_data) == 0): t3 = current_subprocess.stdout.peek() t4 = current_subprocess.stdout.read(len(t3)) last_data = t3 output.append(t3) yield dict( data=t3, aggregated=False, ) t6 = t3 if not transform_callback is None: t5 = transform_callback(t3) if not t5 is None: t6 = t5 os.write(sys.stdout.fileno(), t6) elif real_time: yield dict( data=b'', aggregated=False, ) if return_aggregated: yield dict( data=b''.join(output), aggregated=True, returncode=current_subprocess.poll(), ) def player_metadata(): for k in range(20): try: time.sleep(1.0) return subprocess.check_output(['playerctl', 'metadata']).decode('utf-8').strip() except: continue def memory_stats(): if sys.platform == 'linux': with io.BytesIO( subprocess.check_output( 'free', shell=True ) ) as f: t1 = f.read().decode('utf-8').splitlines() mem_total = int(t1[1].strip().split()[1]) mem_used = int(t1[1].strip().split()[2]) return dict( mem_total=mem_total, mem_used=mem_used, ) elif sys.platform == 'darwin': sysctl_value = lambda name, custom_cast=int: \ custom_cast( subprocess.check_output( 'sysctl -a | grep %s' % name, shell=True, ).decode('utf-8').split(':')[1] ) vm_pagesize = sysctl_value('vm.pagesize') mem_total = sysctl_value('hw.memsize') t1 = subprocess.check_output('vm_stat').decode('utf-8') t2 = [o.split(':') for o in t1.splitlines() if ':' in o] t3 = { o[0].replace(' ', '_').replace('-', '_').lower() \ : \ int(o[1].strip().rstrip('.')) for o in t2 if len(o) == 2 and len(o[0]) > 0 \ and not re.compile(r'^\s*\d+\.\s*$').match(o[1]) is None \ and not re.compile(r'^[a-zA-Z0-9\_\-\s]+$').match(o[0]) is None } mem_used = ( t3['pages_active'] + \ t3['pages_wired_down'] ) * vm_pagesize return dict( mem_total=mem_total / 1024, mem_used=mem_used / 1024, ) else: raise NotImplementedError def eternal_oom(argv): import signal import re import time import pprint assert isinstance(argv, list) and all([isinstance(o, str) for o in argv]) parser = optparse.OptionParser() parser.add_option( '--cpu_wait', dest='cpu_wait', default=None, type=float, ) parser.add_option( '--mean_size', dest='mean_size', default=None, type=int, ) parser.add_option( '--period', dest='period', default=None, type=float, ) parser.add_option( '--memory_limit', dest='memory_limit', default=None, type=float, ) parser.add_option( '--cpu_limit', dest='cpu_limit', default=None, type=float, ) parser.add_option( '--debug', dest='debug', action='store_true', default=False, ) options, args = parser.parse_args(argv) self_pid = os.getpid() if options.period is None: options.period = 1 if options.memory_limit is None: options.memory_limit = 3 * 1024 * 1024 if options.cpu_limit is None: options.cpu_limit = 0.6 * os.cpu_count() if options.cpu_wait is None: options.cpu_wait = 10 if options.mean_size is None: options.mean_size = 30 if isinstance(options.memory_limit, float): options.memory_limit = int(options.memory_limit) assert isinstance(options.memory_limit, int) \ and options.memory_limit < memory_stats()['mem_total'] * 0.8 \ and options.memory_limit > 512 * 1024 assert isinstance(options.cpu_limit, float) \ and options.cpu_limit > 0.2 * os.cpu_count() and \ options.cpu_limit < os.cpu_count() * 0.8 assert options.period >= 1 assert options.cpu_wait >= 10 assert options.mean_size >= 16 def pandas_data_frame(lines, groups_regex, header_regex, extra_columns): header = re.compile(header_regex).search(lines[0]).groups() rows = [ re.compile(groups_regex).search(row).groups() for row in lines[1:] ] columns = { column: [] for column in header } for row in rows: for value, column in zip(row, header): columns[column].append(value) for column, transformation in extra_columns.items(): columns[column] = [ transformation( { k : v[index] for k, v in columns.items() } ) for index in range(len(rows)) ] return columns def pandas_merge(left, right, on): index = {} input_data_frames = [ ('left', left), ('right', right), ] for index_name, data_frame in input_data_frames: current_index = {} for row_index, value in enumerate(data_frame[on]): if not value in current_index: current_index[value] = [] current_index[value].append(row_index) index[index_name] = current_index merged_data_frame = dict( header=[ column + '_x' for column in left ] + [ column + '_y' for column in right ], columns={}, ) for column in merged_data_frame['header']: merged_data_frame['columns'][column] = [] common_values = { left_value for left_value in index['left'] if left_value in index['right'] } common_rows = sorted( [ dict( left_row_index=index['left'][value][0], right_row_index=index['right'][value][0], ) for value in common_values ], key=lambda x: x['left_row_index'], ) for common_row in common_rows: row = sum([ [ values[ common_row['%s_row_index' % index_name] ] for column, values in data_frame.items() ] for index_name, data_frame in input_data_frames ], []) for column, value in zip(merged_data_frame['header'], row): merged_data_frame['columns'][column].append(value) return merged_data_frame['columns'] def pandas_sort_values(data_frame, by, ascending): assert len(by) == 1 assert ascending is False t1 = [ o['row_index'] for o in sorted( [ dict( row_index=row_index, value=value ) for row_index, value in enumerate(data_frame[by[0]]) ], key=lambda x: x['value'] )[::-1] ] return { column : [ values[row_index] for row_index in t1 ] for column, values in data_frame.items() } def pandas_filter_values(data_frame, condition): shape = [ len(data_frame), ] if shape[0] > 0: shape.append( len(list(data_frame.values())[0]) ) t1 = [ row_index for row_index in range(shape[1]) if condition( { column : values[row_index] for column, values in data_frame.items() } ) ] return { column : [ values[row_index] for row_index in t1 ] for column, values in data_frame.items() } def pandas_row(data_frame, row_index): return { column : values[row_index] for column, values in data_frame.items() } def pandas_shape(data_frame): columns_count = len(data_frame) if columns_count > 0: rows_count = len(data_frame[ next(iter(data_frame.keys())) ]) else: rows_count = 0 return [ columns_count, rows_count, ] def ps_regex(groups_cnt): assert groups_cnt >= 1 return ''.join([ r'^\s*', r'([^\s]+)\s+' * (groups_cnt - 1), r'([^\s]+)\s*$', ]) def oom_get_processes(): with io.BytesIO( subprocess.check_output( 'ps -e -o pid,rss,user,%cpu', shell=True ) ) as f: t1 = pandas_data_frame( f.read().decode('utf-8').splitlines(), ps_regex(4), ps_regex(4), dict( PID=lambda row: int(row['PID']), RSS=lambda row: int(row['RSS']), CPU=lambda row: float(row['%CPU']), ), ) del t1['%CPU'] assert set(t1.keys()) == set(['PID', 'RSS', 'USER', 'CPU']) t5 = subprocess.check_output( 'ps -e -o pid,args', shell=True ).decode('utf-8').splitlines() t6 = pandas_data_frame( t5, r'^\s*(\d+)\s(.*)$', r'^\s+(\w+)\s+(\w+)\s*$', dict( PID=lambda row: int(row['PID']) ), ) if not 'COMMAND' in t6: if sys.platform == 'darwin' and 'ARGS' in t6: t6['COMMAND'] = t6['ARGS'] del t6['ARGS'] else: raise NotImplementedError assert set(t6.keys()) == set(['PID', 'COMMAND']) t11 = pandas_merge(t1, t6, on='PID') t7 = pandas_filter_values( t11, lambda row: \ row['PID_x'] != self_pid and \ not 'freelancer' in row['COMMAND_y'] ) t8 = pandas_sort_values( t7, by=['RSS_x'], ascending=False ) t9 = pandas_sort_values( t7, by=['CPU_x'], ascending=False ) t10 = sum(t9['CPU_x'], 0.0) / 100 if options.debug: pprint.pprint([t9['CPU_x'][:10], t10 * 100]) return dict( by_mem=t8, by_cpu=t9, total_cpu=t10, ) def oom_display_rows(current_dataframe): print('\n'.join([ ( lambda row: \ '% 8d\t% 6.3f GiB\t% 10s\t%s' % ( row['PID_x'], row['RSS_x'] / 1024 / 1024, row['USER_x'], row['COMMAND_y'], ) )( pandas_row(current_dataframe, k) ) for k in range( 0, min( 5, pandas_shape(current_dataframe)[1], ) ) ])) def oom_kill(pid): assert isinstance(pid, int) try: logging.info('%s oom_kill, pid %d' % ( datetime.datetime.now().isoformat(), pid, )) os.kill(pid, signal.SIGKILL) except: logging.error(traceback.format_exc()) custom_notify( msg='oom_kill, failed to kill pid %d' % pid ) def oom_status(): print( '\r%s %6.2f / %.2f %%, %6.2f / %.2f GiB' % ( datetime.datetime.now().isoformat(), oom_mean_cpu() / os.cpu_count() * 100, options.cpu_limit / os.cpu_count() * 100, memory_stats()['mem_used'] / 1024 / 1024, options.memory_limit / 1024 / 1024, ), end='' ) def first_check(): current_memory_stats = memory_stats() t11 = oom_get_processes() t8 = t11['by_mem'] if current_memory_stats['mem_used'] > options.memory_limit: oom_display_rows(t8) if t11['total_cpu'] > options.cpu_limit: oom_display_rows(t11['by_cpu']) free_before_oom = ( options.memory_limit - current_memory_stats['mem_used'] ) print( 'available %5.2f %% out of %5.2f %% of cpu limit before OOC' % ( (options.cpu_limit - t11['total_cpu']) * 100 / os.cpu_count(), options.cpu_limit * 100 / os.cpu_count(), ) ) print( '%5.2f GiB [%5.2f %%] out of %5.2f GiB of free memory before OOM' % ( free_before_oom / 1024 / 1024, free_before_oom / options.memory_limit * 100, options.memory_limit / 1024 / 1024, ) ) del t8 del t11 print('press Enter to start monitoring: ...', end='') input() print('\nstarted...') first_check() last_total_cpu = [] last_cpu_high = None def oom_add_cpu(total_cpu): if options.debug: pprint.pprint([total_cpu, last_total_cpu]) last_total_cpu.append(total_cpu) if len(last_total_cpu) > options.mean_size: del last_total_cpu[-options.mean_size:] def oom_mean_cpu(): return sum(last_total_cpu) / (len(last_total_cpu) + 1e-8) def oom_cpu_high(): nonlocal last_cpu_high if oom_mean_cpu() > options.cpu_limit: if last_cpu_high is None: last_cpu_high = datetime.datetime.now().timestamp() if datetime.datetime.now().timestamp() - last_cpu_high > options.cpu_wait: last_cpu_high = None return True return False while True: mem_used = memory_stats()['mem_used'] t11 = oom_get_processes() oom_add_cpu(t11['total_cpu']) t8 = t11['by_mem'] t9 = t8 t4 = lambda : oom_kill(t9['PID_x'][0]) t10 = lambda : mem_used > options.memory_limit oom_status() if t10(): print('\n', end='') pprint.pprint([ 'Killing [OOM]', pandas_row(t9, 0), mem_used, ]) t4() if oom_cpu_high(): print('\n', end='') pprint.pprint([ 'Killing [CPU]', pandas_row(t11['by_cpu'], 0), [options.cpu_limit, oom_mean_cpu(), t11['total_cpu']], ]) oom_kill(t11['by_cpu']['PID_x'][0]) time.sleep(options.period) def resilient_vlc(stream=None): if stream is None: streams_path = os.path.join( os.environ['CACHE_PATH'], 'resilient-vlc-streams.json' ) if os.path.exists(streams_path): with io.open( streams_path, 'r' ) as f: stream = json.load(f) else: raise RuntimeError( 'not found, %s' % streams_path ) if isinstance(stream, str): stream = [stream] if len(stream) == 0: raise RuntimeError('no streams') import subprocess import time while True: print('new start') with subprocess.Popen([ 'cvlc', '--verbose', '2', *stream, ], stderr=subprocess.PIPE) as p: while p.returncode is None: t1 = p.stderr.readline().decode('utf-8') if len(t1) > 0: print(t1) if not all([ o in t1 for o in [ 'prefetch stream error', 'terror', 'main interface error', ] ]) and any([ o in t1 for o in [ 'pulse audio output debug: underflow' ] ]): print('shit') p.kill() while True: try: t2 = p.wait(timeout=1) print(t2) break except: print('shit') pass time.sleep(1.0) def sway_sock(): import glob uid = os.stat(os.environ['HOME']).st_uid t1 = glob.glob( os.path.join( '/run', 'user', '%d' % uid, 'sway-ipc.%d*.sock' % uid, ) ) t2 = [ os.stat(o).st_mtime for o in t1 ] t3 = sorted(enumerate(t1), key=lambda x: t2[x[0]])[-1][0] return t1[t3] def eternal_firefox( tabs=None, profile=None, group_name=None, window_position=None, debug=None, ): import os import datetime import pprint import subprocess import time if debug is None: debug = False if tabs is None: raise RuntimeError('no tabs provided') if profile is None: raise RuntimeError('no profile provided') if group_name is None: raise RuntimeError('no group provided') if window_position is None: #window_position = '1,600,0,600,540' raise RuntimeError('no window-position provided') while True: os.system(r'''date''') with subprocess.Popen([ 'firefox', '-P', profile, *tabs, ]) as p: try: if debug: assert subprocess.check_call(['notify-send', '%s:Starting' % group_name]) == 0 #t3 = '' for k in range(300): t1 = subprocess.check_output(r''' swaymsg -t get_tree | jq -r '..|try select(.pid== %d)' ''' % p.pid, shell=True).decode('utf-8') if len(t1) > 10: break #time.sleep(0.1) #t1 = subprocess.check_output(['wmctrl', '-p', '-l']).decode('utf-8') #t4 = [o for o in t1.splitlines() if str(p.pid) in o] #if len(t4) == 1: # t3 = t4[0] # break #if t3 == '': # raise RuntimeError #t2 = t3.split()[0] #assert os.system('wmctrl -i -r %s -e %s' % (t2, window_position)) == 0 #assert os.system('wmctrl -i -r %s -b add,below' % t2) == 0 def reposition(): t1 = lambda s: \ s \ .replace('{{PID}}', str(p.pid)) \ .replace('{{X}}', str(window_position[1])) \ .replace('{{Y}}', str(window_position[2])) \ .replace('{{W}}', str(window_position[3])) \ .replace('{{H}}', str(window_position[4])) \ .replace('{{WORKSPACE}}', str(window_position[0])) assert os.system(t1(r''' swaymsg '[pid="{{PID}}"] move window to workspace {{WORKSPACE}}' ''')) == 0 if window_position[1] != '' and window_position[2] != '': assert os.system(t1(r''' swaymsg '[pid="{{PID}}"] floating enable' \ swaymsg '[pid="{{PID}}"] resize set width {{W}}px height {{H}}px' && \ swaymsg '[pid="{{PID}}"] move absolute position {{X}}px {{Y}}px' ''')) == 0 else: assert os.system(t1(r''' swaymsg '[pid="{{PID}}"] floating disable' ''')) == 0 if False: for tab in tabs[1:]: time.sleep(10) assert subprocess.check_call([ 'firefox', '-P', profile, '--new-tab', tab, ]) == 0 reposition() if debug: assert subprocess.check_call(['notify-send', '%s:Started' % group_name]) == 0 start = datetime.datetime.now() is_to_restart = lambda : (datetime.datetime.now() - start).total_seconds() >= 900 * 4 polling_count = 0 while not is_to_restart(): if polling_count == 0: reposition() if not p.poll() is None: break time.sleep(10) polling_count += 1 if debug: assert subprocess.check_call(['notify-send', '%s:Closing' % group_name]) == 0 #assert os.system('wmctrl -i -c %s' % t2) == 0 assert os.system(r''' swaymsg '[pid="%d"] kill' ''' % (p.pid,)) == 0 except KeyboardInterrupt: assert os.system(r''' swaymsg '[pid="%d"] kill' ''' % (p.pid,)) == 0 break except: import traceback import pprint pprint.pprint(traceback.format_exc()) finally: try: p.wait(20) except subprocess.TimeoutExpired: pprint.pprint([p.pid, '20 seconds timeout', 'kill']) p.kill() if debug: assert subprocess.check_call(['notify-send', '%s:Closed' % group_name]) == 0 def resilient_ethernet(ip_addr, ethernet_device): subprocess.check_call( r''' sudo sh -c '\ while true; \ do ping -c 3 -w 3 -W 1 {{IP_ADDR}} || (\ ip link set {{ETHERNET_DEVICE}} down; \ ip link set {{ETHERNET_DEVICE}} up; \ sleep 4; true;\ ); \ sleep 10; clear; date; \ done' '''.replace( '{{IP_ADDR}}', ip_addr ).replace( '{{ETHERNET_DEVICE}}}', ethernet_device ), shell=True ) def http_server(argv): assert isinstance(argv, list) and all([isinstance(o, str) for o in argv]) parser = optparse.OptionParser() parser.add_option( '--public', dest='public', action='store_true', default=False, ) parser.add_option( '--port', dest='port', type='int', default=80, ) parser.add_option( '--host', dest='host', type='str', default='127.0.0.1', ) options, args = parser.parse_args(argv) assert options.port >= 1 try: assert not socket.inet_aton(options.host) is None subprocess.check_call([ 'ping', '-w', '1', options.host ]) except: raise RuntimeError('invalid ip address %s' % options.host) index_section = 'autoindex on;' if len(sys.argv) == 3 and sys.argv[2] == '--public': location_section = 'location / {%s}' % index_section else: token = os.urandom(16).hex() print( 'access url is http://%s:%d/%s/' % ( options.host, options.port, token, ) ) location_section = ( 'location / {' 'deny all;' '}' 'location /%s/ {' 'alias /app/;' '%s' '}' ) % (token, index_section) subprocess.check_call( r''' sudo docker run \ -p %s:%d:80 \ -u root \ -it --entrypoint=/bin/bash \ -v $PWD:/app:ro \ nginx:latest \ -c 'echo "server{listen 80; charset UTF-8; root /app; %s}" > /etc/nginx/conf.d/default.conf; nginx -g "daemon off;"' ''' % ( options.host, options.port, location_section, ), shell=True) def pass_ssh_osx(argv): assert isinstance(argv, list) and all([isinstance(o, str) for o in argv]) parser = optparse.OptionParser() parser.add_option( '--list', dest='list', default=False, action='store_true', ) parser.add_option( '--pass_option', dest='pass_option', action='append', default=[], type=str, ) parser.add_option( '--clipboard_copy', dest='clipboard_copy', default=None, type=str, ) parser.add_option( '--debug', dest='debug', action='store_true', default=False, ) assert sys.platform in ['darwin', 'linux'] options, args = parser.parse_args(argv) if options.clipboard_copy is None: if sys.platform == 'linux': options.clipboard_copy = 'wl-copy' elif sys.platform == 'darwin': options.clipboard_copy = 'pbcopy' else: raise NotImplementedError if len(args) == 0: raise RuntimeError('ssh_command is required') if options.debug: print(options.pass_option) pprint.pprint(args) reset_gpg_agent = r''' gpgconf --kill gpg-agent && \ gpgconf --reload gpg-agent ''' if not options.list: t1 = options.pass_option assert len(t1) > 0 print( 'select on of pass names\n%s' % '\n'.join([ '%d: %s' % (k, v) for k, v in enumerate(t1) ]) ) while True: try: t2 = input() t3 = int(t2) assert t3 >= 0 and t3 < len(t1) break except: continue command = r''' %s gpg \ --pinentry-mode=ask \ -q -u $(cat ~/.password-store/.gpg-id) \ --decrypt \ ~/.password-store/%s.gpg && \ echo -n '['$?']' && \ %s ''' % ( reset_gpg_agent, t1[t3], reset_gpg_agent, ) else: command = 'pass list | less -R' ssh_command = [ 'ssh', '-C', '-o', 'ConnectTimeout 10', '-o', 'ServerAliveInterval 1', *args, '-t', command, ] if options.debug: pprint.pprint( dict( ssh_command=ssh_command, ) ) if options.list: subprocess.check_call(ssh_command) else: def clipboard_set(text): with subprocess.Popen([ options.clipboard_copy, ], stdin=subprocess.PIPE) as p: p.stdin.write(text.encode('utf-8')) p.stdin.flush() p.stdin.close() p.wait(1) assert p.poll() == 0 with subprocess.Popen( ssh_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) as p: password = None last_chunk = None hide_password = False pinentry_delimeter = b'\x1b>' def transform_callback(data): nonlocal hide_password nonlocal pinentry_delimeter data2 = None if pinentry_delimeter in data: hide_password = True pos = data.rfind(pinentry_delimeter) if pos == -1: data2 = data else: data2 = data[:pos + len(pinentry_delimeter)] elif data == b'': #return b'\r\n' return b'' elif hide_password: data2 = b'' else: data2 = None return data2 for chunk in intercept_output( current_subprocess=p, return_aggregated=True, transform_callback=transform_callback, real_time=True, #timeout=10, ): if chunk['aggregated']: last_chunk = chunk break assert not last_chunk is None assert last_chunk['returncode'] == 0 if options.debug: pprint.pprint(last_chunk['data']) if last_chunk['data'].endswith('\r\n[0]'.encode('utf-8')) and \ last_chunk['data'].rfind(pinentry_delimeter) != -1: last_line = last_chunk['data'].splitlines()[-2] else: raise RuntimeError( 'gpg failure %s' % str( last_chunk['data'][ max(last_chunk['data'].find(pinentry_delimeter), -128): ] ) ) pos2 = last_line.rfind(pinentry_delimeter) if pos2 == -1: last_line2 = last_line else: last_line2 = last_line[ pos2 + len(pinentry_delimeter): ] password = last_line2.decode('utf-8').rstrip('\r\n') assert not password is None clipboard_set(password) get_time = lambda : datetime.datetime.now().timestamp() start = get_time() while True: cur = get_time() remains = 10 - (cur - start) if remains <= 1e-8: break else: print('\r%5.2fs remains' % remains, end='') time.sleep(0.1) clipboard_set('') print('\rcleared cliboard\n', end='') def player_v1(folder_url, item_id): import sys import urllib.parse import re import subprocess import os import tqdm t4 = folder_url t1 = subprocess.check_output(['curl', '-s', t4]).decode('utf-8') t2 = re.compile(r"href=\"(.*\.mp3)\""); t3 = [o.group(1) for o in t2.finditer(t1)]; t5 = ['%s/%s' % (t4, o) for o in t3] t6 = item_id t9 = range(t6, len(t5)) with tqdm.tqdm( total=len(t5), ) as progress_bar: progress_bar.update(t6) for k in t9: t7 = t5[k] t9 = urllib.parse.unquote(os.path.split(t7)[1]) progress_bar.set_description('%03d %s' % (k, t9)) with subprocess.Popen(['ffprobe', '-hide_banner', '-i', t7], stderr=subprocess.PIPE, stdout=subprocess.PIPE) as p: p.wait() assert p.returncode == 0 t8 = p.stderr.read().decode('utf-8') #print(t8) with subprocess.Popen(['ffplay', '-hide_banner', '-nodisp', '-autoexit', '-loop', '1', t7], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) as p: p.wait() assert p.returncode == 0 progress_bar.update(1) def desktop_services(argv): parser = optparse.OptionParser() parser.add_option( '--background_image', dest='background_image', default=None, type=str, ) parser.add_option( '--cpufreq', dest='cpufreq', default=None, type=int, help='0 - mac book air (no turbo boost, max pct 30, every 4 seconds', ) options, args = parser.parse_args(argv) os.environ['SWAYSOCK'] = sway_sock() assert all([ env_name in os.environ for env_name in [ 'GTK_IM_MODULE', 'XMODIFIERS', 'QT_IM_MODULE', 'I3SOCK', 'SWAYSOCK', 'WAYLAND_DISPLAY', ] ]) and os.environ['SWAYSOCK'] == sway_sock() services = [] try: if options.cpufreq == 0: print('launching cpufreq, need sudo') subprocess.check_call(['sudo', 'whoami']) services.append( subprocess.Popen(r''' exec sudo sh -c 'echo cpufreq, user; whoami; while true; do echo passive > /sys/devices/system/cpu/intel_pstate/status; echo 1 > /sys/devices/system/cpu/intel_pstate/no_turbo; echo 40 > /sys/devices/system/cpu/intel_pstate/max_perf_pct; for cpu_path in /sys/devices/system/cpu/cpu?; do echo 900000 > $cpu_path/cpufreq/scaling_max_freq; echo schedutil > $cpu_path/cpufreq/scaling_governor; done; sleep 10; done;' ''', shell=True) ) class start_swayidle: def __init__(self): swaylock_cmd = [ 'swaylock', '-f', '-d', ] if not options.background_image is None: swaylock_cmd.extend( [ '-i', '"%s"' % options.background_image, ] ) subprocess.check_call([ 'swaymsg', '--', 'output', '*', 'bg', options.background_image, 'fill', ]) self.commands = dict( swaylock_cmd2=' '.join(swaylock_cmd), timeout1='echo timeout1; swaymsg "output * dpms off";', lock='echo lock; pkill --signal SIGUSR1 swayidle;', unlock='echo unlock; pkill --signal SIGINT swaylock; swaymsg "output * dpms on";', unlock2='pkill --signal SIGINT swaylock;', resume='echo resume; swaymsg "output * dpms on";', before_sleep='echo before_sleep; loginctl lock-session;', after_resume='echo after_resume; pkill --signal SIGUSR1 swayidle;', ) self.last_force_idle = None self.commands.update( timeout2='echo timeout2; {swaylock_cmd};'.format( swaylock_cmd=self.commands['swaylock_cmd2'] ) ) self.swayidle = subprocess.Popen( r''' exec swayidle -d -w \ timeout 300 'echo t1; read;' \ resume 'echo t5; ' \ timeout 900 'echo t4; read;' \ resume 'echo t5; ' \ lock 'echo t2; read;' \ unlock 'echo t3;' \ before-sleep 'echo t6; read;' \ after-resume 'echo t7; read;' ''', shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, ) self.output = intercept_output( self.swayidle, real_time=True, ) self.events = [] self.data = [] def poll(self): return self.swayidle.poll() def release_lock(self): self.swayidle.stdin.write(b'\n') self.swayidle.stdin.flush() def force_idle(self): if self.last_force_idle is None or ( datetime.datetime.now() - self.last_force_idle ).total_seconds() >= 10: self.last_force_idle = datetime.datetime.now() return True else: return False def terminate(self, *args, **kwargs): return self.swayidle.terminate(*args, **kwargs) def kill(self): return self.swayidle.kill() def check(self): while True: if self.output is None: break chunk = next(self.output) if chunk['aggregated']: self.output = None continue if len(chunk['data']) == 0: break self.data.append(chunk) if b'\n' in chunk['data']: total = b''.join([ o['data'] for o in self.data ]).decode('utf-8') sep_pos = total.rfind('\n') lines = total[:sep_pos].splitlines() self.data = [ dict( data=total[sep_pos:].encode('utf-8'), aggregated=False, ) ] self.events.extend([ line for line in lines if line in [ 't1', 't2', 't3', 't4', 't5', 't5', 't6', 't7', ] or ( 'idle state' in line or \ 'active state' in line ) ]) if len(self.events) > 0: for o in self.events: if o == 't1': #if self.force_idle(): # subprocess.check_call(self.commands['lock'], shell=True) print('started t1') if self.force_idle(): subprocess.check_call(self.commands['timeout1'], shell=True) print('done t1') self.release_lock() elif o == 't2': print('started lock') if self.force_idle(): subprocess.check_call(self.commands['lock'], shell=True) subprocess.call(self.commands['timeout2'], shell=True) subprocess.check_call(self.commands['timeout1'], shell=True) print('done lock') self.release_lock() elif o == 't3': pass elif o == 't4': print('started t4') if self.force_idle(): subprocess.check_call(self.commands['lock'], shell=True) subprocess.call(self.commands['timeout2'], shell=True) subprocess.check_call(self.commands['timeout1'], shell=True) print('done t4') self.release_lock() elif o == 't5': if self.force_idle(): subprocess.check_call(self.commands['lock'], shell=True) subprocess.check_call(self.commands['resume'], shell=True), elif o == 't6': print('started before-sleep') if self.force_idle(): subprocess.call(self.commands['timeout2'], shell=True), subprocess.check_call(self.commands['timeout1'], shell=True), print('done before-sleep') self.release_lock() elif o == 't7': print('started resume') if self.force_idle(): subprocess.check_call(self.commands['lock'], shell=True) subprocess.check_call(self.commands['resume'], shell=True), print('done resume') self.release_lock() else: raise NotImplementedError pprint.pprint(self.events) del self.events[:] services.extend([ subprocess.Popen(['ibus-daemon']), start_swayidle(), ]) while True: if all([not o.poll() is None for o in services]): print('done') break for o in services: if hasattr(o, 'check'): o.check() time.sleep(0.1) except: logging.error(traceback.format_exc()) finally: for o in services: try: o.terminate(timeout=10) except: logging.error('killed %s' % str(o.__dict__)) o.kill() def suspend_timer(argv): import datetime; import subprocess; import time; import sys; if len(argv) == 0: print("enter HH:MM"); t3 = input().strip() else: t3 = argv[0] t2 = datetime.datetime.strptime(t3, "%H:%M").time() while True: t1 = datetime.datetime.now() if ((t1.hour, t1.minute) >= (t2.hour, t2.minute)): break else: t3 = [ (t2.hour - t1.hour), t2.minute - t1.minute ] if t3[1] < 0: t3[1] += 60 t3[0] -= 1 print("\r%s, %02d:%02d" % ( t1, *t3, ), end="") time.sleep(1) print("suspend computer at %s" % t1.isoformat()) subprocess.check_call(["systemctl", "suspend"]); def socat_ssh(argv): parser = optparse.OptionParser() parser.add_option( '--local_port', dest='local_port', default=None, type=int, ) parser.add_option( '--ssh_key', dest='ssh_key', default=None, type=str, ) parser.add_option( '--socat_verbose', dest='socat_verbose', action='store_true', default=False, ) parser.add_option( '--ssh_host', dest='ssh_host', default=None, type=str, ) parser.add_option( '--target_port', dest='target_port', default=None, type=int, ) parser.add_option( '--gateway_command', dest='gateway_command', default=None, type=str, help=( 'a shell command that forwards ssh socket data ' 'somewhere else, like ' 'busybox nc 127.0.0.1 $(cat remote-ssh.port)' ), ) options, args = parser.parse_args(argv) ssh_command = ['ssh', '-T', '-C'] if not options.ssh_key is None: subprocess.check_call(['ssh-add', options.ssh_key]) ssh_command.extend([ '-i', options.ssh_key, ]) if not options.ssh_host is None: ssh_command.extend([options.ssh_host]) restart = False def on_interrupt(*args, **kwargs): nonlocal restart restart = True socat_command = ['socat'] if options.socat_verbose: socat_command.extend(['-v']) socat_command.extend([ 'tcp-listen:%d,fork,bind=127.0.0.1' % ( options.local_port, ), ]) signal.signal( signal.SIGINT, on_interrupt, ) signal.signal( signal.SIGTERM, on_interrupt, ) gateway = None p = None while True: if gateway is None: gateway = tempfile.NamedTemporaryFile(suffix='.sh', mode='w') gateway.write( r''' exec %s ''' % ' '.join( ssh_command + [options.gateway_command] ) ) gateway.flush() if p is None: p = subprocess.Popen( socat_command + [ 'EXEC:sh %s' % gateway.name, ] ) time.sleep(1) if restart: try: p.terminate() p.wait(10) except: p.kill() restart = False if not p.poll() is None: p = None if not gateway is None: os.path.unlink(gateway.name) if not p is None: p.terminate() def share_wifi(argv): parser = optparse.OptionParser() parser.add_option( '--to-wifi', dest='to_wifi', default=None, type=str, ) parser.add_option( '--from-eth', dest='from_eth', default=None, type=str, ) parser.add_option( '--ap-name', dest='ap_name', default=None, type=str, ) parser.add_option( '--restart-delay', dest='restart_delay', default=None, type=int, ) options, args = parser.parse_args(argv) if options.restart_delay is None: options.restart_delay = 2 assert not options.to_wifi is None assert not options.from_eth is None assert not options.ap_name is None assert options.restart_delay >= 1 print('enter password:') pw = subprocess.check_output( 'read -s PW; echo -n $PW', shell=True ).decode('utf-8') if len(pw) == 0: p2 = subprocess.check_output( 'pwgen -syn 20 1', shell=True ).decode('utf-8') with subprocess.Popen( ['qrencode', '-t', 'UTF8'], stdin=subprocess.PIPE ) as p: p.stdin.write(pw.encode('utf-8')) p.stdin.flush() p.stdin.close() try: p.wait(5) except Exception as exception: p.kill() raise exception last_timestamp = datetime.datetime.now() hostapd = None restart = False def on_interrupt(*args, **kwargs): nonlocal restart restart = True signal.signal( signal.SIGINT, on_interrupt, ) signal.signal( signal.SIGTERM, on_interrupt, ) while True: try: if hostapd is None: print('\n%s, start new' % last_timestamp) hostapd = subprocess.Popen([ 'create_ap', '--hostapd-timestamps', options.to_wifi, options.from_eth, options.ap_name, pw, ]) else: if restart: print('\n%s, shutdown current' % last_timestamp) os.kill( hostapd.pid, signal.SIGINT ) try: hostapd.wait(20) except: hostapd.terminate() restart = False if not hostapd.poll() is None: hostapd = None if ( datetime.datetime.now() - last_timestamp ).total_seconds() > options.restart_delay: restart = True last_timestamp = datetime.datetime.now() except: print(traceback.format_exc()) restart = True finally: time.sleep(1) def status(argv): import inspect import textwrap assert isinstance(argv, list) and all([isinstance(o, str) for o in argv]) class c1(optparse.IndentedHelpFormatter): def format_option(self, *args, **kwargs): f1 = lambda text, width: '\n'.join([ textwrap.fill('\t' + o, width, replace_whitespace=False) for o in text.splitlines() ]).splitlines() t1 = inspect.getsource(optparse.IndentedHelpFormatter.format_option) t2 = '\n'.join([o[4:] for o in t1.splitlines()[:]]).replace( 'textwrap.wrap', 'f1', ).replace('format_option', 'f2') exec(t2, dict(f1=f1), locals()) return locals()['f2'](self, *args, **kwargs) parser = optparse.OptionParser( formatter=c1( width=None, ), ) parser.add_option( '--sh', dest='sh', default=[], action='append', type=str, ) parser.add_option( '--config', dest='config', default=None, type=str, help=''.join([ '.json file with array of strings, each is a shell command ', 'that outputs a separate status text value, ', 'like\n', r''' sensors -j | jq -r '.\"coretemp-isa-0000\".\"Package id 0\".temp1_input|tostring + \" C\"' printf '%d RPM' $(cat /sys/devices/platform/applesmc.768/fan1_input) printf '% 3.0f%%' $(upower -d | grep -Po 'percentage:\\s+\\d+(\\.\\d+)?%' | grep -Po '\\d+(\\.\\d+)?' | head -n 1) '''.strip() ]) ) options, args = parser.parse_args(argv) config = dict() try: with io.open(options.config, 'r') as f: config.update( json.load(f) ) except: logging.error(traceback.format_exc()) pass options.sh.extend( config.get('sh', []) ) t1 = [ subprocess.check_output(o, shell=True).decode('utf-8').strip() for o in [ r''' free -h | \ grep -P Mem: | grep -Po '[\w\.\d]+' | tail -n +2 | head -n 3 | xargs echo -n; ''', r''' date +'%Y-%m-%d %l:%M:%S %p'; ''', ] ] t2 = [ subprocess.check_output(o, shell=True).decode('utf-8').strip() for o in options.sh ] t3 = ' | '.join(t2 + t1).replace('\n\r', '') sys.stdout.write(t3) sys.stdout.flush() def custom_translate(current_string, check, none_char=None,): if none_char is None: none_char = '.' class A: def __getitem__(self, k): t1 = chr(k) t2 = check(k, t1) if isinstance(t2, bool): if t2: return t1 else: return none_char elif isinstance(t2, str): return t2 return current_string.translate( A() ) def commands_cli(): logging.getLogger().setLevel(logging.INFO) msg = None try: if sys.argv[1] == 'media-play-pause': subprocess.check_call(['playerctl', 'play-pause']) msg = player_metadata() elif sys.argv[1] == 'media-next': subprocess.check_call(['playerctl', 'next']) msg = player_metadata() elif sys.argv[1] == 'media-prev': subprocess.check_call(['playerctl', 'previous']) msg = player_metadata() elif sys.argv[1] == 'media-lower-volume': subprocess.check_call([ 'pactl', 'set-sink-volume', '@DEFAULT_SINK@', '-5%' ]) msg = subprocess.check_output([ 'pactl', 'get-sink-volume', '@DEFAULT_SINK@' ]).decode('utf-8').strip() elif sys.argv[1] == 'media-raise-volume': subprocess.check_call([ 'pactl', 'set-sink-volume', '@DEFAULT_SINK@', '+5%' ]) msg = subprocess.check_output([ 'pactl', 'get-sink-volume', '@DEFAULT_SINK@' ]).decode('utf-8').strip() elif sys.argv[1] == 'status': status(sys.argv[2:]) elif sys.argv[1] == 'http-server': http_server(sys.argv[2:]) elif sys.argv[1] == 'pass-ssh-osx': pass_ssh_osx(sys.argv[2:]) elif sys.argv[1] == 'wl-screenshot': subprocess.check_call(r''' grim -g "$(slurp)" - | wl-copy ''', shell=True) elif sys.argv[1] == 'eternal-oom': eternal_oom(sys.argv[2:]) elif sys.argv[1] == 'resilient-vlc': resilient_vlc(sys.argv[2:]) elif sys.argv[1] == 'eternal-firefox': eternal_firefox( profile=sys.argv[2], group_name=sys.argv[3], window_position=json.loads(sys.argv[4]), debug=json.loads(sys.argv[5]), tabs=sys.argv[6:], ) elif sys.argv[1] == 'resilient-ethernet': resilient_ethernet( ip_addr=sys.argv[2], ethernet_device=sys.argv[3], ) elif sys.argv[1] == 'player': player_v1( folder_url=sys.argv[2], item_id=int(sys.argv[3]), ) elif sys.argv[1] == 'share-wifi': share_wifi(sys.argv[2:]) elif sys.argv[1] == 'socat-ssh': socat_ssh(sys.argv[2:]) elif sys.argv[1] == 'suspend-timer': suspend_timer(sys.argv[2:]) elif sys.argv[1] == 'desktop-services': desktop_services(sys.argv[2:]) else: raise NotImplementedError except SystemExit: pass except: msg = 'not implemented\n%s' % traceback.format_exc() logging.error(msg) if not msg is None: custom_notify(msg=msg) if __name__ == '__main__': commands_cli()