Coverage for tools/acme_account_deactivate.py: 86%

81 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-05-24 19:15 +0000

1#!/usr/bin/env python3 

2"""Tiny script to deactivate account on an ACME server.""" 

3import sys 

4import argparse 

5import subprocess 

6import json 

7import base64 

8import binascii 

9import re 

10import copy 

11import logging 

12import requests 

13 

# Module-wide logger shared by the functions below; a StreamHandler is attached
# at import time so that records are emitted without any further configuration.
LOGGER = logging.getLogger("acme_account_deactivate")
LOGGER.addHandler(logging.StreamHandler())

16 

17 

18def _b64(text): 

19 """Encodes text as base64 as specified in ACME RFC.""" 

20 return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=") 

21 

22 

def _openssl(command, options, communicate=None):
    """Run an openssl command line and return what it wrote on stdout.

    Args:
        command: openssl sub-command name (this file uses "rsa" and "dgst").
        options: list of additional command line arguments.
        communicate: optional bytes piped to the standard input of the process.

    Returns:
        The bytes captured from the standard output of the process.

    Raises:
        IOError: when openssl exits with a non-zero return code; the message
            carries the decoded standard error output.
    """
    with subprocess.Popen(["openssl", command] + options, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE) as openssl:
        out, err = openssl.communicate(communicate)
        if openssl.returncode != 0:
            # The pipes are opened in binary mode, so stderr is bytes: decode it to
            # get a readable message instead of a "b'...'" repr with escaped newlines.
            raise IOError("OpenSSL Error: {0}".format(err.decode("utf8", errors="replace")))
        return out

31 

32 

# pylint: disable=too-many-statements
def account_deactivate(accountkeypath, acme_directory, timeout, log=LOGGER):
    """Deactivate the ACME account bound to an RSA account key.

    Steps: fetch the ACME directory, build the JWK from the key description
    printed by openssl, look up the account URL with an "onlyReturnExisting"
    newAccount request, then post {"status": "deactivated"} to that URL.

    Args:
        accountkeypath: path of the private account key, handed to openssl.
        acme_directory: URL of the ACME directory resource.
        timeout: value forwarded as ``timeout`` to every requests call.
        log: logger receiving progress messages (defaults to the module LOGGER).

    Raises:
        ValueError: when the key description cannot be parsed, or when the
            server answers the account lookup or the deactivation with a
            status code other than 200.
        RuntimeError: when a signed request yields no HTTP response at all.
        IOError: when an openssl invocation fails.
    """

    def _send_signed_request(url, payload):
        """Sends signed requests to ACME server.

        Returns a (response, body) tuple where body is the decoded JSON
        document, or an empty dict when the body is empty or not JSON.
        """
        nonlocal nonce
        if payload == "":  # on POST-as-GET, final payload has to be just empty string
            payload64 = ""
        else:
            payload64 = _b64(json.dumps(payload).encode("utf8"))
        protected = copy.deepcopy(private_acme_signature)
        protected["nonce"] = nonce or requests.get(
            acme_config["newNonce"], headers=adt_headers, timeout=timeout).headers['Replay-Nonce']
        # The stored nonce has just been consumed: reset it (rather than unbinding
        # the name) so that it is never sent twice and later reads stay valid.
        nonce = None
        protected["url"] = url
        if url == acme_config["newAccount"]:
            # newAccount requests are identified by the public key ("jwk") only
            if "kid" in protected:
                del protected["kid"]
        else:
            # every other request is identified by the account URL ("kid")
            del protected["jwk"]
        protected64 = _b64(json.dumps(protected).encode("utf8"))
        signature = _openssl("dgst", ["-sha256", "-sign", accountkeypath],
                             "{0}.{1}".format(protected64, payload64).encode("utf8"))
        jose = {
            "protected": protected64, "payload": payload64, "signature": _b64(signature)
        }
        jose_headers = {'Content-Type': 'application/jose+json'} | adt_headers
        try:
            response = requests.post(url, json=jose, headers=jose_headers, timeout=timeout)
        except requests.exceptions.RequestException as error:
            response = error.response
        # Compare against None explicitly: a requests Response is falsy for every
        # 4xx/5xx status, and those error replies must reach the caller so that it
        # can report the status code and the problem document.
        if response is None:
            raise RuntimeError("Unable to get response from ACME server.")
        # The header may be missing; None makes the next request fetch a fresh nonce.
        nonce = response.headers.get('Replay-Nonce')
        try:
            return response, response.json()
        except ValueError:  # if body is empty or not JSON formatted
            return response, {}

    # main code
    adt_headers = {'User-Agent': 'acme-dns-tiny/4.0'}
    nonce = None

    log.info("Fetch informations from the ACME directory.")
    acme_config = requests.get(acme_directory, headers=adt_headers, timeout=timeout).json()

    log.info("Get private signature from account key.")
    accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"])
    # Extract the modulus (hex bytes after the leading "00:") and the decimal
    # public exponent from the textual key description.
    signature_search = re.search(r"modulus:\s+?00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)",
                                 accountkey.decode("utf8"), re.MULTILINE)
    if signature_search is None:
        raise ValueError("Unable to retrieve private signature.")
    pub_hex, pub_exp = signature_search.groups()
    pub_exp = "{0:x}".format(int(pub_exp))
    # unhexlify needs an even number of hexadecimal digits
    pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
    # That signature is used to authenticate with the ACME server, it needs to be safely kept
    private_acme_signature = {
        "alg": "RS256",
        "jwk": {
            "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
            "kty": "RSA",
            "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
        },
    }

    log.info("Ask to the ACME server the account identifier to complete the private signature.")
    http_response, result = _send_signed_request(acme_config["newAccount"],
                                                 {"onlyReturnExisting": True})
    if http_response.status_code == 200:
        private_acme_signature["kid"] = http_response.headers['Location']
    else:
        raise ValueError("Error looking for account URL: {0} {1}"
                         .format(http_response.status_code, result))

    log.info("Deactivating the account.")
    http_response, result = _send_signed_request(private_acme_signature["kid"],
                                                 {"status": "deactivated"})

    if http_response.status_code == 200:
        log.info("The account has been deactivated.")
    else:
        raise ValueError("Error while deactivating the account key: {0} {1}"
                         .format(http_response.status_code, result))

118 

119 

def main(argv):
    """Read the command line options from *argv* and deactivate the account."""
    summary = "Tiny ACME script to deactivate an ACME account"
    usage_notes = """This script permanently *deactivates* an ACME account.

You should revoke all TLS certificates linked to the account *before* using this script,
as the server won't accept any further request when account is deactivated.

It will need to access the ACME private account key, so PLEASE READ THROUGH IT!
It's around 150 lines, so it won't take long.

Example: deactivate account.key from staging Let's Encrypt:
  python3 acme_account_deactivate.py --account-key account.key --acme-directory \
https://acme-staging-v02.api.letsencrypt.org/directory"""

    cli = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=summary,
        epilog=usage_notes,
    )
    cli.add_argument("--account-key", required=True,
                     help="path to the private account key to deactivate")
    cli.add_argument("--acme-directory", required=True,
                     help="ACME directory URL of the ACME server where to remove the key")
    cli.add_argument("--quiet", action="store_const",
                     const=logging.ERROR,
                     help="suppress output except for errors")
    cli.add_argument("--timeout", type=int, default=10,
                     help="""Number of seconds to wait before ACME requests time out.
                     Set it to 0 to wait indefinitely. Defaults to 10.""")
    options = cli.parse_args(argv)

    # --quiet stores logging.ERROR; when the flag is absent the value is None
    # and the logger level falls back to INFO.
    level = options.quiet if options.quiet else logging.INFO
    LOGGER.setLevel(level)

    # A timeout of 0 is turned into None before being handed over.
    request_timeout = options.timeout if options.timeout else None
    account_deactivate(options.account_key, options.acme_directory, request_timeout, log=LOGGER)

151 

152 

if __name__ == "__main__":  # pragma: no cover
    # Run as a script: pass the command line arguments, program name excluded, to main().
    main(sys.argv[1:])