1#!/usr/bin/env python3 

2# pylint: disable=too-many-statements 

3"""Tiny script to rollover two keys for an ACME account""" 

4import sys 

5import argparse 

6import subprocess 

7import json 

8import base64 

9import binascii 

10import re 

11import copy 

12import logging 

13import requests 

14 

15LOGGER = logging.getLogger("acme_account_rollover") 

16LOGGER.addHandler(logging.StreamHandler()) 

17 

18 

19def _b64(text): 

20 """Encodes text as base64 as specified in ACME RFC.""" 

21 return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=") 

22 

23 

24def _openssl(command, options, communicate=None): 

25 """Run openssl command line and raise IOError on non-zero return.""" 

26 with subprocess.Popen(["openssl", command] + options, stdin=subprocess.PIPE, 

27 stdout=subprocess.PIPE, stderr=subprocess.PIPE) as openssl: 

28 out, err = openssl.communicate(communicate) 

29 if openssl.returncode != 0: 

30 raise IOError("OpenSSL Error: {0}".format(err)) 

31 return out 

32 

33 

34# pylint: disable=too-many-locals 

35def account_rollover(old_accountkeypath, new_accountkeypath, acme_directory, timeout, log=LOGGER): 

36 """Rollover the old and new account key for an ACME account.""" 

37 def _get_private_acme_signature(accountkeypath): 

38 """Read the account key to get the signature to authenticate with the ACME server.""" 

39 accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"]) 

40 signature_search = re.search( 

41 r"modulus:\s+?00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)", 

42 accountkey.decode("utf8"), re.MULTILINE) 

43 if signature_search is None: 

44 raise ValueError("Unable to retrieve private signature.") 

45 pub_hex, pub_exp = signature_search.groups() 

46 pub_exp = "{0:x}".format(int(pub_exp)) 

47 pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp 

48 return { 

49 "alg": "RS256", 

50 "jwk": { 

51 "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))), 

52 "kty": "RSA", 

53 "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))), 

54 }, 

55 } 

56 

57 def _sign_request(url, keypath, payload, is_inner=False): 

58 """Signs request with a specific right account key.""" 

59 nonlocal nonce 

60 if payload == "": # on POST-as-GET, final payload has to be just empty string 

61 payload64 = "" 

62 else: 

63 payload64 = _b64(json.dumps(payload).encode("utf8")) 

64 if keypath == new_accountkeypath: 

65 protected = copy.deepcopy(private_acme_new_signature) 

66 elif keypath == old_accountkeypath: 

67 protected = copy.deepcopy(private_acme_old_signature) 

68 else: 

69 raise RuntimeError("Unknown keypath to sign request") 

70 

71 if is_inner or url == acme_config["newAccount"]: 

72 if "kid" in protected: 

73 del protected["kid"] 

74 else: 

75 del protected["jwk"] 

76 

77 if not is_inner: 

78 protected["nonce"] = (nonce 

79 or requests.get( 

80 acme_config["newNonce"], 

81 headers=adtheaders, 

82 timeout=timeout) 

83 .headers['Replay-Nonce']) 

84 protected["url"] = url 

85 protected64 = _b64(json.dumps(protected).encode("utf8")) 

86 signature = _openssl("dgst", ["-sha256", "-sign", keypath], 

87 "{0}.{1}".format(protected64, payload64).encode("utf8")) 

88 return { 

89 "protected": protected64, "payload": payload64, "signature": _b64(signature) 

90 } 

91 

92 def _send_signed_request(url, keypath, payload): 

93 """Sends signed requests to ACME server.""" 

94 nonlocal nonce 

95 jose = _sign_request(url, keypath, payload) 

96 joseheaders = { 

97 'User-Agent': adtheaders.get('User-Agent'), 

98 'Content-Type': 'application/jose+json' 

99 } 

100 try: 

101 response = requests.post(url, json=jose, headers=joseheaders, timeout=timeout) 

102 except requests.exceptions.RequestException as error: 

103 response = error.response 

104 if response: 

105 nonce = response.headers['Replay-Nonce'] 

106 try: 

107 return response, response.json() 

108 except ValueError: # if body is empty or not JSON formatted 

109 return response, json.dumps({}) 

110 else: 

111 raise RuntimeError("Unable to get response from ACME server.") 

112 

113 # main code 

114 adtheaders = {'User-Agent': 'acme-dns-tiny/3.0'} 

115 nonce = None 

116 

117 log.info("Fetch informations from the ACME directory.") 

118 acme_config = requests.get(acme_directory, headers=adtheaders, timeout=timeout).json() 

119 

120 log.info("Get private signature from old account key.") 

121 private_acme_old_signature = _get_private_acme_signature(old_accountkeypath) 

122 

123 log.info("Get private signature from new account key.") 

124 private_acme_new_signature = _get_private_acme_signature(new_accountkeypath) 

125 

126 log.info("Ask to the ACME server the account identifier to complete the private signature.") 

127 http_response, result = _send_signed_request(acme_config["newAccount"], old_accountkeypath, { 

128 "onlyReturnExisting": True}) 

129 if http_response.status_code == 200: 

130 private_acme_old_signature["kid"] = http_response.headers["Location"] 

131 private_acme_new_signature["kid"] = http_response.headers["Location"] 

132 else: 

133 raise ValueError("Error looking or account URL: {0} {1}" 

134 .format(http_response.status_code, result)) 

135 

136 log.info("Rolling over account keys.") 

137 # The signature by the new key covers the account URL and the old key, 

138 # signifying a request by the new key holder to take over the account from 

139 # the old key holder. 

140 inner_payload = _sign_request(acme_config["keyChange"], new_accountkeypath, { 

141 "account": private_acme_old_signature["kid"], 

142 "oldKey": private_acme_old_signature["jwk"]}, is_inner=True) 

143 # The signature by the old key covers this request and its signature, and 

144 # indicates the old key holder's assent to the roll-over request. 

145 http_response, result = _send_signed_request(acme_config["keyChange"], old_accountkeypath, 

146 inner_payload) 

147 

148 if http_response.status_code != 200: 

149 raise ValueError("Error rolling over account key: {0} {1}" 

150 .format(http_response.status_code, result)) 

151 log.info("Keys rolled over.") 

152 

153 

154def main(argv): 

155 """Parse arguments and roll over the ACME account keys.""" 

156 parser = argparse.ArgumentParser( 

157 formatter_class=argparse.RawDescriptionHelpFormatter, 

158 description="Tiny ACME client to roll over an ACME account key with another one.", 

159 epilog="""This script *rolls over* ACME account keys. 

160 

161It will need to have access to the ACME private account keys, so PLEASE READ THROUGH IT! 

162It's around 150 lines, so it won't take long. 

163 

164Example: roll over account key from account.key to newaccount.key: 

165 python3 acme_account_rollover.py --current account.key --new newaccount.key --acme-directory \ 

166https://acme-staging-v02.api.letsencrypt.org/directory""") 

167 parser.add_argument("--current", required=True, 

168 help="path to the current private account key") 

169 parser.add_argument("--new", required=True, 

170 help="path to the newer private account key to register") 

171 parser.add_argument("--acme-directory", required=True, 

172 help="ACME directory URL of the ACME server where to remove the key") 

173 parser.add_argument("--quiet", action="store_const", const=logging.ERROR, 

174 help="suppress output except for errors") 

175 parser.add_argument("--timeout", type=int, default=10, 

176 help="""Number of seconds to wait before ACME requests time out. 

177 Set it to 0 to wait indefinitely. Defaults to 10.""") 

178 args = parser.parse_args(argv) 

179 

180 LOGGER.setLevel(args.quiet or logging.INFO) 

181 account_rollover(args.current, args.new, args.acme_directory, args.timeout or None, log=LOGGER) 

182 

183 

184if __name__ == "__main__": # pragma: no cover 

185 main(sys.argv[1:])