1#!/usr/bin/env python3
2"""Tiny script to deactivate account on an ACME server."""
3import sys
4import argparse
5import subprocess
6import json
7import base64
8import binascii
9import re
10import copy
11import logging
12import requests
14LOGGER = logging.getLogger("acme_account_deactivate")
15LOGGER.addHandler(logging.StreamHandler())
18def _b64(text):
19 """Encodes text as base64 as specified in ACME RFC."""
20 return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=")
23def _openssl(command, options, communicate=None):
24 """Run openssl command line and raise IOError on non-zero return."""
25 with subprocess.Popen(["openssl", command] + options, stdin=subprocess.PIPE,
26 stdout=subprocess.PIPE, stderr=subprocess.PIPE) as openssl:
27 out, err = openssl.communicate(communicate)
28 if openssl.returncode != 0:
29 raise IOError("OpenSSL Error: {0}".format(err))
30 return out
33# pylint: disable=too-many-statements
34def account_deactivate(accountkeypath, acme_directory, timeout, log=LOGGER):
35 """Deactivate an ACME account."""
37 def _send_signed_request(url, payload):
38 """Sends signed requests to ACME server."""
39 nonlocal nonce
40 if payload == "": # on POST-as-GET, final payload has to be just empty string
41 payload64 = ""
42 else:
43 payload64 = _b64(json.dumps(payload).encode("utf8"))
44 protected = copy.deepcopy(private_acme_signature)
45 protected["nonce"] = nonce or requests.get(
46 acme_config["newNonce"], headers=adtheaders, timeout=timeout).headers['Replay-Nonce']
47 del nonce
48 protected["url"] = url
49 if url == acme_config["newAccount"]:
50 if "kid" in protected:
51 del protected["kid"]
52 else:
53 del protected["jwk"]
54 protected64 = _b64(json.dumps(protected).encode("utf8"))
55 signature = _openssl("dgst", ["-sha256", "-sign", accountkeypath],
56 "{0}.{1}".format(protected64, payload64).encode("utf8"))
57 jose = {
58 "protected": protected64, "payload": payload64, "signature": _b64(signature)
59 }
60 joseheaders = {
61 'User-Agent': adtheaders.get('User-Agent'),
62 'Content-Type': 'application/jose+json'
63 }
64 try:
65 response = requests.post(url, json=jose, headers=joseheaders)
66 except requests.exceptions.RequestException as error:
67 response = error.response
68 if response:
69 nonce = response.headers['Replay-Nonce']
70 try:
71 return response, response.json()
72 except ValueError: # if body is empty or not JSON formatted
73 return response, json.loads("{}")
74 else:
75 raise RuntimeError("Unable to get response from ACME server.")
77 # main code
78 adtheaders = {'User-Agent': 'acme-dns-tiny/3.0'}
79 nonce = None
81 log.info("Fetch informations from the ACME directory.")
82 acme_config = requests.get(acme_directory, headers=adtheaders, timeout=timeout).json()
84 log.info("Get private signature from account key.")
85 accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"])
86 signature_search = re.search(r"modulus:\s+?00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)",
87 accountkey.decode("utf8"), re.MULTILINE)
88 if signature_search is None:
89 raise ValueError("Unable to retrieve private signature.")
90 pub_hex, pub_exp = signature_search.groups()
91 pub_exp = "{0:x}".format(int(pub_exp))
92 pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
93 # That signature is used to authenticate with the ACME server, it needs to be safely kept
94 private_acme_signature = {
95 "alg": "RS256",
96 "jwk": {
97 "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
98 "kty": "RSA",
99 "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
100 },
101 }
103 log.info("Ask to the ACME server the account identifier to complete the private signature.")
104 http_response, result = _send_signed_request(acme_config["newAccount"],
105 {"onlyReturnExisting": True})
106 if http_response.status_code == 200:
107 private_acme_signature["kid"] = http_response.headers['Location']
108 else:
109 raise ValueError("Error looking or account URL: {0} {1}"
110 .format(http_response.status_code, result))
112 log.info("Deactivating the account.")
113 http_response, result = _send_signed_request(private_acme_signature["kid"],
114 {"status": "deactivated"})
116 if http_response.status_code == 200:
117 log.info("The account has been deactivated.")
118 else:
119 raise ValueError("Error while deactivating the account key: {0} {1}"
120 .format(http_response.status_code, result))
123def main(argv):
124 """Parse arguments and deactivate account."""
125 parser = argparse.ArgumentParser(
126 formatter_class=argparse.RawDescriptionHelpFormatter,
127 description="Tiny ACME script to deactivate an ACME account",
128 epilog="""This script permanently *deactivates* an ACME account.
130You should revoke all TLS certificates linked to the account *before* using this script,
131as the server won't accept any further request when account is deactivated.
133It will need to access the ACME private account key, so PLEASE READ THROUGH IT!
134It's around 150 lines, so it won't take long.
136Example: deactivate account.key from staging Let's Encrypt:
137 python3 acme_account_deactivate.py --account-key account.key --acme-directory \
138https://acme-staging-v02.api.letsencrypt.org/directory"""
139 )
140 parser.add_argument("--account-key", required=True,
141 help="path to the private account key to deactivate")
142 parser.add_argument("--acme-directory", required=True,
143 help="ACME directory URL of the ACME server where to remove the key")
144 parser.add_argument("--quiet", action="store_const",
145 const=logging.ERROR,
146 help="suppress output except for errors")
147 parser.add_argument("--timeout", type=int, default=10,
148 help="""Number of seconds to wait before ACME requests time out.
149 Set it to 0 to wait indefinitely. Defaults to 10.""")
150 args = parser.parse_args(argv)
152 LOGGER.setLevel(args.quiet or logging.INFO)
153 account_deactivate(args.account_key, args.acme_directory, args.timeout or None, log=LOGGER)
156if __name__ == "__main__": # pragma: no cover
157 main(sys.argv[1:])