[~] Refactor

This commit is contained in:
Siarhei Siniak 2023-01-01 18:41:21 +03:00
parent c07e30dfa1
commit 31a1c43786

@ -97,6 +97,11 @@ def intercept_output(
if not t5 is None: if not t5 is None:
t6 = t5 t6 = t5
os.write(sys.stdout.fileno(), t6) os.write(sys.stdout.fileno(), t6)
else:
yield dict(
data=b'',
aggregated=False,
)
if return_aggregated: if return_aggregated:
yield dict( yield dict(
@ -1181,31 +1186,11 @@ def desktop_services(argv):
]) ])
services = [] services = []
try: try:
swaylock_cmd = [
'swaylock', '-f', '-d',
]
if not options.background_image is None:
swaylock_cmd.extend(
[
'-i',
'"%s"' % options.background_image,
]
)
subprocess.check_call([
'swaymsg',
'--',
'output',
'*',
'bg',
options.background_image,
'fit',
])
if options.cpufreq == 0: if options.cpufreq == 0:
print('launching cpufreq, need sudo') print('launching cpufreq, need sudo')
services.append( services.append(
subprocess.Popen(r''' subprocess.Popen(r'''
sudo sh -c 'echo cpufreq, user; whoami; exec sudo sh -c 'echo cpufreq, user; whoami;
while true; do while true; do
echo passive > /sys/devices/system/cpu/intel_pstate/status; echo passive > /sys/devices/system/cpu/intel_pstate/status;
echo 1 > /sys/devices/system/cpu/intel_pstate/no_turbo; echo 1 > /sys/devices/system/cpu/intel_pstate/no_turbo;
@ -1219,20 +1204,158 @@ def desktop_services(argv):
''', shell=True) ''', shell=True)
) )
class start_swayidle:
def __init__(self):
swaylock_cmd = [
'swaylock', '-f', '-d',
]
if not options.background_image is None:
swaylock_cmd.extend(
[
'-i',
'"%s"' % options.background_image,
]
)
subprocess.check_call([
'swaymsg',
'--',
'output',
'*',
'bg',
options.background_image,
'fit',
])
self.commands = dict(
swaylock_cmd2=' '.join(swaylock_cmd),
timeout1='echo t1; swaymsg "output * dpms off";',
lock='echo t6; pkill --signal SIGUSR1 swayidle;',
unlock='echo t7; pkill --signal SIGINT swaylock; swaymsg "output * dpms on";',
unlock2='pkill --signal SIGINT swaylock;',
resume='echo t2; swaymsg "output * dpms on";',
before_sleep='echo t3; loginctl lock-session;',
after_resume='echo t4; pkill --signal SIGUSR1 swayidle;',
)
self.last_force_idle = None
self.commands.update(
timeout2='echo t5; {swaylock_cmd};'.format(
swaylock_cmd=self.commands['swaylock_cmd2']
)
)
self.swayidle = subprocess.Popen(
r'''
exec swayidle -d -w \
timeout 300 'echo t1;' \
lock 'echo t2;' \
unlock 'echo t3;' \
timeout 900 'echo t4;' \
resume 'echo t5; ' \
before-sleep 'echo t6; read;' \
after-resume 'echo t7;'
''',
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
self.output = intercept_output(self.swayidle)
self.events = []
self.data = []
def poll(self):
return self.swayidle.poll()
def force_idle(self):
if self.last_force_idle is None or (
datetime.datetime.now() - self.last_force_idle
).total_seconds() >= 10:
self.last_force_idle = datetime.datetime.now()
return True
else:
return False
def terminate(self, *args, **kwargs):
return self.swayidle.terminate(*args, **kwargs)
def kill(self):
return self.swayidle.kill()
def check(self):
while True:
if self.output is None:
break
chunk = next(self.output)
if chunk['aggregated']:
self.output = None
continue
if len(chunk['data']) == 0:
break
self.data.append(chunk)
if b'\n' in chunk['data']:
total = b''.join([
o['data']
for o in self.data
]).decode('utf-8')
sep_pos = total.rfind('\n')
lines = total[:sep_pos].splitlines()
self.data = [
dict(
data=total[sep_pos:].encode('utf-8'),
aggregated=False,
)
]
self.events.extend([
line
for line in lines
if line in [
't1', 't2', 't3', 't4',
't5', 't5', 't6', 't7',
]
])
if len(self.events) > 0:
for o in self.events:
if o == 't1':
if self.force_idle():
subprocess.check_call(self.commands['timeout2'], shell=True)
subprocess.check_call(self.commands['timeout1'], shell=True)
elif o == 't2':
if self.force_idle():
subprocess.check_call(self.commands['lock'], shell=True)
subprocess.check_call(self.commands['timeout2'], shell=True)
subprocess.check_call(self.commands['timeout1'], shell=True)
elif o == 't3':
pass
elif o == 't4':
pass
elif o == 't5':
if self.force_idle():
subprocess.check_call(self.commands['lock'], shell=True)
subprocess.check_call(self.commands['resume'], shell=True),
elif o == 't6':
print('started before-sleep')
subprocess.check_call(self.commands['timeout2'], shell=True),
subprocess.check_call(self.commands['timeout1'], shell=True),
print('started before-done')
self.swayidle.stdin.write(b'\n')
self.swayidle.stdin.flush()
elif o == 't7':
if self.force_idle():
subprocess.check_call(self.commands['lock'], shell=True)
subprocess.check_call(self.commands['resume'], shell=True),
else:
raise NotImplementedError
pprint.pprint(self.events)
del self.events[:]
services.extend([ services.extend([
subprocess.Popen(['ibus-daemon']), subprocess.Popen(['ibus-daemon']),
subprocess.Popen(r''' start_swayidle(),
exec swayidle -d -w \
timeout 300 'echo t1; swaymsg "output * dpms off"' \
lock 'echo t6; pkill --signal SIGUSR1 swayidle;' \
unlock 'echo t7; pkill --signal SIGINT swaylock; swaymsg "output * dpms on";' \
timeout 900 'echo t5; {swaylock_cmd};' \
resume 'echo t2; swaymsg "output * dpms on"' \
before-sleep 'echo t3; loginctl lock-session;' \
after-resume 'echo t4; pkill --signal SIGUSR1 swayidle;'
'''.format(
swaylock_cmd=' '.join(swaylock_cmd),
), shell=True),
]) ])
while True: while True:
@ -1240,7 +1363,11 @@ def desktop_services(argv):
print('done') print('done')
break break
time.sleep(1) for o in services:
if hasattr(o, 'check'):
o.check()
time.sleep(0.1)
except: except:
logging.error(traceback.format_exc()) logging.error(traceback.format_exc())
finally: finally: