1#!/usr/bin/env python3 

2"""Tiny script to deactivate account on an ACME server.""" 

3import sys 

4import argparse 

5import subprocess 

6import json 

7import base64 

8import binascii 

9import re 

10import copy 

11import logging 

12import requests 

13 

# Module-level logger used as the default `log` of account_deactivate() and
# configured by main(); a stream handler is attached as soon as the module loads.
LOGGER = logging.getLogger("acme_account_deactivate")
LOGGER.addHandler(logging.StreamHandler())

16 

17 

18def _b64(text): 

19 """Encodes text as base64 as specified in ACME RFC.""" 

20 return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=") 

21 

22 

23def _openssl(command, options, communicate=None): 

24 """Run openssl command line and raise IOError on non-zero return.""" 

25 with subprocess.Popen(["openssl", command] + options, stdin=subprocess.PIPE, 

26 stdout=subprocess.PIPE, stderr=subprocess.PIPE) as openssl: 

27 out, err = openssl.communicate(communicate) 

28 if openssl.returncode != 0: 

29 raise IOError("OpenSSL Error: {0}".format(err)) 

30 return out 

31 

32 

# pylint: disable=too-many-statements
def account_deactivate(accountkeypath, acme_directory, timeout, log=LOGGER):
    """Deactivate the ACME account bound to an RSA account key.

    The account URL is first looked up with a ``newAccount`` request carrying
    ``onlyReturnExisting``; a ``{"status": "deactivated"}`` update is then
    posted to that URL.

    Args:
        accountkeypath: path of the RSA private account key; it is passed to the
            ``openssl`` command line to read the public part and to sign requests.
        acme_directory: URL of the ACME directory resource.
        timeout: value given as ``timeout`` to every ``requests`` call.
        log: logger receiving the progress messages.

    Raises:
        ValueError: if the public key can't be parsed from the openssl output,
            or if the server answers a request with a status other than 200.
        RuntimeError: if no HTTP response is received for a signed request.
        IOError: if an openssl invocation exits with a non-zero status.
    """

    def _send_signed_request(url, payload):
        """Sign ``payload`` with the account key and POST it to ``url``.

        Returns a ``(response, body)`` tuple where ``body`` is the decoded JSON
        body, or an empty dict when the body is empty or not JSON.
        Raises RuntimeError when no HTTP response could be obtained.
        """
        nonlocal nonce
        if payload == "":  # on POST-as-GET, final payload has to be just empty string
            payload64 = ""
        else:
            payload64 = _b64(json.dumps(payload).encode("utf8"))
        protected = copy.deepcopy(private_acme_signature)
        protected["nonce"] = nonce or requests.get(
            acme_config["newNonce"], headers=adtheaders, timeout=timeout).headers['Replay-Nonce']
        # The nonce has been consumed.  Reset it rather than `del nonce`: deleting a
        # nonlocal name leaves the enclosing variable unbound when no new nonce is stored.
        nonce = None
        protected["url"] = url
        # The newAccount request is sent with the public key ("jwk");
        # every other request is sent with the account URL ("kid").
        if url == acme_config["newAccount"]:
            if "kid" in protected:
                del protected["kid"]
        else:
            del protected["jwk"]
        protected64 = _b64(json.dumps(protected).encode("utf8"))
        signature = _openssl("dgst", ["-sha256", "-sign", accountkeypath],
                             "{0}.{1}".format(protected64, payload64).encode("utf8"))
        jose = {
            "protected": protected64, "payload": payload64, "signature": _b64(signature)
        }
        joseheaders = {
            'User-Agent': adtheaders.get('User-Agent'),
            'Content-Type': 'application/jose+json'
        }
        try:
            # Apply the caller's timeout to signed POSTs as well as to the GETs.
            response = requests.post(url, json=jose, headers=joseheaders, timeout=timeout)
        except requests.exceptions.RequestException as error:
            response = error.response
        # Compare with None explicitly: a requests.Response is falsy for 4xx/5xx
        # statuses, which would hide the server's error details from the callers.
        if response is None:
            raise RuntimeError("Unable to get response from ACME server.")
        # Keep the fresh nonce for the next request; when the header is missing,
        # the next call fetches one from the newNonce resource.
        nonce = response.headers.get('Replay-Nonce')
        try:
            return response, response.json()
        except ValueError:  # if body is empty or not JSON formatted
            return response, {}

    # main code
    adtheaders = {'User-Agent': 'acme-dns-tiny/3.0'}
    nonce = None

    log.info("Fetch informations from the ACME directory.")
    acme_config = requests.get(acme_directory, headers=adtheaders, timeout=timeout).json()

    log.info("Get private signature from account key.")
    accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"])
    signature_search = re.search(r"modulus:\s+?00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)",
                                 accountkey.decode("utf8"), re.MULTILINE)
    if signature_search is None:
        raise ValueError("Unable to retrieve private signature.")
    pub_hex, pub_exp = signature_search.groups()
    pub_exp = "{0:x}".format(int(pub_exp))
    # unhexlify needs an even number of hexadecimal digits
    pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
    # That signature is used to authenticate with the ACME server, it needs to be safely kept
    private_acme_signature = {
        "alg": "RS256",
        "jwk": {
            "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
            "kty": "RSA",
            "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
        },
    }

    log.info("Ask to the ACME server the account identifier to complete the private signature.")
    http_response, result = _send_signed_request(acme_config["newAccount"],
                                                 {"onlyReturnExisting": True})
    if http_response.status_code == 200:
        private_acme_signature["kid"] = http_response.headers['Location']
    else:
        raise ValueError("Error looking or account URL: {0} {1}"
                         .format(http_response.status_code, result))

    log.info("Deactivating the account.")
    http_response, result = _send_signed_request(private_acme_signature["kid"],
                                                 {"status": "deactivated"})

    if http_response.status_code == 200:
        log.info("The account has been deactivated.")
    else:
        raise ValueError("Error while deactivating the account key: {0} {1}"
                         .format(http_response.status_code, result))

121 

122 

def main(argv):
    """Parse the command-line arguments in ``argv`` and deactivate the designated account."""
    usage_notes = """This script permanently *deactivates* an ACME account.

You should revoke all TLS certificates linked to the account *before* using this script,
as the server won't accept any further request when account is deactivated.

It will need to access the ACME private account key, so PLEASE READ THROUGH IT!
It's around 150 lines, so it won't take long.

Example: deactivate account.key from staging Let's Encrypt:
  python3 acme_account_deactivate.py --account-key account.key --acme-directory \
https://acme-staging-v02.api.letsencrypt.org/directory"""

    argparser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Tiny ACME script to deactivate an ACME account",
        epilog=usage_notes,
    )
    argparser.add_argument("--account-key", required=True,
                           help="path to the private account key to deactivate")
    argparser.add_argument("--acme-directory", required=True,
                           help="ACME directory URL of the ACME server where to remove the key")
    argparser.add_argument("--quiet", action="store_const",
                           const=logging.ERROR,
                           help="suppress output except for errors")
    argparser.add_argument("--timeout", type=int, default=10,
                           help="""Number of seconds to wait before ACME requests time out.
                        Set it to 0 to wait indefinitely. Defaults to 10.""")
    options = argparser.parse_args(argv)

    # --quiet stores logging.ERROR; without it the logger reports INFO and above.
    verbosity = options.quiet if options.quiet else logging.INFO
    LOGGER.setLevel(verbosity)

    # A timeout of 0 on the command line is passed on as None.
    request_timeout = options.timeout if options.timeout else None
    account_deactivate(options.account_key, options.acme_directory, request_timeout, log=LOGGER)

154 

155 

# Script entry point: hand the command-line arguments (program name excluded) to main().
if __name__ == "__main__":  # pragma: no cover
    main(sys.argv[1:])