diff --git a/dotfiles/.local/bin/commands b/dotfiles/.local/bin/commands index 457dd82..d2066e5 100755 --- a/dotfiles/.local/bin/commands +++ b/dotfiles/.local/bin/commands @@ -565,6 +565,12 @@ def http_server(argv): def pass_ssh_osx(argv): assert isinstance(argv, list) and all([isinstance(o, str) for o in argv]) parser = optparse.OptionParser() + parser.add_option( + '--list', + dest='list', + default=False, + action='store_true', + ) parser.add_option( '--pass_option', dest='pass_option', @@ -591,47 +597,53 @@ def pass_ssh_osx(argv): t1 = options.pass_option assert len(t1) > 0 - print( - 'select on of pass names\n%s' % '\n'.join([ - '%d: %s' % (k, v) - for k, v in enumerate(t1) - ]) - ) - - while True: - try: - t2 = input() - t3 = int(t2) - assert t3 >= 0 and t3 < len(t1) - break - except: - continue reset_gpg_agent = r''' - gpgconf --kill gpg-agent; - gpgconf --reload gpg-agent; + gpgconf --kill gpg-agent && \ + gpgconf --reload gpg-agent ''' + if not options.list: + print( + 'select on of pass names\n%s' % '\n'.join([ + '%d: %s' % (k, v) + for k, v in enumerate(t1) + ]) + ) + + while True: + try: + t2 = input() + t3 = int(t2) + assert t3 >= 0 and t3 < len(t1) + break + except: + continue + + command = r''' + %s + gpg \ + --pinentry-mode=ask \ + -q -u $(cat ~/.password-store/.gpg-id) \ + --decrypt \ + ~/.password-store/%s.gpg && \ + echo -n '['$?']' && \ + %s + ''' % ( + reset_gpg_agent, + t1[t3], + reset_gpg_agent, + ) + else: + command = 'pass list | less -R' + ssh_command = [ 'ssh', '-C', '-o', 'ConnectTimeout 10', '-o', 'ServerAliveInterval 1', *args, '-t', - r''' - %s - gpg \ - --pinentry-mode=ask \ - -q -u $(cat ~/.password-store/.gpg-id) \ - --decrypt \ - ~/.password-store/%s.gpg; - echo -n '['$?']'; - %s - ''' % ( - reset_gpg_agent, - t1[t3], - reset_gpg_agent, - ), + command, ] if options.debug: @@ -641,100 +653,107 @@ def pass_ssh_osx(argv): ) ) - with subprocess.Popen( - ssh_command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE - ) as p: - password = None - last_chunk = None + if options.list: + subprocess.check_call(ssh_command) + else: + def cliboard_set(text): + with subprocess.Popen([ + 'pbcopy', + ], stdin=subprocess.PIPE) as p: + p.stdin.write(text.encode('utf-8')) + p.stdin.flush() + p.stdin.close() + p.wait(1) + assert p.poll() == 0 - hide_password = False - pinentry_delimeter = b'\x1b>' + with subprocess.Popen( + ssh_command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) as p: + password = None + last_chunk = None - def transform_callback(data): - nonlocal hide_password - nonlocal pinentry_delimeter + hide_password = False + pinentry_delimeter = b'\x1b>' - data2 = None + def transform_callback(data): + nonlocal hide_password + nonlocal pinentry_delimeter - - if pinentry_delimeter in data: - hide_password = True - pos = data.rfind(pinentry_delimeter) - if pos == -1: - data2 = data - else: - data2 = data[:pos + len(pinentry_delimeter)] - elif data == b'': - #return b'\r\n' - return b'' - elif hide_password: - data2 = b'' - else: data2 = None - return data2 - for chunk in intercept_output( - current_subprocess=p, - return_aggregated=True, - transform_callback=transform_callback, - ): - if chunk['aggregated']: - last_chunk = chunk - break + if pinentry_delimeter in data: + hide_password = True + pos = data.rfind(pinentry_delimeter) + if pos == -1: + data2 = data + else: + data2 = data[:pos + len(pinentry_delimeter)] + elif data == b'': + #return b'\r\n' + return b'' + elif hide_password: + data2 = b'' + else: + data2 = None - assert not last_chunk is None - assert last_chunk['returncode'] == 0 - if options.debug: - pprint.pprint(last_chunk['data']) + return data2 - if last_chunk['data'].endswith('\r\n[0]'.encode('utf-8')) and \ - last_chunk['data'].rfind(pinentry_delimeter) != -1: - last_line = last_chunk['data'].splitlines()[-2] - else: - raise RuntimeError( - 'gpg failure %s' % str( - last_chunk['data'][ - max(last_chunk['data'].find(pinentry_delimeter), -128): - ] + for chunk in intercept_output( + current_subprocess=p, + return_aggregated=True, + transform_callback=transform_callback, + ): + if chunk['aggregated']: + last_chunk = chunk + break + + assert not last_chunk is None + assert last_chunk['returncode'] == 0 + + if options.debug: + pprint.pprint(last_chunk['data']) + + if last_chunk['data'].endswith('\r\n[0]'.encode('utf-8')) and \ + last_chunk['data'].rfind(pinentry_delimeter) != -1: + last_line = last_chunk['data'].splitlines()[-2] + else: + raise RuntimeError( + 'gpg failure %s' % str( + last_chunk['data'][ + max(last_chunk['data'].find(pinentry_delimeter), -128): + ] + ) ) - ) - pos2 = last_line.rfind(pinentry_delimeter) - if pos2 == -1: - last_line2 = last_line - else: - last_line2 = last_line[pos2 + 1:] - password = last_line2.decode('utf-8').rstrip('\r\n') - assert not password is None + pos2 = last_line.rfind(pinentry_delimeter) + if pos2 == -1: + last_line2 = last_line + else: + last_line2 = last_line[ + pos2 + len(pinentry_delimeter): + ] - def cliboard_set(text): - with subprocess.Popen([ - 'pbcopy', - ], stdin=subprocess.PIPE) as p: - p.stdin.write(text.encode('utf-8')) - p.stdin.flush() - p.stdin.close() - p.wait(1) - assert p.poll() == 0 + password = last_line2.decode('utf-8').rstrip('\r\n') + assert not password is None - cliboard_set(password) + cliboard_set(password) - get_time = lambda : datetime.datetime.now().timestamp() - start = get_time() - while True: - cur = get_time() - remains = 10 - (cur - start) - if remains <= 1e-8: - break - else: - print('\r%5.2fs remains' % remains, end='') - time.sleep(0.1) + get_time = lambda : datetime.datetime.now().timestamp() + start = get_time() + while True: + cur = get_time() + remains = 10 - (cur - start) + if remains <= 1e-8: + break + else: + print('\r%5.2fs remains' % remains, end='') + time.sleep(0.1) - cliboard_set('') - print('\rcleared cliboard\n', end='') + cliboard_set('') + print('\rcleared cliboard\n', end='') def player_v1(folder_url, item_id): import sys