#!/usr/bin/env python3
"""Firefox memory manager with a small terminal UI.

The script can launch Firefox inside a transient ``systemd-run --user
--scope`` unit with ``MemoryHigh`` (and optionally ``MemorySwapMax``) set,
then periodically sums the RSS of the processes whose cgroup contains the
unit name (as reported by ``ps``).  When the total exceeds ``--max-mb`` it
sends SIGTERM to Firefox content processes until roughly
``--kill-percent`` of the limit is reached.
"""
import argparse
import logging
import logging.handlers
import os
import pathlib
import re
import signal
import subprocess
import sys
import threading
import time
from collections import OrderedDict
from typing import (
    TypedDict,
    Any,
    Optional,
)

import psutil
from prompt_toolkit.application import Application
from prompt_toolkit.filters import Condition
from prompt_toolkit.key_binding import KeyBindings, ConditionalKeyBindings
from prompt_toolkit.layout import Layout, HSplit, FloatContainer, Float
from prompt_toolkit.layout.containers import Window
from prompt_toolkit.layout.controls import FormattedTextControl
from prompt_toolkit.styles import Style
from prompt_toolkit.widgets import TextArea, Frame, Dialog, Button, Label

logger = logging.getLogger(__name__)

__version__ = '0.6.1'
__created__ = '2025-11-21'

# — Helper for cgroup / slice matching —


def get_cgroup_path(pid: int) -> Optional[str]:
    """Return the cgroup path of *pid* read from ``/proc/<pid>/cgroup``.

    The path is the third ``:``-separated field of the first line that has
    three fields.  Returns ``None`` when the file cannot be read or no line
    has that shape.
    """
    try:
        with open(f'/proc/{pid}/cgroup', 'r') as f:
            for line in f:
                parts = line.strip().split(':', 2)
                if len(parts) == 3:
                    return parts[2]
    except (OSError, UnicodeDecodeError):
        logger.exception('')
    return None


def slice_matches(cpath, target_slice):
    """Tell whether any component of *cpath* names *target_slice*.

    Comparison is case-insensitive and a trailing ``.slice`` on a path
    component is ignored.  Empty/``None`` arguments never match.
    """
    if not cpath or not target_slice:
        return False
    wanted = target_slice.lower()
    for component in cpath.strip('/').split('/'):
        name = component.lower()
        if name.endswith('.slice'):
            name = name[: -len('.slice')]
        if name == wanted:
            return True
    return False


# — Memory‑management logic —


class get_firefox_procs_ps_t:
    # Namespace for the types describing get_firefox_procs_ps() results.
    class res_t:
        class entry_t(TypedDict):
            # One process row merged from the two ``ps`` queries.
            rss: int  # resident set size in bytes (ps value * 1024)
            pid: int
            ppid: int
            cgroup: str
            cmd: str


# Each query is (row pattern, column name -> extractor).  The column names
# double as the ``ps -o`` field list, so their order must match the regex
# groups.
_PS_QUERIES = (
    (
        re.compile(r'^\s*(\d+)\s+(\d+)\s+(\d+)\s+(.*)$'),
        OrderedDict(
            pid=lambda m: int(m[1]),
            rss=lambda m: int(m[2]) * 1024,
            ppid=lambda m: int(m[3]),
            cmd=lambda m: m[4],
        ),
    ),
    (
        re.compile(r'^\s*(\d+)\s+(.*)$'),
        OrderedDict(
            pid=lambda m: int(m[1]),
            cgroup=lambda m: m[2],
        ),
    ),
)


def get_firefox_procs_ps(
    slice_name=None,
) -> list[get_firefox_procs_ps_t.res_t.entry_t]:
    """List processes (via ``ps -ax``) whose cgroup contains *slice_name*.

    ``ps`` is run once per entry of ``_PS_QUERIES`` and the rows are merged
    by PID.  Only PIDs for which both ``rss`` and ``cgroup`` were obtained
    are returned.  When *slice_name* is ``None`` no cgroup filtering is
    applied; otherwise it is matched as a plain substring of the cgroup.

    Raises ``subprocess.CalledProcessError`` / ``OSError`` if ``ps`` fails.
    """
    entries: dict[int, dict[str, Any]] = dict()

    for pattern, columns in _PS_QUERIES:
        output = subprocess.check_output(
            [
                'ps',
                '-ax',
                '-o',
                ','.join(columns.keys()),
            ]
        )
        # Command lines are arbitrary bytes; never fail a refresh on them.
        rows = output.decode('utf-8', errors='replace').splitlines()[1:]

        for row in rows:
            match = pattern.match(row)
            if match is None:
                # Skip rows we cannot parse instead of aborting the scan.
                logger.debug('unparsable ps row: %r', row)
                continue
            entry = {key: extract(match) for key, extract in columns.items()}
            entries.setdefault(entry['pid'], dict()).update(entry)

    filtered_entries: list[dict[str, Any]] = []
    for entry in entries.values():
        if 'cgroup' not in entry or 'rss' not in entry:
            continue
        if slice_name is not None and slice_name not in entry['cgroup']:
            continue
        filtered_entries.append(entry)

    return filtered_entries


def get_firefox_procs(slice_name=None):
    """Return psutil processes that look like Firefox.

    A process qualifies when ``firefox`` occurs in its name or in the first
    element of its command line.  When *slice_name* is given the process
    cgroup must additionally satisfy ``slice_matches``.
    """
    procs = []
    for p in psutil.process_iter(['pid', 'name', 'cmdline', 'memory_info']):
        try:
            # psutil may report None for attributes it could not read.
            name = p.info['name'] or ''
            cmd = p.info['cmdline']
            if not cmd:
                continue
            if 'firefox' not in name and 'firefox' not in cmd[0]:
                continue
            if slice_name:
                cpath = get_cgroup_path(p.pid)
                if not slice_matches(cpath, slice_name):
                    continue
            procs.append(p)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return procs


def total_rss_mb(procs: list['get_firefox_procs_ps_t.res_t.entry_t']):
    """Return the summed ``rss`` of *procs* in MiB.

    Entries without a usable ``rss`` are logged and skipped.
    """
    total = 0
    for p in procs:
        try:
            total += p['rss']
        except (KeyError, TypeError):
            logger.exception('')
    return total / (1024 * 1024)


def is_main_firefox(p):
    """Return ``True`` unless the entry's ``cmd`` contains ``contentproc``.

    Entries whose ``cmd`` cannot be inspected are logged and reported as
    ``False``.
    """
    try:
        return 'contentproc' not in p['cmd']
    except (KeyError, TypeError):
        logger.exception('')
        return False


def kill_prioritized(
    procs: list['get_firefox_procs_ps_t.res_t.entry_t'],
    to_free_mb,
    low_priority_pids,
):
    """SIGTERM non-main processes until about *to_free_mb* MiB is freed.

    Returns ``(killed_pids, freed_mb)`` where ``freed_mb`` is the summed RSS
    of the processes that were signalled (an estimate, not a measurement).
    """
    candidates = []
    for p in procs:
        if is_main_firefox(p):
            continue
        try:
            candidates.append((p, p['rss'] / (1024 * 1024)))
        except (KeyError, TypeError):
            logger.exception('')

    # Ordering: PIDs *not* in low_priority_pids first, then largest RSS
    # first.  NOTE(review): this means "low-priority" PIDs are signalled
    # last — confirm that this is the intended meaning of the set.
    candidates.sort(key=lambda c: ((c[0]['pid'] in low_priority_pids), -c[1]))

    freed = 0.0
    killed = []
    for p, rss_mb in candidates:
        if freed >= to_free_mb:
            break
        logger.info(
            dict(
                p=p,
                action='kill',
                msg='started',
            )
        )
        try:
            os.kill(p['pid'], signal.SIGTERM)
        except Exception:
            # Best effort: the process may already be gone.
            logger.exception('Error killing pid %s', p['pid'])
        else:
            killed.append(p['pid'])
            freed += rss_mb
    return killed, freed


# — systemd-run logic —


def launch_firefox_with_limits(
    base_cmd, memory_high, swap_max, extra_args, unit_name
):
    """Start *base_cmd* + *extra_args* in a ``systemd-run --user --scope``.

    ``MemoryHigh`` is always set from *memory_high* (MB, truncated to int);
    ``MemorySwapMax`` only when *swap_max* is not ``None``; ``--unit`` only
    when *unit_name* is non-empty.  All standard streams of the child go to
    ``/dev/null``.  Returns the ``subprocess.Popen`` object.
    """
    cmd = [
        'systemd-run',
        '--user',
        '--scope',
        '-p',
        f'MemoryHigh={int(memory_high)}M',
    ]
    if swap_max is not None:
        cmd += ['-p', f'MemorySwapMax={int(swap_max)}M']
    if unit_name:
        cmd += ['--unit', unit_name]

    cmd += base_cmd
    cmd += extra_args

    devnull = subprocess.DEVNULL
    proc = subprocess.Popen(cmd, stdin=devnull, stdout=devnull, stderr=devnull)
    # Logged rather than printed: this also runs while the full-screen UI
    # owns the terminal, where raw stderr output corrupts the display.
    logger.info('Launched Firefox via systemd-run, PID: %s', proc.pid)
    return proc


# — Main + TUI + Monitoring —

_PAREN_PID_RE = re.compile(r'\((\d+)[^\)\d]*\)')
# MULTILINE so that every line of the dialog text is considered, not only
# a text consisting of one single number.
_ADD_PID_RE = re.compile(r'^\s*(\d+)\s*$', re.MULTILINE)
_REMOVE_PID_RE = re.compile(r'^\s*-(\d+)\s*$', re.MULTILINE)


def _parse_pid_edits(text):
    """Parse the PID dialog text into ``(pids_to_add, pids_to_remove)``.

    PIDs to add are written as ``(123 ...)`` anywhere in the text or as a
    bare number on its own line; PIDs to remove as ``-123`` on its own line.
    """
    to_add = {int(m.group(1)) for m in _PAREN_PID_RE.finditer(text)}
    to_add.update(int(m.group(1)) for m in _ADD_PID_RE.finditer(text))
    to_remove = {int(m.group(1)) for m in _REMOVE_PID_RE.finditer(text)}
    return to_add, to_remove


def main():
    """Parse the command line, start Firefox if requested, run the TUI."""
    os.makedirs(
        pathlib.Path('~/.cache/oom_firefox/').expanduser(), exist_ok=True
    )

    logging.basicConfig(
        level=logging.INFO,
        format=(
            '%(asctime)s '
            '%(levelname)-8s '
            '%(filename)s:%(lineno)d '
            '%(funcName)s – %(message)s'
        ),
        handlers=[
            logging.handlers.RotatingFileHandler(
                pathlib.Path('~/.cache/oom_firefox/log').expanduser(),
                maxBytes=128 * 1024,
                backupCount=3,
            )
        ],
    )

    parser = argparse.ArgumentParser(
        description='Firefox memory manager with slice + graceful shutdown'
    )
    parser.add_argument(
        '--max-mb',
        type=float,
        required=True,
        help='Memory threshold in MB (used for killing logic & MemoryHigh)',
    )
    parser.add_argument(
        '--kill-percent',
        type=float,
        default=70.0,
        help='If over max, kill until usage ≤ this percent of max',
    )
    parser.add_argument(
        '--swap-max-mb',
        type=float,
        default=None,
        help='MemorySwapMax (MB) for the systemd scope',
    )
    parser.add_argument(
        '--interval',
        type=float,
        default=1.0,
        help='Monitoring interval in seconds',
    )
    parser.add_argument(
        '--unit-name',
        type=str,
        default='firefox-limited',
        help='Name for systemd transient unit',
    )
    parser.add_argument(
        '--firefox-extra',
        action='append',
        default=[],
        help='Extra CLI args to pass to Firefox (can repeat)',
    )
    parser.add_argument(
        'firefox_cmd',
        nargs=argparse.REMAINDER,
        help='Firefox command + args (if launching it)',
    )

    args = parser.parse_args()

    low_priority_pids = set()
    body = TextArea(focusable=False, scrollbar=True)

    # Set once shutdown has been requested; the monitor thread polls it.
    terminate_flag = threading.Event()
    # Guards firefox_proc and low_priority_pids, which are shared between
    # the UI thread and the monitor thread.
    lock = threading.Lock()

    firefox_proc = None

    def terminate():
        """Request shutdown of the monitor thread and the UI."""
        terminate_flag.set()
        try:
            app.exit()
        except Exception:
            # Raised when the application is not (yet) running or is
            # already exiting; the flag above is enough in that case.
            logger.exception('app.exit() failed during terminate')

    def stop():
        """Terminate the launched Firefox process, killing it on failure."""
        with lock:
            if not firefox_proc:
                return
            try:
                firefox_proc.terminate()
                firefox_proc.wait(timeout=5)
            except Exception:
                logger.exception('')
                try:
                    firefox_proc.kill()
                except Exception:
                    logger.exception('')

    signal.signal(signal.SIGINT, lambda s, f: terminate())
    signal.signal(signal.SIGTERM, lambda s, f: terminate())

    def refresh_body():
        """Measure usage, kill if over the limit, restart, update the UI."""
        nonlocal firefox_proc
        with lock:
            procs = get_firefox_procs_ps(slice_name=args.unit_name)
            total = total_rss_mb(procs)
            limit = args.max_mb
            kill_to = args.kill_percent / 100.0 * limit

            lines = [
                f'Firefox RSS (slice={args.unit_name}): {total:.1f} MB',
                f'Threshold (max): {limit:.1f} MB',
                f'Kill‑to target: {kill_to:.1f} MB ({args.kill_percent}%)',
                f'Low‑priority PIDs: {sorted(low_priority_pids)}',
            ]

            if total > limit:
                to_free = total - kill_to
                killed, freed = kill_prioritized(
                    procs, to_free, low_priority_pids
                )
                lines.append(f'Killed: {killed}')
                lines.append(f'Freed ≈ {freed:.1f} MB')
            else:
                lines.append('Within limit — no kill')

            if firefox_proc and firefox_proc.poll() is not None:
                # Logged, not printed: the full-screen UI owns the terminal.
                logger.warning('Firefox died — restarting …')
                firefox_proc = launch_firefox_with_limits(
                    args.firefox_cmd,
                    memory_high=args.max_mb,
                    swap_max=args.swap_max_mb,
                    extra_args=args.firefox_extra,
                    unit_name=args.unit_name,
                )

            body.text = '\n'.join(lines)

    # dialog_float[0] holds the currently shown Float (or None); root_floats
    # is the very list object handed to the FloatContainer below.
    dialog_float = [None]
    root_floats = []

    def show_float(container, focus_target, left, top):
        """Display *container* as the active float and focus *focus_target*."""
        shown = Float(content=container, left=left, top=top)
        dialog_float[0] = shown
        root_floats.append(shown)
        app.layout.focus(focus_target)

    def close_dialog():
        """Remove the active float (if any) and give focus back to the body."""
        shown = dialog_float[0]
        if shown in root_floats:
            root_floats.remove(shown)
        dialog_float[0] = None
        app.layout.focus(body)

    def open_pid_dialog():
        """Ask for PIDs to add to / remove from the low-priority set."""
        ta = TextArea(text='', multiline=True, scrollbar=True)

        def on_ok():
            to_add, to_remove = _parse_pid_edits(ta.text)
            # The monitor thread iterates this set under the same lock.
            with lock:
                low_priority_pids.update(to_add)
                # Unknown PIDs are ignored instead of raising KeyError.
                low_priority_pids.difference_update(to_remove)
            close_dialog()
            refresh_body()

        def on_cancel():
            close_dialog()

        dialog = Dialog(
            title='Enter low‑priority PIDs',
            body=ta,
            buttons=[
                Button(text='OK', handler=on_ok),
                Button(text='Cancel', handler=on_cancel),
            ],
            width=60,
            modal=True,
        )
        show_float(dialog, ta, left=2, top=2)

    def change_max_mb(max_mb: int) -> None:
        """Apply *max_mb* to the running scope and to ``args.max_mb``.

        Sets ``MemoryHigh`` to *max_mb* and ``MemoryMax`` to 110% of it via
        ``systemctl --user set-property``.  Raises
        ``subprocess.CalledProcessError`` (or ``OSError``) on failure, in
        which case ``args.max_mb`` is left untouched.
        """
        scope = '%s.scope' % args.unit_name
        for cmd in (
            [
                'systemctl',
                '--user',
                'set-property',
                scope,
                'MemoryHigh=%dM' % max_mb,
            ],
            [
                'systemctl',
                '--user',
                'set-property',
                scope,
                'MemoryMax=%dM' % (max_mb * 1.1),
            ],
        ):
            logger.info(dict(cmd=cmd))
            # Capture output so systemctl cannot write over the TUI.
            subprocess.run(cmd, check=True, capture_output=True)

        args.max_mb = max_mb

    def open_limit_dialog():
        """Ask for a new memory threshold (whole MB) and apply it."""
        ta = TextArea(text='', multiline=True, scrollbar=True)

        def on_ok():
            txt = ta.text
            m = re.compile(r'^\s*(\d+)\s*$').match(txt)
            if not m:
                logger.error('invalid input %s' % txt)
                return
            try:
                change_max_mb(int(m[1]))
            except (subprocess.CalledProcessError, OSError) as exc:
                # Keep the dialog open so the user can retry or cancel.
                logger.exception(
                    'failed to apply new memory limit, stderr=%r',
                    getattr(exc, 'stderr', None),
                )
                return
            close_dialog()
            refresh_body()

        def on_cancel():
            close_dialog()

        dialog = Dialog(
            title='Enter maximum memory threshold in MB',
            body=ta,
            buttons=[
                Button(text='OK', handler=on_ok),
                Button(text='Cancel', handler=on_cancel),
            ],
            width=60,
            modal=True,
        )
        show_float(dialog, ta, left=2, top=2)

    def open_message(title, message):
        """Show a read-only message box with a single Close button."""
        dialog = Dialog(
            title=title,
            body=Label(text=message),
            buttons=[Button(text='Close', handler=close_dialog)],
            width=50,
            modal=True,
        )
        show_float(dialog, dialog, left=4, top=4)

    kb = KeyBindings()
    # The global keys are only active while no dialog is shown.
    gkb = ConditionalKeyBindings(
        key_bindings=kb,
        filter=Condition(lambda: dialog_float[0] is None),
    )

    @kb.add('q')
    def _(event):
        terminate()

    @kb.add('m')
    def _(event):
        open_pid_dialog()

    @kb.add('l')
    def _(event):
        open_limit_dialog()

    HELP_TEXT = 'm=add PIDs, l=change limit, s=settings, a=about, q=quit'

    @kb.add('h')
    def _(event):
        open_message(
            'Help',
            'Keys: %s' % HELP_TEXT,
        )

    @kb.add('s')
    def _(event):
        open_message(
            'Settings',
            f'max_mb = {args.max_mb}\n'
            f'kill_percent = {args.kill_percent}\n'
            f'slice = {args.unit_name}\n'
            f'swap_max_mb = {args.swap_max_mb}\n'
            f'extra firefox args = {args.firefox_extra}',
        )

    @kb.add('a')
    def _(event):
        open_message('About', f'Version {__version__}\nCreated {__created__}')

    root = FloatContainer(
        content=HSplit(
            [
                Frame(body, title='Firefox Memory Manager'),
                Window(
                    height=1,
                    content=FormattedTextControl(
                        HELP_TEXT,
                    ),
                ),
            ]
        ),
        floats=root_floats,
        modal=True,
    )

    style = Style.from_dict(
        {
            'frame.border': 'ansicyan',
            'dialog.body': 'bg:#444444',
            'dialog': 'bg:#888888',
        }
    )

    app = Application(
        layout=Layout(root),
        key_bindings=gkb,
        style=style,
        full_screen=True,
        refresh_interval=args.interval,
    )

    if args.firefox_cmd:
        firefox_proc = launch_firefox_with_limits(
            args.firefox_cmd,
            memory_high=args.max_mb,
            swap_max=args.swap_max_mb,
            extra_args=args.firefox_extra,
            unit_name=args.unit_name,
        )

    def monitor_loop():
        """Call refresh_body() every ``--interval`` seconds until shutdown."""
        while not terminate_flag.is_set():
            try:
                refresh_body()
            except Exception:
                logger.exception('')
            # Unlike time.sleep() this returns as soon as shutdown is
            # requested, so exiting does not wait out a full interval.
            terminate_flag.wait(args.interval)

    t = threading.Thread(target=monitor_loop, daemon=True)
    t.start()

    try:
        app.run()
    finally:
        # Make sure the monitor thread ends however the UI was left;
        # otherwise the join below could block forever.
        terminate_flag.set()
    t.join()
    stop()


if __name__ == '__main__':
    main()