File size: 36,211 Bytes
17e971c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 
525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 | import base64
import binascii
import hashlib
import json
import os
import fcntl
import re
import shutil
import socket
import subprocess
import sys
import time
import datetime
from pathlib import Path
import requests
# Directories that find_apache_conf_files() scans (recursively) for "*.conf" files.
APACHE_CONF_DIRS = [
"/www/server/panel/vhost/apache"
]
def is_ipv4(ip):
    '''
    Check whether ``ip`` is a dotted-quad IPv4 address.

    @name is_ipv4
    @author hwliang
    @param ip<string> address to test
    @return True/False (non-string input yields False)
    '''
    # Cheap shape check first. The dots are escaped so they only match a
    # literal ".", and fullmatch() rejects a trailing newline that "$" allows.
    if not isinstance(ip, str):
        return False
    if not re.fullmatch(r"[0-9]{1,3}(?:\.[0-9]{1,3}){3}", ip):
        return False
    # Let the socket layer verify that every octet is in range.
    try:
        socket.inet_pton(socket.AF_INET, ip)
    except AttributeError:
        # inet_pton is not available on this platform; fall back to inet_aton.
        try:
            socket.inet_aton(ip)
        except socket.error:
            return False
    except socket.error:
        return False
    return True
def is_ipv6(ip):
    '''
    Check whether ``ip`` is an IPv6 address.

    @name is_ipv6
    @author hwliang
    @param ip<string> address to test
    @return True/False (non-string input yields False)
    '''
    # Shape check: hex digits, colons, and dots. Dots must be allowed because
    # IPv4-mapped forms such as "::ffff:1.2.3.4" are valid IPv6 text.
    if not isinstance(ip, str):
        return False
    if not re.fullmatch(r"[0-9A-Fa-f:.]+", ip):
        return False
    # The socket layer performs the real validation.
    try:
        socket.inet_pton(socket.AF_INET6, ip)
    except socket.error:
        return False
    return True
def check_ip(ip):
    """Return True when ``ip`` is accepted by is_ipv4() or, failing that, is_ipv6()."""
    if is_ipv4(ip):
        return True
    return is_ipv6(ip)
def find_apache_conf_files(keyword, conf_dirs=None):
    """Find Apache ``*.conf`` files whose text contains ``keyword``.

    :param keyword: substring to look for anywhere in a file's content
    :param conf_dirs: optional iterable of directories to scan recursively;
        defaults to the module-level APACHE_CONF_DIRS
    :return: sorted list of matching file paths (as strings)
    """
    if conf_dirs is None:
        conf_dirs = APACHE_CONF_DIRS
    files = set()
    for base in conf_dirs:
        base_path = Path(base)
        if not base_path.exists():
            continue
        for conf in base_path.rglob("*.conf"):
            # rglob also yields directories whose name ends in ".conf".
            if not conf.is_file():
                continue
            try:
                # Undecodable bytes must not abort the scan; only the
                # keyword match matters here.
                with open(conf, "r", errors="replace") as handle:
                    content = handle.read()
            except OSError:
                # Unreadable file (permissions, vanished meanwhile): skip it.
                continue
            if keyword in content:
                files.add(str(conf))
    return sorted(files)
def insert_location_into_vhost(file_path, keyword, verify_file):
    """Insert an ACME challenge Location/Alias into matching Apache vhosts.

    The file is first copied to ``<file_path>.bak``. Every ``<VirtualHost>``
    section whose ServerName/ServerAlias line contains ``keyword`` receives the
    block just before its ``</VirtualHost>`` line, unless the section already
    has a Location for the challenge directory or for this exact token.

    :param file_path: Apache configuration file to rewrite in place
    :param keyword: text to look for on ServerName/ServerAlias lines
    :param verify_file: challenge token; served from ``/tmp/<token>``
    :return: True
    """
    location_block = [
        " <Location /.well-known/acme-challenge/{}>\n".format(verify_file),
        " Require all granted\n",
        " Header set Content-Type \"text/plain\"\n",
        " </Location>\n",
        " Alias /.well-known/acme-challenge/{} /tmp/{}\n".format(verify_file, verify_file),
    ]
    # Lines are lower-cased before matching, so the markers are lower-case too.
    generic_marker = "<location /.well-known/acme-challenge/>"
    # Recognising our own block keeps a repeated call from inserting a
    # duplicate Location/Alias pair.
    own_marker = "<location /.well-known/acme-challenge/{}>".format(verify_file).lower()

    path = Path(file_path)
    backup = path.with_suffix(path.suffix + ".bak")
    shutil.copy(path, backup)
    with open(path, "r") as f:
        lines = f.readlines()

    new_lines = []
    in_vhost = False
    hit_vhost = False
    location_exists = False
    for line in lines:
        stripped = line.strip()
        low = stripped.lower()
        if low.startswith("<virtualhost"):
            # A new section starts: reset the per-section flags.
            in_vhost = True
            hit_vhost = False
            location_exists = False
        if in_vhost:
            if low.startswith(("servername", "serveralias")) and keyword in stripped:
                hit_vhost = True
            if generic_marker in low or own_marker in low:
                location_exists = True
            if low == "</virtualhost>":
                if hit_vhost and not location_exists:
                    new_lines.extend(location_block)
                in_vhost = False
        new_lines.append(line)

    with open(path, "w") as f:
        f.writelines(new_lines)
    return True
def find_nginx_files_by_servername(keyword):
    """Use ``nginx -T`` to find config files whose server_name lines contain ``keyword``.

    ``nginx -T`` output (stdout and stderr merged) is scanned line by line;
    each "# configuration file <path>:" header sets the file that following
    lines are attributed to. Raises subprocess.CalledProcessError when nginx
    exits non-zero.

    :param keyword: text to look for on lines containing "server_name"
    :return: list of matching configuration file paths (unordered)
    """
    dump = subprocess.run(
        ["nginx", "-T"],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        check=True,
    ).stdout

    matched = set()
    source_file = None
    for row in dump.splitlines():
        if row.startswith("# configuration file"):
            # Header looks like "# configuration file /path/to/file.conf:"
            source_file = row.split()[-1].rstrip(":")
        if source_file and "server_name" in row and keyword in row:
            matched.add(source_file)
    return list(matched)
def insert_location_into_server(file_path, keyword, verify_file, verify_content):
    """Insert an ACME challenge ``location`` into matching nginx server blocks.

    The file is first copied to ``<file_path>.bak``. Each ``server`` block
    that has a ``server_name`` line containing ``keyword`` receives an
    exact-match location returning ``verify_content``, placed just before the
    block's closing brace. A block is skipped when it already has a location
    for the challenge directory or for this exact token.

    Brace counting is textual (quotes and trailing comments are not parsed);
    whole-line comments are ignored.

    :param file_path: nginx configuration file to rewrite in place
    :param keyword: text to look for on server_name lines
    :param verify_file: challenge token (last URL path component)
    :param verify_content: body returned for the challenge URL
    :return: True
    """
    path = Path(file_path)
    backup = path.with_suffix(path.suffix + ".bak")
    shutil.copy(path, backup)
    with open(path, "r") as f:
        lines = f.readlines()

    location_block = [
        " location = /.well-known/acme-challenge/{} {{\n".format(verify_file),
        " default_type text/plain;\n",
        " return 200 \"{}\";\n".format(verify_content),
        " }\n",
    ]
    # Recognising our own block keeps a repeated call from producing a
    # duplicate location.
    own_marker = "location = /.well-known/acme-challenge/{} ".format(verify_file)
    # "server" followed by "{" or end of line. A plain startswith("server")
    # would also match server_name / server_tokens directives.
    server_open = re.compile(r"server\s*(\{|$)")

    new_lines = []
    brace_level = 0
    in_server = False
    opened = False  # True once the block's opening "{" has been seen
    hit_server = False
    location_exists = False
    for line in lines:
        stripped = line.strip()
        if stripped.startswith("#"):
            # Whole-line comments must not influence brace counting.
            new_lines.append(line)
            continue
        if not in_server and server_open.match(stripped):
            in_server = True
            opened = False
            brace_level = 0
            hit_server = False
            location_exists = False
        if in_server:
            if "{" in line:
                opened = True
            brace_level += line.count("{") - line.count("}")
            if "server_name" in line and keyword in line:
                hit_server = True
            if "location /.well-known/acme-challenge/" in line or own_marker in line:
                location_exists = True
            # The block ends when its braces balance again. The opening brace
            # may sit on the line after "server", hence the `opened` guard.
            if opened and brace_level <= 0:
                in_server = False
                if hit_server and not location_exists:
                    head, brace, tail = line.rpartition("}")
                    if head.strip():
                        # Other content shares the closing line (for example a
                        # one-line server block): split so the location
                        # still lands inside the block.
                        new_lines.append(head.rstrip() + "\n")
                        new_lines.extend(location_block)
                        new_lines.append(brace + tail)
                        continue
                    new_lines.extend(location_block)
        new_lines.append(line)

    with open(path, "w") as f:
        f.writelines(new_lines)
    return True
class AutoApplyIPSSL:
# 请求到ACME接口
def __init__(self):
    """Set polling limits, ACME endpoint, key size, and load the saved configuration."""
    # ACME directory endpoint and RSA key size used by create_key().
    self._url = 'https://acme-v02.api.letsencrypt.org/directory'
    self._bits = 2048
    # Polling parameters read by check_auth_status().
    self._max_check_num = 15
    self._wait_time = 5
    # Filled in lazily: endpoint directory and the most recent Replay-Nonce.
    self._replay_nonce = None
    self._apis = None
    # The path must be set before read_config() runs, since it reads it.
    self._conf_file_v2 = '/www/server/panel/config/letsencrypt_v2.json'
    self._config = self.read_config()
# Fetch the ACME endpoint directory
def get_apis(self):
    """Return the ACME endpoint directory, loading it on first use.

    Lookup order: the in-memory copy, then the copy cached in the config
    (valid until its ``expires`` timestamp), then a request to ``self._url``.
    A freshly downloaded directory is cached in the config for 24 hours.

    :return: dict with newAccount, newNonce, newOrder, revokeCert, keyChange
    :raises Exception: when the directory endpoint does not answer 200/201
    """
    if self._apis:
        return self._apis

    # Try the copy cached in the configuration file first.
    api_index = "Production"
    apis_cache = self._config.setdefault('apis', {})
    cached = apis_cache.get(api_index)
    if isinstance(cached, dict) and 'expires' in cached and 'directory' in cached:
        if time.time() < cached['expires']:
            self._apis = cached['directory']
            return self._apis

    # Fall back to the remote directory. The timeout keeps a stalled
    # connection from blocking the script forever.
    res = requests.get(self._url, timeout=30)
    if res.status_code not in [200, 201]:
        try:
            result = res.json()
        except ValueError:
            # Error body is not JSON; report the raw content below.
            result = {}
        if isinstance(result, dict) and result.get('type') == 'urn:acme:error:serverInternal':
            raise Exception('服务因维护而关闭或发生内部错误,查看 <a href="https://letsencrypt.status.io/" target="_blank" class="btlink">https://letsencrypt.status.io/</a> 了解更多详细信息。')
        raise Exception(res.content)

    s_body = res.json()
    self._apis = {
        name: s_body[name]
        for name in ('newAccount', 'newNonce', 'newOrder', 'revokeCert', 'keyChange')
    }

    # Persist to the configuration file; the entry expires after 24 hours.
    apis_cache[api_index] = {
        'directory': self._apis,
        'expires': time.time() + 86400,
    }
    self.save_config()
    return self._apis
def acme_request(self, url, payload):
    """Send a JWS-signed POST to an ACME endpoint.

    :param url: endpoint URL; it is also placed in the protected header
    :param payload: dict to send, or "" for an empty payload
    :return: the ``requests`` response; its Replay-Nonce header, if present,
        is stored for the next request
    """
    payload = self.stringfy_items(payload)
    if payload == "":
        # An empty payload is sent as-is rather than base64-encoded JSON.
        payload64 = payload
    else:
        payload64 = self.calculate_safe_base64(json.dumps(payload))
    protected = self.get_acme_header(url)
    protected64 = self.calculate_safe_base64(json.dumps(protected))
    signature = self.sign_message(
        message="{0}.{1}".format(protected64, payload64))  # bytes
    signature64 = self.calculate_safe_base64(signature)  # str
    data = json.dumps(
        {"protected": protected64, "payload": payload64,
         "signature": signature64}
    )
    headers = {"Content-Type": "application/jose+json"}
    # The timeout keeps a stalled connection from blocking the script forever.
    response = requests.post(
        url, data=data.encode("utf8"), headers=headers, timeout=30)
    # Remember the nonce handed back for the next request.
    self.update_replay_nonce(response)
    return response
# Update the stored nonce
def update_replay_nonce(self, res):
    """Store the Replay-Nonce header of ``res``, if it carries a non-empty one."""
    fresh_nonce = res.headers.get('Replay-Nonce')
    if not fresh_nonce:
        return
    self._replay_nonce = fresh_nonce
def stringfy_items(self, payload):
    """Decode bytes keys and values of ``payload`` to str (UTF-8).

    :param payload: a str (returned unchanged) or a dict
    :return: the same dict object, updated in place, or the original str
    """
    if isinstance(payload, str):
        return payload
    # Build the decoded mapping separately: inserting a decoded key while
    # iterating would raise RuntimeError and leave the bytes key behind.
    decoded = {}
    for k, v in payload.items():
        if isinstance(k, bytes):
            k = k.decode("utf-8")
        if isinstance(v, bytes):
            v = v.decode("utf-8")
        decoded[k] = v
    payload.clear()
    payload.update(decoded)
    return payload
# Convert to unpadded URL-safe Base64
def calculate_safe_base64(self, un_encoded_data):
    """Return the URL-safe Base64 text of the input with "=" padding removed.

    :param un_encoded_data: bytes, or str (encoded as UTF-8 first)
    :return: str
    """
    raw = un_encoded_data
    if sys.version_info[0] == 3 and isinstance(raw, str):
        raw = raw.encode("utf8")
    encoded = base64.urlsafe_b64encode(raw)
    return encoded.rstrip(b"=").decode("utf8")
# Build the ACME protected header
def get_acme_header(self, url):
    """Build the JWS protected header for a request to ``url``.

    For the newAccount endpoint, and for the pseudo URL "GET_THUMBPRINT", the
    header carries the account public key as a JWK. For every other URL it
    carries the account URL as "kid".

    :param url: target endpoint URL, or "GET_THUMBPRINT"
    :return: dict with alg, nonce, url and either jwk or kid
    :raises ValueError: when the account key cannot be parsed
    """
    header = {"alg": "RS256", "nonce": self.get_nonce(), "url": url}
    if url in [self._apis['newAccount'], 'GET_THUMBPRINT']:
        private_key = self.analysis_private_key(self.get_account_key())
        if private_key is None:
            raise ValueError("无法解析ACME帐户私钥")
        numbers = private_key.public_key().public_numbers()

        def to_bytes(value):
            # Minimal big-endian encoding. Unlike hex formatting followed by
            # unhexlify, this cannot fail on an odd number of hex digits.
            return value.to_bytes((value.bit_length() + 7) // 8 or 1, "big")

        header["jwk"] = {
            "kty": "RSA",
            "e": self.calculate_safe_base64(to_bytes(numbers.e)),
            "n": self.calculate_safe_base64(to_bytes(numbers.n)),
        }
    else:
        header["kid"] = self.get_kid()
    return header
def get_nonce(self, force=False):
    """Return a nonce for the next signed request.

    The nonce saved from the previous response is reused. A new one is
    requested from the newNonce endpoint only when none is stored or when
    ``force`` is true.

    :param force: request a new nonce even if one is stored
    :return: nonce string
    """
    if not self._replay_nonce or force:
        # The timeout keeps a stalled connection from blocking the script.
        response = requests.get(self._apis['newNonce'], timeout=30)
        self._replay_nonce = response.headers["Replay-Nonce"]
    return self._replay_nonce
def analysis_private_key(self, key_pem, password=None):
    """
    Parse a PEM-encoded private key.

    :param key_pem: private key content (str or bytes)
    :param password: private key password (str, bytes or None)
    :return: private key object, or None when the key cannot be loaded
    """
    try:
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import serialization
        # The loader needs bytes; accept text as a convenience.
        if isinstance(key_pem, str):
            key_pem = key_pem.encode()
        if isinstance(password, str):
            password = password.encode()
        return serialization.load_pem_private_key(
            key_pem,
            password=password,
            backend=default_backend()
        )
    except Exception:
        # Best effort by design: callers test for None. Unlike a bare
        # "except:", this lets KeyboardInterrupt/SystemExit propagate.
        return None
def sign_message(self, message):
    """Sign ``message`` with the account key using PKCS#1 v1.5 padding and SHA-256.

    :param message: text to sign (encoded as UTF-8)
    :return: signature bytes
    """
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives.asymmetric import padding

    account_key = self.analysis_private_key(self.get_account_key())
    data = message.encode("utf8")
    return account_key.sign(data, padding.PKCS1v15(), hashes.SHA256())
# Get the account key pair
def get_account_key(self):
    """Return the account private key (PEM text), creating one if missing.

    The key lives under ``account -> Production -> key`` in the config. The
    config file is saved only when this call changed something (a new key was
    generated, or a bytes value was converted to str), not on every read.

    :return: PEM-encoded private key as str
    """
    k = "Production"
    account = self._config.setdefault('account', {}).setdefault(k, {})
    changed = False
    if 'key' not in account:
        account['key'] = self.create_key()
        changed = True
    if isinstance(account['key'], bytes):
        # create_key() returns bytes; the JSON config needs text.
        account['key'] = account['key'].decode()
        changed = True
    if changed:
        self.save_config()
    return account['key']
def create_key(self, key_type='RSA'):
    """Generate a private key and return it as unencrypted PKCS#8 PEM bytes.

    :param key_type: 'RSA' (size ``self._bits``), 'EC' (SECP256R1) or 'ED25519'
    :return: PEM bytes
    :raises ValueError: for any other ``key_type``
    """
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa, ec, ed25519

    if key_type not in ('RSA', 'EC', 'ED25519'):
        raise ValueError(f"Unsupported key type: {key_type}")

    if key_type == 'RSA':
        generated = rsa.generate_private_key(
            public_exponent=65537, key_size=self._bits)
    elif key_type == 'EC':
        generated = ec.generate_private_key(ec.SECP256R1())
    else:
        generated = ed25519.Ed25519PrivateKey.generate()

    return generated.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )
def get_kid(self, force=False):
    """Return the ACME account URL ("kid"), registering an account if needed.

    A new account is registered when the config holds no kid or when
    ``force`` is true. The new kid is saved and the config is re-read from
    disk after a short pause.

    :param force: register again even if a kid is already stored
    :return: account URL string
    """
    k = "Production"
    account = self._config.setdefault('account', {}).setdefault(k, {})
    if force or 'kid' not in account:
        account['kid'] = self.register()
        self.save_config()
        # NOTE(review): the pause and re-read are kept from the original;
        # the reason for waiting 3 seconds is not visible in this file.
        time.sleep(3)
        self._config = self.read_config()
    return self._config['account'][k]['kid']
# Read the configuration file
def read_config(self):
    """Load the JSON configuration into ``self._config`` and return it.

    - Missing file: a default configuration is created and saved.
    - Empty file: the current in-memory configuration is kept (the default
      one if none exists yet, as when called from ``__init__``).
    - Unparseable or non-dict content: the current (or default)
      configuration is kept and written back over the file.

    :return: the configuration dict
    """
    default_config = {'orders': {}, 'account': {}, 'apis': {}, 'email': None}
    if not os.path.exists(self._conf_file_v2):
        self._config = default_config
        self.save_config()
        return self._config

    with open(self._conf_file_v2, 'r') as f:
        fcntl.flock(f, fcntl.LOCK_SH)  # shared lock while reading
        try:
            tmp_config = f.read()
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)  # always release the lock

    # self._config does not exist yet during __init__; fall back to the
    # default instead of raising AttributeError.
    current = getattr(self, '_config', None)
    if not isinstance(current, dict):
        current = default_config

    if not tmp_config:
        self._config = current
        return self._config

    try:
        loaded = json.loads(tmp_config)
    except ValueError:
        loaded = None
    if not isinstance(loaded, dict):
        # Corrupt file: keep what we have and overwrite the bad content.
        self._config = current
        self.save_config()
        return self._config

    self._config = loaded
    return self._config
# Write the configuration file
def save_config(self):
    """Write ``self._config`` as JSON to the configuration file.

    The file is opened without truncating, and its content is replaced only
    after the exclusive lock is held. Opening with 'w+' would empty the file
    before the lock is taken, so a reader holding the shared lock could see
    a blank file.

    :return: True
    """
    # 'a+' creates the file if needed but never truncates on open.
    with open(self._conf_file_v2, 'a+') as fp:
        fcntl.flock(fp, fcntl.LOCK_EX)  # exclusive lock while writing
        try:
            fp.seek(0)
            fp.truncate()
            fp.write(json.dumps(self._config))
            fp.flush()
        finally:
            fcntl.flock(fp, fcntl.LOCK_UN)  # always release the lock
    return True
# Register an ACME account
def register(self, existing=False):
    """Register (or look up) the ACME account and return its URL.

    :param existing: send ``onlyReturnExisting`` instead of creating an account
    :return: value of the response's Location header
    :raises Exception: when the status code is not 200, 201 or 409
    """
    # Default contact address when the config has no "email" entry at all.
    self._config.setdefault('email', 'demo@bt.cn')
    contact_email = self._config['email']

    if existing:
        payload = {"onlyReturnExisting": True}
    else:
        payload = {"termsOfServiceAgreed": True}
        if contact_email:
            payload["contact"] = ["mailto:{0}".format(contact_email)]

    response = self.acme_request(url=self._apis['newAccount'], payload=payload)
    if response.status_code in [201, 200, 409]:
        return response.headers["Location"]
    raise Exception("注册ACME帐户失败: {}".format(response.json()))
def create_csr(self, ips):
    """Create a certificate signing request for the given IP addresses.

    A new private key is generated with create_key(). The CSR has an empty
    subject (no CN) and lists every IP in a non-critical subjectAltName
    extension.

    :param ips: iterable of IP address strings
    :return: tuple (CSR in DER encoding, private key PEM bytes)
    """
    from cryptography import x509
    from cryptography.x509.oid import NameOID
    from cryptography.hazmat.primitives.asymmetric import rsa
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives import serialization
    import ipaddress

    # Fresh key for this certificate.
    key_pem = self.create_key()
    signing_key = serialization.load_pem_private_key(key_pem, password=None)

    # IP certificates carry the addresses in subjectAltName only.
    san = x509.SubjectAlternativeName(
        [x509.IPAddress(ipaddress.ip_address(ip)) for ip in ips])
    request = (
        x509.CertificateSigningRequestBuilder()
        .subject_name(x509.Name([]))
        .add_extension(san, critical=False)
        .sign(signing_key, hashes.SHA256())
    )
    return request.public_bytes(serialization.Encoding.DER), key_pem
def apply_ip_ssl(self, ips, email, webroot=None, mode=None, path=None):
    """Run the whole issuance flow and store the certificate and key on disk.

    Steps: load the directory, create the order, satisfy the authorizations,
    finalize with a new CSR, download the certificate, write both files.

    :param ips: list of IP address strings to certify
    :param email: contact address stored in the config
    :param webroot: document root for webroot validation (when ``mode`` is empty)
    :param mode: 'standalone', 'nginx', 'apache' or None
    :param path: output directory; defaults to /www/server/panel/ssl
    :return: tuple (certificate path, private key path)
    :raises Exception: when any step fails
    """
    print("开始申请Let's Encrypt IP SSL证书...")
    print("获取ACME接口目录...")
    self.get_apis()
    self._config['email'] = email

    print("创建订单...")
    order_data = self.create_order(ips)
    if not order_data:
        raise Exception("创建订单失败!")
    print("订单创建成功")

    print("进行域名验证...")
    try:
        self.get_and_set_authorizations(order_data, webroot, mode, ips)
    except Exception as e:
        # Keep the original cause attached for the traceback.
        raise Exception("域名验证失败!{}".format(e)) from e

    # Finalize the order
    print("创建csr...")
    csr, private_key = self.create_csr(ips)
    print("发送csr并完成订单...")
    res = self.acme_request(order_data['finalize'], payload={
        "csr": self.calculate_safe_base64(csr)
    })
    if res.status_code not in [200, 201]:
        raise Exception("完成订单失败!{}".format(res.json()))

    # Issuance may finish asynchronously: the finalize response can report
    # status "processing" without a certificate URL. In that case poll the
    # order URL from the Location header (assumed to be present, per the
    # RFC 8555 example) until the certificate URL appears.
    order_state = res.json()
    order_url = res.headers.get('Location')
    polls = 0
    while (not order_state.get('certificate')
           and order_state.get('status') == 'processing'
           and order_url
           and polls < self._max_check_num):
        time.sleep(self._wait_time)
        poll_res = self.acme_request(order_url, payload="")
        if poll_res.status_code not in [200, 201]:
            break
        order_state = poll_res.json()
        polls += 1

    # Download the certificate
    print("获取证书...")
    cert_url = order_state.get('certificate')
    if not cert_url:
        raise Exception("获取证书URL失败!")
    cert_res = self.acme_request(cert_url, payload="")
    if cert_res.status_code not in [200, 201]:
        raise Exception("获取证书失败!{}".format(cert_res.json()))
    print("证书获取成功!")
    cert_pem = cert_res.content.decode()

    # Save certificate and private key
    if not path:
        path = "/www/server/panel/ssl"
    os.makedirs(path, exist_ok=True)
    cert_path = os.path.join(path, 'certificate.pem')
    key_path = os.path.join(path, 'privateKey.pem')
    with open(cert_path, 'w') as f:
        f.write(cert_pem)
    # The private key must not be readable by other users. Create it with
    # mode 0600 and tighten an already existing file before writing.
    key_fd = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(key_fd, 'w') as f:
        os.fchmod(f.fileno(), 0o600)
        f.write(private_key.decode())
    return cert_path, key_path
def create_order(self, ips):
    """Create an ACME order for the given IP identifiers.

    When the first attempt fails, one repair is tried per recognised error:
    a rejected nonce is replaced, and an invalid account URL triggers a new
    registration. The request is then repeated.

    :param ips: list of IP address strings
    :return: the order object (dict) on success, {} on failure
    """
    identifiers = [{"type": "ip", "value": ip} for ip in ips]
    payload = {"identifiers": identifiers, "profile": "shortlived"}
    print("创建订单,域名列表:{}".format(','.join(ips)))
    res = self.acme_request(self._apis['newOrder'], payload)
    if res.status_code not in [201, 200]:  # creation failed
        print("创建订单失败,尝试修复错误...")
        e_body = res.json()
        if 'type' in e_body:
            # Nonce no longer valid
            if e_body['type'].find('error:badNonce') != -1:
                print("随机数失效,重新获取随机数后重试...")
                self.get_nonce(force=True)
                res = self.acme_request(self._apis['newOrder'], payload)
            # Account no longer valid. "detail" is optional in an error
            # document, so a missing field must not raise KeyError.
            detail = e_body.get('detail') or ''
            if detail.find('KeyID header contained an invalid account URL') != -1:
                print("帐户失效,重新注册帐户后重试...")
                k = "Production"
                # Drop the stored account so get_kid() registers a new one.
                self._config.setdefault('account', {}).pop(k, None)
                self.get_kid()
                self.get_nonce(force=True)
                res = self.acme_request(self._apis['newOrder'], payload)
        if res.status_code not in [201, 200]:
            print(res.json())
            return {}
    return res.json()
# Convert a UTC time string to a Unix timestamp
def utc_to_time(self, utc_string):
    """Convert a UTC time string such as "2024-01-01T00:00:00Z" to epoch seconds.

    Fractional seconds and a trailing "Z" are ignored. The conversion uses
    UTC directly, so the result does not depend on the host time zone. On a
    UTC+8 host it equals the earlier mktime()-plus-8-hours calculation.

    :param utc_string: time string in "%Y-%m-%dT%H:%M:%S" form
    :return: int timestamp; "now + 7 days" when the string cannot be parsed
    """
    import calendar
    try:
        # Drop fractional seconds and the UTC designator that strptime's
        # format below does not accept.
        utc_string = utc_string.split('.')[0].rstrip('Zz')
        utc_date = datetime.datetime.strptime(
            utc_string, "%Y-%m-%dT%H:%M:%S")
        return int(calendar.timegm(utc_date.timetuple()))
    except (AttributeError, TypeError, ValueError):
        # Unparseable input: fall back to seven days from now.
        return int(time.time() + 86400 * 7)
def get_keyauthorization(self, token):
    """Build the key authorization for a challenge token.

    The account JWK (from get_acme_header("GET_THUMBPRINT")) is serialized
    with sorted keys and compact separators, hashed with SHA-256 and
    Base64url-encoded to form the thumbprint.

    :param token: challenge token
    :return: tuple ("<token>.<thumbprint>", Base64url SHA-256 digest of that string)
    """
    jwk = self.get_acme_header("GET_THUMBPRINT")["jwk"]
    canonical_jwk = json.dumps(jwk, sort_keys=True, separators=(",", ":"))
    thumbprint = self.calculate_safe_base64(
        hashlib.sha256(canonical_jwk.encode("utf8")).digest())

    key_authorization = "{0}.{1}".format(token, thumbprint)
    digest64 = self.calculate_safe_base64(
        hashlib.sha256(key_authorization.encode("utf8")).digest())
    return key_authorization, digest64
# Fetch the authorizations and answer their http-01 challenges
def get_and_set_authorizations(self, order_data, webroot=None, mode=None, ips=None):
    """Satisfy the http-01 challenge of every authorization in the order.

    How the challenge response is published depends on ``mode``:

    - 'standalone': a temporary HTTP server on port 80 answers the request.
    - 'nginx': a location is added to the matching server block (or a
      temporary vhost is created); original files are restored afterwards.
    - 'apache': a Location/Alias is added to the matching vhost (or a
      temporary vhost is created) and the response is written to
      ``/tmp/<token>``; files are restored and the token file removed
      afterwards.
    - empty/None: the response is written below ``webroot`` and removed
      afterwards.

    Any other truthy ``mode`` publishes nothing.

    :param order_data: order object containing "authorizations"
    :param webroot: document root used when ``mode`` is empty
    :param mode: validation mode, see above
    :param ips: requested IPs; ``ips[0]`` selects the nginx/apache vhost
    :raises Exception: on missing data, unsupported challenges or failed validation
    """
    if 'authorizations' not in order_data:
        raise Exception("订单数据异常,缺少验证信息!")

    for auth_url in order_data['authorizations']:
        res = self.acme_request(auth_url, payload="")
        if res.status_code not in [200, 201]:
            raise Exception("获取验证信息失败!{}".format(res.json()))
        s_body = res.json()
        if 'status' in s_body:
            if s_body['status'] in ['invalid']:
                raise Exception("无效订单,此订单当前为验证失败状态!")
            if s_body['status'] in ['valid']:  # nothing left to validate
                continue

        # Pick this authorization's http-01 challenge. Using next() ensures
        # that a challenge left over from a previous loop iteration can
        # never be reused by accident.
        challenge = next(
            (item for item in s_body['challenges'] if item['type'] == "http-01"),
            None)
        if challenge is None:
            raise Exception("未找到http-01验证方式,无法继续申请证书!")

        # Check whether validation is still required.
        current_status = self.check_auth_status(challenge['url']).json()['status']
        if current_status == 'invalid':
            raise Exception('域名验证失败,请尝试重新申请!')
        if current_status == 'valid':
            continue

        token = challenge['token']
        acme_keyauthorization, auth_value = self.get_keyauthorization(token)
        print(challenge)

        if mode == 'standalone':
            from http.server import HTTPServer, SimpleHTTPRequestHandler
            import threading

            request_path = '/.well-known/acme-challenge/{}'.format(token)
            response_body = acme_keyauthorization.encode()

            class ACMERequestHandler(SimpleHTTPRequestHandler):
                def log_message(self, format, *args):
                    # Suppress the default per-request log output.
                    return

                def do_GET(self):
                    if self.path == request_path:
                        self.send_response(200)
                        self.send_header('Content-type', 'text/plain')
                        self.end_headers()
                        self.wfile.write(response_body)
                    else:
                        self.send_response(404)
                        self.end_headers()

            httpd = HTTPServer(('', 80), ACMERequestHandler)
            server_thread = threading.Thread(target=httpd.serve_forever)
            server_thread.daemon = True
            server_thread.start()
            time.sleep(2)  # give the server time to start
            try:
                # Tell the ACME server to validate.
                self.acme_request(challenge['url'], payload={"keyAuthorization": "{0}".format(acme_keyauthorization)})
                self.check_auth_status(challenge['url'], [
                    'valid', 'invalid'])
            finally:
                httpd.shutdown()
                server_thread.join()
                # shutdown() only stops the serve loop. The listening socket
                # must be closed too, or the next authorization cannot bind
                # port 80 again.
                httpd.server_close()

        elif mode == 'nginx':
            tmp_path = '/www/server/panel/vhost/nginx/tmp_apply_ip_ssl.conf'
            files = find_nginx_files_by_servername(ips[0])
            if not files:
                print("未找到相关Nginx配置文件,尝试创建临时配置文件...")
                if not os.path.exists('/www/server/panel/vhost/nginx'):
                    raise Exception("未找到Nginx配置文件,且Nginx配置目录不存在!")
                # No matching file: create a temporary vhost instead.
                with open(tmp_path, 'w') as f:
                    f.write((
                        "server\n"
                        "{{\n"
                        "listen 80;\n"
                        "server_name {0};\n"
                        "location /.well-known/acme-challenge/{1} {{\n"
                        "default_type text/plain;\n"
                        "return 200 \"{2}\";\n"
                        "}}\n"
                        "}}\n"
                    ).format(ips[0], token, acme_keyauthorization))
            try:
                for file in files:
                    print("修改Nginx配置文件: {}".format(file))
                    insert_location_into_server(file, ips[0], verify_file=token, verify_content=acme_keyauthorization)
                # Reload nginx
                subprocess.run(["nginx", "-t"], check=True)
                subprocess.run(["nginx", "-s", "reload"], check=True)
                # Tell the ACME server to validate.
                self.acme_request(challenge['url'],
                                  payload={"keyAuthorization": "{0}".format(acme_keyauthorization)})
                self.check_auth_status(challenge['url'], [
                    'valid', 'invalid'])
            finally:
                for file in files:
                    print("恢复Nginx配置文件: {}".format(file))
                    # Restore from the backup copy.
                    backup_file = file + ".bak"
                    if os.path.exists(backup_file):
                        shutil.move(backup_file, file)
                if not files:
                    print("删除临时Nginx配置文件...")
                    # Remove the temporary vhost.
                    os.remove(tmp_path)
                # Reload nginx
                subprocess.run(["nginx", "-t"], check=True)
                subprocess.run(["nginx", "-s", "reload"], check=True)

        elif mode == 'apache':
            tmp_path = '/www/server/panel/vhost/apache/tmp_apply_ip_ssl.conf'
            token_file = '/tmp/{}'.format(token)
            files = find_apache_conf_files(ips[0])
            if not files:
                print("未找到相关Apache配置文件,尝试创建临时配置文件...")
                if not os.path.exists('/www/server/panel/vhost/apache'):
                    raise Exception("未找到Apache配置文件,且Apache配置目录不存在!")
                # No matching file: create a temporary vhost instead.
                with open(tmp_path, 'w') as f:
                    f.write((
                        "<VirtualHost *:80>\n"
                        "ServerName {0}\n"
                        "<Location /.well-known/acme-challenge/{1}>\n"
                        "Require all granted\n"
                        "Header set Content-Type \"text/plain\"\n"
                        "</Location>\n"
                        "Alias /.well-known/acme-challenge/{1} /tmp/{1}\n"
                        "</VirtualHost>\n"
                    ).format(ips[0], token))
            try:
                for file in files:
                    print("修改Apache配置文件: {}".format(file))
                    insert_location_into_vhost(file, ips[0], verify_file=token)
                # Write the challenge response file.
                with open(token_file, 'w') as f:
                    f.write(acme_keyauthorization)
                # Reload Apache
                subprocess.run(["/etc/init.d/httpd", "reload"], check=True)
                # Tell the ACME server to validate.
                self.acme_request(challenge['url'],
                                  payload={"keyAuthorization": "{0}".format(acme_keyauthorization)})
                self.check_auth_status(challenge['url'], [
                    'valid', 'invalid'])
            finally:
                for file in files:
                    print("恢复Apache配置文件: {}".format(file))
                    # Restore from the backup copy.
                    backup_file = file + ".bak"
                    if os.path.exists(backup_file):
                        shutil.move(backup_file, file)
                if not files:
                    print("删除临时Apache配置文件...")
                    # Remove the temporary vhost.
                    os.remove(tmp_path)
                # Do not leave the challenge response behind in /tmp.
                if os.path.exists(token_file):
                    os.remove(token_file)
                # Restart Apache
                subprocess.run(["systemctl", "restart", "httpd"], check=True)

        elif not mode:
            # Webroot validation
            challenge_dir = os.path.join(
                webroot, '.well-known', 'acme-challenge')
            if not os.path.exists(challenge_dir):
                os.makedirs(challenge_dir)
            file_path = os.path.join(challenge_dir, token)
            with open(file_path, 'w') as f:
                f.write(acme_keyauthorization)
            try:
                # Tell the ACME server to validate.
                self.acme_request(challenge['url'], payload={"keyAuthorization": "{0}".format(acme_keyauthorization)})
                self.check_auth_status(challenge['url'], [
                    'valid', 'invalid'])
            finally:
                os.remove(file_path)
# Poll the validation status
def check_auth_status(self, url, desired_status=None):
    """Poll ``url`` until its "status" is one of ``desired_status``.

    With ``desired_status == ['valid', 'invalid']`` the method sleeps
    ``self._wait_time`` seconds before each request. Every request counts
    toward ``self._max_check_num``, including responses that are not a JSON
    object or carry no "status".

    :param url: challenge or authorization URL to query
    :param desired_status: statuses that end polling; defaults to
        ["pending", "valid", "invalid"]
    :return: the last response object
    :raises StopIteration: when the status is "invalid" or the attempt limit
        is reached (exception type kept for existing callers)
    """
    desired_status = desired_status or ["pending", "valid", "invalid"]
    number_of_checks = 0
    authorization_status = "pending"
    while True:
        print("|-第{}次查询验证结果..".format(number_of_checks + 1))
        if desired_status == ['valid', 'invalid']:
            time.sleep(self._wait_time)
        check_authorization_status_response = self.acme_request(url, "")
        a_auth = check_authorization_status_response.json()
        # Count every attempt, so a malformed response cannot cause an
        # endless loop.
        number_of_checks += 1

        if isinstance(a_auth, dict) and "status" in a_auth:
            authorization_status = a_auth["status"]
            if authorization_status in desired_status:
                if authorization_status == "invalid":
                    # Prefer the server's error detail. A challenge object
                    # carries "error" itself; an authorization object nests
                    # it inside its challenges.
                    ret_title = str(a_auth)
                    nested = a_auth.get('challenges')
                    candidates = [a_auth]
                    if isinstance(nested, list):
                        candidates.extend(c for c in nested if isinstance(c, dict))
                    for item in candidates:
                        error = item.get('error')
                        if isinstance(error, dict) and 'detail' in error:
                            ret_title = error['detail']
                            break
                    raise StopIteration(
                        "{0} >>>> {1}".format(
                            ret_title,
                            json.dumps(a_auth)
                        )
                    )
                break

        if number_of_checks >= self._max_check_num:
            raise StopIteration(
                "错误:已尝试验证{0}次. 最大验证次数为{1}. 验证时间间隔为{2}秒.".format(
                    number_of_checks,
                    self._max_check_num,
                    self._wait_time
                )
            )
    print("|-验证结果: {}".format(authorization_status))
    return check_authorization_status_response
if __name__ == '__main__':
    # Command-line entry point: pick a validation mode, validate the
    # requested IPs, then run the issuance flow.
    import argparse

    # Parse command-line arguments
    parser = argparse.ArgumentParser(description='自动申请IP SSL证书脚本')
    parser.add_argument('-ips', type=str, required=True, help='要申请SSL证书的ip地址', dest='ips')
    parser.add_argument('-email', type=str, required=False, help='用于申请SSL证书的邮箱', dest='email')
    parser.add_argument('-w', type=str, help='网站根目录', dest='webroot')
    parser.add_argument('--standalone', help='使用独立模式申请证书', dest='standalone', action='store_true')
    parser.add_argument('--nginx', help='使用nginx模式申请证书', dest='nginx', action='store_true')
    parser.add_argument('--apache', help='使用apache模式申请证书', dest='apache', action='store_true')
    parser.add_argument('-path', type=str, help='证书保存路径', dest='path')
    args = parser.parse_args()

    if not (args.standalone or args.webroot or args.nginx or args.apache):
        print("未检测到任何验证模式,将尝试自动选择验证模式!")
        # Automatic mode selection: find out whether port 80 is in use.
        use_80 = False
        try:
            lsof_result = subprocess.run(
                ["lsof", "-i:80"],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            lsof_reports_busy = bool(lsof_result.stdout)
        except OSError:
            # lsof is not installed or not runnable: let netstat decide
            # instead of crashing.
            lsof_reports_busy = True
        if lsof_reports_busy:
            # Confirm with netstat (a shell pipeline, hence shell=True with
            # a fixed command string).
            netstat_result = subprocess.run(
                "netstat -tuln | grep ':80 '",
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                shell=True
            )
            if netstat_result.stdout:
                use_80 = True

        if use_80:
            print("检测到80端口被占用,尝试使用Nginx或Apache模式进行验证...")
            # Check which web server is installed.
            if os.path.exists('/www/server/nginx/sbin/nginx'):
                args.nginx = True
                print("选择Nginx模式进行验证...")
            elif os.path.exists('/www/server/apache/bin/httpd'):
                args.apache = True
                print("选择Apache模式进行验证...")
            else:
                print("[ERROR] 未检测到Nginx或Apache安装,无法使用Nginx或Apache模式进行验证!请释放80端口后重试。")
                sys.exit(1)
        else:
            args.standalone = True
            print("80端口未被占用,选择独立模式进行验证...")

    # Fall back to the default contact address.
    email = args.email or "demo@bt.cn"

    ips = [item.strip() for item in args.ips.split(',') if item.strip()]
    # Only standalone mode supports several IPs for now.
    if len(ips) > 1 and not args.standalone:
        print("[ERROR] 非独立模式下暂不支持多个IP申请SSL证书!")
        sys.exit(1)
    # Only IPv4 is supported for now; check every address, not just the first.
    if not ips or not all(is_ipv4(item) for item in ips):
        print("[ERROR] 目前仅支持IPv4地址申请SSL证书!")
        sys.exit(1)

    auto_ssl = AutoApplyIPSSL()
    mode = None
    if args.standalone:
        mode = 'standalone'
    elif args.nginx:
        mode = 'nginx'
    elif args.apache:
        mode = 'apache'

    try:
        cert_path, key_path = auto_ssl.apply_ip_ssl(ips, email, webroot=args.webroot, mode=mode, path=args.path)
    except Exception:
        # Top-level boundary: report the full traceback and exit non-zero.
        # Ctrl-C (KeyboardInterrupt) is deliberately not swallowed.
        import traceback
        print(f"[ERROR] 证书申请失败!错误信息: {traceback.format_exc()}")
        sys.exit(1)
    sys.exit(0)
|