Coverage for tools/acme_account_rollover.py: 90%
94 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-05-24 19:15 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-05-24 19:15 +0000
1#!/usr/bin/env python3
2# pylint: disable=too-many-statements
3"""Tiny script to rollover two keys for an ACME account"""
4import sys
5import argparse
6import subprocess
7import json
8import base64
9import binascii
10import re
11import copy
12import logging
13import requests
# Module-wide logger. A stream handler writing on standard error is attached
# at import time so that progress messages are visible from the command line.
LOGGER = logging.getLogger("acme_account_rollover")
LOGGER.addHandler(logging.StreamHandler(sys.stderr))
19def _b64(text):
20 """Encodes text as base64 as specified in ACME RFC."""
21 return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=")
24def _openssl(command, options, communicate=None):
25 """Run openssl command line and raise IOError on non-zero return."""
26 with subprocess.Popen(["openssl", command] + options, stdin=subprocess.PIPE,
27 stdout=subprocess.PIPE, stderr=subprocess.PIPE) as openssl:
28 out, err = openssl.communicate(communicate)
29 if openssl.returncode != 0:
30 raise IOError("OpenSSL Error: {0}".format(err))
31 return out
# pylint: disable=too-many-locals
def account_rollover(old_accountkeypath, new_accountkeypath, acme_directory, timeout, log=LOGGER):
    """Rollover the old and new account key for an ACME account.

    old_accountkeypath: path of the private key currently used by the account.
    new_accountkeypath: path of the private key which has to replace it.
    acme_directory: URL of the ACME directory resource.
    timeout: value passed as ``timeout`` to every ``requests`` call.
    log: logger receiving the progress messages.

    Raises ValueError when the public part of a key can't be read from the
    openssl output or when the ACME server answers with a status code other
    than 200, RuntimeError when no HTTP response is available or when an
    unknown key path is used to sign, and IOError when openssl fails.
    """
    def _get_private_acme_signature(accountkeypath):
        """Read the account key to get the signature to authenticate with the ACME server."""
        accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"])
        # Extract the modulus (hexadecimal, colon separated) and the public
        # exponent (decimal) from the text description printed by openssl.
        signature_search = re.search(
            r"modulus:\s+?00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)",
            accountkey.decode("utf8"), re.MULTILINE)
        if signature_search is None:
            raise ValueError("Unable to retrieve private signature.")
        pub_hex, pub_exp = signature_search.groups()
        pub_exp = "{0:x}".format(int(pub_exp))
        # unhexlify needs an even number of hexadecimal digits.
        pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
        return {
            "alg": "RS256",
            "jwk": {
                "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
                "kty": "RSA",
                "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
            },
        }

    def _sign_request(url, keypath, payload, is_inner=False):
        """Signs request with a specific right account key."""
        nonlocal nonce
        if payload == "":  # on POST-as-GET, final payload has to be just empty string
            payload64 = ""
        else:
            payload64 = _b64(json.dumps(payload).encode("utf8"))
        # Work on a copy: the header is adapted per request below.
        if keypath == new_accountkeypath:
            protected = copy.deepcopy(private_acme_new_signature)
        elif keypath == old_accountkeypath:
            protected = copy.deepcopy(private_acme_old_signature)
        else:
            raise RuntimeError("Unknown keypath to sign request")

        # Inner key-change objects and newAccount requests are identified by
        # "jwk"; every other request is identified by "kid".
        if is_inner or url == acme_config["newAccount"]:
            if "kid" in protected:
                del protected["kid"]
        else:
            del protected["jwk"]

        if not is_inner:
            # Reuse the nonce stored from the previous response, else ask a new one.
            protected["nonce"] = (nonce
                                  or requests.get(
                                      acme_config["newNonce"],
                                      headers=adt_headers,
                                      timeout=timeout)
                                  .headers['Replay-Nonce'])
        protected["url"] = url
        protected64 = _b64(json.dumps(protected).encode("utf8"))
        signature = _openssl("dgst", ["-sha256", "-sign", keypath],
                             "{0}.{1}".format(protected64, payload64).encode("utf8"))
        return {
            "protected": protected64, "payload": payload64, "signature": _b64(signature)
        }

    def _send_signed_request(url, keypath, payload):
        """Sends signed requests to ACME server.

        Returns the HTTP response and its decoded JSON body (an empty JSON
        object when the body is empty or isn't JSON).
        """
        nonlocal nonce
        jose = _sign_request(url, keypath, payload)
        jose_headers = {'Content-Type': 'application/jose+json'} | adt_headers
        try:
            response = requests.post(url, json=jose, headers=jose_headers, timeout=timeout)
        except requests.exceptions.RequestException as error:
            response = error.response
        # Compare with None explicitly: a requests Response is falsy for
        # every 4xx/5xx status, and those answers have to reach the caller
        # so that the status code and the error document are reported.
        if response is None:
            raise RuntimeError("Unable to get response from ACME server.")
        # A missing header leaves no stored nonce: the next signed request
        # then fetches a fresh one from the newNonce resource.
        nonce = response.headers.get('Replay-Nonce')
        try:
            return response, response.json()
        except ValueError:  # if body is empty or not JSON formatted
            return response, json.dumps({})

    # main code
    adt_headers = {'User-Agent': 'acme-dns-tiny/4.0'}
    nonce = None

    log.info("Fetch informations from the ACME directory.")
    acme_config = requests.get(acme_directory, headers=adt_headers, timeout=timeout).json()

    log.info("Get private signature from old account key.")
    private_acme_old_signature = _get_private_acme_signature(old_accountkeypath)

    log.info("Get private signature from new account key.")
    private_acme_new_signature = _get_private_acme_signature(new_accountkeypath)

    log.info("Ask to the ACME server the account identifier to complete the private signature.")
    http_response, result = _send_signed_request(acme_config["newAccount"], old_accountkeypath, {
        "onlyReturnExisting": True})
    if http_response.status_code == 200:
        private_acme_old_signature["kid"] = http_response.headers["Location"]
        private_acme_new_signature["kid"] = http_response.headers["Location"]
    else:
        raise ValueError("Error looking or account URL: {0} {1}"
                         .format(http_response.status_code, result))

    log.info("Rolling over account keys.")
    # The signature by the new key covers the account URL and the old key,
    # signifying a request by the new key holder to take over the account from
    # the old key holder.
    inner_payload = _sign_request(acme_config["keyChange"], new_accountkeypath, {
        "account": private_acme_old_signature["kid"],
        "oldKey": private_acme_old_signature["jwk"]}, is_inner=True)
    # The signature by the old key covers this request and its signature, and
    # indicates the old key holder's assent to the roll-over request.
    http_response, result = _send_signed_request(acme_config["keyChange"], old_accountkeypath,
                                                 inner_payload)

    if http_response.status_code != 200:
        raise ValueError("Error rolling over account key: {0} {1}"
                         .format(http_response.status_code, result))
    log.info("Keys rolled over.")
def main(argv):
    """Parse arguments and roll over the ACME account keys.

    argv: list of command line arguments, without the program name.

    The logger level is set to ERROR when --quiet is given, to INFO otherwise.
    A timeout of 0 is passed on as None, which means no timeout at all.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Tiny ACME client to roll over an ACME account key with another one.",
        epilog="""This script *rolls over* ACME account keys.

It will need to have access to the ACME private account keys, so PLEASE READ THROUGH IT!
It's around 150 lines, so it won't take long.

Example: roll over account key from account.key to newaccount.key:
  python3 acme_account_rollover.py --current account.key --new newaccount.key --acme-directory \
https://acme-staging-v02.api.letsencrypt.org/directory""")
    parser.add_argument("--current", required=True,
                        help="path to the current private account key")
    parser.add_argument("--new", required=True,
                        help="path to the newer private account key to register")
    # This script rolls keys over, it doesn't remove anything: say so in the help.
    parser.add_argument("--acme-directory", required=True,
                        help="ACME directory URL of the ACME server where to roll over the key")
    parser.add_argument("--quiet", action="store_const", const=logging.ERROR,
                        help="suppress output except for errors")
    parser.add_argument("--timeout", type=int, default=10,
                        help="""Number of seconds to wait before ACME requests time out.
                        Set it to 0 to wait indefinitely. Defaults to 10.""")
    args = parser.parse_args(argv)

    # args.quiet is None unless --quiet was given.
    LOGGER.setLevel(args.quiet or logging.INFO)
    account_rollover(args.current, args.new, args.acme_directory, args.timeout or None, log=LOGGER)
if __name__ == "__main__":  # pragma: no cover
    # Started as a script: give the arguments, program name excluded, to main.
    cli_arguments = sys.argv[1:]
    main(cli_arguments)