1#!/usr/bin/env python3 

2# pylint: disable=too-many-statements 

3"""Tiny script to rollover two keys for an ACME account""" 

4import sys 

5import argparse 

6import subprocess 

7import json 

8import base64 

9import binascii 

10import re 

11import copy 

12import logging 

13import requests 

14 

15LOGGER = logging.getLogger("acme_account_rollover") 

16LOGGER.addHandler(logging.StreamHandler()) 

17 

18 

19def _b64(text): 

20 """Encodes text as base64 as specified in ACME RFC.""" 

21 return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=") 

22 

23 

24def _openssl(command, options, communicate=None): 

25 """Run openssl command line and raise IOError on non-zero return.""" 

26 openssl = subprocess.Popen(["openssl", command] + options, stdin=subprocess.PIPE, 

27 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 

28 out, err = openssl.communicate(communicate) 

29 if openssl.returncode != 0: 

30 raise IOError("OpenSSL Error: {0}".format(err)) 

31 return out 

32 

33 

34def account_rollover(old_accountkeypath, new_accountkeypath, acme_directory, log=LOGGER): 

35 """Rollover the old and new account key for an ACME account.""" 

36 def _get_private_acme_signature(accountkeypath): 

37 """Read the account key to get the signature to authenticate with the ACME server.""" 

38 accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"]) 

39 signature_search = re.search( 

40 r"modulus:\s+?00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)", 

41 accountkey.decode("utf8"), re.MULTILINE) 

42 if signature_search is None: 

43 raise ValueError("Unable to retrieve private signature.") 

44 pub_hex, pub_exp = signature_search.groups() 

45 pub_exp = "{0:x}".format(int(pub_exp)) 

46 pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp 

47 return { 

48 "alg": "RS256", 

49 "jwk": { 

50 "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))), 

51 "kty": "RSA", 

52 "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))), 

53 }, 

54 } 

55 

56 def _sign_request(url, keypath, payload, is_inner=False): 

57 """Signs request with a specific right account key.""" 

58 nonlocal nonce 

59 if payload == "": # on POST-as-GET, final payload has to be just empty string 

60 payload64 = "" 

61 else: 

62 payload64 = _b64(json.dumps(payload).encode("utf8")) 

63 if keypath == new_accountkeypath: 

64 protected = copy.deepcopy(private_acme_new_signature) 

65 elif keypath == old_accountkeypath: 

66 protected = copy.deepcopy(private_acme_old_signature) 

67 else: 

68 raise RuntimeError("Unknown keypath to sign request") 

69 

70 if is_inner or url == acme_config["newAccount"]: 

71 if "kid" in protected: 

72 del protected["kid"] 

73 else: 

74 del protected["jwk"] 

75 

76 if not is_inner: 

77 protected["nonce"] = (nonce 

78 or requests.get(acme_config["newNonce"]) 

79 .headers['Replay-Nonce']) 

80 protected["url"] = url 

81 protected64 = _b64(json.dumps(protected).encode("utf8")) 

82 signature = _openssl("dgst", ["-sha256", "-sign", keypath], 

83 "{0}.{1}".format(protected64, payload64).encode("utf8")) 

84 return { 

85 "protected": protected64, "payload": payload64, "signature": _b64(signature) 

86 } 

87 

88 def _send_signed_request(url, keypath, payload): 

89 """Sends signed requests to ACME server.""" 

90 nonlocal nonce 

91 jose = _sign_request(url, keypath, payload) 

92 joseheaders = { 

93 'User-Agent': adtheaders.get('User-Agent'), 

94 'Content-Type': 'application/jose+json' 

95 } 

96 try: 

97 response = requests.post(url, json=jose, headers=joseheaders) 

98 except requests.exceptions.RequestException as error: 

99 response = error.response 

100 if response: 

101 nonce = response.headers['Replay-Nonce'] 

102 try: 

103 return response, response.json() 

104 except ValueError: # if body is empty or not JSON formatted 

105 return response, json.dumps({}) 

106 else: 

107 raise RuntimeError("Unable to get response from ACME server.") 

108 

109 # main code 

110 adtheaders = {'User-Agent': 'acme-dns-tiny/2.4'} 

111 nonce = None 

112 

113 log.info("Fetch informations from the ACME directory.") 

114 acme_config = requests.get(acme_directory, headers=adtheaders).json() 

115 

116 log.info("Get private signature from old account key.") 

117 private_acme_old_signature = _get_private_acme_signature(old_accountkeypath) 

118 

119 log.info("Get private signature from new account key.") 

120 private_acme_new_signature = _get_private_acme_signature(new_accountkeypath) 

121 

122 log.info("Ask to the ACME server the account identifier to complete the private signature.") 

123 http_response, result = _send_signed_request(acme_config["newAccount"], old_accountkeypath, { 

124 "onlyReturnExisting": True}) 

125 if http_response.status_code == 200: 

126 private_acme_old_signature["kid"] = http_response.headers["Location"] 

127 private_acme_new_signature["kid"] = http_response.headers["Location"] 

128 else: 

129 raise ValueError("Error looking or account URL: {0} {1}" 

130 .format(http_response.status_code, result)) 

131 

132 log.info("Rolling over account keys.") 

133 # The signature by the new key covers the account URL and the old key, 

134 # signifying a request by the new key holder to take over the account from 

135 # the old key holder. 

136 inner_payload = _sign_request(acme_config["keyChange"], new_accountkeypath, { 

137 "account": private_acme_old_signature["kid"], 

138 "oldKey": private_acme_old_signature["jwk"]}, is_inner=True) 

139 # The signature by the old key covers this request and its signature, and 

140 # indicates the old key holder's assent to the roll-over request. 

141 http_response, result = _send_signed_request(acme_config["keyChange"], old_accountkeypath, 

142 inner_payload) 

143 

144 if http_response.status_code != 200: 

145 raise ValueError("Error rolling over account key: {0} {1}" 

146 .format(http_response.status_code, result)) 

147 log.info("Keys rolled over.") 

148 

149 

150def main(argv): 

151 """Parse arguments and roll over the ACME account keys.""" 

152 parser = argparse.ArgumentParser( 

153 formatter_class=argparse.RawDescriptionHelpFormatter, 

154 description="Tiny ACME client to roll over an ACME account key with another one.", 

155 epilog="""This script *rolls over* ACME account keys. 

156 

157It will need to have access to the ACME private account keys, so PLEASE READ THROUGH IT! 

158It's around 150 lines, so it won't take long. 

159 

160Example: roll over account key from account.key to newaccount.key: 

161 python3 acme_account_rollover.py --current account.key --new newaccount.key --acme-directory \ 

162https://acme-staging-v02.api.letsencrypt.org/directory""") 

163 parser.add_argument("--current", required=True, 

164 help="path to the current private account key") 

165 parser.add_argument("--new", required=True, 

166 help="path to the newer private account key to register") 

167 parser.add_argument("--acme-directory", required=True, 

168 help="ACME directory URL of the ACME server where to remove the key") 

169 parser.add_argument("--quiet", action="store_const", const=logging.ERROR, 

170 help="suppress output except for errors") 

171 args = parser.parse_args(argv) 

172 

173 LOGGER.setLevel(args.quiet or logging.INFO) 

174 account_rollover(args.current, args.new, args.acme_directory, log=LOGGER) 

175 

176 

177if __name__ == "__main__": # pragma: no cover 

178 main(sys.argv[1:])