[GH-ISSUE #88] [Feature] 希望支持多吉云和宝塔部署 #72

Closed
opened 2026-03-03 00:31:14 +03:00 by kerem · 1 comment
Owner

Originally created by @fcwys on GitHub (Sep 24, 2024).
Original GitHub issue: https://github.com/certimate-go/certimate/issues/88

功能描述
希望支持多吉云和宝塔部署

动机
多吉云CDN和宝塔使用范围比较广泛,多吉云官方有支持的API,宝塔也有对应API可用。

替代方案

其他信息

# -*- coding: utf-8 -*-
import hashlib
import time
import requests
import prettytable


# 宝塔面板操作类
class BtPanel:
    __BTURL = ''
    __APIKEY = ''
    __REQ = requests.session()

    # 初始化宝塔面板
    def __init__(self, host: str, apisk: str):
        '''
        :param host: 宝塔面板地址(末尾不加/)
        :param apisk: 宝塔面板API密钥
        '''
        self.__BTURL = host
        self.__APIKEY = apisk

    # 计算MD5
    def __GetMD5(self, s: str):
        '''
        计算字符串的MD5值
        :param s: 待计算的字符串
        :return: MD5值
        '''
        m = hashlib.md5()
        m.update(s.encode('utf-8'))
        return m.hexdigest()

    # 签名计算
    def __GetToken(self):
        request_time = int(time.time())    # 获取请求时间戳
        request_token = self.__GetMD5(str(request_time) + '' + self.__GetMD5(self.__APIKEY)),   # 生成请求签名
        return {'request_time': request_time, 'request_token': request_token}

    # 获取站点列表
    def GetSites(self, showlog=True):
        '''
        获取宝塔面板站点列表
        :showlog: 是否输出结果
        :return: 站点列表数据
        '''
        if showlog:
            print('\n### 获取站点列表...')
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token'],
            'p': 1,
            'limit': 100,
            'order': 'id'
        }
        res = self.__REQ.post(url=self.__BTURL + '/data?action=getData&table=sites', data=playload).json()
        # 判断请求是否成功
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 获取站点列表失败:', res['msg'])
            return False
        # 使用prettytable输出站点列表
        if showlog:
            tb = prettytable.PrettyTable()
            tb.field_names = ['网站名', '站点类型', '备注', 'SSL域名', 'SSL到期时间', 'SSL剩余天数', '证书品牌', '状态']
            tb.align['网站名'] = 'l'
            tb.align['备注'] = 'l'
            tb.align['SSL域名'] = 'l'
            tb.align['证书品牌'] = 'l'
            for site in res['data']:
                site_status = '运行' if site['status'] == '1' else '停止'
                if site['ssl'] == -1:
                    sslinfo = {'notAfter': '-', 'endtime': '-', 'subject': '-', 'issuer': '-'}
                else:
                    sslinfo = {'notAfter': site['ssl']['notAfter'], 'endtime': site['ssl']['endtime'], 'subject': site['ssl']['subject'], 'issuer': site['ssl']['issuer']}
                tb.add_row([site['name'], site['project_type'], site['ps'], sslinfo['subject'], sslinfo['notAfter'], sslinfo['endtime'], sslinfo['issuer'], site_status])
            print(tb)
        return res['data']

    # 设置站点SSL证书
    def SetSSL(self, site_name: str, ssl_cert_content: str, ssl_key_content: str):
        '''
        设置站点SSL证书
        :param site_name: 站点名称(域名)
        :param ssl_cert_content: ssl证书内容
        :param ssl_key_content: ssl私钥内容
        :return: 设置结果
        '''
        print('\n### 设置站点SSL证书...')
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token'],
            'type': 0,
            'siteName': site_name,
            'key': ssl_key_content,
            'csr': ssl_cert_content
        }
        res = self.__REQ.post(url=self.__BTURL + '/site?action=SetSSL', data=playload).json()
        # 判断请求是否成功
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 设置站点SSL证书失败:', res['msg'])
            return False
        print('>>>', res['msg'])    # 输出结果
        return res['status']

    # 获取证书夹列表
    def GetCertList(self, showlog=True):
        '''
        获取证书夹列表
        :showlog: 是否输出结果
        :return: 证书夹列表数据
        '''
        if showlog:
            print('\n### 获取证书夹列表...')
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token'],
            'force_refresh': 0    # 0:获取本地证书 1:获取云端证书
        }
        res = self.__REQ.post(url=self.__BTURL + '/ssl?action=get_cert_list', data=playload).json()
        # 判断请求是否成功
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 获取证书夹列表失败:', res['msg'])
            return False
        # 使用prettytable输出证书夹列表
        if showlog:
            tb = prettytable.PrettyTable()
            tb.field_names = ['域名', 'SSL到期时间', 'SSL剩余天数', '证书品牌', '可选域名']
            tb.align['域名'] = 'l'
            tb.align['证书品牌'] = 'l'
            tb.align['可选域名'] = 'l'
            for cert in res:
                tb.add_row([cert['subject'], cert['info']['notAfter'], cert['endtime'], cert['info']['issuer'], cert['dns']])
            print(tb)
        return res

    # 删除过期SSL证书
    def DelExpiredSSL(self):
        '''
        删除过期SSL证书
        :showlog: 是否输出结果
        :return: 删除结果
        '''
        print('\n### 删除过期SSL证书...')
        certs = self.GetCertList(showlog=False)    # 获取证书列表
        if not certs:
            return False
        expiredcerts = []    # 过期证书列表
        for cert in certs:
            if cert['endtime'] < 0:    # 证书已过期
                expiredcerts.append(cert['subject'])    # 加入过期证书列表
                tk = self.__GetToken()    # 获取签名
                playload = {
                    'request_time': tk['request_time'],
                    'request_token': tk['request_token'],
                    'local': 1,
                    'ssl_hash': cert['hash']
                }
                res = self.__REQ.post(url=self.__BTURL + '/ssl?action=remove_cloud_cert', data=playload).json()
                # 判断请求是否成功
                if 'status' in res and 'msg' in res and not res['status']:
                    print('>>> 删除过期SSL证书失败:', res['msg'])
                    return False
                print('>>> 过期证书', cert['subject'], res['msg'])
                return res['status']
        if len(expiredcerts) == 0:
            print('>>> 证书夹内未发现过期证书')
        return expiredcerts

    # 重启Nginx
    def RestartNginx(self):
        '''
        重启Nginx
        :return: 重启结果
        '''
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token'],
            'name': 'nginx',
            'type': 'restart'
        }
        res = self.__REQ.post(url=self.__BTURL + '/system?action=ServiceAdmin', data=playload).json()
        # 判断请求是否成功
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 重启Nginx失败:', res['msg'])
            return False
        return res.json()['status']

    # 获取系统基本信息
    def GetSystemInfo(self, showlog=True):
        '''
        获取系统基本信息
        :showlog: 是否输出结果
        :return: 系统基本信息
        '''
        if showlog:
            print('\n### 获取系统基本信息...')
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token']
        }
        res = self.__REQ.post(url=self.__BTURL + '/system?action=GetSystemTotal', data=playload).json()
        # 判断请求是否成功
        # 判断res是否存在status字段,若不存在则说明请求失败
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 获取系统基本信息失败:', res['msg'])
            return False
        res['memUtilization'] = round(res['memRealUsed'] / res['memTotal'], 2) * 100    # 内存使用率
        if showlog:
            print('>>> 面板地址:', self.__BTURL)
            print('>>> 操作系统:', res['system'])
            print('>>> 面板版本:', res['version'])
            print('>>> 运行时间:', res['time'])
            print('>>> CPU使用率:', res['cpuRealUsed'], '%')
            print('>>> 内存使用率:', res['memUtilization'], '%')
            print('>>> 内存总量:', round(res['memTotal'] / 1024, 2), 'GB')
        return res
Originally created by @fcwys on GitHub (Sep 24, 2024). Original GitHub issue: https://github.com/certimate-go/certimate/issues/88 **功能描述** 希望支持多吉云和宝塔部署 **动机** 多吉云CDN和宝塔使用范围比较广泛,多吉云官方有支持的API,宝塔也有对应API可用。 **替代方案** 无 **其他信息** - 多吉云API:https://docs.dogecloud.com/cdn/api-cert-upload - 附:个人实现的Python版本宝塔获取站点列表、上传证书、重启Nginx代码,希望能帮上忙: ```python # -*- coding: utf-8 -*- import hashlib import time import requests import prettytable # 宝塔面板操作类 class BtPanel: __BTURL = '' __APIKEY = '' __REQ = requests.session() # 初始化宝塔面板 def __init__(self, host: str, apisk: str): ''' :param host: 宝塔面板地址(末尾不加/) :param apisk: 宝塔面板API密钥 ''' self.__BTURL = host self.__APIKEY = apisk # 计算MD5 def __GetMD5(self, s: str): ''' 计算字符串的MD5值 :param s: 待计算的字符串 :return: MD5值 ''' m = hashlib.md5() m.update(s.encode('utf-8')) return m.hexdigest() # 签名计算 def __GetToken(self): request_time = int(time.time()) # 获取请求时间戳 request_token = self.__GetMD5(str(request_time) + '' + self.__GetMD5(self.__APIKEY)), # 生成请求签名 return {'request_time': request_time, 'request_token': request_token} # 获取站点列表 def GetSites(self, showlog=True): ''' 获取宝塔面板站点列表 :showlog: 是否输出结果 :return: 站点列表数据 ''' if showlog: print('\n### 获取站点列表...') tk = self.__GetToken() # 获取签名 playload = { 'request_time': tk['request_time'], 'request_token': tk['request_token'], 'p': 1, 'limit': 100, 'order': 'id' } res = self.__REQ.post(url=self.__BTURL + '/data?action=getData&table=sites', data=playload).json() # 判断请求是否成功 if 'status' in res and 'msg' in res and not res['status']: print('>>> 获取站点列表失败:', res['msg']) return False # 使用prettytable输出站点列表 if showlog: tb = prettytable.PrettyTable() tb.field_names = ['网站名', '站点类型', '备注', 'SSL域名', 'SSL到期时间', 'SSL剩余天数', '证书品牌', '状态'] tb.align['网站名'] = 'l' tb.align['备注'] = 'l' tb.align['SSL域名'] = 'l' tb.align['证书品牌'] = 'l' for site in res['data']: site_status = '运行' if site['status'] == '1' else '停止' if site['ssl'] == -1: sslinfo = {'notAfter': '-', 'endtime': '-', 'subject': '-', 'issuer': '-'} else: sslinfo = {'notAfter': site['ssl']['notAfter'], 'endtime': site['ssl']['endtime'], 'subject': site['ssl']['subject'], 'issuer': site['ssl']['issuer']} tb.add_row([site['name'], site['project_type'], site['ps'], sslinfo['subject'], sslinfo['notAfter'], sslinfo['endtime'], sslinfo['issuer'], site_status]) print(tb) return res['data'] # 设置站点SSL证书 def SetSSL(self, site_name: str, ssl_cert_content: str, ssl_key_content: str): ''' 设置站点SSL证书 :param site_name: 站点名称(域名) :param ssl_cert_content: ssl证书内容 :param ssl_key_content: ssl私钥内容 :return: 设置结果 ''' print('\n### 设置站点SSL证书...') tk = self.__GetToken() # 获取签名 playload = { 'request_time': tk['request_time'], 'request_token': tk['request_token'], 'type': 0, 'siteName': site_name, 'key': ssl_key_content, 'csr': ssl_cert_content } res = self.__REQ.post(url=self.__BTURL + '/site?action=SetSSL', data=playload).json() # 判断请求是否成功 if 'status' in res and 'msg' in res and not res['status']: print('>>> 设置站点SSL证书失败:', res['msg']) return False print('>>>', res['msg']) # 输出结果 return res['status'] # 获取证书夹列表 def GetCertList(self, showlog=True): ''' 获取证书夹列表 :showlog: 是否输出结果 :return: 证书夹列表数据 ''' if showlog: print('\n### 获取证书夹列表...') tk = self.__GetToken() # 获取签名 playload = { 'request_time': tk['request_time'], 'request_token': tk['request_token'], 'force_refresh': 0 # 0:获取本地证书 1:获取云端证书 } res = self.__REQ.post(url=self.__BTURL + '/ssl?action=get_cert_list', data=playload).json() # 判断请求是否成功 if 'status' in res and 'msg' in res and not res['status']: print('>>> 获取证书夹列表失败:', res['msg']) return False # 使用prettytable输出证书夹列表 if showlog: tb = prettytable.PrettyTable() tb.field_names = ['域名', 'SSL到期时间', 'SSL剩余天数', '证书品牌', '可选域名'] tb.align['域名'] = 'l' tb.align['证书品牌'] = 'l' tb.align['可选域名'] = 'l' for cert in res: tb.add_row([cert['subject'], cert['info']['notAfter'], cert['endtime'], cert['info']['issuer'], cert['dns']]) print(tb) return res # 删除过期SSL证书 def DelExpiredSSL(self): ''' 删除过期SSL证书 :showlog: 是否输出结果 :return: 删除结果 ''' print('\n### 删除过期SSL证书...') certs = self.GetCertList(showlog=False) # 获取证书列表 if not certs: return False expiredcerts = [] # 过期证书列表 for cert in certs: if cert['endtime'] < 0: # 证书已过期 expiredcerts.append(cert['subject']) # 加入过期证书列表 tk = self.__GetToken() # 获取签名 playload = { 'request_time': tk['request_time'], 'request_token': tk['request_token'], 'local': 1, 'ssl_hash': cert['hash'] } res = self.__REQ.post(url=self.__BTURL + '/ssl?action=remove_cloud_cert', data=playload).json() # 判断请求是否成功 if 'status' in res and 'msg' in res and not res['status']: print('>>> 删除过期SSL证书失败:', res['msg']) return False print('>>> 过期证书', cert['subject'], res['msg']) return res['status'] if len(expiredcerts) == 0: print('>>> 证书夹内未发现过期证书') return expiredcerts # 重启Nginx def RestartNginx(self): ''' 重启Nginx :return: 重启结果 ''' tk = self.__GetToken() # 获取签名 playload = { 'request_time': tk['request_time'], 'request_token': tk['request_token'], 'name': 'nginx', 'type': 'restart' } res = self.__REQ.post(url=self.__BTURL + '/system?action=ServiceAdmin', data=playload).json() # 判断请求是否成功 if 'status' in res and 'msg' in res and not res['status']: print('>>> 重启Nginx失败:', res['msg']) return False return res.json()['status'] # 获取系统基本信息 def GetSystemInfo(self, showlog=True): ''' 获取系统基本信息 :showlog: 是否输出结果 :return: 系统基本信息 ''' if showlog: print('\n### 获取系统基本信息...') tk = self.__GetToken() # 获取签名 playload = { 'request_time': tk['request_time'], 'request_token': tk['request_token'] } res = self.__REQ.post(url=self.__BTURL + '/system?action=GetSystemTotal', data=playload).json() # 判断请求是否成功 # 判断res是否存在status字段,若不存在则说明请求失败 if 'status' in res and 'msg' in res and not res['status']: print('>>> 获取系统基本信息失败:', res['msg']) return False res['memUtilization'] = round(res['memRealUsed'] / res['memTotal'], 2) * 100 # 内存使用率 if showlog: print('>>> 面板地址:', self.__BTURL) print('>>> 操作系统:', res['system']) print('>>> 面板版本:', res['version']) print('>>> 运行时间:', res['time']) print('>>> CPU使用率:', res['cpuRealUsed'], '%') print('>>> 内存使用率:', res['memUtilization'], '%') print('>>> 内存总量:', round(res['memTotal'] / 1024, 2), 'GB') return res ```
kerem 2026-03-03 00:31:14 +03:00
Author
Owner

@usual2970 commented on GitHub (Nov 2, 2024):

感谢反馈~ 后续没有这方面的迭代计划

欢迎提交 PR

<!-- gh-comment-id:2452835729 --> @usual2970 commented on GitHub (Nov 2, 2024): 感谢反馈~ 后续没有这方面的迭代计划 欢迎提交 PR
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
starred/certimate#72
No description provided.