Coverage for acme_dns_tiny.py: 84%
256 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-05-24 19:15 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-05-24 19:15 +0000
1#!/usr/bin/env python3
2# pylint: disable=multiple-imports
3"""ACME client to met DNS challenge and receive TLS certificate"""
4import argparse, base64, binascii, configparser, copy, hashlib, json, logging
5import re, sys, subprocess, time
6import requests
7import dns.exception, dns.query, dns.name, dns.resolver, dns.rrset, dns.tsigkeyring, dns.update
9LOGGER = logging.getLogger('acme_dns_tiny')
10LOGGER.addHandler(logging.StreamHandler())
13def _base64(text):
14 """Encodes string as base64 as specified in the ACME RFC."""
15 return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=")
18def _openssl(command, options, communicate=None):
19 """Run openssl command line and raise IOError on non-zero return."""
20 with subprocess.Popen(["openssl", command] + options,
21 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
22 stderr=subprocess.PIPE) as openssl:
23 out, err = openssl.communicate(communicate)
24 if openssl.returncode != 0:
25 raise IOError("OpenSSL Error: {0}".format(err))
26 return out
29# pylint: disable=too-many-locals,too-many-branches,too-many-statements
30def get_crt(config, log=LOGGER):
31 """Get ACME certificate by resolving DNS challenge."""
33 def _get_authoritative_server_ips(zone, resolver):
34 """Get all authoritative server ips for a given zone"""
35 main_name = resolver.resolve(zone, rdtype="SOA", lifetime=dns_timeout)[0].mname
36 nameservers = [ns.target for ns in resolver.resolve(
37 zone, rdtype="NS", lifetime=dns_timeout)]
38 nameservers_ipv4 = []
39 nameservers_ipv6 = []
40 # Add the main (aka "master") name server ip to the head of the list
41 # (see "Requestor Behavior" section of RFC 2136)
42 if main_name in nameservers:
43 nameservers_ipv6 += [ip.address for ip in resolver.resolve(main_name, rdtype="AAAA",
44 raise_on_no_answer=False,
45 lifetime=dns_timeout)]
46 nameservers_ipv4 += [ip.address for ip in resolver.resolve(main_name, rdtype="A",
47 raise_on_no_answer=False,
48 lifetime=dns_timeout)]
49 for nameserver in list(filter(lambda ns: ns != main_name, nameservers)):
50 nameservers_ipv6 += [ip.address for ip in resolver.resolve(nameserver, rdtype="AAAA",
51 raise_on_no_answer=False,
52 lifetime=dns_timeout)]
53 nameservers_ipv4 += [ip.address for ip in resolver.resolve(nameserver, rdtype="A",
54 raise_on_no_answer=False,
55 lifetime=dns_timeout)]
56 nameservers_ips = []
57 for ns_ip in nameservers_ipv6 + nameservers_ipv4:
58 if ns_ip not in nameservers_ips:
59 nameservers_ips.append(ns_ip)
60 return nameservers_ips
62 def _update_dns(rrset, action, resolver):
63 """Updates DNS resource by adding or deleting resource."""
64 algorithm = dns.name.from_text("{0}".format(config["TSIGKeyring"]["Algorithm"].lower()))
65 dns_zone = dns.resolver.zone_for_name(rrset.name, resolver=resolver)
66 # Prepare dns update message
67 dns_update = dns.update.Update(dns_zone,
68 keyring=private_keyring, keyalgorithm=algorithm)
69 if action == "add":
70 dns_update.add(rrset.name, rrset)
71 elif action == "delete":
72 dns_update.delete(rrset.name, rrset)
73 # Send DNS update request to main zone nameservers
74 response = None
75 for nameserver in _get_authoritative_server_ips(dns_zone, resolver):
76 try:
77 response = dns.query.tcp(dns_update, nameserver, timeout=dns_timeout)
78 # pylint: disable=broad-except
79 except Exception as exception:
80 log.debug("Unable to %s DNS resource on dns main server with IP %s, try again "
81 "with next available dns main server IP. Error detail: %s", action,
82 nameserver, exception)
83 response = None
84 if response is not None:
85 break
86 if response is None:
87 raise RuntimeError("Unable to {0} DNS resource to {1}".format(action, rrset.name))
89 def _send_signed_request(url, payload, extra_headers=None):
90 """Sends signed requests to ACME server."""
91 nonlocal nonce
92 if payload == "": # on POST-as-GET, final payload has to be just empty string
93 payload64 = ""
94 else:
95 payload64 = _base64(json.dumps(payload).encode("utf8"))
96 protected = copy.deepcopy(private_acme_signature)
97 protected["nonce"] = nonce or requests.get(acme_config["newNonce"], headers=adt_headers,
98 timeout=acme_timeout).headers['Replay-Nonce']
99 del nonce
100 protected["url"] = url
101 if url == acme_config["newAccount"]:
102 if "kid" in protected:
103 del protected["kid"]
104 else:
105 del protected["jwk"]
106 protected64 = _base64(json.dumps(protected).encode("utf8"))
107 signature = _openssl("dgst", ["-sha256", "-sign", config["acmednstiny"]["AccountKeyFile"]],
108 "{0}.{1}".format(protected64, payload64).encode("utf8"))
109 jose = {
110 "protected": protected64, "payload": payload64, "signature": _base64(signature)
111 }
112 jose_headers = {
113 'Content-Type': 'application/jose+json'} | adt_headers | (extra_headers or {})
114 try:
115 response = requests.post(url, json=jose, headers=jose_headers, timeout=acme_timeout)
116 except requests.exceptions.RequestException as error:
117 response = error.response
118 if response:
119 nonce = response.headers['Replay-Nonce']
120 try:
121 return response, response.json()
122 except ValueError: # if body is empty or not JSON formatted
123 return response, json.loads("{}")
124 else:
125 raise RuntimeError("Unable to get response from ACME server.")
127 # main code
128 acme_timeout = config["acmednstiny"].getint("Timeout") or None
129 dns_timeout = config["DNS"].getint("Timeout") or None
130 adt_headers = {'User-Agent': 'acme-dns-tiny/4.0',
131 'Accept-Language': config["acmednstiny"]["Language"]}
132 nonce = None
134 log.info("Find domains to validate from the Certificate Signing Request (CSR) file.")
135 csr = _openssl("req", ["-in", config["acmednstiny"]["CSRFile"],
136 "-noout", "-text"]).decode("utf8")
137 domains = set()
138 common_name = re.search(r"Subject:.*?\s+?CN\s*?=\s*?([^\s,;/]+)", csr)
139 if common_name is not None:
140 domains.add(common_name.group(1))
141 subject_alt_names = re.search(
142 r"X509v3 Subject Alternative Name: (?:critical)?\s+([^\r\n]+)\r?\n",
143 csr, re.MULTILINE)
144 if subject_alt_names is not None:
145 for san in subject_alt_names.group(1).split(", "):
146 if san.startswith("DNS:"):
147 domains.add(san[4:])
148 if len(domains) == 0: # pylint: disable=len-as-condition
149 raise ValueError("Didn't find any domain to validate in the provided CSR.")
151 log.info("Configure DNS client tools.")
152 # That keyring is used to authenticate with the main DNS server, it needs to be safely kept
153 private_keyring = dns.tsigkeyring.from_text({config["TSIGKeyring"]["KeyName"]:
154 config["TSIGKeyring"]["KeyValue"]})
155 # Prepare DNS resolver
156 resolver = dns.resolver.Resolver(configure=True)
157 nameservers = list(filter(lambda ip: ip != "", config["DNS"]["NameServer"]
158 .replace(" ", "").split(",")))
159 if nameservers:
160 resolver = dns.resolver.Resolver(configure=False)
161 resolver.nameservers = nameservers
163 # explicitly disable the DNS suffix search list as the ACME server doesn't know it
164 resolver.use_search_by_default = False
166 log.info("Get private signature from account key.")
167 accountkey = _openssl("rsa", ["-in", config["acmednstiny"]["AccountKeyFile"],
168 "-noout", "-text"])
169 signature_search = re.search(r"modulus:\s+?00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)",
170 accountkey.decode("utf8"), re.MULTILINE)
171 if signature_search is None:
172 raise ValueError("Unable to retrieve private signature.")
173 pub_hex, pub_exp = signature_search.groups()
174 pub_exp = "{0:x}".format(int(pub_exp))
175 pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
176 # That signature is used to authenticate with the ACME server, it needs to be safely kept
177 private_acme_signature = {
178 "alg": "RS256",
179 "jwk": {
180 "e": _base64(binascii.unhexlify(pub_exp.encode("utf-8"))),
181 "kty": "RSA",
182 "n": _base64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
183 },
184 }
185 private_jwk = json.dumps(private_acme_signature["jwk"], sort_keys=True, separators=(",", ":"))
186 jwk_thumbprint = _base64(hashlib.sha256(private_jwk.encode("utf8")).digest())
188 log.info("Fetch ACME server configuration from its directory URL.")
189 acme_config = requests.get(config["acmednstiny"]["ACMEDirectory"], headers=adt_headers,
190 timeout=acme_timeout).json()
191 terms_service = acme_config.get("meta", {}).get("termsOfService", "")
193 log.info("Register ACME Account to get the account identifier.")
194 account_request = {}
195 if terms_service:
196 account_request["termsOfServiceAgreed"] = True
197 log.warning(("Terms of service exist and will be automatically agreed if possible, "
198 "you should read them: %s"), terms_service)
199 account_request["contact"] = config["acmednstiny"]["Contacts"].split(';')
200 if account_request["contact"] == [""]:
201 del account_request["contact"]
203 http_response, account_info = _send_signed_request(acme_config["newAccount"], account_request)
204 if http_response.status_code == 201:
205 private_acme_signature["kid"] = http_response.headers['Location']
206 log.info(" - Registered a new account: '%s'", private_acme_signature["kid"])
207 elif http_response.status_code == 200:
208 private_acme_signature["kid"] = http_response.headers['Location']
209 log.debug(" - Account is already registered: '%s'", private_acme_signature["kid"])
211 http_response, account_info = _send_signed_request(private_acme_signature["kid"], "")
212 else:
213 raise ValueError("Error registering account: {0} {1}"
214 .format(http_response.status_code, account_info))
216 log.info("Update contact information if needed.")
217 if ("contact" in account_request
218 and set(account_request["contact"]) != set(account_info["contact"])):
219 http_response, result = _send_signed_request(private_acme_signature["kid"],
220 account_request)
221 if http_response.status_code == 200:
222 log.debug(" - Account updated with latest contact informations.")
223 else:
224 raise ValueError("Error registering updates for the account: {0} {1}"
225 .format(http_response.status_code, result))
227 # new order
228 log.info("Request to the ACME server an order to validate domains.")
229 new_order = {"identifiers": [{"type": "dns", "value": domain} for domain in domains]}
230 http_response, order = _send_signed_request(acme_config["newOrder"], new_order)
231 if http_response.status_code == 201:
232 order_location = http_response.headers['Location']
233 log.debug(" - Order received: %s", order_location)
234 if order["status"] != "pending" and order["status"] != "ready":
235 raise ValueError("Order status is neither pending neither ready, we can't use it: {0}"
236 .format(order))
237 elif (http_response.status_code == 403
238 and order["type"] == "urn:ietf:params:acme:error:userActionRequired"):
239 raise ValueError(("Order creation failed ({0}). Read Terms of Service ({1}), then follow "
240 "your CA instructions: {2}")
241 .format(order["detail"],
242 http_response.headers['Link'], order["instance"]))
243 else:
244 raise ValueError("Error getting new Order: {0} {1}"
245 .format(http_response.status_code, order))
247 # complete each authorization challenge
248 for authz in order["authorizations"]:
249 if order["status"] == "ready":
250 log.info("No challenge to process: order is already ready.")
251 break
253 log.info("Process challenge for authorization: %s", authz)
254 # get new challenge
255 http_response, authorization = _send_signed_request(authz, "")
256 if http_response.status_code != 200:
257 raise ValueError("Error fetching challenges: {0} {1}"
258 .format(http_response.status_code, authorization))
259 domain = authorization["identifier"]["value"]
261 if authorization["status"] == "valid":
262 log.info("Skip authorization for domain %s: this is already validated", domain)
263 continue
264 if authorization["status"] != "pending":
265 raise ValueError("Authorization for the domain {0} can't be validated: "
266 "the authorization is {1}.".format(domain, authorization["status"]))
268 challenges = [c for c in authorization["challenges"] if c["type"] == "dns-01"]
269 if not challenges:
270 raise ValueError("Unable to find a DNS challenge to resolve for domain {0}"
271 .format(domain))
272 log.info("Install DNS TXT resource for domain: %s", domain)
273 challenge = challenges[0]
274 keyauthorization = challenge["token"] + "." + jwk_thumbprint
275 keydigest64 = _base64(hashlib.sha256(keyauthorization.encode("utf8")).digest())
276 dnsrr_domain = "_acme-challenge.{0}.".format(domain)
277 try: # a CNAME resource can be used for advanced TSIG configuration
278 # Note: the CNAME target has to be of "non-CNAME" type (recursion isn't managed)
279 dnsrr_domain = [response.to_text() for response
280 in resolver.resolve(dnsrr_domain, rdtype="CNAME",
281 lifetime=dns_timeout)][0]
282 log.info(" - A CNAME resource has been found for this domain, will install TXT on %s",
283 dnsrr_domain)
284 except dns.exception.DNSException as dnsexception:
285 log.debug((" - No CNAME resource has been found for this domain (%s), will "
286 "install TXT directly on %s"), type(dnsexception).__name__, dnsrr_domain)
287 dnsrr_set = dns.rrset.from_text(dnsrr_domain, config["DNS"].getint("TTL"),
288 "IN", "TXT", '"{0}"'.format(keydigest64))
289 try:
290 _update_dns(dnsrr_set, "add", resolver)
291 except dns.exception.DNSException as exception:
292 raise ValueError("Error updating DNS records: {0} : {1}"
293 .format(type(exception).__name__, str(exception))) from exception
295 log.info("Wait for 1 TTL (%s seconds) to ensure DNS cache is cleared.",
296 config["DNS"].getint("TTL"))
297 time.sleep(config["DNS"].getint("TTL"))
298 challenge_verified = False
299 number_check_fail = 1
300 while challenge_verified is False:
301 try:
302 log.info(('Self test (try: %s): Check resource with value "%s" exits on '
303 'nameservers: %s'), number_check_fail, keydigest64,
304 resolver.nameservers)
305 for response in resolver.resolve(dnsrr_domain, rdtype="TXT",
306 lifetime=dns_timeout).rrset:
307 log.debug(" - Found value %s", response.to_text())
308 challenge_verified = (challenge_verified
309 or response.to_text() == '"{0}"'.format(keydigest64))
310 except dns.exception.DNSException as dnsexception:
311 log.info(
312 " - Will retry as a DNS error occurred while checking challenge: %s : %s",
313 type(dnsexception).__name__, dnsexception)
314 finally:
315 if challenge_verified is False:
316 if number_check_fail >= 10:
317 raise ValueError("Error checking challenge, value not found: {0}"
318 .format(keydigest64))
319 number_check_fail = number_check_fail + 1
320 time.sleep(config["DNS"].getint("TTL"))
322 log.info("Asking ACME server to validate challenge.")
323 http_response, result = _send_signed_request(challenge["url"], {})
324 if http_response.status_code != 200:
325 raise ValueError("Error triggering challenge: {0} {1}"
326 .format(http_response.status_code, result))
327 try:
328 while True:
329 http_response, challenge_status = _send_signed_request(challenge["url"], "")
330 if http_response.status_code != 200:
331 raise ValueError("Error during challenge validation: {0} {1}".format(
332 http_response.status_code, challenge_status))
333 if challenge_status["status"] == "pending":
334 time.sleep(2)
335 elif challenge_status["status"] == "valid":
336 log.info("ACME has verified challenge for domain: %s", domain)
337 break
338 else:
339 raise ValueError("Challenge for domain {0} did not pass: {1}".format(
340 domain, challenge_status))
341 finally:
342 _update_dns(dnsrr_set, "delete", resolver)
344 log.info("Request to finalize the order (all challenges have been completed)")
345 csr_der = _base64(_openssl("req", ["-in", config["acmednstiny"]["CSRFile"],
346 "-outform", "DER"]))
347 http_response, result = _send_signed_request(order["finalize"], {"csr": csr_der})
348 if http_response.status_code != 200:
349 raise ValueError("Error while sending the CSR: {0} {1}"
350 .format(http_response.status_code, result))
352 while True:
353 http_response, order = _send_signed_request(order_location, "")
355 if order["status"] == "processing":
356 try:
357 time.sleep(float(http_response.headers["Retry-After"]))
358 except (OverflowError, ValueError, TypeError):
359 time.sleep(2)
360 elif order["status"] == "valid":
361 log.info("Order finalized!")
362 break
363 else:
364 raise ValueError("Finalizing order {0} got errors: {1}".format(
365 order_location, order))
367 http_response, result = _send_signed_request(
368 order["certificate"], "",
369 {'Accept': config["acmednstiny"].get("CertificateFormat",
370 'application/pem-certificate-chain')})
371 if http_response.status_code != 200:
372 raise ValueError("Finalizing order {0} got errors: {1}"
373 .format(http_response.status_code, result))
375 if 'link' in http_response.headers:
376 log.info(" - Certificate links given by server: %s", http_response.headers['link'])
378 log.info("Certificate signed and chain received: %s", order["certificate"])
379 return http_response.text
382def main(argv):
383 """Parse arguments and get certificate."""
384 parser = argparse.ArgumentParser(
385 formatter_class=argparse.RawDescriptionHelpFormatter,
386 description="Tiny ACME client to get TLS certificate by responding to DNS challenges.",
387 epilog="""As the script requires access to your private ACME account key and dns server,
388so PLEASE READ THROUGH IT (it won't take too long, it's a one-file script) !
390Example: requests certificate chain and store it in chain.crt
391 python3 acme_dns_tiny.py ./example.ini > chain.crt
393See example.ini file to configure correctly this script."""
394 )
395 parser.add_argument("--quiet", action="store_const", const=logging.ERROR,
396 help="show only errors on stderr")
397 parser.add_argument("--verbose", action="store_const", const=logging.DEBUG,
398 help="show all debug informations on stderr")
399 parser.add_argument("--csr",
400 help="specifies CSR file path to use instead of the CSRFile option \
401from the configuration file.")
402 parser.add_argument("configfile", help="path to your configuration file")
403 args = parser.parse_args(argv)
405 config = configparser.ConfigParser()
406 config.read_dict({
407 "acmednstiny": {
408 "ACMEDirectory": "https://acme-staging-v02.api.letsencrypt.org/directory",
409 "Language": "en", "Contacts": "", "Timeout": 10},
410 "DNS": {"NameServer": "", "TTL": 10, "Timeout": 10}})
411 config.read(args.configfile)
413 if args.csr:
414 config.set("acmednstiny", "csrfile", args.csr)
416 if (set(["accountkeyfile", "csrfile", "acmedirectory"]) - set(config.options("acmednstiny"))
417 or set(["keyname", "keyvalue", "algorithm"]) - set(config.options("TSIGKeyring"))):
418 raise ValueError("Some required settings are missing.")
420 LOGGER.setLevel(args.verbose or args.quiet or logging.INFO)
421 signed_crt = get_crt(config, LOGGER)
422 sys.stdout.write(signed_crt)
425if __name__ == "__main__": # pragma: no cover
426 main(sys.argv[1:])