Coverage for tools/acme_account_rollover.py: 90%

94 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-05-24 19:15 +0000

1#!/usr/bin/env python3 

2# pylint: disable=too-many-statements 

3"""Tiny script to rollover two keys for an ACME account""" 

4import sys 

5import argparse 

6import subprocess 

7import json 

8import base64 

9import binascii 

10import re 

11import copy 

12import logging 

13import requests 

14 

# Module-level logger for this script; a stream handler is attached at import
# time so that log records are emitted without further configuration.
LOGGER = logging.getLogger("acme_account_rollover")
LOGGER.addHandler(logging.StreamHandler())

17 

18 

19def _b64(text): 

20 """Encodes text as base64 as specified in ACME RFC.""" 

21 return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=") 

22 

23 

24def _openssl(command, options, communicate=None): 

25 """Run openssl command line and raise IOError on non-zero return.""" 

26 with subprocess.Popen(["openssl", command] + options, stdin=subprocess.PIPE, 

27 stdout=subprocess.PIPE, stderr=subprocess.PIPE) as openssl: 

28 out, err = openssl.communicate(communicate) 

29 if openssl.returncode != 0: 

30 raise IOError("OpenSSL Error: {0}".format(err)) 

31 return out 

32 

33 

34# pylint: disable=too-many-locals 

def account_rollover(old_accountkeypath, new_accountkeypath, acme_directory, timeout, log=LOGGER):
    """Roll over an ACME account from its old account key to a new one.

    old_accountkeypath: path to the private key currently bound to the account.
    new_accountkeypath: path to the private key that must replace it.
    acme_directory: URL of the ACME directory resource of the server.
    timeout: timeout handed to every `requests` call (None disables it).
    log: logger receiving the progress messages.

    Raises ValueError when a key cannot be parsed or when the ACME server
    answers with a status other than 200, RuntimeError when no HTTP response
    is received at all, and IOError when an openssl call fails.
    """
    def _get_private_acme_signature(accountkeypath):
        """Build the JWS protected-header base ("alg" + "jwk") for an RSA account key."""
        accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"])
        signature_search = re.search(
            r"modulus:\s+?00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)",
            accountkey.decode("utf8"), re.MULTILINE)
        if signature_search is None:
            raise ValueError("Unable to retrieve private signature.")
        pub_hex, pub_exp = signature_search.groups()
        pub_exp = "{0:x}".format(int(pub_exp))
        # unhexlify needs an even number of hex digits
        pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
        return {
            "alg": "RS256",
            "jwk": {
                "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
                "kty": "RSA",
                "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
            },
        }

    def _sign_request(url, keypath, payload, is_inner=False):
        """Return the JWS (protected, payload, signature) signed with the key at keypath.

        is_inner marks the inner JWS of a key-change request: it is identified
        by "jwk" instead of "kid" and carries no nonce.
        """
        nonlocal nonce
        if payload == "":  # on POST-as-GET, final payload has to be just empty string
            payload64 = ""
        else:
            payload64 = _b64(json.dumps(payload).encode("utf8"))
        if keypath == new_accountkeypath:
            protected = copy.deepcopy(private_acme_new_signature)
        elif keypath == old_accountkeypath:
            protected = copy.deepcopy(private_acme_old_signature)
        else:
            raise RuntimeError("Unknown keypath to sign request")

        # Exactly one of "jwk" / "kid" is kept in the protected header.
        if is_inner or url == acme_config["newAccount"]:
            if "kid" in protected:
                del protected["kid"]
        else:
            del protected["jwk"]

        if not is_inner:
            # Reuse the nonce from the previous response, or ask for a fresh one.
            protected["nonce"] = (nonce
                                  or requests.get(
                                      acme_config["newNonce"],
                                      headers=adt_headers,
                                      timeout=timeout)
                                  .headers['Replay-Nonce'])
        protected["url"] = url
        protected64 = _b64(json.dumps(protected).encode("utf8"))
        signature = _openssl("dgst", ["-sha256", "-sign", keypath],
                             "{0}.{1}".format(protected64, payload64).encode("utf8"))
        return {
            "protected": protected64, "payload": payload64, "signature": _b64(signature)
        }

    def _send_signed_request(url, keypath, payload):
        """POST a signed request and return (response, decoded body).

        Raises RuntimeError when no HTTP response could be obtained.
        """
        nonlocal nonce
        jose = _sign_request(url, keypath, payload)
        jose_headers = {'Content-Type': 'application/jose+json'} | adt_headers
        try:
            response = requests.post(url, json=jose, headers=jose_headers, timeout=timeout)
        except requests.exceptions.RequestException as error:
            response = error.response
        # A requests.Response is falsy for 4xx/5xx statuses, so compare with
        # None: error answers must reach the caller, which reports their
        # status code and body.
        if response is None:
            raise RuntimeError("Unable to get response from ACME server.")
        # When the header is missing the nonce becomes None and the next
        # signed request fetches a fresh one.
        nonce = response.headers.get('Replay-Nonce')
        try:
            return response, response.json()
        except ValueError:  # if body is empty or not JSON formatted
            return response, json.dumps({})

    # main code
    adt_headers = {'User-Agent': 'acme-dns-tiny/4.0'}
    nonce = None

    log.info("Fetch informations from the ACME directory.")
    acme_config = requests.get(acme_directory, headers=adt_headers, timeout=timeout).json()

    log.info("Get private signature from old account key.")
    private_acme_old_signature = _get_private_acme_signature(old_accountkeypath)

    log.info("Get private signature from new account key.")
    private_acme_new_signature = _get_private_acme_signature(new_accountkeypath)

    log.info("Ask to the ACME server the account identifier to complete the private signature.")
    http_response, result = _send_signed_request(acme_config["newAccount"], old_accountkeypath, {
        "onlyReturnExisting": True})
    if http_response.status_code == 200:
        private_acme_old_signature["kid"] = http_response.headers["Location"]
        private_acme_new_signature["kid"] = http_response.headers["Location"]
    else:
        raise ValueError("Error looking for account URL: {0} {1}"
                         .format(http_response.status_code, result))

    log.info("Rolling over account keys.")
    # The signature by the new key covers the account URL and the old key,
    # signifying a request by the new key holder to take over the account from
    # the old key holder.
    inner_payload = _sign_request(acme_config["keyChange"], new_accountkeypath, {
        "account": private_acme_old_signature["kid"],
        "oldKey": private_acme_old_signature["jwk"]}, is_inner=True)
    # The signature by the old key covers this request and its signature, and
    # indicates the old key holder's assent to the roll-over request.
    http_response, result = _send_signed_request(acme_config["keyChange"], old_accountkeypath,
                                                 inner_payload)

    if http_response.status_code != 200:
        raise ValueError("Error rolling over account key: {0} {1}"
                         .format(http_response.status_code, result))
    log.info("Keys rolled over.")

149 

150 

def main(argv):
    """Parse the command-line arguments and roll over the ACME account keys.

    argv: list of command-line arguments, without the program name.

    The logger level is ERROR when --quiet is given and INFO otherwise; a
    --timeout of 0 is passed on as None, meaning no timeout.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Tiny ACME client to roll over an ACME account key with another one.",
        epilog="""This script *rolls over* ACME account keys.

It will need to have access to the ACME private account keys, so PLEASE READ THROUGH IT!
It's around 150 lines, so it won't take long.

Example: roll over account key from account.key to newaccount.key:
  python3 acme_account_rollover.py --current account.key --new newaccount.key --acme-directory \
https://acme-staging-v02.api.letsencrypt.org/directory""")
    parser.add_argument("--current", required=True,
                        help="path to the current private account key")
    parser.add_argument("--new", required=True,
                        help="path to the newer private account key to register")
    # This tool rolls keys over; it does not remove them.
    parser.add_argument("--acme-directory", required=True,
                        help="ACME directory URL of the ACME server where to roll over the key")
    parser.add_argument("--quiet", action="store_const", const=logging.ERROR,
                        help="suppress output except for errors")
    parser.add_argument("--timeout", type=int, default=10,
                        help="""Number of seconds to wait before ACME requests time out.
                        Set it to 0 to wait indefinitely. Defaults to 10.""")
    args = parser.parse_args(argv)

    # args.quiet is None unless --quiet was given
    LOGGER.setLevel(args.quiet or logging.INFO)
    account_rollover(args.current, args.new, args.acme_directory, args.timeout or None, log=LOGGER)

179 

180 

# Script entry point: forward the command-line arguments (program name excluded).
if __name__ == "__main__":  # pragma: no cover
    cli_arguments = sys.argv[1:]
    main(cli_arguments)