Coverage for acme_dns_tiny.py: 84%

256 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-05-24 19:15 +0000

1#!/usr/bin/env python3 

2# pylint: disable=multiple-imports 

3"""ACME client to met DNS challenge and receive TLS certificate""" 

4import argparse, base64, binascii, configparser, copy, hashlib, json, logging 

5import re, sys, subprocess, time 

6import requests 

7import dns.exception, dns.query, dns.name, dns.resolver, dns.rrset, dns.tsigkeyring, dns.update 

8 

9LOGGER = logging.getLogger('acme_dns_tiny') 

10LOGGER.addHandler(logging.StreamHandler()) 

11 

12 

13def _base64(text): 

14 """Encodes string as base64 as specified in the ACME RFC.""" 

15 return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=") 

16 

17 

18def _openssl(command, options, communicate=None): 

19 """Run openssl command line and raise IOError on non-zero return.""" 

20 with subprocess.Popen(["openssl", command] + options, 

21 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 

22 stderr=subprocess.PIPE) as openssl: 

23 out, err = openssl.communicate(communicate) 

24 if openssl.returncode != 0: 

25 raise IOError("OpenSSL Error: {0}".format(err)) 

26 return out 

27 

28 

29# pylint: disable=too-many-locals,too-many-branches,too-many-statements 

30def get_crt(config, log=LOGGER): 

31 """Get ACME certificate by resolving DNS challenge.""" 

32 

33 def _get_authoritative_server_ips(zone, resolver): 

34 """Get all authoritative server ips for a given zone""" 

35 main_name = resolver.resolve(zone, rdtype="SOA", lifetime=dns_timeout)[0].mname 

36 nameservers = [ns.target for ns in resolver.resolve( 

37 zone, rdtype="NS", lifetime=dns_timeout)] 

38 nameservers_ipv4 = [] 

39 nameservers_ipv6 = [] 

40 # Add the main (aka "master") name server ip to the head of the list 

41 # (see "Requestor Behavior" section of RFC 2136) 

42 if main_name in nameservers: 

43 nameservers_ipv6 += [ip.address for ip in resolver.resolve(main_name, rdtype="AAAA", 

44 raise_on_no_answer=False, 

45 lifetime=dns_timeout)] 

46 nameservers_ipv4 += [ip.address for ip in resolver.resolve(main_name, rdtype="A", 

47 raise_on_no_answer=False, 

48 lifetime=dns_timeout)] 

49 for nameserver in list(filter(lambda ns: ns != main_name, nameservers)): 

50 nameservers_ipv6 += [ip.address for ip in resolver.resolve(nameserver, rdtype="AAAA", 

51 raise_on_no_answer=False, 

52 lifetime=dns_timeout)] 

53 nameservers_ipv4 += [ip.address for ip in resolver.resolve(nameserver, rdtype="A", 

54 raise_on_no_answer=False, 

55 lifetime=dns_timeout)] 

56 nameservers_ips = [] 

57 for ns_ip in nameservers_ipv6 + nameservers_ipv4: 

58 if ns_ip not in nameservers_ips: 

59 nameservers_ips.append(ns_ip) 

60 return nameservers_ips 

61 

62 def _update_dns(rrset, action, resolver): 

63 """Updates DNS resource by adding or deleting resource.""" 

64 algorithm = dns.name.from_text("{0}".format(config["TSIGKeyring"]["Algorithm"].lower())) 

65 dns_zone = dns.resolver.zone_for_name(rrset.name, resolver=resolver) 

66 # Prepare dns update message 

67 dns_update = dns.update.Update(dns_zone, 

68 keyring=private_keyring, keyalgorithm=algorithm) 

69 if action == "add": 

70 dns_update.add(rrset.name, rrset) 

71 elif action == "delete": 

72 dns_update.delete(rrset.name, rrset) 

73 # Send DNS update request to main zone nameservers 

74 response = None 

75 for nameserver in _get_authoritative_server_ips(dns_zone, resolver): 

76 try: 

77 response = dns.query.tcp(dns_update, nameserver, timeout=dns_timeout) 

78 # pylint: disable=broad-except 

79 except Exception as exception: 

80 log.debug("Unable to %s DNS resource on dns main server with IP %s, try again " 

81 "with next available dns main server IP. Error detail: %s", action, 

82 nameserver, exception) 

83 response = None 

84 if response is not None: 

85 break 

86 if response is None: 

87 raise RuntimeError("Unable to {0} DNS resource to {1}".format(action, rrset.name)) 

88 

89 def _send_signed_request(url, payload, extra_headers=None): 

90 """Sends signed requests to ACME server.""" 

91 nonlocal nonce 

92 if payload == "": # on POST-as-GET, final payload has to be just empty string 

93 payload64 = "" 

94 else: 

95 payload64 = _base64(json.dumps(payload).encode("utf8")) 

96 protected = copy.deepcopy(private_acme_signature) 

97 protected["nonce"] = nonce or requests.get(acme_config["newNonce"], headers=adt_headers, 

98 timeout=acme_timeout).headers['Replay-Nonce'] 

99 del nonce 

100 protected["url"] = url 

101 if url == acme_config["newAccount"]: 

102 if "kid" in protected: 

103 del protected["kid"] 

104 else: 

105 del protected["jwk"] 

106 protected64 = _base64(json.dumps(protected).encode("utf8")) 

107 signature = _openssl("dgst", ["-sha256", "-sign", config["acmednstiny"]["AccountKeyFile"]], 

108 "{0}.{1}".format(protected64, payload64).encode("utf8")) 

109 jose = { 

110 "protected": protected64, "payload": payload64, "signature": _base64(signature) 

111 } 

112 jose_headers = { 

113 'Content-Type': 'application/jose+json'} | adt_headers | (extra_headers or {}) 

114 try: 

115 response = requests.post(url, json=jose, headers=jose_headers, timeout=acme_timeout) 

116 except requests.exceptions.RequestException as error: 

117 response = error.response 

118 if response: 

119 nonce = response.headers['Replay-Nonce'] 

120 try: 

121 return response, response.json() 

122 except ValueError: # if body is empty or not JSON formatted 

123 return response, json.loads("{}") 

124 else: 

125 raise RuntimeError("Unable to get response from ACME server.") 

126 

127 # main code 

128 acme_timeout = config["acmednstiny"].getint("Timeout") or None 

129 dns_timeout = config["DNS"].getint("Timeout") or None 

130 adt_headers = {'User-Agent': 'acme-dns-tiny/4.0', 

131 'Accept-Language': config["acmednstiny"]["Language"]} 

132 nonce = None 

133 

134 log.info("Find domains to validate from the Certificate Signing Request (CSR) file.") 

135 csr = _openssl("req", ["-in", config["acmednstiny"]["CSRFile"], 

136 "-noout", "-text"]).decode("utf8") 

137 domains = set() 

138 common_name = re.search(r"Subject:.*?\s+?CN\s*?=\s*?([^\s,;/]+)", csr) 

139 if common_name is not None: 

140 domains.add(common_name.group(1)) 

141 subject_alt_names = re.search( 

142 r"X509v3 Subject Alternative Name: (?:critical)?\s+([^\r\n]+)\r?\n", 

143 csr, re.MULTILINE) 

144 if subject_alt_names is not None: 

145 for san in subject_alt_names.group(1).split(", "): 

146 if san.startswith("DNS:"): 

147 domains.add(san[4:]) 

148 if len(domains) == 0: # pylint: disable=len-as-condition 

149 raise ValueError("Didn't find any domain to validate in the provided CSR.") 

150 

151 log.info("Configure DNS client tools.") 

152 # That keyring is used to authenticate with the main DNS server, it needs to be safely kept 

153 private_keyring = dns.tsigkeyring.from_text({config["TSIGKeyring"]["KeyName"]: 

154 config["TSIGKeyring"]["KeyValue"]}) 

155 # Prepare DNS resolver 

156 resolver = dns.resolver.Resolver(configure=True) 

157 nameservers = list(filter(lambda ip: ip != "", config["DNS"]["NameServer"] 

158 .replace(" ", "").split(","))) 

159 if nameservers: 

160 resolver = dns.resolver.Resolver(configure=False) 

161 resolver.nameservers = nameservers 

162 

163 # explicitly disable the DNS suffix search list as the ACME server doesn't know it 

164 resolver.use_search_by_default = False 

165 

166 log.info("Get private signature from account key.") 

167 accountkey = _openssl("rsa", ["-in", config["acmednstiny"]["AccountKeyFile"], 

168 "-noout", "-text"]) 

169 signature_search = re.search(r"modulus:\s+?00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)", 

170 accountkey.decode("utf8"), re.MULTILINE) 

171 if signature_search is None: 

172 raise ValueError("Unable to retrieve private signature.") 

173 pub_hex, pub_exp = signature_search.groups() 

174 pub_exp = "{0:x}".format(int(pub_exp)) 

175 pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp 

176 # That signature is used to authenticate with the ACME server, it needs to be safely kept 

177 private_acme_signature = { 

178 "alg": "RS256", 

179 "jwk": { 

180 "e": _base64(binascii.unhexlify(pub_exp.encode("utf-8"))), 

181 "kty": "RSA", 

182 "n": _base64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))), 

183 }, 

184 } 

185 private_jwk = json.dumps(private_acme_signature["jwk"], sort_keys=True, separators=(",", ":")) 

186 jwk_thumbprint = _base64(hashlib.sha256(private_jwk.encode("utf8")).digest()) 

187 

188 log.info("Fetch ACME server configuration from its directory URL.") 

189 acme_config = requests.get(config["acmednstiny"]["ACMEDirectory"], headers=adt_headers, 

190 timeout=acme_timeout).json() 

191 terms_service = acme_config.get("meta", {}).get("termsOfService", "") 

192 

193 log.info("Register ACME Account to get the account identifier.") 

194 account_request = {} 

195 if terms_service: 

196 account_request["termsOfServiceAgreed"] = True 

197 log.warning(("Terms of service exist and will be automatically agreed if possible, " 

198 "you should read them: %s"), terms_service) 

199 account_request["contact"] = config["acmednstiny"]["Contacts"].split(';') 

200 if account_request["contact"] == [""]: 

201 del account_request["contact"] 

202 

203 http_response, account_info = _send_signed_request(acme_config["newAccount"], account_request) 

204 if http_response.status_code == 201: 

205 private_acme_signature["kid"] = http_response.headers['Location'] 

206 log.info(" - Registered a new account: '%s'", private_acme_signature["kid"]) 

207 elif http_response.status_code == 200: 

208 private_acme_signature["kid"] = http_response.headers['Location'] 

209 log.debug(" - Account is already registered: '%s'", private_acme_signature["kid"]) 

210 

211 http_response, account_info = _send_signed_request(private_acme_signature["kid"], "") 

212 else: 

213 raise ValueError("Error registering account: {0} {1}" 

214 .format(http_response.status_code, account_info)) 

215 

216 log.info("Update contact information if needed.") 

217 if ("contact" in account_request 

218 and set(account_request["contact"]) != set(account_info["contact"])): 

219 http_response, result = _send_signed_request(private_acme_signature["kid"], 

220 account_request) 

221 if http_response.status_code == 200: 

222 log.debug(" - Account updated with latest contact informations.") 

223 else: 

224 raise ValueError("Error registering updates for the account: {0} {1}" 

225 .format(http_response.status_code, result)) 

226 

227 # new order 

228 log.info("Request to the ACME server an order to validate domains.") 

229 new_order = {"identifiers": [{"type": "dns", "value": domain} for domain in domains]} 

230 http_response, order = _send_signed_request(acme_config["newOrder"], new_order) 

231 if http_response.status_code == 201: 

232 order_location = http_response.headers['Location'] 

233 log.debug(" - Order received: %s", order_location) 

234 if order["status"] != "pending" and order["status"] != "ready": 

235 raise ValueError("Order status is neither pending neither ready, we can't use it: {0}" 

236 .format(order)) 

237 elif (http_response.status_code == 403 

238 and order["type"] == "urn:ietf:params:acme:error:userActionRequired"): 

239 raise ValueError(("Order creation failed ({0}). Read Terms of Service ({1}), then follow " 

240 "your CA instructions: {2}") 

241 .format(order["detail"], 

242 http_response.headers['Link'], order["instance"])) 

243 else: 

244 raise ValueError("Error getting new Order: {0} {1}" 

245 .format(http_response.status_code, order)) 

246 

247 # complete each authorization challenge 

248 for authz in order["authorizations"]: 

249 if order["status"] == "ready": 

250 log.info("No challenge to process: order is already ready.") 

251 break 

252 

253 log.info("Process challenge for authorization: %s", authz) 

254 # get new challenge 

255 http_response, authorization = _send_signed_request(authz, "") 

256 if http_response.status_code != 200: 

257 raise ValueError("Error fetching challenges: {0} {1}" 

258 .format(http_response.status_code, authorization)) 

259 domain = authorization["identifier"]["value"] 

260 

261 if authorization["status"] == "valid": 

262 log.info("Skip authorization for domain %s: this is already validated", domain) 

263 continue 

264 if authorization["status"] != "pending": 

265 raise ValueError("Authorization for the domain {0} can't be validated: " 

266 "the authorization is {1}.".format(domain, authorization["status"])) 

267 

268 challenges = [c for c in authorization["challenges"] if c["type"] == "dns-01"] 

269 if not challenges: 

270 raise ValueError("Unable to find a DNS challenge to resolve for domain {0}" 

271 .format(domain)) 

272 log.info("Install DNS TXT resource for domain: %s", domain) 

273 challenge = challenges[0] 

274 keyauthorization = challenge["token"] + "." + jwk_thumbprint 

275 keydigest64 = _base64(hashlib.sha256(keyauthorization.encode("utf8")).digest()) 

276 dnsrr_domain = "_acme-challenge.{0}.".format(domain) 

277 try: # a CNAME resource can be used for advanced TSIG configuration 

278 # Note: the CNAME target has to be of "non-CNAME" type (recursion isn't managed) 

279 dnsrr_domain = [response.to_text() for response 

280 in resolver.resolve(dnsrr_domain, rdtype="CNAME", 

281 lifetime=dns_timeout)][0] 

282 log.info(" - A CNAME resource has been found for this domain, will install TXT on %s", 

283 dnsrr_domain) 

284 except dns.exception.DNSException as dnsexception: 

285 log.debug((" - No CNAME resource has been found for this domain (%s), will " 

286 "install TXT directly on %s"), type(dnsexception).__name__, dnsrr_domain) 

287 dnsrr_set = dns.rrset.from_text(dnsrr_domain, config["DNS"].getint("TTL"), 

288 "IN", "TXT", '"{0}"'.format(keydigest64)) 

289 try: 

290 _update_dns(dnsrr_set, "add", resolver) 

291 except dns.exception.DNSException as exception: 

292 raise ValueError("Error updating DNS records: {0} : {1}" 

293 .format(type(exception).__name__, str(exception))) from exception 

294 

295 log.info("Wait for 1 TTL (%s seconds) to ensure DNS cache is cleared.", 

296 config["DNS"].getint("TTL")) 

297 time.sleep(config["DNS"].getint("TTL")) 

298 challenge_verified = False 

299 number_check_fail = 1 

300 while challenge_verified is False: 

301 try: 

302 log.info(('Self test (try: %s): Check resource with value "%s" exits on ' 

303 'nameservers: %s'), number_check_fail, keydigest64, 

304 resolver.nameservers) 

305 for response in resolver.resolve(dnsrr_domain, rdtype="TXT", 

306 lifetime=dns_timeout).rrset: 

307 log.debug(" - Found value %s", response.to_text()) 

308 challenge_verified = (challenge_verified 

309 or response.to_text() == '"{0}"'.format(keydigest64)) 

310 except dns.exception.DNSException as dnsexception: 

311 log.info( 

312 " - Will retry as a DNS error occurred while checking challenge: %s : %s", 

313 type(dnsexception).__name__, dnsexception) 

314 finally: 

315 if challenge_verified is False: 

316 if number_check_fail >= 10: 

317 raise ValueError("Error checking challenge, value not found: {0}" 

318 .format(keydigest64)) 

319 number_check_fail = number_check_fail + 1 

320 time.sleep(config["DNS"].getint("TTL")) 

321 

322 log.info("Asking ACME server to validate challenge.") 

323 http_response, result = _send_signed_request(challenge["url"], {}) 

324 if http_response.status_code != 200: 

325 raise ValueError("Error triggering challenge: {0} {1}" 

326 .format(http_response.status_code, result)) 

327 try: 

328 while True: 

329 http_response, challenge_status = _send_signed_request(challenge["url"], "") 

330 if http_response.status_code != 200: 

331 raise ValueError("Error during challenge validation: {0} {1}".format( 

332 http_response.status_code, challenge_status)) 

333 if challenge_status["status"] == "pending": 

334 time.sleep(2) 

335 elif challenge_status["status"] == "valid": 

336 log.info("ACME has verified challenge for domain: %s", domain) 

337 break 

338 else: 

339 raise ValueError("Challenge for domain {0} did not pass: {1}".format( 

340 domain, challenge_status)) 

341 finally: 

342 _update_dns(dnsrr_set, "delete", resolver) 

343 

344 log.info("Request to finalize the order (all challenges have been completed)") 

345 csr_der = _base64(_openssl("req", ["-in", config["acmednstiny"]["CSRFile"], 

346 "-outform", "DER"])) 

347 http_response, result = _send_signed_request(order["finalize"], {"csr": csr_der}) 

348 if http_response.status_code != 200: 

349 raise ValueError("Error while sending the CSR: {0} {1}" 

350 .format(http_response.status_code, result)) 

351 

352 while True: 

353 http_response, order = _send_signed_request(order_location, "") 

354 

355 if order["status"] == "processing": 

356 try: 

357 time.sleep(float(http_response.headers["Retry-After"])) 

358 except (OverflowError, ValueError, TypeError): 

359 time.sleep(2) 

360 elif order["status"] == "valid": 

361 log.info("Order finalized!") 

362 break 

363 else: 

364 raise ValueError("Finalizing order {0} got errors: {1}".format( 

365 order_location, order)) 

366 

367 http_response, result = _send_signed_request( 

368 order["certificate"], "", 

369 {'Accept': config["acmednstiny"].get("CertificateFormat", 

370 'application/pem-certificate-chain')}) 

371 if http_response.status_code != 200: 

372 raise ValueError("Finalizing order {0} got errors: {1}" 

373 .format(http_response.status_code, result)) 

374 

375 if 'link' in http_response.headers: 

376 log.info(" - Certificate links given by server: %s", http_response.headers['link']) 

377 

378 log.info("Certificate signed and chain received: %s", order["certificate"]) 

379 return http_response.text 

380 

381 

382def main(argv): 

383 """Parse arguments and get certificate.""" 

384 parser = argparse.ArgumentParser( 

385 formatter_class=argparse.RawDescriptionHelpFormatter, 

386 description="Tiny ACME client to get TLS certificate by responding to DNS challenges.", 

387 epilog="""As the script requires access to your private ACME account key and dns server, 

388so PLEASE READ THROUGH IT (it won't take too long, it's a one-file script) ! 

389 

390Example: requests certificate chain and store it in chain.crt 

391 python3 acme_dns_tiny.py ./example.ini > chain.crt 

392 

393See example.ini file to configure correctly this script.""" 

394 ) 

395 parser.add_argument("--quiet", action="store_const", const=logging.ERROR, 

396 help="show only errors on stderr") 

397 parser.add_argument("--verbose", action="store_const", const=logging.DEBUG, 

398 help="show all debug informations on stderr") 

399 parser.add_argument("--csr", 

400 help="specifies CSR file path to use instead of the CSRFile option \ 

401from the configuration file.") 

402 parser.add_argument("configfile", help="path to your configuration file") 

403 args = parser.parse_args(argv) 

404 

405 config = configparser.ConfigParser() 

406 config.read_dict({ 

407 "acmednstiny": { 

408 "ACMEDirectory": "https://acme-staging-v02.api.letsencrypt.org/directory", 

409 "Language": "en", "Contacts": "", "Timeout": 10}, 

410 "DNS": {"NameServer": "", "TTL": 10, "Timeout": 10}}) 

411 config.read(args.configfile) 

412 

413 if args.csr: 

414 config.set("acmednstiny", "csrfile", args.csr) 

415 

416 if (set(["accountkeyfile", "csrfile", "acmedirectory"]) - set(config.options("acmednstiny")) 

417 or set(["keyname", "keyvalue", "algorithm"]) - set(config.options("TSIGKeyring"))): 

418 raise ValueError("Some required settings are missing.") 

419 

420 LOGGER.setLevel(args.verbose or args.quiet or logging.INFO) 

421 signed_crt = get_crt(config, LOGGER) 

422 sys.stdout.write(signed_crt) 

423 

424 

425if __name__ == "__main__": # pragma: no cover 

426 main(sys.argv[1:])