1#!/usr/bin/env python3
2# pylint: disable=too-many-statements
3"""Tiny script to rollover two keys for an ACME account"""
4import sys
5import argparse
6import subprocess
7import json
8import base64
9import binascii
10import re
11import copy
12import logging
13import requests
15LOGGER = logging.getLogger("acme_account_rollover")
16LOGGER.addHandler(logging.StreamHandler())
19def _b64(text):
20 """Encodes text as base64 as specified in ACME RFC."""
21 return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=")
24def _openssl(command, options, communicate=None):
25 """Run openssl command line and raise IOError on non-zero return."""
26 with subprocess.Popen(["openssl", command] + options, stdin=subprocess.PIPE,
27 stdout=subprocess.PIPE, stderr=subprocess.PIPE) as openssl:
28 out, err = openssl.communicate(communicate)
29 if openssl.returncode != 0:
30 raise IOError("OpenSSL Error: {0}".format(err))
31 return out
34# pylint: disable=too-many-locals
35def account_rollover(old_accountkeypath, new_accountkeypath, acme_directory, timeout, log=LOGGER):
36 """Rollover the old and new account key for an ACME account."""
37 def _get_private_acme_signature(accountkeypath):
38 """Read the account key to get the signature to authenticate with the ACME server."""
39 accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"])
40 signature_search = re.search(
41 r"modulus:\s+?00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)",
42 accountkey.decode("utf8"), re.MULTILINE)
43 if signature_search is None:
44 raise ValueError("Unable to retrieve private signature.")
45 pub_hex, pub_exp = signature_search.groups()
46 pub_exp = "{0:x}".format(int(pub_exp))
47 pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
48 return {
49 "alg": "RS256",
50 "jwk": {
51 "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
52 "kty": "RSA",
53 "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
54 },
55 }
57 def _sign_request(url, keypath, payload, is_inner=False):
58 """Signs request with a specific right account key."""
59 nonlocal nonce
60 if payload == "": # on POST-as-GET, final payload has to be just empty string
61 payload64 = ""
62 else:
63 payload64 = _b64(json.dumps(payload).encode("utf8"))
64 if keypath == new_accountkeypath:
65 protected = copy.deepcopy(private_acme_new_signature)
66 elif keypath == old_accountkeypath:
67 protected = copy.deepcopy(private_acme_old_signature)
68 else:
69 raise RuntimeError("Unknown keypath to sign request")
71 if is_inner or url == acme_config["newAccount"]:
72 if "kid" in protected:
73 del protected["kid"]
74 else:
75 del protected["jwk"]
77 if not is_inner:
78 protected["nonce"] = (nonce
79 or requests.get(
80 acme_config["newNonce"],
81 headers=adtheaders,
82 timeout=timeout)
83 .headers['Replay-Nonce'])
84 protected["url"] = url
85 protected64 = _b64(json.dumps(protected).encode("utf8"))
86 signature = _openssl("dgst", ["-sha256", "-sign", keypath],
87 "{0}.{1}".format(protected64, payload64).encode("utf8"))
88 return {
89 "protected": protected64, "payload": payload64, "signature": _b64(signature)
90 }
92 def _send_signed_request(url, keypath, payload):
93 """Sends signed requests to ACME server."""
94 nonlocal nonce
95 jose = _sign_request(url, keypath, payload)
96 joseheaders = {
97 'User-Agent': adtheaders.get('User-Agent'),
98 'Content-Type': 'application/jose+json'
99 }
100 try:
101 response = requests.post(url, json=jose, headers=joseheaders, timeout=timeout)
102 except requests.exceptions.RequestException as error:
103 response = error.response
104 if response:
105 nonce = response.headers['Replay-Nonce']
106 try:
107 return response, response.json()
108 except ValueError: # if body is empty or not JSON formatted
109 return response, json.dumps({})
110 else:
111 raise RuntimeError("Unable to get response from ACME server.")
113 # main code
114 adtheaders = {'User-Agent': 'acme-dns-tiny/3.0'}
115 nonce = None
117 log.info("Fetch informations from the ACME directory.")
118 acme_config = requests.get(acme_directory, headers=adtheaders, timeout=timeout).json()
120 log.info("Get private signature from old account key.")
121 private_acme_old_signature = _get_private_acme_signature(old_accountkeypath)
123 log.info("Get private signature from new account key.")
124 private_acme_new_signature = _get_private_acme_signature(new_accountkeypath)
126 log.info("Ask to the ACME server the account identifier to complete the private signature.")
127 http_response, result = _send_signed_request(acme_config["newAccount"], old_accountkeypath, {
128 "onlyReturnExisting": True})
129 if http_response.status_code == 200:
130 private_acme_old_signature["kid"] = http_response.headers["Location"]
131 private_acme_new_signature["kid"] = http_response.headers["Location"]
132 else:
133 raise ValueError("Error looking or account URL: {0} {1}"
134 .format(http_response.status_code, result))
136 log.info("Rolling over account keys.")
137 # The signature by the new key covers the account URL and the old key,
138 # signifying a request by the new key holder to take over the account from
139 # the old key holder.
140 inner_payload = _sign_request(acme_config["keyChange"], new_accountkeypath, {
141 "account": private_acme_old_signature["kid"],
142 "oldKey": private_acme_old_signature["jwk"]}, is_inner=True)
143 # The signature by the old key covers this request and its signature, and
144 # indicates the old key holder's assent to the roll-over request.
145 http_response, result = _send_signed_request(acme_config["keyChange"], old_accountkeypath,
146 inner_payload)
148 if http_response.status_code != 200:
149 raise ValueError("Error rolling over account key: {0} {1}"
150 .format(http_response.status_code, result))
151 log.info("Keys rolled over.")
154def main(argv):
155 """Parse arguments and roll over the ACME account keys."""
156 parser = argparse.ArgumentParser(
157 formatter_class=argparse.RawDescriptionHelpFormatter,
158 description="Tiny ACME client to roll over an ACME account key with another one.",
159 epilog="""This script *rolls over* ACME account keys.
161It will need to have access to the ACME private account keys, so PLEASE READ THROUGH IT!
162It's around 150 lines, so it won't take long.
164Example: roll over account key from account.key to newaccount.key:
165 python3 acme_account_rollover.py --current account.key --new newaccount.key --acme-directory \
166https://acme-staging-v02.api.letsencrypt.org/directory""")
167 parser.add_argument("--current", required=True,
168 help="path to the current private account key")
169 parser.add_argument("--new", required=True,
170 help="path to the newer private account key to register")
171 parser.add_argument("--acme-directory", required=True,
172 help="ACME directory URL of the ACME server where to remove the key")
173 parser.add_argument("--quiet", action="store_const", const=logging.ERROR,
174 help="suppress output except for errors")
175 parser.add_argument("--timeout", type=int, default=10,
176 help="""Number of seconds to wait before ACME requests time out.
177 Set it to 0 to wait indefinitely. Defaults to 10.""")
178 args = parser.parse_args(argv)
180 LOGGER.setLevel(args.quiet or logging.INFO)
181 account_rollover(args.current, args.new, args.acme_directory, args.timeout or None, log=LOGGER)
184if __name__ == "__main__": # pragma: no cover
185 main(sys.argv[1:])