|
|
|
@ -13,6 +13,7 @@ from .. import util
|
|
|
|
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def check_output(args, env=None, sp=subprocess):
|
|
|
|
|
"""Call an external binary and return its stdout."""
|
|
|
|
|
log.debug('calling %s with env %s', args, env)
|
|
|
|
@ -24,7 +25,7 @@ def check_output(args, env=None, sp=subprocess):
|
|
|
|
|
def get_agent_sock_path(env=None, sp=subprocess):
|
|
|
|
|
"""Parse gpgconf output to find out GPG agent UNIX socket path."""
|
|
|
|
|
args = [util.which('gpgconf'), '--list-dirs']
|
|
|
|
|
output = check_output(args=args, env=env)
|
|
|
|
|
output = check_output(args=args, env=env, sp=sp)
|
|
|
|
|
lines = output.strip().split(b'\n')
|
|
|
|
|
dirs = dict(line.split(b':', 1) for line in lines)
|
|
|
|
|
log.debug('%s: %s', args, dirs)
|
|
|
|
@ -34,7 +35,8 @@ def get_agent_sock_path(env=None, sp=subprocess):
|
|
|
|
|
def connect_to_agent(env=None, sp=subprocess):
|
|
|
|
|
"""Connect to GPG agent's UNIX socket."""
|
|
|
|
|
sock_path = get_agent_sock_path(sp=sp, env=env)
|
|
|
|
|
check_output(args=['gpg-connect-agent', '/bye']) # Make sure it's running
|
|
|
|
|
# Make sure the original gpg-agent is running.
|
|
|
|
|
check_output(args=['gpg-connect-agent', '/bye'], sp=sp)
|
|
|
|
|
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
|
|
|
sock.connect(sock_path)
|
|
|
|
|
return sock
|
|
|
|
@ -151,7 +153,7 @@ def sign_digest(sock, keygrip, digest, sp=subprocess, environ=None):
|
|
|
|
|
|
|
|
|
|
assert communicate(sock, 'RESET').startswith(b'OK')
|
|
|
|
|
|
|
|
|
|
ttyname = check_output(args=['tty']).strip()
|
|
|
|
|
ttyname = check_output(args=['tty'], sp=sp).strip()
|
|
|
|
|
options = ['ttyname={}'.format(ttyname)] # set TTY for passphrase entry
|
|
|
|
|
|
|
|
|
|
display = (environ or os.environ).get('DISPLAY')
|
|
|
|
@ -188,7 +190,8 @@ def sign_digest(sock, keygrip, digest, sp=subprocess, environ=None):
|
|
|
|
|
|
|
|
|
|
def get_gnupg_components(sp=subprocess):
|
|
|
|
|
"""Parse GnuPG components' paths."""
|
|
|
|
|
output = check_output(args=[util.which('gpgconf'), '--list-components'])
|
|
|
|
|
args = [util.which('gpgconf'), '--list-components']
|
|
|
|
|
output = check_output(args=args, sp=sp)
|
|
|
|
|
components = dict(re.findall('(.*):.*:(.*)', output.decode('ascii')))
|
|
|
|
|
log.debug('gpgconf --list-components: %s', components)
|
|
|
|
|
return components
|
|
|
|
@ -214,14 +217,14 @@ def gpg_command(args, env=None):
|
|
|
|
|
def get_keygrip(user_id, sp=subprocess):
|
|
|
|
|
"""Get a keygrip of the primary GPG key of the specified user."""
|
|
|
|
|
args = gpg_command(['--list-keys', '--with-keygrip', user_id])
|
|
|
|
|
output = check_output(args=args).decode('ascii')
|
|
|
|
|
output = check_output(args=args, sp=sp).decode('ascii')
|
|
|
|
|
return re.findall(r'Keygrip = (\w+)', output)[0]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def gpg_version(sp=subprocess):
|
|
|
|
|
"""Get a keygrip of the primary GPG key of the specified user."""
|
|
|
|
|
args = gpg_command(['--version'])
|
|
|
|
|
output = check_output(args=args)
|
|
|
|
|
output = check_output(args=args, sp=sp)
|
|
|
|
|
line = output.split(b'\n')[0] # b'gpg (GnuPG) 2.1.11'
|
|
|
|
|
return line.split(b' ')[-1] # b'2.1.11'
|
|
|
|
|
|
|
|
|
@ -229,7 +232,7 @@ def gpg_version(sp=subprocess):
|
|
|
|
|
def export_public_key(user_id, env=None, sp=subprocess):
|
|
|
|
|
"""Export GPG public key for specified `user_id`."""
|
|
|
|
|
args = gpg_command(['--export', user_id])
|
|
|
|
|
result = check_output(args=args, env=env)
|
|
|
|
|
result = check_output(args=args, env=env, sp=sp)
|
|
|
|
|
if not result:
|
|
|
|
|
log.error('could not find public key %r in local GPG keyring', user_id)
|
|
|
|
|
raise KeyError(user_id)
|
|
|
|
@ -239,7 +242,7 @@ def export_public_key(user_id, env=None, sp=subprocess):
|
|
|
|
|
def export_public_keys(env=None, sp=subprocess):
|
|
|
|
|
"""Export all GPG public keys."""
|
|
|
|
|
args = gpg_command(['--export'])
|
|
|
|
|
result = check_output(args=args, env=env)
|
|
|
|
|
result = check_output(args=args, env=env, sp=sp)
|
|
|
|
|
if not result:
|
|
|
|
|
raise KeyError('No GPG public keys found at env: {!r}'.format(env))
|
|
|
|
|
return result
|
|
|
|
|