Coverage for tools/acme_account_deactivate.py: 86%
81 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-05-24 19:15 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-05-24 19:15 +0000
1#!/usr/bin/env python3
2"""Tiny script to deactivate account on an ACME server."""
3import sys
4import argparse
5import subprocess
6import json
7import base64
8import binascii
9import re
10import copy
11import logging
12import requests
# Module-wide logger, named after the script; it is the default `log` argument
# of account_deactivate() and the logger whose level main() adjusts.
LOGGER = logging.getLogger("acme_account_deactivate")
# Attach a stream handler so messages are emitted without further logging setup.
LOGGER.addHandler(logging.StreamHandler())
18def _b64(text):
19 """Encodes text as base64 as specified in ACME RFC."""
20 return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=")
23def _openssl(command, options, communicate=None):
24 """Run openssl command line and raise IOError on non-zero return."""
25 with subprocess.Popen(["openssl", command] + options, stdin=subprocess.PIPE,
26 stdout=subprocess.PIPE, stderr=subprocess.PIPE) as openssl:
27 out, err = openssl.communicate(communicate)
28 if openssl.returncode != 0:
29 raise IOError("OpenSSL Error: {0}".format(err))
30 return out
# pylint: disable=too-many-statements
def account_deactivate(accountkeypath, acme_directory, timeout, log=LOGGER):
    """Deactivate the ACME account bound to an RSA account key.

    Steps performed:
      1. download the ACME directory resource;
      2. read modulus and public exponent of the key through the openssl
         command line and build the JWS protected header from them;
      3. POST ``{"onlyReturnExisting": True}`` to the ``newAccount`` endpoint
         and keep the returned ``Location`` header as account identifier;
      4. POST ``{"status": "deactivated"}`` to that account URL.

    Args:
        accountkeypath: path of the private account key file; it is handed to
            the openssl command line for reading and for signing.
        acme_directory: URL of the ACME directory resource.
        timeout: value forwarded as ``timeout`` to every ``requests`` call.
        log: logger receiving the progress messages (module ``LOGGER`` by default).

    Raises:
        ValueError: if modulus/exponent cannot be found in the openssl output,
            or if the server answers one of the two signed requests with a
            status code other than 200.
        RuntimeError: if a signed request fails without any HTTP response.
        IOError: propagated from ``_openssl`` when openssl exits non-zero.
    """

    def _send_signed_request(url, payload):
        """Sends signed requests to ACME server.

        Returns a ``(response, body)`` tuple where *body* is the decoded JSON
        answer, or an empty dict if the body is empty or not JSON formatted.
        """
        nonlocal nonce
        if payload == "":  # on POST-as-GET, final payload has to be just empty string
            payload64 = ""
        else:
            payload64 = _b64(json.dumps(payload).encode("utf8"))
        protected = copy.deepcopy(private_acme_signature)
        protected["nonce"] = nonce or requests.get(
            acme_config["newNonce"], headers=adt_headers, timeout=timeout).headers['Replay-Nonce']
        # The stored nonce is consumed by this request: reset it so it is never
        # sent twice. Resetting (instead of `del`) keeps the name bound, so a
        # later call simply fetches a fresh nonce from "newNonce".
        nonce = None
        protected["url"] = url
        if url == acme_config["newAccount"]:
            # newAccount requests carry the public key ("jwk") and no "kid"
            protected.pop("kid", None)
        else:
            # every other request carries the account URL ("kid") and no "jwk"
            del protected["jwk"]
        protected64 = _b64(json.dumps(protected).encode("utf8"))
        signature = _openssl("dgst", ["-sha256", "-sign", accountkeypath],
                             "{0}.{1}".format(protected64, payload64).encode("utf8"))
        jose = {
            "protected": protected64, "payload": payload64, "signature": _b64(signature)
        }
        jose_headers = {'Content-Type': 'application/jose+json'} | adt_headers
        try:
            response = requests.post(url, json=jose, headers=jose_headers, timeout=timeout)
        except requests.exceptions.RequestException as error:
            response = error.response
        # Test against None explicitly: a requests.Response is falsy for any
        # 4xx/5xx status, and those answers must reach the caller so that it
        # can report the status code and the error document of the server.
        if response is None:
            raise RuntimeError("Unable to get response from ACME server.")
        # Error responses are not guaranteed to carry a nonce; None makes the
        # next request ask "newNonce" for one.
        nonce = response.headers.get('Replay-Nonce')
        try:
            return response, response.json()
        except ValueError:  # if body is empty or not JSON formatted
            return response, {}

    # main code
    adt_headers = {'User-Agent': 'acme-dns-tiny/4.0'}
    nonce = None

    log.info("Fetch informations from the ACME directory.")
    acme_config = requests.get(acme_directory, headers=adt_headers, timeout=timeout).json()

    log.info("Get private signature from account key.")
    accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"])
    signature_search = re.search(r"modulus:\s+?00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)",
                                 accountkey.decode("utf8"), re.MULTILINE)
    if signature_search is None:
        raise ValueError("Unable to retrieve private signature.")
    pub_hex, pub_exp = signature_search.groups()
    # Hexadecimal form of the exponent, left-padded to an even number of digits
    # so that it can be converted to bytes by unhexlify.
    pub_exp = "{0:x}".format(int(pub_exp))
    pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
    # That signature is used to authenticate with the ACME server, it needs to be safely kept
    private_acme_signature = {
        "alg": "RS256",
        "jwk": {
            "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
            "kty": "RSA",
            "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
        },
    }

    log.info("Ask to the ACME server the account identifier to complete the private signature.")
    http_response, result = _send_signed_request(acme_config["newAccount"],
                                                 {"onlyReturnExisting": True})
    if http_response.status_code == 200:
        private_acme_signature["kid"] = http_response.headers['Location']
    else:
        raise ValueError("Error looking or account URL: {0} {1}"
                         .format(http_response.status_code, result))

    log.info("Deactivating the account.")
    http_response, result = _send_signed_request(private_acme_signature["kid"],
                                                 {"status": "deactivated"})

    if http_response.status_code == 200:
        log.info("The account has been deactivated.")
    else:
        raise ValueError("Error while deactivating the account key: {0} {1}"
                         .format(http_response.status_code, result))
def main(argv):
    """Read the command-line arguments from *argv* and deactivate the account.

    ``--account-key`` and ``--acme-directory`` are mandatory. ``--quiet``
    restricts logging to errors (INFO otherwise). ``--timeout`` is an integer
    number of seconds, 10 by default, and 0 is passed on as ``None``.
    """
    usage_notes = """This script permanently *deactivates* an ACME account.
You should revoke all TLS certificates linked to the account *before* using this script,
as the server won't accept any further request when account is deactivated.
It will need to access the ACME private account key, so PLEASE READ THROUGH IT!
It's around 150 lines, so it won't take long.
Example: deactivate account.key from staging Let's Encrypt:
 python3 acme_account_deactivate.py --account-key account.key --acme-directory \
https://acme-staging-v02.api.letsencrypt.org/directory"""
    cli = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Tiny ACME script to deactivate an ACME account",
        epilog=usage_notes,
    )
    cli.add_argument(
        "--account-key",
        required=True,
        help="path to the private account key to deactivate",
    )
    cli.add_argument(
        "--acme-directory",
        required=True,
        help="ACME directory URL of the ACME server where to remove the key",
    )
    cli.add_argument(
        "--quiet",
        action="store_const",
        const=logging.ERROR,
        help="suppress output except for errors",
    )
    cli.add_argument(
        "--timeout",
        type=int,
        default=10,
        help="""Number of seconds to wait before ACME requests time out.
 Set it to 0 to wait indefinitely. Defaults to 10.""",
    )
    options = cli.parse_args(argv)

    # --quiet stores logging.ERROR; without it the attribute is None -> INFO
    verbosity = options.quiet or logging.INFO
    LOGGER.setLevel(verbosity)

    # a timeout of 0 is handed over as None
    request_timeout = options.timeout or None
    account_deactivate(
        options.account_key, options.acme_directory, request_timeout, log=LOGGER
    )
# Script entry point: hand the command-line arguments (program name excluded)
# to main() only when this file is executed directly, not when it is imported.
if __name__ == "__main__":  # pragma: no cover
    main(sys.argv[1:])