1#!/usr/bin/env python3 

2"""Tiny script to deactivate account on an ACME server.""" 

3import sys 

4import argparse 

5import subprocess 

6import json 

7import base64 

8import binascii 

9import re 

10import copy 

11import logging 

12import requests 

13 

14LOGGER = logging.getLogger("acme_account_deactivate") 

15LOGGER.addHandler(logging.StreamHandler()) 

16 

17 

18def _b64(text): 

19 """Encodes text as base64 as specified in ACME RFC.""" 

20 return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=") 

21 

22 

23def _openssl(command, options, communicate=None): 

24 """Run openssl command line and raise IOError on non-zero return.""" 

25 openssl = subprocess.Popen(["openssl", command] + options, stdin=subprocess.PIPE, 

26 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 

27 out, err = openssl.communicate(communicate) 

28 if openssl.returncode != 0: 

29 raise IOError("OpenSSL Error: {0}".format(err)) 

30 return out 

31 

32 

33# pylint: disable=too-many-statements 

34def account_deactivate(accountkeypath, acme_directory, log=LOGGER): 

35 """Deactivate an ACME account.""" 

36 

37 def _send_signed_request(url, payload): 

38 """Sends signed requests to ACME server.""" 

39 nonlocal nonce 

40 if payload == "": # on POST-as-GET, final payload has to be just empty string 

41 payload64 = "" 

42 else: 

43 payload64 = _b64(json.dumps(payload).encode("utf8")) 

44 protected = copy.deepcopy(private_acme_signature) 

45 protected["nonce"] = nonce or requests.get(acme_config["newNonce"]).headers['Replay-Nonce'] 

46 del nonce 

47 protected["url"] = url 

48 if url == acme_config["newAccount"]: 

49 if "kid" in protected: 

50 del protected["kid"] 

51 else: 

52 del protected["jwk"] 

53 protected64 = _b64(json.dumps(protected).encode("utf8")) 

54 signature = _openssl("dgst", ["-sha256", "-sign", accountkeypath], 

55 "{0}.{1}".format(protected64, payload64).encode("utf8")) 

56 jose = { 

57 "protected": protected64, "payload": payload64, "signature": _b64(signature) 

58 } 

59 joseheaders = { 

60 'User-Agent': adtheaders.get('User-Agent'), 

61 'Content-Type': 'application/jose+json' 

62 } 

63 try: 

64 response = requests.post(url, json=jose, headers=joseheaders) 

65 except requests.exceptions.RequestException as error: 

66 response = error.response 

67 if response: 

68 nonce = response.headers['Replay-Nonce'] 

69 try: 

70 return response, response.json() 

71 except ValueError: # if body is empty or not JSON formatted 

72 return response, json.loads("{}") 

73 else: 

74 raise RuntimeError("Unable to get response from ACME server.") 

75 

76 # main code 

77 adtheaders = {'User-Agent': 'acme-dns-tiny/2.4'} 

78 nonce = None 

79 

80 log.info("Fetch informations from the ACME directory.") 

81 acme_config = requests.get(acme_directory, headers=adtheaders).json() 

82 

83 log.info("Get private signature from account key.") 

84 accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"]) 

85 signature_search = re.search(r"modulus:\s+?00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)", 

86 accountkey.decode("utf8"), re.MULTILINE) 

87 if signature_search is None: 

88 raise ValueError("Unable to retrieve private signature.") 

89 pub_hex, pub_exp = signature_search.groups() 

90 pub_exp = "{0:x}".format(int(pub_exp)) 

91 pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp 

92 # That signature is used to authenticate with the ACME server, it needs to be safely kept 

93 private_acme_signature = { 

94 "alg": "RS256", 

95 "jwk": { 

96 "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))), 

97 "kty": "RSA", 

98 "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))), 

99 }, 

100 } 

101 

102 log.info("Ask to the ACME server the account identifier to complete the private signature.") 

103 http_response, result = _send_signed_request(acme_config["newAccount"], 

104 {"onlyReturnExisting": True}) 

105 if http_response.status_code == 200: 

106 private_acme_signature["kid"] = http_response.headers['Location'] 

107 else: 

108 raise ValueError("Error looking or account URL: {0} {1}" 

109 .format(http_response.status_code, result)) 

110 

111 log.info("Deactivating the account.") 

112 http_response, result = _send_signed_request(private_acme_signature["kid"], 

113 {"status": "deactivated"}) 

114 

115 if http_response.status_code == 200: 

116 log.info("The account has been deactivated.") 

117 else: 

118 raise ValueError("Error while deactivating the account key: {0} {1}" 

119 .format(http_response.status_code, result)) 

120 

121 

122def main(argv): 

123 """Parse arguments and deactivate account.""" 

124 parser = argparse.ArgumentParser( 

125 formatter_class=argparse.RawDescriptionHelpFormatter, 

126 description="Tiny ACME script to deactivate an ACME account", 

127 epilog="""This script permanently *deactivates* an ACME account. 

128 

129You should revoke all TLS certificates linked to the account *before* using this script, 

130as the server won't accept any further request when account is deactivated. 

131 

132It will need to access the ACME private account key, so PLEASE READ THROUGH IT! 

133It's around 150 lines, so it won't take long. 

134 

135Example: deactivate account.key from staging Let's Encrypt: 

136 python3 acme_account_deactivate.py --account-key account.key --acme-directory \ 

137https://acme-staging-v02.api.letsencrypt.org/directory""" 

138 ) 

139 parser.add_argument("--account-key", required=True, 

140 help="path to the private account key to deactivate") 

141 parser.add_argument("--acme-directory", required=True, 

142 help="ACME directory URL of the ACME server where to remove the key") 

143 parser.add_argument("--quiet", action="store_const", 

144 const=logging.ERROR, 

145 help="suppress output except for errors") 

146 args = parser.parse_args(argv) 

147 

148 LOGGER.setLevel(args.quiet or logging.INFO) 

149 account_deactivate(args.account_key, args.acme_directory, log=LOGGER) 

150 

151 

152if __name__ == "__main__": # pragma: no cover 

153 main(sys.argv[1:])