[~] Refactor

This commit is contained in:
Siarhei Siniak 2022-10-28 16:52:01 +03:00
parent c9d787656f
commit eeb473b7aa

@ -1,4 +1,9 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import datetime
import sys
import select
import subprocess
import pprint
import socket import socket
import optparse import optparse
import os import os
@ -12,6 +17,50 @@ import logging
msg = None msg = None
def intercept_output(
current_subprocess,
return_aggregated=None,
transform_callback=None,
):
if not return_aggregated:
return_aggregated = False
t1 = select.poll()
t1.register(current_subprocess.stdout, select.POLLIN)
print([current_subprocess, current_subprocess.poll()])
output = []
last_data = None
while not (
not current_subprocess.poll() is None and \
not last_data is None and len(last_data) == 0
):
t2 = t1.poll(100)
if len(t2) == 1 and (t2[0][1] & select.POLLIN) > 0 and \
not (isinstance(last_data, bytes) and len(last_data) == 0):
t3 = current_subprocess.stdout.peek()
t4 = current_subprocess.stdout.read(len(t3))
last_data = t3
output.append(t3)
yield dict(
data=t3,
aggregated=False,
)
t6 = t3
if not transform_callback is None:
t5 = transform_callback(t3)
if not t5 is None:
t6 = t5
os.write(sys.stdout.fileno(), t6)
if return_aggregated:
yield dict(
data=b''.join(output),
aggregated=True,
returncode=current_subprocess.poll(),
)
def player_metadata(): def player_metadata():
for k in range(20): for k in range(20):
try: try:
@ -512,6 +561,152 @@ def http_server(argv):
), ),
shell=True) shell=True)
def pass_ssh_osx(argv):
assert isinstance(argv, list) and all([isinstance(o, str) for o in argv])
parser = optparse.OptionParser()
parser.add_option(
'--pass_option',
dest='pass_option',
action='append',
default=[],
type=str,
)
parser.add_option(
'--debug',
dest='debug',
action='store_true',
default=False,
)
assert sys.platform == 'darwin'
options, args = parser.parse_args(argv)
if len(args) == 0:
raise RuntimeError('ssh_command is required')
if options.debug:
print(options.pass_option)
pprint.pprint(args)
t1 = options.pass_option
assert len(t1) > 0
print(
'select on of pass names\n%s' % '\n'.join([
'%d: %s' % (k, v)
for k, v in enumerate(t1)
])
)
while True:
try:
t2 = input()
t3 = int(t2)
assert t3 >= 0 and t3 < len(t1)
break
except:
continue
reset_gpg_agent = r'''
gpgconf --kill gpg-agent;
gpgconf --reload gpg-agent;
'''
with subprocess.Popen([
'ssh', '-C',
'-o', 'ConnectTimeout 10',
'-o', 'ServerAliveInterval 1',
*args,
'-t',
r'''
%s
gpg \
--pinentry-mode=ask \
-q -u $(cat ~/.password-store/.gpg-id) \
--decrypt \
~/.password-store/%s.gpg;
%s
''' % (
reset_gpg_agent,
t1[t3],
reset_gpg_agent,
),
], stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
password = None
last_chunk = None
hide_password = False
pinentry_delimeter = b'\x1b>'
def transform_callback(data):
nonlocal hide_password
nonlocal pinentry_delimeter
data2 = None
if pinentry_delimeter in data:
hide_password = True
pos = data.rfind(pinentry_delimeter)
if pos == -1:
data2 = data
else:
data2 = data[:pos + len(pinentry_delimeter)]
elif data == b'':
#return b'\r\n'
return b''
elif hide_password:
data2 = b''
else:
data2 = None
return data2
for chunk in intercept_output(
current_subprocess=p,
return_aggregated=True,
transform_callback=transform_callback,
):
if chunk['aggregated']:
last_chunk = chunk
break
assert not last_chunk is None
assert last_chunk['returncode'] == 0
last_line = last_chunk['data'].splitlines()[-1]
pos2 = last_line.rfind(pinentry_delimeter)
if pos2 == -1:
last_line2 = last_line
else:
last_line2 = last_line[pos2 + 1:]
password = last_line2.decode('utf-8').rstrip('\r\n')
assert not password is None
def cliboard_set(text):
with subprocess.Popen([
'pbcopy',
], stdin=subprocess.PIPE) as p:
p.stdin.write(text.encode('utf-8'))
p.stdin.flush()
p.stdin.close()
p.wait(1)
assert p.poll() == 0
cliboard_set(password)
get_time = lambda : datetime.datetime.now().timestamp()
start = get_time()
while True:
cur = get_time()
remains = 10 - (cur - start)
if remains <= 1e-8:
break
else:
print('\r%5.2fs remains' % remains, end='')
time.sleep(0.1)
cliboard_set('')
print('\rcleared cliboard\n', end='')
def player_v1(folder_url, item_id): def player_v1(folder_url, item_id):
import sys import sys
import urllib.parse import urllib.parse
@ -610,6 +805,8 @@ try:
sys.stdout.flush() sys.stdout.flush()
elif sys.argv[1] == 'http-server': elif sys.argv[1] == 'http-server':
http_server(sys.argv[2:]) http_server(sys.argv[2:])
elif sys.argv[1] == 'pass-ssh-osx':
pass_ssh_osx(sys.argv[2:])
elif sys.argv[1] == 'wl-screenshot': elif sys.argv[1] == 'wl-screenshot':
subprocess.check_call(r''' subprocess.check_call(r'''
grim -g "$(slurp)" - | wl-copy grim -g "$(slurp)" - | wl-copy