1#!/usr/bin/env python3 

2# pylint: disable=multiple-imports 

3"""ACME client to met DNS challenge and receive TLS certificate""" 

4import argparse, base64, binascii, configparser, copy, hashlib, json, logging 

5import re, sys, subprocess, time 

6import requests 

7import dns.exception, dns.query, dns.name, dns.resolver, dns.rrset, dns.tsigkeyring, dns.update 

8 

9LOGGER = logging.getLogger('acme_dns_tiny') 

10LOGGER.addHandler(logging.StreamHandler()) 

11 

12 

13def _base64(text): 

14 """Encodes string as base64 as specified in the ACME RFC.""" 

15 return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=") 

16 

17 

18def _openssl(command, options, communicate=None): 

19 """Run openssl command line and raise IOError on non-zero return.""" 

20 with subprocess.Popen(["openssl", command] + options, 

21 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 

22 stderr=subprocess.PIPE) as openssl: 

23 out, err = openssl.communicate(communicate) 

24 if openssl.returncode != 0: 

25 raise IOError("OpenSSL Error: {0}".format(err)) 

26 return out 

27 

28 

29# pylint: disable=too-many-locals,too-many-branches,too-many-statements 

30def get_crt(config, log=LOGGER): 

31 """Get ACME certificate by resolving DNS challenge.""" 

32 

33 def _get_authoritative_server_ips(zone, resolver): 

34 """Get all authoritative server ips for a given zone""" 

35 main_name = resolver.query(zone, rdtype="SOA", lifetime=dns_timeout)[0].mname 

36 nameservers = [ns.target for ns in resolver.query(zone, rdtype="NS", lifetime=dns_timeout)] 

37 nameservers_ipv4 = [] 

38 nameservers_ipv6 = [] 

39 # Add the main (aka "master") name server ip to the head of the list 

40 # (see "Requestor Behavior" section of RFC 2136) 

41 if main_name in nameservers: 

42 nameservers_ipv6 += [ip.address for ip in resolver.query(main_name, rdtype="AAAA", 

43 raise_on_no_answer=False, 

44 lifetime=dns_timeout)] 

45 nameservers_ipv4 += [ip.address for ip in resolver.query(main_name, rdtype="A", 

46 raise_on_no_answer=False, 

47 lifetime=dns_timeout)] 

48 for nameserver in list(filter(lambda ns: ns != main_name, nameservers)): 

49 nameservers_ipv6 += [ip.address for ip in resolver.query(nameserver, rdtype="AAAA", 

50 raise_on_no_answer=False, 

51 lifetime=dns_timeout)] 

52 nameservers_ipv4 += [ip.address for ip in resolver.query(nameserver, rdtype="A", 

53 raise_on_no_answer=False, 

54 lifetime=dns_timeout)] 

55 nameservers_ips = [] 

56 for ns_ip in nameservers_ipv6 + nameservers_ipv4: 

57 if ns_ip not in nameservers_ips: 

58 nameservers_ips.append(ns_ip) 

59 return nameservers_ips 

60 

61 def _update_dns(rrset, action, resolver): 

62 """Updates DNS resource by adding or deleting resource.""" 

63 algorithm = dns.name.from_text("{0}".format(config["TSIGKeyring"]["Algorithm"].lower())) 

64 dns_zone = dns.resolver.zone_for_name(rrset.name, resolver=resolver) 

65 # Prepare dns update message 

66 dns_update = dns.update.Update(dns_zone, 

67 keyring=private_keyring, keyalgorithm=algorithm) 

68 if action == "add": 

69 dns_update.add(rrset.name, rrset) 

70 elif action == "delete": 

71 dns_update.delete(rrset.name, rrset) 

72 # Send DNS update request to main zone nameservers 

73 response = None 

74 for nameserver in _get_authoritative_server_ips(dns_zone, resolver): 

75 try: 

76 response = dns.query.tcp(dns_update, nameserver, timeout=dns_timeout) 

77 # pylint: disable=broad-except 

78 except Exception as exception: 

79 log.debug("Unable to %s DNS resource on dns main server with IP %s, try again " 

80 "with next available dns main server IP. Error detail: %s", action, 

81 nameserver, exception) 

82 response = None 

83 if response is not None: 

84 break 

85 if response is None: 

86 raise RuntimeError("Unable to {0} DNS resource to {1}".format(action, rrset.name)) 

87 

88 def _send_signed_request(url, payload, extra_headers=None): 

89 """Sends signed requests to ACME server.""" 

90 nonlocal nonce 

91 if payload == "": # on POST-as-GET, final payload has to be just empty string 

92 payload64 = "" 

93 else: 

94 payload64 = _base64(json.dumps(payload).encode("utf8")) 

95 protected = copy.deepcopy(private_acme_signature) 

96 protected["nonce"] = nonce or requests.get(acme_config["newNonce"], headers=adtheaders, 

97 timeout=acme_timeout).headers['Replay-Nonce'] 

98 del nonce 

99 protected["url"] = url 

100 if url == acme_config["newAccount"]: 

101 if "kid" in protected: 

102 del protected["kid"] 

103 else: 

104 del protected["jwk"] 

105 protected64 = _base64(json.dumps(protected).encode("utf8")) 

106 signature = _openssl("dgst", ["-sha256", "-sign", config["acmednstiny"]["AccountKeyFile"]], 

107 "{0}.{1}".format(protected64, payload64).encode("utf8")) 

108 jose = { 

109 "protected": protected64, "payload": payload64, "signature": _base64(signature) 

110 } 

111 joseheaders = {'Content-Type': 'application/jose+json'} 

112 joseheaders.update(adtheaders) 

113 joseheaders.update(extra_headers or {}) 

114 try: 

115 response = requests.post(url, json=jose, headers=joseheaders, timeout=acme_timeout) 

116 except requests.exceptions.RequestException as error: 

117 response = error.response 

118 if response: 

119 nonce = response.headers['Replay-Nonce'] 

120 try: 

121 return response, response.json() 

122 except ValueError: # if body is empty or not JSON formatted 

123 return response, json.loads("{}") 

124 else: 

125 raise RuntimeError("Unable to get response from ACME server.") 

126 

127 # main code 

128 acme_timeout = config["acmednstiny"].getint("Timeout") or None 

129 dns_timeout = config["DNS"].getint("Timeout") or None 

130 adtheaders = {'User-Agent': 'acme-dns-tiny/3.0', 

131 'Accept-Language': config["acmednstiny"]["Language"]} 

132 nonce = None 

133 

134 log.info("Find domains to validate from the Certificate Signing Request (CSR) file.") 

135 csr = _openssl("req", ["-in", config["acmednstiny"]["CSRFile"], 

136 "-noout", "-text"]).decode("utf8") 

137 domains = set() 

138 common_name = re.search(r"Subject:.*?\s+?CN\s*?=\s*?([^\s,;/]+)", csr) 

139 if common_name is not None: 

140 domains.add(common_name.group(1)) 

141 subject_alt_names = re.search( 

142 r"X509v3 Subject Alternative Name: (?:critical)?\s+([^\r\n]+)\r?\n", 

143 csr, re.MULTILINE) 

144 if subject_alt_names is not None: 

145 for san in subject_alt_names.group(1).split(", "): 

146 if san.startswith("DNS:"): 

147 domains.add(san[4:]) 

148 if len(domains) == 0: # pylint: disable=len-as-condition 

149 raise ValueError("Didn't find any domain to validate in the provided CSR.") 

150 

151 log.info("Configure DNS client tools.") 

152 # That keyring is used to authenticate with the main DNS server, it needs to be safely kept 

153 private_keyring = dns.tsigkeyring.from_text({config["TSIGKeyring"]["KeyName"]: 

154 config["TSIGKeyring"]["KeyValue"]}) 

155 # Prepare DNS resolver 

156 resolver = dns.resolver.Resolver(configure=True) 

157 nameservers = list(filter(lambda ip: ip != "", config["DNS"]["NameServer"] 

158 .replace(" ", "").split(","))) 

159 if nameservers: 

160 resolver = dns.resolver.Resolver(configure=False) 

161 resolver.nameservers = nameservers 

162 

163 log.info("Get private signature from account key.") 

164 accountkey = _openssl("rsa", ["-in", config["acmednstiny"]["AccountKeyFile"], 

165 "-noout", "-text"]) 

166 signature_search = re.search(r"modulus:\s+?00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)", 

167 accountkey.decode("utf8"), re.MULTILINE) 

168 if signature_search is None: 

169 raise ValueError("Unable to retrieve private signature.") 

170 pub_hex, pub_exp = signature_search.groups() 

171 pub_exp = "{0:x}".format(int(pub_exp)) 

172 pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp 

173 # That signature is used to authenticate with the ACME server, it needs to be safely kept 

174 private_acme_signature = { 

175 "alg": "RS256", 

176 "jwk": { 

177 "e": _base64(binascii.unhexlify(pub_exp.encode("utf-8"))), 

178 "kty": "RSA", 

179 "n": _base64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))), 

180 }, 

181 } 

182 private_jwk = json.dumps(private_acme_signature["jwk"], sort_keys=True, separators=(",", ":")) 

183 jwk_thumbprint = _base64(hashlib.sha256(private_jwk.encode("utf8")).digest()) 

184 

185 log.info("Fetch ACME server configuration from its directory URL.") 

186 acme_config = requests.get(config["acmednstiny"]["ACMEDirectory"], headers=adtheaders, 

187 timeout=acme_timeout).json() 

188 terms_service = acme_config.get("meta", {}).get("termsOfService", "") 

189 

190 log.info("Register ACME Account to get the account identifier.") 

191 account_request = {} 

192 if terms_service: 

193 account_request["termsOfServiceAgreed"] = True 

194 log.warning(("Terms of service exist and will be automatically agreed if possible, " 

195 "you should read them: %s"), terms_service) 

196 account_request["contact"] = config["acmednstiny"]["Contacts"].split(';') 

197 if account_request["contact"] == [""]: 

198 del account_request["contact"] 

199 

200 http_response, account_info = _send_signed_request(acme_config["newAccount"], account_request) 

201 if http_response.status_code == 201: 

202 private_acme_signature["kid"] = http_response.headers['Location'] 

203 log.info(" - Registered a new account: '%s'", private_acme_signature["kid"]) 

204 elif http_response.status_code == 200: 

205 private_acme_signature["kid"] = http_response.headers['Location'] 

206 log.debug(" - Account is already registered: '%s'", private_acme_signature["kid"]) 

207 

208 http_response, account_info = _send_signed_request(private_acme_signature["kid"], "") 

209 else: 

210 raise ValueError("Error registering account: {0} {1}" 

211 .format(http_response.status_code, account_info)) 

212 

213 log.info("Update contact information if needed.") 

214 if ("contact" in account_request 

215 and set(account_request["contact"]) != set(account_info["contact"])): 

216 http_response, result = _send_signed_request(private_acme_signature["kid"], 

217 account_request) 

218 if http_response.status_code == 200: 

219 log.debug(" - Account updated with latest contact informations.") 

220 else: 

221 raise ValueError("Error registering updates for the account: {0} {1}" 

222 .format(http_response.status_code, result)) 

223 

224 # new order 

225 log.info("Request to the ACME server an order to validate domains.") 

226 new_order = {"identifiers": [{"type": "dns", "value": domain} for domain in domains]} 

227 http_response, order = _send_signed_request(acme_config["newOrder"], new_order) 

228 if http_response.status_code == 201: 

229 order_location = http_response.headers['Location'] 

230 log.debug(" - Order received: %s", order_location) 

231 if order["status"] != "pending" and order["status"] != "ready": 

232 raise ValueError("Order status is neither pending neither ready, we can't use it: {0}" 

233 .format(order)) 

234 elif (http_response.status_code == 403 

235 and order["type"] == "urn:ietf:params:acme:error:userActionRequired"): 

236 raise ValueError(("Order creation failed ({0}). Read Terms of Service ({1}), then follow " 

237 "your CA instructions: {2}") 

238 .format(order["detail"], 

239 http_response.headers['Link'], order["instance"])) 

240 else: 

241 raise ValueError("Error getting new Order: {0} {1}" 

242 .format(http_response.status_code, order)) 

243 

244 # complete each authorization challenge 

245 for authz in order["authorizations"]: 

246 if order["status"] == "ready": 

247 log.info("No challenge to process: order is already ready.") 

248 break 

249 

250 log.info("Process challenge for authorization: %s", authz) 

251 # get new challenge 

252 http_response, authorization = _send_signed_request(authz, "") 

253 if http_response.status_code != 200: 

254 raise ValueError("Error fetching challenges: {0} {1}" 

255 .format(http_response.status_code, authorization)) 

256 domain = authorization["identifier"]["value"] 

257 

258 if authorization["status"] == "valid": 

259 log.info("Skip authorization for domain %s: this is already validated", domain) 

260 continue 

261 if authorization["status"] != "pending": 

262 raise ValueError("Authorization for the domain {0} can't be validated: " 

263 "the authorization is {1}.".format(domain, authorization["status"])) 

264 

265 challenges = [c for c in authorization["challenges"] if c["type"] == "dns-01"] 

266 if not challenges: 

267 raise ValueError("Unable to find a DNS challenge to resolve for domain {0}" 

268 .format(domain)) 

269 log.info("Install DNS TXT resource for domain: %s", domain) 

270 challenge = challenges[0] 

271 keyauthorization = challenge["token"] + "." + jwk_thumbprint 

272 keydigest64 = _base64(hashlib.sha256(keyauthorization.encode("utf8")).digest()) 

273 dnsrr_domain = "_acme-challenge.{0}.".format(domain) 

274 try: # a CNAME resource can be used for advanced TSIG configuration 

275 # Note: the CNAME target has to be of "non-CNAME" type (recursion isn't managed) 

276 dnsrr_domain = [response.to_text() for response 

277 in resolver.query(dnsrr_domain, rdtype="CNAME", 

278 lifetime=dns_timeout)][0] 

279 log.info(" - A CNAME resource has been found for this domain, will install TXT on %s", 

280 dnsrr_domain) 

281 except dns.exception.DNSException as dnsexception: 

282 log.debug((" - No CNAME resource has been found for this domain (%s), will " 

283 "install TXT directly on %s"), type(dnsexception).__name__, dnsrr_domain) 

284 dnsrr_set = dns.rrset.from_text(dnsrr_domain, config["DNS"].getint("TTL"), 

285 "IN", "TXT", '"{0}"'.format(keydigest64)) 

286 try: 

287 _update_dns(dnsrr_set, "add", resolver) 

288 except dns.exception.DNSException as exception: 

289 raise ValueError("Error updating DNS records: {0} : {1}" 

290 .format(type(exception).__name__, str(exception))) from exception 

291 

292 log.info("Wait for 1 TTL (%s seconds) to ensure DNS cache is cleared.", 

293 config["DNS"].getint("TTL")) 

294 time.sleep(config["DNS"].getint("TTL")) 

295 challenge_verified = False 

296 number_check_fail = 1 

297 while challenge_verified is False: 

298 try: 

299 log.info(('Self test (try: %s): Check resource with value "%s" exits on ' 

300 'nameservers: %s'), number_check_fail, keydigest64, 

301 resolver.nameservers) 

302 for response in resolver.query(dnsrr_domain, rdtype="TXT", 

303 lifetime=dns_timeout).rrset: 

304 log.debug(" - Found value %s", response.to_text()) 

305 challenge_verified = (challenge_verified 

306 or response.to_text() == '"{0}"'.format(keydigest64)) 

307 except dns.exception.DNSException as dnsexception: 

308 log.info( 

309 " - Will retry as a DNS error occurred while checking challenge: %s : %s", 

310 type(dnsexception).__name__, dnsexception) 

311 finally: 

312 if challenge_verified is False: 

313 if number_check_fail >= 10: 

314 raise ValueError("Error checking challenge, value not found: {0}" 

315 .format(keydigest64)) 

316 number_check_fail = number_check_fail + 1 

317 time.sleep(config["DNS"].getint("TTL")) 

318 

319 log.info("Asking ACME server to validate challenge.") 

320 http_response, result = _send_signed_request(challenge["url"], {}) 

321 if http_response.status_code != 200: 

322 raise ValueError("Error triggering challenge: {0} {1}" 

323 .format(http_response.status_code, result)) 

324 try: 

325 while True: 

326 http_response, challenge_status = _send_signed_request(challenge["url"], "") 

327 if http_response.status_code != 200: 

328 raise ValueError("Error during challenge validation: {0} {1}".format( 

329 http_response.status_code, challenge_status)) 

330 if challenge_status["status"] == "pending": 

331 time.sleep(2) 

332 elif challenge_status["status"] == "valid": 

333 log.info("ACME has verified challenge for domain: %s", domain) 

334 break 

335 else: 

336 raise ValueError("Challenge for domain {0} did not pass: {1}".format( 

337 domain, challenge_status)) 

338 finally: 

339 _update_dns(dnsrr_set, "delete", resolver) 

340 

341 log.info("Request to finalize the order (all challenges have been completed)") 

342 csr_der = _base64(_openssl("req", ["-in", config["acmednstiny"]["CSRFile"], 

343 "-outform", "DER"])) 

344 http_response, result = _send_signed_request(order["finalize"], {"csr": csr_der}) 

345 if http_response.status_code != 200: 

346 raise ValueError("Error while sending the CSR: {0} {1}" 

347 .format(http_response.status_code, result)) 

348 

349 while True: 

350 http_response, order = _send_signed_request(order_location, "") 

351 

352 if order["status"] == "processing": 

353 try: 

354 time.sleep(float(http_response.headers["Retry-After"])) 

355 except (OverflowError, ValueError, TypeError): 

356 time.sleep(2) 

357 elif order["status"] == "valid": 

358 log.info("Order finalized!") 

359 break 

360 else: 

361 raise ValueError("Finalizing order {0} got errors: {1}".format( 

362 order_location, order)) 

363 

364 http_response, result = _send_signed_request( 

365 order["certificate"], "", 

366 {'Accept': config["acmednstiny"].get("CertificateFormat", 

367 'application/pem-certificate-chain')}) 

368 if http_response.status_code != 200: 

369 raise ValueError("Finalizing order {0} got errors: {1}" 

370 .format(http_response.status_code, result)) 

371 

372 if 'link' in http_response.headers: 

373 log.info(" - Certificate links given by server: %s", http_response.headers['link']) 

374 

375 log.info("Certificate signed and chain received: %s", order["certificate"]) 

376 return http_response.text 

377 

378 

379def main(argv): 

380 """Parse arguments and get certificate.""" 

381 parser = argparse.ArgumentParser( 

382 formatter_class=argparse.RawDescriptionHelpFormatter, 

383 description="Tiny ACME client to get TLS certificate by responding to DNS challenges.", 

384 epilog="""As the script requires access to your private ACME account key and dns server, 

385so PLEASE READ THROUGH IT (it won't take too long, it's a one-file script) ! 

386 

387Example: requests certificate chain and store it in chain.crt 

388 python3 acme_dns_tiny.py ./example.ini > chain.crt 

389 

390See example.ini file to configure correctly this script.""" 

391 ) 

392 parser.add_argument("--quiet", action="store_const", const=logging.ERROR, 

393 help="show only errors on stderr") 

394 parser.add_argument("--verbose", action="store_const", const=logging.DEBUG, 

395 help="show all debug informations on stderr") 

396 parser.add_argument("--csr", 

397 help="specifies CSR file path to use instead of the CSRFile option \ 

398from the configuration file.") 

399 parser.add_argument("configfile", help="path to your configuration file") 

400 args = parser.parse_args(argv) 

401 

402 config = configparser.ConfigParser() 

403 config.read_dict({ 

404 "acmednstiny": { 

405 "ACMEDirectory": "https://acme-staging-v02.api.letsencrypt.org/directory", 

406 "Language": "en", "Contacts": "", "Timeout": 10}, 

407 "DNS": {"NameServer": "", "TTL": 10, "Timeout": 10}}) 

408 config.read(args.configfile) 

409 

410 if args.csr: 

411 config.set("acmednstiny", "csrfile", args.csr) 

412 

413 if (set(["accountkeyfile", "csrfile", "acmedirectory"]) - set(config.options("acmednstiny")) 

414 or set(["keyname", "keyvalue", "algorithm"]) - set(config.options("TSIGKeyring"))): 

415 raise ValueError("Some required settings are missing.") 

416 

417 LOGGER.setLevel(args.verbose or args.quiet or logging.INFO) 

418 signed_crt = get_crt(config, LOGGER) 

419 sys.stdout.write(signed_crt) 

420 

421 

422if __name__ == "__main__": # pragma: no cover 

423 main(sys.argv[1:])