[~] Refactor

This commit is contained in:
Siarhei Siniak 2022-11-07 15:30:40 +03:00
parent 0f1f526078
commit 88e0c6f5ee

@ -565,6 +565,12 @@ def http_server(argv):
def pass_ssh_osx(argv): def pass_ssh_osx(argv):
assert isinstance(argv, list) and all([isinstance(o, str) for o in argv]) assert isinstance(argv, list) and all([isinstance(o, str) for o in argv])
parser = optparse.OptionParser() parser = optparse.OptionParser()
parser.add_option(
'--list',
dest='list',
default=False,
action='store_true',
)
parser.add_option( parser.add_option(
'--pass_option', '--pass_option',
dest='pass_option', dest='pass_option',
@ -591,47 +597,53 @@ def pass_ssh_osx(argv):
t1 = options.pass_option t1 = options.pass_option
assert len(t1) > 0 assert len(t1) > 0
print(
'select on of pass names\n%s' % '\n'.join([
'%d: %s' % (k, v)
for k, v in enumerate(t1)
])
)
while True:
try:
t2 = input()
t3 = int(t2)
assert t3 >= 0 and t3 < len(t1)
break
except:
continue
reset_gpg_agent = r''' reset_gpg_agent = r'''
gpgconf --kill gpg-agent; gpgconf --kill gpg-agent && \
gpgconf --reload gpg-agent; gpgconf --reload gpg-agent
''' '''
if not options.list:
print(
'select on of pass names\n%s' % '\n'.join([
'%d: %s' % (k, v)
for k, v in enumerate(t1)
])
)
while True:
try:
t2 = input()
t3 = int(t2)
assert t3 >= 0 and t3 < len(t1)
break
except:
continue
command = r'''
%s
gpg \
--pinentry-mode=ask \
-q -u $(cat ~/.password-store/.gpg-id) \
--decrypt \
~/.password-store/%s.gpg && \
echo -n '['$?']' && \
%s
''' % (
reset_gpg_agent,
t1[t3],
reset_gpg_agent,
)
else:
command = 'pass list | less -R'
ssh_command = [ ssh_command = [
'ssh', '-C', 'ssh', '-C',
'-o', 'ConnectTimeout 10', '-o', 'ConnectTimeout 10',
'-o', 'ServerAliveInterval 1', '-o', 'ServerAliveInterval 1',
*args, *args,
'-t', '-t',
r''' command,
%s
gpg \
--pinentry-mode=ask \
-q -u $(cat ~/.password-store/.gpg-id) \
--decrypt \
~/.password-store/%s.gpg;
echo -n '['$?']';
%s
''' % (
reset_gpg_agent,
t1[t3],
reset_gpg_agent,
),
] ]
if options.debug: if options.debug:
@ -641,100 +653,107 @@ def pass_ssh_osx(argv):
) )
) )
with subprocess.Popen( if options.list:
ssh_command, subprocess.check_call(ssh_command)
stdout=subprocess.PIPE, else:
stderr=subprocess.PIPE def cliboard_set(text):
) as p: with subprocess.Popen([
password = None 'pbcopy',
last_chunk = None ], stdin=subprocess.PIPE) as p:
p.stdin.write(text.encode('utf-8'))
p.stdin.flush()
p.stdin.close()
p.wait(1)
assert p.poll() == 0
hide_password = False with subprocess.Popen(
pinentry_delimeter = b'\x1b>' ssh_command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
) as p:
password = None
last_chunk = None
def transform_callback(data): hide_password = False
nonlocal hide_password pinentry_delimeter = b'\x1b>'
nonlocal pinentry_delimeter
data2 = None def transform_callback(data):
nonlocal hide_password
nonlocal pinentry_delimeter
if pinentry_delimeter in data:
hide_password = True
pos = data.rfind(pinentry_delimeter)
if pos == -1:
data2 = data
else:
data2 = data[:pos + len(pinentry_delimeter)]
elif data == b'':
#return b'\r\n'
return b''
elif hide_password:
data2 = b''
else:
data2 = None data2 = None
return data2
for chunk in intercept_output( if pinentry_delimeter in data:
current_subprocess=p, hide_password = True
return_aggregated=True, pos = data.rfind(pinentry_delimeter)
transform_callback=transform_callback, if pos == -1:
): data2 = data
if chunk['aggregated']: else:
last_chunk = chunk data2 = data[:pos + len(pinentry_delimeter)]
break elif data == b'':
#return b'\r\n'
return b''
elif hide_password:
data2 = b''
else:
data2 = None
assert not last_chunk is None return data2
assert last_chunk['returncode'] == 0
if options.debug:
pprint.pprint(last_chunk['data'])
if last_chunk['data'].endswith('\r\n[0]'.encode('utf-8')) and \ for chunk in intercept_output(
last_chunk['data'].rfind(pinentry_delimeter) != -1: current_subprocess=p,
last_line = last_chunk['data'].splitlines()[-2] return_aggregated=True,
else: transform_callback=transform_callback,
raise RuntimeError( ):
'gpg failure %s' % str( if chunk['aggregated']:
last_chunk['data'][ last_chunk = chunk
max(last_chunk['data'].find(pinentry_delimeter), -128): break
]
assert not last_chunk is None
assert last_chunk['returncode'] == 0
if options.debug:
pprint.pprint(last_chunk['data'])
if last_chunk['data'].endswith('\r\n[0]'.encode('utf-8')) and \
last_chunk['data'].rfind(pinentry_delimeter) != -1:
last_line = last_chunk['data'].splitlines()[-2]
else:
raise RuntimeError(
'gpg failure %s' % str(
last_chunk['data'][
max(last_chunk['data'].find(pinentry_delimeter), -128):
]
)
) )
)
pos2 = last_line.rfind(pinentry_delimeter) pos2 = last_line.rfind(pinentry_delimeter)
if pos2 == -1: if pos2 == -1:
last_line2 = last_line last_line2 = last_line
else: else:
last_line2 = last_line[pos2 + 1:] last_line2 = last_line[
password = last_line2.decode('utf-8').rstrip('\r\n') pos2 + len(pinentry_delimeter):
assert not password is None ]
def cliboard_set(text): password = last_line2.decode('utf-8').rstrip('\r\n')
with subprocess.Popen([ assert not password is None
'pbcopy',
], stdin=subprocess.PIPE) as p:
p.stdin.write(text.encode('utf-8'))
p.stdin.flush()
p.stdin.close()
p.wait(1)
assert p.poll() == 0
cliboard_set(password) cliboard_set(password)
get_time = lambda : datetime.datetime.now().timestamp() get_time = lambda : datetime.datetime.now().timestamp()
start = get_time() start = get_time()
while True: while True:
cur = get_time() cur = get_time()
remains = 10 - (cur - start) remains = 10 - (cur - start)
if remains <= 1e-8: if remains <= 1e-8:
break break
else: else:
print('\r%5.2fs remains' % remains, end='') print('\r%5.2fs remains' % remains, end='')
time.sleep(0.1) time.sleep(0.1)
cliboard_set('') cliboard_set('')
print('\rcleared cliboard\n', end='') print('\rcleared cliboard\n', end='')
def player_v1(folder_url, item_id): def player_v1(folder_url, item_id):
import sys import sys