1#!/usr/bin/env python3 

2# pylint: disable=multiple-imports 

"""ACME client to meet DNS challenge and receive TLS certificate"""

4import argparse, base64, binascii, configparser, copy, hashlib, ipaddress, json, logging 

5import re, sys, subprocess, time 

6import requests 

7import dns.exception, dns.query, dns.name, dns.resolver, dns.rrset, dns.tsigkeyring, dns.update 

8 

# Module-level logger shared by get_crt() (as default) and main();
# a stream handler is attached as soon as the module is imported.
LOGGER = logging.getLogger('acme_dns_tiny')
LOGGER.addHandler(logging.StreamHandler())

11 

12 

def _base64(text):
    """Encode data as unpadded URL-safe base64, as specified in the ACME RFC.

    Args:
        text: bytes to encode; a str is accepted too and is encoded as UTF-8 first.

    Returns:
        The URL-safe base64 representation as str, with trailing "=" padding removed.
    """
    if isinstance(text, str):
        text = text.encode("utf8")
    return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=")

16 

17 

def _openssl(command, options, communicate=None):
    """Run an openssl sub-command and return what it wrote on standard output.

    Args:
        command: name of the openssl sub-command (for example "req" or "dgst").
        options: iterable of additional command line arguments.
        communicate: optional bytes sent to the standard input of the process.

    Returns:
        The raw bytes read from the standard output of openssl.

    Raises:
        IOError: when openssl exits with a non-zero return code; the message
            contains the decoded standard error output.
    """
    # The context manager closes the pipes and waits for the process in every case
    with subprocess.Popen(["openssl", command, *options],
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as openssl:
        out, err = openssl.communicate(communicate)
    if openssl.returncode != 0:
        # Decode stderr so the message is readable instead of a "b'...'" bytes repr
        raise IOError("OpenSSL Error: {0}".format(err.decode("utf8", errors="replace")))
    return out

27 

28 

# pylint: disable=too-many-locals,too-many-branches,too-many-statements
def get_crt(config, log=LOGGER):
    """Get ACME certificate by resolving DNS challenge.

    Args:
        config: parsed configuration (configparser-like) providing the sections
            "acmednstiny" (AccountKeyFile, CSRFile, ACMEDirectory and optionally
            Language, Contacts, CertificateFormat), "TSIGKeyring" (KeyName, KeyValue,
            Algorithm) and "DNS" (zone, Host, Port, TTL).
        log: logger receiving progress and debug messages.

    Returns:
        The body text of the certificate download response.

    Raises:
        ValueError: when the CSR, account key or configuration can't be used, or when
            the ACME server answers with an unexpected status.
        RuntimeError: when the ACME server can't be reached or when no DNS server
            accepted a resource update.
        IOError: when an openssl command fails.
    """

    def _update_dns(rrset, action, resolver):
        """Updates DNS resource by adding or deleting resource."""
        algorithm = dns.name.from_text("{0}".format(config["TSIGKeyring"]["Algorithm"].lower()))
        dns_update = dns.update.Update(config["DNS"]["zone"],
                                       keyring=private_keyring, keyalgorithm=algorithm)
        if action == "add":
            dns_update.add(rrset.name, rrset)
        elif action == "delete":
            dns_update.delete(rrset.name, rrset)
        # Try each IP address found for the configured DNS Host to apply the DNS resource update
        response = None
        for nameserver in resolver.nameservers:
            try:
                response = dns.query.tcp(dns_update, nameserver,
                                         port=config.getint("DNS", "Port"))
            # pylint: disable=broad-except
            except Exception as exception:
                log.debug("Unable to %s DNS resource on server with IP %s, try again with "
                          "next available IP. Error detail: %s", action, nameserver, exception)
                response = None
            if response is not None:
                break
        if response is None:
            raise RuntimeError("Unable to {0} DNS resource to {1}".format(action, rrset.name))
        return response

    def _send_signed_request(url, payload, extra_headers=None):
        """Sends signed requests to ACME server.

        Returns a tuple (HTTP response, decoded JSON body); the body is an empty
        dict when the response carries no JSON. HTTP error responses (4xx/5xx) are
        returned too, so that callers can inspect the status code.
        """
        nonlocal nonce
        if payload == "":  # on POST-as-GET, final payload has to be just empty string
            payload64 = ""
        else:
            payload64 = _base64(json.dumps(payload).encode("utf8"))
        protected = copy.deepcopy(private_acme_signature)
        protected["nonce"] = nonce or requests.get(acme_config["newNonce"]).headers['Replay-Nonce']
        # A nonce can be used only once: forget it (the name stays bound, unlike with "del")
        nonce = None
        protected["url"] = url
        if url == acme_config["newAccount"]:
            if "kid" in protected:
                del protected["kid"]
        else:
            del protected["jwk"]
        protected64 = _base64(json.dumps(protected).encode("utf8"))
        signature = _openssl("dgst", ["-sha256", "-sign", config["acmednstiny"]["AccountKeyFile"]],
                             "{0}.{1}".format(protected64, payload64).encode("utf8"))
        jose = {
            "protected": protected64, "payload": payload64, "signature": _base64(signature)
        }
        joseheaders = {'Content-Type': 'application/jose+json'}
        joseheaders.update(adtheaders)
        joseheaders.update(extra_headers or {})
        try:
            response = requests.post(url, json=jose, headers=joseheaders)
        except requests.exceptions.RequestException as error:
            response = error.response
            if response is None:
                raise RuntimeError("Unable to get response from ACME server.") from error
        # Compare to None explicitly: a requests Response is falsy for 4xx/5xx statuses,
        # and those answers must reach the callers which check the status code themselves.
        nonce = response.headers.get('Replay-Nonce')
        try:
            return response, response.json()
        except ValueError:  # if body is empty or not JSON formatted
            return response, {}

    # main code
    adtheaders = {'User-Agent': 'acme-dns-tiny/2.4',
                  'Accept-Language': config["acmednstiny"].get("Language", "en")}
    nonce = None

    log.info("Find domains to validate from the Certificate Signing Request (CSR) file.")
    csr = _openssl("req", ["-in", config["acmednstiny"]["CSRFile"],
                           "-noout", "-text"]).decode("utf8")
    domains = set()
    common_name = re.search(r"Subject:.*?\s+?CN\s*?=\s*?([^\s,;/]+)", csr)
    if common_name is not None:
        domains.add(common_name.group(1))
    subject_alt_names = re.search(
        r"X509v3 Subject Alternative Name: (?:critical)?\s+([^\r\n]+)\r?\n",
        csr, re.MULTILINE)
    if subject_alt_names is not None:
        for san in subject_alt_names.group(1).split(", "):
            if san.startswith("DNS:"):
                domains.add(san[4:])
    if not domains:
        raise ValueError("Didn't find any domain to validate in the provided CSR.")

    log.info("Configure DNS client tools.")
    # That keyring is used to authenticate with the DNS server, it needs to be safely kept
    private_keyring = dns.tsigkeyring.from_text({config["TSIGKeyring"]["KeyName"]:
                                                 config["TSIGKeyring"]["KeyValue"]})
    resolver = dns.resolver.Resolver(configure=False)
    resolver.retry_servfail = True
    nameserver = []
    try:
        ipaddress.ip_address(config["DNS"]["Host"])
        nameserver.append(config["DNS"]["Host"])
    except ValueError:
        log.debug(" - Configured DNS Host value is not a valid IP address, "
                  "try to resolve IP address by requesting system DNS servers.")
        try:
            nameserver += [ipv6_rrset.to_text() for ipv6_rrset
                           in dns.resolver.query(config["DNS"]["Host"], rdtype="AAAA")]
        except dns.exception.DNSException:
            log.debug(" - IPv6 addresses not found for the configured DNS Host.")
        try:
            nameserver += [ipv4_rrset.to_text() for ipv4_rrset
                           in dns.resolver.query(config["DNS"]["Host"], rdtype="A")]
        except dns.exception.DNSException:
            log.debug(" - IPv4 addresses not found for the configured DNS Host.")
    if not nameserver:
        raise ValueError("Unable to resolve any IP address for the configured DNS Host name")
    resolver.nameservers = nameserver

    log.info("Get private signature from account key.")
    accountkey = _openssl("rsa", ["-in", config["acmednstiny"]["AccountKeyFile"],
                                  "-noout", "-text"])
    signature_search = re.search(r"modulus:\s+?00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)",
                                 accountkey.decode("utf8"), re.MULTILINE)
    if signature_search is None:
        raise ValueError("Unable to retrieve private signature.")
    pub_hex, pub_exp = signature_search.groups()
    pub_exp = "{0:x}".format(int(pub_exp))
    # unhexlify needs an even number of hexadecimal digits
    pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
    # That signature is used to authenticate with the ACME server, it needs to be safely kept
    private_acme_signature = {
        "alg": "RS256",
        "jwk": {
            "e": _base64(binascii.unhexlify(pub_exp.encode("utf-8"))),
            "kty": "RSA",
            "n": _base64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
        },
    }
    private_jwk = json.dumps(private_acme_signature["jwk"], sort_keys=True, separators=(",", ":"))
    jwk_thumbprint = _base64(hashlib.sha256(private_jwk.encode("utf8")).digest())

    log.info("Fetch ACME server configuration from the its directory URL.")
    acme_config = requests.get(config["acmednstiny"]["ACMEDirectory"], headers=adtheaders).json()
    terms_service = acme_config.get("meta", {}).get("termsOfService", "")

    log.info("Register ACME Account to get the account identifier.")
    account_request = {}
    if terms_service:
        account_request["termsOfServiceAgreed"] = True
        log.warning(("Terms of service exist and will be automatically agreed if possible, "
                     "you should read them: %s"), terms_service)
    account_request["contact"] = config["acmednstiny"].get("Contacts", "").split(';')
    if account_request["contact"] == [""]:
        del account_request["contact"]

    http_response, account_info = _send_signed_request(acme_config["newAccount"], account_request)
    if http_response.status_code == 201:
        private_acme_signature["kid"] = http_response.headers['Location']
        log.info("  - Registered a new account: '%s'", private_acme_signature["kid"])
    elif http_response.status_code == 200:
        private_acme_signature["kid"] = http_response.headers['Location']
        log.debug("  - Account is already registered: '%s'", private_acme_signature["kid"])

        http_response, account_info = _send_signed_request(private_acme_signature["kid"], "")
    else:
        raise ValueError("Error registering account: {0} {1}"
                         .format(http_response.status_code, account_info))

    log.info("Update contact information if needed.")
    # The "contact" field is optional in the account object returned by the server
    if ("contact" in account_request
            and set(account_request["contact"]) != set(account_info.get("contact", []))):
        http_response, result = _send_signed_request(private_acme_signature["kid"],
                                                     account_request)
        if http_response.status_code == 200:
            log.debug("  - Account updated with latest contact informations.")
        else:
            raise ValueError("Error registering updates for the account: {0} {1}"
                             .format(http_response.status_code, result))

    # new order
    log.info("Request to the ACME server an order to validate domains.")
    new_order = {"identifiers": [{"type": "dns", "value": domain} for domain in domains]}
    http_response, order = _send_signed_request(acme_config["newOrder"], new_order)
    if http_response.status_code == 201:
        order_location = http_response.headers['Location']
        log.debug("  - Order received: %s", order_location)
        if order["status"] not in ("pending", "ready"):
            raise ValueError("Order status is neither pending neither ready, we can't use it: {0}"
                             .format(order))
    elif (http_response.status_code == 403
          and order.get("type") == "urn:ietf:params:acme:error:userActionRequired"):
        raise ValueError(("Order creation failed ({0}). Read Terms of Service ({1}), then follow "
                          "your CA instructions: {2}")
                         .format(order.get("detail"),
                                 http_response.headers.get('Link'), order.get("instance")))
    else:
        raise ValueError("Error getting new Order: {0} {1}"
                         .format(http_response.status_code, order))

    # complete each authorization challenge
    for authz in order["authorizations"]:
        if order["status"] == "ready":
            log.info("No challenge to process: order is already ready.")
            break

        log.info("Process challenge for authorization: %s", authz)
        # get new challenge
        http_response, authorization = _send_signed_request(authz, "")
        if http_response.status_code != 200:
            raise ValueError("Error fetching challenges: {0} {1}"
                             .format(http_response.status_code, authorization))
        domain = authorization["identifier"]["value"]

        if authorization["status"] == "valid":
            log.info("Skip authorization for domain %s: this is alreday validated", domain)
            continue
        if authorization["status"] != "pending":
            raise ValueError("Authorization for the domain {0} can't be validated: "
                             "the authorization is {1}.".format(domain, authorization["status"]))

        challenges = [c for c in authorization["challenges"] if c["type"] == "dns-01"]
        if not challenges:
            raise ValueError("Unable to find a DNS challenge to resolve for domain {0}"
                             .format(domain))
        log.info("Install DNS TXT resource for domain: %s", domain)
        challenge = challenges[0]
        keyauthorization = challenge["token"] + "." + jwk_thumbprint
        keydigest64 = _base64(hashlib.sha256(keyauthorization.encode("utf8")).digest())
        expected_txt = '"{0}"'.format(keydigest64)
        dnsrr_domain = "_acme-challenge.{0}.".format(domain)
        try:  # a CNAME resource can be used for advanced TSIG configuration
            # Note: the CNAME target has to be of "non-CNAME" type (recursion isn't managed)
            dnsrr_domain = [response.to_text() for response
                            in resolver.query(dnsrr_domain, rdtype="CNAME")][0]
            log.info("  - A CNAME resource has been found for this domain, will install TXT on %s",
                     dnsrr_domain)
        except dns.exception.DNSException as dnsexception:
            log.debug(("  - Not any CNAME resource has been found for this domain (%s), will "
                       "install TXT directly on %s"), type(dnsexception).__name__, dnsrr_domain)
        ttl = config["DNS"].getint("TTL")
        dnsrr_set = dns.rrset.from_text(dnsrr_domain, ttl, "IN", "TXT", expected_txt)
        try:
            _update_dns(dnsrr_set, "add", resolver)
        except dns.exception.DNSException as exception:
            raise ValueError("Error updating DNS records: {0} : {1}"
                             .format(type(exception).__name__, str(exception))) from exception

        # From here the TXT resource exists: whatever happens, remove it before leaving
        try:
            log.info("Wait for 1 TTL (%s seconds) to ensure DNS cache is cleared.", ttl)
            time.sleep(ttl)
            challenge_verified = False
            number_check_fail = 1
            while not challenge_verified:
                try:
                    log.debug(('Self test (try: %s): Check resource with value "%s" exits on '
                               'nameservers: %s'), number_check_fail, keydigest64,
                              resolver.nameservers)
                    for response in resolver.query(dnsrr_domain, rdtype="TXT").rrset:
                        log.debug("  - Found value %s", response.to_text())
                        challenge_verified = (challenge_verified
                                              or response.to_text() == expected_txt)
                except dns.exception.DNSException as dnsexception:
                    log.debug(
                        "  - Will retry as a DNS error occurred while checking challenge: %s : %s",
                        type(dnsexception).__name__, dnsexception)
                # Retry only for a missing value or a DNS error; other errors propagate
                if not challenge_verified:
                    if number_check_fail >= 10:
                        raise ValueError("Error checking challenge, value not found: {0}"
                                         .format(keydigest64))
                    number_check_fail = number_check_fail + 1
                    time.sleep(ttl)

            log.info("Asking ACME server to validate challenge.")
            http_response, result = _send_signed_request(challenge["url"], {})
            if http_response.status_code != 200:
                raise ValueError("Error triggering challenge: {0} {1}"
                                 .format(http_response.status_code, result))
            while True:
                http_response, challenge_status = _send_signed_request(challenge["url"], "")
                if http_response.status_code != 200:
                    raise ValueError("Error during challenge validation: {0} {1}".format(
                        http_response.status_code, challenge_status))
                if challenge_status["status"] == "pending":
                    time.sleep(2)
                elif challenge_status["status"] == "valid":
                    log.info("ACME has verified challenge for domain: %s", domain)
                    break
                else:
                    raise ValueError("Challenge for domain {0} did not pass: {1}".format(
                        domain, challenge_status))
        finally:
            _update_dns(dnsrr_set, "delete", resolver)

    log.info("Request to finalize the order (all chalenge have been completed)")
    csr_der = _base64(_openssl("req", ["-in", config["acmednstiny"]["CSRFile"],
                                       "-outform", "DER"]))
    http_response, result = _send_signed_request(order["finalize"], {"csr": csr_der})
    if http_response.status_code != 200:
        raise ValueError("Error while sending the CSR: {0} {1}"
                         .format(http_response.status_code, result))

    while True:
        http_response, order = _send_signed_request(order_location, "")

        # .get(): an error answer may have no "status" and then ends in the last branch
        if order.get("status") == "processing":
            try:
                # The Retry-After header is optional, wait 2 seconds when it is absent
                time.sleep(float(http_response.headers.get("Retry-After", 2)))
            except (OverflowError, ValueError, TypeError):
                time.sleep(2)
        elif order.get("status") == "valid":
            log.info("Order finalized!")
            break
        else:
            raise ValueError("Finalizing order {0} got errors: {1}".format(
                order_location, order))

    http_response, result = _send_signed_request(
        order["certificate"], "",
        {'Accept': config["acmednstiny"].get("CertificateFormat",
                                             'application/pem-certificate-chain')})
    if http_response.status_code != 200:
        raise ValueError("Finalizing order {0} got errors: {1}"
                         .format(http_response.status_code, result))

    if 'link' in http_response.headers:
        log.info("  - Certificate links given by server: %s", http_response.headers['link'])

    log.info("Certificate signed and chain received: %s", order["certificate"])
    return http_response.text

357 

358 

def main(argv):
    """Parse arguments and get certificate.

    Reads the configuration file given on the command line over built-in defaults
    (staging ACME directory, DNS port 53, TTL 10), checks that every required
    setting is present, then writes the certificate chain on standard output.

    Args:
        argv: list of command line arguments, without the program name.

    Raises:
        ValueError: when a required section or setting is missing from the configuration.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Tiny ACME client to get TLS certificate by responding to DNS challenges.",
        epilog="""As the script requires access to your private ACME account key and dns server,
so PLEASE READ THROUGH IT (it won't take too long, it's a one-file script) !

Example: requests certificate chain and store it in chain.crt
  python3 acme_dns_tiny.py ./example.ini > chain.crt

See example.ini file to configure correctly this script."""
    )
    parser.add_argument("--quiet", action="store_const", const=logging.ERROR,
                        help="show only errors on stderr")
    parser.add_argument("--verbose", action="store_const", const=logging.DEBUG,
                        help="show all debug informations on stderr")
    parser.add_argument("--csr",
                        help="specifies CSR file path to use instead of the CSRFile option "
                             "from the configuration file.")
    parser.add_argument("configfile", help="path to your configuration file")
    args = parser.parse_args(argv)

    config = configparser.ConfigParser()
    config.read_dict({
        "acmednstiny": {"ACMEDirectory": "https://acme-staging-v02.api.letsencrypt.org/directory"},
        "DNS": {"Port": 53, "TTL": 10}})
    config.read(args.configfile)

    if args.csr:
        config.set("acmednstiny", "csrfile", args.csr)

    required_settings = {
        "acmednstiny": {"accountkeyfile", "csrfile", "acmedirectory"},
        "TSIGKeyring": {"keyname", "keyvalue", "algorithm"},
        "DNS": {"zone", "host", "port", "ttl"},
    }
    for section, options in required_settings.items():
        # A missing section counts as missing settings instead of raising NoSectionError
        if not config.has_section(section) or options - set(config.options(section)):
            raise ValueError("Some required settings are missing.")

    LOGGER.setLevel(args.verbose or args.quiet or logging.INFO)
    signed_crt = get_crt(config, LOGGER)
    sys.stdout.write(signed_crt)

399 

400 

# Run the command line entry point only when the file is executed as a script,
# passing the arguments without the program name.
if __name__ == "__main__":  # pragma: no cover
    main(sys.argv[1:])